#include <TcpConnection.h>
Inheritance diagram for OpenDDS::DCPS::TcpConnection:
Public Types | |
INIT_STATE | |
LOST_STATE | |
RECONNECTED_STATE | |
PASSIVE_WAITING_STATE | |
PASSIVE_TIMEOUT_CALLED_STATE | |
enum | ReconnectState { INIT_STATE, LOST_STATE, RECONNECTED_STATE, PASSIVE_WAITING_STATE, PASSIVE_TIMEOUT_CALLED_STATE } |
States used during reconnection. More... | |
Public Member Functions | |
TcpConnection () | |
Passive side constructor (acceptor). | |
TcpConnection (const ACE_INET_Addr &remote_address, Priority priority, const TcpInst_rch &config) | |
Active side constructor (connector). | |
virtual | ~TcpConnection () |
std::size_t & | id () |
int | active_open () |
void | disconnect () |
virtual int | open (void *arg) |
void | set_receive_strategy (TcpReceiveStrategy *receive_strategy) |
void | remove_receive_strategy () |
void | set_send_strategy (TcpSendStrategy *send_strategy) |
void | remove_send_strategy () |
virtual int | handle_input (ACE_HANDLE) |
We pass this "event" along to the receive_strategy. | |
virtual int | handle_output (ACE_HANDLE) |
Handle back pressure when sending. | |
virtual int | close (u_long) |
virtual int | handle_close (ACE_HANDLE, ACE_Reactor_Mask) |
void | set_sock_options (TcpInst *tcp_config) |
int | reconnect (bool on_new_association=false) |
bool | is_connector () const |
bool | is_connected () const |
Return true if connection is connected. | |
void | transfer (TcpConnection *connection) |
int | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
void | set_datalink (TcpDataLink *link) |
void | notify_lost_on_backpressure_timeout () |
ACE_INET_Addr | get_remote_address () |
void | relink_from_send (bool do_suspend) |
Reconnect initiated by send strategy. | |
void | relink_from_recv (bool do_suspend) |
Reconnect initiated by receive strategy. | |
bool | tear_link () |
void | shutdown () |
Priority & | transport_priority () |
Access TRANSPORT_PRIORITY.value policy value if set. | |
Priority | transport_priority () const |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
int | active_establishment (bool initiate_connect=true) |
int | active_reconnect_i () |
int | passive_reconnect_i () |
int | active_reconnect_on_new_association () |
int | handle_setup_input (ACE_HANDLE h) |
Private Attributes | |
LockType | reconnect_lock_ |
ACE_Atomic_Op< ACE_SYNCH_MUTEX, bool > | connected_ |
bool | is_connector_ |
Flag indicating whether this connection object is the connector or the acceptor. | |
TcpReceiveStrategy_rch | receive_strategy_ |
Reference to the receiving strategy. | |
TcpSendStrategy_rch | send_strategy_ |
Reference to the send strategy. | |
ACE_INET_Addr | remote_address_ |
Remote address. | |
ACE_INET_Addr | local_address_ |
Local address. | |
TcpInst_rch | tcp_config_ |
The configuration used by this connection. | |
TcpDataLink_rch | link_ |
Datalink object which is needed for connection lost callback. | |
int | passive_reconnect_timer_id_ |
TcpReconnectTask | reconnect_task_ |
ReconnectState | reconnect_state_ |
The state indicates each step of the reconnecting. | |
ACE_Time_Value | last_reconnect_attempted_ |
Time of the most recent reconnect attempt. | |
Priority | transport_priority_ |
TRANSPORT_PRIORITY.value policy value. | |
bool | shutdown_ |
shutdown flag | |
bool | passive_setup_ |
ACE_Message_Block | passive_setup_buffer_ |
TcpTransport_rch | transport_during_setup_ |
std::size_t | id_ |
Small unique identifying value. |
Definition at line 33 of file TcpConnection.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpConnection::GuardType [private] |
Definition at line 154 of file TcpConnection.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpConnection::LockType [private] |
Definition at line 153 of file TcpConnection.h.
States used during reconnection.
INIT_STATE | |
LOST_STATE | |
RECONNECTED_STATE | |
PASSIVE_WAITING_STATE | |
PASSIVE_TIMEOUT_CALLED_STATE |
Definition at line 39 of file TcpConnection.h.
00039 { 00040 INIT_STATE, 00041 LOST_STATE, 00042 RECONNECTED_STATE, 00043 PASSIVE_WAITING_STATE, 00044 PASSIVE_TIMEOUT_CALLED_STATE 00045 };
OpenDDS::DCPS::TcpConnection::TcpConnection | ( | ) |
Passive side constructor (acceptor).
Definition at line 41 of file TcpConnection.cpp.
References DBG_ENTRY_LVL.
00042 : connected_(false) 00043 , is_connector_(false) 00044 , passive_reconnect_timer_id_(-1) 00045 , reconnect_task_(this) 00046 , reconnect_state_(INIT_STATE) 00047 , last_reconnect_attempted_(ACE_Time_Value::zero) 00048 , transport_priority_(0) // TRANSPORT_PRIORITY.value default value - 0. 00049 , shutdown_(false) 00050 , passive_setup_(false) 00051 , passive_setup_buffer_(sizeof(ACE_UINT32)) 00052 , id_(0) 00053 { 00054 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6); 00055 00056 if (this->reconnect_task_.open()) { 00057 ACE_ERROR((LM_ERROR, 00058 ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"), 00059 ACE_TEXT("open"))); 00060 } 00061 00062 }
OpenDDS::DCPS::TcpConnection::TcpConnection | ( | const ACE_INET_Addr & | remote_address, | |
Priority | priority, | |||
const TcpInst_rch & | config | |||
) |
Active side constructor (connector).
Definition at line 64 of file TcpConnection.cpp.
References DBG_ENTRY_LVL.
00067 : connected_(false) 00068 , is_connector_(true) 00069 , remote_address_(remote_address) 00070 , local_address_(config->local_address()) 00071 , tcp_config_(config) 00072 , passive_reconnect_timer_id_(-1) 00073 , reconnect_task_(this) 00074 , reconnect_state_(INIT_STATE) 00075 , last_reconnect_attempted_(ACE_Time_Value::zero) 00076 , transport_priority_(priority) 00077 , shutdown_(false) 00078 , passive_setup_(false) 00079 { 00080 DBG_ENTRY_LVL("TcpConnection","TcpConnection",6); 00081 00082 // Open the reconnect task 00083 if (this->reconnect_task_.open()) { 00084 ACE_ERROR((LM_ERROR, 00085 ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"), 00086 ACE_TEXT("open"))); 00087 } 00088 00089 }
OpenDDS::DCPS::TcpConnection::~TcpConnection | ( | ) | [virtual] |
Definition at line 90 of file TcpConnection.cpp.
References OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, link_, reconnect_task_, and OpenDDS::DCPS::Transport_debug_level.
00091 { 00092 DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6); 00093 00094 // The Reconnect task belongs to the Connection object. 00095 // Cleanup before leaving the house. 00096 this->reconnect_task_.close(1); 00097 //this->reconnect_task_.wait (); 00098 00099 if (!this->link_.is_nil()) { 00100 if (Transport_debug_level > 5) { 00101 ACE_DEBUG((LM_DEBUG, 00102 ACE_TEXT("(%P|%t) TcpConnection::~TcpConnection: about to notify link[%@] connection deleted\n"), 00103 this->link_.in())); 00104 } 00105 this->link_->notify_connection_deleted(); 00106 } 00107 00108 }
int OpenDDS::DCPS::TcpConnection::active_establishment | ( | bool | initiate_connect = true |
) | [private] |
Attempt an active connection establishment to the remote address. The local address is sent to the remote (passive) side to identify ourselves to the remote side. Note that this method is not thread-safe: the caller needs to acquire reconnect_lock_ before calling this function.
Definition at line 455 of file TcpConnection.cpp.
References OpenDDS::DCPS::DirectPriorityMapper::codepoint(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), link_, local_address_, remote_address_, set_sock_options(), tcp_config_, and VDBG.
Referenced by active_open().
00456 { 00457 DBG_ENTRY_LVL("TcpConnection","active_establishment",6); 00458 00459 // Safety check - This should not happen since is_connector_ defaults to 00460 // true and the role in a connection connector is not changed when reconnecting. 00461 if (this->is_connector_ == false) { 00462 ACE_ERROR_RETURN((LM_ERROR, 00463 "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"), 00464 -1); 00465 } 00466 00467 if (this->shutdown_) 00468 return -1; 00469 00470 // Now use a connector object to establish the connection. 00471 ACE_SOCK_Connector connector; 00472 00473 if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) { 00474 00475 ACE_ERROR_RETURN((LM_ERROR, 00476 ACE_TEXT("(%P|%t) ERROR: Failed to connect. %p\n%C"), 00477 ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()), 00478 -1); 00479 00480 } else { 00481 this->connected_ = true; 00482 const std::string remote_host = this->remote_address_.get_host_addr(); 00483 VDBG((LM_DEBUG, "(%P|%t) DBG: active_establishment(%C:%d->%C:%d)\n", 00484 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00485 remote_host.c_str(), this->remote_address_.get_port_number())); 00486 } 00487 00488 // Set the DiffServ codepoint according to the priority value. 00489 DirectPriorityMapper mapper(this->transport_priority_); 00490 this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer()); 00491 00492 set_sock_options(tcp_config_.in()); 00493 00494 // In order to complete the connection establishment from the active 00495 // side, we need to tell the remote side about our public address. 00496 // It will use that as an "identifier" of sorts. To the other 00497 // (passive) side, our local_address that we send here will be known 00498 // as the remote_address. 
00499 std::string address = tcp_config_->get_public_address(); 00500 ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1; 00501 00502 ACE_UINT32 nlen = htonl(len); 00503 00504 if (this->peer().send_n(&nlen, 00505 sizeof(ACE_UINT32)) == -1) { 00506 // TBD later - Anything we are supposed to do to close the connection. 00507 ACE_ERROR_RETURN((LM_ERROR, 00508 "(%P|%t) ERROR: Unable to send address string length to " 00509 "the passive side to complete the active connection " 00510 "establishment.\n"), 00511 -1); 00512 } 00513 00514 if (this->peer().send_n(address.c_str(), len) == -1) { 00515 // TBD later - Anything we are supposed to do to close the connection. 00516 ACE_ERROR_RETURN((LM_ERROR, 00517 "(%P|%t) ERROR: Unable to send our address to " 00518 "the passive side to complete the active connection " 00519 "establishment.\n"), 00520 -1); 00521 } 00522 00523 ACE_UINT32 npriority = htonl(this->transport_priority_); 00524 00525 if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) { 00526 // TBD later - Anything we are supposed to do to close the connection. 00527 ACE_ERROR_RETURN((LM_ERROR, 00528 "(%P|%t) ERROR: Unable to send publication priority to " 00529 "the passive side to complete the active connection " 00530 "establishment.\n"), 00531 -1); 00532 } 00533 00534 return 0; 00535 }
int OpenDDS::DCPS::TcpConnection::active_open | ( | ) |
Protocol setup (handshake) on the active side. The local address is sent to the remote (passive) side to identify ourselves to the remote side.
Definition at line 593 of file TcpConnection.cpp.
References active_establishment(), connected_, DBG_ENTRY_LVL, and reconnect_lock_.
Referenced by open().
00594 { 00595 DBG_ENTRY_LVL("TcpConnection","active_open",6); 00596 00597 GuardType guard(reconnect_lock_); 00598 00599 if (connected_.value()) { 00600 return 0; 00601 } 00602 00603 return active_establishment(false /* !initiate_connect */); 00604 }
int OpenDDS::DCPS::TcpConnection::active_reconnect_i | ( | ) | [private] |
Definition at line 680 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, last_reconnect_attempted_, link_, local_address_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_delay(), reconnect_state_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy_, tcp_config_, and VDBG.
00681 { 00682 DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6); 00683 00684 GuardType guard(this->reconnect_lock_); 00685 00686 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00687 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n", 00688 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00689 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00690 this->reconnect_state_)); 00691 if (DCPS_debug_level >= 1) { 00692 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::" 00693 "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n", 00694 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00695 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00696 this->reconnect_state_)); 00697 } 00698 // We need reset the state to INIT_STATE if we are previously reconnected. 00699 // This would allow re-establishing connection after the re-established 00700 // connection lost again. 00701 if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay 00702 && this->reconnect_state_ == RECONNECTED_STATE) { 00703 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00704 "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n")); 00705 this->reconnect_state_ = INIT_STATE; 00706 } 00707 00708 if (this->reconnect_state_ == INIT_STATE) { 00709 // Suspend send once. 00710 this->send_strategy_->suspend_send(); 00711 00712 this->disconnect(); 00713 00714 if (this->tcp_config_->conn_retry_attempts_ > 0) { 00715 this->link_->notify(DataLink::DISCONNECTED); 00716 } 00717 00718 // else the conn_retry_attempts is 0 then we do not need this extra 00719 // notify_disconnected() since the user application should get the 00720 // notify_lost() without delay. 
00721 00722 double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_; 00723 int ret = -1; 00724 00725 for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) { 00726 ret = this->active_establishment(); 00727 00728 if (this->shutdown_) 00729 break; 00730 00731 if (ret == -1) { 00732 ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000, 00733 ((int)retry_delay_msec)%1000*1000); 00734 ACE_OS::sleep(delay_tv); 00735 retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_; 00736 00737 } else { 00738 break; 00739 } 00740 } 00741 00742 if (ret == -1) { 00743 if (this->tcp_config_->conn_retry_attempts_ > 0) { 00744 ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n", 00745 this->link_->get_transport_impl()->config()->name().c_str(), 00746 this->remote_address_.get_host_addr(), 00747 this->remote_address_.get_port_number())); 00748 00749 } else { 00750 ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n", 00751 this->link_->get_transport_impl()->config()->name().c_str(), 00752 this->remote_address_.get_host_addr(), 00753 this->remote_address_.get_port_number())); 00754 } 00755 00756 this->reconnect_state_ = LOST_STATE; 00757 this->link_->notify(DataLink::LOST); 00758 this->send_strategy_->terminate_send(); 00759 00760 } else { 00761 ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n", 00762 this->link_->get_transport_impl()->config()->name().c_str(), 00763 this->remote_address_.get_host_addr(), 00764 this->remote_address_.get_port_number())); 00765 if (this->receive_strategy_->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) { 00766 ACE_ERROR_RETURN((LM_ERROR, 00767 "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register " 00768 "with reactor %X %p\n", this, ACE_TEXT("register_handler")), 00769 -1); 00770 } 00771 this->reconnect_state_ = 
RECONNECTED_STATE; 00772 this->link_->notify(DataLink::RECONNECTED); 00773 this->send_strategy_->resume_send(); 00774 } 00775 00776 this->last_reconnect_attempted_ = ACE_OS::gettimeofday(); 00777 } 00778 00779 return this->reconnect_state_ == LOST_STATE ? -1 : 0; 00780 }
int OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association | ( | ) | [private] |
Definition at line 607 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, and INIT_STATE.
Referenced by reconnect().
00608 { 00609 DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6); 00610 GuardType guard(this->reconnect_lock_); 00611 00612 if (this->connected_ == true) 00613 return 0; 00614 00615 else if (this->active_establishment() == 0) { 00616 this->reconnect_state_ = INIT_STATE; 00617 this->send_strategy_->resume_send(); 00618 return 0; 00619 } 00620 00621 return -1; 00622 }
int OpenDDS::DCPS::TcpConnection::close | ( | u_long | ) | [virtual] |
Definition at line 355 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, and disconnect().
00356 { 00357 DBG_ENTRY_LVL("TcpConnection","close",6); 00358 00359 // TBD SOON - Find out exactly when close() is called. 00360 // I have no clue when and who might call this. 00361 00362 if (!this->send_strategy_.is_nil()) 00363 this->send_strategy_->terminate_send(); 00364 00365 this->disconnect(); 00366 00367 return 0; 00368 }
void OpenDDS::DCPS::TcpConnection::disconnect | ( | ) |
This will be called by the DataLink (that "owns" us) when the TcpTransport has been told to shutdown(), or when the DataLink finds itself no longer needed, and is "self-releasing".
Definition at line 111 of file TcpConnection.cpp.
References connected_, and DBG_ENTRY_LVL.
Referenced by active_reconnect_i(), close(), handle_close(), and notify_lost_on_backpressure_timeout().
00112 { 00113 DBG_ENTRY_LVL("TcpConnection","disconnect",6); 00114 this->connected_ = false; 00115 00116 if (!this->receive_strategy_.is_nil()) { 00117 this->receive_strategy_->get_reactor()->remove_handler(this, 00118 ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL); 00119 } 00120 00121 this->peer().close(); 00122 00123 }
ACE_INLINE ACE_INET_Addr OpenDDS::DCPS::TcpConnection::get_remote_address | ( | ) |
Definition at line 60 of file TcpConnection.inl.
References remote_address_.
00061 { 00062 return this->remote_address_; 00063 }
int OpenDDS::DCPS::TcpConnection::handle_close | ( | ACE_HANDLE | , | |
ACE_Reactor_Mask | ||||
) | [virtual] |
Definition at line 371 of file TcpConnection.cpp.
References OpenDDS::DCPS::QueueTaskBase< T >::add(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, OpenDDS::DCPS::DO_RECONNECT, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, receive_strategy_, reconnect_task_, remote_address_, and send_strategy_.
00372 { 00373 DBG_ENTRY_LVL("TcpConnection","handle_close",6); 00374 00375 if (DCPS_debug_level >= 1) { 00376 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n", 00377 this->link_->get_transport_impl()->config()->name().c_str(), 00378 this->remote_address_.get_host_addr(), 00379 this->remote_address_.get_port_number())); 00380 } 00381 00382 bool graceful = !this->receive_strategy_.is_nil() && this->receive_strategy_->gracefully_disconnected(); 00383 00384 if (!this->send_strategy_.is_nil()) { 00385 if (graceful) { 00386 this->send_strategy_->terminate_send(); 00387 } else { 00388 this->send_strategy_->suspend_send(); 00389 } 00390 } 00391 00392 this->disconnect(); 00393 00394 if (graceful) { 00395 this->link_->notify(DataLink::DISCONNECTED); 00396 } else { 00397 ReconnectOpType op = DO_RECONNECT; 00398 this->reconnect_task_.add(op); 00399 } 00400 00401 return 0; 00402 }
int OpenDDS::DCPS::TcpConnection::handle_input | ( | ACE_HANDLE | ) | [virtual] |
We pass this "event" along to the receive_strategy.
Definition at line 312 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, handle_setup_input(), OpenDDS::DCPS::RcHandle< T >::is_nil(), passive_setup_, and receive_strategy_.
00313 { 00314 DBG_ENTRY_LVL("TcpConnection","handle_input",6); 00315 00316 if (passive_setup_) { 00317 return handle_setup_input(fd); 00318 } 00319 00320 if (receive_strategy_.is_nil()) { 00321 return 0; 00322 } 00323 00324 return receive_strategy_->handle_dds_input(fd); 00325 }
int OpenDDS::DCPS::TcpConnection::handle_output | ( | ACE_HANDLE | ) | [virtual] |
Handle back pressure when sending.
Definition at line 328 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_, send_strategy_, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO.
00329 { 00330 DBG_ENTRY_LVL("TcpConnection","handle_output",6); 00331 00332 if (!this->send_strategy_.is_nil()) { 00333 if (DCPS_debug_level > 9) { 00334 ACE_DEBUG((LM_DEBUG, 00335 ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ") 00336 ACE_TEXT("sending queued data.\n"), 00337 id_)); 00338 } 00339 00340 // Process data to be sent from the queue. 00341 if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO 00342 != send_strategy_->perform_work()) { 00343 00344 // Stop handling output ready events when there is nothing to output. 00345 // N.B. This calls back into the reactor. Is the reactor lock 00346 // recursive? 00347 send_strategy_->schedule_output(); 00348 } 00349 } 00350 00351 return 0; 00352 }
int OpenDDS::DCPS::TcpConnection::handle_setup_input | ( | ACE_HANDLE | h | ) | [private] |
During the connection setup phase, the passive side sets passive_setup_, redirecting handle_input() events here (there is no recv strategy yet).
Definition at line 240 of file TcpConnection.cpp.
References connected_, local_address_, passive_setup_, passive_setup_buffer_, reconnect_state_, remote_address_, transport_during_setup_, transport_priority_, VDBG, and VDBG_LVL.
Referenced by handle_input().
00241 { 00242 const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(), 00243 passive_setup_buffer_.space(), 00244 &ACE_Time_Value::zero); 00245 00246 if (ret < 0 && errno == ETIME) { 00247 return 0; 00248 } 00249 00250 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ " 00251 "recv returned %b %m.\n", this, ret), 4); 00252 00253 if (ret <= 0) { 00254 return -1; 00255 } 00256 00257 passive_setup_buffer_.wr_ptr(ret); 00258 // Parse the setup message: <len><addr><prio> 00259 // len and prio are network order 32-bit ints 00260 // addr is a string of length len, including null 00261 ACE_UINT32 nlen = 0; 00262 00263 if (passive_setup_buffer_.length() >= sizeof(nlen)) { 00264 00265 ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen)); 00266 passive_setup_buffer_.rd_ptr(sizeof(nlen)); 00267 ACE_UINT32 hlen = ntohl(nlen); 00268 passive_setup_buffer_.size(hlen + 2 * sizeof(nlen)); 00269 00270 ACE_UINT32 nprio = 0; 00271 00272 if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) { 00273 00274 const std::string bufstr(passive_setup_buffer_.rd_ptr()); 00275 const NetworkAddress network_order_address(bufstr); 00276 network_order_address.to_addr(remote_address_); 00277 00278 ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio)); 00279 transport_priority_ = ntohl(nprio); 00280 00281 passive_setup_buffer_.reset(); 00282 passive_setup_ = false; 00283 00284 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input " 00285 "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %d\n", this, 00286 remote_address_.get_host_addr(), remote_address_.get_port_number(), 00287 local_address_.get_host_addr(), local_address_.get_port_number(), 00288 transport_priority_, reconnect_state_)); 00289 00290 // remove from reactor, normal recv strategy setup will add us back 00291 if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) { 00292 VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input " 00293 
"remove_handler failed %m.\n")); 00294 } 00295 00296 const TcpConnection_rch self(this, false); 00297 00298 transport_during_setup_->passive_connection(remote_address_, self); 00299 transport_during_setup_ = 0; 00300 connected_ = true; 00301 00302 return 0; 00303 } 00304 } 00305 00306 passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base()); 00307 00308 return 0; 00309 }
int OpenDDS::DCPS::TcpConnection::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) |
A timer is scheduled on the acceptor side to check whether a new connection has been accepted after the connection was lost.
Definition at line 785 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), DBG_ENTRY_LVL, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_state_, RECONNECTED_STATE, remote_address_, and tear_link().
00787 { 00788 DBG_ENTRY_LVL("TcpConnection","handle_timeout",6); 00789 00790 GuardType guard(this->reconnect_lock_); 00791 00792 switch (this->reconnect_state_) { 00793 case PASSIVE_WAITING_STATE: { 00794 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n", 00795 this->link_->get_transport_impl()->config()->name().c_str(), 00796 this->remote_address_.get_host_addr(), 00797 this->remote_address_.get_port_number())); 00798 00799 this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE; 00800 // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection. 00801 // Now we need declare the connection is lost. 00802 this->link_->notify(DataLink::LOST); 00803 00804 // The handle_timeout may be called after the connection is re-established 00805 // and the send strategy of this old connection is reset to nil. 00806 if (!this->send_strategy_.is_nil()) 00807 this->send_strategy_->terminate_send(); 00808 00809 this->reconnect_state_ = LOST_STATE; 00810 00811 this->tear_link(); 00812 00813 } 00814 break; 00815 00816 case RECONNECTED_STATE: 00817 // reconnected successfully. 00818 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n", 00819 this->link_->get_transport_impl()->config()->name().c_str(), 00820 this->remote_address_.get_host_addr(), 00821 this->remote_address_.get_port_number())); 00822 break; 00823 00824 default : 00825 ACE_ERROR((LM_ERROR, 00826 ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ") 00827 ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_)); 00828 break; 00829 } 00830 00831 // Take back the "copy" we gave to reactor when we schedule the timer. 00832 this->_remove_ref(); 00833 00834 return 0; 00835 }
ACE_INLINE std::size_t & OpenDDS::DCPS::TcpConnection::id | ( | ) |
ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connected | ( | ) | const |
Return true if connection is connected.
Definition at line 46 of file TcpConnection.inl.
References connected_.
00047 { 00048 return this->connected_.value(); 00049 }
ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connector | ( | ) | const |
Return true if the object represents the connector side, otherwise it's the acceptor side. The acceptor/connector role is not changed when re-establishing the connection.
Definition at line 40 of file TcpConnection.inl.
References is_connector_.
00041 { 00042 return this->is_connector_; 00043 }
void OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout | ( | ) |
This function is called when backpressure occurs and has not been relieved after "max_output_pause_period". The lost-connection notification should be sent and the connection needs to be closed, since we have declared it a "lost" connection.
Definition at line 929 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, disconnect(), INIT_STATE, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_state_, and send_strategy_.
00930 { 00931 DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6); 00932 bool notify_lost = false; 00933 { 00934 GuardType guard(this->reconnect_lock_); 00935 00936 if (this->reconnect_state_ == INIT_STATE) { 00937 this->reconnect_state_ = LOST_STATE; 00938 notify_lost = true; 00939 00940 this->disconnect(); 00941 } 00942 } 00943 00944 if (notify_lost) { 00945 this->link_->notify(DataLink::LOST); 00946 this->send_strategy_->terminate_send(); 00947 } 00948 00949 }
int OpenDDS::DCPS::TcpConnection::open | ( | void * | arg | ) | [virtual] |
Definition at line 150 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), active_open(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpAcceptor::get_configuration(), OpenDDS::DCPS::RcHandle< T >::in(), is_connector_, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, local_address_, passive_setup_, passive_setup_buffer_, remote_address_, set_sock_options(), tcp_config_, OpenDDS::DCPS::TcpAcceptor::transport(), transport_during_setup_, transport_priority_, and VDBG_LVL.
00151 { 00152 DBG_ENTRY_LVL("TcpConnection","open",6); 00153 00154 if (is_connector_) { 00155 00156 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open active.\n"), 2); 00157 // Take over the refcount from TcpTransport::connect_datalink(). 00158 const TcpConnection_rch self(this); 00159 const TcpTransport_rch transport = link_->get_transport_impl(); 00160 00161 const bool is_loop(local_address_ == remote_address_); 00162 const PriorityKey key(transport_priority_, remote_address_, 00163 is_loop, false /* !active */); 00164 00165 int active_open_ = active_open(); 00166 00167 int connect_tcp_datalink_ = transport->connect_tcp_datalink(link_, self); 00168 00169 if (active_open_ == -1 || connect_tcp_datalink_ == -1) { 00170 // if (active_open() == -1 || 00171 // transport->connect_tcp_datalink(link_, self) == -1) { 00172 00173 transport->async_connect_failed(key); 00174 00175 return -1; 00176 } 00177 00178 return 0; 00179 } 00180 00181 // The passed-in arg is really the acceptor object that created this 00182 // TcpConnection object, and is also the caller of this open() 00183 // method. We need to cast the arg to the TcpAcceptor* type. 00184 TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg); 00185 00186 if (acceptor == 0) { 00187 // The cast failed. 00188 ACE_ERROR_RETURN((LM_ERROR, 00189 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00190 ACE_TEXT("failed to cast void* arg to ") 00191 ACE_TEXT("TcpAcceptor* type.\n")), 00192 -1); 00193 } 00194 00195 // Now we need to ask the TcpAcceptor object to provide us with 00196 // a pointer to the TcpTransport object that "owns" the acceptor. 00197 TcpTransport_rch transport = acceptor->transport(); 00198 00199 if (transport.is_nil()) { 00200 // The acceptor gave us a nil transport (smart) pointer. 
00201 ACE_ERROR_RETURN((LM_ERROR, 00202 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00203 ACE_TEXT("acceptor's transport is nil.\n")), 00204 -1); 00205 } 00206 00207 TcpInst* tcp_config = acceptor->get_configuration(); 00208 00209 // Keep a "copy" of the reference to TcpInst object 00210 // for ourselves. 00211 tcp_config->_add_ref(); 00212 tcp_config_ = tcp_config; 00213 local_address_ = tcp_config_->local_address(); 00214 00215 set_sock_options(tcp_config_.in()); 00216 00217 // We expect that the active side of the connection (the remote side 00218 // in this case) will supply its listening ACE_INET_Addr as the first 00219 // message it sends to the socket. This is a one-way connection 00220 // establishment protocol message. 00221 passive_setup_ = true; 00222 transport_during_setup_ = transport; 00223 passive_setup_buffer_.size(sizeof(ACE_UINT32)); 00224 00225 if (reactor()->register_handler(this, READ_MASK) == -1) { 00226 ACE_ERROR_RETURN((LM_ERROR, 00227 ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ") 00228 ACE_TEXT("unable to register with the reactor.%p\n"), 00229 ACE_TEXT("register_handler")), 00230 -1); 00231 } 00232 00233 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n", 00234 static_cast<int>(intptr_t(get_handle()))), 2); 00235 00236 return 0; 00237 }
int OpenDDS::DCPS::TcpConnection::passive_reconnect_i | ( | ) | [private] |
Definition at line 628 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, link_, passive_reconnect_timer_id_, PASSIVE_WAITING_STATE, and reconnect_state_.
00629 { 00630 DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6); 00631 GuardType guard(this->reconnect_lock_); 00632 00633 // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once. 00634 if (this->reconnect_state_ == INIT_STATE) { 00635 // Mark the connection lost since the recv/send just failed. 00636 this->connected_ = false; 00637 00638 if (this->tcp_config_->passive_reconnect_duration_ == 0) 00639 return -1; 00640 00641 ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000, 00642 this->tcp_config_->passive_reconnect_duration_%1000 * 1000); 00643 this->reconnect_state_ = PASSIVE_WAITING_STATE; 00644 this->link_->notify(DataLink::DISCONNECTED); 00645 00646 // It is possible that the passive reconnect is called after the new connection 00647 // is accepted and the receive_strategy of this old connection is reset to nil. 00648 if (!this->receive_strategy_.is_nil()) { 00649 TcpReceiveStrategy* rs 00650 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in()); 00651 00652 // Give a copy to reactor. 00653 this->_add_ref(); 00654 this->passive_reconnect_timer_id_ = rs->get_reactor()->schedule_timer(this, 0, timeout); 00655 00656 if (this->passive_reconnect_timer_id_ == -1) { 00657 this->_remove_ref(); 00658 ACE_ERROR_RETURN((LM_ERROR, 00659 ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i") 00660 ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")), 00661 -1); 00662 } 00663 } 00664 } 00665 00666 return 0; 00667 }
int OpenDDS::DCPS::TcpConnection::reconnect | ( | bool | on_new_association = false |
) |
This function is called to re-establish the connection. If this object is the connector side of the connection, it tries to reconnect to the remote; if it is the acceptor side, it schedules a timer to check whether it has passively accepted a connection from the remote. When on_new_association is true, this is being called because the connection was previously lost and a new association has been added; the connector side then needs to actively try to reconnect to the remote.
Definition at line 545 of file TcpConnection.cpp.
References active_reconnect_on_new_association(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, and remote_address_.
00546 { 00547 DBG_ENTRY_LVL("TcpConnection","reconnect",6); 00548 if (DCPS_debug_level >= 1) { 00549 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n", 00550 this->link_->get_transport_impl()->config()->name().c_str(), 00551 this->remote_address_.get_host_addr(), 00552 this->remote_address_.get_port_number())); 00553 } 00554 00555 if (on_new_association) { 00556 if (DCPS_debug_level >= 1) { 00557 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n")); 00558 } 00559 return this->active_reconnect_on_new_association(); 00560 } 00561 00562 // If on_new_association is false, it's called by the reconnect task. 00563 // We need make sure if the link release is pending. If does, do 00564 // not try to reconnect. 00565 else if (!this->link_->is_release_pending()) { 00566 if (DCPS_debug_level >= 1) { 00567 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n")); 00568 } 00569 // Try to reconnect if it's connector previously. 00570 if (this->is_connector_ && this->active_reconnect_i() == -1) { 00571 if (DCPS_debug_level >= 1) { 00572 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n")); 00573 } 00574 return -1; 00575 } 00576 00577 // Schedule a timer to see if a incoming connection is accepted when timeout. 00578 else if (!this->is_connector_ && this->passive_reconnect_i() == -1) { 00579 if (DCPS_debug_level >= 1) { 00580 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n")); 00581 } 00582 return -1; 00583 } 00584 00585 } 00586 if (DCPS_debug_level >= 1) { 00587 ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n")); 00588 } 00589 return 0; 00590 }
void OpenDDS::DCPS::TcpConnection::relink_from_recv | ( | bool | do_suspend | ) |
Reconnect initiated by receive strategy.
This is called by TcpReceiveStrategy when a disconnect is detected. It simply suspends any sends and lets the handle_close() handle the reconnect logic.
Definition at line 971 of file TcpConnection.cpp.
References DBG_ENTRY_LVL.
00972 { 00973 DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6); 00974 00975 if (do_suspend && !this->send_strategy_.is_nil()) 00976 this->send_strategy_->suspend_send(); 00977 }
void OpenDDS::DCPS::TcpConnection::relink_from_send | ( | bool | do_suspend | ) |
Reconnect initiated by send strategy.
This is called by TcpSendStrategy when a send fails and a reconnect should be initiated. This method suspends any sends and kicks the reconnect thread into action.
Definition at line 956 of file TcpConnection.cpp.
References OpenDDS::DCPS::QueueTaskBase< T >::add(), DBG_ENTRY_LVL, OpenDDS::DCPS::DO_RECONNECT, and reconnect_task_.
00957 { 00958 DBG_ENTRY_LVL("TcpConnection","relink_from_send",6); 00959 00960 if (do_suspend && !this->send_strategy_.is_nil()) 00961 this->send_strategy_->suspend_send(); 00962 00963 ReconnectOpType op = DO_RECONNECT; 00964 this->reconnect_task_.add(op); 00965 }
ACE_INLINE void OpenDDS::DCPS::TcpConnection::remove_receive_strategy | ( | ) |
Definition at line 24 of file TcpConnection.inl.
References DBG_ENTRY_LVL, and receive_strategy_.
00025 { 00026 DBG_ENTRY_LVL("TcpConnection","remove_receive_strategy",6); 00027 00028 this->receive_strategy_ = 0; 00029 }
ACE_INLINE void OpenDDS::DCPS::TcpConnection::remove_send_strategy | ( | ) |
Definition at line 32 of file TcpConnection.inl.
References DBG_ENTRY_LVL, and send_strategy_.
00033 { 00034 DBG_ENTRY_LVL("TcpConnection","remove_send_strategy",6); 00035 00036 this->send_strategy_ = 0; 00037 }
ACE_INLINE void OpenDDS::DCPS::TcpConnection::set_datalink | ( | TcpDataLink * | link | ) |
Cache the reference to the datalink object for lost connection callbacks.
Definition at line 52 of file TcpConnection.inl.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), and link_.
00053 { 00054 // Keep a "copy" of the reference to the data link for ourselves. 00055 link->_add_ref(); 00056 this->link_ = link; 00057 }
void OpenDDS::DCPS::TcpConnection::set_receive_strategy | ( | TcpReceiveStrategy * | receive_strategy | ) |
Definition at line 129 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), and DBG_ENTRY_LVL.
00130 { 00131 DBG_ENTRY_LVL("TcpConnection","set_receive_strategy",6); 00132 00133 // Make a "copy" for ourselves 00134 receive_strategy->_add_ref(); 00135 this->receive_strategy_ = receive_strategy; 00136 }
void OpenDDS::DCPS::TcpConnection::set_send_strategy | ( | TcpSendStrategy * | send_strategy | ) |
Give a "copy" of the TcpSendStrategy object to this connection object.
Definition at line 140 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), and DBG_ENTRY_LVL.
00141 { 00142 DBG_ENTRY_LVL("TcpConnection","set_send_strategy",6); 00143 00144 // Make a "copy" for ourselves 00145 send_strategy->_add_ref(); 00146 this->send_strategy_ = send_strategy; 00147 }
void OpenDDS::DCPS::TcpConnection::set_sock_options | ( | TcpInst * | tcp_config | ) |
Definition at line 405 of file TcpConnection.cpp.
References OpenDDS::DCPS::TcpInst::enable_nagle_algorithm_.
Referenced by active_establishment(), and open().
00406 { 00407 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ) 00408 int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; 00409 int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ; 00410 //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() ); 00411 # if !defined (ACE_LACKS_SOCKET_BUFSIZ) 00412 00413 // A little screwy double negative logic: disabling nagle involves 00414 // enabling TCP_NODELAY 00415 int opt = (tcp_config->enable_nagle_algorithm_ == false); 00416 00417 if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) { 00418 ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n")); 00419 } 00420 00421 if (this->peer().set_option(SOL_SOCKET, 00422 SO_SNDBUF, 00423 (void *) &snd_size, 00424 sizeof(snd_size)) == -1 00425 && errno != ENOTSUP) { 00426 ACE_ERROR((LM_ERROR, 00427 "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n", 00428 snd_size)); 00429 return; 00430 } 00431 00432 if (this->peer().set_option(SOL_SOCKET, 00433 SO_RCVBUF, 00434 (void *) &rcv_size, 00435 sizeof(int)) == -1 00436 && errno != ENOTSUP) { 00437 ACE_ERROR((LM_ERROR, 00438 "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n", 00439 rcv_size)); 00440 return; 00441 } 00442 00443 # else 00444 ACE_UNUSED_ARG(tcp_config); 00445 ACE_UNUSED_ARG(snd_size); 00446 ACE_UNUSED_ARG(rcv_size); 00447 # endif /* !ACE_LACKS_SOCKET_BUFSIZ */ 00448 00449 #else 00450 ACE_UNUSED_ARG(tcp_config); 00451 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */ 00452 }
void OpenDDS::DCPS::TcpConnection::shutdown | ( | ) |
Definition at line 988 of file TcpConnection.cpp.
References OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, reconnect_task_, and shutdown_.
00989 { 00990 DBG_ENTRY_LVL("TcpConnection","shutdown",6); 00991 this->shutdown_ = true; 00992 00993 this->reconnect_task_.close(1); 00994 00995 }
bool OpenDDS::DCPS::TcpConnection::tear_link | ( | ) |
Called by the reconnect task to inform us that the link & any associated data can be torn down. This call is done with no DCPS/transport locks held.
Definition at line 980 of file TcpConnection.cpp.
References DBG_ENTRY_LVL, and link_.
Referenced by handle_timeout().
00981 { 00982 DBG_ENTRY_LVL("TcpConnection","tear_link",6); 00983 00984 return this->link_->release_resources(); 00985 }
void OpenDDS::DCPS::TcpConnection::transfer | ( | TcpConnection * | connection | ) |
This object is the "old" connection object and the one provided is the "new" connection object. The "old" connection object copies its state to the "new" connection object. This is called by the TcpDataLink when a new connection is accepted (with a new TcpConnection object). We need to make the state in the "new" connection object consistent with the "old" connection object.
Definition at line 844 of file TcpConnection.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpReceiveStrategy::get_reactor(), OpenDDS::DCPS::RcHandle< T >::in(), INIT_STATE, is_connector_, link_, local_address_, LOST_STATE, passive_reconnect_timer_id_, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, receive_strategy_, reconnect_state_, reconnect_task_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy_, tcp_config_, and VDBG.
00845 { 00846 DBG_ENTRY_LVL("TcpConnection","transfer",6); 00847 00848 GuardType guard(this->reconnect_lock_); 00849 00850 bool notify_reconnect = false; 00851 00852 switch (this->reconnect_state_) { 00853 case INIT_STATE: 00854 // We have not detected the lost connection and the peer is faster than us and 00855 // re-established the connection. so do not notify reconnected. 00856 break; 00857 00858 case LOST_STATE: 00859 00860 // The reconnect timed out. 00861 case PASSIVE_TIMEOUT_CALLED_STATE: 00862 // TODO: If the handle_timeout is called before the old connection 00863 // transfer its state to new connection then should we disconnect 00864 // the new connection or keep it alive ? 00865 // I think we should keep the connection, the user will get a 00866 // lost connection notification and then a reconnected notification. 00867 notify_reconnect = true; 00868 break; 00869 00870 case PASSIVE_WAITING_STATE: { 00871 TcpReceiveStrategy* rs 00872 = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in()); 00873 00874 // Cancel the timer since we got new connection. 00875 if (rs->get_reactor()->cancel_timer(this) == -1) { 00876 ACE_ERROR((LM_ERROR, 00877 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00878 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer"))); 00879 00880 } else 00881 passive_reconnect_timer_id_ = -1; 00882 00883 this->_remove_ref(); 00884 notify_reconnect = true; 00885 } 00886 break; 00887 00888 default : 00889 ACE_ERROR((LM_ERROR, 00890 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00891 ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_)); 00892 break; 00893 } 00894 00895 // Verify if this acceptor side. 
00896 if (this->is_connector_ || connection->is_connector_) { 00897 ACE_ERROR((LM_ERROR, 00898 ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ") 00899 ACE_TEXT(" should NOT be called by the connector side \n"))); 00900 } 00901 00902 this->reconnect_task_.close(1); 00903 connection->receive_strategy_ = this->receive_strategy_; 00904 connection->send_strategy_ = this->send_strategy_; 00905 connection->remote_address_ = this->remote_address_; 00906 connection->local_address_ = this->local_address_; 00907 connection->tcp_config_ = this->tcp_config_; 00908 connection->link_ = this->link_; 00909 00910 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00911 "transfer(%C:%d->%C:%d) passive reconnected. new con %@ " 00912 " old con %@ \n", 00913 this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(), 00914 this->local_address_.get_host_addr(), this->local_address_.get_port_number(), 00915 connection, this)); 00916 00917 if (notify_reconnect) { 00918 this->reconnect_state_ = RECONNECTED_STATE; 00919 this->link_->notify(DataLink::RECONNECTED); 00920 } 00921 00922 }
ACE_INLINE OpenDDS::DCPS::Priority OpenDDS::DCPS::TcpConnection::transport_priority | ( | ) | const |
Definition at line 74 of file TcpConnection.inl.
References transport_priority_.
00075 { 00076 return this->transport_priority_; 00077 }
ACE_INLINE OpenDDS::DCPS::Priority & OpenDDS::DCPS::TcpConnection::transport_priority | ( | ) |
Access TRANSPORT_PRIORITY.value policy value if set.
Definition at line 67 of file TcpConnection.inl.
References transport_priority_.
00068 { 00069 return this->transport_priority_; 00070 }
ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::DCPS::TcpConnection::connected_ [private] |
Flag indicating whether the connection is connected or disconnected. It is set to true when actively connecting or passively accepting succeeds, and set to false whenever the peer stream is closed.
Definition at line 163 of file TcpConnection.h.
Referenced by active_establishment(), active_open(), disconnect(), handle_setup_input(), is_connected(), and passive_reconnect_i().
std::size_t OpenDDS::DCPS::TcpConnection::id_ [private] |
Small unique identifying value.
Definition at line 215 of file TcpConnection.h.
Referenced by handle_output(), and id().
bool OpenDDS::DCPS::TcpConnection::is_connector_ [private] |
Flag indicating whether this connection object is the connector or the acceptor.
Definition at line 166 of file TcpConnection.h.
Referenced by is_connector(), open(), and transfer().
ACE_Time_Value OpenDDS::DCPS::TcpConnection::last_reconnect_attempted_ [private] |
Last time the connection is re-established.
Definition at line 202 of file TcpConnection.h.
Referenced by active_reconnect_i().
Datalink object which is needed for connection lost callback.
Definition at line 184 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), handle_close(), handle_timeout(), notify_lost_on_backpressure_timeout(), open(), passive_reconnect_i(), set_datalink(), tear_link(), transfer(), and ~TcpConnection().
ACE_INET_Addr OpenDDS::DCPS::TcpConnection::local_address_ [private] |
Local address.
Definition at line 178 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), handle_setup_input(), open(), and transfer().
The id of the scheduled timer. The timer is scheduled to check whether the connection is re-established during the passive_reconnect_duration_. This id is used to make sure the timer is scheduled only once when multiple threads detect the lost connection.
Definition at line 190 of file TcpConnection.h.
Referenced by passive_reconnect_i(), and transfer().
bool OpenDDS::DCPS::TcpConnection::passive_setup_ [private] |
Definition at line 210 of file TcpConnection.h.
Referenced by handle_input(), handle_setup_input(), and open().
ACE_Message_Block OpenDDS::DCPS::TcpConnection::passive_setup_buffer_ [private] |
Reference to the receiving strategy.
Definition at line 169 of file TcpConnection.h.
Referenced by handle_close(), handle_input(), remove_receive_strategy(), and transfer().
Lock that prevents reconnect() from being called multiple times when both send() and recv() fail.
Definition at line 158 of file TcpConnection.h.
Referenced by active_open().
The state indicates each step of the reconnecting.
Definition at line 199 of file TcpConnection.h.
Referenced by active_reconnect_i(), handle_setup_input(), handle_timeout(), notify_lost_on_backpressure_timeout(), passive_reconnect_i(), and transfer().
The task that does the reconnecting. TODO: We might need to reuse the PerConnectionSynch thread to do the reconnecting, or create the reconnect task only when we need to reconnect.
Definition at line 196 of file TcpConnection.h.
Referenced by handle_close(), relink_from_send(), shutdown(), transfer(), and ~TcpConnection().
ACE_INET_Addr OpenDDS::DCPS::TcpConnection::remote_address_ [private] |
Remote address.
Definition at line 175 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), get_remote_address(), handle_close(), handle_setup_input(), handle_timeout(), open(), reconnect(), and transfer().
Reference to the send strategy.
Definition at line 172 of file TcpConnection.h.
Referenced by active_reconnect_i(), handle_close(), handle_output(), notify_lost_on_backpressure_timeout(), remove_send_strategy(), and transfer().
bool OpenDDS::DCPS::TcpConnection::shutdown_ [private] |
The configuration used by this connection.
Definition at line 181 of file TcpConnection.h.
Referenced by active_establishment(), active_reconnect_i(), open(), and transfer().
TRANSPORT_PRIORITY.value policy value.
Definition at line 205 of file TcpConnection.h.
Referenced by handle_setup_input(), open(), and transport_priority().