OpenDDS  Snapshot(2023/04/07-19:43)
Public Types | Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::TcpConnection Class Reference

#include <TcpConnection.h>

Inheritance diagram for OpenDDS::DCPS::TcpConnection:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpConnection:
Collaboration graph
[legend]

Public Types

enum  ReconnectState {
  INIT_STATE, LOST_STATE, RECONNECTED_STATE, ACTIVE_RECONNECTING_STATE,
  ACTIVE_WAITING_STATE, PASSIVE_WAITING_STATE, PASSIVE_TIMEOUT_CALLED_STATE
}
 States are used during reconnecting. More...
 
- Public Types inherited from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >
typedef ACE_SOCK_STREAM ::PEER_ADDR addr_type
 
typedef ACE_SOCK_STREAM stream_type
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 

Public Member Functions

 TcpConnection ()
 Passive side constructor (acceptor) More...
 
 TcpConnection (const ACE_INET_Addr &remote_address, Priority priority, const TcpInst_rch &config)
 Active side constructor (connector) More...
 
virtual ~TcpConnection ()
 
std::size_t & id ()
 
int active_open ()
 
int active_reconnect_open ()
 
int passive_open (void *)
 
void disconnect ()
 
virtual int open (void *arg)
 
TcpSendStrategy_rch send_strategy ()
 
TcpReceiveStrategy_rch receive_strategy ()
 
virtual int handle_input (ACE_HANDLE)
 We pass this "event" along to the receive_strategy. More...
 
virtual int handle_output (ACE_HANDLE)
 Handle back pressure when sending. More...
 
virtual int close (u_long)
 
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask)
 
void set_sock_options (const TcpInst_rch &tcp_config)
 
bool is_connector () const
 
void transfer (TcpConnection *connection)
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
void set_datalink (const TcpDataLink_rch &link)
 
void notify_lost_on_backpressure_timeout ()
 
ACE_INET_Addr get_remote_address ()
 
void relink_from_send (bool do_suspend)
 Reconnect initiated by send strategy. More...
 
void relink_from_recv (bool do_suspend)
 Reconnect initiated by receive strategy. More...
 
void tear_link ()
 
void shutdown ()
 
TcpTransport_rch impl ()
 
Prioritytransport_priority ()
 Access TRANSPORT_PRIORITY.value policy value if set. More...
 
Priority transport_priority () const
 
virtual ACE_Event_Handler::Reference_Count add_reference ()
 
virtual ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >
 ACE_Svc_Handler (ACE_Thread_Manager *thr_mgr=0, ACE_Message_Queue< ACE_NULL_SYNCH > *mq=0, ACE_Reactor *reactor=ACE_Reactor::instance())
 
virtual ~ACE_Svc_Handler (void)
 
virtual int idle (u_long flags=0)
 
virtual ACE_Recyclable_State recycle_state (void) const
 
virtual int recycle_state (ACE_Recyclable_State new_state)
 
virtual void cleanup_hint (void **act_holder=0)
 
virtual int init (int argc, ACE_TCHAR *argv[])
 
virtual int fini (void)
 
virtual int info (ACE_TCHAR **info_string, size_t length) const
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
ACE_SOCK_STREAMpeer (void) const
 
void * operator new (size_t n)
 
void * operator new (size_t n, void *p)
 
virtual void destroy (void)
 
void operator delete (void *)
 
void operator delete (void *, void *)
 
void shutdown (void)
 
void dump (void) const
 
virtual void recycler (ACE_Connection_Recycling_Strategy *recycler, const void *recycling_act)
 
virtual ACE_Connection_Recycling_Strategyrecycler (void) const
 
virtual const void * recycling_act (void) const
 
virtual int recycle (void *=0)
 
- Public Member Functions inherited from ACE_Task< ACE_NULL_SYNCH >
 ACE_Task (ACE_Thread_Manager *thr_mgr=0, ACE_Message_Queue< ACE_SYNCH_USE, ACE_System_Time_Policy > *mq=0)
 
virtual ~ACE_Task (void)
 
ACE_Message_Queue< ACE_SYNCH_USE, ACE_System_Time_Policy > * msg_queue (void)
 
void msg_queue (ACE_Message_Queue< ACE_SYNCH_USE, ACE_System_Time_Policy > *)
 
ACE_Time_Value_T< ACE_System_Time_Policygettimeofday (void) const
 
void set_time_policy (ACE_System_Time_Policy const &time_policy)
 
int putq (ACE_Message_Block *, ACE_Time_Value *timeout=0)
 
int getq (ACE_Message_Block *&mb, ACE_Time_Value *timeout=0)
 
int ungetq (ACE_Message_Block *, ACE_Time_Value *timeout=0)
 
int reply (ACE_Message_Block *mb, ACE_Time_Value *tv=0)
 
int put_next (ACE_Message_Block *msg, ACE_Time_Value *timeout=0)
 
const ACE_TCHARname (void) const
 
ACE_Task< ACE_SYNCH_USE, ACE_System_Time_Policy > * next (void)
 
void next (ACE_Task< ACE_SYNCH_USE, ACE_System_Time_Policy > *)
 
ACE_Task< ACE_SYNCH_USE, ACE_System_Time_Policy > * sibling (void)
 
ACE_Module< ACE_SYNCH_USE, ACE_System_Time_Policy > * module (void) const
 
int flush (u_long flag=ACE_Task_Flags::ACE_FLUSHALL)
 
void water_marks (ACE_IO_Cntl_Msg::ACE_IO_Cntl_Cmds, size_t)
 
void dump (void) const
 
- Public Member Functions inherited from ACE_Task_Base
 ACE_Task_Base (ACE_Thread_Manager *=0)
 
virtual ~ACE_Task_Base (void)
 
virtual int module_closed (void)
 
virtual int put (ACE_Message_Block *, ACE_Time_Value *=0)
 
virtual int svc (void)
 
virtual int activate (long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
 
virtual int wait (void)
 
virtual int suspend (void)
 
virtual int resume (void)
 
int grp_id (void) const
 
void grp_id (int)
 
ACE_Thread_Managerthr_mgr (void) const
 
void thr_mgr (ACE_Thread_Manager *)
 
int is_reader (void) const
 
int is_writer (void) const
 
size_t thr_count (void) const
 
ACE_thread_t last_thread (void) const
 
- Public Member Functions inherited from ACE_Service_Object
 ACE_Service_Object (ACE_Reactor *=0)
 
virtual ~ACE_Service_Object (void)
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from ACE_Shared_Object
 ACE_Shared_Object (void)
 
virtual ~ACE_Shared_Object (void)
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockTypeGuardType
 

Private Member Functions

int on_active_connection_established ()
 Handle the logic after an active connection has been established. More...
 
void active_reconnect_i ()
 
void passive_reconnect_i ()
 
void notify_connection_lost ()
 
void handle_stop_reconnecting ()
 
int handle_setup_input (ACE_HANDLE h)
 
const std::string & config_name () const
 
const char * reconnect_state_string () const
 Get name of the current reconnect state as a string. More...
 

Private Attributes

LockType reconnect_lock_
 Lock to synchronize state between reactor and non-reactor threads. More...
 
bool is_connector_
 Flag indicate this connection object is the connector or acceptor. More...
 
ACE_INET_Addr remote_address_
 Remote address. More...
 
ACE_INET_Addr local_address_
 Local address. More...
 
WeakRcHandle< TcpInsttcp_config_
 The configuration used by this connection. More...
 
TcpDataLink_rch link_
 Datalink object which is needed for connection lost callback. More...
 
TcpTransport_rch impl_
 Impl object which is needed for connection objects and reconnect task. More...
 
ReconnectState reconnect_state_
 The state indicates each step of the reconnecting. More...
 
Priority transport_priority_
 TRANSPORT_PRIORITY.value policy value. More...
 
bool shutdown_
 shutdown flag More...
 
bool passive_setup_
 
ACE_Message_Block passive_setup_buffer_
 
TcpTransporttransport_during_setup_
 
std::size_t id_
 Small unique identifying value. More...
 
int conn_retry_counter_
 

Additional Inherited Members

- Static Public Member Functions inherited from ACE_Task_Base
static ACE_THR_FUNC_RETURN svc_run (void *)
 
static void cleanup (void *object, void *params)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Task< ACE_NULL_SYNCH >
ACE_Message_Queue< ACE_SYNCH_USE, ACE_System_Time_Policy > * msg_queue_
 
bool delete_msg_queue_
 
ACE_Module< ACE_SYNCH_USE, ACE_System_Time_Policy > * mod_
 
ACE_Task< ACE_SYNCH_USE, ACE_System_Time_Policy > * next_
 
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Service_Object
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >
ACE_SOCK_STREAM peer_
 
bool dynamic_
 
bool closing_
 
ACE_Connection_Recycling_Strategyrecycler_
 
const void * recycling_act_
 
- Protected Attributes inherited from ACE_Task_Base
size_t thr_count_
 
ACE_Thread_Managerthr_mgr_
 
u_long flags_
 
int grp_id_
 
ACE_thread_t last_thread_id_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 37 of file TcpConnection.h.

Member Typedef Documentation

◆ GuardType

Definition at line 156 of file TcpConnection.h.

◆ LockType

Definition at line 155 of file TcpConnection.h.

Member Enumeration Documentation

◆ ReconnectState

Constructor & Destructor Documentation

◆ TcpConnection() [1/2]

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TcpConnection::TcpConnection ( )

Passive side constructor (acceptor)

Definition at line 35 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().

36  : is_connector_(false)
37  , tcp_config_()
39  , transport_priority_(0) // TRANSPORT_PRIORITY.value default value - 0.
40  , shutdown_(false)
41  , passive_setup_(false)
42  , passive_setup_buffer_(sizeof(ACE_UINT32))
44  , id_(0)
46 {
47  DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
49 }
Reference_Counting_Policy & reference_counting_policy(void)
TcpTransport * transport_during_setup_
bool shutdown_
shutdown flag
std::size_t id_
Small unique identifying value.
ACE_Message_Block passive_setup_buffer_
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
bool is_connector_
Flag indicate this connection object is the connector or acceptor.

◆ TcpConnection() [2/2]

OpenDDS::DCPS::TcpConnection::TcpConnection ( const ACE_INET_Addr remote_address,
Priority  priority,
const TcpInst_rch config 
)

Active side constructor (connector)

Definition at line 51 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::Reference_Counting_Policy::ENABLED, ACE_Event_Handler::reference_counting_policy(), and ACE_Event_Handler::Reference_Counting_Policy::value().

54  : is_connector_(true)
55  , remote_address_(remote_address)
56  , local_address_(config->local_address())
57  , tcp_config_(config)
60  , shutdown_(false)
61  , passive_setup_(false)
63  , id_(0)
65 {
66  DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
68 }
Reference_Counting_Policy & reference_counting_policy(void)
TcpTransport * transport_during_setup_
bool shutdown_
shutdown flag
std::size_t id_
Small unique identifying value.
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
virtual int priority(void) const
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
bool is_connector_
Flag indicate this connection object is the connector or acceptor.
ACE_INET_Addr local_address_
Local address.

◆ ~TcpConnection()

OpenDDS::DCPS::TcpConnection::~TcpConnection ( )
virtual

Definition at line 70 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and shutdown().

71 {
72  DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
73  shutdown();
74 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ active_open()

int OpenDDS::DCPS::TcpConnection::active_open ( )

Protocol setup (handshake) on the active side. The local address is sent to the remote (passive) side to identify ourselves to the remote side.

Definition at line 131 of file TcpConnection.cpp.

References OpenDDS::DCPS::TcpTransport::async_connect_failed(), OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(), OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::DataLink::impl(), link_, LM_DEBUG, local_address_, on_active_connection_established(), OpenDDS::DCPS::rchandle_from(), remote_address_, transport_priority_, VDBG, and VDBG_LVL.

Referenced by open().

132 {
133  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::active_open.\n"), 2);
134  VDBG((LM_DEBUG, "(%P|%t) DBG: active_open(%C->%C)\n",
135  LogAddr(local_address_).c_str(), LogAddr(remote_address_).c_str()));
136 
137  RcHandle<TcpTransport> transport = dynamic_rchandle_cast<TcpTransport>(link_->impl());
138 
139  if (transport) {
140  if (on_active_connection_established() != -1 && transport->connect_tcp_datalink(*link_, rchandle_from(this)) != -1) {
141  return 0;
142  }
143 
144  const bool is_loop(local_address_ == remote_address_);
145  const PriorityKey key(transport_priority_, remote_address_,
146  is_loop, true /* active */);
147  transport->async_connect_failed(key);
148  }
149  return -1;
150 }
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
sequence< octet > key
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
int on_active_connection_established()
Handle the logic after an active connection has been established.
#define VDBG(DBG_ARGS)
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
ACE_INET_Addr remote_address_
Remote address.
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define VDBG_LVL(DBG_ARGS, LEVEL)
ACE_INET_Addr local_address_
Local address.

◆ active_reconnect_i()

void OpenDDS::DCPS::TcpConnection::active_reconnect_i ( )
private

Definition at line 597 of file TcpConnection.cpp.

References ACE_DEBUG, ACE_ERROR, ACTIVE_RECONNECTING_STATE, ACTIVE_WAITING_STATE, ACE_Synch_Options::asynch, conn_retry_counter_, ACE_Connector< class, class >::connect(), OpenDDS::DCPS::TcpTransport::connector_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataLink::DISCONNECTED, OpenDDS::DCPS::dynamic_rchandle_cast(), EALREADY, EWOULDBLOCK, handle_stop_reconnecting(), OpenDDS::DCPS::DataLink::impl(), OpenDDS::DCPS::TcpDataLink::is_release_pending(), link_, LM_DEBUG, LM_ERROR, local_address_, ACE_Time_Value::msec(), OpenDDS::DCPS::DataLink::notify(), ACE_Event_Handler::reactor(), reconnect_lock_, reconnect_state_, reconnect_state_string(), remote_address_, ACE_Reactor::schedule_timer(), ACE_Time_Value::sec(), shutdown_, and tcp_config_.

Referenced by handle_close(), handle_timeout(), and impl().

598 {
599  DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
600 
601  if (this->link_->is_release_pending()) {
602  return;
603  }
604 
605  if (this->shutdown_) {
606  return;
607  }
608 
609  TcpInst_rch cfg = tcp_config_.lock();
610  if (!cfg) {
611  return;
612  }
613 
614  if (this->conn_retry_counter_ < cfg->conn_retry_attempts_ ) {
616  if (this->conn_retry_counter_ == 0)
618 
619  double retry_delay_msec = cfg->conn_retry_initial_delay_;
620  retry_delay_msec *= std::pow(cfg->conn_retry_backoff_multiplier_, this->conn_retry_counter_);
621 
622  if (DCPS_debug_level >= 1) {
623  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
624  "active_reconnect_i(%C->%C) reconnect_state = %C, conn_retry_counter_=%d, retry_delay_msec=%f\n",
625  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
626  reconnect_state_string(), this->conn_retry_counter_, retry_delay_msec));
627  }
628 
629  ACE_Time_Value timeout;
630  timeout.msec(static_cast<int>(retry_delay_msec));
631 
632  TcpConnection* pconn = this;
633  int ret = -1;
634  errno = ENODEV;
635  {
636  RcHandle<TcpTransport> transport = dynamic_rchandle_cast<TcpTransport>(link_->impl());
637  if (transport) {
639  ACE_Guard<ACE_Reverse_Lock<LockType> > guard(rev_lock);
640  // We need to temporarily release the lock here because the connect could occasionally be synchronous
641  // if the source and destination are on the same host. When the call become synchronous, active_reconnect_open()
642  // would be called and try to acquired the lock in the same thread.
643  ret = transport->connector_.connect(pconn, this->remote_address_, ACE_Synch_Options::asynch);
644  }
645  }
646 
647  if (ret == -1 && errno != EWOULDBLOCK)
648  {
649  if (errno == EALREADY) {
650  // This could happen on Windows, it may due to the close() on non-blocking socket needs more time to complete.
651  // In this case, we just wait another second to initiate the connect again without incrementing the conn_retry_counter_.
652  timeout.sec(1);
654  if (DCPS_debug_level >= 1) {
655  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
656  "active_reconnect_i() socket operation is already in progress, wait another second to initiate the connect\n"));
657  }
658  } else {
659  ACE_ERROR((LM_ERROR, "(%P|%t) TcpConnection::active_reconnect_i error %m.\n"));
660  }
662  }
663 
664  this->reactor()->schedule_timer(this, 0, timeout);
665  this->conn_retry_counter_ ++;
666  } else {
667  this->handle_stop_reconnecting();
668  }
669 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
static ACE_Synch_Options asynch
bool shutdown_
shutdown flag
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
TcpConnection()
Passive side constructor (acceptor)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
time_t sec(void) const
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_INET_Addr remote_address_
Remote address.
unsigned long msec(void) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
bool is_release_pending() const
Get release pending flag.
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
ACE_INET_Addr local_address_
Local address.

◆ active_reconnect_open()

int OpenDDS::DCPS::TcpConnection::active_reconnect_open ( )

Definition at line 964 of file TcpConnection.cpp.

References ACE_DEBUG, ACE_ERROR_RETURN, ACE_TEXT(), config_name(), conn_retry_counter_, DBG_ENTRY_LVL, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::DataLink::notify(), on_active_connection_established(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, reconnect_lock_, reconnect_state_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, ACE_Reactor::register_handler(), remote_address_, send_strategy(), and shutdown_.

Referenced by open().

965 {
966  DBG_ENTRY_LVL("TcpConnection","active_reconnect_open",6);
967 
968  ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C.\n",
969  config_name().c_str(), LogAddr(remote_address_).c_str()));
970 
971  GuardType guard(reconnect_lock_);
972 
973  if (shutdown_) {
974  return 0;
975  }
976 
977  if (on_active_connection_established() == -1) {
978  return -1;
979  }
980 
981  int result = reactor()->register_handler(this, READ_MASK);
982  if (result == -1) {
983  ACE_ERROR_RETURN((LM_ERROR,
984  "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_open() can't register "
985  "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
986  -1);
987  }
988 
991  this->send_strategy()->resume_send();
992  this->conn_retry_counter_ = 0;
993 
994  return 0;
995 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
#define ACE_DEBUG(X)
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
bool shutdown_
shutdown flag
ACE_Guard< LockType > GuardType
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
const std::string & config_name() const
int on_active_connection_established()
Handle the logic after an active connection has been established.
virtual ACE_Reactor * reactor(void) const
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_TEXT("TCP_Factory")
TcpSendStrategy_rch send_strategy()
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define ACE_ERROR_RETURN(X, Y)

◆ add_reference()

ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::add_reference ( void  )
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 924 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject::_add_ref().

Referenced by impl().

925 {
927  return 1;
928 }
virtual void _add_ref()
Definition: RcObject.h:69

◆ close()

int OpenDDS::DCPS::TcpConnection::close ( u_long  )
virtual

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 339 of file TcpConnection.cpp.

References ACE_DEBUG, ACTIVE_RECONNECTING_STATE, ACTIVE_WAITING_STATE, ACE_Connector< class, class >::close(), config_name(), conn_retry_counter_, OpenDDS::DCPS::TcpTransport::connector_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::dynamic_rchandle_cast(), handle_stop_reconnecting(), OpenDDS::DCPS::DataLink::impl(), link_, LM_DEBUG, reconnect_state_, reconnect_state_string(), remote_address_, send_strategy(), and tcp_config_.

340 {
341  DBG_ENTRY_LVL("TcpConnection","close",6);
342 
343  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::close, reconnect_state_=%C\n", reconnect_state_string()));
344 
345  TcpInst_rch cfg = tcp_config_.lock();
347  // This would be called when using ACE_Connector to initiate an async connect and
348  // the network stack detects the destination is unreachable before timeout.
349  if (DCPS_debug_level >= 1) {
350  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection()::close() on transport: %C to %C because of reconnect failure.\n",
351  config_name().c_str(), LogAddr(remote_address_).c_str()));
352  }
353 
354  if (conn_retry_counter_ >= cfg->conn_retry_attempts_) {
356  } else {
357  TcpTransport_rch transport = dynamic_rchandle_cast<TcpTransport>(link_->impl());
358  if (transport) {
359  transport->connector_.close();
361  }
362  }
363  } else {
365  if (ss) {
366  ss->terminate_send();
367  }
368 
369  disconnect();
370  }
371 
372  return 0;
373 }
#define ACE_DEBUG(X)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
const std::string & config_name() const
RcHandle< TcpTransport > TcpTransport_rch
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
TcpSendStrategy_rch send_strategy()
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ config_name()

const std::string & OpenDDS::DCPS::TcpConnection::config_name ( ) const
private

Definition at line 376 of file TcpConnection.cpp.

References tcp_config_.

Referenced by active_reconnect_open(), close(), handle_close(), handle_stop_reconnecting(), handle_timeout(), and impl().

377 {
378  static const std::string null_name("(couldn't get name)");
379  TcpInst_rch cfg = tcp_config_.lock();
380  return cfg ? cfg->name() : null_name;
381 }
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.

◆ disconnect()

void OpenDDS::DCPS::TcpConnection::disconnect ( void  )

This will be called by the DataLink (that "owns" us) when the TcpTransport has been told to shutdown(), or when the DataLink finds itself no longer needed, and is "self-releasing".

Definition at line 105 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), link_, and ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer().

Referenced by close(), handle_close(), and notify_lost_on_backpressure_timeout().

106 {
107  DBG_ENTRY_LVL("TcpConnection","disconnect",6);
108 
109  if (this->link_) {
111  }
112 
113  this->peer().close();
114 }
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ get_remote_address()

ACE_INLINE ACE_INET_Addr OpenDDS::DCPS::TcpConnection::get_remote_address ( )

Definition at line 35 of file TcpConnection.inl.

References ACE_INLINE, and remote_address_.

36 {
37  return this->remote_address_;
38 }
ACE_INET_Addr remote_address_
Remote address.

◆ handle_close()

int OpenDDS::DCPS::TcpConnection::handle_close ( ACE_HANDLE  ,
ACE_Reactor_Mask   
)
virtual

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 384 of file TcpConnection.cpp.

References ACE_DEBUG, active_reconnect_i(), config_name(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, disconnect(), OpenDDS::DCPS::DataLink::DISCONNECTED, is_connector_, link_, LM_DEBUG, OpenDDS::DCPS::DataLink::notify(), passive_reconnect_i(), OpenDDS::DCPS::TcpDataLink::receive_strategy(), receive_strategy(), reconnect_lock_, reconnect_state_string(), remote_address_, OpenDDS::DCPS::TcpDataLink::send_strategy(), send_strategy(), and TheServiceParticipant.

385 {
386  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
387 
388  DBG_ENTRY_LVL("TcpConnection","handle_close",6);
389 
390  if (DCPS_debug_level >= 1) {
391  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C , reconnect_state = %C.\n",
392  config_name().c_str(), LogAddr(remote_address_).c_str(), reconnect_state_string()));
393  }
394 
395  GuardType guard(reconnect_lock_);
396  TcpDataLink_rch link = link_;
397 
398  if (!link) {
399  if (DCPS_debug_level >= 1) {
400  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() link is null.\n"));
401  }
402  return 0;
403  }
404 
405  TcpReceiveStrategy_rch receive_strategy = link->receive_strategy();
406  TcpSendStrategy_rch send_strategy = link->send_strategy();
407 
408  const bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
409 
410  if (send_strategy) {
411  if (graceful) {
412  send_strategy->terminate_send();
413  } else {
414  send_strategy->suspend_send();
415  }
416  }
417 
418  this->disconnect();
419 
420  if (graceful) {
421  link->notify(DataLink::DISCONNECTED);
422  } else if (this->is_connector_) {
423  this->active_reconnect_i();
424  } else {
425  this->passive_reconnect_i();
426  }
427 
428  return 0;
429 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
#define ACE_DEBUG(X)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
TcpReceiveStrategy_rch receive_strategy()
ACE_Guard< LockType > GuardType
const std::string & config_name() const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RcHandle< TcpDataLink > TcpDataLink_rch
TcpSendStrategy_rch send_strategy()
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
#define TheServiceParticipant
RcHandle< TcpReceiveStrategy > TcpReceiveStrategy_rch
RcHandle< TcpSendStrategy > TcpSendStrategy_rch
bool is_connector_
Flag indicate this connection object is the connector or acceptor.

◆ handle_input()

int OpenDDS::DCPS::TcpConnection::handle_input ( ACE_HANDLE  fd)
virtual

We pass this "event" along to the receive_strategy.

Reimplemented from ACE_Event_Handler.

Definition at line 291 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, handle_setup_input(), passive_setup_, receive_strategy(), and TheServiceParticipant.

292 {
293  DBG_ENTRY_LVL("TcpConnection","handle_input",6);
294 
295  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
296 
297  if (passive_setup_) {
298  return handle_setup_input(fd);
299  }
301  if (!receive_strategy) {
302  return 0;
303  }
304 
305  return receive_strategy->handle_dds_input(fd);
306 }
TcpReceiveStrategy_rch receive_strategy()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
int handle_setup_input(ACE_HANDLE h)
#define TheServiceParticipant
RcHandle< TcpReceiveStrategy > TcpReceiveStrategy_rch

◆ handle_output()

int OpenDDS::DCPS::TcpConnection::handle_output ( ACE_HANDLE  )
virtual

Handle back pressure when sending.

Reimplemented from ACE_Event_Handler.

Definition at line 309 of file TcpConnection.cpp.

References ACE_DEBUG, ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_, LM_DEBUG, send_strategy(), TheServiceParticipant, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO.

310 {
311  DBG_ENTRY_LVL("TcpConnection","handle_output",6);
312 
313  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
314 
316  if (send_strategy) {
317  if (DCPS_debug_level > 9) {
318  ACE_DEBUG((LM_DEBUG,
319  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
320  ACE_TEXT("sending queued data.\n"),
321  id_));
322  }
323 
324  // Process data to be sent from the queue.
326  != send_strategy->perform_work()) {
327 
328  // Stop handling output ready events when there is nothing to output.
329  // N.B. This calls back into the reactor. Is the reactor lock
330  // recursive?
331  send_strategy->schedule_output();
332  }
333  }
334 
335  return 0;
336 }
#define ACE_DEBUG(X)
std::size_t id_
Small unique identifying value.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
TcpSendStrategy_rch send_strategy()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define TheServiceParticipant
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ handle_setup_input()

int OpenDDS::DCPS::TcpConnection::handle_setup_input ( ACE_HANDLE  h)
private

During the connection setup phase, the passive side sets passive_setup_, redirecting handle_input() events here (there is no recv strategy yet).

Definition at line 221 of file TcpConnection.cpp.

References ACE_Message_Block::base(), OpenDDS::DCPS::DCPS_debug_level, ACE_Event_Handler::DONT_CALL, ETIME, ACE_Message_Block::length(), LM_DEBUG, local_address_, ACE_OS::memcpy(), OpenDDS::DCPS::TcpTransport::passive_connection(), passive_setup_, passive_setup_buffer_, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), OpenDDS::DCPS::rchandle_from(), ACE_Message_Block::rd_ptr(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, reconnect_state_string(), remote_address_, ACE_Message_Block::reset(), ACE_Message_Block::size(), ACE_Message_Block::space(), transport_during_setup_, transport_priority_, VDBG, VDBG_LVL, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

Referenced by handle_input(), and impl().

222 {
223  const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
226 
227  if (ret < 0 && errno == ETIME) {
228  return 0;
229  }
230 
231  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ "
232  "recv returned %b %m.\n", this, ret), 4);
233 
234  if (ret <= 0) {
235  return -1;
236  }
237 
239  // Parse the setup message: <len><addr><prio>
240  // len and prio are network order 32-bit ints
241  // addr is a string of length len, including null
242  ACE_UINT32 nlen = 0;
243 
244  if (passive_setup_buffer_.length() >= sizeof(nlen)) {
245 
246  ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
247  passive_setup_buffer_.rd_ptr(sizeof(nlen));
248  const ACE_UINT32 hlen = ntohl(nlen);
249  passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
250 
251  ACE_UINT32 nprio = 0;
252 
253  if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
254 
255  const std::string bufstr(passive_setup_buffer_.rd_ptr());
256  const NetworkResource network_resource(bufstr);
257  network_resource.to_addr(remote_address_);
258 
259  ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
260  transport_priority_ = ntohl(nprio);
261 
263  passive_setup_ = false;
264 
265  VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
266  "%@ %C->%C, priority==%d, reconnect_state = %C\n", this,
267  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
269  if (DCPS_debug_level > 9) {
270  network_resource.dump();
271  }
272 
273  // remove from reactor, normal recv strategy setup will add us back
274  if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
275  VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
276  "remove_handler failed %m.\n"));
277  }
278 
280 
281  return 0;
282  }
283  }
284 
286 
287  return 0;
288 }
size_t length(void) const
void reset(void)
void * memcpy(void *t, const void *s, size_t len)
TcpTransport * transport_during_setup_
void passive_connection(const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
int ssize_t
char * rd_ptr(void) const
ACE_Message_Block passive_setup_buffer_
#define VDBG(DBG_ARGS)
size_t size(void) const
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
char * wr_ptr(void) const
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
size_t space(void) const
ACE_INET_Addr remote_address_
Remote address.
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
char * base(void) const
ACE_INET_Addr local_address_
Local address.

◆ handle_stop_reconnecting()

void OpenDDS::DCPS::TcpConnection::handle_stop_reconnecting ( )
private

Definition at line 685 of file TcpConnection.cpp.

References ACE_DEBUG, config_name(), LM_DEBUG, LOST_STATE, notify_connection_lost(), reconnect_state_, remote_address_, and tcp_config_.

Referenced by active_reconnect_i(), close(), and impl().

686 {
689  TcpInst_rch cfg = tcp_config_.lock();
690  if (cfg && cfg->conn_retry_attempts_ > 0) {
691  ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C.\n",
692  config_name().c_str(), LogAddr(remote_address_).c_str()));
693  } else {
694  ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C.\n",
695  config_name().c_str(), LogAddr(remote_address_).c_str()));
696  }
697 }
#define ACE_DEBUG(X)
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
const std::string & config_name() const
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_INET_Addr remote_address_
Remote address.
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.

◆ handle_timeout()

int OpenDDS::DCPS::TcpConnection::handle_timeout ( const ACE_Time_Value tv,
const void *  arg 
)
virtual

A timer is scheduled on acceptor side to check if a new connection is accepted after the connection is lost.

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 702 of file TcpConnection.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), active_reconnect_i(), ACTIVE_RECONNECTING_STATE, ACTIVE_WAITING_STATE, OpenDDS::DCPS::TcpTransport::async_connect_failed(), ACE_Connector< class, class >::cancel(), config_name(), OpenDDS::DCPS::TcpTransport::connector_, DBG_ENTRY_LVL, OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::DataLink::impl(), INIT_STATE, link_, LM_DEBUG, LM_ERROR, local_address_, LOST_STATE, notify_connection_lost(), PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_lock_, reconnect_state_, reconnect_state_string(), RECONNECTED_STATE, remote_address_, tear_link(), TheServiceParticipant, and transport_priority_.

704 {
705  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
706 
707  DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
708  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, this->reconnect_state_ = %C\n", reconnect_state_string()));
709  GuardType guard(this->reconnect_lock_);
710 
711  switch (this->reconnect_state_) {
712  case PASSIVE_WAITING_STATE: {
713  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C.\n",
714  config_name().c_str(), LogAddr(remote_address_).c_str()));
715 
717  // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
718  // Now we need declare the connection is lost.
719  this->notify_connection_lost();
721  this->tear_link();
722  }
723  break;
724 
725  case RECONNECTED_STATE:
726  // reconnected successfully.
727  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C.\n",
728  config_name().c_str(), LogAddr(remote_address_).c_str()));
729  break;
730 
731  case INIT_STATE: {
732  // couldn't initialize connection successfully.
733  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, failed connection initialization due to timeout.: %C to %C.\n",
734  config_name().c_str(), LogAddr(remote_address_).c_str()));
735 
736  // build key and remove from service
737  const bool is_loop(local_address_ == remote_address_);
738  const PriorityKey key(transport_priority_, remote_address_,
739  is_loop, true /* active */);
740 
741  RcHandle<TcpTransport> transport = dynamic_rchandle_cast<TcpTransport>(link_->impl());
742  if (transport) {
743  transport->async_connect_failed(key);
744  }
745  break;
746  }
748  // we get the timeout before the network stack reports the destination is unreachable
749  // cancel the async connect operation and retry it.
750  {
751  RcHandle<TcpTransport> transport = dynamic_rchandle_cast<TcpTransport>(link_->impl());
752  if (transport) {
753  transport->connector_.cancel(this);
754  }
755  }
756  this->active_reconnect_i();
757  break;
758  }
760  this->active_reconnect_i();
761  break;
762  case LOST_STATE:
763  break;
764  default :
765  ACE_ERROR((LM_ERROR,
766  ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
767  ACE_TEXT(" unknown state or it should not be in state = %d\n"),
769  break;
770  }
771 
772  return 0;
773 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
sequence< octet > key
ACE_Guard< LockType > GuardType
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
const std::string & config_name() const
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_TEXT("TCP_Factory")
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
#define TheServiceParticipant
ACE_INET_Addr local_address_
Local address.

◆ id()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE std::size_t & OpenDDS::DCPS::TcpConnection::id ( void  )

Definition at line 22 of file TcpConnection.inl.

References ACE_INLINE, and id_.

23 {
24  return id_;
25 }
std::size_t id_
Small unique identifying value.

◆ impl()

TcpTransport_rch OpenDDS::DCPS::TcpConnection::impl ( void  )
inline

◆ is_connector()

ACE_INLINE bool OpenDDS::DCPS::TcpConnection::is_connector ( ) const

Return true if the object represents the connector side, otherwise it's the acceptor side. The acceptor/connector role is not changed when re-establishing the connection.

Definition at line 29 of file TcpConnection.inl.

References ACE_INLINE, and is_connector_.

30 {
31  return this->is_connector_;
32 }
bool is_connector_
Flag indicate this connection object is the connector or acceptor.

◆ notify_connection_lost()

void OpenDDS::DCPS::TcpConnection::notify_connection_lost ( )
private

Definition at line 672 of file TcpConnection.cpp.

References OpenDDS::DCPS::TcpDataLink::drop_pending_request_acks(), link_, OpenDDS::DCPS::DataLink::LOST, OpenDDS::DCPS::DataLink::notify(), OpenDDS::DCPS::TcpDataLink::send_strategy(), and send_strategy().

Referenced by handle_stop_reconnecting(), handle_timeout(), impl(), and notify_lost_on_backpressure_timeout().

673 {
674  if (link_) {
678  if (send_strategy) {
679  send_strategy->terminate_send();
680  }
681  }
682 }
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
TcpSendStrategy_rch send_strategy()
TcpSendStrategy_rch send_strategy()
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ notify_lost_on_backpressure_timeout()

void OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout ( )

This function is called when the backpressure occurs and timed out after "max_output_pause_period". The lost connection notification should be sent and the connection needs be closed since we declared it as a "lost" connection.

Definition at line 857 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, disconnect(), INIT_STATE, LOST_STATE, notify_connection_lost(), reconnect_lock_, and reconnect_state_.

858 {
859  DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
860  bool notify_lost = false;
861  {
862  GuardType guard(this->reconnect_lock_);
863 
864  if (this->reconnect_state_ == INIT_STATE) {
866  notify_lost = true;
867 
868  }
869  }
870 
871  if (notify_lost) {
872  this->disconnect();
873  this->notify_connection_lost();
874  }
875 
876 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
ACE_Guard< LockType > GuardType
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ on_active_connection_established()

int OpenDDS::DCPS::TcpConnection::on_active_connection_established ( )
private

Handle the logic after an active connection has been established.

Definition at line 482 of file TcpConnection.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::DCPS::DirectPriorityMapper::codepoint(), OpenDDS::DCPS::DCPS_debug_level, link_, LM_DEBUG, LM_NOTICE, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), send_n(), OpenDDS::DCPS::DataLink::set_dscp_codepoint(), set_sock_options(), tcp_config_, and transport_priority_.

Referenced by active_open(), active_reconnect_open(), and impl().

483 {
484 
485  // Set the DiffServ codepoint according to the priority value.
486  DirectPriorityMapper mapper(this->transport_priority_);
487  this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
488 
489  TcpInst_rch cfg = tcp_config_.lock();
490  if (!cfg) {
491  if (log_level >= LogLevel::Notice) {
492  ACE_ERROR((LM_NOTICE, "((%P|%t)) NOTICE: TcpConnection::on_active_connection_established() - Invalid Transport Instance.\n"));
493  }
494  return -1;
495  }
496  set_sock_options(cfg);
497 
498  // In order to complete the connection establishment from the active
499  // side, we need to tell the remote side about our public address.
500  // It will use that as an "identifier" of sorts. To the other
501  // (passive) side, our local_address that we send here will be known
502  // as the remote_address.
503  const std::string address = cfg->get_public_address();
504 
505  if (DCPS_debug_level >= 2) {
506  ACE_DEBUG((LM_DEBUG,
507  "(%P|%t) TcpConnection::on_active_connection_established: "
508  "Sending public address <%C> to remote side\n",
509  address.c_str()));
510  }
511 
512  ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
513 
514  ACE_UINT32 nlen = htonl(len);
515 
516  if (this->peer().send_n(&nlen, sizeof(ACE_UINT32)) == -1) {
517  if (DCPS_debug_level >= 2) {
518  ACE_DEBUG((LM_WARNING,
519  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
520  "Unable to send address string length to "
521  "the passive side to complete the active connection "
522  "establishment.\n"));
523  }
524  return -1;
525  }
526 
527  if (this->peer().send_n(address.c_str(), len) == -1) {
528  if (DCPS_debug_level >= 2) {
529  ACE_DEBUG((LM_WARNING,
530  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
531  "Unable to send our address to "
532  "the passive side to complete the active connection "
533  "establishment.\n"));
534  }
535  return -1;
536  }
537 
538  ACE_UINT32 npriority = htonl(this->transport_priority_);
539 
540  if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
541  if (DCPS_debug_level >= 2) {
542  ACE_DEBUG((LM_WARNING,
543  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
544  "Unable to send publication priority to "
545  "the passive side to complete the active connection "
546  "establishment.\n"));
547  }
548  return -1;
549  }
550 
551  return 0;
552 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
ssize_t send_n(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0, size_t *bytes_transferred=0)
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
Definition: DataLink.cpp:1115
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void set_sock_options(const TcpInst_rch &tcp_config)
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.

◆ open()

int OpenDDS::DCPS::TcpConnection::open ( void *  arg)
virtual

Reimplemented from ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >.

Definition at line 117 of file TcpConnection.cpp.

References active_open(), active_reconnect_open(), ACTIVE_RECONNECTING_STATE, DBG_ENTRY_LVL, is_connector_, passive_open(), and reconnect_state_.

118 {
119  DBG_ENTRY_LVL("TcpConnection","open",6);
120 
121  if (is_connector_) {
123  return active_reconnect_open();
124  }
125  return active_open();
126  }
127  return passive_open(arg);
128 }
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
bool is_connector_
Flag indicate this connection object is the connector or acceptor.

◆ passive_open()

int OpenDDS::DCPS::TcpConnection::passive_open ( void *  arg)

Definition at line 153 of file TcpConnection.cpp.

References ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::get_handle(), LM_DEBUG, LM_ERROR, LM_NOTICE, local_address_, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, passive_setup_, passive_setup_buffer_, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, set_sock_options(), ACE_Message_Block::size(), tcp_config_, OpenDDS::DCPS::TcpAcceptor::transport(), transport_during_setup_, and VDBG_LVL.

Referenced by open().

154 {
155  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::passive_open.\n"), 2);
156  // The passed-in arg is really the acceptor object that created this
157  // TcpConnection object, and is also the caller of this open()
158  // method. We need to cast the arg to the TcpAcceptor* type.
159  TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
160 
161  if (acceptor == 0) {
162  // The cast failed.
163  ACE_ERROR_RETURN((LM_ERROR,
164  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
165  ACE_TEXT("failed to cast void* arg to ")
166  ACE_TEXT("TcpAcceptor* type.\n")),
167  -1);
168  }
169 
170  TcpConnection_rch self(this, keep_count());
171 
172  // Now we need to ask the TcpAcceptor object to provide us with
173  // a pointer to the TcpTransport object that "owns" the acceptor.
174  RcHandle<TcpTransport> transport = acceptor->transport();
175 
176  if (!transport) {
177  // The acceptor gave us a nil transport (smart) pointer.
178  ACE_ERROR_RETURN((LM_ERROR,
179  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
180  ACE_TEXT("acceptor's transport is nil.\n")),
181  -1);
182  }
183 
184  // Keep a "copy" of the reference to TcpInst object
185  // for ourselves.
186  TcpInst_rch cfg = transport->config();
187  if (!cfg) {
188  if (log_level >= LogLevel::Notice) {
189  ACE_ERROR((LM_NOTICE, "((%P|%t)) NOTICE: TcpConnection::open() - Invalid Transport Instance.\n"));
190  }
191  return -1;
192  }
193  tcp_config_ = cfg;
194  local_address_ = cfg->local_address();
195 
196  set_sock_options(cfg);
197 
198  // We expect that the active side of the connection (the remote side
199  // in this case) will supply its listening ACE_INET_Addr as the first
200  // message it sends to the socket. This is a one-way connection
201  // establishment protocol message.
202  passive_setup_ = true;
203  transport_during_setup_ = transport.get();
204  passive_setup_buffer_.size(sizeof(ACE_UINT32));
205 
206  if (reactor()->register_handler(this, READ_MASK) == -1) {
207  ACE_ERROR_RETURN((LM_ERROR,
208  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
209  ACE_TEXT("unable to register with the reactor.%p\n"),
210  ACE_TEXT("register_handler")),
211  -1);
212  }
213 
214  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n",
215  static_cast<int>(intptr_t(get_handle()))), 2);
216 
217  return 0;
218 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
TcpTransport * transport_during_setup_
RcHandle< TcpConnection > TcpConnection_rch
ACE_Message_Block passive_setup_buffer_
size_t size(void) const
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define ACE_ERROR_RETURN(X, Y)
void set_sock_options(const TcpInst_rch &tcp_config)
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
ACE_INET_Addr local_address_
Local address.

◆ passive_reconnect_i()

void OpenDDS::DCPS::TcpConnection::passive_reconnect_i ( )
private

Definition at line 558 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DataLink::DISCONNECTED, OpenDDS::DCPS::TimeDuration::from_msec(), INIT_STATE, link_, OpenDDS::DCPS::DataLink::notify(), PASSIVE_WAITING_STATE, ACE_Event_Handler::reactor(), reconnect_state_, ACE_Reactor::schedule_timer(), shutdown_, tcp_config_, OpenDDS::DCPS::TimeDuration::value(), and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by handle_close(), and impl().

559 {
560  DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
561 
562  if (this->shutdown_) {
563  return;
564  }
565 
566  TcpInst_rch cfg = tcp_config_.lock();
567  if (!cfg) {
568  return;
569  }
570 
571  if (this->reconnect_state_ == INIT_STATE) {
572  // Mark the connection lost since the recv/send just failed.
573  // this->connected_ = false;
574 
575  if (cfg->passive_reconnect_duration_ == 0)
576  return;
577 
580 
581  TimeDuration delay = TimeDuration::from_msec(cfg->passive_reconnect_duration_);
582  this->reactor()->schedule_timer(this, 0, delay.value(), TimeDuration::zero_value.value());
583  }
584 }
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
RcHandle< TcpInst > TcpInst_rch
Definition: TcpInst_rch.h:18
static const TimeDuration zero_value
Definition: TimeDuration.h:31
bool shutdown_
shutdown flag
const ACE_Time_Value & value() const
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
virtual ACE_Reactor * reactor(void) const
static TimeDuration from_msec(const ACE_UINT64 &ms)
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.

◆ receive_strategy()

OpenDDS::DCPS::TcpReceiveStrategy_rch OpenDDS::DCPS::TcpConnection::receive_strategy ( )

Definition at line 99 of file TcpConnection.cpp.

References link_, and OpenDDS::DCPS::TcpDataLink::receive_strategy().

Referenced by handle_close(), and handle_input().

100 {
101  return this->link_->receive_strategy();
102 }
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
TcpReceiveStrategy_rch receive_strategy()

◆ reconnect_state_string()

const char * OpenDDS::DCPS::TcpConnection::reconnect_state_string ( ) const
private

Get name of the current reconnect state as a string.

Definition at line 937 of file TcpConnection.cpp.

References ACE_ERROR, ACE_TEXT(), ACTIVE_RECONNECTING_STATE, ACTIVE_WAITING_STATE, INIT_STATE, LM_ERROR, LOST_STATE, PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_state_, and RECONNECTED_STATE.

Referenced by active_reconnect_i(), close(), handle_close(), handle_setup_input(), and handle_timeout().

938 {
939  switch (reconnect_state_) {
940  case INIT_STATE:
941  return "INIT_STATE";
942  case LOST_STATE:
943  return "LOST_STATE";
944  case RECONNECTED_STATE:
945  return "RECONNECTED_STATE";
947  return "ACTIVE_RECONNECTING_STATE";
949  return "ACTIVE_WAITING_STATE";
951  return "PASSIVE_WAITING_STATE";
953  return "PASSIVE_TIMEOUT_CALLED_STATE";
954  default:
955  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: TcpConnection::reconnect_state_string: ")
956  ACE_TEXT("%d is either invalid or not recognized.\n"),
958  return "Invalid reconnect state";
959  }
960 }
#define ACE_ERROR(X)
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_TEXT("TCP_Factory")

◆ relink_from_recv()

void OpenDDS::DCPS::TcpConnection::relink_from_recv ( bool  do_suspend)

Reconnect initiated by receive strategy.

This is called by TcpReceiveStrategy when a disconnect is detected. It simply suspends any sends and lets the handle_close() handle the reconnect logic.

Definition at line 896 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and send_strategy().

897 {
898  DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
900  if (do_suspend && send_strategy)
901  send_strategy->suspend_send();
902 }
TcpSendStrategy_rch send_strategy()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ relink_from_send()

void OpenDDS::DCPS::TcpConnection::relink_from_send ( bool  do_suspend)

Reconnect initiated by send strategy.

This is called by TcpSendStrategy when a send fails and a reconnect should be initiated. This method suspends any sends and kicks the reconnect thread into action.

Definition at line 883 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, and send_strategy().

884 {
885  DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
886 
888  if (do_suspend && send_strategy)
889  send_strategy->suspend_send();
890 }
TcpSendStrategy_rch send_strategy()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< TcpSendStrategy > TcpSendStrategy_rch

◆ remove_reference()

ACE_Event_Handler::Reference_Count OpenDDS::DCPS::TcpConnection::remove_reference ( void  )
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 931 of file TcpConnection.cpp.

References OpenDDS::DCPS::RcObject::_remove_ref().

Referenced by impl().

932 {
934  return 1;
935 }
virtual void _remove_ref()
Definition: RcObject.h:74

◆ send_strategy()

OpenDDS::DCPS::TcpSendStrategy_rch OpenDDS::DCPS::TcpConnection::send_strategy ( )

Definition at line 93 of file TcpConnection.cpp.

References link_, and OpenDDS::DCPS::TcpDataLink::send_strategy().

Referenced by active_reconnect_open(), close(), handle_close(), handle_output(), notify_connection_lost(), relink_from_recv(), and relink_from_send().

94 {
95  return this->link_->send_strategy();
96 }
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
TcpSendStrategy_rch send_strategy()

◆ set_datalink()

void OpenDDS::DCPS::TcpConnection::set_datalink ( const TcpDataLink_rch link)

Cache the reference to the datalink object for lost connection callbacks.

Definition at line 77 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::dynamic_rchandle_cast(), OpenDDS::DCPS::DataLink::impl(), impl_, link_, reconnect_lock_, and OpenDDS::DCPS::RcHandle< T >::reset().

78 {
79  DBG_ENTRY_LVL("TcpConnection","set_datalink",6);
80 
82 
83  link_ = link;
84  if (link_) {
85  impl_ = dynamic_rchandle_cast<TcpTransport>(link_->impl());
86  } else {
87  impl_.reset();
88  }
89 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
ACE_Guard< LockType > GuardType
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
TcpTransport_rch impl_
Impl object which is needed for connection objects and reconnect task.
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ set_sock_options()

void OpenDDS::DCPS::TcpConnection::set_sock_options ( const TcpInst_rch tcp_config)

Definition at line 432 of file TcpConnection.cpp.

References ACE_DEFAULT_MAX_SOCKET_BUFSIZ, ACE_ERROR, ENOTSUP, IPPROTO_TCP, LM_ERROR, ACE_Svc_Handler< ACE_SOCK_STREAM, ACE_NULL_SYNCH >::peer(), SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, and TCP_NODELAY.

Referenced by on_active_connection_established(), and passive_open().

433 {
434 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
435  int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
436  int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
437  //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
438 # if !defined (ACE_LACKS_SOCKET_BUFSIZ)
439 
440  // A little screwy double negative logic: disabling nagle involves
441  // enabling TCP_NODELAY
442  int opt = (tcp_config->enable_nagle_algorithm_ == false);
443 
444  if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
445  ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
446  }
447 
448  if (this->peer().set_option(SOL_SOCKET,
449  SO_SNDBUF,
450  (void *) &snd_size,
451  sizeof(snd_size)) == -1
452  && errno != ENOTSUP) {
453  ACE_ERROR((LM_ERROR,
454  "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
455  snd_size));
456  return;
457  }
458 
459  if (this->peer().set_option(SOL_SOCKET,
460  SO_RCVBUF,
461  (void *) &rcv_size,
462  sizeof(int)) == -1
463  && errno != ENOTSUP) {
464  ACE_ERROR((LM_ERROR,
465  "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m\n",
466  rcv_size));
467  return;
468  }
469 
470 # else
471  ACE_UNUSED_ARG(tcp_config);
472  ACE_UNUSED_ARG(snd_size);
473  ACE_UNUSED_ARG(rcv_size);
474 # endif /* !ACE_LACKS_SOCKET_BUFSIZ */
475 
476 #else
477  ACE_UNUSED_ARG(tcp_config);
478 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
479 }
#define ACE_ERROR(X)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ

◆ shutdown()

void OpenDDS::DCPS::TcpConnection::shutdown ( void  )

Definition at line 913 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, reconnect_lock_, ACE_Svc_Handler< PEER_STREAM, SYNCH_TRAITS >::shutdown(), and shutdown_.

Referenced by ~TcpConnection().

914 {
915  DBG_ENTRY_LVL("TcpConnection", "shutdown", 6);
916 
917  GuardType guard(reconnect_lock_);
918  shutdown_ = true;
919 
921 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
bool shutdown_
shutdown flag
ACE_Guard< LockType > GuardType
void shutdown(void)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ tear_link()

void OpenDDS::DCPS::TcpConnection::tear_link ( )

Called by the reconnect task to inform us that the link & any associated data can be torn down. This call is done with no DCPS/transport locks held.

Definition at line 905 of file TcpConnection.cpp.

References DBG_ENTRY_LVL, link_, and OpenDDS::DCPS::DataLink::release_resources().

Referenced by handle_timeout().

906 {
907  DBG_ENTRY_LVL("TcpConnection","tear_link",6);
908 
909  return link_->release_resources();
910 }
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ transfer()

void OpenDDS::DCPS::TcpConnection::transfer ( TcpConnection connection)

This object would be "old" connection object and the provided is the new connection object. The "old" connection object will copy its states to to the "new" connection object. This is called by the TcpDataLink when a new connection is accepted (with a new TcpConnection object). We need make the state in "new" connection object consistent with the "old" connection object.

Definition at line 782 of file TcpConnection.cpp.

References ACE_ERROR, ACE_TEXT(), DBG_ENTRY_LVL, impl_, INIT_STATE, is_connector_, link_, LM_DEBUG, LM_ERROR, local_address_, LOST_STATE, OpenDDS::DCPS::DataLink::notify(), PASSIVE_TIMEOUT_CALLED_STATE, PASSIVE_WAITING_STATE, reconnect_lock_, reconnect_state_, OpenDDS::DCPS::DataLink::RECONNECTED, RECONNECTED_STATE, remote_address_, shutdown_, tcp_config_, and VDBG.

783 {
784  DBG_ENTRY_LVL("TcpConnection","transfer",6);
785  GuardType guard(reconnect_lock_);
786 
787  if (shutdown_) {
788  return;
789  }
790 
791  bool notify_reconnect = false;
792 
793  switch (this->reconnect_state_) {
794  case INIT_STATE:
795  // We have not detected the lost connection and the peer is faster than us and
796  // re-established the connection. so do not notify reconnected.
797  break;
798 
799  case LOST_STATE:
800 
801  // The reconnect timed out.
803  // TODO: If the handle_timeout is called before the old connection
804  // transfer its state to new connection then should we disconnect
805  // the new connection or keep it alive ?
806  // I think we should keep the connection, the user will get a
807  // lost connection notification and then a reconnected notification.
808  notify_reconnect = true;
809  break;
810 
811  case PASSIVE_WAITING_STATE: {
812  // we just let the timer expires by itself. When the timer
813  // expires, it already transitions to the RECONNECTED_STATE,
814  // and do nothing in handle_timeout(). We don't need to delete
815  // the timer explicitly.
816  notify_reconnect = true;
817  }
818  break;
819 
820  default :
821  ACE_ERROR((LM_ERROR,
822  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
823  ACE_TEXT(" unknown state or it should not be in state=%i\n"),
825  break;
826  }
827 
828  // Verify if this acceptor side.
829  if (this->is_connector_ || connection->is_connector_) {
830  ACE_ERROR((LM_ERROR,
831  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
832  ACE_TEXT(" should NOT be called by the connector side\n")));
833  }
834 
835  connection->remote_address_ = this->remote_address_;
836  connection->local_address_ = this->local_address_;
837  connection->tcp_config_ = this->tcp_config_;
838  connection->link_ = this->link_;
839  connection->impl_ = this->impl_;
840 
841  VDBG((LM_DEBUG, "(%P|%t) DBG: transfer(%C->%C) passive reconnected. new con %@ old con %@\n",
842  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
843  connection, this));
844 
845  if (notify_reconnect) {
848  }
849 
850 }
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
#define ACE_ERROR(X)
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
bool shutdown_
shutdown flag
ACE_Guard< LockType > GuardType
TcpTransport_rch impl_
Impl object which is needed for connection objects and reconnect task.
#define VDBG(DBG_ARGS)
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
ACE_TEXT("TCP_Factory")
ACE_INET_Addr remote_address_
Remote address.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
bool is_connector_
Flag indicate this connection object is the connector or acceptor.
ACE_INET_Addr local_address_
Local address.

◆ transport_priority() [1/2]

ACE_INLINE OpenDDS::DCPS::Priority & OpenDDS::DCPS::TcpConnection::transport_priority ( )

Access TRANSPORT_PRIORITY.value policy value if set.

Definition at line 42 of file TcpConnection.inl.

References ACE_INLINE, and transport_priority_.

Referenced by impl().

43 {
44  return this->transport_priority_;
45 }
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.

◆ transport_priority() [2/2]

ACE_INLINE OpenDDS::DCPS::Priority OpenDDS::DCPS::TcpConnection::transport_priority ( ) const

Definition at line 49 of file TcpConnection.inl.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and transport_priority_.

50 {
51  return this->transport_priority_;
52 }
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.

Member Data Documentation

◆ conn_retry_counter_

int OpenDDS::DCPS::TcpConnection::conn_retry_counter_
private

Definition at line 194 of file TcpConnection.h.

Referenced by active_reconnect_i(), active_reconnect_open(), and close().

◆ id_

std::size_t OpenDDS::DCPS::TcpConnection::id_
private

Small unique identifying value.

Definition at line 193 of file TcpConnection.h.

Referenced by handle_output(), and id().

◆ impl_

TcpTransport_rch OpenDDS::DCPS::TcpConnection::impl_
private

Impl object which is needed for connection objects and reconnect task.

Definition at line 177 of file TcpConnection.h.

Referenced by impl(), set_datalink(), and transfer().

◆ is_connector_

bool OpenDDS::DCPS::TcpConnection::is_connector_
private

Flag indicate this connection object is the connector or acceptor.

Definition at line 162 of file TcpConnection.h.

Referenced by handle_close(), is_connector(), open(), and transfer().

◆ link_

TcpDataLink_rch OpenDDS::DCPS::TcpConnection::link_
private

◆ local_address_

ACE_INET_Addr OpenDDS::DCPS::TcpConnection::local_address_
private

Local address.

Definition at line 168 of file TcpConnection.h.

Referenced by active_open(), active_reconnect_i(), handle_setup_input(), handle_timeout(), passive_open(), and transfer().

◆ passive_setup_

bool OpenDDS::DCPS::TcpConnection::passive_setup_
private

Definition at line 188 of file TcpConnection.h.

Referenced by handle_input(), handle_setup_input(), and passive_open().

◆ passive_setup_buffer_

ACE_Message_Block OpenDDS::DCPS::TcpConnection::passive_setup_buffer_
private

Definition at line 189 of file TcpConnection.h.

Referenced by handle_setup_input(), and passive_open().

◆ reconnect_lock_

LockType OpenDDS::DCPS::TcpConnection::reconnect_lock_
private

Lock to synchronize state between reactor and non-reactor threads.

Definition at line 159 of file TcpConnection.h.

Referenced by active_reconnect_i(), active_reconnect_open(), handle_close(), handle_timeout(), notify_lost_on_backpressure_timeout(), set_datalink(), shutdown(), and transfer().

◆ reconnect_state_

ReconnectState OpenDDS::DCPS::TcpConnection::reconnect_state_
private

◆ remote_address_

ACE_INET_Addr OpenDDS::DCPS::TcpConnection::remote_address_
private

◆ shutdown_

bool OpenDDS::DCPS::TcpConnection::shutdown_
private

shutdown flag

Definition at line 186 of file TcpConnection.h.

Referenced by active_reconnect_i(), active_reconnect_open(), passive_reconnect_i(), shutdown(), and transfer().

◆ tcp_config_

WeakRcHandle<TcpInst> OpenDDS::DCPS::TcpConnection::tcp_config_
private

◆ transport_during_setup_

TcpTransport* OpenDDS::DCPS::TcpConnection::transport_during_setup_
private

Definition at line 190 of file TcpConnection.h.

Referenced by handle_setup_input(), and passive_open().

◆ transport_priority_

Priority OpenDDS::DCPS::TcpConnection::transport_priority_
private

TRANSPORT_PRIORITY.value policy value.

Definition at line 183 of file TcpConnection.h.

Referenced by active_open(), handle_setup_input(), handle_timeout(), on_active_connection_established(), and transport_priority().


The documentation for this class was generated from the following files: