OpenDDS  Snapshot(2023/04/28-20:55)
TcpTransport.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "TcpTransport.h"
9 #include "TcpAcceptor.h"
10 #include "TcpSendStrategy.h"
11 #include "TcpReceiveStrategy.h"
12 #include "TcpInst.h"
13 #include "TcpDataLink.h"
14 #include "TcpSynchResource.h"
15 #include "TcpConnection.h"
16 
18 #include <dds/DCPS/ReactorTask.h>
22 #include <dds/DCPS/debug.h>
23 #include <dds/DCPS/GuidConverter.h>
24 #include <dds/DCPS/LogAddr.h>
27 #include <dds/DCPS/RcHandle_T.h>
28 
29 #include <sstream>
30 
32 
33 namespace OpenDDS {
34 namespace DCPS {
35 
// Constructor (initializer list and body).
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives it as
// TcpTransport(const TcpInst_rch &inst).
// Creates the acceptor with a counted RcHandle to this transport, starts the
// link id counter at 0, then runs configure_i(inst) followed by open().
37  : TransportImpl(inst)
38  , acceptor_(new TcpAcceptor(RcHandle<TcpTransport>(this, inc_count())))
39  , last_link_(0)
40 {
41  DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
42 
// open() is only attempted when configure_i() succeeded (short-circuit &&);
// if either step fails the transport is shut down again.
43  if (!(configure_i(inst) && open())) {
44  this->shutdown();
// NOTE(review): embedded line 45 is absent from this listing, so whatever
// follows shutdown() on failure cannot be seen here.
46  }
47 
48 }
49 
// Destructor body (signature line missing from this listing). The only
// visible statement is the DBG_ENTRY_LVL trace macro; no explicit cleanup
// is performed here.
51 {
52  DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
53 }
54 
55 
58 {
60 }
61 
// Builds the PriorityKey for a remote peer from its transport blob.
// NOTE(review): the lines holding the return type and the first parameter are
// missing from this listing; the declaration index at the end of this page
// gives blob_to_key(const TransportBLOB &remote, Priority priority, bool active).
//
// priority - transport priority stored in the key.
// active   - stored in the key as its is_active flag.
// Returns a PriorityKey made of priority, the decoded remote address, the
// loopback flag and the active flag.
64  Priority priority,
65  bool active)
66 {
67  const ACE_INET_Addr remote_address = AssociationData::get_remote_address(remote);
68  TcpInst_rch cfg = config();
// Loopback means the remote address equals our configured local address.
// The `cfg &&` guard makes this false (instead of dereferencing) when
// config() returns a nil handle.
69  const bool is_loopback = cfg && (remote_address == cfg->local_address());
70  return PriorityKey(priority, remote_address, is_loopback, active);
71 }
72 
// Active-side connection setup for one remote peer.
// NOTE(review): this listing omits several source lines (see the gaps in the
// embedded line numbers: the signature start, 101, 132, 163 and 167). The
// declaration index at the end of this page gives the signature as
// AcceptConnectResult connect_datalink(const RemoteTransport &remote,
//   const ConnectionAttribs &attribs, const TransportClient_rch &client).
// The comments below describe only what is visible.
//
// Visible flow:
//  1. Bail out with a default AcceptConnectResult when the transport is shut down.
//  2. Under links_lock_, reuse an existing link for the key or create and bind
//     a new TcpDataLink; in both cases register the client's on-start callback
//     and record the pending connection.
//  3. Outside the lock, build a TcpConnection for the link.
//  4. On failure (ret == -1 and errno != EWOULDBLOCK) unbind the link, fire the
//     on-start callbacks with `false`, and return a default AcceptConnectResult.
75  const ConnectionAttribs& attribs,
76  const TransportClient_rch& client)
77 {
78  DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
79 
80  if (is_shut_down()) {
81  return AcceptConnectResult();
82  }
83 
// The key is always built with active == true on this path.
84  const PriorityKey key =
85  blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
86 
87  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
88  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
89  key.priority(), LogAddr(key.address()).c_str(), key.is_loopback(),
90  key.is_active()), 0);
91 
92  TcpDataLink_rch link;
93  {
94  GuardType guard(links_lock_);
95 
96  if (find_datalink_i(key, link)) {
97  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
98  link->add_on_start_callback(client, remote.repo_id_);
99  add_pending_connection(client, link);
// NOTE(review): unlike accept_datalink(), no guard.release() is visible
// before do_association_actions() here, so it appears to run with
// links_lock_ held - confirm against the original file.
100  link->do_association_actions();
// NOTE(review): embedded line 101 is missing; presumably it returns the
// result for the found-link path - confirm.
102  }
103 
104  link = make_rch<TcpDataLink>(rchandle_from(this), key.address(), attribs.priority_,
105  key.is_loopback(), true /*active*/);
106  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
107  if (links_.bind(key, link) != 0 /*OK*/) {
108  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
109  "Unable to bind new TcpDataLink[%@] to "
110  "TcpTransport in links_ map.\n", link.in()));
111  return AcceptConnectResult();
112  }
113 
114  link->add_on_start_callback(client, remote.repo_id_);
115  add_pending_connection(client, link);
116  }
117 
118  int ret = -1; // Default to failure in case config() returns an invalid RCH
119  errno = EINVAL; // Anything other than EWOULDBLOCK
120 
121  TcpInst_rch cfg = config();
122  if (cfg) {
123  TcpConnection_rch connection(make_rch<TcpConnection>(key.address(), link->transport_priority(), cfg));
124  connection->set_datalink(link);
125 
126  TcpConnection* pConn = connection.in();
127 
128  // Can't make this call while holding onto TransportClient::lock_
129  ACE_Time_Value conn_timeout;
130  conn_timeout.msec(cfg->active_conn_timeout_period_);
131 
// NOTE(review): embedded line 132 is missing from this listing; no visible
// statement assigns `ret` or uses pConn/conn_timeout. Presumably the
// connector call lives on that line - confirm.
133  }
134 
// EWOULDBLOCK is not treated as a failure here; any other errno with
// ret == -1 takes the failure path below.
135  if (ret == -1 && errno != EWOULDBLOCK) {
136 
137  VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
138  ACE_ERROR((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
139  //If the connection fails and, in the interim between releasing
140  //lock and re-acquiring to remove the failed link, another association may have found
141  //the datalink in links_ (always using find_datalink_i) so must allow the other
142  //association to either try to connect again (might succeed for it)
143  //or try another transport. If find_datalink_i was called for this datalink, an
144  //on_start_callback will be registered and can be invoked.
145  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
146  {
147  GuardType guard(links_lock_);
148  if (links_.unbind(key, link) != 0 /*OK*/) {
149  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
150  "Unable to unbind failed TcpDataLink[%@] from "
151  "TcpTransport links_ map.\n", link.in()));
152  }
153  }
// Callbacks are invoked after the guard's scope has ended.
154  link->invoke_on_start_callbacks(false);
155 
156  return AcceptConnectResult();
157  }
158 
159  if (ret == 0) {
160  // connect() completed synchronously and called TcpConnection::active_open().
161  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
162  "completed synchronously.\n"), 0);
// NOTE(review): embedded line 163 (presumably the return for the
// synchronous case) is missing from this listing.
164  }
165 
166  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
// NOTE(review): embedded line 167 (presumably the return for the pending
// case) is missing from this listing.
168 }
169 
// Cleans up after a failed active connection attempt: removes the link bound
// to `key` from links_ and notifies its on-start callbacks with failure.
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives
// void async_connect_failed(const PriorityKey &key).
170 void
172 {
173  if (DCPS_debug_level >= 2) {
174  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: Failed to make active connection.\n"));
175  }
176  GuardType guard(links_lock_);
177  TcpDataLink_rch link;
// The return value of find() is not checked; `link` stays nil when the key
// is not bound, which the link.in() test below accounts for.
178  links_.find(key, link);
179  links_.unbind(key);
// Release the lock before invoking callbacks.
180  guard.release();
181 
182  if (link.in()) {
183  link->invoke_on_start_callbacks(false);
184  }
185 }
186 
187 //Called with links_lock_ held
// Looks up the datalink for `key`, first in links_ and then in
// pending_release_links_.
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives
// bool find_datalink_i(const PriorityKey &key, TcpDataLink_rch &link).
//
// Returns true with `link` set when a usable link exists. A link found in
// pending_release_links_ is usable only if its release can be cancelled and
// it can be moved back into links_. On every false return reached through the
// pending-release branch `link` is reset to nil.
188 bool
190 {
191  DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
192 
193  if (links_.find(key, link) == 0 /*OK*/) {
194  return true;
195  } else if (pending_release_links_.find(key, link) == 0 /*OK*/) {
196  if (link->cancel_release()) {
197  link->set_release_pending(false);
198 
// Both steps must succeed: remove from the pending-release map, then
// re-bind into links_.
199  if (pending_release_links_.unbind(key, link) == 0 /*OK*/
200  && links_.bind(key, link) == 0 /*OK*/) {
201  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
202  ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
203  return true;
204  }
205  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
206  ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
207  } else {
208  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
209  ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
210  }
211  link.reset(); // don't return link to TransportClient
212  return false;
213  }
214 
215  return false;
216 }
217 
// Passive-side counterpart of connect_datalink().
// NOTE(review): this listing omits several source lines (signature start,
// 252, 259, 272, 282 and 293). The declaration index at the end of this page
// gives the signature as AcceptConnectResult accept_datalink(
//   const RemoteTransport &remote, const ConnectionAttribs &attribs,
//   const TransportClient_rch &client).
// The comments below describe only what is visible.
//
// Visible flow:
//  1. Bail out with a default AcceptConnectResult when the transport is shut down.
//  2. Under links_lock_, reuse an existing link for the key or create and bind
//     a new TcpDataLink, registering the client's on-start callback and the
//     pending connection either way.
//  3. Take a previously stored passive connection for the key out of
//     connections_, if one exists.
//  4. Pair link and connection via connect_tcp_datalink(); on failure the
//     link is unbound from links_ and the local handle is reset.
220  const ConnectionAttribs& attribs,
221  const TransportClient_rch& client)
222 {
223  DBG_ENTRY_LVL("TcpTransport", "accept_datalink", 6);
224 
225  if (is_shut_down()) {
226  return AcceptConnectResult();
227  }
228 
229 
230  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
231  "accepting connection from remote %C\n",
232  LogGuid(attribs.local_id_).c_str(),
233  LogGuid(remote.repo_id_).c_str()), 5);
234 
// The key is always built with active == false on this path.
235  const PriorityKey key =
236  blob_to_key(remote.blob_, attribs.priority_, false /* !active */);
237 
238  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
239  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n", attribs.priority_,
240  LogAddr(key.address()).c_str(), key.is_loopback(), key.is_active()), 2);
241 
242  TcpDataLink_rch link;
243  {
244  GuardType guard(links_lock_);
245 
246  if (find_datalink_i(key, link)) {
247  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink found datalink link[%@]\n", link.in()), 0);
248  link->add_on_start_callback(client, remote.repo_id_);
249  add_pending_connection(client, link);
// The lock is dropped before do_association_actions() is called.
250  guard.release();
251  link->do_association_actions();
// NOTE(review): embedded line 252 is missing; presumably it returns the
// result for the found-link path - confirm.
253  }
254 
255  link = make_rch<TcpDataLink>(rchandle_from(this), key.address(), key.priority(),
256  key.is_loopback(), key.is_active());
257  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink create new link[%@]\n", link.in()), 0);
258  if (links_.bind(key, link) != 0 /*OK*/) {
// NOTE(review): embedded line 259 (the opening of the logging call these
// string arguments belong to) is missing from this listing.
260  "(%P|%t) ERROR: TcpTransport::accept_datalink "
261  "Unable to bind new TcpDataLink[%@] to "
262  "TcpTransport in links_ map.\n", link.in()));
263  return AcceptConnectResult();
264  }
265 
266  link->add_on_start_callback(client, remote.repo_id_);
267  add_pending_connection(client, link);
268  }
269 
270  TcpConnection_rch connection;
271  {
// NOTE(review): embedded line 272 is missing; presumably it takes the lock
// protecting connections_ for this scope - confirm.
273  const ConnectionMap::iterator iter = connections_.find(key);
274 
// A stored connection is consumed: it is removed from connections_ once
// handed to this call.
275  if (iter != connections_.end()) {
276  connection = iter->second;
277  connections_.erase(iter);
278  }
279  }
280 
281  if (connection.is_nil()) {
// NOTE(review): embedded line 282 is missing, so the handling of the
// "no passive connection yet" case is not visible here.
283  }
284 
285  if (connect_tcp_datalink(*link, connection) == -1) {
286  GuardType guard(links_lock_);
287  links_.unbind(key);
288  link.reset();
289  }
290 
// NOTE(review): this message is logged on the failure path too, in which
// case link.in() is nil because of the reset() above.
291  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
292  "connected link %@.\n", link.in()), 2);
// NOTE(review): embedded line 293 (presumably the final return) is missing.
294 }
295 
// Cancels a client's interest in pending connections to one remote: removes
// the (client, remote_id) on-start callback from every datalink recorded for
// the client in pending_connections_, then erases those entries.
// NOTE(review): the signature line and embedded line 306 are missing from this
// listing; the declaration index at the end of this page gives
// void stop_accepting_or_connecting(const TransportClient_wrch &client,
//   const GUID_t &remote_id, bool disassociate, bool association_failed).
// The two bool parameters are unnamed here and therefore unused.
296 void
298  const GUID_t& remote_id,
299  bool /*disassociate*/,
300  bool /*association_failed*/)
301 {
302  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
303  "stop connecting to remote: %C\n",
304  LogGuid(remote_id).c_str()), 5);
305 
// NOTE(review): embedded line 306 is missing; presumably it takes
// pending_connections_lock_ - confirm.
307  typedef PendConnMap::iterator iter_t;
// equal_range() yields every pending entry stored under this client.
308  const std::pair<iter_t, iter_t> range =
309  pending_connections_.equal_range(client);
310 
311  for (iter_t iter = range.first; iter != range.second; ++iter) {
312  iter->second->remove_on_start_callback(client, remote_id);
313  }
314 
315  pending_connections_.erase(range.first, range.second);
316 }
317 
// Applies the TcpInst configuration: creates the reactor task, opens the
// connector and the acceptor, and writes the effective listening address
// back into the configuration.
// NOTE(review): this listing omits several source lines (signature, 334, 355
// and 372). The declaration index at the end of this page gives
// bool configure_i(const TcpInst_rch &config).
//
// Returns false when `config` is nil, when the acceptor cannot be opened, or
// when the fully qualified hostname does not resolve to a usable address;
// true otherwise.
318 bool
320 {
321  DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
322 
323  if (!config) {
324  return false;
325  }
326 
327  this->create_reactor_task(false, "TcpTransport" + config->name());
328 
// The return value of connector_.open() is not checked.
329  connector_.open(reactor_task()->get_reactor());
330 
331  // Override with DCPSDefaultAddress.
// Only applies when no local address was configured (it compares equal to
// a default-constructed ACE_INET_Addr) and a default address is set.
332  if (config->local_address() == ACE_INET_Addr() &&
333  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
// NOTE(review): embedded line 334 (the opening of this logging call) is
// missing from this listing.
335  ACE_TEXT("(%P|%t) TcpTransport::configure_i overriding with DCPSDefaultAddress\n")), 2);
336 
337  config->local_address(TheServiceParticipant->default_address().to_addr());
338  }
339 
340  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::configure_i opening acceptor for %C on %C\n"),
341  config->local_address_string().c_str(), LogAddr(config->local_address()).c_str()), 2);
342 
343  // Open our acceptor object so that we can accept passive connections
344  // on our config->local_address_.
345  if (this->acceptor_->open(config->local_address(),
346  this->reactor_task()->get_reactor()) != 0) {
347  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C: %p\n"),
348  LogAddr(config->local_address()).c_str(), ACE_TEXT("open")), false);
349  }
350 
351  // update the port number (in case port zero was given).
352  ACE_INET_Addr address;
353 
354  if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
// NOTE(review): embedded line 355 (the opening of this logging call) is
// missing; no return is visible in this branch, so execution appears to
// continue with `address` as left by the failed call - confirm.
356  ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
357  ACE_TEXT("- %p"),
358  ACE_TEXT("cannot get local addr\n")));
359  }
360 
361  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C\n"),
362  LogAddr(address).c_str()), 2);
363 
364  const unsigned short port = address.get_port_number();
365 
366  // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully
367  // qualified hostname and actual listening port number.
368  if (config->local_address().is_any()) {
369  const std::string hostname = get_fully_qualified_hostname();
370  config->local_address(port, hostname.c_str());
// A still-default address after the update is treated as a failure to
// resolve the hostname.
371  if (config->local_address() == ACE_INET_Addr()) {
// NOTE(review): embedded line 372 (the opening of this error-return call)
// is missing from this listing.
373  ACE_TEXT("(%P|%t) ERROR: Failed to resolve a local address using fully qualified hostname '%C'\n"),
374  hostname.c_str()),
375  false);
376  }
377  }
378 
379  // Now we got the actual listening port. Update the port number in the configuration
380  // if it's 0 originally.
381  else if (config->local_address().get_port_number() == 0) {
382  config->local_address_set_port(port);
383  }
384 
385  // Ahhh... The sweet smell of success!
386  return true;
387 }
388 
// Forwards client_stop(local_id) to every datalink in links_ and in
// pending_release_links_, holding links_lock_ for the whole traversal.
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives
// virtual void client_stop(const GUID_t &local_id).
389 void
391 {
392  GuardType guard(links_lock_);
393 
// Reused by both loops; filled in by itr.next().
394  AddrLinkMap::ENTRY* entry;
395 
396  for (AddrLinkMap::ITERATOR itr(links_); itr.next(entry); itr.advance()) {
397  entry->int_id_->client_stop(local_id);
398  }
399 
400  for (AddrLinkMap::ITERATOR itr(pending_release_links_); itr.next(entry); itr.advance()) {
401  entry->int_id_->client_stop(local_id);
402  }
403 }
404 
// Transport shutdown sequence.
// NOTE(review): this listing omits several source lines (signature, 428, 436,
// 455 and 461); the comments below describe only what is visible.
//
// Visible order of operations:
//  1. pre_stop_i() on every link in links_ (under links_lock_).
//  2. Close the acceptor and notify it of the shutdown.
//  3. Shut down and clear stored passive connections; clear pending_connections_.
//  4. transport_shutdown() on every link in links_, then unbind them all;
//     a second loop does the same call for another map (its header is not
//     visible).
//  5. Notify the acceptor of the shutdown again.
405 void
407 {
408  DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
409 
410  {
411  GuardType guard(links_lock_);
412 
413  AddrLinkMap::ENTRY* entry;
414 
415  for (AddrLinkMap::ITERATOR itr(links_);
416  itr.next(entry);
417  itr.advance()) {
418  entry->int_id_->pre_stop_i();
419  }
420  }
421 
422  // Don't accept any more connections.
423  acceptor_->close();
424  acceptor_->transport_shutdown();
425 
426  {
427  {
// NOTE(review): embedded line 428 is missing; presumably it takes the lock
// protecting connections_ for this scope - confirm.
429 
430  for (ConnectionMap::iterator it = connections_.begin(); it != connections_.end(); ++it) {
431  it->second->shutdown();
432  }
433  connections_.clear();
434  }
435  {
// NOTE(review): embedded line 436 is missing; presumably it takes
// pending_connections_lock_ - confirm.
437  pending_connections_.clear();
438  }
439  }
440 
441  // Disconnect all of our DataLinks, and clear our links_ collection.
442  {
443  GuardType guard(links_lock_);
444 
445  AddrLinkMap::ENTRY* entry;
446 
447  for (AddrLinkMap::ITERATOR itr(links_);
448  itr.next(entry);
449  itr.advance()) {
450  entry->int_id_->transport_shutdown();
451  }
452 
453  links_.unbind_all();
454 
// NOTE(review): embedded line 455 (the header of this second loop) is
// missing; presumably it iterates pending_release_links_ - confirm.
456  itr.next(entry);
457  itr.advance()) {
458  entry->int_id_->transport_shutdown();
459  }
460 
// NOTE(review): embedded line 461 is missing from this listing.
462  }
463 
464  // Tell our acceptor about this event so that it can drop its reference
465  // it holds to this TcpTransport object (via smart-pointer).
// NOTE(review): transport_shutdown() was already called on the acceptor
// right after close() above, so this is the second call.
466  acceptor_->transport_shutdown();
467 }
468 
// Fills `local_info` with this transport's locator by delegating to
// TcpInst::populate_locator(local_info, flags).
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives
// virtual bool connection_info_i(TransportLocator &local_info,
//   ConnectionInfoFlags flags) const.
// Returns true when a configuration is available, false when config()
// returns a nil handle (local_info is left untouched in that case).
469 bool
471 {
472  DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
473 
474  TcpInst_rch cfg = config();
475  if (cfg) {
476  VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address string <%C>\n",
477  cfg->get_public_address().c_str()), 2);
478 
479  cfg->populate_locator(local_info, flags);
480  return true;
481  }
482 
483  return false;
484 }
485 
// Called by a DataLink to release itself from this transport (per the
// declaration index at the end of this page: virtual void
// release_datalink(DataLink *link)).
// NOTE(review): this listing omits several source lines (signature, 495, 511,
// 517, 531, 541, 548, 571 and 585); the comments below describe only what is
// visible.
//
// The link is unbound from links_. If that succeeds it is either parked in
// pending_release_links_ with a scheduled delayed release (when
// datalink_release_delay() is greater than zero) or stopped.
486 void
488 {
489  DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
490 
491  TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
492 
// static_cast performs no runtime type check, so tcp_link is null only
// when `link` itself is null.
493  if (tcp_link == 0) {
494  // Really an assertion failure
496  "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
497  "TcpDataLink.\n"));
498  return;
499  }
500 
501  TcpDataLink_rch released_link;
502 
503  // Possible actions that will be taken to release the link.
504  enum LinkAction { None, StopLink, ScheduleLinkRelease };
505  LinkAction linkAction = None;
506 
507  // Scope for locking to protect the links (and pending_release) containers.
// NOTE(review): no guard.release() or enclosing scope is visible before
// the action switch further down, although a later comment says the
// actions run outside the lock - confirm against the original file.
508  GuardType guard(this->links_lock_);
509 
510  // Attempt to remove the TcpDataLink from our links_ map.
// NOTE(review): embedded line 511 (presumably the declaration of `key`
// that these constructor arguments belong to) is missing.
512  tcp_link->transport_priority(),
513  tcp_link->remote_address(),
514  tcp_link->is_loopback(),
515  tcp_link->is_active());
516 
518  "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
519  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
520  link,
521  tcp_link->transport_priority(),
522  LogAddr(tcp_link->remote_address()).c_str(),
523  (int)tcp_link->is_loopback(),
524  (int)tcp_link->is_active()), 2);
525 
// When the link is not bound in links_ nothing is done and linkAction
// stays None.
526  if (this->links_.unbind(key, released_link) != 0) {
527  //No op
528  } else if (link->datalink_release_delay() > TimeDuration::zero_value) {
529  link->set_scheduling_release(true);
530 
532  "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
533  "is %C\n",
534  link->datalink_release_delay().str().c_str()), 4);
535 
536  // Atomic value update, safe to perform here.
537  released_link->set_release_pending(true);
538 
// bind() result: -1 and 1 are both treated as errors that fall back to
// stopping the link; 0 schedules the delayed release.
539  switch (this->pending_release_links_.bind(key, released_link)) {
540  case -1:
542  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
543  "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
544  linkAction = StopLink;
545  break;
546 
547  case 1:
549  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
550  "pending_release_links_ map: already bound\n", released_link.in()));
551  linkAction = StopLink;
552  break;
553 
554  case 0:
555  linkAction = ScheduleLinkRelease;
556  break;
557 
558  default:
559  break;
560  }
561 
562  } else { // datalink_release_delay_ is 0
563  link->set_scheduling_release(true);
564 
565  linkAction = StopLink;
566  }
567 
568  // Actions are executed outside of the lock scope.
569  switch (linkAction) {
570  case StopLink:
// NOTE(review): embedded line 571 (the statement executed for StopLink)
// is missing from this listing.
572  break;
573 
574  case ScheduleLinkRelease:
575  link->schedule_delayed_release();
576  break;
577 
578  case None:
579  break;
580  }
581 
// Verbose dump of the released link, only at debug levels above 9.
582  if (DCPS_debug_level > 9) {
583  std::stringstream buffer;
584  buffer << *link;
586  ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
587  ACE_TEXT("link[%@] with priority %d released.\n%C"),
588  link,
589  link->transport_priority(),
590  buffer.str().c_str()));
591  }
592 }
593 
594 /// This method is called by a TcpConnection object that has been
595 /// created and opened by our acceptor_ as a result of passively
596 /// accepting a connection on our local address. Ultimately, the connection
597 /// object needs to be paired with a DataLink object that is (or will be)
598 /// expecting this passive connection to be established.
// NOTE(review): this listing omits several source lines (signature start,
// 634, 650, 656 and 665). The declaration index at the end of this page gives
// void passive_connection(const ACE_INET_Addr &remote_address,
//   const TcpConnection_rch &connection).
//
// Visible flow: if a link for the connection's key is already in links_, the
// two are paired immediately via connect_tcp_datalink(); otherwise the
// connection is stored in connections_ for accept_datalink() to pick up.
599 void
601  const TcpConnection_rch& connection)
602 {
603  DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
604 
605  if (is_shut_down()) {
606  return;
607  }
608 
609  TcpInst_rch cfg = config();
610  if (!cfg) {
611  return;
612  }
613 
// Same key layout as blob_to_key(): priority, remote address, loopback
// flag (remote equals our local address) and the connector flag.
614  const PriorityKey key(connection->transport_priority(),
615  remote_address,
616  remote_address == cfg->local_address(),
617  connection->is_connector());
618 
619  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
620  ACE_TEXT("established with %C.\n"),
621  LogAddr(remote_address).c_str()), 2);
622 
// Lock order on this path: connections_lock_ first, then links_lock_
// inside the nested scope.
623  GuardType connection_guard(connections_lock_);
624  TcpDataLink_rch link;
625  {
626  GuardType guard(links_lock_);
627  links_.find(key, link);
628  }
629 
630  if (!link.is_nil()) {
// connections_ is not needed on this path, so its lock is released
// before pairing.
631  connection_guard.release();
632 
633  if (connect_tcp_datalink(*link, connection) == -1) {
// NOTE(review): embedded line 634 (the opening of this logging call) is
// missing from this listing.
635  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
636  ACE_TEXT("ERROR: connect_tcp_datalink failed\n")), 5);
637  GuardType guard(links_lock_);
638  links_.unbind(key);
639 
640  } else {
641  this->fresh_link(connection);
642  }
643 
644  return;
645  }
646 
647  // If we reach this point, this link was not in links_, so the
648  // accept_datalink() call hasn't happened yet. Store in connections_ for the
649  // accept_datalink() method to find.
651  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - # of before connections: %d\n"),
652  connections_.size()), 5);
653  const ConnectionMap::iterator where = connections_.find(key);
654 
// An existing entry is only reported; it is overwritten by the
// assignment below.
655  if (where != connections_.end()) {
657  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
658  ACE_TEXT("ERROR: connection with %C at priority %d already exists, ")
659  ACE_TEXT("overwriting previously established connection.\n"),
660  LogAddr(remote_address).c_str(),
661  connection->transport_priority()));
662  }
663 
664  connections_[key] = connection;
666  ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - # of after connections: %d\n"),
667  connections_.size()), 5);
668 
// The return value of fresh_link() is ignored here.
669  this->fresh_link(connection);
670 }
671 
672 /// Common code used by accept_datalink(), passive_connection(), and active completion.
// Pairs `link` with `connection`. If the link can reuse the connection
// directly nothing else is done; otherwise new send and receive strategies
// are created and the link is connected with them.
// NOTE(review): the signature start and embedded line 691 are missing from
// this listing; the declaration index at the end of this page gives
// int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection).
// Returns 0 on success, -1 when no configuration is available or when
// link.connect() fails.
673 int
675  const TcpConnection_rch& connection)
676 {
677  DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
678 
679  if (link.reuse_existing_connection(connection) == 0) {
680  return 0;
681  }
682 
683  TcpInst_rch cfg = config();
684  if (!cfg) {
685  return -1;
686  }
687 
// last_link_ is an Atomic<size_t> counter (see the declaration index);
// its value is used as the id of the connection and the send strategy.
// NOTE(review): the value is re-read after the increment rather than
// captured from it, so the ids may differ if another thread increments
// in between - confirm whether that matters.
688  ++last_link_;
689 
690  if (DCPS_debug_level > 4) {
692  ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
693  ACE_TEXT("creating send strategy with priority %d.\n"),
694  last_link_.load(), link.transport_priority()));
695  }
696 
697  connection->id() = last_link_;
698 
699  TcpSendStrategy_rch send_strategy (
700  make_rch<TcpSendStrategy>(last_link_.load(), ref(link),
701  new TcpSynchResource(link,
702  cfg->max_output_pause_period_),
703  this->reactor_task(), link.transport_priority()));
704 
705  TcpReceiveStrategy_rch receive_strategy(
706  make_rch<TcpReceiveStrategy>(ref(link), this->reactor_task()));
707 
708  if (link.connect(connection, send_strategy, receive_strategy) != 0) {
709  return -1;
710  }
711 
712  return 0;
713 }
714 
715 /// This function is called by the TcpReconnectTask thread to check if the passively
716 /// accepted connection is the re-established connection. If it is, then the "old" connection
717 /// object in the datalink is replaced by the "new" connection object.
// NOTE(review): the signature line is missing from this listing; the
// declaration index at the end of this page gives
// int fresh_link(TcpConnection_rch connection).
// Returns 0 when nothing needs to be done (transport shut down, no link for
// the key, link has no connection yet, or it already uses this connection),
// -1 when no configuration is available, otherwise the result of
// link->reconnect(connection).
718 int
720 {
721  DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
722 
723  TcpDataLink_rch link;
// links_lock_ is held for the rest of the function, including the
// reconnect() call below.
724  GuardType guard(this->links_lock_);
725 
726  if (is_shut_down()) {
727  return 0;
728  }
729 
730  TcpInst_rch cfg = config();
731  if (!cfg) {
732  return -1;
733  }
734 
735  PriorityKey key(connection->transport_priority(),
736  connection->get_remote_address(),
737  connection->get_remote_address() == cfg->local_address(),
738  connection->is_connector());
739 
740  if (this->links_.find(key, link) == 0) {
741  TcpConnection_rch old_con = link->get_connection();
742 
743  // The connection is accepted but may not be associated with the datalink
744  // at this point. The thread calling add_associations() will associate
745  // the datalink with the connection in make_passive_connection().
746  if (old_con.is_nil()) {
747  return 0;
748  }
749 
// Pointer comparison: only a different connection object triggers a
// reconnect.
750  if (old_con.in() != connection.in())
751  // Replace the "old" connection object with the "new" connection object.
752  {
753  return link->reconnect(connection);
754  }
755  }
756 
757  return 0;
758 }
759 
// Removes a link's mapping from this transport. The declaration index at the
// end of this page describes it as "Remove any pending_release mappings."
// and gives the signature virtual void unbind_link(DataLink *link).
// NOTE(review): this listing omits several source lines (signature, 767, 774,
// 780, 790-792); in particular the unbind call itself and the map it operates
// on are not visible, only the error report for a failed lookup.
760 void
762 {
763  TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
764 
// static_cast performs no runtime type check, so tcp_link is null only
// when `link` itself is null.
765  if (tcp_link == 0) {
766  // Really an assertion failure
768  "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
769  "Failed to downcast DataLink to TcpDataLink.\n"));
770  return;
771  }
772 
773  // Attempt to remove the TcpDataLink from our links_ map.
// NOTE(review): embedded line 774 (presumably the declaration of `key`
// that these constructor arguments belong to) is missing.
775  tcp_link->transport_priority(),
776  tcp_link->remote_address(),
777  tcp_link->is_loopback(),
778  tcp_link->is_active());
779 
781  "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
782  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
783  link,
784  tcp_link->transport_priority(),
785  LogAddr(tcp_link->remote_address()).c_str(),
786  (int)tcp_link->is_loopback(),
787  (int)tcp_link->is_active()), 2);
788 
789  GuardType guard(this->links_lock_);
790 
// NOTE(review): embedded lines 791-792 (the condition and the opening of
// this error log) are missing from this listing.
793  "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
794  "Failed to find link %@ tcp_link %@ PriorityKey "
795  "prio=%d, addr=%C, is_loopback=%d, is_active=%d\n",
796  link,
797  tcp_link,
798  tcp_link->transport_priority(),
799  LogAddr(tcp_link->remote_address()).c_str(),
800  (int)tcp_link->is_loopback(),
801  (int)tcp_link->is_active()));
802  }
803 }
804 
805 
// Deliberately empty fini() override that always returns 0.
// NOTE(review): the signature line (embedded line 807) is missing from this
// listing, so the owning class is not visible here; the comment below refers
// to ACE_Connector<TcpConnection, ACE_SOCK_Connector>.
806 int
808  // Overriding fini() so that ACE_Connector<TcpConnection, ACE_SOCK_Connector>::close() won't be
809  // invoked in the process shutting down reactor. Without overriding fini(), close() would be called
810  // from destructor and from reactor in different threads which leads to synchronization issues.
811  return 0;
812 }
813 
814 }
815 }
816 
AddrLinkMap links_
This is the map of connected DataLinks.
Definition: TcpTransport.h:155
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
const TimeDuration & datalink_release_delay() const
Definition: DataLink.inl:62
virtual void release_datalink(DataLink *link)
Called by the DataLink to release itself.
virtual int connect(SVC_HANDLER *&svc_handler, const typename PEER_CONNECTOR::PEER_ADDR &remote_addr, const ACE_Synch_Options &synch_options=ACE_Synch_Options::defaults, const typename PEER_CONNECTOR::PEER_ADDR &local_addr=reinterpret_cast< const peer_addr_type & >(peer_addr_type::sap_any), int reuse_addr=0, int flags=O_RDWR, int perms=0)
bool find_datalink_i(const PriorityKey &key, TcpDataLink_rch &link)
LockType links_lock_
This lock is used to protect the links_ data member.
Definition: TcpTransport.h:159
void passive_connection(const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
TcpTransport(const TcpInst_rch &inst)
sequence< octet > key
Encapsulate a priority value and internet address as a key.
Definition: PriorityKey.h:52
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
void set_scheduling_release(bool scheduling_release)
Definition: DataLink.inl:161
const char * c_str() const
void schedule_stop(const MonotonicTimePoint &schedule_to_stop_at)
Definition: DataLink.cpp:325
int fresh_link(TcpConnection_rch connection)
unique_ptr< TcpAcceptor > acceptor_
Used to accept passive connections on our local_address_.
Definition: TcpTransport.h:144
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22
int release(void)
virtual int open(ACE_Reactor *r=ACE_Reactor::instance(), int flags=0)
ACE_INET_Addr & address()
Definition: PriorityKey.inl:72
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
LM_DEBUG
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
void async_connect_failed(const PriorityKey &key)
int connect(const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
Priority & transport_priority()
Definition: DataLink.inl:21
int reconnect(const TcpConnection_rch &connection)
int hostname(char name[], size_t maxnamelen)
virtual void unbind_link(DataLink *link)
Remove any pending_release mappings.
int unbind(const EXT_ID &ext_id)
int bind(const EXT_ID &item, const INT_ID &int_id)
PriorityKey blob_to_key(const TransportBLOB &remote, Priority priority, bool active)
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
void set_release_pending(bool flag)
Set release pending flag.
LM_WARNING
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:111
int find(const EXT_ID &ext_id, INT_ID &int_id) const
Atomic< size_t > last_link_
Definition: TcpTransport.h:168
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
ACE_TEXT("TCP_Factory")
static const TimeDuration zero_value
Definition: TimeDuration.h:31
ReactorTask_rch reactor_task()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_CDR::Long Priority
virtual void client_stop(const GUID_t &local_id)
static ACE_INET_Addr get_remote_address(const TransportBLOB &remote)
u_short get_port_number(void) const
unsigned long msec(void) const
virtual bool configure_i(const TcpInst_rch &config)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define EWOULDBLOCK
String str(unsigned decimal_places=3, bool just_sec=false) const
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14
String get_fully_qualified_hostname(ACE_INET_Addr *addr)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
Connector connector_
Open TcpConnections using non-blocking connect.
Definition: TcpTransport.h:152
int connect_tcp_datalink(TcpDataLink &link, const TcpConnection_rch &connection)
Common code used by accept_datalink(), passive_connection(), and active completion.
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
T load() const
Definition: Atomic.h:33
AddrLinkMap pending_release_links_
Definition: TcpTransport.h:156
int next(ACE_Hash_Map_Entry< EXT_ID, INT_ID > *&next_entry) const
int reuse_existing_connection(const TcpConnection_rch &connection)
TcpInst_rch config() const
#define TheServiceParticipant
DDS::OctetSeq TransportBLOB
const ACE_INET_Addr & remote_address() const
Accessor for the remote address.
Definition: TcpDataLink.inl:15
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual bool connection_info_i(TransportLocator &local_info, ConnectionInfoFlags flags) const
size_t ConnectionInfoFlags
TransportInst_rch config() const
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194