20 using ::OpenDDS::DCPS::TimeDuration;
29 #if !defined (__ACE_INLINE__) 36 : is_connector_(false)
38 , reconnect_state_(INIT_STATE)
39 , transport_priority_(0)
41 , passive_setup_(false)
42 , passive_setup_buffer_(sizeof(ACE_UINT32))
43 , transport_during_setup_(0)
45 , conn_retry_counter_(0)
113 this->
peer().close();
164 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::open() - ")
165 ACE_TEXT(
"failed to cast void* arg to ")
179 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::open() - ")
180 ACE_TEXT(
"acceptor's transport is nil.\n")),
189 ACE_ERROR((
LM_NOTICE,
"((%P|%t)) NOTICE: TcpConnection::open() - Invalid Transport Instance.\n"));
208 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::open() - ")
209 ACE_TEXT(
"unable to register with the reactor.%p\n"),
214 VDBG_LVL((
LM_DEBUG,
"(%P|%t) DBG: TcpConnection::open passive handle=%d.\n",
215 static_cast<int>(intptr_t(
get_handle()))), 2);
227 if (ret < 0 && errno ==
ETIME) {
232 "recv returned %b %m.\n",
this, ret), 4);
248 const ACE_UINT32 hlen = ntohl(nlen);
251 ACE_UINT32 nprio = 0;
265 VDBG((
LM_DEBUG,
"(%P|%t) DBG: TcpConnection::handle_setup_input " 266 "%@ %C->%C, priority==%d, reconnect_state = %C\n",
this,
270 network_resource.dump();
275 VDBG((
LM_DEBUG,
"(%P|%t) DBG: TcpConnection::handle_setup_input " 276 "remove_handler failed %m.\n"));
301 if (!receive_strategy) {
305 return receive_strategy->handle_dds_input(fd);
319 ACE_TEXT(
"(%P|%t) TcpConnection::handle_output() [%d] - ")
326 != send_strategy->perform_work()) {
331 send_strategy->schedule_output();
350 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) TcpConnection()::close() on transport: %C to %C because of reconnect failure.\n",
366 ss->terminate_send();
378 static const std::string null_name(
"(couldn't get name)");
380 return cfg ? cfg->name() : null_name;
391 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) TcpConnection::handle_close() called on transport: %C to %C , reconnect_state = %C.\n",
408 const bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
412 send_strategy->terminate_send();
414 send_strategy->suspend_send();
434 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ) 438 # if !defined (ACE_LACKS_SOCKET_BUFSIZ) 442 int opt = (tcp_config->enable_nagle_algorithm_ ==
false);
451 sizeof(snd_size)) == -1
454 "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
465 "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m\n",
471 ACE_UNUSED_ARG(tcp_config);
472 ACE_UNUSED_ARG(snd_size);
473 ACE_UNUSED_ARG(rcv_size);
477 ACE_UNUSED_ARG(tcp_config);
492 ACE_ERROR((
LM_NOTICE,
"((%P|%t)) NOTICE: TcpConnection::on_active_connection_established() - Invalid Transport Instance.\n"));
503 const std::string address = cfg->get_public_address();
507 "(%P|%t) TcpConnection::on_active_connection_established: " 508 "Sending public address <%C> to remote side\n",
512 ACE_UINT32 len =
static_cast<ACE_UINT32
>(address.length()) + 1;
514 ACE_UINT32 nlen = htonl(len);
516 if (this->
peer().
send_n(&nlen,
sizeof(ACE_UINT32)) == -1) {
519 "(%P|%t) WARNING: TcpConnection::on_active_connection_established: " 520 "Unable to send address string length to " 521 "the passive side to complete the active connection " 522 "establishment.\n"));
527 if (this->
peer().
send_n(address.c_str(), len) == -1) {
530 "(%P|%t) WARNING: TcpConnection::on_active_connection_established: " 531 "Unable to send our address to " 532 "the passive side to complete the active connection " 533 "establishment.\n"));
540 if (this->
peer().
send_n(&npriority,
sizeof(ACE_UINT32)) == -1) {
543 "(%P|%t) WARNING: TcpConnection::on_active_connection_established: " 544 "Unable to send publication priority to " 545 "the passive side to complete the active connection " 546 "establishment.\n"));
575 if (cfg->passive_reconnect_duration_ == 0)
614 if (this->conn_retry_counter_ < cfg->conn_retry_attempts_ ) {
619 double retry_delay_msec = cfg->conn_retry_initial_delay_;
620 retry_delay_msec *= std::pow(cfg->conn_retry_backoff_multiplier_, this->conn_retry_counter_);
624 "active_reconnect_i(%C->%C) reconnect_state = %C, conn_retry_counter_=%d, retry_delay_msec=%f\n",
630 timeout.
msec(static_cast<int>(retry_delay_msec));
656 "active_reconnect_i() socket operation is already in progress, wait another second to initiate the connect\n"));
679 send_strategy->terminate_send();
690 if (cfg && cfg->conn_retry_attempts_ > 0) {
691 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) we tried and failed to re-establish connection on transport: %C to %C.\n",
694 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) we did not try to re-establish connection on transport: %C to %C.\n",
713 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C.\n",
727 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C.\n",
733 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) TcpConnection::handle_timeout, failed connection initialization due to timeout.: %C to %C.\n",
766 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::handle_timeout, ")
767 ACE_TEXT(
" unknown state or it should not be in state = %d\n"),
791 bool notify_reconnect =
false;
808 notify_reconnect =
true;
816 notify_reconnect =
true;
822 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::transfer, ")
823 ACE_TEXT(
" unknown state or it should not be in state=%i\n"),
831 ACE_TEXT(
"(%P|%t) ERROR: TcpConnection::transfer, ")
832 ACE_TEXT(
" should NOT be called by the connector side\n")));
841 VDBG((
LM_DEBUG,
"(%P|%t) DBG: transfer(%C->%C) passive reconnected. new con %@ old con %@\n",
845 if (notify_reconnect) {
859 DBG_ENTRY_LVL(
"TcpConnection",
"notify_lost_on_backpressure_timeout",6);
860 bool notify_lost =
false;
888 if (do_suspend && send_strategy)
889 send_strategy->suspend_send();
900 if (do_suspend && send_strategy)
901 send_strategy->suspend_send();
945 return "RECONNECTED_STATE";
947 return "ACTIVE_RECONNECTING_STATE";
949 return "ACTIVE_WAITING_STATE";
951 return "PASSIVE_WAITING_STATE";
953 return "PASSIVE_TIMEOUT_CALLED_STATE";
956 ACE_TEXT(
"%d is either invalid or not recognized.\n"),
958 return "Invalid reconnect state";
968 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) re-established connection on transport: %C to %C.\n",
984 "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_open() can't register " 985 "with reactor %X %p\n",
this,
ACE_TEXT(
"register_handler")),
RcHandle< T > rchandle_from(T *pointer)
void active_reconnect_i()
TransportImpl_rch impl() const
virtual int connect(SVC_HANDLER *&svc_handler, const typename PEER_CONNECTOR::PEER_ADDR &remote_addr, const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, const typename PEER_CONNECTOR::PEER_ADDR &local_addr=reinterpret_cast< const peer_addr_type & >(peer_addr_type::sap_any), int reuse_addr=0, int flags=O_RDWR, int perms=0)
Reference_Counting_Policy & reference_counting_policy(void)
virtual int close(u_long)
ssize_t send_n(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0, size_t *bytes_transferred=0)
size_t length(void) const
unsigned long ACE_Reactor_Mask
void passive_connection(const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
Encapsulate a priority value and internet address as a key.
bool shutdown_
shutdown flag
void * memcpy(void *t, const void *s, size_t len)
virtual short codepoint() const
Access the mapped DiffServ codepoint value.
std::size_t id_
Small unique identifying value.
const ACE_Time_Value & value() const
virtual ACE_Event_Handler::Reference_Count add_reference()
TcpReceiveStrategy_rch receive_strategy()
ACE_Message_Block passive_setup_buffer_
static ACE_Synch_Options asynch
char * rd_ptr(void) const
const std::string & config_name() const
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
TcpConnection()
Passive side constructor (acceptor)
void notify_connection_lost()
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
virtual int open(void *arg)
TcpTransport * transport_during_setup_
ACE_SOCK_STREAM & peer(void) const
void drop_pending_request_acks()
void notify(ConnectionNotice notice)
static TimeDuration from_msec(const ACE_UINT64 &ms)
void async_connect_failed(const PriorityKey &key)
TcpSendStrategy_rch send_strategy()
virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
int on_active_connection_established()
Handle the logic after an active connection has been established.
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
virtual ACE_HANDLE get_handle(void) const
virtual ACE_Event_Handler::Reference_Count remove_reference()
TcpReceiveStrategy_rch receive_strategy()
Defines a wrapper around address info which is used for advertise.
virtual int handle_output(ACE_HANDLE)
Handle back pressure when sending.
TcpTransport_rch impl_
Impl object which is needed for connection objects and reconnect task.
virtual int priority(void) const
TimePoint_T< MonotonicClock > MonotonicTimePoint
char * wr_ptr(void) const
RcHandle< TcpTransport > transport()
virtual int cancel(SVC_HANDLER *svc_handler)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
virtual void _remove_ref()
virtual ACE_Reactor * reactor(void) const
void transfer(TcpConnection *connection)
void relink_from_send(bool do_suspend)
Reconnect initiated by send strategy.
static const TimeDuration zero_value
void notify_lost_on_backpressure_timeout()
map TRANSPORT_PRIORITY values directly.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
OpenDDS_Dcps_Export LogLevel log_level
unsigned long msec(void) const
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
ACE_INET_Addr remote_address_
Remote address.
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
void relink_from_recv(bool do_suspend)
Reconnect initiated by receive strategy.
void passive_reconnect_i()
Connector connector_
Open TcpConnections using non-blocking connect.
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
int handle_setup_input(ACE_HANDLE h)
TcpSendStrategy_rch send_strategy()
#define ACE_ERROR_RETURN(X, Y)
void handle_stop_reconnecting()
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
bool is_release_pending() const
Get release pending flag.
int active_reconnect_open()
#define TheServiceParticipant
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
void set_sock_options(const TcpInst_rch &tcp_config)
virtual int handle_input(ACE_HANDLE)
We pass this "event" along to the receive_strategy.
bool is_connector_
Flag indicate this connection object is the connector or acceptor.
ACE_INET_Addr local_address_
Local address.
void set_datalink(const TcpDataLink_rch &link)