TcpConnection.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpTransport.h"
00011 #include "TcpInst.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpSendStrategy.h"
00015 #include "TcpReconnectTask.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017 #include "dds/DCPS/transport/framework/PriorityKey.h"
00018 
00019 #include "ace/os_include/netinet/os_tcp.h"
00020 #include "ace/OS_NS_arpa_inet.h"
00021 #include <sstream>
00022 #include <string>
00023 
00024 #if !defined (__ACE_INLINE__)
00025 #include "TcpConnection.inl"
00026 #endif /* __ACE_INLINE__ */
00027 
00028 // The connection lost can be detected by both send and receive strategy. When
00029 // that happens, both of them add a request to the reconnect task. The reconnect
00030 // will be attempted when the first request is dequeued and the second request
00031 // just look the state to determine if the connection is good. To distinguish
00032 // if the request is queued because the lost connection is detected by different
00033 // threads or is because the re-established connection lost again, we need the
00034 // reconnect_delay to help to identify these two cases so we can reset the reconnect
00035 // state to trigger reconnecting after a re-established connection is lost.
00036 
00037 // The reconnect delay is the period from the last time the reconnect attempt
00038 // completes to when the reconnect request is dequeued.
00039 const ACE_Time_Value reconnect_delay(2);
00040 
00041 OpenDDS::DCPS::TcpConnection::TcpConnection()
00042   : connected_(false)
00043   , is_connector_(false)
00044   , passive_reconnect_timer_id_(-1)
00045   , reconnect_task_(this)
00046   , reconnect_state_(INIT_STATE)
00047   , last_reconnect_attempted_(ACE_Time_Value::zero)
00048   , transport_priority_(0)  // TRANSPORT_PRIORITY.value default value - 0.
00049   , shutdown_(false)
00050   , passive_setup_(false)
00051   , passive_setup_buffer_(sizeof(ACE_UINT32))
00052   , id_(0)
00053 {
00054   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00055 
00056   if (this->reconnect_task_.open()) {
00057     ACE_ERROR((LM_ERROR,
00058                ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00059                ACE_TEXT("open")));
00060   }
00061 
00062 }
00063 
00064 OpenDDS::DCPS::TcpConnection::TcpConnection(const ACE_INET_Addr& remote_address,
00065                                             Priority priority,
00066                                             const TcpInst_rch& config)
00067   : connected_(false)
00068   , is_connector_(true)
00069   , remote_address_(remote_address)
00070   , local_address_(config->local_address())
00071   , tcp_config_(config)
00072   , passive_reconnect_timer_id_(-1)
00073   , reconnect_task_(this)
00074   , reconnect_state_(INIT_STATE)
00075   , last_reconnect_attempted_(ACE_Time_Value::zero)
00076   , transport_priority_(priority)
00077   , shutdown_(false)
00078   , passive_setup_(false)
00079 {
00080   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00081 
00082   // Open the reconnect task
00083   if (this->reconnect_task_.open()) {
00084     ACE_ERROR((LM_ERROR,
00085                ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00086                ACE_TEXT("open")));
00087   }
00088 
00089 }
00090 OpenDDS::DCPS::TcpConnection::~TcpConnection()
00091 {
00092   DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00093 
00094   // The Reconnect task belongs to the Connection object.
00095   // Cleanup before leaving the house.
00096   this->reconnect_task_.close(1);
00097   //this->reconnect_task_.wait ();
00098 
00099   if (!this->link_.is_nil()) {
00100     if (Transport_debug_level > 5) {
00101       ACE_DEBUG((LM_DEBUG,
00102                  ACE_TEXT("(%P|%t) TcpConnection::~TcpConnection: about to notify link[%@] connection deleted\n"),
00103                  this->link_.in()));
00104     }
00105     this->link_->notify_connection_deleted();
00106   }
00107 
00108 }
00109 
00110 void
00111 OpenDDS::DCPS::TcpConnection::disconnect()
00112 {
00113   DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114   this->connected_ = false;
00115 
00116   if (!this->receive_strategy_.is_nil()) {
00117     this->receive_strategy_->get_reactor()->remove_handler(this,
00118                                                            ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119   }
00120 
00121   this->peer().close();
00122 
00123 }
00124 
00125 // This can not be inlined due to circular dependencies disallowing
00126 // visibility into the receive strategy to call add_ref().  Oh well.
00127 void
00128 OpenDDS::DCPS::TcpConnection::set_receive_strategy
00129 (TcpReceiveStrategy* receive_strategy)
00130 {
00131   DBG_ENTRY_LVL("TcpConnection","set_receive_strategy",6);
00132 
00133   // Make a "copy" for ourselves
00134   receive_strategy->_add_ref();
00135   this->receive_strategy_ = receive_strategy;
00136 }
00137 
00138 void
00139 OpenDDS::DCPS::TcpConnection::set_send_strategy
00140 (TcpSendStrategy* send_strategy)
00141 {
00142   DBG_ENTRY_LVL("TcpConnection","set_send_strategy",6);
00143 
00144   // Make a "copy" for ourselves
00145   send_strategy->_add_ref();
00146   this->send_strategy_ = send_strategy;
00147 }
00148 
00149 int
00150 OpenDDS::DCPS::TcpConnection::open(void* arg)
00151 {
00152   DBG_ENTRY_LVL("TcpConnection","open",6);
00153 
00154   if (is_connector_) {
00155 
00156     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open active.\n"), 2);
00157     // Take over the refcount from TcpTransport::connect_datalink().
00158     const TcpConnection_rch self(this);
00159     const TcpTransport_rch transport = link_->get_transport_impl();
00160 
00161     const bool is_loop(local_address_ == remote_address_);
00162     const PriorityKey key(transport_priority_, remote_address_,
00163                           is_loop, false /* !active */);
00164 
00165     int active_open_ = active_open();
00166 
00167     int connect_tcp_datalink_ = transport->connect_tcp_datalink(link_, self);
00168 
00169     if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00170       // if (active_open() == -1 ||
00171       //       transport->connect_tcp_datalink(link_, self) == -1) {
00172 
00173       transport->async_connect_failed(key);
00174 
00175       return -1;
00176     }
00177 
00178     return 0;
00179   }
00180 
00181   // The passed-in arg is really the acceptor object that created this
00182   // TcpConnection object, and is also the caller of this open()
00183   // method.  We need to cast the arg to the TcpAcceptor* type.
00184   TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00185 
00186   if (acceptor == 0) {
00187     // The cast failed.
00188     ACE_ERROR_RETURN((LM_ERROR,
00189                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00190                       ACE_TEXT("failed to cast void* arg to ")
00191                       ACE_TEXT("TcpAcceptor* type.\n")),
00192                      -1);
00193   }
00194 
00195   // Now we need to ask the TcpAcceptor object to provide us with
00196   // a pointer to the TcpTransport object that "owns" the acceptor.
00197   TcpTransport_rch transport = acceptor->transport();
00198 
00199   if (transport.is_nil()) {
00200     // The acceptor gave us a nil transport (smart) pointer.
00201     ACE_ERROR_RETURN((LM_ERROR,
00202                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00203                       ACE_TEXT("acceptor's transport is nil.\n")),
00204                      -1);
00205   }
00206 
00207   TcpInst* tcp_config = acceptor->get_configuration();
00208 
00209   // Keep a "copy" of the reference to TcpInst object
00210   // for ourselves.
00211   tcp_config->_add_ref();
00212   tcp_config_ = tcp_config;
00213   local_address_ = tcp_config_->local_address();
00214 
00215   set_sock_options(tcp_config_.in());
00216 
00217   // We expect that the active side of the connection (the remote side
00218   // in this case) will supply its listening ACE_INET_Addr as the first
00219   // message it sends to the socket.  This is a one-way connection
00220   // establishment protocol message.
00221   passive_setup_ = true;
00222   transport_during_setup_ = transport;
00223   passive_setup_buffer_.size(sizeof(ACE_UINT32));
00224 
00225   if (reactor()->register_handler(this, READ_MASK) == -1) {
00226     ACE_ERROR_RETURN((LM_ERROR,
00227                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00228                       ACE_TEXT("unable to register with the reactor.%p\n"),
00229                       ACE_TEXT("register_handler")),
00230                      -1);
00231   }
00232 
00233   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open passive handle=%d.\n",
00234             static_cast<int>(intptr_t(get_handle()))), 2);
00235 
00236   return 0;
00237 }
00238 
00239 int
00240 OpenDDS::DCPS::TcpConnection::handle_setup_input(ACE_HANDLE /*h*/)
00241 {
00242   const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00243                                   passive_setup_buffer_.space(),
00244                                   &ACE_Time_Value::zero);
00245 
00246   if (ret < 0 && errno == ETIME) {
00247     return 0;
00248   }
00249 
00250   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input %@ "
00251             "recv returned %b %m.\n", this, ret), 4);
00252 
00253   if (ret <= 0) {
00254     return -1;
00255   }
00256 
00257   passive_setup_buffer_.wr_ptr(ret);
00258   // Parse the setup message: <len><addr><prio>
00259   // len and prio are network order 32-bit ints
00260   // addr is a string of length len, including null
00261   ACE_UINT32 nlen = 0;
00262 
00263   if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00264 
00265     ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00266     passive_setup_buffer_.rd_ptr(sizeof(nlen));
00267     ACE_UINT32 hlen = ntohl(nlen);
00268     passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00269 
00270     ACE_UINT32 nprio = 0;
00271 
00272     if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00273 
00274       const std::string bufstr(passive_setup_buffer_.rd_ptr());
00275       const NetworkAddress network_order_address(bufstr);
00276       network_order_address.to_addr(remote_address_);
00277 
00278       ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00279       transport_priority_ = ntohl(nprio);
00280 
00281       passive_setup_buffer_.reset();
00282       passive_setup_ = false;
00283 
00284       VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00285             "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %d\n", this,
00286             remote_address_.get_host_addr(), remote_address_.get_port_number(),
00287             local_address_.get_host_addr(), local_address_.get_port_number(),
00288             transport_priority_, reconnect_state_));
00289 
00290       // remove from reactor, normal recv strategy setup will add us back
00291       if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00292         VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00293               "remove_handler failed %m.\n"));
00294       }
00295 
00296       const TcpConnection_rch self(this, false);
00297 
00298       transport_during_setup_->passive_connection(remote_address_, self);
00299       transport_during_setup_ = 0;
00300       connected_ = true;
00301 
00302       return 0;
00303     }
00304   }
00305 
00306   passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00307 
00308   return 0;
00309 }
00310 
00311 int
00312 OpenDDS::DCPS::TcpConnection::handle_input(ACE_HANDLE fd)
00313 {
00314   DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00315 
00316   if (passive_setup_) {
00317     return handle_setup_input(fd);
00318   }
00319 
00320   if (receive_strategy_.is_nil()) {
00321     return 0;
00322   }
00323 
00324   return receive_strategy_->handle_dds_input(fd);
00325 }
00326 
00327 int
00328 OpenDDS::DCPS::TcpConnection::handle_output(ACE_HANDLE)
00329 {
00330   DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00331 
00332   if (!this->send_strategy_.is_nil()) {
00333     if (DCPS_debug_level > 9) {
00334       ACE_DEBUG((LM_DEBUG,
00335                  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00336                  ACE_TEXT("sending queued data.\n"),
00337                  id_));
00338     }
00339 
00340     // Process data to be sent from the queue.
00341     if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00342         != send_strategy_->perform_work()) {
00343 
00344       // Stop handling output ready events when there is nothing to output.
00345       // N.B. This calls back into the reactor.  Is the reactor lock
00346       //      recursive?
00347       send_strategy_->schedule_output();
00348     }
00349   }
00350 
00351   return 0;
00352 }
00353 
00354 int
00355 OpenDDS::DCPS::TcpConnection::close(u_long)
00356 {
00357   DBG_ENTRY_LVL("TcpConnection","close",6);
00358 
00359   // TBD SOON - Find out exactly when close() is called.
00360   //            I have no clue when and who might call this.
00361 
00362   if (!this->send_strategy_.is_nil())
00363     this->send_strategy_->terminate_send();
00364 
00365   this->disconnect();
00366 
00367   return 0;
00368 }
00369 
00370 int
00371 OpenDDS::DCPS::TcpConnection::handle_close(ACE_HANDLE, ACE_Reactor_Mask)
00372 {
00373   DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00374 
00375   if (DCPS_debug_level >= 1) {
00376     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00377                this->link_->get_transport_impl()->config()->name().c_str(),
00378                this->remote_address_.get_host_addr(),
00379                this->remote_address_.get_port_number()));
00380   }
00381 
00382   bool graceful = !this->receive_strategy_.is_nil() && this->receive_strategy_->gracefully_disconnected();
00383 
00384   if (!this->send_strategy_.is_nil()) {
00385     if (graceful) {
00386       this->send_strategy_->terminate_send();
00387     } else {
00388       this->send_strategy_->suspend_send();
00389     }
00390   }
00391 
00392   this->disconnect();
00393 
00394   if (graceful) {
00395     this->link_->notify(DataLink::DISCONNECTED);
00396   } else {
00397     ReconnectOpType op = DO_RECONNECT;
00398     this->reconnect_task_.add(op);
00399   }
00400 
00401   return 0;
00402 }
00403 
00404 void
00405 OpenDDS::DCPS::TcpConnection::set_sock_options(TcpInst* tcp_config)
00406 {
00407 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00408   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00409   int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00410   //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
00411 #  if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00412 
00413   // A little screwy double negative logic: disabling nagle involves
00414   // enabling TCP_NODELAY
00415   int opt = (tcp_config->enable_nagle_algorithm_ == false);
00416 
00417   if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00418     ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00419   }
00420 
00421   if (this->peer().set_option(SOL_SOCKET,
00422                               SO_SNDBUF,
00423                               (void *) &snd_size,
00424                               sizeof(snd_size)) == -1
00425       && errno != ENOTSUP) {
00426     ACE_ERROR((LM_ERROR,
00427                "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00428                snd_size));
00429     return;
00430   }
00431 
00432   if (this->peer().set_option(SOL_SOCKET,
00433                               SO_RCVBUF,
00434                               (void *) &rcv_size,
00435                               sizeof(int)) == -1
00436       && errno != ENOTSUP) {
00437     ACE_ERROR((LM_ERROR,
00438                "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00439                rcv_size));
00440     return;
00441   }
00442 
00443 #  else
00444   ACE_UNUSED_ARG(tcp_config);
00445   ACE_UNUSED_ARG(snd_size);
00446   ACE_UNUSED_ARG(rcv_size);
00447 #  endif /* !ACE_LACKS_SOCKET_BUFSIZ */
00448 
00449 #else
00450   ACE_UNUSED_ARG(tcp_config);
00451 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00452 }
00453 
00454 int
00455 OpenDDS::DCPS::TcpConnection::active_establishment(bool initiate_connect)
00456 {
00457   DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00458 
00459   // Safety check - This should not happen since is_connector_ defaults to
00460   // true and the role in a connection connector is not changed when reconnecting.
00461   if (this->is_connector_ == false) {
00462     ACE_ERROR_RETURN((LM_ERROR,
00463                       "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00464                      -1);
00465   }
00466 
00467   if (this->shutdown_)
00468     return -1;
00469 
00470   // Now use a connector object to establish the connection.
00471   ACE_SOCK_Connector connector;
00472 
00473   if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00474 
00475     ACE_ERROR_RETURN((LM_ERROR,
00476                       ACE_TEXT("(%P|%t) ERROR: Failed to connect. %p\n%C"),
00477                       ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()),
00478                      -1);
00479 
00480   } else {
00481     this->connected_ = true;
00482     const std::string remote_host = this->remote_address_.get_host_addr();
00483     VDBG((LM_DEBUG, "(%P|%t) DBG:   active_establishment(%C:%d->%C:%d)\n",
00484           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00485           remote_host.c_str(), this->remote_address_.get_port_number()));
00486   }
00487 
00488   // Set the DiffServ codepoint according to the priority value.
00489   DirectPriorityMapper mapper(this->transport_priority_);
00490   this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00491 
00492   set_sock_options(tcp_config_.in());
00493 
00494   // In order to complete the connection establishment from the active
00495   // side, we need to tell the remote side about our public address.
00496   // It will use that as an "identifier" of sorts.  To the other
00497   // (passive) side, our local_address that we send here will be known
00498   // as the remote_address.
00499   std::string address = tcp_config_->get_public_address();
00500   ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00501 
00502   ACE_UINT32 nlen = htonl(len);
00503 
00504   if (this->peer().send_n(&nlen,
00505                           sizeof(ACE_UINT32)) == -1) {
00506     // TBD later - Anything we are supposed to do to close the connection.
00507     ACE_ERROR_RETURN((LM_ERROR,
00508                       "(%P|%t) ERROR: Unable to send address string length to "
00509                       "the passive side to complete the active connection "
00510                       "establishment.\n"),
00511                      -1);
00512   }
00513 
00514   if (this->peer().send_n(address.c_str(), len)  == -1) {
00515     // TBD later - Anything we are supposed to do to close the connection.
00516     ACE_ERROR_RETURN((LM_ERROR,
00517                       "(%P|%t) ERROR: Unable to send our address to "
00518                       "the passive side to complete the active connection "
00519                       "establishment.\n"),
00520                      -1);
00521   }
00522 
00523   ACE_UINT32 npriority = htonl(this->transport_priority_);
00524 
00525   if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00526     // TBD later - Anything we are supposed to do to close the connection.
00527     ACE_ERROR_RETURN((LM_ERROR,
00528                       "(%P|%t) ERROR: Unable to send publication priority to "
00529                       "the passive side to complete the active connection "
00530                       "establishment.\n"),
00531                      -1);
00532   }
00533 
00534   return 0;
00535 }
00536 
00537 /// This function is called to re-establish the connection. If this object
00538 /// is the connector side of the connection then it tries to reconnect to the
00539 /// remote, if it's the acceptor side of the connection then it schedules a timer
00540 /// to check if it passively accepted a connection from remote.
00541 /// The on_new_association true indicates this is called when the connection is
00542 /// previous lost and new association is added. The connector side needs to try to
00543 /// actively reconnect to remote.
00544 int
00545 OpenDDS::DCPS::TcpConnection::reconnect(bool on_new_association)
00546 {
00547   DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00548   if (DCPS_debug_level >= 1) {
00549     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00550                this->link_->get_transport_impl()->config()->name().c_str(),
00551                this->remote_address_.get_host_addr(),
00552                this->remote_address_.get_port_number()));
00553   }
00554 
00555   if (on_new_association) {
00556     if (DCPS_debug_level >= 1) {
00557       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00558     }
00559     return this->active_reconnect_on_new_association();
00560   }
00561 
00562   // If on_new_association is false, it's called by the reconnect task.
00563   // We need make sure if the link release is pending. If does, do
00564   // not try to reconnect.
00565   else if (!this->link_->is_release_pending()) {
00566     if (DCPS_debug_level >= 1) {
00567       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00568     }
00569     // Try to reconnect if it's connector previously.
00570     if (this->is_connector_ && this->active_reconnect_i() == -1) {
00571       if (DCPS_debug_level >= 1) {
00572         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00573       }
00574       return -1;
00575     }
00576 
00577     // Schedule a timer to see if a incoming connection is accepted when timeout.
00578     else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00579       if (DCPS_debug_level >= 1) {
00580         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00581       }
00582       return -1;
00583     }
00584 
00585   }
00586   if (DCPS_debug_level >= 1) {
00587     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00588   }
00589   return 0;
00590 }
00591 
00592 int
00593 OpenDDS::DCPS::TcpConnection::active_open()
00594 {
00595   DBG_ENTRY_LVL("TcpConnection","active_open",6);
00596 
00597   GuardType guard(reconnect_lock_);
00598 
00599   if (connected_.value()) {
00600     return 0;
00601   }
00602 
00603   return active_establishment(false /* !initiate_connect */);
00604 }
00605 
00606 int
00607 OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association()
00608 {
00609   DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00610   GuardType guard(this->reconnect_lock_);
00611 
00612   if (this->connected_ == true)
00613     return 0;
00614 
00615   else if (this->active_establishment() == 0) {
00616     this->reconnect_state_ = INIT_STATE;
00617     this->send_strategy_->resume_send();
00618     return 0;
00619   }
00620 
00621   return -1;
00622 }
00623 
00624 // This method is called on acceptor side when the lost connection is detected.
00625 // A timer is scheduled to check if a new connection is created within the
00626 // passive_reconnect_duration_ period.
00627 int
00628 OpenDDS::DCPS::TcpConnection::passive_reconnect_i()
00629 {
00630   DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00631   GuardType guard(this->reconnect_lock_);
00632 
00633   // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once.
00634   if (this->reconnect_state_ == INIT_STATE) {
00635     // Mark the connection lost since the recv/send just failed.
00636     this->connected_ = false;
00637 
00638     if (this->tcp_config_->passive_reconnect_duration_ == 0)
00639       return -1;
00640 
00641     ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00642                            this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00643     this->reconnect_state_ = PASSIVE_WAITING_STATE;
00644     this->link_->notify(DataLink::DISCONNECTED);
00645 
00646     // It is possible that the passive reconnect is called after the new connection
00647     // is accepted and the receive_strategy of this old connection is reset to nil.
00648     if (!this->receive_strategy_.is_nil()) {
00649       TcpReceiveStrategy* rs
00650         = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00651 
00652       // Give a copy to reactor.
00653       this->_add_ref();
00654       this->passive_reconnect_timer_id_ = rs->get_reactor()->schedule_timer(this, 0, timeout);
00655 
00656       if (this->passive_reconnect_timer_id_ == -1) {
00657         this->_remove_ref();
00658         ACE_ERROR_RETURN((LM_ERROR,
00659                           ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00660                           ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00661                          -1);
00662       }
00663     }
00664   }
00665 
00666   return 0;
00667 }
00668 
00669 // This is the active reconnect implementation. The backoff algorithm is used as the
00670 // reconnect strategy. e.g.
00671 // With conn_retry_initial_interval = 500, conn_retry_backoff_multiplier = 2.0 and
00672 // conn_retry_attempts = 6 the reconnect attempts will be:
00673 // - first at 0 seconds(upon detection of the disconnect)
00674 // - second at 0.5 seconds
00675 // - third at 1.0 (2*0.5) seconds
00676 // - fourth at 2.0 (2*1.0) seconds
00677 // - fifth at 4.0 (2*2.0) seconds
00678 // - sixth at  8.0 (2*4.0) seconds
00679 int
00680 OpenDDS::DCPS::TcpConnection::active_reconnect_i()
00681 {
00682   DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00683 
00684   GuardType guard(this->reconnect_lock_);
00685 
00686   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00687         "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00688         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00689         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00690         this->reconnect_state_));
00691   if (DCPS_debug_level >= 1) {
00692     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::"
00693           "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00694           this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00695           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00696           this->reconnect_state_));
00697   }
00698   // We need reset the state to INIT_STATE if we are previously reconnected.
00699   // This would allow re-establishing connection after the re-established
00700   // connection lost again.
00701   if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00702       && this->reconnect_state_ == RECONNECTED_STATE) {
00703     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00704           "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00705     this->reconnect_state_ = INIT_STATE;
00706   }
00707 
00708   if (this->reconnect_state_ == INIT_STATE) {
00709     // Suspend send once.
00710     this->send_strategy_->suspend_send();
00711 
00712     this->disconnect();
00713 
00714     if (this->tcp_config_->conn_retry_attempts_ > 0) {
00715       this->link_->notify(DataLink::DISCONNECTED);
00716     }
00717 
00718     // else the conn_retry_attempts is 0 then we do not need this extra
00719     // notify_disconnected() since the user application should get the
00720     // notify_lost() without delay.
00721 
00722     double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00723     int ret = -1;
00724 
00725     for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00726       ret = this->active_establishment();
00727 
00728       if (this->shutdown_)
00729         break;
00730 
00731       if (ret == -1) {
00732         ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00733                                 ((int)retry_delay_msec)%1000*1000);
00734         ACE_OS::sleep(delay_tv);
00735         retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00736 
00737       } else {
00738         break;
00739       }
00740     }
00741 
00742     if (ret == -1) {
00743       if (this->tcp_config_->conn_retry_attempts_ > 0) {
00744         ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00745                    this->link_->get_transport_impl()->config()->name().c_str(),
00746                    this->remote_address_.get_host_addr(),
00747                    this->remote_address_.get_port_number()));
00748 
00749       } else {
00750         ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00751                    this->link_->get_transport_impl()->config()->name().c_str(),
00752                    this->remote_address_.get_host_addr(),
00753                    this->remote_address_.get_port_number()));
00754       }
00755 
00756       this->reconnect_state_ = LOST_STATE;
00757       this->link_->notify(DataLink::LOST);
00758       this->send_strategy_->terminate_send();
00759 
00760     } else {
00761       ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00762                  this->link_->get_transport_impl()->config()->name().c_str(),
00763                  this->remote_address_.get_host_addr(),
00764                  this->remote_address_.get_port_number()));
00765       if (this->receive_strategy_->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00766         ACE_ERROR_RETURN((LM_ERROR,
00767                           "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00768                           "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00769                          -1);
00770       }
00771       this->reconnect_state_ = RECONNECTED_STATE;
00772       this->link_->notify(DataLink::RECONNECTED);
00773       this->send_strategy_->resume_send();
00774     }
00775 
00776     this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00777   }
00778 
00779   return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00780 }
00781 
00782 /// A timer is scheduled on acceptor side to check if a new connection
00783 /// is accepted after the connection is lost.
00784 int
00785 OpenDDS::DCPS::TcpConnection::handle_timeout(const ACE_Time_Value &,
00786                                              const void *)
00787 {
00788   DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00789 
00790   GuardType guard(this->reconnect_lock_);
00791 
00792   switch (this->reconnect_state_) {
00793   case PASSIVE_WAITING_STATE: {
00794     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00795                this->link_->get_transport_impl()->config()->name().c_str(),
00796                this->remote_address_.get_host_addr(),
00797                this->remote_address_.get_port_number()));
00798 
00799     this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00800     // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
00801     // Now we need declare the connection is lost.
00802     this->link_->notify(DataLink::LOST);
00803 
00804     // The handle_timeout may be called after the connection is re-established
00805     // and the send strategy of this old connection is reset to nil.
00806     if (!this->send_strategy_.is_nil())
00807       this->send_strategy_->terminate_send();
00808 
00809     this->reconnect_state_ = LOST_STATE;
00810 
00811     this->tear_link();
00812 
00813   }
00814   break;
00815 
00816   case RECONNECTED_STATE:
00817     // reconnected successfully.
00818     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00819                this->link_->get_transport_impl()->config()->name().c_str(),
00820                this->remote_address_.get_host_addr(),
00821                this->remote_address_.get_port_number()));
00822     break;
00823 
00824   default :
00825     ACE_ERROR((LM_ERROR,
00826                ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00827                ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00828     break;
00829   }
00830 
00831   // Take back the "copy" we gave to reactor when we schedule the timer.
00832   this->_remove_ref();
00833 
00834   return 0;
00835 }
00836 
00837 /// This object would be "old" connection object and the provided is the new
00838 /// connection object.  The "old" connection object will copy its states to
00839 /// to the "new" connection object. This is called by the TcpDataLink
00840 /// when a new connection is accepted (with a new TcpConnection object).
00841 /// We need make the state in "new" connection object consistent with the "old"
00842 /// connection object.
00843 void
00844 OpenDDS::DCPS::TcpConnection::transfer(TcpConnection* connection)
00845 {
00846   DBG_ENTRY_LVL("TcpConnection","transfer",6);
00847 
00848   GuardType guard(this->reconnect_lock_);
00849 
00850   bool notify_reconnect = false;
00851 
00852   switch (this->reconnect_state_) {
00853   case INIT_STATE:
00854     // We have not detected the lost connection and the peer is faster than us and
00855     // re-established the connection. so do not notify reconnected.
00856     break;
00857 
00858   case LOST_STATE:
00859 
00860     // The reconnect timed out.
00861   case PASSIVE_TIMEOUT_CALLED_STATE:
00862     // TODO: If the handle_timeout is called before the old connection
00863     // transfer its state to new connection then should we disconnect
00864     // the new connection or keep it alive ?
00865     // I think we should keep the connection, the user will get a
00866     // lost connection notification and then a reconnected notification.
00867     notify_reconnect = true;
00868     break;
00869 
00870   case PASSIVE_WAITING_STATE: {
00871     TcpReceiveStrategy* rs
00872       = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00873 
00874     // Cancel the timer since we got new connection.
00875     if (rs->get_reactor()->cancel_timer(this) == -1) {
00876       ACE_ERROR((LM_ERROR,
00877                  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00878                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00879 
00880     } else
00881       passive_reconnect_timer_id_ = -1;
00882 
00883     this->_remove_ref();
00884     notify_reconnect = true;
00885   }
00886   break;
00887 
00888   default :
00889     ACE_ERROR((LM_ERROR,
00890                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00891                ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00892     break;
00893   }
00894 
00895   // Verify if this acceptor side.
00896   if (this->is_connector_ || connection->is_connector_) {
00897     ACE_ERROR((LM_ERROR,
00898                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00899                ACE_TEXT(" should NOT be called by the connector side \n")));
00900   }
00901 
00902   this->reconnect_task_.close(1);
00903   connection->receive_strategy_ = this->receive_strategy_;
00904   connection->send_strategy_ = this->send_strategy_;
00905   connection->remote_address_ = this->remote_address_;
00906   connection->local_address_ = this->local_address_;
00907   connection->tcp_config_ = this->tcp_config_;
00908   connection->link_ = this->link_;
00909 
00910   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00911         "transfer(%C:%d->%C:%d) passive reconnected. new con %@   "
00912         " old con %@ \n",
00913         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00914         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00915         connection, this));
00916 
00917   if (notify_reconnect) {
00918     this->reconnect_state_ = RECONNECTED_STATE;
00919     this->link_->notify(DataLink::RECONNECTED);
00920   }
00921 
00922 }
00923 
00924 /// This function is called when the backpressure occurs and timed out after
00925 /// "max_output_pause_period". The lost connection notification should be sent
00926 /// and the connection needs be closed since we declared it as a "lost"
00927 /// connection.
00928 void
00929 OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout()
00930 {
00931   DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00932   bool notify_lost = false;
00933   {
00934     GuardType guard(this->reconnect_lock_);
00935 
00936     if (this->reconnect_state_ == INIT_STATE) {
00937       this->reconnect_state_ = LOST_STATE;
00938       notify_lost = true;
00939 
00940       this->disconnect();
00941     }
00942   }
00943 
00944   if (notify_lost) {
00945     this->link_->notify(DataLink::LOST);
00946     this->send_strategy_->terminate_send();
00947   }
00948 
00949 }
00950 
00951 /// This is called by TcpSendStrategy when a send fails
00952 /// and a reconnect should be initiated. This method
00953 /// suspends any sends and kicks the reconnect thread into
00954 /// action.
00955 void
00956 OpenDDS::DCPS::TcpConnection::relink_from_send(bool do_suspend)
00957 {
00958   DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00959 
00960   if (do_suspend && !this->send_strategy_.is_nil())
00961     this->send_strategy_->suspend_send();
00962 
00963   ReconnectOpType op = DO_RECONNECT;
00964   this->reconnect_task_.add(op);
00965 }
00966 
00967 /// This is called by TcpReceiveStrategy when a disconnect
00968 /// is detected.  It simply suspends any sends and lets
00969 /// the handle_close() handle the reconnect logic.
00970 void
00971 OpenDDS::DCPS::TcpConnection::relink_from_recv(bool do_suspend)
00972 {
00973   DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00974 
00975   if (do_suspend && !this->send_strategy_.is_nil())
00976     this->send_strategy_->suspend_send();
00977 }
00978 
00979 bool
00980 OpenDDS::DCPS::TcpConnection::tear_link()
00981 {
00982   DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00983 
00984   return this->link_->release_resources();
00985 }
00986 
00987 void
00988 OpenDDS::DCPS::TcpConnection::shutdown()
00989 {
00990   DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00991   this->shutdown_ = true;
00992 
00993   this->reconnect_task_.close(1);
00994 
00995 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7