00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpTransport.h"
00011 #include "TcpInst.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpSendStrategy.h"
00015 #include "TcpReconnectTask.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017 #include "dds/DCPS/transport/framework/PriorityKey.h"
00018
00019 #include "ace/os_include/netinet/os_tcp.h"
00020 #include "ace/OS_NS_arpa_inet.h"
00021 #include <sstream>
00022 #include <string>
00023
00024 #if !defined (__ACE_INLINE__)
00025 #include "TcpConnection.inl"
00026 #endif
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 const ACE_Time_Value reconnect_delay(2);
00040
00041 OpenDDS::DCPS::TcpConnection::TcpConnection()
00042 : connected_(false)
00043 , is_connector_(false)
00044 , passive_reconnect_timer_id_(-1)
00045 , reconnect_task_(this)
00046 , reconnect_state_(INIT_STATE)
00047 , last_reconnect_attempted_(ACE_Time_Value::zero)
00048 , transport_priority_(0)
00049 , shutdown_(false)
00050 , passive_setup_(false)
00051 , passive_setup_buffer_(sizeof(ACE_UINT32))
00052 , id_(0)
00053 {
00054 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00055
00056 if (this->reconnect_task_.open()) {
00057 ACE_ERROR((LM_ERROR,
00058 ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00059 ACE_TEXT("open")));
00060 }
00061
00062 }
00063
00064 OpenDDS::DCPS::TcpConnection::TcpConnection(const ACE_INET_Addr& remote_address,
00065 Priority priority,
00066 const TcpInst_rch& config)
00067 : connected_(false)
00068 , is_connector_(true)
00069 , remote_address_(remote_address)
00070 , local_address_(config->local_address())
00071 , tcp_config_(config)
00072 , passive_reconnect_timer_id_(-1)
00073 , reconnect_task_(this)
00074 , reconnect_state_(INIT_STATE)
00075 , last_reconnect_attempted_(ACE_Time_Value::zero)
00076 , transport_priority_(priority)
00077 , shutdown_(false)
00078 , passive_setup_(false)
00079 {
00080 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00081
00082
00083 if (this->reconnect_task_.open()) {
00084 ACE_ERROR((LM_ERROR,
00085 ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00086 ACE_TEXT("open")));
00087 }
00088
00089 }
00090 OpenDDS::DCPS::TcpConnection::~TcpConnection()
00091 {
00092 DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00093
00094
00095
00096 this->reconnect_task_.close(1);
00097
00098
00099 if (!this->link_.is_nil()) {
00100 if (Transport_debug_level > 5) {
00101 ACE_DEBUG((LM_DEBUG,
00102 ACE_TEXT("(%P|%t) TcpConnection::~TcpConnection: about to notify link[%@] connection deleted\n"),
00103 this->link_.in()));
00104 }
00105 this->link_->notify_connection_deleted();
00106 }
00107
00108 }
00109
00110 void
00111 OpenDDS::DCPS::TcpConnection::disconnect()
00112 {
00113 DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114 this->connected_ = false;
00115
00116 if (!this->receive_strategy_.is_nil()) {
00117 this->receive_strategy_->get_reactor()->remove_handler(this,
00118 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119 }
00120
00121 this->peer().close();
00122
00123 }
00124
00125
00126
00127 void
00128 OpenDDS::DCPS::TcpConnection::set_receive_strategy
00129 (TcpReceiveStrategy* receive_strategy)
00130 {
00131 DBG_ENTRY_LVL("TcpConnection","set_receive_strategy",6);
00132
00133
00134 receive_strategy->_add_ref();
00135 this->receive_strategy_ = receive_strategy;
00136 }
00137
00138 void
00139 OpenDDS::DCPS::TcpConnection::set_send_strategy
00140 (TcpSendStrategy* send_strategy)
00141 {
00142 DBG_ENTRY_LVL("TcpConnection","set_send_strategy",6);
00143
00144
00145 send_strategy->_add_ref();
00146 this->send_strategy_ = send_strategy;
00147 }
00148
00149 int
00150 OpenDDS::DCPS::TcpConnection::open(void* arg)
00151 {
00152 DBG_ENTRY_LVL("TcpConnection","open",6);
00153
00154 if (is_connector_) {
00155
00156 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open active.\n"), 2);
00157
00158 const TcpConnection_rch self(this);
00159 const TcpTransport_rch transport = link_->get_transport_impl();
00160
00161 const bool is_loop(local_address_ == remote_address_);
00162 const PriorityKey key(transport_priority_, remote_address_,
00163 is_loop, false );
00164
00165 int active_open_ = active_open();
00166
00167 int connect_tcp_datalink_ = transport->connect_tcp_datalink(link_, self);
00168
00169 if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00170
00171
00172
00173 transport->async_connect_failed(key);
00174
00175 return -1;
00176 }
00177
00178 return 0;
00179 }
00180
00181
00182
00183
00184 TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00185
00186 if (acceptor == 0) {
00187
00188 ACE_ERROR_RETURN((LM_ERROR,
00189 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00190 ACE_TEXT("failed to cast void* arg to ")
00191 ACE_TEXT("TcpAcceptor* type.\n")),
00192 -1);
00193 }
00194
00195
00196
00197 TcpTransport_rch transport = acceptor->transport();
00198
00199 if (transport.is_nil()) {
00200
00201 ACE_ERROR_RETURN((LM_ERROR,
00202 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00203 ACE_TEXT("acceptor's transport is nil.\n")),
00204 -1);
00205 }
00206
00207 TcpInst* tcp_config = acceptor->get_configuration();
00208
00209
00210
00211 tcp_config->_add_ref();
00212 tcp_config_ = tcp_config;
00213 local_address_ = tcp_config_->local_address();
00214
00215 set_sock_options(tcp_config_.in());
00216
00217
00218
00219
00220
00221 passive_setup_ = true;
00222 transport_during_setup_ = transport;
00223 passive_setup_buffer_.size(sizeof(ACE_UINT32));
00224
00225 if (reactor()->register_handler(this, READ_MASK) == -1) {
00226 ACE_ERROR_RETURN((LM_ERROR,
00227 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00228 ACE_TEXT("unable to register with the reactor.%p\n"),
00229 ACE_TEXT("register_handler")),
00230 -1);
00231 }
00232
00233 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n",
00234 static_cast<int>(intptr_t(get_handle()))), 2);
00235
00236 return 0;
00237 }
00238
00239 int
00240 OpenDDS::DCPS::TcpConnection::handle_setup_input(ACE_HANDLE )
00241 {
00242 const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00243 passive_setup_buffer_.space(),
00244 &ACE_Time_Value::zero);
00245
00246 if (ret < 0 && errno == ETIME) {
00247 return 0;
00248 }
00249
00250 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ "
00251 "recv returned %b %m.\n", this, ret), 4);
00252
00253 if (ret <= 0) {
00254 return -1;
00255 }
00256
00257 passive_setup_buffer_.wr_ptr(ret);
00258
00259
00260
00261 ACE_UINT32 nlen = 0;
00262
00263 if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00264
00265 ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00266 passive_setup_buffer_.rd_ptr(sizeof(nlen));
00267 ACE_UINT32 hlen = ntohl(nlen);
00268 passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00269
00270 ACE_UINT32 nprio = 0;
00271
00272 if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00273
00274 const std::string bufstr(passive_setup_buffer_.rd_ptr());
00275 const NetworkAddress network_order_address(bufstr);
00276 network_order_address.to_addr(remote_address_);
00277
00278 ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00279 transport_priority_ = ntohl(nprio);
00280
00281 passive_setup_buffer_.reset();
00282 passive_setup_ = false;
00283
00284 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
00285 "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %d\n", this,
00286 remote_address_.get_host_addr(), remote_address_.get_port_number(),
00287 local_address_.get_host_addr(), local_address_.get_port_number(),
00288 transport_priority_, reconnect_state_));
00289
00290
00291 if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00292 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
00293 "remove_handler failed %m.\n"));
00294 }
00295
00296 const TcpConnection_rch self(this, false);
00297
00298 transport_during_setup_->passive_connection(remote_address_, self);
00299 transport_during_setup_ = 0;
00300 connected_ = true;
00301
00302 return 0;
00303 }
00304 }
00305
00306 passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00307
00308 return 0;
00309 }
00310
00311 int
00312 OpenDDS::DCPS::TcpConnection::handle_input(ACE_HANDLE fd)
00313 {
00314 DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00315
00316 if (passive_setup_) {
00317 return handle_setup_input(fd);
00318 }
00319
00320 if (receive_strategy_.is_nil()) {
00321 return 0;
00322 }
00323
00324 return receive_strategy_->handle_dds_input(fd);
00325 }
00326
00327 int
00328 OpenDDS::DCPS::TcpConnection::handle_output(ACE_HANDLE)
00329 {
00330 DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00331
00332 if (!this->send_strategy_.is_nil()) {
00333 if (DCPS_debug_level > 9) {
00334 ACE_DEBUG((LM_DEBUG,
00335 ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00336 ACE_TEXT("sending queued data.\n"),
00337 id_));
00338 }
00339
00340
00341 if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00342 != send_strategy_->perform_work()) {
00343
00344
00345
00346
00347 send_strategy_->schedule_output();
00348 }
00349 }
00350
00351 return 0;
00352 }
00353
00354 int
00355 OpenDDS::DCPS::TcpConnection::close(u_long)
00356 {
00357 DBG_ENTRY_LVL("TcpConnection","close",6);
00358
00359
00360
00361
00362 if (!this->send_strategy_.is_nil())
00363 this->send_strategy_->terminate_send();
00364
00365 this->disconnect();
00366
00367 return 0;
00368 }
00369
00370 int
00371 OpenDDS::DCPS::TcpConnection::handle_close(ACE_HANDLE, ACE_Reactor_Mask)
00372 {
00373 DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00374
00375 if (DCPS_debug_level >= 1) {
00376 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00377 this->link_->get_transport_impl()->config()->name().c_str(),
00378 this->remote_address_.get_host_addr(),
00379 this->remote_address_.get_port_number()));
00380 }
00381
00382 bool graceful = !this->receive_strategy_.is_nil() && this->receive_strategy_->gracefully_disconnected();
00383
00384 if (!this->send_strategy_.is_nil()) {
00385 if (graceful) {
00386 this->send_strategy_->terminate_send();
00387 } else {
00388 this->send_strategy_->suspend_send();
00389 }
00390 }
00391
00392 this->disconnect();
00393
00394 if (graceful) {
00395 this->link_->notify(DataLink::DISCONNECTED);
00396 } else {
00397 ReconnectOpType op = DO_RECONNECT;
00398 this->reconnect_task_.add(op);
00399 }
00400
00401 return 0;
00402 }
00403
00404 void
00405 OpenDDS::DCPS::TcpConnection::set_sock_options(TcpInst* tcp_config)
00406 {
00407 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00408 int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00409 int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00410
00411 # if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00412
00413
00414
00415 int opt = (tcp_config->enable_nagle_algorithm_ == false);
00416
00417 if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00418 ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00419 }
00420
00421 if (this->peer().set_option(SOL_SOCKET,
00422 SO_SNDBUF,
00423 (void *) &snd_size,
00424 sizeof(snd_size)) == -1
00425 && errno != ENOTSUP) {
00426 ACE_ERROR((LM_ERROR,
00427 "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00428 snd_size));
00429 return;
00430 }
00431
00432 if (this->peer().set_option(SOL_SOCKET,
00433 SO_RCVBUF,
00434 (void *) &rcv_size,
00435 sizeof(int)) == -1
00436 && errno != ENOTSUP) {
00437 ACE_ERROR((LM_ERROR,
00438 "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00439 rcv_size));
00440 return;
00441 }
00442
00443 # else
00444 ACE_UNUSED_ARG(tcp_config);
00445 ACE_UNUSED_ARG(snd_size);
00446 ACE_UNUSED_ARG(rcv_size);
00447 # endif
00448
00449 #else
00450 ACE_UNUSED_ARG(tcp_config);
00451 #endif
00452 }
00453
00454 int
00455 OpenDDS::DCPS::TcpConnection::active_establishment(bool initiate_connect)
00456 {
00457 DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00458
00459
00460
00461 if (this->is_connector_ == false) {
00462 ACE_ERROR_RETURN((LM_ERROR,
00463 "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00464 -1);
00465 }
00466
00467 if (this->shutdown_)
00468 return -1;
00469
00470
00471 ACE_SOCK_Connector connector;
00472
00473 if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00474
00475 ACE_ERROR_RETURN((LM_ERROR,
00476 ACE_TEXT("(%P|%t) ERROR: Failed to connect. %p\n%C"),
00477 ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()),
00478 -1);
00479
00480 } else {
00481 this->connected_ = true;
00482 const std::string remote_host = this->remote_address_.get_host_addr();
00483 VDBG((LM_DEBUG, "(%P|%t) DBG: active_establishment(%C:%d->%C:%d)\n",
00484 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00485 remote_host.c_str(), this->remote_address_.get_port_number()));
00486 }
00487
00488
00489 DirectPriorityMapper mapper(this->transport_priority_);
00490 this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00491
00492 set_sock_options(tcp_config_.in());
00493
00494
00495
00496
00497
00498
00499 std::string address = tcp_config_->get_public_address();
00500 ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00501
00502 ACE_UINT32 nlen = htonl(len);
00503
00504 if (this->peer().send_n(&nlen,
00505 sizeof(ACE_UINT32)) == -1) {
00506
00507 ACE_ERROR_RETURN((LM_ERROR,
00508 "(%P|%t) ERROR: Unable to send address string length to "
00509 "the passive side to complete the active connection "
00510 "establishment.\n"),
00511 -1);
00512 }
00513
00514 if (this->peer().send_n(address.c_str(), len) == -1) {
00515
00516 ACE_ERROR_RETURN((LM_ERROR,
00517 "(%P|%t) ERROR: Unable to send our address to "
00518 "the passive side to complete the active connection "
00519 "establishment.\n"),
00520 -1);
00521 }
00522
00523 ACE_UINT32 npriority = htonl(this->transport_priority_);
00524
00525 if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00526
00527 ACE_ERROR_RETURN((LM_ERROR,
00528 "(%P|%t) ERROR: Unable to send publication priority to "
00529 "the passive side to complete the active connection "
00530 "establishment.\n"),
00531 -1);
00532 }
00533
00534 return 0;
00535 }
00536
00537
00538
00539
00540
00541
00542
00543
00544 int
00545 OpenDDS::DCPS::TcpConnection::reconnect(bool on_new_association)
00546 {
00547 DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00548 if (DCPS_debug_level >= 1) {
00549 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00550 this->link_->get_transport_impl()->config()->name().c_str(),
00551 this->remote_address_.get_host_addr(),
00552 this->remote_address_.get_port_number()));
00553 }
00554
00555 if (on_new_association) {
00556 if (DCPS_debug_level >= 1) {
00557 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00558 }
00559 return this->active_reconnect_on_new_association();
00560 }
00561
00562
00563
00564
00565 else if (!this->link_->is_release_pending()) {
00566 if (DCPS_debug_level >= 1) {
00567 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00568 }
00569
00570 if (this->is_connector_ && this->active_reconnect_i() == -1) {
00571 if (DCPS_debug_level >= 1) {
00572 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00573 }
00574 return -1;
00575 }
00576
00577
00578 else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00579 if (DCPS_debug_level >= 1) {
00580 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00581 }
00582 return -1;
00583 }
00584
00585 }
00586 if (DCPS_debug_level >= 1) {
00587 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00588 }
00589 return 0;
00590 }
00591
00592 int
00593 OpenDDS::DCPS::TcpConnection::active_open()
00594 {
00595 DBG_ENTRY_LVL("TcpConnection","active_open",6);
00596
00597 GuardType guard(reconnect_lock_);
00598
00599 if (connected_.value()) {
00600 return 0;
00601 }
00602
00603 return active_establishment(false );
00604 }
00605
00606 int
00607 OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association()
00608 {
00609 DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00610 GuardType guard(this->reconnect_lock_);
00611
00612 if (this->connected_ == true)
00613 return 0;
00614
00615 else if (this->active_establishment() == 0) {
00616 this->reconnect_state_ = INIT_STATE;
00617 this->send_strategy_->resume_send();
00618 return 0;
00619 }
00620
00621 return -1;
00622 }
00623
00624
00625
00626
00627 int
00628 OpenDDS::DCPS::TcpConnection::passive_reconnect_i()
00629 {
00630 DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00631 GuardType guard(this->reconnect_lock_);
00632
00633
00634 if (this->reconnect_state_ == INIT_STATE) {
00635
00636 this->connected_ = false;
00637
00638 if (this->tcp_config_->passive_reconnect_duration_ == 0)
00639 return -1;
00640
00641 ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00642 this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00643 this->reconnect_state_ = PASSIVE_WAITING_STATE;
00644 this->link_->notify(DataLink::DISCONNECTED);
00645
00646
00647
00648 if (!this->receive_strategy_.is_nil()) {
00649 TcpReceiveStrategy* rs
00650 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00651
00652
00653 this->_add_ref();
00654 this->passive_reconnect_timer_id_ = rs->get_reactor()->schedule_timer(this, 0, timeout);
00655
00656 if (this->passive_reconnect_timer_id_ == -1) {
00657 this->_remove_ref();
00658 ACE_ERROR_RETURN((LM_ERROR,
00659 ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00660 ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00661 -1);
00662 }
00663 }
00664 }
00665
00666 return 0;
00667 }
00668
00669
00670
00671
00672
00673
00674
00675
00676
00677
00678
00679 int
00680 OpenDDS::DCPS::TcpConnection::active_reconnect_i()
00681 {
00682 DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00683
00684 GuardType guard(this->reconnect_lock_);
00685
00686 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00687 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00688 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00689 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00690 this->reconnect_state_));
00691 if (DCPS_debug_level >= 1) {
00692 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
00693 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00694 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00695 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00696 this->reconnect_state_));
00697 }
00698
00699
00700
00701 if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00702 && this->reconnect_state_ == RECONNECTED_STATE) {
00703 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00704 "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00705 this->reconnect_state_ = INIT_STATE;
00706 }
00707
00708 if (this->reconnect_state_ == INIT_STATE) {
00709
00710 this->send_strategy_->suspend_send();
00711
00712 this->disconnect();
00713
00714 if (this->tcp_config_->conn_retry_attempts_ > 0) {
00715 this->link_->notify(DataLink::DISCONNECTED);
00716 }
00717
00718
00719
00720
00721
00722 double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00723 int ret = -1;
00724
00725 for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00726 ret = this->active_establishment();
00727
00728 if (this->shutdown_)
00729 break;
00730
00731 if (ret == -1) {
00732 ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00733 ((int)retry_delay_msec)%1000*1000);
00734 ACE_OS::sleep(delay_tv);
00735 retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00736
00737 } else {
00738 break;
00739 }
00740 }
00741
00742 if (ret == -1) {
00743 if (this->tcp_config_->conn_retry_attempts_ > 0) {
00744 ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00745 this->link_->get_transport_impl()->config()->name().c_str(),
00746 this->remote_address_.get_host_addr(),
00747 this->remote_address_.get_port_number()));
00748
00749 } else {
00750 ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00751 this->link_->get_transport_impl()->config()->name().c_str(),
00752 this->remote_address_.get_host_addr(),
00753 this->remote_address_.get_port_number()));
00754 }
00755
00756 this->reconnect_state_ = LOST_STATE;
00757 this->link_->notify(DataLink::LOST);
00758 this->send_strategy_->terminate_send();
00759
00760 } else {
00761 ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00762 this->link_->get_transport_impl()->config()->name().c_str(),
00763 this->remote_address_.get_host_addr(),
00764 this->remote_address_.get_port_number()));
00765 if (this->receive_strategy_->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00766 ACE_ERROR_RETURN((LM_ERROR,
00767 "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00768 "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00769 -1);
00770 }
00771 this->reconnect_state_ = RECONNECTED_STATE;
00772 this->link_->notify(DataLink::RECONNECTED);
00773 this->send_strategy_->resume_send();
00774 }
00775
00776 this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00777 }
00778
00779 return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00780 }
00781
00782
00783
00784 int
00785 OpenDDS::DCPS::TcpConnection::handle_timeout(const ACE_Time_Value &,
00786 const void *)
00787 {
00788 DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00789
00790 GuardType guard(this->reconnect_lock_);
00791
00792 switch (this->reconnect_state_) {
00793 case PASSIVE_WAITING_STATE: {
00794 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00795 this->link_->get_transport_impl()->config()->name().c_str(),
00796 this->remote_address_.get_host_addr(),
00797 this->remote_address_.get_port_number()));
00798
00799 this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00800
00801
00802 this->link_->notify(DataLink::LOST);
00803
00804
00805
00806 if (!this->send_strategy_.is_nil())
00807 this->send_strategy_->terminate_send();
00808
00809 this->reconnect_state_ = LOST_STATE;
00810
00811 this->tear_link();
00812
00813 }
00814 break;
00815
00816 case RECONNECTED_STATE:
00817
00818 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00819 this->link_->get_transport_impl()->config()->name().c_str(),
00820 this->remote_address_.get_host_addr(),
00821 this->remote_address_.get_port_number()));
00822 break;
00823
00824 default :
00825 ACE_ERROR((LM_ERROR,
00826 ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00827 ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00828 break;
00829 }
00830
00831
00832 this->_remove_ref();
00833
00834 return 0;
00835 }
00836
00837
00838
00839
00840
00841
00842
00843 void
00844 OpenDDS::DCPS::TcpConnection::transfer(TcpConnection* connection)
00845 {
00846 DBG_ENTRY_LVL("TcpConnection","transfer",6);
00847
00848 GuardType guard(this->reconnect_lock_);
00849
00850 bool notify_reconnect = false;
00851
00852 switch (this->reconnect_state_) {
00853 case INIT_STATE:
00854
00855
00856 break;
00857
00858 case LOST_STATE:
00859
00860
00861 case PASSIVE_TIMEOUT_CALLED_STATE:
00862
00863
00864
00865
00866
00867 notify_reconnect = true;
00868 break;
00869
00870 case PASSIVE_WAITING_STATE: {
00871 TcpReceiveStrategy* rs
00872 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00873
00874
00875 if (rs->get_reactor()->cancel_timer(this) == -1) {
00876 ACE_ERROR((LM_ERROR,
00877 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00878 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00879
00880 } else
00881 passive_reconnect_timer_id_ = -1;
00882
00883 this->_remove_ref();
00884 notify_reconnect = true;
00885 }
00886 break;
00887
00888 default :
00889 ACE_ERROR((LM_ERROR,
00890 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00891 ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00892 break;
00893 }
00894
00895
00896 if (this->is_connector_ || connection->is_connector_) {
00897 ACE_ERROR((LM_ERROR,
00898 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00899 ACE_TEXT(" should NOT be called by the connector side \n")));
00900 }
00901
00902 this->reconnect_task_.close(1);
00903 connection->receive_strategy_ = this->receive_strategy_;
00904 connection->send_strategy_ = this->send_strategy_;
00905 connection->remote_address_ = this->remote_address_;
00906 connection->local_address_ = this->local_address_;
00907 connection->tcp_config_ = this->tcp_config_;
00908 connection->link_ = this->link_;
00909
00910 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00911 "transfer(%C:%d->%C:%d) passive reconnected. new con %@ "
00912 " old con %@ \n",
00913 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00914 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00915 connection, this));
00916
00917 if (notify_reconnect) {
00918 this->reconnect_state_ = RECONNECTED_STATE;
00919 this->link_->notify(DataLink::RECONNECTED);
00920 }
00921
00922 }
00923
00924
00925
00926
00927
00928 void
00929 OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout()
00930 {
00931 DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00932 bool notify_lost = false;
00933 {
00934 GuardType guard(this->reconnect_lock_);
00935
00936 if (this->reconnect_state_ == INIT_STATE) {
00937 this->reconnect_state_ = LOST_STATE;
00938 notify_lost = true;
00939
00940 this->disconnect();
00941 }
00942 }
00943
00944 if (notify_lost) {
00945 this->link_->notify(DataLink::LOST);
00946 this->send_strategy_->terminate_send();
00947 }
00948
00949 }
00950
00951
00952
00953
00954
00955 void
00956 OpenDDS::DCPS::TcpConnection::relink_from_send(bool do_suspend)
00957 {
00958 DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00959
00960 if (do_suspend && !this->send_strategy_.is_nil())
00961 this->send_strategy_->suspend_send();
00962
00963 ReconnectOpType op = DO_RECONNECT;
00964 this->reconnect_task_.add(op);
00965 }
00966
00967
00968
00969
00970 void
00971 OpenDDS::DCPS::TcpConnection::relink_from_recv(bool do_suspend)
00972 {
00973 DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00974
00975 if (do_suspend && !this->send_strategy_.is_nil())
00976 this->send_strategy_->suspend_send();
00977 }
00978
00979 bool
00980 OpenDDS::DCPS::TcpConnection::tear_link()
00981 {
00982 DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00983
00984 return this->link_->release_resources();
00985 }
00986
00987 void
00988 OpenDDS::DCPS::TcpConnection::shutdown()
00989 {
00990 DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00991 this->shutdown_ = true;
00992
00993 this->reconnect_task_.close(1);
00994
00995 }