OpenDDS  Snapshot(2023/04/28-20:55)
TcpConnection.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "TcpConnection.h"
9 #include "TcpTransport.h"
10 #include "TcpInst.h"
11 #include "TcpDataLink.h"
12 #include "TcpReceiveStrategy.h"
13 #include "TcpSendStrategy.h"
16 #include <dds/DCPS/LogAddr.h>
17 #include "dds/DCPS/Time_Helper.h"
18 
20 using ::OpenDDS::DCPS::TimeDuration;
21 
23 #include "ace/OS_NS_arpa_inet.h"
24 #include "ace/OS_NS_unistd.h"
25 #include <sstream>
26 #include <string>
27 #include <cmath>
28 
29 #if !defined (__ACE_INLINE__)
30 #include "TcpConnection.inl"
31 #endif /* __ACE_INLINE__ */
32 
34 
// Passive-side (acceptor) constructor: initializer list and body.
// The object starts as a non-connector in INIT_STATE with an empty config handle.
// NOTE(review): the signature line is missing from this listing.
36  : is_connector_(false)
37  , tcp_config_()
38  , reconnect_state_(INIT_STATE)
39  , transport_priority_(0) // TRANSPORT_PRIORITY.value default value - 0.
40  , shutdown_(false)
41  , passive_setup_(false)
42  , passive_setup_buffer_(sizeof(ACE_UINT32)) // room for one 32-bit value; resized later in passive_open()/handle_setup_input()
43  , transport_during_setup_(0)
44  , id_(0)
45  , conn_retry_counter_(0)
46 {
47  DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
49 }
50 
// Active-side (connector) constructor: tail of the parameter list, initializer list and body.
// Marks the object as a connector, records the remote address and priority, and
// takes the local address from the supplied config.
// NOTE(review): the start of the signature and several initializers are missing from this listing.
53  const TcpInst_rch& config)
54  : is_connector_(true)
55  , remote_address_(remote_address)
56  , local_address_(config->local_address())
57  , tcp_config_(config)
59  , transport_priority_(priority)
60  , shutdown_(false)
61  , passive_setup_(false)
63  , id_(0)
65 {
66  DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
68 }
69 
// Destructor body: calls shutdown() before the object goes away.
// NOTE(review): the signature line is missing from this listing.
71 {
72  DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
73  shutdown();
74 }
75 
// Stores the given data link in link_. When the link is nil, impl_ is reset as well.
// NOTE(review): the signature and two statements (one inside the non-nil branch)
// are missing from this listing.
76 void
78 {
79  DBG_ENTRY_LVL("TcpConnection","set_datalink",6);
80 
82 
83  link_ = link;
84  if (link_) {
86  } else {
87  impl_.reset();
88  }
89 }
90 
91 
// Returns the send strategy of link_. link_ is dereferenced without a nil check here.
// NOTE(review): return type and signature are missing from this listing.
94 {
95  return this->link_->send_strategy();
96 }
97 
// Returns the receive strategy of link_. link_ is dereferenced without a nil check here.
// NOTE(review): return type and signature are missing from this listing.
100 {
101  return this->link_->receive_strategy();
102 }
103 
// Closes the peer socket. When link_ is set, an additional step runs first.
// NOTE(review): the signature and the statement inside the if-block are missing
// from this listing.
104 void
106 {
107  DBG_ENTRY_LVL("TcpConnection","disconnect",6);
108 
109  if (this->link_) {
111  }
112 
113  this->peer().close();
114 }
115 
// Entry point invoked once the connection is opened; dispatches by role.
// Connector side: active_reconnect_open() or active_open(), selected by a
// condition that is missing from this listing. Acceptor side: passive_open(arg).
// Returns whatever the selected helper returns.
116 int
118 {
119  DBG_ENTRY_LVL("TcpConnection","open",6);
120 
121  if (is_connector_) {
123  return active_reconnect_open();
124  }
125  return active_open();
126  }
127  return passive_open(arg);
128 }
129 
// Completes a first-time active connection.
// Returns 0 when both on_active_connection_established() and
// transport->connect_tcp_datalink() succeed. Otherwise reports the failure via
// transport->async_connect_failed(key) and returns -1. Also returns -1 when
// transport is nil.
// NOTE(review): the signature, the declaration of `transport` and the first line
// of the `key` construction are missing from this listing.
130 int
132 {
133  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::active_open.\n"), 2);
134  VDBG((LM_DEBUG, "(%P|%t) DBG: active_open(%C->%C)\n",
135  LogAddr(local_address_).c_str(), LogAddr(remote_address_).c_str()));
136 
138 
139  if (transport) {
140  if (on_active_connection_established() != -1 && transport->connect_tcp_datalink(*link_, rchandle_from(this)) != -1) {
141  return 0;
142  }
143 
  // Local and remote addresses being equal marks a loopback connection in the key.
144  const bool is_loop(local_address_ == remote_address_);
146  is_loop, true /* active */);
147  transport->async_connect_failed(key);
148  }
149  return -1;
150 }
151 
// Acceptor-side open: obtains the transport and its config from the acceptor
// passed in `arg`, applies socket options, arms the passive setup state and
// registers this handler with the reactor for READ events.
// Returns 0 on success; -1 if arg is null, the transport or config is nil, or
// reactor registration fails.
// NOTE(review): the signature and the opening line of each error-return macro
// are missing from this listing.
152 int
154 {
155  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::passive_open.\n"), 2);
156  // The passed-in arg is really the acceptor object that created this
157  // TcpConnection object, and is also the caller of this open()
158  // method. We need to cast the arg to the TcpAcceptor* type.
159  TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
160 
161  if (acceptor == 0) {
162  // arg was null; a static_cast from void* cannot itself fail.
164  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
165  ACE_TEXT("failed to cast void* arg to ")
166  ACE_TEXT("TcpAcceptor* type.\n")),
167  -1);
168  }
169 
170  TcpConnection_rch self(this, keep_count());
171 
172  // Now we need to ask the TcpAcceptor object to provide us with
173  // a pointer to the TcpTransport object that "owns" the acceptor.
174  RcHandle<TcpTransport> transport = acceptor->transport();
175 
176  if (!transport) {
177  // The acceptor gave us a nil transport (smart) pointer.
179  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
180  ACE_TEXT("acceptor's transport is nil.\n")),
181  -1);
182  }
183 
184  // Keep a "copy" of the reference to TcpInst object
185  // for ourselves.
186  TcpInst_rch cfg = transport->config();
187  if (!cfg) {
188  if (log_level >= LogLevel::Notice) {
189  ACE_ERROR((LM_NOTICE, "((%P|%t)) NOTICE: TcpConnection::open() - Invalid Transport Instance.\n"));
190  }
191  return -1;
192  }
193  tcp_config_ = cfg;
194  local_address_ = cfg->local_address();
195 
196  set_sock_options(cfg);
197 
198  // We expect that the active side of the connection (the remote side
199  // in this case) will supply its listening ACE_INET_Addr as the first
200  // message it sends to the socket. This is a one-way connection
201  // establishment protocol message.
202  passive_setup_ = true;
  // Raw pointer to the transport, stored for the duration of the setup phase.
203  transport_during_setup_ = transport.get();
  // Start with room for just the 32-bit length prefix; handle_setup_input() grows it.
204  passive_setup_buffer_.size(sizeof(ACE_UINT32));
205 
206  if (reactor()->register_handler(this, READ_MASK) == -1) {
208  ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
209  ACE_TEXT("unable to register with the reactor.%p\n"),
210  ACE_TEXT("register_handler")),
211  -1);
212  }
213 
214  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::open passive handle=%d.\n",
215  static_cast<int>(intptr_t(get_handle()))), 2);
216 
217  return 0;
218 }
219 
// Reads and parses the setup message sent by the active side while
// passive_setup_ is set. The message layout is <len><addr><prio> (see below).
// Returns 0 when more data is needed, when recv timed out (ETIME), or when the
// message was fully parsed; returns -1 when recv reports an error or end of stream.
// NOTE(review): the signature, the remaining recv() arguments and several
// statements are missing from this listing.
// NOTE(review): hlen comes from the peer and is used to resize the buffer with
// no upper bound visible here - confirm whether a limit is enforced elsewhere.
// NOTE(review): the address is read as a C string from rd_ptr(), so parsing
// relies on the peer having included the terminating NUL within <addr>.
220 int
222 {
223  const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
226 
  // A timed-out recv is not an error: stay registered and wait for more input.
227  if (ret < 0 && errno == ETIME) {
228  return 0;
229  }
230 
231  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input %@ "
232  "recv returned %b %m.\n", this, ret), 4);
233 
234  if (ret <= 0) {
235  return -1;
236  }
237 
239  // Parse the setup message: <len><addr><prio>
240  // len and prio are network order 32-bit ints
241  // addr is a string of length len, including null
242  ACE_UINT32 nlen = 0;
243 
244  if (passive_setup_buffer_.length() >= sizeof(nlen)) {
245 
246  ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
247  passive_setup_buffer_.rd_ptr(sizeof(nlen));
248  const ACE_UINT32 hlen = ntohl(nlen);
  // Grow the buffer to hold the whole message: length prefix + address + priority.
249  passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
250 
251  ACE_UINT32 nprio = 0;
252 
253  if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
254 
255  const std::string bufstr(passive_setup_buffer_.rd_ptr());
256  const NetworkResource network_resource(bufstr);
257  network_resource.to_addr(remote_address_);
258 
259  ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
260  transport_priority_ = ntohl(nprio);
261 
263  passive_setup_ = false;
264 
265  VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
266  "%@ %C->%C, priority==%d, reconnect_state = %C\n", this,
267  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
269  if (DCPS_debug_level > 9) {
270  network_resource.dump();
271  }
272 
273  // remove from reactor, normal recv strategy setup will add us back
274  if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
275  VDBG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::handle_setup_input "
276  "remove_handler failed %m.\n"));
277  }
278 
280 
281  return 0;
282  }
283  }
284 
286 
287  return 0;
288 }
289 
// Reactor input callback. While passive_setup_ is set, input is routed to
// handle_setup_input(); otherwise it is forwarded to the receive strategy's
// handle_dds_input(). Returns 0 when there is no receive strategy.
// NOTE(review): the signature and the declaration of `receive_strategy` are
// missing from this listing.
290 int
292 {
293  DBG_ENTRY_LVL("TcpConnection","handle_input",6);
294 
295  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
296 
297  if (passive_setup_) {
298  return handle_setup_input(fd);
299  }
301  if (!receive_strategy) {
302  return 0;
303  }
304 
305  return receive_strategy->handle_dds_input(fd);
306 }
307 
// Reactor output callback: when a send strategy exists, asks it to perform its
// queued work and, depending on the result, calls schedule_output() on it.
// Always returns 0.
// NOTE(review): the signature, the declaration of `send_strategy`, the logging
// macro opener and the left-hand side of the perform_work() comparison are
// missing from this listing.
308 int
310 {
311  DBG_ENTRY_LVL("TcpConnection","handle_output",6);
312 
313  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
314 
316  if (send_strategy) {
317  if (DCPS_debug_level > 9) {
319  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
320  ACE_TEXT("sending queued data.\n"),
321  id_));
322  }
323 
324  // Process data to be sent from the queue.
326  != send_strategy->perform_work()) {
327 
328  // Stop handling output ready events when there is nothing to output.
329  // N.B. This calls back into the reactor. Is the reactor lock
330  // recursive?
331  send_strategy->schedule_output();
332  }
333  }
334 
335  return 0;
336 }
337 
// Logs the current reconnect state, then takes one of two paths. The first
// handles a failed asynchronous (re)connect: once conn_retry_counter_ reaches
// cfg->conn_retry_attempts_ one action is taken, otherwise the transport's
// connector is closed. The second path terminates sending and disconnects.
// Always returns 0.
// NOTE(review): the signature, the branch condition and several statements are
// missing from this listing, so the exact trigger of each path is not visible.
338 int
340 {
341  DBG_ENTRY_LVL("TcpConnection","close",6);
342 
343  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::close, reconnect_state_=%C\n", reconnect_state_string()));
344 
345  TcpInst_rch cfg = tcp_config_.lock();
347  // This would be called when using ACE_Connector to initiate an async connect and
348  // the network stack detects the destination is unreachable before timeout.
349  if (DCPS_debug_level >= 1) {
350  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection()::close() on transport: %C to %C because of reconnect failure.\n",
351  config_name().c_str(), LogAddr(remote_address_).c_str()));
352  }
353 
354  if (conn_retry_counter_ >= cfg->conn_retry_attempts_) {
356  } else {
358  if (transport) {
359  transport->connector_.close();
361  }
362  }
363  } else {
365  if (ss) {
366  ss->terminate_send();
367  }
368 
369  disconnect();
370  }
371 
372  return 0;
373 }
374 
// Returns the name of the transport config, or a static placeholder string when
// the weak config handle can no longer be locked.
// NOTE(review): the reference returned from cfg->name() outlives the local
// strong handle `cfg`; confirm the TcpInst is kept alive elsewhere while
// callers use the result.
// NOTE(review): the signature line is missing from this listing.
375 const std::string&
377 {
378  static const std::string null_name("(couldn't get name)");
379  TcpInst_rch cfg = tcp_config_.lock();
380  return cfg ? cfg->name() : null_name;
381 }
382 
// Reactor close callback. Under reconnect_lock_: returns immediately if link_ is
// nil; otherwise terminates sending (graceful disconnect) or suspends it
// (non-graceful), disconnects the socket, and for a non-graceful close starts
// the active or passive reconnect logic depending on is_connector_.
// Always returns 0.
// NOTE(review): the signature, the log arguments, the strategy declarations and
// the statement of the graceful branch are missing from this listing.
383 int
385 {
386  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
387 
388  DBG_ENTRY_LVL("TcpConnection","handle_close",6);
389 
390  if (DCPS_debug_level >= 1) {
391  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C , reconnect_state = %C.\n",
393  }
394 
395  GuardType guard(reconnect_lock_);
  // Work on a local copy of the link handle for the rest of this call.
396  TcpDataLink_rch link = link_;
397 
398  if (!link) {
399  if (DCPS_debug_level >= 1) {
400  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() link is null.\n"));
401  }
402  return 0;
403  }
404 
407 
408  const bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
409 
410  if (send_strategy) {
411  if (graceful) {
412  send_strategy->terminate_send();
413  } else {
414  send_strategy->suspend_send();
415  }
416  }
417 
418  this->disconnect();
419 
420  if (graceful) {
422  } else if (this->is_connector_) {
423  this->active_reconnect_i();
424  } else {
425  this->passive_reconnect_i();
426  }
427 
428  return 0;
429 }
430 
// Applies socket options to peer(): TCP_NODELAY according to
// tcp_config->enable_nagle_algorithm_, and send/receive buffer sizes of
// ACE_DEFAULT_MAX_SOCKET_BUFSIZ. Nothing is set when ACE_DEFAULT_MAX_SOCKET_BUFSIZ
// is undefined or ACE_LACKS_SOCKET_BUFSIZ is defined (this includes TCP_NODELAY).
// A failure to set SO_SNDBUF (other than ENOTSUP) returns early, so SO_RCVBUF is
// not attempted in that case.
// NOTE(review): the signature and the opening line of two logging macros are
// missing from this listing.
431 void
433 {
434 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
435  int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
436  int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
437  //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
438 # if !defined (ACE_LACKS_SOCKET_BUFSIZ)
439 
440  // A little screwy double negative logic: disabling nagle involves
441  // enabling TCP_NODELAY
442  int opt = (tcp_config->enable_nagle_algorithm_ == false);
443 
444  if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
445  ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
446  }
447 
448  if (this->peer().set_option(SOL_SOCKET,
449  SO_SNDBUF,
450  (void *) &snd_size,
451  sizeof(snd_size)) == -1
452  && errno != ENOTSUP) {
454  "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
455  snd_size));
456  return;
457  }
458 
459  if (this->peer().set_option(SOL_SOCKET,
460  SO_RCVBUF,
461  (void *) &rcv_size,
462  sizeof(int)) == -1
463  && errno != ENOTSUP) {
465  "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m\n",
466  rcv_size));
467  return;
468  }
469 
470 # else
471  ACE_UNUSED_ARG(tcp_config);
472  ACE_UNUSED_ARG(snd_size);
473  ACE_UNUSED_ARG(rcv_size);
474 # endif /* !ACE_LACKS_SOCKET_BUFSIZ */
475 
476 #else
477  ACE_UNUSED_ARG(tcp_config);
478 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
479 }
480 
// Active-side handshake after the socket is connected: sets the DiffServ
// codepoint on the link, applies socket options, then sends the setup message
// <len><addr><prio> to the passive side, where len and prio are sent in network
// byte order and addr is the config's public address including its NUL.
// Returns 0 on success; -1 if the config is gone or any send_n() fails.
// link_ is dereferenced here without a nil check.
// NOTE(review): the signature, the declaration of `mapper` and the opening line
// of several logging macros are missing from this listing.
481 int
483 {
484 
485  // Set the DiffServ codepoint according to the priority value.
487  this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
488 
489  TcpInst_rch cfg = tcp_config_.lock();
490  if (!cfg) {
491  if (log_level >= LogLevel::Notice) {
492  ACE_ERROR((LM_NOTICE, "((%P|%t)) NOTICE: TcpConnection::on_active_connection_established() - Invalid Transport Instance.\n"));
493  }
494  return -1;
495  }
496  set_sock_options(cfg);
497 
498  // In order to complete the connection establishment from the active
499  // side, we need to tell the remote side about our public address.
500  // It will use that as an "identifier" of sorts. To the other
501  // (passive) side, our local_address that we send here will be known
502  // as the remote_address.
503  const std::string address = cfg->get_public_address();
504 
505  if (DCPS_debug_level >= 2) {
507  "(%P|%t) TcpConnection::on_active_connection_established: "
508  "Sending public address <%C> to remote side\n",
509  address.c_str()));
510  }
511 
  // The +1 accounts for the terminating NUL, which is sent along with the string.
512  ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
513 
514  ACE_UINT32 nlen = htonl(len);
515 
516  if (this->peer().send_n(&nlen, sizeof(ACE_UINT32)) == -1) {
517  if (DCPS_debug_level >= 2) {
519  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
520  "Unable to send address string length to "
521  "the passive side to complete the active connection "
522  "establishment.\n"));
523  }
524  return -1;
525  }
526 
527  if (this->peer().send_n(address.c_str(), len) == -1) {
528  if (DCPS_debug_level >= 2) {
530  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
531  "Unable to send our address to "
532  "the passive side to complete the active connection "
533  "establishment.\n"));
534  }
535  return -1;
536  }
537 
538  ACE_UINT32 npriority = htonl(this->transport_priority_);
539 
540  if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
541  if (DCPS_debug_level >= 2) {
543  "(%P|%t) WARNING: TcpConnection::on_active_connection_established: "
544  "Unable to send publication priority to "
545  "the passive side to complete the active connection "
546  "establishment.\n"));
547  }
548  return -1;
549  }
550 
551  return 0;
552 }
553 
554 // This method is called on the acceptor side when a lost connection is detected.
555 // A one-shot timer is scheduled to check whether a new connection is created
556 // within the passive_reconnect_duration_ period (milliseconds).
// Does nothing when shutdown_ is set, the config is gone, the state is not
// INIT_STATE, or passive_reconnect_duration_ is 0.
// NOTE(review): the signature and two statements before the timer is scheduled
// are missing from this listing.
557 void
559 {
560  DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
561 
562  if (this->shutdown_) {
563  return;
564  }
565 
566  TcpInst_rch cfg = tcp_config_.lock();
567  if (!cfg) {
568  return;
569  }
570 
571  if (this->reconnect_state_ == INIT_STATE) {
572  // Mark the connection lost since the recv/send just failed.
573  // this->connected_ = false;
574 
575  if (cfg->passive_reconnect_duration_ == 0)
576  return;
577 
580 
581  TimeDuration delay = TimeDuration::from_msec(cfg->passive_reconnect_duration_);
  // Interval of zero: the timer fires once, it does not repeat.
582  this->reactor()->schedule_timer(this, 0, delay.value(), TimeDuration::zero_value.value());
583  }
584 }
585 
586 // This is the active reconnect implementation. A backoff algorithm is used as the
587 // reconnect strategy, e.g.
588 // With conn_retry_initial_delay = 500, conn_retry_backoff_multiplier = 2.0 and
589 // conn_retry_attempts = 6 the reconnect attempts will be:
590 // - first at 0 seconds(upon detection of the disconnect)
591 // - second at 0.5 seconds
592 // - third at 1.0 (2*0.5) seconds
593 // - fourth at 2.0 (2*1.0) seconds
594 // - fifth at 4.0 (2*2.0) seconds
595 // - sixth at 8.0 (2*4.0) seconds
// Does nothing when the link's release is pending, shutdown_ is set, or the
// config is gone. Once conn_retry_counter_ reaches conn_retry_attempts_,
// handle_stop_reconnecting() is called instead of another attempt.
// link_ is dereferenced here without a nil check.
// NOTE(review): the signature and several statements (including the declarations
// of `transport` and `rev_lock`) are missing from this listing.
596 void
598 {
599  DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
600 
601  if (this->link_->is_release_pending()) {
602  return;
603  }
604 
605  if (this->shutdown_) {
606  return;
607  }
608 
609  TcpInst_rch cfg = tcp_config_.lock();
610  if (!cfg) {
611  return;
612  }
613 
614  if (this->conn_retry_counter_ < cfg->conn_retry_attempts_ ) {
616  if (this->conn_retry_counter_ == 0)
618 
  // delay = initial delay * multiplier ^ (attempts made so far)
619  double retry_delay_msec = cfg->conn_retry_initial_delay_;
620  retry_delay_msec *= std::pow(cfg->conn_retry_backoff_multiplier_, this->conn_retry_counter_);
621 
622  if (DCPS_debug_level >= 1) {
623  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
624  "active_reconnect_i(%C->%C) reconnect_state = %C, conn_retry_counter_=%d, retry_delay_msec=%f\n",
625  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
626  reconnect_state_string(), this->conn_retry_counter_, retry_delay_msec));
627  }
628 
629  ACE_Time_Value timeout;
630  timeout.msec(static_cast<int>(retry_delay_msec));
631 
632  TcpConnection* pconn = this;
633  int ret = -1;
  // Preset errno so a nil transport is reported as ENODEV by the check below.
634  errno = ENODEV;
635  {
637  if (transport) {
639  ACE_Guard<ACE_Reverse_Lock<LockType> > guard(rev_lock);
640  // We need to temporarily release the lock here because the connect could occasionally be synchronous
641  // if the source and destination are on the same host. When the call becomes synchronous, active_reconnect_open()
642  // would be called and try to acquire the lock in the same thread.
643  ret = transport->connector_.connect(pconn, this->remote_address_, ACE_Synch_Options::asynch);
644  }
645  }
646 
  // EWOULDBLOCK is the expected result of starting an asynchronous connect.
647  if (ret == -1 && errno != EWOULDBLOCK)
648  {
649  if (errno == EALREADY) {
650  // This could happen on Windows; it may be due to the close() on a non-blocking socket needing more time to complete.
651  // In this case, we just wait another second to initiate the connect again without incrementing the conn_retry_counter_.
652  timeout.sec(1);
654  if (DCPS_debug_level >= 1) {
655  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TcpConnection::"
656  "active_reconnect_i() socket operation is already in progress, wait another second to initiate the connect\n"));
657  }
658  } else {
659  ACE_ERROR((LM_ERROR, "(%P|%t) TcpConnection::active_reconnect_i error %m.\n"));
660  }
662  }
663 
664  this->reactor()->schedule_timer(this, 0, timeout);
665  this->conn_retry_counter_ ++;
666  } else {
667  this->handle_stop_reconnecting();
668  }
669 }
670 
// When link_ is set, performs some link-related steps and terminates sending
// on the send strategy if one exists.
// NOTE(review): the signature and three statements inside the if-block are
// missing from this listing; presumably this is notify_connection_lost() - confirm.
671 void
673 {
674  if (link_) {
678  if (send_strategy) {
679  send_strategy->terminate_send();
680  }
681  }
682 }
683 
// Logs the outcome once reconnecting stops: "tried and failed" when the config
// is available and conn_retry_attempts_ > 0, otherwise "did not try".
// NOTE(review): the signature and two statements before the config lookup are
// missing from this listing; presumably this is handle_stop_reconnecting() - confirm.
684 void
686 {
689  TcpInst_rch cfg = tcp_config_.lock();
690  if (cfg && cfg->conn_retry_attempts_ > 0) {
691  ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C.\n",
692  config_name().c_str(), LogAddr(remote_address_).c_str()));
693  } else {
694  ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C.\n",
695  config_name().c_str(), LogAddr(remote_address_).c_str()));
696  }
697 }
698 
699 /// A timer is scheduled on the acceptor side to check whether a new connection
700 /// is accepted after the connection is lost.
/// The switch below also handles the timers scheduled by active_reconnect_i()
/// and a timeout while still in INIT_STATE. Runs under reconnect_lock_ and
/// always returns 0.
/// NOTE(review): the signature's first line, two case labels and a few
/// statements are missing from this listing.
701 int
703  const void *)
704 {
705  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
706 
707  DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
708  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, this->reconnect_state_ = %C\n", reconnect_state_string()));
709  GuardType guard(this->reconnect_lock_);
710 
711  switch (this->reconnect_state_) {
712  case PASSIVE_WAITING_STATE: {
713  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C.\n",
714  config_name().c_str(), LogAddr(remote_address_).c_str()));
715 
717  // Staying in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
718  // Now we need to declare the connection lost.
719  this->notify_connection_lost();
721  this->tear_link();
722  }
723  break;
724 
725  case RECONNECTED_STATE:
726  // reconnected successfully.
727  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C.\n",
728  config_name().c_str(), LogAddr(remote_address_).c_str()));
729  break;
730 
731  case INIT_STATE: {
732  // couldn't initialize connection successfully.
733  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, failed connection initialization due to timeout.: %C to %C.\n",
734  config_name().c_str(), LogAddr(remote_address_).c_str()));
735 
736  // build key and remove from service
737  const bool is_loop(local_address_ == remote_address_);
739  is_loop, true /* active */);
740 
742  if (transport) {
743  transport->async_connect_failed(key);
744  }
745  break;
746  }
748  // We got the timeout before the network stack reported the destination as unreachable;
749  // cancel the async connect operation and retry it.
750  {
752  if (transport) {
753  transport->connector_.cancel(this);
754  }
755  }
756  this->active_reconnect_i();
757  break;
758  }
760  this->active_reconnect_i();
761  break;
762  case LOST_STATE:
763  break;
764  default :
766  ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
767  ACE_TEXT(" unknown state or it should not be in state = %d\n"),
769  break;
770  }
771 
772  return 0;
773 }
774 
775 /// This object is the "old" connection object and the provided one is the new
776 /// connection object. The "old" connection object copies its state
777 /// to the "new" connection object. This is called by the TcpDataLink
778 /// when a new connection is accepted (with a new TcpConnection object).
779 /// We need to make the state in the "new" connection object consistent with the "old"
780 /// connection object.
/// Runs under reconnect_lock_ and returns early when shutdown_ is set.
/// `connection` is dereferenced without a null check.
/// NOTE(review): the signature, the error-macro openers and the statements of
/// the final notify_reconnect block are missing from this listing.
781 void
783 {
784  DBG_ENTRY_LVL("TcpConnection","transfer",6);
785  GuardType guard(reconnect_lock_);
786 
787  if (shutdown_) {
788  return;
789  }
790 
791  bool notify_reconnect = false;
792 
793  switch (this->reconnect_state_) {
794  case INIT_STATE:
795  // We have not detected the lost connection and the peer was faster than us and
796  // re-established the connection, so do not notify reconnected.
797  break;
798 
799  case LOST_STATE:
800 
801  // The reconnect timed out.
803  // TODO: If the handle_timeout is called before the old connection
804  // transfers its state to the new connection then should we disconnect
805  // the new connection or keep it alive ?
806  // I think we should keep the connection, the user will get a
807  // lost connection notification and then a reconnected notification.
808  notify_reconnect = true;
809  break;
810 
811  case PASSIVE_WAITING_STATE: {
812  // We just let the timer expire by itself. By the time the timer
813  // expires, the state has already transitioned to RECONNECTED_STATE,
814  // so handle_timeout() only logs. We don't need to delete
815  // the timer explicitly.
816  notify_reconnect = true;
817  }
818  break;
819 
820  default :
822  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
823  ACE_TEXT(" unknown state or it should not be in state=%i\n"),
825  break;
826  }
827 
828  // Verify that this is the acceptor side.
829  if (this->is_connector_ || connection->is_connector_) {
831  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
832  ACE_TEXT(" should NOT be called by the connector side\n")));
833  }
834 
  // Copy addresses, config, link and transport impl over to the new connection.
835  connection->remote_address_ = this->remote_address_;
836  connection->local_address_ = this->local_address_;
837  connection->tcp_config_ = this->tcp_config_;
838  connection->link_ = this->link_;
839  connection->impl_ = this->impl_;
840 
841  VDBG((LM_DEBUG, "(%P|%t) DBG: transfer(%C->%C) passive reconnected. new con %@ old con %@\n",
842  LogAddr(remote_address_).c_str(), LogAddr(local_address_).c_str(),
843  connection, this));
844 
845  if (notify_reconnect) {
848  }
849 
850 }
851 
852 /// This function is called when backpressure occurs and times out after
853 /// "max_output_pause_period". The lost connection notification should be sent
854 /// and the connection needs to be closed since we declared it a "lost"
855 /// connection.
/// The state is checked under reconnect_lock_; disconnect() and
/// notify_connection_lost() are then called after the lock is released, and
/// only if the state was INIT_STATE.
/// NOTE(review): the signature and one statement inside the INIT_STATE branch
/// are missing from this listing.
856 void
858 {
859  DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
860  bool notify_lost = false;
861  {
862  GuardType guard(this->reconnect_lock_);
863 
864  if (this->reconnect_state_ == INIT_STATE) {
866  notify_lost = true;
867 
868  }
869  }
870 
871  if (notify_lost) {
872  this->disconnect();
873  this->notify_connection_lost();
874  }
875 
876 }
877 
878 /// This is called by TcpSendStrategy when a send fails
879 /// and a reconnect should be initiated. This method
880 /// suspends any sends and kicks the reconnect thread into
881 /// action.
/// NOTE(review): the code visible here only suspends sending (when do_suspend is
/// true and a send strategy exists); no reconnect is started in this listing.
/// The signature and the declaration of `send_strategy` are missing from it.
882 void
884 {
885  DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
886 
888  if (do_suspend && send_strategy)
889  send_strategy->suspend_send();
890 }
891 
892 /// This is called by TcpReceiveStrategy when a disconnect
893 /// is detected. It simply suspends any sends (when do_suspend is true
894 /// and a send strategy exists) and lets handle_close() handle the reconnect logic.
/// NOTE(review): the signature and the declaration of `send_strategy` are
/// missing from this listing.
895 void
897 {
898  DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
900  if (do_suspend && send_strategy)
901  send_strategy->suspend_send();
902 }
903 
// Asks link_ to release its resources. link_ is dereferenced without a nil check.
// NOTE(review): the signature line is missing from this listing.
904 void
906 {
907  DBG_ENTRY_LVL("TcpConnection","tear_link",6);
908 
909  return link_->release_resources();
910 }
911 
// Sets shutdown_ under reconnect_lock_; the reconnect and transfer paths in this
// file check that flag and return early.
// NOTE(review): the signature and the final statement are missing from this listing.
912 void
914 {
915  DBG_ENTRY_LVL("TcpConnection", "shutdown", 6);
916 
917  GuardType guard(reconnect_lock_);
918  shutdown_ = true;
919 
921 }
922 
// NOTE(review): the return type, signature and first statement are missing from
// this listing; the only visible behaviour is returning the constant 1.
// Presumably this is add_reference() - confirm against the original file.
925 {
927  return 1;
928 }
929 
// NOTE(review): the return type, signature and first statement are missing from
// this listing; the only visible behaviour is returning the constant 1.
// Presumably this is remove_reference() - confirm against the original file.
932 {
934  return 1;
935 }
936 
// Maps reconnect_state_ to a human-readable name for log messages. An
// unrecognized value is logged as an error and yields "Invalid reconnect state".
// NOTE(review): the return type, signature, four case labels and the final log
// argument are missing from this listing.
938 {
939  switch (reconnect_state_) {
940  case INIT_STATE:
941  return "INIT_STATE";
942  case LOST_STATE:
943  return "LOST_STATE";
944  case RECONNECTED_STATE:
945  return "RECONNECTED_STATE";
947  return "ACTIVE_RECONNECTING_STATE";
949  return "ACTIVE_WAITING_STATE";
951  return "PASSIVE_WAITING_STATE";
953  return "PASSIVE_TIMEOUT_CALLED_STATE";
954  default:
955  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: TcpConnection::reconnect_state_string: ")
956  ACE_TEXT("%d is either invalid or not recognized.\n"),
958  return "Invalid reconnect state";
959  }
960 }
961 
962 
// Called from open() on the connector side for a reconnect. Under
// reconnect_lock_: returns 0 without doing anything further when shutdown_ is
// set; otherwise repeats the active handshake, re-registers with the reactor for
// READ events, resumes sending and resets conn_retry_counter_.
// Returns 0 on success, -1 if the handshake or reactor registration fails.
// The "re-established connection" message is logged before any of those steps run.
// NOTE(review): `this` is formatted with %X in the error message below, whereas
// other messages in this file use %@ for pointers - confirm the intended specifier.
// NOTE(review): the signature, the error-macro opener and two statements before
// resume_send() are missing from this listing.
963 int
965 {
966  DBG_ENTRY_LVL("TcpConnection","active_reconnect_open",6);
967 
968  ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C.\n",
969  config_name().c_str(), LogAddr(remote_address_).c_str()));
970 
971  GuardType guard(reconnect_lock_);
972 
973  if (shutdown_) {
974  return 0;
975  }
976 
977  if (on_active_connection_established() == -1) {
978  return -1;
979  }
980 
981  int result = reactor()->register_handler(this, READ_MASK);
982  if (result == -1) {
984  "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_open() can't register "
985  "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
986  -1);
987  }
988 
991  this->send_strategy()->resume_send();
992  this->conn_retry_counter_ = 0;
993 
994  return 0;
995 }
996 
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
virtual int connect(SVC_HANDLER *&svc_handler, const typename PEER_CONNECTOR::PEER_ADDR &remote_addr, const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, const typename PEER_CONNECTOR::PEER_ADDR &local_addr=reinterpret_cast< const peer_addr_type & >(peer_addr_type::sap_any), int reuse_addr=0, int flags=O_RDWR, int perms=0)
Reference_Counting_Policy & reference_counting_policy(void)
#define ENOTSUP
ssize_t send_n(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0, size_t *bytes_transferred=0)
size_t length(void) const
unsigned long ACE_Reactor_Mask
void passive_connection(const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
TcpDataLink_rch link_
Datalink object which is needed for connection lost callback.
LockType reconnect_lock_
Lock to synchronize state between reactor and non-reactor threads.
void reset(void)
sequence< octet > key
Encapsulate a priority value and internet address as a key.
Definition: PriorityKey.h:52
bool shutdown_
shutdown flag
void * memcpy(void *t, const void *s, size_t len)
virtual short codepoint() const
Access the mapped DiffServ codepoint value.
std::size_t id_
Small unique identifying value.
const ACE_Time_Value & value() const
virtual ACE_Event_Handler::Reference_Count add_reference()
TcpReceiveStrategy_rch receive_strategy()
int ssize_t
ACE_Message_Block passive_setup_buffer_
static ACE_Synch_Options asynch
char * rd_ptr(void) const
const std::string & config_name() const
#define SOL_SOCKET
Priority transport_priority_
TRANSPORT_PRIORITY.value policy value.
TcpConnection()
Passive side constructor (acceptor)
virtual void _add_ref()
Definition: RcObject.h:69
void shutdown(void)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
virtual int open(void *arg)
#define SO_SNDBUF
TcpTransport * transport_during_setup_
LM_DEBUG
#define TCP_NODELAY
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
static TimeDuration from_msec(const ACE_UINT64 &ms)
#define VDBG(DBG_ARGS)
void async_connect_failed(const PriorityKey &key)
TcpSendStrategy_rch send_strategy()
virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask)
size_t size(void) const
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
int on_active_connection_established()
Handle the logic after an active connection has been established.
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
Definition: DataLink.cpp:1115
virtual ACE_Event_Handler::Reference_Count remove_reference()
#define IPPROTO_TCP
TcpReceiveStrategy_rch receive_strategy()
Defines a wrapper around address info which is used for advertise.
LM_NOTICE
virtual int handle_output(ACE_HANDLE)
Handle back pressure when sending.
TcpTransport_rch impl_
Impl object which is needed for connection objects and reconnect task.
virtual int priority(void) const
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
#define ETIME
time_t sec(void) const
char * wr_ptr(void) const
LM_WARNING
RcHandle< TcpTransport > transport()
Definition: TcpAcceptor.cpp:32
virtual int cancel(SVC_HANDLER *svc_handler)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
virtual void _remove_ref()
Definition: RcObject.h:74
virtual ACE_Reactor * reactor(void) const
void transfer(TcpConnection *connection)
void relink_from_send(bool do_suspend)
Reconnect initiated by send strategy.
ACE_TEXT("TCP_Factory")
size_t space(void) const
static const TimeDuration zero_value
Definition: TimeDuration.h:31
map TRANSPORT_PRIORITY values directly.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_CDR::Long Priority
OpenDDS_Dcps_Export LogLevel log_level
unsigned long msec(void) const
WeakRcHandle< TcpInst > tcp_config_
The configuration used by this connection.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define EWOULDBLOCK
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
#define EALREADY
ACE_INET_Addr remote_address_
Remote address.
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ReconnectState reconnect_state_
The state indicates each step of the reconnecting.
void relink_from_recv(bool do_suspend)
Reconnect initiated by receive strategy.
Connector connector_
Open TcpConnections using non-blocking connect.
Definition: TcpTransport.h:152
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
int handle_setup_input(ACE_HANDLE h)
#define SO_RCVBUF
TcpSendStrategy_rch send_strategy()
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
virtual int close(void)
bool is_release_pending() const
Get release pending flag.
#define TheServiceParticipant
const char * reconnect_state_string() const
Get name of the current reconnect state as a string.
LM_ERROR
char * base(void) const
void set_sock_options(const TcpInst_rch &tcp_config)
virtual int handle_input(ACE_HANDLE)
We pass this "event" along to the receive_strategy.
bool is_connector_
Flag indicate this connection object is the connector or acceptor.
ACE_INET_Addr local_address_
Local address.
void set_datalink(const TcpDataLink_rch &link)