00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpTransport.h"
00011 #include "TcpInst.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpSendStrategy.h"
00015 #include "TcpReconnectTask.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017 #include "dds/DCPS/transport/framework/PriorityKey.h"
00018
00019 #include "ace/os_include/netinet/os_tcp.h"
00020 #include "ace/OS_NS_arpa_inet.h"
00021 #include "ace/OS_NS_unistd.h"
00022 #include <sstream>
00023 #include <string>
00024
00025 #if !defined (__ACE_INLINE__)
00026 #include "TcpConnection.inl"
00027 #endif
00028
00029 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
// Minimum time that must have passed since the last completed reconnect
// attempt (last_reconnect_attempted_) before active_reconnect_i() flips a
// connection from RECONNECTED_STATE back to INIT_STATE, which is what allows
// a new reconnect cycle to start.
// NOTE(review): constructed from the single value 2 -- presumably seconds, per
// ACE_Time_Value's constructor; confirm against the ACE headers.
const ACE_Time_Value reconnect_delay(2);
00043
00044 OpenDDS::DCPS::TcpConnection::TcpConnection()
00045 : connected_(false)
00046 , is_connector_(false)
00047 , tcp_config_(0)
00048 , passive_reconnect_timer_id_(-1)
00049 , reconnect_state_(INIT_STATE)
00050 , last_reconnect_attempted_(ACE_Time_Value::zero)
00051 , transport_priority_(0)
00052 , shutdown_(false)
00053 , passive_setup_(false)
00054 , passive_setup_buffer_(sizeof(ACE_UINT32))
00055 , transport_during_setup_(0)
00056 , id_(0)
00057 , reconnect_thread_(0)
00058 {
00059 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00060
00061 this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00062 }
00063
00064 OpenDDS::DCPS::TcpConnection::TcpConnection(const ACE_INET_Addr& remote_address,
00065 Priority priority,
00066 const TcpInst& config)
00067 : connected_(false)
00068 , is_connector_(true)
00069 , remote_address_(remote_address)
00070 , local_address_(config.local_address())
00071 , tcp_config_(&config)
00072 , passive_reconnect_timer_id_(-1)
00073 , reconnect_state_(INIT_STATE)
00074 , last_reconnect_attempted_(ACE_Time_Value::zero)
00075 , transport_priority_(priority)
00076 , shutdown_(false)
00077 , passive_setup_(false)
00078 , transport_during_setup_(0)
00079 , id_(0)
00080 , reconnect_thread_(0)
00081 {
00082 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00083 this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00084
00085 }
00086 OpenDDS::DCPS::TcpConnection::~TcpConnection()
00087 {
00088 DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00089 if (reconnect_thread_ &&
00090
00091
00092 !ACE_OS::thr_equal(ACE_OS::thr_self(), reconnect_thread_)
00093 ) {
00094 ACE_Thread_Manager::instance()->join(reconnect_thread_);
00095 }
00096 }
00097
00098 OpenDDS::DCPS::TcpSendStrategy_rch
00099 OpenDDS::DCPS::TcpConnection::send_strategy()
00100 {
00101 return this->link_->send_strategy();
00102 }
00103
00104 OpenDDS::DCPS::TcpReceiveStrategy_rch
00105 OpenDDS::DCPS::TcpConnection::receive_strategy()
00106 {
00107 return this->link_->receive_strategy();
00108 }
00109
00110 void
00111 OpenDDS::DCPS::TcpConnection::disconnect()
00112 {
00113 DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114 this->connected_ = false;
00115 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00116 if (receive_strategy) {
00117 receive_strategy->get_reactor()->remove_handler(this,
00118 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119 }
00120
00121 if (this->link_) {
00122 this->link_->drop_pending_request_acks();
00123 }
00124
00125 this->peer().close();
00126
00127 }
00128
00129 int
00130 OpenDDS::DCPS::TcpConnection::open(void* arg)
00131 {
00132 DBG_ENTRY_LVL("TcpConnection","open",6);
00133
00134 if (is_connector_) {
00135
00136 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open active.\n"), 2);
00137
00138
00139 TcpTransport& transport = static_cast<TcpTransport&>(link_->impl());
00140
00141 const bool is_loop(local_address_ == remote_address_);
00142 const PriorityKey key(transport_priority_, remote_address_,
00143 is_loop, false );
00144
00145 int active_open_ = active_open();
00146
00147 int connect_tcp_datalink_ = transport.connect_tcp_datalink(*link_, rchandle_from(this));
00148
00149 if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00150
00151
00152
00153 transport.async_connect_failed(key);
00154
00155 return -1;
00156 }
00157
00158 return 0;
00159 }
00160
00161
00162
00163
00164 TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00165
00166 if (acceptor == 0) {
00167
00168 ACE_ERROR_RETURN((LM_ERROR,
00169 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00170 ACE_TEXT("failed to cast void* arg to ")
00171 ACE_TEXT("TcpAcceptor* type.\n")),
00172 -1);
00173 }
00174
00175 TcpConnection_rch self(this, keep_count());
00176
00177
00178
00179 TcpTransport* transport = acceptor->transport();
00180
00181 if (!transport) {
00182
00183 ACE_ERROR_RETURN((LM_ERROR,
00184 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00185 ACE_TEXT("acceptor's transport is nil.\n")),
00186 -1);
00187 }
00188
00189
00190
00191 tcp_config_ = &acceptor->get_configuration();
00192 local_address_ = tcp_config_->local_address();
00193
00194 set_sock_options(tcp_config_);
00195
00196
00197
00198
00199
00200 passive_setup_ = true;
00201 transport_during_setup_ = transport;
00202 passive_setup_buffer_.size(sizeof(ACE_UINT32));
00203
00204 if (reactor()->register_handler(this, READ_MASK) == -1) {
00205 ACE_ERROR_RETURN((LM_ERROR,
00206 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00207 ACE_TEXT("unable to register with the reactor.%p\n"),
00208 ACE_TEXT("register_handler")),
00209 -1);
00210 }
00211
00212 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n",
00213 static_cast<int>(intptr_t(get_handle()))), 2);
00214
00215 return 0;
00216 }
00217
00218 int
00219 OpenDDS::DCPS::TcpConnection::handle_setup_input(ACE_HANDLE )
00220 {
00221 const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00222 passive_setup_buffer_.space(),
00223 &ACE_Time_Value::zero);
00224
00225 if (ret < 0 && errno == ETIME) {
00226 return 0;
00227 }
00228
00229 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ "
00230 "recv returned %b %m.\n", this, ret), 4);
00231
00232 if (ret <= 0) {
00233 return -1;
00234 }
00235
00236 passive_setup_buffer_.wr_ptr(ret);
00237
00238
00239
00240 ACE_UINT32 nlen = 0;
00241
00242 if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00243
00244 ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00245 passive_setup_buffer_.rd_ptr(sizeof(nlen));
00246 ACE_UINT32 hlen = ntohl(nlen);
00247 passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00248
00249 ACE_UINT32 nprio = 0;
00250
00251 if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00252
00253 const std::string bufstr(passive_setup_buffer_.rd_ptr());
00254 const NetworkAddress network_order_address(bufstr);
00255 network_order_address.to_addr(remote_address_);
00256
00257 ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00258 transport_priority_ = ntohl(nprio);
00259
00260 passive_setup_buffer_.reset();
00261 passive_setup_ = false;
00262
00263 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
00264 "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %C\n", this,
00265 remote_address_.get_host_addr(), remote_address_.get_port_number(),
00266 local_address_.get_host_addr(), local_address_.get_port_number(),
00267 transport_priority_, reconnect_state_string().c_str()));
00268
00269
00270 if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00271 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
00272 "remove_handler failed %m.\n"));
00273 }
00274
00275 transport_during_setup_->passive_connection(remote_address_, rchandle_from(this));
00276 connected_ = true;
00277
00278 return 0;
00279 }
00280 }
00281
00282 passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00283
00284 return 0;
00285 }
00286
00287 int
00288 OpenDDS::DCPS::TcpConnection::handle_input(ACE_HANDLE fd)
00289 {
00290 DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00291
00292 if (passive_setup_) {
00293 return handle_setup_input(fd);
00294 }
00295 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00296 if (!receive_strategy) {
00297 return 0;
00298 }
00299
00300 return receive_strategy->handle_dds_input(fd);
00301 }
00302
00303 int
00304 OpenDDS::DCPS::TcpConnection::handle_output(ACE_HANDLE)
00305 {
00306 DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00307 TcpSendStrategy_rch send_strategy = this->send_strategy();
00308 if (send_strategy) {
00309 if (DCPS_debug_level > 9) {
00310 ACE_DEBUG((LM_DEBUG,
00311 ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00312 ACE_TEXT("sending queued data.\n"),
00313 id_));
00314 }
00315
00316
00317 if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00318 != send_strategy->perform_work()) {
00319
00320
00321
00322
00323 send_strategy->schedule_output();
00324 }
00325 }
00326
00327 return 0;
00328 }
00329
00330 int
00331 OpenDDS::DCPS::TcpConnection::close(u_long)
00332 {
00333 DBG_ENTRY_LVL("TcpConnection","close",6);
00334
00335
00336
00337 TcpSendStrategy_rch send_strategy = this->send_strategy();
00338 if (send_strategy)
00339 send_strategy->terminate_send();
00340
00341 this->disconnect();
00342
00343 return 0;
00344 }
00345
00346 const std::string&
00347 OpenDDS::DCPS::TcpConnection::config_name() const
00348 {
00349 return this->link_->impl().config().name();
00350 }
00351
00352 int
00353 OpenDDS::DCPS::TcpConnection::handle_close(ACE_HANDLE, ACE_Reactor_Mask)
00354 {
00355 DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00356
00357 if (DCPS_debug_level >= 1) {
00358 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00359 this->config_name().c_str(),
00360 this->remote_address_.get_host_addr(),
00361 this->remote_address_.get_port_number()));
00362 }
00363
00364 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00365 bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
00366
00367 TcpSendStrategy_rch send_strategy = this->send_strategy();
00368 if (send_strategy) {
00369 if (graceful) {
00370 send_strategy->terminate_send();
00371 } else {
00372 send_strategy->suspend_send();
00373 }
00374 }
00375
00376 this->disconnect();
00377
00378 if (graceful) {
00379 this->link_->notify(DataLink::DISCONNECTED);
00380 } else {
00381 this->spawn_reconnect_thread();
00382 }
00383
00384 return 0;
00385 }
00386
00387 void
00388 OpenDDS::DCPS::TcpConnection::set_sock_options(const TcpInst* tcp_config)
00389 {
00390 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00391 int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00392 int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00393
00394 # if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00395
00396
00397
00398 int opt = (tcp_config->enable_nagle_algorithm_ == false);
00399
00400 if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00401 ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00402 }
00403
00404 if (this->peer().set_option(SOL_SOCKET,
00405 SO_SNDBUF,
00406 (void *) &snd_size,
00407 sizeof(snd_size)) == -1
00408 && errno != ENOTSUP) {
00409 ACE_ERROR((LM_ERROR,
00410 "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00411 snd_size));
00412 return;
00413 }
00414
00415 if (this->peer().set_option(SOL_SOCKET,
00416 SO_RCVBUF,
00417 (void *) &rcv_size,
00418 sizeof(int)) == -1
00419 && errno != ENOTSUP) {
00420 ACE_ERROR((LM_ERROR,
00421 "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00422 rcv_size));
00423 return;
00424 }
00425
00426 # else
00427 ACE_UNUSED_ARG(tcp_config);
00428 ACE_UNUSED_ARG(snd_size);
00429 ACE_UNUSED_ARG(rcv_size);
00430 # endif
00431
00432 #else
00433 ACE_UNUSED_ARG(tcp_config);
00434 #endif
00435 }
00436
00437 int
00438 OpenDDS::DCPS::TcpConnection::active_establishment(bool initiate_connect)
00439 {
00440 DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00441
00442
00443
00444 if (this->is_connector_ == false) {
00445 ACE_ERROR_RETURN((LM_ERROR,
00446 "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00447 -1);
00448 }
00449
00450 if (this->shutdown_)
00451 return -1;
00452
00453
00454 ACE_SOCK_Connector connector;
00455
00456 if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00457
00458 ACE_DEBUG((LM_DEBUG,
00459 ACE_TEXT("(%P|%t) DBG: Failed to connect. this->shutdown_=%d %p\n%C"),
00460 int(this->shutdown_), ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()));
00461 return -1;
00462
00463 } else {
00464 this->connected_ = true;
00465 const std::string remote_host = this->remote_address_.get_host_addr();
00466 VDBG((LM_DEBUG, "(%P|%t) DBG: active_establishment(%C:%d->%C:%d)\n",
00467 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00468 remote_host.c_str(), this->remote_address_.get_port_number()));
00469 }
00470
00471
00472 DirectPriorityMapper mapper(this->transport_priority_);
00473 this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00474
00475 set_sock_options(tcp_config_);
00476
00477
00478
00479
00480
00481
00482 std::string address = tcp_config_->get_public_address();
00483 ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00484
00485 ACE_UINT32 nlen = htonl(len);
00486
00487 if (this->peer().send_n(&nlen,
00488 sizeof(ACE_UINT32)) == -1) {
00489
00490 ACE_ERROR_RETURN((LM_ERROR,
00491 "(%P|%t) ERROR: Unable to send address string length to "
00492 "the passive side to complete the active connection "
00493 "establishment.\n"),
00494 -1);
00495 }
00496
00497 if (this->peer().send_n(address.c_str(), len) == -1) {
00498
00499 ACE_ERROR_RETURN((LM_ERROR,
00500 "(%P|%t) ERROR: Unable to send our address to "
00501 "the passive side to complete the active connection "
00502 "establishment.\n"),
00503 -1);
00504 }
00505
00506 ACE_UINT32 npriority = htonl(this->transport_priority_);
00507
00508 if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00509
00510 ACE_ERROR_RETURN((LM_ERROR,
00511 "(%P|%t) ERROR: Unable to send publication priority to "
00512 "the passive side to complete the active connection "
00513 "establishment.\n"),
00514 -1);
00515 }
00516
00517 return 0;
00518 }
00519
00520
00521
00522
00523
00524
00525
00526
00527 int
00528 OpenDDS::DCPS::TcpConnection::reconnect(bool on_new_association)
00529 {
00530 DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00531 if (DCPS_debug_level >= 1) {
00532 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00533 this->config_name().c_str(),
00534 this->remote_address_.get_host_addr(),
00535 this->remote_address_.get_port_number()));
00536 }
00537
00538 if (on_new_association) {
00539 if (DCPS_debug_level >= 1) {
00540 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00541 }
00542 return this->active_reconnect_on_new_association();
00543 }
00544
00545
00546
00547
00548 else if (!this->link_->is_release_pending()) {
00549 if (DCPS_debug_level >= 1) {
00550 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00551 }
00552
00553 if (this->is_connector_ && this->active_reconnect_i() == -1) {
00554 if (DCPS_debug_level >= 1) {
00555 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00556 }
00557 return -1;
00558 }
00559
00560
00561 else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00562 if (DCPS_debug_level >= 1) {
00563 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00564 }
00565 return -1;
00566 }
00567
00568 }
00569 if (DCPS_debug_level >= 1) {
00570 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00571 }
00572 return 0;
00573 }
00574
00575 int
00576 OpenDDS::DCPS::TcpConnection::active_open()
00577 {
00578 DBG_ENTRY_LVL("TcpConnection","active_open",6);
00579
00580 GuardType guard(reconnect_lock_);
00581 if (this->shutdown_)
00582 return -1;
00583
00584 if (connected_.value()) {
00585 return 0;
00586 }
00587
00588 return active_establishment(false );
00589 }
00590
00591 int
00592 OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association()
00593 {
00594 DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00595 GuardType guard(this->reconnect_lock_);
00596 if (this->shutdown_)
00597 return -1;
00598
00599 if (this->connected_ == true)
00600 return 0;
00601
00602 else if (this->active_establishment() == 0) {
00603 this->reconnect_state_ = INIT_STATE;
00604 TcpSendStrategy_rch send_strategy = this->send_strategy();
00605 if (send_strategy)
00606 send_strategy->resume_send();
00607 return 0;
00608 }
00609
00610 return -1;
00611 }
00612
00613
00614
00615
00616 int
00617 OpenDDS::DCPS::TcpConnection::passive_reconnect_i()
00618 {
00619 DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00620 GuardType guard(this->reconnect_lock_);
00621 if (this->shutdown_)
00622 return -1;
00623
00624
00625 if (this->reconnect_state_ == INIT_STATE) {
00626
00627 this->connected_ = false;
00628
00629 if (this->tcp_config_->passive_reconnect_duration_ == 0)
00630 return -1;
00631
00632 ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00633 this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00634 this->reconnect_state_ = PASSIVE_WAITING_STATE;
00635 this->link_->notify(DataLink::DISCONNECTED);
00636
00637
00638
00639 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00640 if (this->receive_strategy()) {
00641
00642
00643 this->passive_reconnect_timer_id_ = receive_strategy->get_reactor()->schedule_timer(this, 0, timeout);
00644
00645 if (this->passive_reconnect_timer_id_ == -1) {
00646 ACE_ERROR_RETURN((LM_ERROR,
00647 ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00648 ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00649 -1);
00650 }
00651 }
00652 }
00653
00654 return 0;
00655 }
00656
00657
00658
00659
00660
00661
00662
00663
00664
00665
00666
00667 int
00668 OpenDDS::DCPS::TcpConnection::active_reconnect_i()
00669 {
00670 DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00671
00672 GuardType guard(this->reconnect_lock_);
00673 if (this->shutdown_)
00674 return -1;
00675
00676 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00677 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00678 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00679 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00680 this->reconnect_state_string().c_str()));
00681 if (DCPS_debug_level >= 1) {
00682 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
00683 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00684 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00685 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00686 this->reconnect_state_string().c_str()));
00687 }
00688
00689
00690
00691 if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00692 && this->reconnect_state_ == RECONNECTED_STATE) {
00693 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00694 "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00695 this->reconnect_state_ = INIT_STATE;
00696 }
00697
00698 if (this->reconnect_state_ == INIT_STATE) {
00699
00700 TcpSendStrategy_rch send_strategy = this->send_strategy();
00701 if (send_strategy)
00702 send_strategy->suspend_send();
00703 this->reconnect_lock_.release();
00704 this->disconnect();
00705 this->reconnect_lock_.acquire();
00706
00707 if (this->shutdown_)
00708 return -1;
00709
00710 if (this->tcp_config_->conn_retry_attempts_ > 0) {
00711 this->link_->notify(DataLink::DISCONNECTED);
00712 }
00713
00714
00715
00716
00717
00718 double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00719 int ret = -1;
00720
00721 for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00722 ret = this->active_establishment();
00723
00724 if (this->shutdown_)
00725 break;
00726
00727 if (ret == -1) {
00728 ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00729 ((int)retry_delay_msec)%1000*1000);
00730 ACE_OS::sleep(delay_tv);
00731 retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00732
00733 } else {
00734 break;
00735 }
00736 }
00737
00738 if (ret == -1) {
00739 if (this->tcp_config_->conn_retry_attempts_ > 0) {
00740 ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00741 this->config_name().c_str(),
00742 this->remote_address_.get_host_addr(),
00743 this->remote_address_.get_port_number()));
00744
00745 } else {
00746 ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00747 this->config_name().c_str(),
00748 this->remote_address_.get_host_addr(),
00749 this->remote_address_.get_port_number()));
00750 }
00751
00752 this->reconnect_state_ = LOST_STATE;
00753 this->link_->notify(DataLink::LOST);
00754 if (send_strategy)
00755 send_strategy->terminate_send();
00756
00757 } else {
00758 ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00759 this->config_name().c_str(),
00760 this->remote_address_.get_host_addr(),
00761 this->remote_address_.get_port_number()));
00762 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00763 if (receive_strategy->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00764 ACE_ERROR_RETURN((LM_ERROR,
00765 "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00766 "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00767 -1);
00768 }
00769 this->reconnect_state_ = RECONNECTED_STATE;
00770 this->link_->notify(DataLink::RECONNECTED);
00771 send_strategy->resume_send();
00772 }
00773
00774 this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00775 }
00776
00777 return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00778 }
00779
00780
00781
00782 int
00783 OpenDDS::DCPS::TcpConnection::handle_timeout(const ACE_Time_Value &,
00784 const void *)
00785 {
00786 DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00787
00788 GuardType guard(this->reconnect_lock_);
00789
00790 switch (this->reconnect_state_) {
00791 case PASSIVE_WAITING_STATE: {
00792 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00793 this->config_name().c_str(),
00794 this->remote_address_.get_host_addr(),
00795 this->remote_address_.get_port_number()));
00796
00797 this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00798
00799
00800 this->link_->notify(DataLink::LOST);
00801
00802
00803
00804 TcpSendStrategy_rch send_strategy = this->send_strategy();
00805 if (send_strategy)
00806 send_strategy->terminate_send();
00807
00808 this->reconnect_state_ = LOST_STATE;
00809
00810 this->tear_link();
00811
00812 }
00813 break;
00814
00815 case RECONNECTED_STATE:
00816
00817 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00818 this->config_name().c_str(),
00819 this->remote_address_.get_host_addr(),
00820 this->remote_address_.get_port_number()));
00821 break;
00822
00823 default :
00824 ACE_ERROR((LM_ERROR,
00825 ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00826 ACE_TEXT(" unknown state or it should not be in state=%d \n"),
00827 reconnect_state_));
00828 break;
00829 }
00830
00831 return 0;
00832 }
00833
00834
00835
00836
00837
00838
00839
00840 void
00841 OpenDDS::DCPS::TcpConnection::transfer(TcpConnection* connection)
00842 {
00843 DBG_ENTRY_LVL("TcpConnection","transfer",6);
00844
00845 GuardType guard(this->reconnect_lock_);
00846
00847 bool notify_reconnect = false;
00848
00849 switch (this->reconnect_state_) {
00850 case INIT_STATE:
00851
00852
00853 break;
00854
00855 case LOST_STATE:
00856
00857
00858 case PASSIVE_TIMEOUT_CALLED_STATE:
00859
00860
00861
00862
00863
00864 notify_reconnect = true;
00865 break;
00866
00867 case PASSIVE_WAITING_STATE: {
00868
00869 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00870 if (receive_strategy->get_reactor()->cancel_timer(this) == -1) {
00871 ACE_ERROR((LM_ERROR,
00872 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00873 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00874
00875 } else
00876 passive_reconnect_timer_id_ = -1;
00877
00878 notify_reconnect = true;
00879 }
00880 break;
00881
00882 default :
00883 ACE_ERROR((LM_ERROR,
00884 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00885 ACE_TEXT(" unknown state or it should not be in state=%i \n"),
00886 reconnect_state_));
00887 break;
00888 }
00889
00890
00891 if (this->is_connector_ || connection->is_connector_) {
00892 ACE_ERROR((LM_ERROR,
00893 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00894 ACE_TEXT(" should NOT be called by the connector side \n")));
00895 }
00896
00897 if (reconnect_thread_) {
00898 ACE_Thread_Manager::instance()->join(reconnect_thread_);
00899 reconnect_thread_ = 0;
00900 }
00901
00902
00903 connection->remote_address_ = this->remote_address_;
00904 connection->local_address_ = this->local_address_;
00905 connection->tcp_config_ = this->tcp_config_;
00906 connection->link_ = this->link_;
00907
00908 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00909 "transfer(%C:%d->%C:%d) passive reconnected. new con %@ "
00910 " old con %@ \n",
00911 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00912 this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00913 connection, this));
00914
00915 if (notify_reconnect) {
00916 this->reconnect_state_ = RECONNECTED_STATE;
00917 this->link_->notify(DataLink::RECONNECTED);
00918 }
00919
00920 }
00921
00922
00923
00924
00925
00926 void
00927 OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout()
00928 {
00929 DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00930 bool notify_lost = false;
00931 {
00932 GuardType guard(this->reconnect_lock_);
00933
00934 if (this->reconnect_state_ == INIT_STATE) {
00935 this->reconnect_state_ = LOST_STATE;
00936 notify_lost = true;
00937
00938 }
00939 }
00940
00941 if (notify_lost) {
00942 this->disconnect();
00943 this->link_->notify(DataLink::LOST);
00944
00945 TcpSendStrategy_rch send_strategy = this->send_strategy();
00946 if (send_strategy)
00947 send_strategy->terminate_send();
00948 }
00949
00950 }
00951
00952
00953
00954
00955
00956 void
00957 OpenDDS::DCPS::TcpConnection::relink_from_send(bool do_suspend)
00958 {
00959 DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00960
00961 TcpSendStrategy_rch send_strategy = this->send_strategy();
00962 if (do_suspend && send_strategy)
00963 send_strategy->suspend_send();
00964
00965 this->spawn_reconnect_thread();
00966 }
00967
00968
00969
00970
00971 void
00972 OpenDDS::DCPS::TcpConnection::relink_from_recv(bool do_suspend)
00973 {
00974 DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00975 TcpSendStrategy_rch send_strategy = this->send_strategy();
00976 if (do_suspend && send_strategy)
00977 send_strategy->suspend_send();
00978 }
00979
00980 bool
00981 OpenDDS::DCPS::TcpConnection::tear_link()
00982 {
00983 DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00984
00985 return this->link_->release_resources();
00986 }
00987
00988 void
00989 OpenDDS::DCPS::TcpConnection::shutdown()
00990 {
00991 DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00992 GuardType guard(this->reconnect_lock_);
00993 this->shutdown_ = true;
00994 ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::shutdown();
00995
00996 }
00997
00998 ACE_Event_Handler::Reference_Count
00999 OpenDDS::DCPS::TcpConnection::add_reference()
01000 {
01001 RcObject::_add_ref();
01002 return 1;
01003 }
01004
01005 ACE_Event_Handler::Reference_Count
01006 OpenDDS::DCPS::TcpConnection::remove_reference()
01007 {
01008 RcObject::_remove_ref();
01009 return 1;
01010 }
01011
01012 void
01013 OpenDDS::DCPS::TcpConnection::spawn_reconnect_thread()
01014 {
01015 DBG_ENTRY_LVL("TcpConnection","spawn_reconnect_thread",6);
01016 GuardType guard(this->reconnect_lock_);
01017 if (!shutdown_) {
01018
01019 TransportInst& transport_config = this->link_->impl().config();
01020 transport_config._add_ref();
01021
01022 this->_add_ref();
01023 if (ACE_Thread_Manager::instance()->spawn(&reconnect_thread_fun,
01024 this,
01025 THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED,
01026 &reconnect_thread_) == -1){
01027
01028 this->_remove_ref();
01029 transport_config._remove_ref();
01030 }
01031 }
01032 }
01033
01034 ACE_THR_FUNC_RETURN
01035 OpenDDS::DCPS::TcpConnection::reconnect_thread_fun(void* arg)
01036 {
01037 DBG_ENTRY_LVL("TcpConnection","reconnect_thread_fun",6);
01038
01039
01040
01041
01042 sigset_t set;
01043 ACE_OS::sigfillset(&set);
01044 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
01045
01046
01047 RcHandle<TransportInst> transport_config;
01048 TcpConnection_rch connection(static_cast<TcpConnection*>(arg), keep_count());
01049 transport_config = RcHandle<TransportInst>(&connection->link_->impl().config(), keep_count());
01050
01051 if (connection->reconnect() == -1) {
01052 connection->tear_link();
01053 }
01054
01055 return 0;
01056 }
01057
01058 OPENDDS_STRING
01059 OpenDDS::DCPS::TcpConnection::reconnect_state_string() const
01060 {
01061 switch (reconnect_state_) {
01062 case INIT_STATE:
01063 return "INIT_STATE";
01064 case LOST_STATE:
01065 return "LOST_STATE";
01066 case RECONNECTED_STATE:
01067 return "RECONENCTED_STATE";
01068 case PASSIVE_WAITING_STATE:
01069 return "PASSIVE_WAITING_STATE";
01070 case PASSIVE_TIMEOUT_CALLED_STATE:
01071 return "PASSIVE_TIMEOUT_CALLED_STATE";
01072 default:
01073 ACE_ERROR((LM_ERROR, ACE_TEXT(
01074 "OpenDDS::DCPS::TcpConnection::reconnect_state_string(): "
01075 "%d is either completely invalid or at least not defined in this function.\n"),
01076 reconnect_state_
01077 ));
01078 return OPENDDS_STRING("(Unknown Reconnect State: ")
01079 + to_dds_string(reconnect_state_) + ")";
01080 }
01081 }
01082
01083 OPENDDS_END_VERSIONED_NAMESPACE_DECL