OpenDDS::DCPS::TcpConnection Class Reference

#include <TcpConnection.h>

Public Types

enum  ReconnectState {
  INIT_STATE, LOST_STATE, RECONNECTED_STATE, PASSIVE_WAITING_STATE,
  PASSIVE_TIMEOUT_CALLED_STATE
}
 States used during reconnection.

Public Member Functions

 TcpConnection ()
 Passive side constructor (acceptor).
 TcpConnection (const ACE_INET_Addr &remote_address, Priority priority, const TcpInst_rch &config)
 Active side constructor (connector).
virtual ~TcpConnection ()
std::size_t & id ()
int active_open ()
void disconnect ()
virtual int open (void *arg)
void set_receive_strategy (TcpReceiveStrategy *receive_strategy)
void remove_receive_strategy ()
void set_send_strategy (TcpSendStrategy *send_strategy)
void remove_send_strategy ()
virtual int handle_input (ACE_HANDLE)
 We pass this "event" along to the receive_strategy.
virtual int handle_output (ACE_HANDLE)
 Handle back pressure when sending.
virtual int close (u_long)
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
void set_sock_options (TcpInst *tcp_config)
int reconnect (bool on_new_association=false)
bool is_connector () const
bool is_connected () const
 Return true if connection is connected.
void transfer (TcpConnection *connection)
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
void set_datalink (TcpDataLink *link)
void notify_lost_on_backpressure_timeout ()
ACE_INET_Addr get_remote_address ()
void relink_from_send (bool do_suspend)
 Reconnect initiated by send strategy.
void relink_from_recv (bool do_suspend)
 Reconnect initiated by receive strategy.
bool tear_link ()
void shutdown ()
Priority & transport_priority ()
 Access TRANSPORT_PRIORITY.value policy value if set.
Priority transport_priority () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType

Private Member Functions

int active_establishment (bool initiate_connect=true)
int active_reconnect_i ()
int passive_reconnect_i ()
int active_reconnect_on_new_association ()
int handle_setup_input (ACE_HANDLE h)

Private Attributes

LockType reconnect_lock_
ACE_Atomic_Op< ACE_SYNCH_MUTEX, bool > connected_
bool is_connector_
 Flag indicating whether this connection object is the connector or the acceptor.
TcpReceiveStrategy_rch receive_strategy_
 Reference to the receiving strategy.
TcpSendStrategy_rch send_strategy_
 Reference to the send strategy.
ACE_INET_Addr remote_address_
 Remote address.
ACE_INET_Addr local_address_
 Local address.
TcpInst_rch tcp_config_
 The configuration used by this connection.
TcpDataLink_rch link_
 Datalink object which is needed for connection lost callback.
int passive_reconnect_timer_id_
TcpReconnectTask reconnect_task_
ReconnectState reconnect_state_
 The current step of the reconnect process.
ACE_Time_Value last_reconnect_attempted_
 Time of the last reconnect attempt.
Priority transport_priority_
 TRANSPORT_PRIORITY.value policy value.
bool shutdown_
 shutdown flag
bool passive_setup_
ACE_Message_Block passive_setup_buffer_
TcpTransport_rch transport_during_setup_
std::size_t id_
 Small unique identifying value.

Detailed Description

Definition at line 33 of file TcpConnection.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpConnection::GuardType [private]

Definition at line 154 of file TcpConnection.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpConnection::LockType [private]

Definition at line 153 of file TcpConnection.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::TcpConnection::ReconnectState

States used during reconnection.

Enumerator:
INIT_STATE 
LOST_STATE 
RECONNECTED_STATE 
PASSIVE_WAITING_STATE 
PASSIVE_TIMEOUT_CALLED_STATE 

Definition at line 39 of file TcpConnection.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TcpConnection::TcpConnection (  ) 

Passive side constructor (acceptor).

Definition at line 41 of file TcpConnection.cpp.

References DBG_ENTRY_LVL.

00042   : connected_(false)
00043   , is_connector_(false)
00044   , passive_reconnect_timer_id_(-1)
00045   , reconnect_task_(this)
00046   , reconnect_state_(INIT_STATE)
00047   , last_reconnect_attempted_(ACE_Time_Value::zero)
00048   , transport_priority_(0)  // TRANSPORT_PRIORITY.value default value - 0.
00049   , shutdown_(false)
00050   , passive_setup_(false)
00051   , passive_setup_buffer_(sizeof(ACE_UINT32))
00052   , id_(0)
00053 {
00054   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00055 
00056   if (this->reconnect_task_.open()) {
00057     ACE_ERROR((LM_ERROR,
00058                ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00059                ACE_TEXT("open")));
00060   }
00061 
00062 }

OpenDDS::DCPS::TcpConnection::TcpConnection ( const ACE_INET_Addr &  remote_address,
Priority  priority,
const TcpInst_rch &  config 
)

Active side constructor (connector).

Definition at line 64 of file TcpConnection.cpp.

References DBG_ENTRY_LVL.

00067   : connected_(false)
00068   , is_connector_(true)
00069   , remote_address_(remote_address)
00070   , local_address_(config->local_address())
00071   , tcp_config_(config)
00072   , passive_reconnect_timer_id_(-1)
00073   , reconnect_task_(this)
00074   , reconnect_state_(INIT_STATE)
00075   , last_reconnect_attempted_(ACE_Time_Value::zero)
00076   , transport_priority_(priority)
00077   , shutdown_(false)
00078   , passive_setup_(false)
00079 {
00080   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00081 
00082   // Open the reconnect task
00083   if (this->reconnect_task_.open()) {
00084     ACE_ERROR((LM_ERROR,
00085                ACE_TEXT("(%P|%t) ERROR: Reconnect task failed to open : %p\n"),
00086                ACE_TEXT("open")));
00087   }
00088 
00089 }

OpenDDS::DCPS::TcpConnection::~TcpConnection (  )  [virtual]

Definition at line 90 of file TcpConnection.cpp.

References OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, link_, reconnect_task_, and OpenDDS::DCPS::Transport_debug_level.

00091 {
00092   DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00093 
00094   // The Reconnect task belongs to the Connection object.
00095   // Cleanup before leaving the house.
00096   this->reconnect_task_.close(1);
00097   //this->reconnect_task_.wait ();
00098 
00099   if (!this->link_.is_nil()) {
00100     if (Transport_debug_level > 5) {
00101       ACE_DEBUG((LM_DEBUG,
00102                  ACE_TEXT("(%P|%t) TcpConnection::~TcpConnection: about to notify link[%@] connection deleted\n"),
00103                  this->link_.in()));
00104     }
00105     this->link_->notify_connection_deleted();
00106   }
00107 
00108 }


Member Function Documentation

int OpenDDS::DCPS::TcpConnection::active_establishment ( bool  initiate_connect = true  )  [private]

Attempts an active connection establishment to the remote address. The local address is sent to the remote (passive) side to identify this endpoint. Note that this method is not thread-safe: the caller must acquire reconnect_lock_ before calling it.

Definition at line 455 of file TcpConnection.cpp.

References OpenDDS::DCPS::DirectPriorityMapper::codepoint(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), link_, local_address_, remote_address_, set_sock_options(), tcp_config_, and VDBG.

Referenced by active_open().

00456 {
00457   DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00458 
00459   // Safety check - This should not happen since is_connector_ defaults to
00460   // true and the role in a connection connector is not changed when reconnecting.
00461   if (this->is_connector_ == false) {
00462     ACE_ERROR_RETURN((LM_ERROR,
00463                       "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00464                      -1);
00465   }
00466 
00467   if (this->shutdown_)
00468     return -1;
00469 
00470   // Now use a connector object to establish the connection.
00471   ACE_SOCK_Connector connector;
00472 
00473   if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00474 
00475     ACE_ERROR_RETURN((LM_ERROR,
00476                       ACE_TEXT("(%P|%t) ERROR: Failed to connect. %p\n%C"),
00477                       ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()),
00478                      -1);
00479 
00480   } else {
00481     this->connected_ = true;
00482     const std::string remote_host = this->remote_address_.get_host_addr();
00483     VDBG((LM_DEBUG, "(%P|%t) DBG:   active_establishment(%C:%d->%C:%d)\n",
00484           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00485           remote_host.c_str(), this->remote_address_.get_port_number()));
00486   }
00487 
00488   // Set the DiffServ codepoint according to the priority value.
00489   DirectPriorityMapper mapper(this->transport_priority_);
00490   this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00491 
00492   set_sock_options(tcp_config_.in());
00493 
00494   // In order to complete the connection establishment from the active
00495   // side, we need to tell the remote side about our public address.
00496   // It will use that as an "identifier" of sorts.  To the other
00497   // (passive) side, our local_address that we send here will be known
00498   // as the remote_address.
00499   std::string address = tcp_config_->get_public_address();
00500   ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00501 
00502   ACE_UINT32 nlen = htonl(len);
00503 
00504   if (this->peer().send_n(&nlen,
00505                           sizeof(ACE_UINT32)) == -1) {
00506     // TBD later - Anything we are supposed to do to close the connection.
00507     ACE_ERROR_RETURN((LM_ERROR,
00508                       "(%P|%t) ERROR: Unable to send address string length to "
00509                       "the passive side to complete the active connection "
00510                       "establishment.\n"),
00511                      -1);
00512   }
00513 
00514   if (this->peer().send_n(address.c_str(), len)  == -1) {
00515     // TBD later - Anything we are supposed to do to close the connection.
00516     ACE_ERROR_RETURN((LM_ERROR,
00517                       "(%P|%t) ERROR: Unable to send our address to "
00518                       "the passive side to complete the active connection "
00519                       "establishment.\n"),
00520                      -1);
00521   }
00522 
00523   ACE_UINT32 npriority = htonl(this->transport_priority_);
00524 
00525   if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00526     // TBD later - Anything we are supposed to do to close the connection.
00527     ACE_ERROR_RETURN((LM_ERROR,
00528                       "(%P|%t) ERROR: Unable to send publication priority to "
00529                       "the passive side to complete the active connection "
00530                       "establishment.\n"),
00531                      -1);
00532   }
00533 
00534   return 0;
00535 }
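
The three send_n() calls in the listing above put a small setup message on the wire: a 32-bit length in network byte order, the address string including its terminating null, and a 32-bit priority in network byte order. The following is a minimal, self-contained sketch of that layout using only the standard library. The names append_u32_be and encode_setup_message are hypothetical helpers for illustration and are not part of OpenDDS; the real code sends the three pieces separately rather than building one buffer.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Append a 32-bit value in network (big-endian) byte order,
// which is what htonl() followed by a raw send produces.
inline void append_u32_be(std::vector<unsigned char>& out, std::uint32_t v)
{
  out.push_back(static_cast<unsigned char>((v >> 24) & 0xFF));
  out.push_back(static_cast<unsigned char>((v >> 16) & 0xFF));
  out.push_back(static_cast<unsigned char>((v >> 8) & 0xFF));
  out.push_back(static_cast<unsigned char>(v & 0xFF));
}

// Hypothetical helper: build <len><addr><prio> as a single buffer.
// len counts the address bytes plus the terminating null, matching
// "address.length() + 1" in the listing.
inline std::vector<unsigned char>
encode_setup_message(const std::string& public_address, std::uint32_t priority)
{
  // The length field is 32 bits wide, so the string must fit in it.
  assert(public_address.size() < 0xFFFFFFFFu);

  std::vector<unsigned char> out;
  const std::uint32_t len =
    static_cast<std::uint32_t>(public_address.size()) + 1;

  append_u32_be(out, len);
  out.insert(out.end(), public_address.begin(), public_address.end());
  out.push_back('\0');
  append_u32_be(out, priority);
  return out;
}
```

For an address such as "host:1" the result is 15 bytes long: 4 bytes of length (value 7), 7 bytes of null-terminated address, and 4 bytes of priority.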

int OpenDDS::DCPS::TcpConnection::active_open (  ) 

Protocol setup (handshake) on the active side. The local address is sent to the remote (passive) side to identify ourselves to the remote side.

Definition at line 593 of file TcpConnection.cpp.

References active_establishment(), connected_, DBG_ENTRY_LVL, and reconnect_lock_.

Referenced by open().

00594 {
00595   DBG_ENTRY_LVL("TcpConnection","active_open",6);
00596 
00597   GuardType guard(reconnect_lock_);
00598 
00599   if (connected_.value()) {
00600     return 0;
00601   }
00602 
00603   return active_establishment(false /* !initiate_connect */);
00604 }

int OpenDDS::DCPS::TcpConnection::active_reconnect_i (  )  [private]

Definition at line 680 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, last_reconnect_attempted_, link_, local_address_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_delay(), reconnect_state_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy_, tcp_config_, and VDBG.

00681 {
00682   DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00683 
00684   GuardType guard(this->reconnect_lock_);
00685 
00686   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00687         "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00688         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00689         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00690         this->reconnect_state_));
00691   if (DCPS_debug_level >= 1) {
00692     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::"
00693           "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %d\n",
00694           this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00695           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00696           this->reconnect_state_));
00697   }
00698   // We need reset the state to INIT_STATE if we are previously reconnected.
00699   // This would allow re-establishing connection after the re-established
00700   // connection lost again.
00701   if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00702       && this->reconnect_state_ == RECONNECTED_STATE) {
00703     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00704           "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00705     this->reconnect_state_ = INIT_STATE;
00706   }
00707 
00708   if (this->reconnect_state_ == INIT_STATE) {
00709     // Suspend send once.
00710     this->send_strategy_->suspend_send();
00711 
00712     this->disconnect();
00713 
00714     if (this->tcp_config_->conn_retry_attempts_ > 0) {
00715       this->link_->notify(DataLink::DISCONNECTED);
00716     }
00717 
00718     // else the conn_retry_attempts is 0 then we do not need this extra
00719     // notify_disconnected() since the user application should get the
00720     // notify_lost() without delay.
00721 
00722     double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00723     int ret = -1;
00724 
00725     for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00726       ret = this->active_establishment();
00727 
00728       if (this->shutdown_)
00729         break;
00730 
00731       if (ret == -1) {
00732         ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00733                                 ((int)retry_delay_msec)%1000*1000);
00734         ACE_OS::sleep(delay_tv);
00735         retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00736 
00737       } else {
00738         break;
00739       }
00740     }
00741 
00742     if (ret == -1) {
00743       if (this->tcp_config_->conn_retry_attempts_ > 0) {
00744         ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00745                    this->link_->get_transport_impl()->config()->name().c_str(),
00746                    this->remote_address_.get_host_addr(),
00747                    this->remote_address_.get_port_number()));
00748 
00749       } else {
00750         ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00751                    this->link_->get_transport_impl()->config()->name().c_str(),
00752                    this->remote_address_.get_host_addr(),
00753                    this->remote_address_.get_port_number()));
00754       }
00755 
00756       this->reconnect_state_ = LOST_STATE;
00757       this->link_->notify(DataLink::LOST);
00758       this->send_strategy_->terminate_send();
00759 
00760     } else {
00761       ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00762                  this->link_->get_transport_impl()->config()->name().c_str(),
00763                  this->remote_address_.get_host_addr(),
00764                  this->remote_address_.get_port_number()));
00765       if (this->receive_strategy_->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00766         ACE_ERROR_RETURN((LM_ERROR,
00767                           "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00768                           "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00769                          -1);
00770       }
00771       this->reconnect_state_ = RECONNECTED_STATE;
00772       this->link_->notify(DataLink::RECONNECTED);
00773       this->send_strategy_->resume_send();
00774     }
00775 
00776     this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00777   }
00778 
00779   return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00780 }
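
The retry loop in the listing above sleeps after each failed attempt and then multiplies the delay by conn_retry_backoff_multiplier_. The sketch below reproduces only that arithmetic so the resulting schedule is easy to inspect. RetryDelay and backoff_schedule are hypothetical names, not OpenDDS API, and the sketch assumes every attempt fails.

```cpp
#include <cassert>
#include <vector>

// Seconds and microseconds, the two fields passed to the
// ACE_Time_Value constructor in the listing.
struct RetryDelay {
  int sec;
  int usec;
};

// Hypothetical helper: the delay slept after each failed attempt,
// assuming all `attempts` attempts fail.
inline std::vector<RetryDelay>
backoff_schedule(double initial_delay_msec, double multiplier, int attempts)
{
  assert(attempts >= 0);

  std::vector<RetryDelay> delays;
  double delay_msec = initial_delay_msec;

  for (int i = 0; i < attempts; ++i) {
    // Truncate to whole milliseconds, like the (int) cast in the listing.
    const int whole_msec = static_cast<int>(delay_msec);
    const RetryDelay d = { whole_msec / 1000, whole_msec % 1000 * 1000 };
    delays.push_back(d);
    delay_msec *= multiplier;
  }

  return delays;
}
```

With an initial delay of 500 ms, a multiplier of 2.0, and 3 attempts, the schedule is 0.5 s, 1 s, 2 s. With 0 attempts the schedule is empty, which corresponds to the branch in the listing that reports the connection as lost without retrying.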

int OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association (  )  [private]

Definition at line 607 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and INIT_STATE.

Referenced by reconnect().

00608 {
00609   DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00610   GuardType guard(this->reconnect_lock_);
00611 
00612   if (this->connected_ == true)
00613     return 0;
00614 
00615   else if (this->active_establishment() == 0) {
00616     this->reconnect_state_ = INIT_STATE;
00617     this->send_strategy_->resume_send();
00618     return 0;
00619   }
00620 
00621   return -1;
00622 }

int OpenDDS::DCPS::TcpConnection::close ( u_long   )  [virtual]

Definition at line 355 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and disconnect().

00356 {
00357   DBG_ENTRY_LVL("TcpConnection","close",6);
00358 
00359   // TBD SOON - Find out exactly when close() is called.
00360   //            I have no clue when and who might call this.
00361 
00362   if (!this->send_strategy_.is_nil())
00363     this->send_strategy_->terminate_send();
00364 
00365   this->disconnect();
00366 
00367   return 0;
00368 }

void OpenDDS::DCPS::TcpConnection::disconnect (  ) 

This will be called by the DataLink (that "owns" us) when the TcpTransport has been told to shutdown(), or when the DataLink finds itself no longer needed, and is "self-releasing".

Definition at line 111 of file TcpConnection.cpp.

References connected_, and DBG_ENTRY_LVL.

Referenced by active_reconnect_i(), close(), handle_close(), and notify_lost_on_backpressure_timeout().

00112 {
00113   DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114   this->connected_ = false;
00115 
00116   if (!this->receive_strategy_.is_nil()) {
00117     this->receive_strategy_->get_reactor()->remove_handler(this,
00118                                                            ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119   }
00120 
00121   this->peer().close();
00122 
00123 }

ACE_INLINE ACE_INET_Addr OpenDDS::DCPS::TcpConnection::get_remote_address (  ) 

Definition at line 60 of file TcpConnection.inl.

References remote_address_.

00061 {
00062   return this->remote_address_;
00063 }

int OpenDDS::DCPS::TcpConnection::handle_close ( ACE_HANDLE  ,
ACE_Reactor_Mask   
) [virtual]

Definition at line 371 of file TcpConnection.cpp.

References OpenDDS::DCPS::QueueTaskBase< T >::add(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, OpenDDS::DCPS::DO_RECONNECT, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, receive_strategy_, reconnect_task_, remote_address_, and send_strategy_.

00372 {
00373   DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00374 
00375   if (DCPS_debug_level >= 1) {
00376     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00377                this->link_->get_transport_impl()->config()->name().c_str(),
00378                this->remote_address_.get_host_addr(),
00379                this->remote_address_.get_port_number()));
00380   }
00381 
00382   bool graceful = !this->receive_strategy_.is_nil() && this->receive_strategy_->gracefully_disconnected();
00383 
00384   if (!this->send_strategy_.is_nil()) {
00385     if (graceful) {
00386       this->send_strategy_->terminate_send();
00387     } else {
00388       this->send_strategy_->suspend_send();
00389     }
00390   }
00391 
00392   this->disconnect();
00393 
00394   if (graceful) {
00395     this->link_->notify(DataLink::DISCONNECTED);
00396   } else {
00397     ReconnectOpType op = DO_RECONNECT;
00398     this->reconnect_task_.add(op);
00399   }
00400 
00401   return 0;
00402 }

int OpenDDS::DCPS::TcpConnection::handle_input ( ACE_HANDLE   )  [virtual]

We pass this "event" along to the receive_strategy.

Definition at line 312 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, handle_setup_input(), OpenDDS::DCPS::RcHandle< T >::is_nil(), passive_setup_, and receive_strategy_.

00313 {
00314   DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00315 
00316   if (passive_setup_) {
00317     return handle_setup_input(fd);
00318   }
00319 
00320   if (receive_strategy_.is_nil()) {
00321     return 0;
00322   }
00323 
00324   return receive_strategy_->handle_dds_input(fd);
00325 }

int OpenDDS::DCPS::TcpConnection::handle_output ( ACE_HANDLE   )  [virtual]

Handle back pressure when sending.

Definition at line 328 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_, send_strategy_, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO.

00329 {
00330   DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00331 
00332   if (!this->send_strategy_.is_nil()) {
00333     if (DCPS_debug_level > 9) {
00334       ACE_DEBUG((LM_DEBUG,
00335                  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00336                  ACE_TEXT("sending queued data.\n"),
00337                  id_));
00338     }
00339 
00340     // Process data to be sent from the queue.
00341     if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00342         != send_strategy_->perform_work()) {
00343 
00344       // Stop handling output ready events when there is nothing to output.
00345       // N.B. This calls back into the reactor.  Is the reactor lock
00346       //      recursive?
00347       send_strategy_->schedule_output();
00348     }
00349   }
00350 
00351   return 0;
00352 }

int OpenDDS::DCPS::TcpConnection::handle_setup_input ( ACE_HANDLE  h  )  [private]

During the connection setup phase, the passive side sets passive_setup_, redirecting handle_input() events here (there is no recv strategy yet).

Definition at line 240 of file TcpConnection.cpp.

References connected_, local_address_, passive_setup_, passive_setup_buffer_, reconnect_state_, remote_address_, transport_during_setup_, transport_priority_, VDBG, and VDBG_LVL.

Referenced by handle_input().

00241 {
00242   const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00243                                   passive_setup_buffer_.space(),
00244                                   &ACE_Time_Value::zero);
00245 
00246   if (ret < 0 && errno == ETIME) {
00247     return 0;
00248   }
00249 
00250   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input %@ "
00251             "recv returned %b %m.\n", this, ret), 4);
00252 
00253   if (ret <= 0) {
00254     return -1;
00255   }
00256 
00257   passive_setup_buffer_.wr_ptr(ret);
00258   // Parse the setup message: <len><addr><prio>
00259   // len and prio are network order 32-bit ints
00260   // addr is a string of length len, including null
00261   ACE_UINT32 nlen = 0;
00262 
00263   if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00264 
00265     ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00266     passive_setup_buffer_.rd_ptr(sizeof(nlen));
00267     ACE_UINT32 hlen = ntohl(nlen);
00268     passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00269 
00270     ACE_UINT32 nprio = 0;
00271 
00272     if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00273 
00274       const std::string bufstr(passive_setup_buffer_.rd_ptr());
00275       const NetworkAddress network_order_address(bufstr);
00276       network_order_address.to_addr(remote_address_);
00277 
00278       ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00279       transport_priority_ = ntohl(nprio);
00280 
00281       passive_setup_buffer_.reset();
00282       passive_setup_ = false;
00283 
00284       VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00285             "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %d\n", this,
00286             remote_address_.get_host_addr(), remote_address_.get_port_number(),
00287             local_address_.get_host_addr(), local_address_.get_port_number(),
00288             transport_priority_, reconnect_state_));
00289 
00290       // remove from reactor, normal recv strategy setup will add us back
00291       if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00292         VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00293               "remove_handler failed %m.\n"));
00294       }
00295 
00296       const TcpConnection_rch self(this, false);
00297 
00298       transport_during_setup_->passive_connection(remote_address_, self);
00299       transport_during_setup_ = 0;
00300       connected_ = true;
00301 
00302       return 0;
00303     }
00304   }
00305 
00306   passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00307 
00308   return 0;
00309 }
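
The listing above parses the setup message incrementally, because a non-blocking recv() may deliver it in several pieces. The following standard-library sketch shows the same idea: accumulate bytes, and only parse once the length, the address, and the priority have all arrived. SetupMessage and SetupParser are hypothetical names for illustration; the real code works on an ACE_Message_Block rather than a std::vector.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

struct SetupMessage {
  std::string address;
  std::uint32_t priority = 0;
};

// Hypothetical incremental parser for <len><addr><prio>, where len and
// prio are big-endian 32-bit integers and addr is len bytes including
// its terminating null.
class SetupParser {
public:
  // Feed newly received bytes. Returns true once a complete message
  // has been parsed into `out`; returns false while more data is needed.
  bool feed(const unsigned char* data, std::size_t n, SetupMessage& out)
  {
    buf_.insert(buf_.end(), data, data + n);

    const std::size_t field = sizeof(std::uint32_t);
    if (buf_.size() < field) {
      return false;  // length prefix not complete yet
    }

    const std::size_t len = read_u32_be(0);
    if (buf_.size() < field + len + field) {
      return false;  // address or priority not complete yet
    }

    // Take the address up to its first null, as constructing a
    // std::string from a C string does in the listing.
    const unsigned char* first = buf_.data() + field;
    const unsigned char* last = first + len;
    out.address.assign(first,
                       std::find(first, last, static_cast<unsigned char>(0)));
    out.priority = read_u32_be(field + len);

    buf_.clear();
    return true;
  }

private:
  std::uint32_t read_u32_be(std::size_t off) const
  {
    return (static_cast<std::uint32_t>(buf_[off]) << 24)
         | (static_cast<std::uint32_t>(buf_[off + 1]) << 16)
         | (static_cast<std::uint32_t>(buf_[off + 2]) << 8)
         |  static_cast<std::uint32_t>(buf_[off + 3]);
  }

  std::vector<unsigned char> buf_;
};
```

Returning false without consuming anything plays the role of the final rd_ptr() reset in the listing: the partial message stays buffered until the next read event.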

int OpenDDS::DCPS::TcpConnection::handle_timeout ( const ACE_Time_Value &  tv,
const void *  arg 
)

A timer is scheduled on the acceptor side to check whether a new connection has been accepted after the connection was lost.

Definition at line 785 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), DBG_ENTRY_LVL, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_state_, RECONNECTED_STATE, remote_address_, and tear_link().

00787 {
00788   DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00789 
00790   GuardType guard(this->reconnect_lock_);
00791 
00792   switch (this->reconnect_state_) {
00793   case PASSIVE_WAITING_STATE: {
00794     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00795                this->link_->get_transport_impl()->config()->name().c_str(),
00796                this->remote_address_.get_host_addr(),
00797                this->remote_address_.get_port_number()));
00798 
00799     this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00800     // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
00801     // Now we need declare the connection is lost.
00802     this->link_->notify(DataLink::LOST);
00803 
00804     // The handle_timeout may be called after the connection is re-established
00805     // and the send strategy of this old connection is reset to nil.
00806     if (!this->send_strategy_.is_nil())
00807       this->send_strategy_->terminate_send();
00808 
00809     this->reconnect_state_ = LOST_STATE;
00810 
00811     this->tear_link();
00812 
00813   }
00814   break;
00815 
00816   case RECONNECTED_STATE:
00817     // reconnected successfully.
00818     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00819                this->link_->get_transport_impl()->config()->name().c_str(),
00820                this->remote_address_.get_host_addr(),
00821                this->remote_address_.get_port_number()));
00822     break;
00823 
00824   default :
00825     ACE_ERROR((LM_ERROR,
00826                ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00827                ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00828     break;
00829   }
00830 
00831   // Take back the "copy" we gave to reactor when we schedule the timer.
00832   this->_remove_ref();
00833 
00834   return 0;
00835 }
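
The switch in the listing above amounts to a small state transition table. The sketch below restates it as a pure function so the three cases can be read side by side. The enumerators are a local copy of ReconnectState; TimeoutOutcome and on_passive_timeout are hypothetical names that do not exist in OpenDDS, and the sketch leaves out the logging, the send strategy termination, the tear_link() call, and the reference count release.

```cpp
// Local copy of the enumerators documented for TcpConnection::ReconnectState.
enum ReconnectState {
  INIT_STATE,
  LOST_STATE,
  RECONNECTED_STATE,
  PASSIVE_WAITING_STATE,
  PASSIVE_TIMEOUT_CALLED_STATE
};

// Hypothetical summary of what one timer expiry decides.
struct TimeoutOutcome {
  ReconnectState next;  // state after the handler returns
  bool notify_lost;     // whether DataLink::LOST would be reported
  bool unexpected;      // whether the handler would log an error
};

inline TimeoutOutcome on_passive_timeout(ReconnectState current)
{
  switch (current) {
  case PASSIVE_WAITING_STATE:
    // No new connection arrived in time. The listing passes through
    // PASSIVE_TIMEOUT_CALLED_STATE and ends in LOST_STATE.
    return TimeoutOutcome{LOST_STATE, true, false};

  case RECONNECTED_STATE:
    // A new connection was accepted before the timer fired.
    return TimeoutOutcome{RECONNECTED_STATE, false, false};

  default:
    // Any other state is reported as an error and left unchanged.
    return TimeoutOutcome{current, false, true};
  }
}
```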

ACE_INLINE std::size_t & OpenDDS::DCPS::TcpConnection::id (  ) 

Definition at line 18 of file TcpConnection.inl.

References id_.

00019 {
00020   return id_;
00021 }

ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connected (  )  const

Return true if connection is connected.

Definition at line 46 of file TcpConnection.inl.

References connected_.

00047 {
00048   return this->connected_.value();
00049 }

ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connector (  )  const

Return true if the object represents the connector side, otherwise it's the acceptor side. The acceptor/connector role is not changed when re-establishing the connection.

Definition at line 40 of file TcpConnection.inl.

References is_connector_.

00041 {
00042   return this->is_connector_;
00043 }

void OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout (  ) 

This function is called when backpressure occurs and has timed out after "max_output_pause_period". A lost-connection notification is sent and the connection is closed, since it has been declared a "lost" connection.

Definition at line 929 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, disconnect(), INIT_STATE, link_, OpenDDS::DCPS::DataLink::LOST, LOST_STATE, reconnect_state_, and send_strategy_.

00930 {
00931   DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00932   bool notify_lost = false;
00933   {
00934     GuardType guard(this->reconnect_lock_);
00935 
00936     if (this->reconnect_state_ == INIT_STATE) {
00937       this->reconnect_state_ = LOST_STATE;
00938       notify_lost = true;
00939 
00940       this->disconnect();
00941     }
00942   }
00943 
00944   if (notify_lost) {
00945     this->link_->notify(DataLink::LOST);
00946     this->send_strategy_->terminate_send();
00947   }
00948 
00949 }
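
The listing above decides whether to notify while holding reconnect_lock_, then performs the notification after the guard has gone out of scope. A plausible reason, stated here as an assumption, is to avoid running callbacks with the lock held. The sketch below shows that pattern with std::mutex in place of the ACE types. LostNotifier and its members are hypothetical names, not OpenDDS API.

```cpp
#include <functional>
#include <mutex>
#include <utility>

// Hypothetical stand-in for the INIT_STATE -> LOST_STATE transition.
class LostNotifier {
public:
  explicit LostNotifier(std::function<void()> on_lost)
    : state_(State::Init), on_lost_(std::move(on_lost)) {}

  // Returns true if this call performed the transition and notified.
  bool backpressure_timeout()
  {
    bool notify_lost = false;
    {
      // Decide under the lock so that only one caller wins.
      std::lock_guard<std::mutex> guard(lock_);
      if (state_ == State::Init) {
        state_ = State::Lost;
        notify_lost = true;
      }
    }  // lock released here

    // Run the callback outside the lock.
    if (notify_lost) {
      on_lost_();
    }
    return notify_lost;
  }

private:
  enum class State { Init, Lost };

  std::mutex lock_;
  State state_;
  std::function<void()> on_lost_;
};
```

Calling backpressure_timeout() twice invokes the callback only once, which mirrors the INIT_STATE check in the listing: a connection that has already left INIT_STATE is not reported as lost a second time.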

int OpenDDS::DCPS::TcpConnection::open ( void *  arg  )  [virtual]

Definition at line 150 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), active_open(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpAcceptor::get_configuration(), OpenDDS::DCPS::RcHandle< T >::in(), is_connector_, OpenDDS::DCPS::RcHandle< T >::is_nil(), link_, local_address_, passive_setup_, passive_setup_buffer_, remote_address_, set_sock_options(), tcp_config_, OpenDDS::DCPS::TcpAcceptor::transport(), transport_during_setup_, transport_priority_, and VDBG_LVL.

00151 {
00152   DBG_ENTRY_LVL("TcpConnection","open",6);
00153 
00154   if (is_connector_) {
00155 
00156     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open active.\n"), 2);
00157     // Take over the refcount from TcpTransport::connect_datalink().
00158     const TcpConnection_rch self(this);
00159     const TcpTransport_rch transport = link_->get_transport_impl();
00160 
00161     const bool is_loop(local_address_ == remote_address_);
00162     const PriorityKey key(transport_priority_, remote_address_,
00163                           is_loop, false /* !active */);
00164 
00165     int active_open_ = active_open();
00166 
00167     int connect_tcp_datalink_ = transport->connect_tcp_datalink(link_, self);
00168 
00169     if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00170       // if (active_open() == -1 ||
00171       //       transport->connect_tcp_datalink(link_, self) == -1) {
00172 
00173       transport->async_connect_failed(key);
00174 
00175       return -1;
00176     }
00177 
00178     return 0;
00179   }
00180 
00181   // The passed-in arg is really the acceptor object that created this
00182   // TcpConnection object, and is also the caller of this open()
00183   // method.  We need to cast the arg to the TcpAcceptor* type.
00184   TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00185 
00186   if (acceptor == 0) {
00187     // The cast failed.
00188     ACE_ERROR_RETURN((LM_ERROR,
00189                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00190                       ACE_TEXT("failed to cast void* arg to ")
00191                       ACE_TEXT("TcpAcceptor* type.\n")),
00192                      -1);
00193   }
00194 
00195   // Now we need to ask the TcpAcceptor object to provide us with
00196   // a pointer to the TcpTransport object that "owns" the acceptor.
00197   TcpTransport_rch transport = acceptor->transport();
00198 
00199   if (transport.is_nil()) {
00200     // The acceptor gave us a nil transport (smart) pointer.
00201     ACE_ERROR_RETURN((LM_ERROR,
00202                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00203                       ACE_TEXT("acceptor's transport is nil.\n")),
00204                      -1);
00205   }
00206 
00207   TcpInst* tcp_config = acceptor->get_configuration();
00208 
00209   // Keep a "copy" of the reference to TcpInst object
00210   // for ourselves.
00211   tcp_config->_add_ref();
00212   tcp_config_ = tcp_config;
00213   local_address_ = tcp_config_->local_address();
00214 
00215   set_sock_options(tcp_config_.in());
00216 
00217   // We expect that the active side of the connection (the remote side
00218   // in this case) will supply its listening ACE_INET_Addr as the first
00219   // message it sends to the socket.  This is a one-way connection
00220   // establishment protocol message.
00221   passive_setup_ = true;
00222   transport_during_setup_ = transport;
00223   passive_setup_buffer_.size(sizeof(ACE_UINT32));
00224 
00225   if (reactor()->register_handler(this, READ_MASK) == -1) {
00226     ACE_ERROR_RETURN((LM_ERROR,
00227                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00228                       ACE_TEXT("unable to register with the reactor.%p\n"),
00229                       ACE_TEXT("register_handler")),
00230                      -1);
00231   }
00232 
00233   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open passive handle=%d.\n",
00234             static_cast<int>(intptr_t(get_handle()))), 2);
00235 
00236   return 0;
00237 }

int OpenDDS::DCPS::TcpConnection::passive_reconnect_i (  )  [private]

Definition at line 628 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), connected_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataLink::DISCONNECTED, INIT_STATE, link_, passive_reconnect_timer_id_, PASSIVE_WAITING_STATE, and reconnect_state_.

00629 {
00630   DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00631   GuardType guard(this->reconnect_lock_);
00632 
00633   // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once.
00634   if (this->reconnect_state_ == INIT_STATE) {
00635     // Mark the connection lost since the recv/send just failed.
00636     this->connected_ = false;
00637 
00638     if (this->tcp_config_->passive_reconnect_duration_ == 0)
00639       return -1;
00640 
00641     ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00642                            this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00643     this->reconnect_state_ = PASSIVE_WAITING_STATE;
00644     this->link_->notify(DataLink::DISCONNECTED);
00645 
00646     // It is possible that the passive reconnect is called after the new connection
00647     // is accepted and the receive_strategy of this old connection is reset to nil.
00648     if (!this->receive_strategy_.is_nil()) {
00649       TcpReceiveStrategy* rs
00650         = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00651 
00652       // Give a copy to reactor.
00653       this->_add_ref();
00654       this->passive_reconnect_timer_id_ = rs->get_reactor()->schedule_timer(this, 0, timeout);
00655 
00656       if (this->passive_reconnect_timer_id_ == -1) {
00657         this->_remove_ref();
00658         ACE_ERROR_RETURN((LM_ERROR,
00659                           ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00660                           ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00661                          -1);
00662       }
00663     }
00664   }
00665 
00666   return 0;
00667 }
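
The timeout built in passive_reconnect_i() divides passive_reconnect_duration_ by 1000 for the seconds field and scales the remainder by 1000 for the microseconds field, which suggests the duration is expressed in milliseconds. A minimal sketch of that split follows; split_milliseconds() is a hypothetical helper for illustration and is not part of OpenDDS or ACE.

```cpp
#include <cassert>
#include <utility>

// Hypothetical helper (not an OpenDDS/ACE function): split a millisecond
// duration into the (seconds, microseconds) pair a timer value expects.
inline std::pair<long, long> split_milliseconds(long duration_ms)
{
  assert(duration_ms >= 0);  // negative durations are out of scope here

  const long seconds = duration_ms / 1000;                 // whole seconds
  const long microseconds = (duration_ms % 1000) * 1000;   // leftover ms as usec
  return std::make_pair(seconds, microseconds);
}
```

Under this reading, a duration of 2500 would give 2 seconds plus 500000 microseconds, and a duration of 0 is the case the listing treats as "passive reconnect disabled" by returning -1 before any timer is scheduled.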

int OpenDDS::DCPS::TcpConnection::reconnect ( bool  on_new_association = false  ) 

This function is called to re-establish the connection. If this object is the connector side of the connection, it tries to reconnect to the remote; if it is the acceptor side, it schedules a timer to check whether a connection from the remote has been passively accepted. When on_new_association is true, the call is being made because the connection was previously lost and a new association has been added; in that case the connector side needs to try to actively reconnect to the remote.

Definition at line 545 of file TcpConnection.cpp.

References active_reconnect_on_new_association(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, and remote_address_.

00546 {
00547   DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00548   if (DCPS_debug_level >= 1) {
00549     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00550                this->link_->get_transport_impl()->config()->name().c_str(),
00551                this->remote_address_.get_host_addr(),
00552                this->remote_address_.get_port_number()));
00553   }
00554 
00555   if (on_new_association) {
00556     if (DCPS_debug_level >= 1) {
00557       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00558     }
00559     return this->active_reconnect_on_new_association();
00560   }
00561 
00562   // If on_new_association is false, this is called by the reconnect task.
00563   // We need to check whether the link release is pending. If it is, do
00564   // not try to reconnect.
00565   else if (!this->link_->is_release_pending()) {
00566     if (DCPS_debug_level >= 1) {
00567       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00568     }
00569     // Try to reconnect if it's connector previously.
00570     if (this->is_connector_ && this->active_reconnect_i() == -1) {
00571       if (DCPS_debug_level >= 1) {
00572         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00573       }
00574       return -1;
00575     }
00576 
00577     // Schedule a timer to check whether an incoming connection has been accepted by the timeout.
00578     else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00579       if (DCPS_debug_level >= 1) {
00580         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00581       }
00582       return -1;
00583     }
00584 
00585   }
00586   if (DCPS_debug_level >= 1) {
00587     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00588   }
00589   return 0;
00590 }
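
The branching in reconnect() can be summarised as a small decision function. The sketch below is a standalone illustration only: ReconnectPath and choose_reconnect_path() are hypothetical names that do not exist in OpenDDS, and the three boolean inputs stand in for on_new_association, link_->is_release_pending(), and is_connector_.

```cpp
#include <cassert>

// Hypothetical names for illustration; none of these exist in OpenDDS.
enum ReconnectPath {
  ACTIVE_ON_NEW_ASSOCIATION,  // corresponds to active_reconnect_on_new_association()
  ACTIVE_RECONNECT,           // corresponds to active_reconnect_i()
  PASSIVE_RECONNECT,          // corresponds to passive_reconnect_i()
  NO_ATTEMPT                  // link release pending: nothing is attempted
};

inline ReconnectPath choose_reconnect_path(bool on_new_association,
                                           bool release_pending,
                                           bool is_connector)
{
  if (on_new_association) return ACTIVE_ON_NEW_ASSOCIATION;
  if (release_pending) return NO_ATTEMPT;
  return is_connector ? ACTIVE_RECONNECT : PASSIVE_RECONNECT;
}

// Usage: an acceptor-side connection with no release pending goes passive.
inline void reconnect_path_example()
{
  assert(choose_reconnect_path(false, false, false) == PASSIVE_RECONNECT);
}
```

Note that, as in the listing, the on_new_association check comes first, so the release-pending test only applies to calls made from the reconnect task.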

void OpenDDS::DCPS::TcpConnection::relink_from_recv ( bool  do_suspend  ) 

Reconnect initiated by receive strategy.

This is called by TcpReceiveStrategy when a disconnect is detected. It simply suspends any sends and lets the handle_close() handle the reconnect logic.

Definition at line 971 of file TcpConnection.cpp.

References DBG_ENTRY_LVL.

00972 {
00973   DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00974 
00975   if (do_suspend && !this->send_strategy_.is_nil())
00976     this->send_strategy_->suspend_send();
00977 }

void OpenDDS::DCPS::TcpConnection::relink_from_send ( bool  do_suspend  ) 

Reconnect initiated by send strategy.

This is called by TcpSendStrategy when a send fails and a reconnect should be initiated. This method suspends any sends and kicks the reconnect thread into action.

Definition at line 956 of file TcpConnection.cpp.

References OpenDDS::DCPS::QueueTaskBase< T >::add(), DBG_ENTRY_LVL, OpenDDS::DCPS::DO_RECONNECT, and reconnect_task_.

00957 {
00958   DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00959 
00960   if (do_suspend && !this->send_strategy_.is_nil())
00961     this->send_strategy_->suspend_send();
00962 
00963   ReconnectOpType op = DO_RECONNECT;
00964   this->reconnect_task_.add(op);
00965 }

ACE_INLINE void OpenDDS::DCPS::TcpConnection::remove_receive_strategy (  ) 

Definition at line 24 of file TcpConnection.inl.

References DBG_ENTRY_LVL, and receive_strategy_.

00025 {
00026   DBG_ENTRY_LVL("TcpConnection","remove_receive_strategy",6);
00027 
00028   this->receive_strategy_ = 0;
00029 }

ACE_INLINE void OpenDDS::DCPS::TcpConnection::remove_send_strategy (  ) 

Definition at line 32 of file TcpConnection.inl.

References DBG_ENTRY_LVL, and send_strategy_.

00033 {
00034   DBG_ENTRY_LVL("TcpConnection","remove_send_strategy",6);
00035 
00036   this->send_strategy_ = 0;
00037 }

ACE_INLINE void OpenDDS::DCPS::TcpConnection::set_datalink ( TcpDataLink *  link  ) 

Cache the reference to the datalink object for lost connection callbacks.

Definition at line 52 of file TcpConnection.inl.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), and link_.

00053 {
00054   // Keep a "copy" of the reference to the data link for ourselves.
00055   link->_add_ref();
00056   this->link_ = link;
00057 }
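
set_datalink(), set_receive_strategy(), and set_send_strategy() all follow the same shape: call _add_ref() on the raw pointer, then assign it to a handle member. That pattern makes sense if the handle adopts a raw pointer without incrementing the count itself, which is an assumption in the sketch below. Counted, Handle, and keep_copy() are simplified stand-ins written for this example and are not the OpenDDS RcObject or RcHandle classes.

```cpp
#include <cassert>

// Minimal intrusive reference count; a stand-in, not OpenDDS's RcObject.
class Counted {
public:
  Counted() : count_(1) {}  // the creator holds the first reference
  void add_ref() { ++count_; }
  void remove_ref()
  {
    assert(count_ > 0);
    if (--count_ == 0) delete this;
  }
  long count() const { return count_; }
private:
  ~Counted() {}  // heap-only; destroyed through remove_ref()
  long count_;
};

// Hypothetical handle that ADOPTS a raw pointer without incrementing the
// count (assumed behaviour), and drops one reference when it goes away.
class Handle {
public:
  Handle() : ptr_(0) {}
  ~Handle() { if (ptr_) ptr_->remove_ref(); }
  Handle& operator=(Counted* p)
  {
    if (ptr_) ptr_->remove_ref();
    ptr_ = p;  // adopt: no add_ref() here
    return *this;
  }
  Counted* in() const { return ptr_; }
private:
  Handle(const Handle&);             // non-copyable in this sketch
  Handle& operator=(const Handle&);
  Counted* ptr_;
};

// Same shape as set_datalink(): take our own reference, then store it.
inline void keep_copy(Handle& slot, Counted* obj)
{
  obj->add_ref();
  slot = obj;
}
```

With these stand-ins, the caller's reference and the stored "copy" are counted separately, so the object stays alive until both have been released.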

void OpenDDS::DCPS::TcpConnection::set_receive_strategy ( TcpReceiveStrategy *  receive_strategy  ) 

Definition at line 129 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), and DBG_ENTRY_LVL.

00130 {
00131   DBG_ENTRY_LVL("TcpConnection","set_receive_strategy",6);
00132 
00133   // Make a "copy" for ourselves
00134   receive_strategy->_add_ref();
00135   this->receive_strategy_ = receive_strategy;
00136 }

void OpenDDS::DCPS::TcpConnection::set_send_strategy ( TcpSendStrategy *  send_strategy  ) 

Give a "copy" of the TcpSendStrategy object to this connection object.

Definition at line 140 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), and DBG_ENTRY_LVL.

00141 {
00142   DBG_ENTRY_LVL("TcpConnection","set_send_strategy",6);
00143 
00144   // Make a "copy" for ourselves
00145   send_strategy->_add_ref();
00146   this->send_strategy_ = send_strategy;
00147 }

void OpenDDS::DCPS::TcpConnection::set_sock_options ( TcpInst *  tcp_config  ) 

Definition at line 405 of file TcpConnection.cpp.

References OpenDDS::DCPS::TcpInst::enable_nagle_algorithm_.

Referenced by active_establishment(), and open().

00406 {
00407 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00408   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00409   int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00410   //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
00411 #  if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00412 
00413   // A little screwy double negative logic: disabling nagle involves
00414   // enabling TCP_NODELAY
00415   int opt = (tcp_config->enable_nagle_algorithm_ == false);
00416 
00417   if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00418     ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00419   }
00420 
00421   if (this->peer().set_option(SOL_SOCKET,
00422                               SO_SNDBUF,
00423                               (void *) &snd_size,
00424                               sizeof(snd_size)) == -1
00425       && errno != ENOTSUP) {
00426     ACE_ERROR((LM_ERROR,
00427                "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00428                snd_size));
00429     return;
00430   }
00431 
00432   if (this->peer().set_option(SOL_SOCKET,
00433                               SO_RCVBUF,
00434                               (void *) &rcv_size,
00435                               sizeof(int)) == -1
00436       && errno != ENOTSUP) {
00437     ACE_ERROR((LM_ERROR,
00438                "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00439                rcv_size));
00440     return;
00441   }
00442 
00443 #  else
00444   ACE_UNUSED_ARG(tcp_config);
00445   ACE_UNUSED_ARG(snd_size);
00446   ACE_UNUSED_ARG(rcv_size);
00447 #  endif /* !ACE_LACKS_SOCKET_BUFSIZ */
00448 
00449 #else
00450   ACE_UNUSED_ARG(tcp_config);
00451 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00452 }
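
The "double negative" noted in set_sock_options() is that the configuration stores whether Nagle's algorithm is enabled, while the socket option TCP_NODELAY expresses the opposite. A minimal sketch of the mapping, using a hypothetical helper tcp_nodelay_value() that is not part of OpenDDS:

```cpp
#include <cassert>

// Hypothetical helper: value to pass for TCP_NODELAY given a
// "use Nagle's algorithm" setting.
//   Nagle enabled  -> TCP_NODELAY = 0 (small writes may be coalesced)
//   Nagle disabled -> TCP_NODELAY = 1 (segments are sent without delay)
inline int tcp_nodelay_value(bool enable_nagle_algorithm)
{
  return enable_nagle_algorithm ? 0 : 1;
}

// Usage: disabling Nagle means turning TCP_NODELAY on.
inline void tcp_nodelay_example()
{
  assert(tcp_nodelay_value(false) == 1);
}
```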

void OpenDDS::DCPS::TcpConnection::shutdown (  ) 

Definition at line 988 of file TcpConnection.cpp.

References OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, reconnect_task_, and shutdown_.

00989 {
00990   DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00991   this->shutdown_ = true;
00992 
00993   this->reconnect_task_.close(1);
00994 
00995 }

bool OpenDDS::DCPS::TcpConnection::tear_link (  ) 

Called by the reconnect task to inform us that the link & any associated data can be torn down. This call is done with no DCPS/transport locks held.

Definition at line 980 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and link_.

Referenced by handle_timeout().

00981 {
00982   DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00983 
00984   return this->link_->release_resources();
00985 }

void OpenDDS::DCPS::TcpConnection::transfer ( TcpConnection *  connection  ) 

This object is the "old" connection object and the provided one is the "new" connection object. The "old" connection object copies its state to the "new" connection object. This is called by the TcpDataLink when a new connection is accepted (with a new TcpConnection object), so that the state in the "new" connection object is consistent with the "old" connection object.

Definition at line 844 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), OpenDDS::DCPS::QueueTaskBase< T >::close(), DBG_ENTRY_LVL, OpenDDS::DCPS::TcpReceiveStrategy::get_reactor(), OpenDDS::DCPS::RcHandle< T >::in(), INIT_STATE, is_connector_, link_, local_address_, LOST_STATE, passive_reconnect_timer_id_, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, receive_strategy_, reconnect_state_, reconnect_task_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, send_strategy_, tcp_config_, and VDBG.

00845 {
00846   DBG_ENTRY_LVL("TcpConnection","transfer",6);
00847 
00848   GuardType guard(this->reconnect_lock_);
00849 
00850   bool notify_reconnect = false;
00851 
00852   switch (this->reconnect_state_) {
00853   case INIT_STATE:
00854     // We have not detected the lost connection and the peer is faster than us and
00855     // re-established the connection. so do not notify reconnected.
00856     break;
00857 
00858   case LOST_STATE:
00859 
00860     // The reconnect timed out.
00861   case PASSIVE_TIMEOUT_CALLED_STATE:
00862     // TODO: If the handle_timeout is called before the old connection
00863     // transfer its state to new connection then should we disconnect
00864     // the new connection or keep it alive ?
00865     // I think we should keep the connection, the user will get a
00866     // lost connection notification and then a reconnected notification.
00867     notify_reconnect = true;
00868     break;
00869 
00870   case PASSIVE_WAITING_STATE: {
00871     TcpReceiveStrategy* rs
00872       = dynamic_cast <TcpReceiveStrategy*>(this->receive_strategy_.in());
00873 
00874     // Cancel the timer since we got new connection.
00875     if (rs->get_reactor()->cancel_timer(this) == -1) {
00876       ACE_ERROR((LM_ERROR,
00877                  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00878                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00879 
00880     } else
00881       passive_reconnect_timer_id_ = -1;
00882 
00883     this->_remove_ref();
00884     notify_reconnect = true;
00885   }
00886   break;
00887 
00888   default :
00889     ACE_ERROR((LM_ERROR,
00890                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00891                ACE_TEXT(" unknown state or it should not be in state=%d \n"), this->reconnect_state_));
00892     break;
00893   }
00894 
00895   // Verify if this acceptor side.
00896   if (this->is_connector_ || connection->is_connector_) {
00897     ACE_ERROR((LM_ERROR,
00898                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00899                ACE_TEXT(" should NOT be called by the connector side \n")));
00900   }
00901 
00902   this->reconnect_task_.close(1);
00903   connection->receive_strategy_ = this->receive_strategy_;
00904   connection->send_strategy_ = this->send_strategy_;
00905   connection->remote_address_ = this->remote_address_;
00906   connection->local_address_ = this->local_address_;
00907   connection->tcp_config_ = this->tcp_config_;
00908   connection->link_ = this->link_;
00909 
00910   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00911         "transfer(%C:%d->%C:%d) passive reconnected. new con %@   "
00912         " old con %@ \n",
00913         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00914         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00915         connection, this));
00916 
00917   if (notify_reconnect) {
00918     this->reconnect_state_ = RECONNECTED_STATE;
00919     this->link_->notify(DataLink::RECONNECTED);
00920   }
00921 
00922 }
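
The switch at the top of transfer() decides, from the old connection's reconnect state, whether a RECONNECTED notification is sent and whether a pending timer has to be cancelled. The standalone sketch below restates that mapping; TransferDecision and decide_transfer() are hypothetical names for illustration, and the enum is a local copy of the ReconnectState enumerators so the example compiles on its own.

```cpp
#include <cassert>

// Local copy of the enumerators so this sketch is self-contained.
enum ReconnectState {
  INIT_STATE, LOST_STATE, RECONNECTED_STATE,
  PASSIVE_WAITING_STATE, PASSIVE_TIMEOUT_CALLED_STATE
};

// Hypothetical summary of what transfer() does for a given state.
struct TransferDecision {
  bool notify_reconnected;  // DataLink::RECONNECTED is reported afterwards
  bool cancel_timer;        // a passive reconnect timer is still pending
  bool unexpected_state;    // the listing logs an error for this state
};

inline TransferDecision decide_transfer(ReconnectState state)
{
  TransferDecision d = { false, false, false };

  switch (state) {
  case INIT_STATE:
    // The loss was never detected locally, so nothing is reported.
    break;
  case LOST_STATE:
  case PASSIVE_TIMEOUT_CALLED_STATE:
    d.notify_reconnected = true;
    break;
  case PASSIVE_WAITING_STATE:
    d.cancel_timer = true;
    d.notify_reconnected = true;
    break;
  default:
    d.unexpected_state = true;
    break;
  }

  return d;
}

// Usage: a connection still waiting on the passive timer cancels it and
// reports the reconnect.
inline void transfer_decision_example()
{
  const TransferDecision d = decide_transfer(PASSIVE_WAITING_STATE);
  assert(d.notify_reconnected && d.cancel_timer && !d.unexpected_state);
}
```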

ACE_INLINE OpenDDS::DCPS::Priority OpenDDS::DCPS::TcpConnection::transport_priority (  )  const

Definition at line 74 of file TcpConnection.inl.

References transport_priority_.

00075 {
00076   return this->transport_priority_;
00077 }

ACE_INLINE OpenDDS::DCPS::Priority & OpenDDS::DCPS::TcpConnection::transport_priority (  ) 

Access TRANSPORT_PRIORITY.value policy value if set.

Definition at line 67 of file TcpConnection.inl.

References transport_priority_.

00068 {
00069   return this->transport_priority_;
00070 }


Member Data Documentation

ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool> OpenDDS::DCPS::TcpConnection::connected_ [private]

Flag indicating whether the connection is connected or disconnected. It is set to true when actively connecting or passively accepting succeeds, and set to false whenever the peer stream is closed.

Definition at line 163 of file TcpConnection.h.

Referenced by active_establishment(), active_open(), disconnect(), handle_setup_input(), is_connected(), and passive_reconnect_i().

std::size_t OpenDDS::DCPS::TcpConnection::id_ [private]

Small unique identifying value.

Definition at line 215 of file TcpConnection.h.

Referenced by handle_output(), and id().

bool OpenDDS::DCPS::TcpConnection::is_connector_ [private]

Flag indicating whether this connection object is the connector or the acceptor.

Definition at line 166 of file TcpConnection.h.

Referenced by is_connector(), open(), and transfer().

ACE_Time_Value OpenDDS::DCPS::TcpConnection::last_reconnect_attempted_ [private]

Last time the connection was re-established.

Definition at line 202 of file TcpConnection.h.

Referenced by active_reconnect_i().

TcpDataLink_rch OpenDDS::DCPS::TcpConnection::link_ [private]

Datalink object, needed for the connection-lost callback.

Definition at line 184 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), handle_close(), handle_timeout(), notify_lost_on_backpressure_timeout(), open(), passive_reconnect_i(), set_datalink(), tear_link(), transfer(), and ~TcpConnection().

ACE_INET_Addr OpenDDS::DCPS::TcpConnection::local_address_ [private]

Local address.

Definition at line 178 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), handle_setup_input(), open(), and transfer().

int OpenDDS::DCPS::TcpConnection::passive_reconnect_timer_id_ [private]

The id of the scheduled timer. The timer is scheduled to check whether the connection is re-established within passive_reconnect_duration_. This id ensures that the timer is scheduled only once when multiple threads detect the lost connection.

Definition at line 190 of file TcpConnection.h.

Referenced by passive_reconnect_i(), and transfer().

bool OpenDDS::DCPS::TcpConnection::passive_setup_ [private]

Definition at line 210 of file TcpConnection.h.

Referenced by handle_input(), handle_setup_input(), and open().

ACE_Message_Block OpenDDS::DCPS::TcpConnection::passive_setup_buffer_ [private]

Definition at line 211 of file TcpConnection.h.

Referenced by handle_setup_input(), and open().

TcpReceiveStrategy_rch OpenDDS::DCPS::TcpConnection::receive_strategy_ [private]

Reference to the receiving strategy.

Definition at line 169 of file TcpConnection.h.

Referenced by handle_close(), handle_input(), remove_receive_strategy(), and transfer().

LockType OpenDDS::DCPS::TcpConnection::reconnect_lock_ [private]

Lock that prevents reconnect() from being called multiple times when both send() and recv() fail.

Definition at line 158 of file TcpConnection.h.

Referenced by active_open().

ReconnectState OpenDDS::DCPS::TcpConnection::reconnect_state_ [private]

The state indicating the current step of the reconnect process.

Definition at line 199 of file TcpConnection.h.

Referenced by active_reconnect_i(), handle_setup_input(), handle_timeout(), notify_lost_on_backpressure_timeout(), passive_reconnect_i(), and transfer().

TcpReconnectTask OpenDDS::DCPS::TcpConnection::reconnect_task_ [private]

The task that performs the reconnecting. TODO: We might need to reuse the PerConnectionSynch thread to do the reconnecting, or create the reconnect task only when we need to reconnect.

Definition at line 196 of file TcpConnection.h.

Referenced by handle_close(), relink_from_send(), shutdown(), transfer(), and ~TcpConnection().

ACE_INET_Addr OpenDDS::DCPS::TcpConnection::remote_address_ [private]

Remote address.

Definition at line 175 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), get_remote_address(), handle_close(), handle_setup_input(), handle_timeout(), open(), reconnect(), and transfer().

TcpSendStrategy_rch OpenDDS::DCPS::TcpConnection::send_strategy_ [private]

Reference to the send strategy.

Definition at line 172 of file TcpConnection.h.

Referenced by active_reconnect_i(), handle_close(), handle_output(), notify_lost_on_backpressure_timeout(), remove_send_strategy(), and transfer().

bool OpenDDS::DCPS::TcpConnection::shutdown_ [private]

Shutdown flag.

Definition at line 208 of file TcpConnection.h.

Referenced by shutdown().

TcpInst_rch OpenDDS::DCPS::TcpConnection::tcp_config_ [private]

The configuration used by this connection.

Definition at line 181 of file TcpConnection.h.

Referenced by active_establishment(), active_reconnect_i(), open(), and transfer().

TcpTransport_rch OpenDDS::DCPS::TcpConnection::transport_during_setup_ [private]

Definition at line 212 of file TcpConnection.h.

Referenced by handle_setup_input(), and open().

Priority OpenDDS::DCPS::TcpConnection::transport_priority_ [private]

TRANSPORT_PRIORITY.value policy value.

Definition at line 205 of file TcpConnection.h.

Referenced by handle_setup_input(), open(), and transport_priority().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:38 2016 for OpenDDS by  doxygen 1.4.7