00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpTransport.h"
00010 #include "TcpConnectionReplaceTask.h"
00011 #include "TcpAcceptor.h"
00012 #include "TcpSendStrategy.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpInst.h"
00015 #include "TcpDataLink.h"
00016 #include "TcpSynchResource.h"
00017 #include "TcpConnection.h"
00018 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00019 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00020 #include "dds/DCPS/transport/framework/EntryExit.h"
00021 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00022 #include "dds/DCPS/AssociationData.h"
00023 #include "dds/DCPS/debug.h"
00024 #include "dds/DCPS/GuidConverter.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026 #include "dds/DCPS/transport/framework/TransportClient.h"
00027
00028 #include <sstream>
00029
00030 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00031
00032 namespace OpenDDS {
00033 namespace DCPS {
00034
00035 TcpTransport::TcpTransport(TcpInst& inst)
00036 : TransportImpl(inst)
00037 , acceptor_(new TcpAcceptor(this))
00038 , con_checker_(new TcpConnectionReplaceTask(this))
00039 {
00040 DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
00041
00042 if (!(configure_i(inst) && open())) {
00043 this->shutdown();
00044 throw Transport::UnableToCreate();
00045 }
00046
00047 }
00048
00049 TcpTransport::~TcpTransport()
00050 {
00051 DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
00052 con_checker_->close(1);
00053 }
00054
00055
00056 TcpInst&
00057 TcpTransport::config() const
00058 {
00059 return static_cast<TcpInst&>(TransportImpl::config());
00060 }
00061
00062 PriorityKey
00063 TcpTransport::blob_to_key(const TransportBLOB& remote,
00064 Priority priority,
00065 bool active)
00066 {
00067 const ACE_INET_Addr remote_address =
00068 AssociationData::get_remote_address(remote);
00069 const bool is_loopback = remote_address == config().local_address();
00070 return PriorityKey(priority, remote_address, is_loopback, active);
00071 }
00072
00073 TransportImpl::AcceptConnectResult
00074 TcpTransport::connect_datalink(const RemoteTransport& remote,
00075 const ConnectionAttribs& attribs,
00076 const TransportClient_rch& client)
00077 {
00078 DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
00079
00080 const PriorityKey key =
00081 blob_to_key(remote.blob_, attribs.priority_, true );
00082
00083 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
00084 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00085 key.priority(), key.address().get_host_addr(),
00086 key.address().get_port_number(), key.is_loopback(),
00087 key.is_active()), 0);
00088
00089 TcpDataLink_rch link;
00090 {
00091 GuardType guard(links_lock_);
00092
00093 if (find_datalink_i(key, link, client, remote.repo_id_)) {
00094 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
00095 return link.is_nil()
00096 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00097 : AcceptConnectResult(link);
00098 }
00099
00100 link = make_rch<TcpDataLink>(key.address(), ref(*this), attribs.priority_,
00101 key.is_loopback(), true );
00102 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
00103 if (links_.bind(key, link) != 0 ) {
00104 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00105 "Unable to bind new TcpDataLink[%@] to "
00106 "TcpTransport in links_ map.\n", link.in()));
00107 return AcceptConnectResult();
00108 }
00109 }
00110
00111 TcpConnection_rch connection(
00112 make_rch<TcpConnection>(key.address(), link->transport_priority(), this->config()));
00113 connection->set_datalink(link);
00114
00115 TcpConnection* pConn = connection.in();
00116
00117 ACE_TCHAR str[64];
00118 key.address().addr_to_string(str,sizeof(str)/sizeof(str[0]));
00119
00120
00121 const int ret =
00122 connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch);
00123
00124 if (ret == -1 && errno != EWOULDBLOCK) {
00125
00126 VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
00127 ACE_ERROR((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
00128
00129
00130
00131
00132
00133
00134 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
00135 {
00136 GuardType guard(links_lock_);
00137 if (links_.unbind(key, link) != 0 ) {
00138 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00139 "Unable to unbind failed TcpDataLink[%@] from "
00140 "TcpTransport links_ map.\n", link.in()));
00141 }
00142 }
00143 link->invoke_on_start_callbacks(false);
00144
00145 return AcceptConnectResult();
00146 }
00147
00148 if (ret == 0) {
00149
00150 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
00151 "completed synchronously.\n"), 0);
00152 return AcceptConnectResult(link);
00153 }
00154
00155 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00156
00157
00158 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0);
00159 return AcceptConnectResult(link);
00160 }
00161
00162 GuardType connections_guard(connections_lock_);
00163
00164 add_pending_connection(client, link);
00165 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
00166 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00167 }
00168
00169 void
00170 TcpTransport::async_connect_failed(const PriorityKey& key)
00171 {
00172
00173 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n"));
00174 GuardType guard(links_lock_);
00175 TcpDataLink_rch link;
00176 links_.find(key, link);
00177 links_.unbind(key);
00178 guard.release();
00179
00180 if (link.in()) {
00181 link->invoke_on_start_callbacks(false);
00182 }
00183
00184 }
00185
00186
00187 bool
00188 TcpTransport::find_datalink_i(const PriorityKey& key, TcpDataLink_rch& link,
00189 const TransportClient_rch& client, const RepoId& remote_id)
00190 {
00191 DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
00192
00193 if (links_.find(key, link) == 0 ) {
00194 if (!link->add_on_start_callback(client, remote_id)) {
00195 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00196 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0);
00197
00198
00199 return true;
00200 }
00201
00202 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00203 ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0);
00204 add_pending_connection(client, link);
00205 link.reset();
00206 return true;
00207
00208 } else if (pending_release_links_.find(key, link) == 0 ) {
00209 if (link->cancel_release()) {
00210 link->set_release_pending(false);
00211
00212 if (pending_release_links_.unbind(key, link) == 0
00213 && links_.bind(key, link) == 0 ) {
00214 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00215 ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
00216 return true;
00217 }
00218 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00219 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
00220 } else {
00221 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00222 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
00223 }
00224 link.reset();
00225 return false;
00226 }
00227
00228 return false;
00229 }
00230
00231 TransportImpl::AcceptConnectResult
00232 TcpTransport::accept_datalink(const RemoteTransport& remote,
00233 const ConnectionAttribs& attribs,
00234 const TransportClient_rch& client)
00235 {
00236 GuidConverter remote_conv(remote.repo_id_);
00237 GuidConverter local_conv(attribs.local_id_);
00238
00239 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
00240 "accepting connection from remote %C\n",
00241 std::string(local_conv).c_str(),
00242 std::string(remote_conv).c_str()), 5);
00243
00244 GuardType guard(connections_lock_);
00245 const PriorityKey key =
00246 blob_to_key(remote.blob_, attribs.priority_, false );
00247
00248 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
00249 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_,
00250 key.address().get_host_addr(), key.address().get_port_number(),
00251 key.is_loopback(), key.is_active()), 2);
00252
00253 TcpDataLink_rch link;
00254 {
00255 GuardType guard(links_lock_);
00256
00257 if (find_datalink_i(key, link, client, remote.repo_id_)) {
00258 return link.is_nil()
00259 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00260 : AcceptConnectResult(link);
00261
00262 } else {
00263 link = make_rch<TcpDataLink>(key.address(), ref(*this), key.priority(),
00264 key.is_loopback(), key.is_active());
00265
00266 if (links_.bind(key, link) != 0 ) {
00267 ACE_ERROR((LM_ERROR,
00268 "(%P|%t) ERROR: TcpTransport::accept_datalink "
00269 "Unable to bind new TcpDataLink to "
00270 "TcpTransport in links_ map.\n"));
00271 return AcceptConnectResult();
00272 }
00273 }
00274 }
00275
00276 TcpConnection_rch connection;
00277 const ConnectionMap::iterator iter = connections_.find(key);
00278
00279 if (iter != connections_.end()) {
00280 connection = iter->second;
00281 connections_.erase(iter);
00282 }
00283
00284 if (connection.is_nil()) {
00285 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00286 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00287 "got started link %@.\n", link.in()), 0);
00288 return AcceptConnectResult(link);
00289 }
00290
00291 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00292 "no existing TcpConnection.\n"), 0);
00293
00294 add_pending_connection(client, link);
00295
00296
00297 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00298 }
00299
00300 guard.release();
00301
00302 if (connect_tcp_datalink(*link, connection) == -1) {
00303 GuardType guard(links_lock_);
00304 links_.unbind(key);
00305 link.reset();
00306 }
00307
00308 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00309 "connected link %@.\n", link.in()), 2);
00310 return AcceptConnectResult(link);
00311 }
00312
00313 void
00314 TcpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00315 const RepoId& remote_id)
00316 {
00317 GuidConverter remote_converted(remote_id);
00318 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
00319 "stop connecting to remote: %C\n",
00320 std::string(remote_converted).c_str()), 5);
00321
00322 GuardType guard(connections_lock_);
00323 typedef PendConnMap::iterator iter_t;
00324 const std::pair<iter_t, iter_t> range =
00325 pending_connections_.equal_range(client);
00326
00327 for (iter_t iter = range.first; iter != range.second; ++iter) {
00328 iter->second->remove_on_start_callback(client, remote_id);
00329 }
00330
00331 pending_connections_.erase(range.first, range.second);
00332 }
00333
00334 bool
00335 TcpTransport::configure_i(TcpInst& config)
00336 {
00337 DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
00338
00339 this->create_reactor_task();
00340
00341 connector_.open(reactor_task()->get_reactor());
00342
00343
00344 if (this->con_checker_->open()) {
00345 ACE_ERROR_RETURN((LM_ERROR,
00346 ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"),
00347 ACE_TEXT("open")),
00348 false);
00349 }
00350
00351
00352 if (config.local_address() == ACE_INET_Addr () &&
00353 !TheServiceParticipant->default_address ().empty ()) {
00354 config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00355 }
00356
00357
00358
00359
00360 if (this->acceptor_->open(config.local_address(),
00361 this->reactor_task()->get_reactor()) != 0) {
00362
00363 ACE_ERROR_RETURN((LM_ERROR,
00364 ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"),
00365 config.local_address().get_host_addr(),
00366 config.local_address().get_port_number(),
00367 ACE_TEXT("open")),
00368 false);
00369 }
00370
00371
00372 ACE_INET_Addr address;
00373
00374 if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
00375 ACE_ERROR((LM_ERROR,
00376 ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
00377 ACE_TEXT("- %p"),
00378 ACE_TEXT("cannot get local addr\n")));
00379 }
00380
00381 OPENDDS_STRING listening_addr(address.get_host_addr());
00382 VDBG_LVL((LM_DEBUG,
00383 ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"),
00384 listening_addr.c_str(), address.get_port_number()), 2);
00385
00386 unsigned short port = address.get_port_number();
00387
00388
00389
00390 if (config.local_address().is_any()) {
00391 std::string hostname = get_fully_qualified_hostname();
00392
00393 config.local_address(port, hostname.c_str());
00394 }
00395
00396
00397
00398 else if (config.local_address().get_port_number() == 0) {
00399 config.local_address_set_port(port);
00400 }
00401
00402
00403 return true;
00404 }
00405
00406
00407 void
00408 TcpTransport::shutdown_i()
00409 {
00410 DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
00411
00412 {
00413 GuardType guard(this->links_lock_);
00414
00415 AddrLinkMap::ENTRY* entry;
00416
00417 for (AddrLinkMap::ITERATOR itr(this->links_);
00418 itr.next(entry);
00419 itr.advance()) {
00420 entry->int_id_->pre_stop_i();
00421 }
00422 }
00423
00424
00425 this->acceptor_->close();
00426 this->acceptor_->transport_shutdown();
00427
00428 this->con_checker_->close(1);
00429
00430 {
00431 GuardType guard(this->connections_lock_);
00432
00433 this->connections_.clear();
00434 this->pending_connections_.clear();
00435 }
00436
00437
00438 {
00439 GuardType guard(this->links_lock_);
00440
00441 AddrLinkMap::ENTRY* entry;
00442
00443 for (AddrLinkMap::ITERATOR itr(this->links_);
00444 itr.next(entry);
00445 itr.advance()) {
00446 entry->int_id_->transport_shutdown();
00447 }
00448
00449 this->links_.unbind_all();
00450
00451 for (AddrLinkMap::ITERATOR itr(this->pending_release_links_);
00452 itr.next(entry);
00453 itr.advance()) {
00454 entry->int_id_->transport_shutdown();
00455 }
00456
00457 this->pending_release_links_.unbind_all();
00458 }
00459
00460
00461
00462 this->acceptor_->transport_shutdown();
00463 }
00464
00465 bool
00466 TcpTransport::connection_info_i(TransportLocator& local_info) const
00467 {
00468 DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
00469
00470 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n",
00471 this->config().get_public_address().c_str()), 2);
00472
00473 this->config().populate_locator(local_info);
00474
00475 return true;
00476 }
00477
00478 void
00479 TcpTransport::release_datalink(DataLink* link)
00480 {
00481 DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
00482
00483 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00484
00485 if (tcp_link == 0) {
00486
00487 ACE_ERROR((LM_ERROR,
00488 "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
00489 "TcpDataLink.\n"));
00490 return;
00491 }
00492
00493 TcpDataLink_rch released_link;
00494
00495
00496 enum LinkAction { None, StopLink, ScheduleLinkRelease };
00497 LinkAction linkAction = None;
00498
00499
00500 GuardType guard(this->links_lock_);
00501
00502
00503 PriorityKey key(
00504 tcp_link->transport_priority(),
00505 tcp_link->remote_address(),
00506 tcp_link->is_loopback(),
00507 tcp_link->is_active());
00508
00509 VDBG_LVL((LM_DEBUG,
00510 "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
00511 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00512 link,
00513 tcp_link->transport_priority(),
00514 tcp_link->remote_address().get_host_addr(),
00515 tcp_link->remote_address().get_port_number(),
00516 (int)tcp_link->is_loopback(),
00517 (int)tcp_link->is_active()), 2);
00518
00519 if (this->links_.unbind(key, released_link) != 0) {
00520
00521 } else if (link->datalink_release_delay() > ACE_Time_Value::zero) {
00522 link->set_scheduling_release(true);
00523
00524 VDBG_LVL((LM_DEBUG,
00525 "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
00526 "is %: sec %d usec\n",
00527 link->datalink_release_delay().sec(),
00528 link->datalink_release_delay().usec()), 4);
00529
00530
00531 released_link->set_release_pending(true);
00532
00533 switch (this->pending_release_links_.bind(key, released_link)) {
00534 case -1:
00535 ACE_ERROR((LM_ERROR,
00536 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00537 "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
00538 linkAction = StopLink;
00539 break;
00540
00541 case 1:
00542 ACE_ERROR((LM_ERROR,
00543 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00544 "pending_release_links_ map: already bound\n", released_link.in()));
00545 linkAction = StopLink;
00546 break;
00547
00548 case 0:
00549 linkAction = ScheduleLinkRelease;
00550 break;
00551
00552 default:
00553 break;
00554 }
00555
00556 } else {
00557 link->set_scheduling_release(true);
00558
00559 linkAction = StopLink;
00560 }
00561
00562
00563 ACE_Time_Value cancel_now = ACE_OS::gettimeofday();
00564 switch (linkAction) {
00565 case StopLink:
00566 link->schedule_stop(cancel_now);
00567 break;
00568
00569 case ScheduleLinkRelease:
00570
00571 link->schedule_delayed_release();
00572 break;
00573
00574 case None:
00575 break;
00576 }
00577
00578 if (DCPS_debug_level > 9) {
00579 std::stringstream buffer;
00580 buffer << *link;
00581 ACE_DEBUG((LM_DEBUG,
00582 ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
00583 ACE_TEXT("link[%@] with priority %d released.\n%C"),
00584 link,
00585 link->transport_priority(),
00586 buffer.str().c_str()));
00587 }
00588 }
00589
00590
00591
00592
00593
00594
00595 void
00596 TcpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00597 const TcpConnection_rch& connection)
00598 {
00599 DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
00600
00601 const PriorityKey key(connection->transport_priority(),
00602 remote_address,
00603 remote_address == config().local_address(),
00604 connection->is_connector());
00605
00606 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
00607 ACE_TEXT("established with %C:%d.\n"),
00608 remote_address.get_host_name(),
00609 remote_address.get_port_number()), 2);
00610
00611 GuardType connection_guard(connections_lock_);
00612 TcpDataLink_rch link;
00613 {
00614 GuardType guard(links_lock_);
00615 links_.find(key, link);
00616 }
00617
00618 if (!link.is_nil()) {
00619 connection_guard.release();
00620
00621 if (connect_tcp_datalink(*link, connection) == -1) {
00622 VDBG_LVL((LM_ERROR,
00623 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5);
00624 GuardType guard(links_lock_);
00625 links_.unbind(key);
00626
00627 } else {
00628 con_checker_->add(connection);
00629 }
00630
00631 return;
00632 }
00633
00634
00635
00636
00637 VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5);
00638 const ConnectionMap::iterator where = connections_.find(key);
00639
00640 if (where != connections_.end()) {
00641 ACE_ERROR((LM_ERROR,
00642 ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ")
00643 ACE_TEXT("connection with %C:%d at priority %d already exists, ")
00644 ACE_TEXT("overwriting previously established connection.\n"),
00645 remote_address.get_host_name(),
00646 remote_address.get_port_number(),
00647 connection->transport_priority()));
00648 }
00649
00650 connections_[key] = connection;
00651 VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5);
00652
00653 con_checker_->add(connection);
00654 }
00655
00656
00657 int
00658 TcpTransport::connect_tcp_datalink(TcpDataLink& link,
00659 const TcpConnection_rch& connection)
00660 {
00661 DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
00662
00663 if (link.reuse_existing_connection(connection) == 0) {
00664 return 0;
00665 }
00666
00667 ++last_link_;
00668
00669 if (DCPS_debug_level > 4) {
00670 ACE_DEBUG((LM_DEBUG,
00671 ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
00672 ACE_TEXT("creating send strategy with priority %d.\n"),
00673 last_link_, link.transport_priority()));
00674 }
00675
00676 connection->id() = last_link_;
00677
00678 TcpSendStrategy_rch send_strategy (
00679 make_rch<TcpSendStrategy>(last_link_, ref(link),
00680 new TcpSynchResource(link,
00681 this->config().max_output_pause_period_),
00682 this->reactor_task(), link.transport_priority()));
00683
00684 TcpReceiveStrategy_rch receive_strategy(
00685 make_rch<TcpReceiveStrategy>(ref(link), this->reactor_task()));
00686
00687 if (link.connect(connection, send_strategy, receive_strategy) != 0) {
00688 return -1;
00689 }
00690
00691 return 0;
00692 }
00693
00694
00695
00696
00697 int
00698 TcpTransport::fresh_link(TcpConnection_rch connection)
00699 {
00700 DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
00701
00702 TcpDataLink_rch link;
00703 GuardType guard(this->links_lock_);
00704
00705 PriorityKey key(connection->transport_priority(),
00706 connection->get_remote_address(),
00707 connection->get_remote_address() == this->config().local_address(),
00708 connection->is_connector());
00709
00710 if (this->links_.find(key, link) == 0) {
00711 TcpConnection_rch old_con = link->get_connection();
00712
00713
00714
00715
00716 if (old_con.is_nil()) {
00717 return 0;
00718 }
00719
00720 if (old_con.in() != connection.in())
00721
00722 {
00723 return link->reconnect(connection);
00724 }
00725 }
00726
00727 return 0;
00728 }
00729
00730 void
00731 TcpTransport::unbind_link(DataLink* link)
00732 {
00733 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00734
00735 if (tcp_link == 0) {
00736
00737 ACE_ERROR((LM_ERROR,
00738 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00739 "Failed to downcast DataLink to TcpDataLink.\n"));
00740 return;
00741 }
00742
00743
00744 PriorityKey key(
00745 tcp_link->transport_priority(),
00746 tcp_link->remote_address(),
00747 tcp_link->is_loopback(),
00748 tcp_link->is_active());
00749
00750 VDBG_LVL((LM_DEBUG,
00751 "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
00752 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00753 link,
00754 tcp_link->transport_priority(),
00755 tcp_link->remote_address().get_host_addr(),
00756 tcp_link->remote_address().get_port_number(),
00757 (int)tcp_link->is_loopback(),
00758 (int)tcp_link->is_active()), 2);
00759
00760 GuardType guard(this->links_lock_);
00761
00762 if (this->pending_release_links_.unbind(key) != 0 &&
00763 link->datalink_release_delay() > ACE_Time_Value::zero) {
00764 ACE_ERROR((LM_ERROR,
00765 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00766 "Failed to find link %@ tcp_link %@ PriorityKey "
00767 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00768 link,
00769 tcp_link,
00770 tcp_link->transport_priority(),
00771 tcp_link->remote_address().get_host_addr(),
00772 tcp_link->remote_address().get_port_number(),
00773 (int)tcp_link->is_loopback(),
00774 (int)tcp_link->is_active()));
00775 }
00776 }
00777
00778 }
00779 }
00780
00781 OPENDDS_END_VERSIONED_NAMESPACE_DECL