#include <TcpConnection.h>
Public Types | |
enum | ReconnectState { INIT_STATE, LOST_STATE, RECONNECTED_STATE, PASSIVE_WAITING_STATE, PASSIVE_TIMEOUT_CALLED_STATE } |
States used while reconnecting. More... | |
Public Member Functions | |
TcpConnection () | |
Passive side constructor (acceptor). | |
TcpConnection (const ACE_INET_Addr &remote_address, Priority priority, const TcpInst &config) | |
Active side constructor (connector). | |
virtual | ~TcpConnection () |
std::size_t & | id () |
int | active_open () |
void | disconnect () |
virtual int | open (void *arg) |
TcpSendStrategy_rch | send_strategy () |
TcpReceiveStrategy_rch | receive_strategy () |
virtual int | handle_input (ACE_HANDLE) |
We pass this "event" along to the receive_strategy. | |
virtual int | handle_output (ACE_HANDLE) |
Handle back pressure when sending. | |
virtual int | close (u_long) |
virtual int | handle_close (ACE_HANDLE, ACE_Reactor_Mask) |
void | set_sock_options (const TcpInst *tcp_config) |
int | reconnect (bool on_new_association=false) |
bool | is_connector () const |
bool | is_connected () const |
Return true if the connection is currently established. | |
void | transfer (TcpConnection *connection) |
int | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
void | set_datalink (const TcpDataLink_rch &link) |
void | notify_lost_on_backpressure_timeout () |
ACE_INET_Addr | get_remote_address () |
void | relink_from_send (bool do_suspend) |
Reconnect initiated by send strategy. | |
void | relink_from_recv (bool do_suspend) |
Reconnect initiated by receive strategy. | |
bool | tear_link () |
void | shutdown () |
Priority & | transport_priority () |
Access TRANSPORT_PRIORITY.value policy value if set. | |
Priority | transport_priority () const |
virtual ACE_Event_Handler::Reference_Count | add_reference () |
virtual ACE_Event_Handler::Reference_Count | remove_reference () |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
int | active_establishment (bool initiate_connect=true) |
int | active_reconnect_i () |
int | passive_reconnect_i () |
int | active_reconnect_on_new_association () |
int | handle_setup_input (ACE_HANDLE h) |
const std::string & | config_name () const |
void | spawn_reconnect_thread () |
OPENDDS_STRING | reconnect_state_string () const |
Get name of the current reconnect state as a string. | |
Static Private Member Functions | |
static ACE_THR_FUNC_RETURN | reconnect_thread_fun (void *conn) |
Private Attributes | |
LockType | reconnect_lock_ |
ACE_Atomic_Op< ACE_SYNCH_MUTEX, bool > | connected_ |
bool | is_connector_ |
Flag indicating whether this connection object is the connector (true) or the acceptor (false). | |
ACE_INET_Addr | remote_address_ |
Remote address. | |
ACE_INET_Addr | local_address_ |
Local address. | |
const TcpInst * | tcp_config_ |
The configuration used by this connection. | |
TcpDataLink_rch | link_ |
Datalink object which is needed for connection lost callback. | |
int | passive_reconnect_timer_id_ |
ReconnectState | reconnect_state_ |
The current step of the reconnect procedure. | |
ACE_Time_Value | last_reconnect_attempted_ |
Time at which the most recent reconnect attempt finished, whether or not it succeeded. | |
Priority | transport_priority_ |
TRANSPORT_PRIORITY.value policy value. | |
bool | shutdown_ |
Shutdown flag; connection establishment and reconnect attempts return -1 once it is set. | |
bool | passive_setup_ |
ACE_Message_Block | passive_setup_buffer_ |
TcpTransport * | transport_during_setup_ |
std::size_t | id_ |
Small unique identifying value. | |
ACE_thread_t | reconnect_thread_ |
Definition at line 36 of file TcpConnection.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpConnection::GuardType [private] |
Definition at line 159 of file TcpConnection.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpConnection::LockType [private] |
Definition at line 158 of file TcpConnection.h.
States used while reconnecting.
INIT_STATE | |
LOST_STATE | |
RECONNECTED_STATE | |
PASSIVE_WAITING_STATE | |
PASSIVE_TIMEOUT_CALLED_STATE |
Definition at line 42 of file TcpConnection.h.
00042 { 00043 INIT_STATE, 00044 LOST_STATE, 00045 RECONNECTED_STATE, 00046 PASSIVE_WAITING_STATE, 00047 PASSIVE_TIMEOUT_CALLED_STATE 00048 };
OpenDDS::DCPS::TcpConnection::TcpConnection | ( | ) |
Passive side constructor (acceptor).
Definition at line 44 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().
00045 : connected_(false) 00046 , is_connector_(false) 00047 , tcp_config_(0) 00048 , passive_reconnect_timer_id_(-1) 00049 , reconnect_state_(INIT_STATE) 00050 , last_reconnect_attempted_(ACE_Time_Value::zero) 00051 , transport_priority_(0) // TRANSPORT_PRIORITY.value default value - 0. 00052 , shutdown_(false) 00053 , passive_setup_(false) 00054 , passive_setup_buffer_(sizeof(ACE_UINT32)) 00055 , transport_during_setup_(0) 00056 , id_(0) 00057 , reconnect_thread_(0) 00058 { 00059 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6); 00060 00061 this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED); 00062 }
OpenDDS::DCPS::TcpConnection::TcpConnection | ( | const ACE_INET_Addr & | remote_address, | |
Priority | priority, | |||
const TcpInst & | config | |||
) |
Active side constructor (connector).
Definition at line 64 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().
00067 : connected_(false) 00068 , is_connector_(true) 00069 , remote_address_(remote_address) 00070 , local_address_(config.local_address()) 00071 , tcp_config_(&config) 00072 , passive_reconnect_timer_id_(-1) 00073 , reconnect_state_(INIT_STATE) 00074 , last_reconnect_attempted_(ACE_Time_Value::zero) 00075 , transport_priority_(priority) 00076 , shutdown_(false) 00077 , passive_setup_(false) 00078 , transport_during_setup_(0) 00079 , id_(0) 00080 , reconnect_thread_(0) 00081 { 00082 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6); 00083 this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED); 00084 00085 }
OpenDDS::DCPS::TcpConnection::~TcpConnection | ( | ) | [virtual] |
Definition at line 86 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), ACE_Thread_Manager::join(), reconnect_thread_, ACE_OS::thr_equal(), and ACE_OS::thr_self().
00087 { 00088 DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6); 00089 if (reconnect_thread_ && 00090 // This is for Windows, where join doesn't check if the thread is the same 00091 // and the thread will hang itself if it tries to join itself. 00092 !ACE_OS::thr_equal(ACE_OS::thr_self(), reconnect_thread_) 00093 ) { 00094 ACE_Thread_Manager::instance()->join(reconnect_thread_); 00095 } 00096 }
int OpenDDS::DCPS::TcpConnection::active_establishment | ( | bool | initiate_connect = true |
) | [private] |
Attempt to actively establish a connection to the remote address. Our public address and transport priority are sent to the remote (passive) side so that it can identify us. Note that this method is not thread-safe: the caller needs to acquire reconnect_lock_ before calling this function.
Definition at line 438 of file TcpConnection.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DirectPriorityMapper::codepoint(), ACE_SOCK_Connector::connect(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::TcpInst::dump_to_str(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::TcpInst::get_public_address(), is_connector_, len, link_, LM_DEBUG, LM_ERROR, local_address_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), remote_address_, send_n(), set_sock_options(), shutdown_, tcp_config_, transport_priority_, and VDBG.
Referenced by active_open(), active_reconnect_i(), and active_reconnect_on_new_association().
00439 { 00440 DBG_ENTRY_LVL("TcpConnection","active_establishment",6); 00441 00442 // Safety check - This should not happen since is_connector_ defaults to 00443 // true and the role in a connection connector is not changed when reconnecting. 00444 if (this->is_connector_ == false) { 00445 ACE_ERROR_RETURN((LM_ERROR, 00446 "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"), 00447 -1); 00448 } 00449 00450 if (this->shutdown_) 00451 return -1; 00452 00453 // Now use a connector object to establish the connection. 00454 ACE_SOCK_Connector connector; 00455 00456 if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) { 00457 00458 ACE_DEBUG((LM_DEBUG, 00459 ACE_TEXT("(%P|%t) DBG: Failed to connect. this->shutdown_=%d %p\n%C"), 00460 int(this->shutdown_), ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str())); 00461 return -1; 00462 00463 } else { 00464 this->connected_ = true; 00465 const std::string remote_host = this->remote_address_.get_host_addr(); 00466 VDBG((LM_DEBUG, "(%P|%t) DBG: active_establishment(%C:%d->%C:%d)\n", 00467 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00468 remote_host.c_str(), this->remote_address_.get_port_number())); 00469 } 00470 00471 // Set the DiffServ codepoint according to the priority value. 00472 DirectPriorityMapper mapper(this->transport_priority_); 00473 this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer()); 00474 00475 set_sock_options(tcp_config_); 00476 00477 // In order to complete the connection establishment from the active 00478 // side, we need to tell the remote side about our public address. 00479 // It will use that as an "identifier" of sorts. To the other 00480 // (passive) side, our local_address that we send here will be known 00481 // as the remote_address. 
00482 std::string address = tcp_config_->get_public_address(); 00483 ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1; 00484 00485 ACE_UINT32 nlen = htonl(len); 00486 00487 if (this->peer().send_n(&nlen, 00488 sizeof(ACE_UINT32)) == -1) { 00489 // TBD later - Anything we are supposed to do to close the connection. 00490 ACE_ERROR_RETURN((LM_ERROR, 00491 "(%P|%t) ERROR: Unable to send address string length to " 00492 "the passive side to complete the active connection " 00493 "establishment.\n"), 00494 -1); 00495 } 00496 00497 if (this->peer().send_n(address.c_str(), len) == -1) { 00498 // TBD later - Anything we are supposed to do to close the connection. 00499 ACE_ERROR_RETURN((LM_ERROR, 00500 "(%P|%t) ERROR: Unable to send our address to " 00501 "the passive side to complete the active connection " 00502 "establishment.\n"), 00503 -1); 00504 } 00505 00506 ACE_UINT32 npriority = htonl(this->transport_priority_); 00507 00508 if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) { 00509 // TBD later - Anything we are supposed to do to close the connection. 00510 ACE_ERROR_RETURN((LM_ERROR, 00511 "(%P|%t) ERROR: Unable to send publication priority to " 00512 "the passive side to complete the active connection " 00513 "establishment.\n"), 00514 -1); 00515 } 00516 00517 return 0; 00518 }
int OpenDDS::DCPS::TcpConnection::active_open | ( | ) |
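Protocol setup (handshake) on the active side. The local address is sent to the remote (passive) side so that it can identify us. This calls active_establishment() with initiate_connect == false, so no connect call is issued here; only the handshake is sent.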
Definition at line 576 of file TcpConnection.cpp.
References active_establishment(), connected_, DBG_ENTRY_LVL, reconnect_lock_, shutdown_, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().
Referenced by open().
00577 { 00578 DBG_ENTRY_LVL("TcpConnection","active_open",6); 00579 00580 GuardType guard(reconnect_lock_); 00581 if (this->shutdown_) 00582 return -1; 00583 00584 if (connected_.value()) { 00585 return 0; 00586 } 00587 00588 return active_establishment(false /* !initiate_connect */); 00589 }
int OpenDDS::DCPS::TcpConnection::active_reconnect_i | ( | ) | [private] |
Definition at line 668 of file TcpConnection.cpp.
References ACE_TEXT(), active_establishment(), config_name(), OpenDDS::DCPS::TcpInst::conn_retry_attempts_, OpenDDS::DCPS::TcpInst::conn_retry_backoff_multiplier_, OpenDDS::DCPS::TcpInst::conn_retry_initial_delay_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), ACE_Task< ACE_NULL_SYNCH >::gettimeofday(), ACE_OS::gettimeofday(), INIT_STATE, last_reconnect_attempted_, link_, LM_DEBUG, LM_ERROR, local_address_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, ACE_Event_Handler::READ_MASK, receive_strategy(), reconnect_delay(), reconnect_lock_, reconnect_state_, reconnect_state_string(), OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy(), shutdown_, ACE_OS::sleep(), tcp_config_, and VDBG.
Referenced by reconnect().
00669 { 00670 DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6); 00671 00672 GuardType guard(this->reconnect_lock_); 00673 if (this->shutdown_) 00674 return -1; 00675 00676 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00677 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n", 00678 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00679 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00680 this->reconnect_state_string().c_str())); 00681 if (DCPS_debug_level >= 1) { 00682 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::" 00683 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n", 00684 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00685 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00686 this->reconnect_state_string().c_str())); 00687 } 00688 // We need reset the state to INIT_STATE if we are previously reconnected. 00689 // This would allow re-establishing connection after the re-established 00690 // connection lost again. 00691 if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay 00692 && this->reconnect_state_ == RECONNECTED_STATE) { 00693 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00694 "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n")); 00695 this->reconnect_state_ = INIT_STATE; 00696 } 00697 00698 if (this->reconnect_state_ == INIT_STATE) { 00699 // Suspend send once. 
00700 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00701 if (send_strategy) 00702 send_strategy->suspend_send(); 00703 this->reconnect_lock_.release(); 00704 this->disconnect(); 00705 this->reconnect_lock_.acquire(); 00706 00707 if (this->shutdown_) 00708 return -1; 00709 00710 if (this->tcp_config_->conn_retry_attempts_ > 0) { 00711 this->link_->notify(DataLink::DISCONNECTED); 00712 } 00713 00714 // else the conn_retry_attempts is 0 then we do not need this extra 00715 // notify_disconnected() since the user application should get the 00716 // notify_lost() without delay. 00717 00718 double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_; 00719 int ret = -1; 00720 00721 for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) { 00722 ret = this->active_establishment(); 00723 00724 if (this->shutdown_) 00725 break; 00726 00727 if (ret == -1) { 00728 ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000, 00729 ((int)retry_delay_msec)%1000*1000); 00730 ACE_OS::sleep(delay_tv); 00731 retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_; 00732 00733 } else { 00734 break; 00735 } 00736 } 00737 00738 if (ret == -1) { 00739 if (this->tcp_config_->conn_retry_attempts_ > 0) { 00740 ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n", 00741 this->config_name().c_str(), 00742 this->remote_address_.get_host_addr(), 00743 this->remote_address_.get_port_number())); 00744 00745 } else { 00746 ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n", 00747 this->config_name().c_str(), 00748 this->remote_address_.get_host_addr(), 00749 this->remote_address_.get_port_number())); 00750 } 00751 00752 this->reconnect_state_ = LOST_STATE; 00753 this->link_->notify(DataLink::LOST); 00754 if (send_strategy) 00755 send_strategy->terminate_send(); 00756 00757 } else { 00758 ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on 
transport: %C to %C:%d.\n", 00759 this->config_name().c_str(), 00760 this->remote_address_.get_host_addr(), 00761 this->remote_address_.get_port_number())); 00762 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00763 if (receive_strategy->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) { 00764 ACE_ERROR_RETURN((LM_ERROR, 00765 "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register " 00766 "with reactor %X %p\n", this, ACE_TEXT("register_handler")), 00767 -1); 00768 } 00769 this->reconnect_state_ = RECONNECTED_STATE; 00770 this->link_->notify(DataLink::RECONNECTED); 00771 send_strategy->resume_send(); 00772 } 00773 00774 this->last_reconnect_attempted_ = ACE_OS::gettimeofday(); 00775 } 00776 00777 return this->reconnect_state_ == LOST_STATE ? -1 : 0; 00778 }
int OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association | ( | ) | [private] |
Definition at line 592 of file TcpConnection.cpp.
References active_establishment(), connected_, DBG_ENTRY_LVL, INIT_STATE, reconnect_lock_, reconnect_state_, send_strategy(), and shutdown_.
Referenced by reconnect().
00593 { 00594 DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6); 00595 GuardType guard(this->reconnect_lock_); 00596 if (this->shutdown_) 00597 return -1; 00598 00599 if (this->connected_ == true) 00600 return 0; 00601 00602 else if (this->active_establishment() == 0) { 00603 this->reconnect_state_ = INIT_STATE; 00604 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00605 if (send_strategy) 00606 send_strategy->resume_send(); 00607 return 0; 00608 } 00609 00610 return -1; 00611 }
ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::add_reference | ( | void | ) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 999 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject::_add_ref().
01000 { 01001 RcObject::_add_ref(); 01002 return 1; 01003 }
int OpenDDS::DCPS::TcpConnection::close | ( | u_long | ) | [virtual] |
Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.
Definition at line 331 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, disconnect(), and send_strategy().
00332 { 00333 DBG_ENTRY_LVL("TcpConnection","close",6); 00334 00335 // TBD SOON - Find out exactly when close() is called. 00336 // I have no clue when and who might call this. 00337 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00338 if (send_strategy) 00339 send_strategy->terminate_send(); 00340 00341 this->disconnect(); 00342 00343 return 0; 00344 }
const std::string & OpenDDS::DCPS::TcpConnection::config_name | ( | ) | const [private] |
Definition at line 347 of file TcpConnection.cpp.
References link_.
Referenced by active_reconnect_i(), handle_close(), handle_timeout(), and reconnect().
00348 { 00349 return this->link_->impl().config().name(); 00350 }
void OpenDDS::DCPS::TcpConnection::disconnect | ( | void | ) |
This will be called by the DataLink (that "owns" us) when the TcpTransport has been told to shutdown(), or when the DataLink finds itself no longer needed, and is "self-releasing".
Definition at line 111 of file TcpConnection.cpp.
References connected_, DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, link_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), ACE_Event_Handler::READ_MASK, and receive_strategy().
Referenced by active_reconnect_i(), close(), handle_close(), and notify_lost_on_backpressure_timeout().
00112 { 00113 DBG_ENTRY_LVL("TcpConnection","disconnect",6); 00114 this->connected_ = false; 00115 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00116 if (receive_strategy) { 00117 receive_strategy->get_reactor()->remove_handler(this, 00118 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); 00119 } 00120 00121 if (this->link_) { 00122 this->link_->drop_pending_request_acks(); 00123 } 00124 00125 this->peer().close(); 00126 00127 }
ACE_INLINE ACE_INET_Addr OpenDDS::DCPS::TcpConnection::get_remote_address | ( | ) |
Definition at line 46 of file TcpConnection.inl.
References remote_address_.
00047 { 00048 return this->remote_address_; 00049 }
int OpenDDS::DCPS::TcpConnection::handle_close | ( | ACE_HANDLE | , | |
ACE_Reactor_Mask | ||||
) | [virtual] |
Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.
Definition at line 353 of file TcpConnection.cpp.
References config_name(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), link_, LM_DEBUG, receive_strategy(), remote_address_, send_strategy(), and spawn_reconnect_thread().
00354 { 00355 DBG_ENTRY_LVL("TcpConnection","handle_close",6); 00356 00357 if (DCPS_debug_level >= 1) { 00358 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n", 00359 this->config_name().c_str(), 00360 this->remote_address_.get_host_addr(), 00361 this->remote_address_.get_port_number())); 00362 } 00363 00364 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00365 bool graceful = receive_strategy && receive_strategy->gracefully_disconnected(); 00366 00367 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00368 if (send_strategy) { 00369 if (graceful) { 00370 send_strategy->terminate_send(); 00371 } else { 00372 send_strategy->suspend_send(); 00373 } 00374 } 00375 00376 this->disconnect(); 00377 00378 if (graceful) { 00379 this->link_->notify(DataLink::DISCONNECTED); 00380 } else { 00381 this->spawn_reconnect_thread(); 00382 } 00383 00384 return 0; 00385 }
int OpenDDS::DCPS::TcpConnection::handle_input | ( | ACE_HANDLE | fd | ) | [virtual] |
We pass this "event" along to the receive_strategy.
Reimplemented from ACE_Event_Handler.
Definition at line 288 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, handle_setup_input(), passive_setup_, and receive_strategy().
00289 { 00290 DBG_ENTRY_LVL("TcpConnection","handle_input",6); 00291 00292 if (passive_setup_) { 00293 return handle_setup_input(fd); 00294 } 00295 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00296 if (!receive_strategy) { 00297 return 0; 00298 } 00299 00300 return receive_strategy->handle_dds_input(fd); 00301 }
int OpenDDS::DCPS::TcpConnection::handle_output | ( | ACE_HANDLE | ) | [virtual] |
Handle back pressure when sending.
Reimplemented from ACE_Event_Handler.
Definition at line 304 of file TcpConnection.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_, LM_DEBUG, send_strategy(), and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO.
00305 { 00306 DBG_ENTRY_LVL("TcpConnection","handle_output",6); 00307 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00308 if (send_strategy) { 00309 if (DCPS_debug_level > 9) { 00310 ACE_DEBUG((LM_DEBUG, 00311 ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ") 00312 ACE_TEXT("sending queued data.\n"), 00313 id_)); 00314 } 00315 00316 // Process data to be sent from the queue. 00317 if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO 00318 != send_strategy->perform_work()) { 00319 00320 // Stop handling output ready events when there is nothing to output. 00321 // N.B. This calls back into the reactor. Is the reactor lock 00322 // recursive? 00323 send_strategy->schedule_output(); 00324 } 00325 } 00326 00327 return 0; 00328 }
int OpenDDS::DCPS::TcpConnection::handle_setup_input | ( | ACE_HANDLE | h | ) | [private] |
During the connection setup phase, the passive side sets passive_setup_, redirecting handle_input() events here (there is no recv strategy yet).
Definition at line 219 of file TcpConnection.cpp.
References ACE_Message_Block::base(), connected_, ACE_Event_Handler::DONT_CALL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), ACE_Message_Block::length(), LM_DEBUG, local_address_, ACE_OS::memcpy(), OpenDDS::DCPS::TcpTransport::passive_connection(), passive_setup_, passive_setup_buffer_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), OpenDDS::DCPS::rchandle_from(), ACE_Message_Block::rd_ptr(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, reconnect_state_string(), remote_address_, ACE_Message_Block::reset(), ACE_Message_Block::size(), ACE_Message_Block::space(), transport_during_setup_, transport_priority_, VDBG, VDBG_LVL, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.
Referenced by handle_input().
00220 { 00221 const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(), 00222 passive_setup_buffer_.space(), 00223 &ACE_Time_Value::zero); 00224 00225 if (ret < 0 && errno == ETIME) { 00226 return 0; 00227 } 00228 00229 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ " 00230 "recv returned %b %m.\n", this, ret), 4); 00231 00232 if (ret <= 0) { 00233 return -1; 00234 } 00235 00236 passive_setup_buffer_.wr_ptr(ret); 00237 // Parse the setup message: <len><addr><prio> 00238 // len and prio are network order 32-bit ints 00239 // addr is a string of length len, including null 00240 ACE_UINT32 nlen = 0; 00241 00242 if (passive_setup_buffer_.length() >= sizeof(nlen)) { 00243 00244 ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen)); 00245 passive_setup_buffer_.rd_ptr(sizeof(nlen)); 00246 ACE_UINT32 hlen = ntohl(nlen); 00247 passive_setup_buffer_.size(hlen + 2 * sizeof(nlen)); 00248 00249 ACE_UINT32 nprio = 0; 00250 00251 if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) { 00252 00253 const std::string bufstr(passive_setup_buffer_.rd_ptr()); 00254 const NetworkAddress network_order_address(bufstr); 00255 network_order_address.to_addr(remote_address_); 00256 00257 ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio)); 00258 transport_priority_ = ntohl(nprio); 00259 00260 passive_setup_buffer_.reset(); 00261 passive_setup_ = false; 00262 00263 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input " 00264 "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %C\n", this, 00265 remote_address_.get_host_addr(), remote_address_.get_port_number(), 00266 local_address_.get_host_addr(), local_address_.get_port_number(), 00267 transport_priority_, reconnect_state_string().c_str())); 00268 00269 // remove from reactor, normal recv strategy setup will add us back 00270 if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) { 00271 VDBG((LM_DEBUG, "(%P|%t) DBG: 
TcpConnection::handle_setup_input " 00272 "remove_handler failed %m.\n")); 00273 } 00274 00275 transport_during_setup_->passive_connection(remote_address_, rchandle_from(this)); 00276 connected_ = true; 00277 00278 return 0; 00279 } 00280 } 00281 00282 passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base()); 00283 00284 return 0; 00285 }
int OpenDDS::DCPS::TcpConnection::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) | [virtual] |
A timer is scheduled on the acceptor side to check whether a new connection has been accepted after the connection was lost.
Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.
Definition at line 783 of file TcpConnection.cpp.
References ACE_TEXT(), config_name(), DBG_ENTRY_LVL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_lock_, reconnect_state_, RECONNECTED_STATE, remote_address_, send_strategy(), and tear_link().
00785 { 00786 DBG_ENTRY_LVL("TcpConnection","handle_timeout",6); 00787 00788 GuardType guard(this->reconnect_lock_); 00789 00790 switch (this->reconnect_state_) { 00791 case PASSIVE_WAITING_STATE: { 00792 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n", 00793 this->config_name().c_str(), 00794 this->remote_address_.get_host_addr(), 00795 this->remote_address_.get_port_number())); 00796 00797 this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE; 00798 // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection. 00799 // Now we need declare the connection is lost. 00800 this->link_->notify(DataLink::LOST); 00801 00802 // The handle_timeout may be called after the connection is re-established 00803 // and the send strategy of this old connection is reset to nil. 00804 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00805 if (send_strategy) 00806 send_strategy->terminate_send(); 00807 00808 this->reconnect_state_ = LOST_STATE; 00809 00810 this->tear_link(); 00811 00812 } 00813 break; 00814 00815 case RECONNECTED_STATE: 00816 // reconnected successfully. 00817 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n", 00818 this->config_name().c_str(), 00819 this->remote_address_.get_host_addr(), 00820 this->remote_address_.get_port_number())); 00821 break; 00822 00823 default : 00824 ACE_ERROR((LM_ERROR, 00825 ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ") 00826 ACE_TEXT(" unknown state or it should not be in state=%d \n"), 00827 reconnect_state_)); 00828 break; 00829 } 00830 00831 return 0; 00832 }
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE std::size_t & OpenDDS::DCPS::TcpConnection::id | ( | void | ) |
ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connected | ( | void | ) | const |
Return true if connection is connected.
Definition at line 33 of file TcpConnection.inl.
References connected_, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().
00034 { 00035 return this->connected_.value(); 00036 }
ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connector | ( | ) | const |
Return true if the object represents the connector side, otherwise it's the acceptor side. The acceptor/connector role is not changed when re-establishing the connection.
Definition at line 27 of file TcpConnection.inl.
References is_connector_.
00028 { 00029 return this->is_connector_; 00030 }
void OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout | ( | ) |
This function is called when backpressure occurs and has not been relieved within "max_output_pause_period". The lost-connection notification should be sent and the connection needs to be closed, since we have declared it a "lost" connection.
Definition at line 927 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, disconnect(), INIT_STATE, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_lock_, reconnect_state_, and send_strategy().
00928 { 00929 DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6); 00930 bool notify_lost = false; 00931 { 00932 GuardType guard(this->reconnect_lock_); 00933 00934 if (this->reconnect_state_ == INIT_STATE) { 00935 this->reconnect_state_ = LOST_STATE; 00936 notify_lost = true; 00937 00938 } 00939 } 00940 00941 if (notify_lost) { 00942 this->disconnect(); 00943 this->link_->notify(DataLink::LOST); 00944 00945 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00946 if (send_strategy) 00947 send_strategy->terminate_send(); 00948 } 00949 00950 }
int OpenDDS::DCPS::TcpConnection::open | ( | void * | arg | ) | [virtual] |
Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.
Definition at line 130 of file TcpConnection.cpp.
References ACE_TEXT(), active_open(), OpenDDS::DCPS::TcpTransport::async_connect_failed(), OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpAcceptor::get_configuration(), ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::get_handle(), is_connector_, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TcpInst::local_address(), local_address_, passive_setup_, passive_setup_buffer_, OpenDDS::DCPS::rchandle_from(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, remote_address_, set_sock_options(), ACE_Message_Block::size(), tcp_config_, OpenDDS::DCPS::TcpAcceptor::transport(), transport_during_setup_, transport_priority_, and VDBG_LVL.
00131 { 00132 DBG_ENTRY_LVL("TcpConnection","open",6); 00133 00134 if (is_connector_) { 00135 00136 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open active.\n"), 2); 00137 // Take over the refcount from TcpTransport::connect_datalink(). 00138 00139 TcpTransport& transport = static_cast<TcpTransport&>(link_->impl()); 00140 00141 const bool is_loop(local_address_ == remote_address_); 00142 const PriorityKey key(transport_priority_, remote_address_, 00143 is_loop, false /* !active */); 00144 00145 int active_open_ = active_open(); 00146 00147 int connect_tcp_datalink_ = transport.connect_tcp_datalink(*link_, rchandle_from(this)); 00148 00149 if (active_open_ == -1 || connect_tcp_datalink_ == -1) { 00150 // if (active_open() == -1 || 00151 // transport->connect_tcp_datalink(link_, self) == -1) { 00152 00153 transport.async_connect_failed(key); 00154 00155 return -1; 00156 } 00157 00158 return 0; 00159 } 00160 00161 // The passed-in arg is really the acceptor object that created this 00162 // TcpConnection object, and is also the caller of this open() 00163 // method. We need to cast the arg to the TcpAcceptor* type. 00164 TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg); 00165 00166 if (acceptor == 0) { 00167 // The cast failed. 00168 ACE_ERROR_RETURN((LM_ERROR, 00169 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00170 ACE_TEXT("failed to cast void* arg to ") 00171 ACE_TEXT("TcpAcceptor* type.\n")), 00172 -1); 00173 } 00174 00175 TcpConnection_rch self(this, keep_count()); 00176 00177 // Now we need to ask the TcpAcceptor object to provide us with 00178 // a pointer to the TcpTransport object that "owns" the acceptor. 00179 TcpTransport* transport = acceptor->transport(); 00180 00181 if (!transport) { 00182 // The acceptor gave us a nil transport (smart) pointer. 
00183 ACE_ERROR_RETURN((LM_ERROR, 00184 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00185 ACE_TEXT("acceptor's transport is nil.\n")), 00186 -1); 00187 } 00188 00189 // Keep a "copy" of the reference to TcpInst object 00190 // for ourselves. 00191 tcp_config_ = &acceptor->get_configuration(); 00192 local_address_ = tcp_config_->local_address(); 00193 00194 set_sock_options(tcp_config_); 00195 00196 // We expect that the active side of the connection (the remote side 00197 // in this case) will supply its listening ACE_INET_Addr as the first 00198 // message it sends to the socket. This is a one-way connection 00199 // establishment protocol message. 00200 passive_setup_ = true; 00201 transport_during_setup_ = transport; 00202 passive_setup_buffer_.size(sizeof(ACE_UINT32)); 00203 00204 if (reactor()->register_handler(this, READ_MASK) == -1) { 00205 ACE_ERROR_RETURN((LM_ERROR, 00206 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00207 ACE_TEXT("unable to register with the reactor.%p\n"), 00208 ACE_TEXT("register_handler")), 00209 -1); 00210 } 00211 00212 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n", 00213 static_cast<int>(intptr_t(get_handle()))), 2); 00214 00215 return 0; 00216 }
int OpenDDS::DCPS::TcpConnection::passive_reconnect_i | ( | ) | [private] |
Definition at line 617 of file TcpConnection.cpp.
References ACE_TEXT(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, link_, LM_ERROR, OpenDDS::DCPS::TcpInst::passive_reconnect_duration_, passive_reconnect_timer_id_, PASSIVE_WAITING_STATE, receive_strategy(), reconnect_lock_, reconnect_state_, shutdown_, and tcp_config_.
Referenced by reconnect().
00618 { 00619 DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6); 00620 GuardType guard(this->reconnect_lock_); 00621 if (this->shutdown_) 00622 return -1; 00623 00624 // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once. 00625 if (this->reconnect_state_ == INIT_STATE) { 00626 // Mark the connection lost since the recv/send just failed. 00627 this->connected_ = false; 00628 00629 if (this->tcp_config_->passive_reconnect_duration_ == 0) 00630 return -1; 00631 00632 ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000, 00633 this->tcp_config_->passive_reconnect_duration_%1000 * 1000); 00634 this->reconnect_state_ = PASSIVE_WAITING_STATE; 00635 this->link_->notify(DataLink::DISCONNECTED); 00636 00637 // It is possible that the passive reconnect is called after the new connection 00638 // is accepted and the receive_strategy of this old connection is reset to nil. 00639 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00640 if (this->receive_strategy()) { 00641 00642 // Give a copy to reactor. 00643 this->passive_reconnect_timer_id_ = receive_strategy->get_reactor()->schedule_timer(this, 0, timeout); 00644 00645 if (this->passive_reconnect_timer_id_ == -1) { 00646 ACE_ERROR_RETURN((LM_ERROR, 00647 ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i") 00648 ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")), 00649 -1); 00650 } 00651 } 00652 } 00653 00654 return 0; 00655 }
OpenDDS::DCPS::TcpReceiveStrategy_rch OpenDDS::DCPS::TcpConnection::receive_strategy | ( | ) |
Definition at line 105 of file TcpConnection.cpp.
References link_.
Referenced by active_reconnect_i(), disconnect(), handle_close(), handle_input(), passive_reconnect_i(), and transfer().
00106 { 00107 return this->link_->receive_strategy(); 00108 }
int OpenDDS::DCPS::TcpConnection::reconnect | ( | bool | on_new_association = false |
) |
This function is called to re-establish the connection. If this object is the connector side of the connection, it tries to reconnect to the remote; if it is the acceptor side of the connection, it schedules a timer to check whether it has passively accepted a connection from the remote. An on_new_association value of true indicates that this is called when the connection was previously lost and a new association is added. The connector side needs to actively try to reconnect to the remote.
Definition at line 528 of file TcpConnection.cpp.
References active_reconnect_i(), active_reconnect_on_new_association(), config_name(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), is_connector_, link_, LM_DEBUG, passive_reconnect_i(), and remote_address_.
Referenced by OpenDDS::DCPS::TcpReconnectTask::execute().
00529 { 00530 DBG_ENTRY_LVL("TcpConnection","reconnect",6); 00531 if (DCPS_debug_level >= 1) { 00532 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n", 00533 this->config_name().c_str(), 00534 this->remote_address_.get_host_addr(), 00535 this->remote_address_.get_port_number())); 00536 } 00537 00538 if (on_new_association) { 00539 if (DCPS_debug_level >= 1) { 00540 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n")); 00541 } 00542 return this->active_reconnect_on_new_association(); 00543 } 00544 00545 // If on_new_association is false, it's called by the reconnect task. 00546 // We need make sure if the link release is pending. If does, do 00547 // not try to reconnect. 00548 else if (!this->link_->is_release_pending()) { 00549 if (DCPS_debug_level >= 1) { 00550 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n")); 00551 } 00552 // Try to reconnect if it's connector previously. 00553 if (this->is_connector_ && this->active_reconnect_i() == -1) { 00554 if (DCPS_debug_level >= 1) { 00555 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n")); 00556 } 00557 return -1; 00558 } 00559 00560 // Schedule a timer to see if a incoming connection is accepted when timeout. 00561 else if (!this->is_connector_ && this->passive_reconnect_i() == -1) { 00562 if (DCPS_debug_level >= 1) { 00563 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n")); 00564 } 00565 return -1; 00566 } 00567 00568 } 00569 if (DCPS_debug_level >= 1) { 00570 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n")); 00571 } 00572 return 0; 00573 }
OPENDDS_STRING OpenDDS::DCPS::TcpConnection::reconnect_state_string | ( | ) | const [private] |
Get name of the current reconnect state as a string.
Definition at line 1059 of file TcpConnection.cpp.
References ACE_TEXT(), INIT_STATE, LM_ERROR, LOST_STATE, OPENDDS_STRING, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_state_, RECONNECTED_STATE, and OpenDDS::DCPS::to_dds_string().
Referenced by active_reconnect_i(), and handle_setup_input().
01060 { 01061 switch (reconnect_state_) { 01062 case INIT_STATE: 01063 return "INIT_STATE"; 01064 case LOST_STATE: 01065 return "LOST_STATE"; 01066 case RECONNECTED_STATE: 01067 return "RECONENCTED_STATE"; 01068 case PASSIVE_WAITING_STATE: 01069 return "PASSIVE_WAITING_STATE"; 01070 case PASSIVE_TIMEOUT_CALLED_STATE: 01071 return "PASSIVE_TIMEOUT_CALLED_STATE"; 01072 default: 01073 ACE_ERROR((LM_ERROR, ACE_TEXT( 01074 "OpenDDS::DCPS::TcpConnection::reconnect_state_string(): " 01075 "%d is either completely invalid or at least not defined in this function.\n"), 01076 reconnect_state_ 01077 )); 01078 return OPENDDS_STRING("(Unknown Reconnect State: ") 01079 + to_dds_string(reconnect_state_) + ")"; 01080 } 01081 }
ACE_THR_FUNC_RETURN OpenDDS::DCPS::TcpConnection::reconnect_thread_fun | ( | void * | conn | ) | [static, private] |
Definition at line 1035 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, and ACE_OS::thr_sigsetmask().
Referenced by spawn_reconnect_thread().
01036 { 01037 DBG_ENTRY_LVL("TcpConnection","reconnect_thread_fun",6); 01038 01039 // Ignore all signals to avoid 01040 // ERROR: <something descriptive> Interrupted system call 01041 // The main thread will handle signals. 01042 sigset_t set; 01043 ACE_OS::sigfillset(&set); 01044 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 01045 01046 // Make sure the associated transport_config outlives the connection object. 01047 RcHandle<TransportInst> transport_config; 01048 TcpConnection_rch connection(static_cast<TcpConnection*>(arg), keep_count()); 01049 transport_config = RcHandle<TransportInst>(&connection->link_->impl().config(), keep_count()); 01050 01051 if (connection->reconnect() == -1) { 01052 connection->tear_link(); 01053 } 01054 01055 return 0; 01056 }
void OpenDDS::DCPS::TcpConnection::relink_from_recv | ( | bool | do_suspend | ) |
Reconnect initiated by receive strategy.
This is called by TcpReceiveStrategy when a disconnect is detected. It simply suspends any sends (when do_suspend is true) and lets handle_close() handle the reconnect logic.
Definition at line 972 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, and send_strategy().
00973 { 00974 DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6); 00975 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00976 if (do_suspend && send_strategy) 00977 send_strategy->suspend_send(); 00978 }
void OpenDDS::DCPS::TcpConnection::relink_from_send | ( | bool | do_suspend | ) |
Reconnect initiated by send strategy.
This is called by TcpSendStrategy when a send fails and a reconnect should be initiated. This method suspends any sends (when do_suspend is true) and kicks the reconnect thread into action.
Definition at line 957 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, send_strategy(), and spawn_reconnect_thread().
00958 { 00959 DBG_ENTRY_LVL("TcpConnection","relink_from_send",6); 00960 00961 TcpSendStrategy_rch send_strategy = this->send_strategy(); 00962 if (do_suspend && send_strategy) 00963 send_strategy->suspend_send(); 00964 00965 this->spawn_reconnect_thread(); 00966 }
ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::remove_reference | ( | void | ) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 1006 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject::_remove_ref().
01007 { 01008 RcObject::_remove_ref(); 01009 return 1; 01010 }
OpenDDS::DCPS::TcpSendStrategy_rch OpenDDS::DCPS::TcpConnection::send_strategy | ( | ) |
Definition at line 99 of file TcpConnection.cpp.
References link_.
Referenced by active_reconnect_i(), active_reconnect_on_new_association(), close(), handle_close(), handle_output(), handle_timeout(), notify_lost_on_backpressure_timeout(), relink_from_recv(), and relink_from_send().
00100 { 00101 return this->link_->send_strategy(); 00102 }
ACE_INLINE void OpenDDS::DCPS::TcpConnection::set_datalink | ( | const TcpDataLink_rch & | link | ) |
Cache the reference to the datalink object for lost connection callbacks.
Definition at line 39 of file TcpConnection.inl.
References link_.
00040 { 00041 // Keep a "copy" of the reference to the data link for ourselves. 00042 this->link_ = link; 00043 }
void OpenDDS::DCPS::TcpConnection::set_sock_options | ( | const TcpInst * | tcp_config | ) |
Definition at line 388 of file TcpConnection.cpp.
References OpenDDS::DCPS::TcpInst::enable_nagle_algorithm_, LM_ERROR, and ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer().
Referenced by active_establishment(), and open().
00389 { 00390 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ) 00391 int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; 00392 int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; 00393 //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() ); 00394 # if !defined (ACE_LACKS_SOCKET_BUFSIZ) 00395 00396 // A little screwy double negative logic: disabling nagle involves 00397 // enabling TCP_NODELAY 00398 int opt = (tcp_config->enable_nagle_algorithm_ == false); 00399 00400 if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) { 00401 ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n")); 00402 } 00403 00404 if (this->peer().set_option(SOL_SOCKET, 00405 SO_SNDBUF, 00406 (void *) &snd_size, 00407 sizeof(snd_size)) == -1 00408 && errno != ENOTSUP) { 00409 ACE_ERROR((LM_ERROR, 00410 "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n", 00411 snd_size)); 00412 return; 00413 } 00414 00415 if (this->peer().set_option(SOL_SOCKET, 00416 SO_RCVBUF, 00417 (void *) &rcv_size, 00418 sizeof(int)) == -1 00419 && errno != ENOTSUP) { 00420 ACE_ERROR((LM_ERROR, 00421 "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n", 00422 rcv_size)); 00423 return; 00424 } 00425 00426 # else 00427 ACE_UNUSED_ARG(tcp_config); 00428 ACE_UNUSED_ARG(snd_size); 00429 ACE_UNUSED_ARG(rcv_size); 00430 # endif /* !ACE_LACKS_SOCKET_BUFSIZ */ 00431 00432 #else 00433 ACE_UNUSED_ARG(tcp_config); 00434 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */ 00435 }
void OpenDDS::DCPS::TcpConnection::shutdown | ( | void | ) |
Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.
Definition at line 989 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, reconnect_lock_, and shutdown_.
00990 { 00991 DBG_ENTRY_LVL("TcpConnection","shutdown",6); 00992 GuardType guard(this->reconnect_lock_); 00993 this->shutdown_ = true; 00994 ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::shutdown(); 00995 00996 }
void OpenDDS::DCPS::TcpConnection::spawn_reconnect_thread | ( | ) | [private] |
Definition at line 1013 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject::_add_ref(), OpenDDS::DCPS::RcObject::_remove_ref(), DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), link_, reconnect_lock_, reconnect_thread_, reconnect_thread_fun(), and shutdown_.
Referenced by handle_close(), and relink_from_send().
01014 { 01015 DBG_ENTRY_LVL("TcpConnection","spawn_reconnect_thread",6); 01016 GuardType guard(this->reconnect_lock_); 01017 if (!shutdown_) { 01018 // Make sure the associated transport_config outlives the connection object. 01019 TransportInst& transport_config = this->link_->impl().config(); 01020 transport_config._add_ref(); 01021 // add the reference count to be picked up from the new thread 01022 this->_add_ref(); 01023 if (ACE_Thread_Manager::instance()->spawn(&reconnect_thread_fun, 01024 this, 01025 THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, 01026 &reconnect_thread_) == -1){ 01027 // we need to decrement the reference count when thread creation fails. 01028 this->_remove_ref(); 01029 transport_config._remove_ref(); 01030 } 01031 } 01032 }
bool OpenDDS::DCPS::TcpConnection::tear_link | ( | ) |
Called by the reconnect task to inform us that the link & any associated data can be torn down. This call is done with no DCPS/transport locks held.
Definition at line 981 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, and link_.
Referenced by OpenDDS::DCPS::TcpReconnectTask::execute(), and handle_timeout().
00982 { 00983 DBG_ENTRY_LVL("TcpConnection","tear_link",6); 00984 00985 return this->link_->release_resources(); 00986 }
void OpenDDS::DCPS::TcpConnection::transfer | ( | TcpConnection * | connection | ) |
This object is the "old" connection object and the provided one is the new connection object. The "old" connection object copies its state to the "new" connection object. This is called by the TcpDataLink when a new connection is accepted (with a new TcpConnection object). We need to make the state of the "new" connection object consistent with that of the "old" connection object.
Definition at line 841 of file TcpConnection.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), INIT_STATE, ACE_Thread_Manager::instance(), is_connector_, ACE_Thread_Manager::join(), link_, LM_DEBUG, LM_ERROR, local_address_, LOST_STATE, passive_reconnect_timer_id_, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, receive_strategy(), reconnect_lock_, reconnect_state_, reconnect_thread_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, tcp_config_, and VDBG.
00842 { 00843 DBG_ENTRY_LVL("TcpConnection","transfer",6); 00844 00845 GuardType guard(this->reconnect_lock_); 00846 00847 bool notify_reconnect = false; 00848 00849 switch (this->reconnect_state_) { 00850 case INIT_STATE: 00851 // We have not detected the lost connection and the peer is faster than us and 00852 // re-established the connection. so do not notify reconnected. 00853 break; 00854 00855 case LOST_STATE: 00856 00857 // The reconnect timed out. 00858 case PASSIVE_TIMEOUT_CALLED_STATE: 00859 // TODO: If the handle_timeout is called before the old connection 00860 // transfer its state to new connection then should we disconnect 00861 // the new connection or keep it alive ? 00862 // I think we should keep the connection, the user will get a 00863 // lost connection notification and then a reconnected notification. 00864 notify_reconnect = true; 00865 break; 00866 00867 case PASSIVE_WAITING_STATE: { 00868 // Cancel the timer since we got new connection. 00869 TcpReceiveStrategy_rch receive_strategy = this->receive_strategy(); 00870 if (receive_strategy->get_reactor()->cancel_timer(this) == -1) { 00871 ACE_ERROR((LM_ERROR, 00872 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00873 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer"))); 00874 00875 } else 00876 passive_reconnect_timer_id_ = -1; 00877 00878 notify_reconnect = true; 00879 } 00880 break; 00881 00882 default : 00883 ACE_ERROR((LM_ERROR, 00884 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00885 ACE_TEXT(" unknown state or it should not be in state=%i \n"), 00886 reconnect_state_)); 00887 break; 00888 } 00889 00890 // Verify if this acceptor side. 
00891 if (this->is_connector_ || connection->is_connector_) { 00892 ACE_ERROR((LM_ERROR, 00893 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00894 ACE_TEXT(" should NOT be called by the connector side \n"))); 00895 } 00896 00897 if (reconnect_thread_) { 00898 ACE_Thread_Manager::instance()->join(reconnect_thread_); 00899 reconnect_thread_ = 0; 00900 } 00901 // connection->receive_strategy_ = this->receive_strategy_; 00902 // connection->send_strategy_ = this->send_strategy_; 00903 connection->remote_address_ = this->remote_address_; 00904 connection->local_address_ = this->local_address_; 00905 connection->tcp_config_ = this->tcp_config_; 00906 connection->link_ = this->link_; 00907 00908 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00909 "transfer(%C:%d->%C:%d) passive reconnected. new con %@ " 00910 " old con %@ \n", 00911 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00912 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00913 connection, this)); 00914 00915 if (notify_reconnect) { 00916 this->reconnect_state_ = RECONNECTED_STATE; 00917 this->link_->notify(DataLink::RECONNECTED); 00918 } 00919 00920 }
ACE_INLINE OpenDDS::DCPS::Priority OpenDDS::DCPS::TcpConnection::transport_priority | ( | ) | const |
Definition at line 60 of file TcpConnection.inl.
References transport_priority_.
00061 { 00062 return this->transport_priority_; 00063 }
ACE_INLINE OpenDDS::DCPS::Priority & OpenDDS::DCPS::TcpConnection::transport_priority | ( | ) |
Access TRANSPORT_PRIORITY.value policy value if set.
Definition at line 53 of file TcpConnection.inl.
References transport_priority_.
00054 { 00055 return this->transport_priority_; 00056 }
ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::DCPS::TcpConnection::connected_ [private] |
Flag indicating whether the connection is connected or disconnected. It is set to true when actively connecting or passively accepting succeeds, and set to false whenever the peer stream is closed.
Definition at line 168 of file TcpConnection.h.
Referenced by active_establishment(), active_open(), active_reconnect_on_new_association(), disconnect(), handle_setup_input(), is_connected(), and passive_reconnect_i().
std::size_t OpenDDS::DCPS::TcpConnection::id_ [private] |
Small unique identifying value.
Definition at line 208 of file TcpConnection.h.
Referenced by handle_output(), and id().
bool OpenDDS::DCPS::TcpConnection::is_connector_ [private] |
Flag indicating whether this connection object is the connector or the acceptor.
Definition at line 171 of file TcpConnection.h.
Referenced by active_establishment(), is_connector(), open(), reconnect(), and transfer().
Last time the connection was re-established.
Definition at line 195 of file TcpConnection.h.
Referenced by active_reconnect_i().
Datalink object which is needed for connection lost callback.
Definition at line 183 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), config_name(), disconnect(), handle_close(), handle_timeout(), notify_lost_on_backpressure_timeout(), open(), passive_reconnect_i(), receive_strategy(), reconnect(), send_strategy(), set_datalink(), spawn_reconnect_thread(), tear_link(), and transfer().
Local address.
Definition at line 177 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), handle_setup_input(), open(), and transfer().
The id of the scheduled timer. The timer is scheduled to check whether the connection is re-established within the passive_reconnect_duration_. This id ensures that the timer is scheduled only once when multiple threads detect the lost connection.
Definition at line 189 of file TcpConnection.h.
Referenced by passive_reconnect_i(), and transfer().
bool OpenDDS::DCPS::TcpConnection::passive_setup_ [private] |
Definition at line 203 of file TcpConnection.h.
Referenced by handle_input(), handle_setup_input(), and open().
Definition at line 204 of file TcpConnection.h.
Referenced by handle_setup_input(), and open().
Lock to prevent reconnect() from being called multiple times when both send() and recv() fail.
Definition at line 163 of file TcpConnection.h.
Referenced by active_open(), active_reconnect_i(), active_reconnect_on_new_association(), handle_timeout(), notify_lost_on_backpressure_timeout(), passive_reconnect_i(), shutdown(), spawn_reconnect_thread(), and transfer().
The state indicates each step of the reconnecting.
Definition at line 192 of file TcpConnection.h.
Referenced by active_reconnect_i(), active_reconnect_on_new_association(), handle_timeout(), notify_lost_on_backpressure_timeout(), passive_reconnect_i(), reconnect_state_string(), and transfer().
Definition at line 209 of file TcpConnection.h.
Referenced by spawn_reconnect_thread(), transfer(), and ~TcpConnection().
Remote address.
Definition at line 174 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), get_remote_address(), handle_close(), handle_setup_input(), handle_timeout(), open(), reconnect(), and transfer().
bool OpenDDS::DCPS::TcpConnection::shutdown_ [private] |
shutdown flag
Definition at line 201 of file TcpConnection.h.
Referenced by active_establishment(), active_open(), active_reconnect_i(), active_reconnect_on_new_association(), passive_reconnect_i(), shutdown(), and spawn_reconnect_thread().
const TcpInst* OpenDDS::DCPS::TcpConnection::tcp_config_ [private] |
The configuration used by this connection.
Definition at line 180 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), open(), passive_reconnect_i(), and transfer().
Definition at line 205 of file TcpConnection.h.
Referenced by handle_setup_input(), and open().
TRANSPORT_PRIORITY.value policy value.
Definition at line 198 of file TcpConnection.h.
Referenced by active_establishment(), handle_setup_input(), open(), and transport_priority().