OpenDDS::DCPS::TcpConnection Class Reference

#include <TcpConnection.h>

Inheritance diagram for OpenDDS::DCPS::TcpConnection:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpConnection:
Collaboration graph
[legend]

List of all members.

Public Types

enum  ReconnectState {
  INIT_STATE, LOST_STATE, RECONNECTED_STATE, PASSIVE_WAITING_STATE,
  PASSIVE_TIMEOUT_CALLED_STATE
}
 

States are used during reconnecting.

More...

Public Member Functions

 TcpConnection ()
 Passive side constructor (acceptor).
 TcpConnection (const ACE_INET_Addr &remote_address, Priority priority, const TcpInst &config)
 Active side constructor (connector).
virtual ~TcpConnection ()
std::size_t & id ()
int active_open ()
void disconnect ()
virtual int open (void *arg)
TcpSendStrategy_rch send_strategy ()
TcpReceiveStrategy_rch receive_strategy ()
virtual int handle_input (ACE_HANDLE)
 We pass this "event" along to the receive_strategy.
virtual int handle_output (ACE_HANDLE)
 Handle back pressure when sending.
virtual int close (u_long)
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
void set_sock_options (const TcpInst *tcp_config)
int reconnect (bool on_new_association=false)
bool is_connector () const
bool is_connected () const
 Return true if connection is connected.
void transfer (TcpConnection *connection)
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
void set_datalink (const TcpDataLink_rch &link)
void notify_lost_on_backpressure_timeout ()
ACE_INET_Addr get_remote_address ()
void relink_from_send (bool do_suspend)
 Reconnect initiated by send strategy.
void relink_from_recv (bool do_suspend)
 Reconnect initiated by receive strategy.
bool tear_link ()
void shutdown ()
Prioritytransport_priority ()
 Access TRANSPORT_PRIORITY.value policy value if set.
Priority transport_priority () const
virtual
ACE_Event_Handler::Reference_Count 
add_reference ()
virtual
ACE_Event_Handler::Reference_Count 
remove_reference ()

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType

Private Member Functions

int active_establishment (bool initiate_connect=true)
int active_reconnect_i ()
int passive_reconnect_i ()
int active_reconnect_on_new_association ()
int handle_setup_input (ACE_HANDLE h)
const std::string & config_name () const
void spawn_reconnect_thread ()
OPENDDS_STRING reconnect_state_string () const
 Get name of the current reconnect state as a string.

Static Private Member Functions

static ACE_THR_FUNC_RETURN reconnect_thread_fun (void *conn)

Private Attributes

LockType reconnect_lock_
ACE_Atomic_Op< ACE_SYNCH_MUTEX,
bool > 
connected_
bool is_connector_
 Flag indicate this connection object is the connector or acceptor.
ACE_INET_Addr remote_address_
 Remote address.
ACE_INET_Addr local_address_
 Local address.
const TcpInsttcp_config_
 The configuration used by this connection.
TcpDataLink_rch link_
 Datalink object which is needed for connection lost callback.
int passive_reconnect_timer_id_
ReconnectState reconnect_state_
 The state indicates each step of the reconnecting.
ACE_Time_Value last_reconnect_attempted_
 Last time the connection is re-established.
Priority transport_priority_
 TRANSPORT_PRIORITY.value policy value.
bool shutdown_
 shutdown flag
bool passive_setup_
ACE_Message_Block passive_setup_buffer_
TcpTransporttransport_during_setup_
std::size_t id_
 Small unique identifying value.
ACE_thread_t reconnect_thread_

Detailed Description

Definition at line 36 of file TcpConnection.h.


Member Typedef Documentation

Definition at line 159 of file TcpConnection.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpConnection::LockType [private]

Definition at line 158 of file TcpConnection.h.


Member Enumeration Documentation

States are used during reconnecting.

Enumerator:
INIT_STATE 
LOST_STATE 
RECONNECTED_STATE 
PASSIVE_WAITING_STATE 
PASSIVE_TIMEOUT_CALLED_STATE 

Definition at line 42 of file TcpConnection.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TcpConnection::TcpConnection (  ) 

Passive side constructor (acceptor).

Definition at line 44 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().

00045   : connected_(false)
00046   , is_connector_(false)
00047   , tcp_config_(0)
00048   , passive_reconnect_timer_id_(-1)
00049   , reconnect_state_(INIT_STATE)
00050   , last_reconnect_attempted_(ACE_Time_Value::zero)
00051   , transport_priority_(0)  // TRANSPORT_PRIORITY.value default value - 0.
00052   , shutdown_(false)
00053   , passive_setup_(false)
00054   , passive_setup_buffer_(sizeof(ACE_UINT32))
00055   , transport_during_setup_(0)
00056   , id_(0)
00057   , reconnect_thread_(0)
00058 {
00059   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00060 
00061   this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00062 }

Here is the call graph for this function:

OpenDDS::DCPS::TcpConnection::TcpConnection ( const ACE_INET_Addr remote_address,
Priority  priority,
const TcpInst config 
)

Active side constructor (connector).

Definition at line 64 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().

00067   : connected_(false)
00068   , is_connector_(true)
00069   , remote_address_(remote_address)
00070   , local_address_(config.local_address())
00071   , tcp_config_(&config)
00072   , passive_reconnect_timer_id_(-1)
00073   , reconnect_state_(INIT_STATE)
00074   , last_reconnect_attempted_(ACE_Time_Value::zero)
00075   , transport_priority_(priority)
00076   , shutdown_(false)
00077   , passive_setup_(false)
00078   , transport_during_setup_(0)
00079   , id_(0)
00080   , reconnect_thread_(0)
00081 {
00082   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00083   this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00084 
00085 }

Here is the call graph for this function:

OpenDDS::DCPS::TcpConnection::~TcpConnection (  )  [virtual]

Definition at line 86 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), ACE_Thread_Manager::join(), reconnect_thread_, ACE_OS::thr_equal(), and ACE_OS::thr_self().

00087 {
00088   DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00089   if (reconnect_thread_ &&
00090     // This is for Windows, where join doesn't check if the thread is the same
00091     // and the thread will hang itself if it tries to join itself.
00092     !ACE_OS::thr_equal(ACE_OS::thr_self(), reconnect_thread_)
00093   ) {
00094     ACE_Thread_Manager::instance()->join(reconnect_thread_);
00095   }
00096 }

Here is the call graph for this function:


Member Function Documentation

int OpenDDS::DCPS::TcpConnection::active_establishment ( bool  initiate_connect = true  )  [private]

Attempt an active connection establishment to the remote address. The local address is sent to the remote (passive) side to identify ourselves to the remote side. Note this method is not thread protected. The caller need acquire the reconnect_lock_ before calling this function.

Definition at line 438 of file TcpConnection.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DirectPriorityMapper::codepoint(), ACE_SOCK_Connector::connect(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::TcpInst::dump_to_str(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::TcpInst::get_public_address(), is_connector_, len, link_, LM_DEBUG, LM_ERROR, local_address_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), remote_address_, send_n(), set_sock_options(), shutdown_, tcp_config_, transport_priority_, and VDBG.

Referenced by active_open(), active_reconnect_i(), and active_reconnect_on_new_association().

00439 {
00440   DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00441 
00442   // Safety check - This should not happen since is_connector_ defaults to
00443   // true and the role in a connection connector is not changed when reconnecting.
00444   if (this->is_connector_ == false) {
00445     ACE_ERROR_RETURN((LM_ERROR,
00446                       "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00447                      -1);
00448   }
00449 
00450   if (this->shutdown_)
00451     return -1;
00452 
00453   // Now use a connector object to establish the connection.
00454   ACE_SOCK_Connector connector;
00455 
00456   if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00457 
00458     ACE_DEBUG((LM_DEBUG,
00459                       ACE_TEXT("(%P|%t) DBG: Failed to connect. this->shutdown_=%d %p\n%C"),
00460                       int(this->shutdown_), ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()));
00461                       return -1;
00462 
00463   } else {
00464     this->connected_ = true;
00465     const std::string remote_host = this->remote_address_.get_host_addr();
00466     VDBG((LM_DEBUG, "(%P|%t) DBG:   active_establishment(%C:%d->%C:%d)\n",
00467           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00468           remote_host.c_str(), this->remote_address_.get_port_number()));
00469   }
00470 
00471   // Set the DiffServ codepoint according to the priority value.
00472   DirectPriorityMapper mapper(this->transport_priority_);
00473   this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00474 
00475   set_sock_options(tcp_config_);
00476 
00477   // In order to complete the connection establishment from the active
00478   // side, we need to tell the remote side about our public address.
00479   // It will use that as an "identifier" of sorts.  To the other
00480   // (passive) side, our local_address that we send here will be known
00481   // as the remote_address.
00482   std::string address = tcp_config_->get_public_address();
00483   ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00484 
00485   ACE_UINT32 nlen = htonl(len);
00486 
00487   if (this->peer().send_n(&nlen,
00488                           sizeof(ACE_UINT32)) == -1) {
00489     // TBD later - Anything we are supposed to do to close the connection.
00490     ACE_ERROR_RETURN((LM_ERROR,
00491                       "(%P|%t) ERROR: Unable to send address string length to "
00492                       "the passive side to complete the active connection "
00493                       "establishment.\n"),
00494                      -1);
00495   }
00496 
00497   if (this->peer().send_n(address.c_str(), len)  == -1) {
00498     // TBD later - Anything we are supposed to do to close the connection.
00499     ACE_ERROR_RETURN((LM_ERROR,
00500                       "(%P|%t) ERROR: Unable to send our address to "
00501                       "the passive side to complete the active connection "
00502                       "establishment.\n"),
00503                      -1);
00504   }
00505 
00506   ACE_UINT32 npriority = htonl(this->transport_priority_);
00507 
00508   if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00509     // TBD later - Anything we are supposed to do to close the connection.
00510     ACE_ERROR_RETURN((LM_ERROR,
00511                       "(%P|%t) ERROR: Unable to send publication priority to "
00512                       "the passive side to complete the active connection "
00513                       "establishment.\n"),
00514                      -1);
00515   }
00516 
00517   return 0;
00518 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::TcpConnection::active_open (  ) 

Protocol setup (handshake) on the active side. The local address is sent to the remote (passive) side to identify ourselves to the remote side.

Definition at line 576 of file TcpConnection.cpp.

References active_establishment(), connected_, DBG_ENTRY_LVL, reconnect_lock_, shutdown_, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().

Referenced by open().

00577 {
00578   DBG_ENTRY_LVL("TcpConnection","active_open",6);
00579 
00580   GuardType guard(reconnect_lock_);
00581   if (this->shutdown_)
00582     return -1;
00583 
00584   if (connected_.value()) {
00585     return 0;
00586   }
00587 
00588   return active_establishment(false /* !initiate_connect */);
00589 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::TcpConnection::active_reconnect_i (  )  [private]

Definition at line 668 of file TcpConnection.cpp.

References ACE_TEXT(), active_establishment(), config_name(), OpenDDS::DCPS::TcpInst::conn_retry_attempts_, OpenDDS::DCPS::TcpInst::conn_retry_backoff_multiplier_, OpenDDS::DCPS::TcpInst::conn_retry_initial_delay_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), ACE_Task< ACE_NULL_SYNCH >::gettimeofday(), ACE_OS::gettimeofday(), INIT_STATE, last_reconnect_attempted_, link_, LM_DEBUG, LM_ERROR, local_address_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, ACE_Event_Handler::READ_MASK, receive_strategy(), reconnect_delay(), reconnect_lock_, reconnect_state_, reconnect_state_string(), OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy(), shutdown_, ACE_OS::sleep(), tcp_config_, and VDBG.

Referenced by reconnect().

00669 {
00670   DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00671 
00672   GuardType guard(this->reconnect_lock_);
00673   if (this->shutdown_)
00674     return -1;
00675 
00676   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00677         "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00678         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00679         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00680         this->reconnect_state_string().c_str()));
00681   if (DCPS_debug_level >= 1) {
00682     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::"
00683           "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00684           this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00685           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00686           this->reconnect_state_string().c_str()));
00687   }
00688   // We need reset the state to INIT_STATE if we are previously reconnected.
00689   // This would allow re-establishing connection after the re-established
00690   // connection lost again.
00691   if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00692       && this->reconnect_state_ == RECONNECTED_STATE) {
00693     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00694           "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00695     this->reconnect_state_ = INIT_STATE;
00696   }
00697 
00698   if (this->reconnect_state_ == INIT_STATE) {
00699     // Suspend send once.
00700     TcpSendStrategy_rch send_strategy = this->send_strategy();
00701     if (send_strategy)
00702       send_strategy->suspend_send();
00703     this->reconnect_lock_.release();
00704     this->disconnect();
00705     this->reconnect_lock_.acquire();
00706 
00707     if (this->shutdown_)
00708       return -1;
00709 
00710     if (this->tcp_config_->conn_retry_attempts_ > 0) {
00711       this->link_->notify(DataLink::DISCONNECTED);
00712     }
00713 
00714     // else the conn_retry_attempts is 0 then we do not need this extra
00715     // notify_disconnected() since the user application should get the
00716     // notify_lost() without delay.
00717 
00718     double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00719     int ret = -1;
00720 
00721     for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00722       ret = this->active_establishment();
00723 
00724       if (this->shutdown_)
00725         break;
00726 
00727       if (ret == -1) {
00728         ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00729                                 ((int)retry_delay_msec)%1000*1000);
00730         ACE_OS::sleep(delay_tv);
00731         retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00732 
00733       } else {
00734         break;
00735       }
00736     }
00737 
00738     if (ret == -1) {
00739       if (this->tcp_config_->conn_retry_attempts_ > 0) {
00740         ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00741                    this->config_name().c_str(),
00742                    this->remote_address_.get_host_addr(),
00743                    this->remote_address_.get_port_number()));
00744 
00745       } else {
00746         ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00747                    this->config_name().c_str(),
00748                    this->remote_address_.get_host_addr(),
00749                    this->remote_address_.get_port_number()));
00750       }
00751 
00752       this->reconnect_state_ = LOST_STATE;
00753       this->link_->notify(DataLink::LOST);
00754       if (send_strategy)
00755         send_strategy->terminate_send();
00756 
00757     } else {
00758       ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00759                  this->config_name().c_str(),
00760                  this->remote_address_.get_host_addr(),
00761                  this->remote_address_.get_port_number()));
00762       TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00763       if (receive_strategy->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00764         ACE_ERROR_RETURN((LM_ERROR,
00765                           "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00766                           "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00767                          -1);
00768       }
00769       this->reconnect_state_ = RECONNECTED_STATE;
00770       this->link_->notify(DataLink::RECONNECTED);
00771       send_strategy->resume_send();
00772     }
00773 
00774     this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00775   }
00776 
00777   return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00778 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association (  )  [private]

Definition at line 592 of file TcpConnection.cpp.

References active_establishment(), connected_, DBG_ENTRY_LVL, INIT_STATE, reconnect_lock_, reconnect_state_, send_strategy(), and shutdown_.

Referenced by reconnect().

00593 {
00594   DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00595   GuardType guard(this->reconnect_lock_);
00596   if (this->shutdown_)
00597     return -1;
00598 
00599   if (this->connected_ == true)
00600     return 0;
00601 
00602   else if (this->active_establishment() == 0) {
00603     this->reconnect_state_ = INIT_STATE;
00604     TcpSendStrategy_rch send_strategy = this->send_strategy();
00605     if (send_strategy)
00606       send_strategy->resume_send();
00607     return 0;
00608   }
00609 
00610   return -1;
00611 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::add_reference ( void   )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 999 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject::_add_ref().

01000 {
01001   RcObject::_add_ref();
01002   return 1;
01003 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::close ( u_long   )  [virtual]

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 331 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, disconnect(), and send_strategy().

00332 {
00333   DBG_ENTRY_LVL("TcpConnection","close",6);
00334 
00335   // TBD SOON - Find out exactly when close() is called.
00336   //            I have no clue when and who might call this.
00337   TcpSendStrategy_rch send_strategy = this->send_strategy();
00338   if (send_strategy)
00339     send_strategy->terminate_send();
00340 
00341   this->disconnect();
00342 
00343   return 0;
00344 }

Here is the call graph for this function:

const std::string & OpenDDS::DCPS::TcpConnection::config_name (  )  const [private]

Definition at line 347 of file TcpConnection.cpp.

References link_.

Referenced by active_reconnect_i(), handle_close(), handle_timeout(), and reconnect().

00348 {
00349   return this->link_->impl().config().name();
00350 }

Here is the caller graph for this function:

void OpenDDS::DCPS::TcpConnection::disconnect ( void   ) 

This will be called by the DataLink (that "owns" us) when the TcpTransport has been told to shutdown(), or when the DataLink finds itself no longer needed, and is "self-releasing".

Definition at line 111 of file TcpConnection.cpp.

References connected_, DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, link_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), ACE_Event_Handler::READ_MASK, and receive_strategy().

Referenced by active_reconnect_i(), close(), handle_close(), and notify_lost_on_backpressure_timeout().

00112 {
00113   DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114   this->connected_ = false;
00115   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00116   if (receive_strategy) {
00117     receive_strategy->get_reactor()->remove_handler(this,
00118                                                     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119   }
00120 
00121   if (this->link_) {
00122     this->link_->drop_pending_request_acks();
00123   }
00124 
00125   this->peer().close();
00126 
00127 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ACE_INET_Addr OpenDDS::DCPS::TcpConnection::get_remote_address (  ) 

Definition at line 46 of file TcpConnection.inl.

References remote_address_.

00047 {
00048   return this->remote_address_;
00049 }

int OpenDDS::DCPS::TcpConnection::handle_close ( ACE_HANDLE  ,
ACE_Reactor_Mask   
) [virtual]

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 353 of file TcpConnection.cpp.

References config_name(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), link_, LM_DEBUG, receive_strategy(), remote_address_, send_strategy(), and spawn_reconnect_thread().

00354 {
00355   DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00356 
00357   if (DCPS_debug_level >= 1) {
00358     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00359                this->config_name().c_str(),
00360                this->remote_address_.get_host_addr(),
00361                this->remote_address_.get_port_number()));
00362   }
00363 
00364   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00365   bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
00366 
00367   TcpSendStrategy_rch send_strategy = this->send_strategy();
00368   if (send_strategy) {
00369     if (graceful) {
00370       send_strategy->terminate_send();
00371     } else {
00372       send_strategy->suspend_send();
00373     }
00374   }
00375 
00376   this->disconnect();
00377 
00378   if (graceful) {
00379     this->link_->notify(DataLink::DISCONNECTED);
00380   } else {
00381     this->spawn_reconnect_thread();
00382   }
00383 
00384   return 0;
00385 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::handle_input ( ACE_HANDLE  fd  )  [virtual]

We pass this "event" along to the receive_strategy.

Reimplemented from ACE_Event_Handler.

Definition at line 288 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, handle_setup_input(), passive_setup_, and receive_strategy().

00289 {
00290   DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00291 
00292   if (passive_setup_) {
00293     return handle_setup_input(fd);
00294   }
00295   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00296   if (!receive_strategy) {
00297     return 0;
00298   }
00299 
00300   return receive_strategy->handle_dds_input(fd);
00301 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::handle_output ( ACE_HANDLE   )  [virtual]

Handle back pressure when sending.

Reimplemented from ACE_Event_Handler.

Definition at line 304 of file TcpConnection.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_, LM_DEBUG, send_strategy(), and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO.

00305 {
00306   DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00307   TcpSendStrategy_rch send_strategy = this->send_strategy();
00308   if (send_strategy) {
00309     if (DCPS_debug_level > 9) {
00310       ACE_DEBUG((LM_DEBUG,
00311                  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00312                  ACE_TEXT("sending queued data.\n"),
00313                  id_));
00314     }
00315 
00316     // Process data to be sent from the queue.
00317     if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00318         != send_strategy->perform_work()) {
00319 
00320       // Stop handling output ready events when there is nothing to output.
00321       // N.B. This calls back into the reactor.  Is the reactor lock
00322       //      recursive?
00323       send_strategy->schedule_output();
00324     }
00325   }
00326 
00327   return 0;
00328 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::handle_setup_input ( ACE_HANDLE  h  )  [private]

During the connection setup phase, the passive side sets passive_setup_, redirecting handle_input() events here (there is no recv strategy yet).

Definition at line 219 of file TcpConnection.cpp.

References ACE_Message_Block::base(), connected_, ACE_Event_Handler::DONT_CALL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), ACE_Message_Block::length(), LM_DEBUG, local_address_, ACE_OS::memcpy(), OpenDDS::DCPS::TcpTransport::passive_connection(), passive_setup_, passive_setup_buffer_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), OpenDDS::DCPS::rchandle_from(), ACE_Message_Block::rd_ptr(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, reconnect_state_string(), remote_address_, ACE_Message_Block::reset(), ACE_Message_Block::size(), ACE_Message_Block::space(), transport_during_setup_, transport_priority_, VDBG, VDBG_LVL, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

Referenced by handle_input().

00220 {
00221   const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00222                                   passive_setup_buffer_.space(),
00223                                   &ACE_Time_Value::zero);
00224 
00225   if (ret < 0 && errno == ETIME) {
00226     return 0;
00227   }
00228 
00229   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input %@ "
00230             "recv returned %b %m.\n", this, ret), 4);
00231 
00232   if (ret <= 0) {
00233     return -1;
00234   }
00235 
00236   passive_setup_buffer_.wr_ptr(ret);
00237   // Parse the setup message: <len><addr><prio>
00238   // len and prio are network order 32-bit ints
00239   // addr is a string of length len, including null
00240   ACE_UINT32 nlen = 0;
00241 
00242   if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00243 
00244     ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00245     passive_setup_buffer_.rd_ptr(sizeof(nlen));
00246     ACE_UINT32 hlen = ntohl(nlen);
00247     passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00248 
00249     ACE_UINT32 nprio = 0;
00250 
00251     if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00252 
00253       const std::string bufstr(passive_setup_buffer_.rd_ptr());
00254       const NetworkAddress network_order_address(bufstr);
00255       network_order_address.to_addr(remote_address_);
00256 
00257       ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00258       transport_priority_ = ntohl(nprio);
00259 
00260       passive_setup_buffer_.reset();
00261       passive_setup_ = false;
00262 
00263       VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00264             "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %C\n", this,
00265             remote_address_.get_host_addr(), remote_address_.get_port_number(),
00266             local_address_.get_host_addr(), local_address_.get_port_number(),
00267             transport_priority_, reconnect_state_string().c_str()));
00268 
00269       // remove from reactor, normal recv strategy setup will add us back
00270       if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00271         VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00272               "remove_handler failed %m.\n"));
00273       }
00274 
00275       transport_during_setup_->passive_connection(remote_address_, rchandle_from(this));
00276       connected_ = true;
00277 
00278       return 0;
00279     }
00280   }
00281 
00282   passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00283 
00284   return 0;
00285 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::TcpConnection::handle_timeout ( const ACE_Time_Value tv,
const void *  arg 
) [virtual]

A timer is scheduled on acceptor side to check if a new connection is accepted after the connection is lost.

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 783 of file TcpConnection.cpp.

References ACE_TEXT(), config_name(), DBG_ENTRY_LVL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_lock_, reconnect_state_, RECONNECTED_STATE, remote_address_, send_strategy(), and tear_link().

00785 {
00786   DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00787 
00788   GuardType guard(this->reconnect_lock_);
00789 
00790   switch (this->reconnect_state_) {
00791   case PASSIVE_WAITING_STATE: {
00792     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00793                this->config_name().c_str(),
00794                this->remote_address_.get_host_addr(),
00795                this->remote_address_.get_port_number()));
00796 
00797     this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00798     // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
00799     // Now we need declare the connection is lost.
00800     this->link_->notify(DataLink::LOST);
00801 
00802     // The handle_timeout may be called after the connection is re-established
00803     // and the send strategy of this old connection is reset to nil.
00804     TcpSendStrategy_rch send_strategy = this->send_strategy();
00805     if (send_strategy)
00806       send_strategy->terminate_send();
00807 
00808     this->reconnect_state_ = LOST_STATE;
00809 
00810     this->tear_link();
00811 
00812   }
00813   break;
00814 
00815   case RECONNECTED_STATE:
00816     // reconnected successfully.
00817     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00818                this->config_name().c_str(),
00819                this->remote_address_.get_host_addr(),
00820                this->remote_address_.get_port_number()));
00821     break;
00822 
00823   default :
00824     ACE_ERROR((LM_ERROR,
00825                ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00826                ACE_TEXT(" unknown state or it should not be in state=%d \n"),
00827                reconnect_state_));
00828     break;
00829   }
00830 
00831   return 0;
00832 }

Here is the call graph for this function:

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE std::size_t & OpenDDS::DCPS::TcpConnection::id ( void   ) 

Definition at line 20 of file TcpConnection.inl.

References id_.

00021 {
00022   return id_;
00023 }

ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connected ( void   )  const

Return true if connection is connected.

Definition at line 33 of file TcpConnection.inl.

References connected_, and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().

00034 {
00035   return this->connected_.value();
00036 }

Here is the call graph for this function:

ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connector (  )  const

Return true if the object represents the connector side, otherwise it's the acceptor side. The acceptor/connector role is not changed when re-establishing the connection.

Definition at line 27 of file TcpConnection.inl.

References is_connector_.

00028 {
00029   return this->is_connector_;
00030 }

void OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout (  ) 

This function is called when the backpressure occurs and timed out after "max_output_pause_period". The lost connection notification should be sent and the connection needs be closed since we declared it as a "lost" connection.

Definition at line 927 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, disconnect(), INIT_STATE, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_lock_, reconnect_state_, and send_strategy().

00928 {
00929   DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00930   bool notify_lost = false;
00931   {
00932     GuardType guard(this->reconnect_lock_);
00933 
00934     if (this->reconnect_state_ == INIT_STATE) {
00935       this->reconnect_state_ = LOST_STATE;
00936       notify_lost = true;
00937 
00938     }
00939   }
00940 
00941   if (notify_lost) {
00942     this->disconnect();
00943     this->link_->notify(DataLink::LOST);
00944 
00945     TcpSendStrategy_rch send_strategy = this->send_strategy();
00946     if (send_strategy)
00947       send_strategy->terminate_send();
00948   }
00949 
00950 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::open ( void *  arg  )  [virtual]

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 130 of file TcpConnection.cpp.

References ACE_TEXT(), active_open(), OpenDDS::DCPS::TcpTransport::async_connect_failed(), OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpAcceptor::get_configuration(), ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::get_handle(), is_connector_, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TcpInst::local_address(), local_address_, passive_setup_, passive_setup_buffer_, OpenDDS::DCPS::rchandle_from(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, remote_address_, set_sock_options(), ACE_Message_Block::size(), tcp_config_, OpenDDS::DCPS::TcpAcceptor::transport(), transport_during_setup_, transport_priority_, and VDBG_LVL.

00131 {
00132   DBG_ENTRY_LVL("TcpConnection","open",6);
00133 
00134   if (is_connector_) {
00135 
00136     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open active.\n"), 2);
00137     // Take over the refcount from TcpTransport::connect_datalink().
00138 
00139     TcpTransport& transport = static_cast<TcpTransport&>(link_->impl());
00140 
00141     const bool is_loop(local_address_ == remote_address_);
00142     const PriorityKey key(transport_priority_, remote_address_,
00143                           is_loop, false /* !active */);
00144 
00145     int active_open_ = active_open();
00146 
00147     int connect_tcp_datalink_ = transport.connect_tcp_datalink(*link_, rchandle_from(this));
00148 
00149     if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00150       // if (active_open() == -1 ||
00151       //       transport->connect_tcp_datalink(link_, self) == -1) {
00152 
00153       transport.async_connect_failed(key);
00154 
00155       return -1;
00156     }
00157 
00158     return 0;
00159   }
00160 
00161   // The passed-in arg is really the acceptor object that created this
00162   // TcpConnection object, and is also the caller of this open()
00163   // method.  We need to cast the arg to the TcpAcceptor* type.
00164   TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00165 
00166   if (acceptor == 0) {
00167     // The cast failed.
00168     ACE_ERROR_RETURN((LM_ERROR,
00169                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00170                       ACE_TEXT("failed to cast void* arg to ")
00171                       ACE_TEXT("TcpAcceptor* type.\n")),
00172                      -1);
00173   }
00174 
00175   TcpConnection_rch self(this, keep_count());
00176 
00177   // Now we need to ask the TcpAcceptor object to provide us with
00178   // a pointer to the TcpTransport object that "owns" the acceptor.
00179   TcpTransport* transport = acceptor->transport();
00180 
00181   if (!transport) {
00182     // The acceptor gave us a nil transport (smart) pointer.
00183     ACE_ERROR_RETURN((LM_ERROR,
00184                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00185                       ACE_TEXT("acceptor's transport is nil.\n")),
00186                      -1);
00187   }
00188 
00189   // Keep a "copy" of the reference to TcpInst object
00190   // for ourselves.
00191   tcp_config_ =  &acceptor->get_configuration();
00192   local_address_ = tcp_config_->local_address();
00193 
00194   set_sock_options(tcp_config_);
00195 
00196   // We expect that the active side of the connection (the remote side
00197   // in this case) will supply its listening ACE_INET_Addr as the first
00198   // message it sends to the socket.  This is a one-way connection
00199   // establishment protocol message.
00200   passive_setup_ = true;
00201   transport_during_setup_ = transport;
00202   passive_setup_buffer_.size(sizeof(ACE_UINT32));
00203 
00204   if (reactor()->register_handler(this, READ_MASK) == -1) {
00205     ACE_ERROR_RETURN((LM_ERROR,
00206                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00207                       ACE_TEXT("unable to register with the reactor.%p\n"),
00208                       ACE_TEXT("register_handler")),
00209                      -1);
00210   }
00211 
00212   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open passive handle=%d.\n",
00213             static_cast<int>(intptr_t(get_handle()))), 2);
00214 
00215   return 0;
00216 }

Here is the call graph for this function:

int OpenDDS::DCPS::TcpConnection::passive_reconnect_i (  )  [private]

Definition at line 617 of file TcpConnection.cpp.

References ACE_TEXT(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, link_, LM_ERROR, OpenDDS::DCPS::TcpInst::passive_reconnect_duration_, passive_reconnect_timer_id_, PASSIVE_WAITING_STATE, receive_strategy(), reconnect_lock_, reconnect_state_, shutdown_, and tcp_config_.

Referenced by reconnect().

00618 {
00619   DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00620   GuardType guard(this->reconnect_lock_);
00621   if (this->shutdown_)
00622     return -1;
00623 
00624   // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once.
00625   if (this->reconnect_state_ == INIT_STATE) {
00626     // Mark the connection lost since the recv/send just failed.
00627     this->connected_ = false;
00628 
00629     if (this->tcp_config_->passive_reconnect_duration_ == 0)
00630       return -1;
00631 
00632     ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00633                            this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00634     this->reconnect_state_ = PASSIVE_WAITING_STATE;
00635     this->link_->notify(DataLink::DISCONNECTED);
00636 
00637     // It is possible that the passive reconnect is called after the new connection
00638     // is accepted and the receive_strategy of this old connection is reset to nil.
00639     TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00640     if (this->receive_strategy()) {
00641 
00642       // Give a copy to reactor.
00643       this->passive_reconnect_timer_id_ = receive_strategy->get_reactor()->schedule_timer(this, 0, timeout);
00644 
00645       if (this->passive_reconnect_timer_id_ == -1) {
00646         ACE_ERROR_RETURN((LM_ERROR,
00647                           ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00648                           ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00649                          -1);
00650       }
00651     }
00652   }
00653 
00654   return 0;
00655 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::TcpReceiveStrategy_rch OpenDDS::DCPS::TcpConnection::receive_strategy (  ) 

Definition at line 105 of file TcpConnection.cpp.

References link_.

Referenced by active_reconnect_i(), disconnect(), handle_close(), handle_input(), passive_reconnect_i(), and transfer().

00106 {
00107   return this->link_->receive_strategy();
00108 }

Here is the caller graph for this function:

int OpenDDS::DCPS::TcpConnection::reconnect ( bool  on_new_association = false  ) 

This function is called to re-establish the connection. If this object is the connector side of the connection then it tries to reconnect to the remote, if it's the acceptor side of the connection then it schedules a timer to check if it passively accepted a connection from remote. The on_new_association true indicates this is called when the connection is previous lost and new association is added. The connector side needs to try to actively reconnect to remote.

Definition at line 528 of file TcpConnection.cpp.

References active_reconnect_i(), active_reconnect_on_new_association(), config_name(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), is_connector_, link_, LM_DEBUG, passive_reconnect_i(), and remote_address_.

Referenced by OpenDDS::DCPS::TcpReconnectTask::execute().

00529 {
00530   DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00531   if (DCPS_debug_level >= 1) {
00532     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00533                this->config_name().c_str(),
00534                this->remote_address_.get_host_addr(),
00535                this->remote_address_.get_port_number()));
00536   }
00537 
00538   if (on_new_association) {
00539     if (DCPS_debug_level >= 1) {
00540       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00541     }
00542     return this->active_reconnect_on_new_association();
00543   }
00544 
00545   // If on_new_association is false, it's called by the reconnect task.
00546   // We need make sure if the link release is pending. If does, do
00547   // not try to reconnect.
00548   else if (!this->link_->is_release_pending()) {
00549     if (DCPS_debug_level >= 1) {
00550       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00551     }
00552     // Try to reconnect if it's connector previously.
00553     if (this->is_connector_ && this->active_reconnect_i() == -1) {
00554       if (DCPS_debug_level >= 1) {
00555         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00556       }
00557       return -1;
00558     }
00559 
00560     // Schedule a timer to see if a incoming connection is accepted when timeout.
00561     else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00562       if (DCPS_debug_level >= 1) {
00563         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00564       }
00565       return -1;
00566     }
00567 
00568   }
00569   if (DCPS_debug_level >= 1) {
00570     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00571   }
00572   return 0;
00573 }

Here is the call graph for this function:

Here is the caller graph for this function:

OPENDDS_STRING OpenDDS::DCPS::TcpConnection::reconnect_state_string (  )  const [private]

Get name of the current reconnect state as a string.

Definition at line 1059 of file TcpConnection.cpp.

References ACE_TEXT(), INIT_STATE, LM_ERROR, LOST_STATE, OPENDDS_STRING, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_state_, RECONNECTED_STATE, and OpenDDS::DCPS::to_dds_string().

Referenced by active_reconnect_i(), and handle_setup_input().

01060 {
01061   switch (reconnect_state_) {
01062   case INIT_STATE:
01063     return "INIT_STATE";
01064   case LOST_STATE:
01065     return "LOST_STATE";
01066   case RECONNECTED_STATE:
01067     return "RECONENCTED_STATE";
01068   case PASSIVE_WAITING_STATE:
01069     return "PASSIVE_WAITING_STATE";
01070   case PASSIVE_TIMEOUT_CALLED_STATE:
01071     return "PASSIVE_TIMEOUT_CALLED_STATE";
01072   default:
01073     ACE_ERROR((LM_ERROR, ACE_TEXT(
01074       "OpenDDS::DCPS::TcpConnection::reconnect_state_string(): "
01075       "%d is either completely invalid or at least not defined in this function.\n"),
01076       reconnect_state_
01077     ));
01078     return OPENDDS_STRING("(Unknown Reconnect State: ")
01079       + to_dds_string(reconnect_state_) + ")";
01080   }
01081 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_THR_FUNC_RETURN OpenDDS::DCPS::TcpConnection::reconnect_thread_fun ( void *  conn  )  [static, private]

Definition at line 1035 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, SIG_SETMASK, ACE_OS::sigfillset(), sigset_t, and ACE_OS::thr_sigsetmask().

Referenced by spawn_reconnect_thread().

01036 {
01037   DBG_ENTRY_LVL("TcpConnection","reconnect_thread_fun",6);
01038 
01039   // Ignore all signals to avoid
01040   //     ERROR: <something descriptive> Interrupted system call
01041   // The main thread will handle signals.
01042   sigset_t set;
01043   ACE_OS::sigfillset(&set);
01044   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
01045 
01046   // Make sure the associated transport_config outlives the connection object.
01047   RcHandle<TransportInst> transport_config;
01048   TcpConnection_rch connection(static_cast<TcpConnection*>(arg), keep_count());
01049   transport_config = RcHandle<TransportInst>(&connection->link_->impl().config(), keep_count());
01050 
01051   if (connection->reconnect() == -1) {
01052     connection->tear_link();
01053   }
01054 
01055   return 0;
01056 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TcpConnection::relink_from_recv ( bool  do_suspend  ) 

Reconnect initiated by receive strategy.

This is called by TcpReceiveStrategy when a disconnect is detected. It simply suspends any sends and lets the handle_close() handle the reconnect logic.

Definition at line 972 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and send_strategy().

00973 {
00974   DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00975   TcpSendStrategy_rch send_strategy = this->send_strategy();
00976   if (do_suspend && send_strategy)
00977     send_strategy->suspend_send();
00978 }

Here is the call graph for this function:

void OpenDDS::DCPS::TcpConnection::relink_from_send ( bool  do_suspend  ) 

Reconnect initiated by send strategy.

This is called by TcpSendStrategy when a send fails and a reconnect should be initiated. This method suspends any sends and kicks the reconnect thread into action.

Definition at line 957 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, send_strategy(), and spawn_reconnect_thread().

00958 {
00959   DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00960 
00961   TcpSendStrategy_rch send_strategy = this->send_strategy();
00962   if (do_suspend && send_strategy)
00963     send_strategy->suspend_send();
00964 
00965   this->spawn_reconnect_thread();
00966 }

Here is the call graph for this function:

ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::remove_reference ( void   )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 1006 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject::_remove_ref().

01007 {
01008   RcObject::_remove_ref();
01009   return 1;
01010 }

Here is the call graph for this function:

OpenDDS::DCPS::TcpSendStrategy_rch OpenDDS::DCPS::TcpConnection::send_strategy (  ) 

Definition at line 99 of file TcpConnection.cpp.

References link_.

Referenced by active_reconnect_i(), active_reconnect_on_new_association(), close(), handle_close(), handle_output(), handle_timeout(), notify_lost_on_backpressure_timeout(), relink_from_recv(), and relink_from_send().

00100 {
00101   return this->link_->send_strategy();
00102 }

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::TcpConnection::set_datalink ( const TcpDataLink_rch link  ) 

Cache the reference to the datalink object for lost connection callbacks.

Definition at line 39 of file TcpConnection.inl.

References link_.

00040 {
00041   // Keep a "copy" of the reference to the data link for ourselves.
00042   this->link_ = link;
00043 }

void OpenDDS::DCPS::TcpConnection::set_sock_options ( const TcpInst tcp_config  ) 

Definition at line 388 of file TcpConnection.cpp.

References OpenDDS::DCPS::TcpInst::enable_nagle_algorithm_, LM_ERROR, and ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer().

Referenced by active_establishment(), and open().

00389 {
00390 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00391   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00392   int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00393   //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
00394 #  if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00395 
00396   // A little screwy double negative logic: disabling nagle involves
00397   // enabling TCP_NODELAY
00398   int opt = (tcp_config->enable_nagle_algorithm_ == false);
00399 
00400   if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00401     ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00402   }
00403 
00404   if (this->peer().set_option(SOL_SOCKET,
00405                               SO_SNDBUF,
00406                               (void *) &snd_size,
00407                               sizeof(snd_size)) == -1
00408       && errno != ENOTSUP) {
00409     ACE_ERROR((LM_ERROR,
00410                "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00411                snd_size));
00412     return;
00413   }
00414 
00415   if (this->peer().set_option(SOL_SOCKET,
00416                               SO_RCVBUF,
00417                               (void *) &rcv_size,
00418                               sizeof(int)) == -1
00419       && errno != ENOTSUP) {
00420     ACE_ERROR((LM_ERROR,
00421                "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00422                rcv_size));
00423     return;
00424   }
00425 
00426 #  else
00427   ACE_UNUSED_ARG(tcp_config);
00428   ACE_UNUSED_ARG(snd_size);
00429   ACE_UNUSED_ARG(rcv_size);
00430 #  endif /* !ACE_LACKS_SOCKET_BUFSIZ */
00431 
00432 #else
00433   ACE_UNUSED_ARG(tcp_config);
00434 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00435 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TcpConnection::shutdown ( void   ) 

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 989 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, reconnect_lock_, and shutdown_.

00990 {
00991   DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00992   GuardType guard(this->reconnect_lock_);
00993   this->shutdown_ = true;
00994   ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::shutdown();
00995 
00996 }

void OpenDDS::DCPS::TcpConnection::spawn_reconnect_thread (  )  [private]

Definition at line 1013 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject::_add_ref(), OpenDDS::DCPS::RcObject::_remove_ref(), DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), link_, reconnect_lock_, reconnect_thread_, reconnect_thread_fun(), and shutdown_.

Referenced by handle_close(), and relink_from_send().

01014 {
01015   DBG_ENTRY_LVL("TcpConnection","spawn_reconnect_thread",6);
01016   GuardType guard(this->reconnect_lock_);
01017   if (!shutdown_) {
01018     // Make sure the associated transport_config outlives the connection object.
01019     TransportInst& transport_config = this->link_->impl().config();
01020     transport_config._add_ref();
01021     // add the reference count to be picked up from the new thread
01022     this->_add_ref();
01023     if (ACE_Thread_Manager::instance()->spawn(&reconnect_thread_fun,
01024                                               this,
01025                                               THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED,
01026                                               &reconnect_thread_) == -1){
01027       // we need to decrement the reference count when thread creation fails.
01028       this->_remove_ref();
01029       transport_config._remove_ref();
01030     }
01031   }
01032 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::TcpConnection::tear_link (  ) 

Called by the reconnect task to inform us that the link & any associated data can be torn down. This call is done with no DCPS/transport locks held.

Definition at line 981 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and link_.

Referenced by OpenDDS::DCPS::TcpReconnectTask::execute(), and handle_timeout().

00982 {
00983   DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00984 
00985   return this->link_->release_resources();
00986 }

Here is the caller graph for this function:

void OpenDDS::DCPS::TcpConnection::transfer ( TcpConnection connection  ) 

This object would be "old" connection object and the provided is the new connection object. The "old" connection object will copy its states to to the "new" connection object. This is called by the TcpDataLink when a new connection is accepted (with a new TcpConnection object). We need make the state in "new" connection object consistent with the "old" connection object.

Definition at line 841 of file TcpConnection.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), INIT_STATE, ACE_Thread_Manager::instance(), is_connector_, ACE_Thread_Manager::join(), link_, LM_DEBUG, LM_ERROR, local_address_, LOST_STATE, passive_reconnect_timer_id_, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, receive_strategy(), reconnect_lock_, reconnect_state_, reconnect_thread_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, tcp_config_, and VDBG.

00842 {
00843   DBG_ENTRY_LVL("TcpConnection","transfer",6);
00844 
00845   GuardType guard(this->reconnect_lock_);
00846 
00847   bool notify_reconnect = false;
00848 
00849   switch (this->reconnect_state_) {
00850   case INIT_STATE:
00851     // We have not detected the lost connection and the peer is faster than us and
00852     // re-established the connection. so do not notify reconnected.
00853     break;
00854 
00855   case LOST_STATE:
00856 
00857     // The reconnect timed out.
00858   case PASSIVE_TIMEOUT_CALLED_STATE:
00859     // TODO: If the handle_timeout is called before the old connection
00860     // transfer its state to new connection then should we disconnect
00861     // the new connection or keep it alive ?
00862     // I think we should keep the connection, the user will get a
00863     // lost connection notification and then a reconnected notification.
00864     notify_reconnect = true;
00865     break;
00866 
00867   case PASSIVE_WAITING_STATE: {
00868     // Cancel the timer since we got new connection.
00869     TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00870     if (receive_strategy->get_reactor()->cancel_timer(this) == -1) {
00871       ACE_ERROR((LM_ERROR,
00872                  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00873                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00874 
00875     } else
00876       passive_reconnect_timer_id_ = -1;
00877 
00878     notify_reconnect = true;
00879   }
00880   break;
00881 
00882   default :
00883     ACE_ERROR((LM_ERROR,
00884                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00885                ACE_TEXT(" unknown state or it should not be in state=%i \n"),
00886                reconnect_state_));
00887     break;
00888   }
00889 
00890   // Verify if this acceptor side.
00891   if (this->is_connector_ || connection->is_connector_) {
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00894                ACE_TEXT(" should NOT be called by the connector side \n")));
00895   }
00896 
00897   if (reconnect_thread_) {
00898       ACE_Thread_Manager::instance()->join(reconnect_thread_);
00899       reconnect_thread_ = 0;
00900   }
00901   // connection->receive_strategy_ = this->receive_strategy_;
00902   // connection->send_strategy_ = this->send_strategy_;
00903   connection->remote_address_ = this->remote_address_;
00904   connection->local_address_ = this->local_address_;
00905   connection->tcp_config_ = this->tcp_config_;
00906   connection->link_ = this->link_;
00907 
00908   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00909         "transfer(%C:%d->%C:%d) passive reconnected. new con %@   "
00910         " old con %@ \n",
00911         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00912         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00913         connection, this));
00914 
00915   if (notify_reconnect) {
00916     this->reconnect_state_ = RECONNECTED_STATE;
00917     this->link_->notify(DataLink::RECONNECTED);
00918   }
00919 
00920 }

Here is the call graph for this function:

ACE_INLINE OpenDDS::DCPS::Priority OpenDDS::DCPS::TcpConnection::transport_priority (  )  const

Definition at line 60 of file TcpConnection.inl.

References transport_priority_.

00061 {
00062   return this->transport_priority_;
00063 }

ACE_INLINE OpenDDS::DCPS::Priority & OpenDDS::DCPS::TcpConnection::transport_priority (  ) 

Access TRANSPORT_PRIORITY.value policy value if set.

Definition at line 53 of file TcpConnection.inl.

References transport_priority_.

00054 {
00055   return this->transport_priority_;
00056 }


Member Data Documentation

ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::DCPS::TcpConnection::connected_ [private]

Flag indicates if connected or disconnected. It's set to true when actively connecting or passively accepting succeeds and set to false whenever the peer stream is closed.

Definition at line 168 of file TcpConnection.h.

Referenced by active_establishment(), active_open(), active_reconnect_on_new_association(), disconnect(), handle_setup_input(), is_connected(), and passive_reconnect_i().

std::size_t OpenDDS::DCPS::TcpConnection::id_ [private]

Small unique identifying value.

Definition at line 208 of file TcpConnection.h.

Referenced by handle_output(), and id().

Flag indicate this connection object is the connector or acceptor.

Definition at line 171 of file TcpConnection.h.

Referenced by active_establishment(), is_connector(), open(), reconnect(), and transfer().

Last time the connection is re-established.

Definition at line 195 of file TcpConnection.h.

Referenced by active_reconnect_i().

Local address.

Definition at line 177 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), handle_setup_input(), open(), and transfer().

The id of the scheduled timer. The timer is scheduled to check if the connection is re-established during the passive_reconnect_duration_. This id controls that the timer is just scheduled once when there are multiple threads detect the lost connection.

Definition at line 189 of file TcpConnection.h.

Referenced by passive_reconnect_i(), and transfer().

Definition at line 203 of file TcpConnection.h.

Referenced by handle_input(), handle_setup_input(), and open().

Definition at line 204 of file TcpConnection.h.

Referenced by handle_setup_input(), and open().

Definition at line 209 of file TcpConnection.h.

Referenced by spawn_reconnect_thread(), transfer(), and ~TcpConnection().

The configuration used by this connection.

Definition at line 180 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), open(), passive_reconnect_i(), and transfer().

Definition at line 205 of file TcpConnection.h.

Referenced by handle_setup_input(), and open().

TRANSPORT_PRIORITY.value policy value.

Definition at line 198 of file TcpConnection.h.

Referenced by active_establishment(), handle_setup_input(), open(), and transport_priority().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1