00001
00002
00003
00004
00005
00006
00007
00008 #include "Tcp_pch.h"
00009 #include "TcpTransport.h"
00010 #include "TcpConnectionReplaceTask.h"
00011 #include "TcpAcceptor.h"
00012 #include "TcpSendStrategy.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpInst.h"
00015 #include "TcpDataLink.h"
00016 #include "TcpSynchResource.h"
00017 #include "TcpConnection.h"
00018 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00019 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00020 #include "dds/DCPS/transport/framework/EntryExit.h"
00021 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00022 #include "dds/DCPS/AssociationData.h"
00023 #include "dds/DCPS/debug.h"
00024 #include "dds/DCPS/GuidConverter.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026
00027 #include <sstream>
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032 TcpTransport::TcpTransport(const TransportInst_rch& inst)
00033 : acceptor_(new TcpAcceptor(this)),
00034 con_checker_(new TcpConnectionReplaceTask(this))
00035 {
00036 DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
00037
00038 if (!inst.is_nil()) {
00039 if (!configure(inst.in())) {
00040 delete con_checker_;
00041 delete acceptor_;
00042 throw Transport::UnableToCreate();
00043 }
00044 }
00045 }
00046
00047 TcpTransport::~TcpTransport()
00048 {
00049 DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
00050 delete acceptor_;
00051
00052 con_checker_->close(1);
00053 delete con_checker_;
00054 }
00055
00056 PriorityKey
00057 TcpTransport::blob_to_key(const TransportBLOB& remote,
00058 Priority priority,
00059 bool active)
00060 {
00061 const ACE_INET_Addr remote_address =
00062 AssociationData::get_remote_address(remote);
00063 const bool is_loopback = remote_address == tcp_config_->local_address();
00064 return PriorityKey(priority, remote_address, is_loopback, active);
00065 }
00066
00067 TransportImpl::AcceptConnectResult
00068 TcpTransport::connect_datalink(const RemoteTransport& remote,
00069 const ConnectionAttribs& attribs,
00070 TransportClient* client)
00071 {
00072 DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
00073
00074 const PriorityKey key =
00075 blob_to_key(remote.blob_, attribs.priority_, true );
00076
00077 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
00078 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00079 key.priority(), key.address().get_host_addr(),
00080 key.address().get_port_number(), key.is_loopback(),
00081 key.is_active()), 0);
00082
00083 TcpDataLink_rch link;
00084 {
00085 GuardType guard(links_lock_);
00086
00087 if (find_datalink_i(key, link, client, remote.repo_id_)) {
00088 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
00089 return link.is_nil()
00090 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00091 : AcceptConnectResult(link._retn());
00092 }
00093
00094 link = new TcpDataLink(key.address(), this, attribs.priority_,
00095 key.is_loopback(), true );
00096 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
00097 if (links_.bind(key, link) != 0 ) {
00098 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00099 "Unable to bind new TcpDataLink[%@] to "
00100 "TcpTransport in links_ map.\n", link.in()));
00101 return AcceptConnectResult();
00102 }
00103 }
00104
00105 TcpConnection_rch connection =
00106 new TcpConnection(key.address(), link->transport_priority(), tcp_config_);
00107 connection->set_datalink(link.in());
00108
00109 TcpConnection* pConn = connection.in();
00110 TcpConnection_rch reactor_refcount(connection);
00111
00112 ACE_TCHAR str[64];
00113 key.address().addr_to_string(str,sizeof(str)/sizeof(str[0]));
00114
00115
00116 const int ret =
00117 connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch);
00118
00119 if (ret == -1 && errno != EWOULDBLOCK) {
00120
00121 VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
00122 ACE_DEBUG((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
00123
00124
00125
00126
00127
00128
00129 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
00130 {
00131 GuardType guard(links_lock_);
00132 if (links_.unbind(key, link) != 0 ) {
00133 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00134 "Unable to unbind failed TcpDataLink[%@] from "
00135 "TcpTransport links_ map.\n", link.in()));
00136 }
00137 }
00138 link->invoke_on_start_callbacks(false);
00139
00140 return AcceptConnectResult();
00141 }
00142
00143
00144
00145 (void) reactor_refcount._retn();
00146
00147 if (ret == 0) {
00148
00149 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
00150 "completed synchronously.\n"), 0);
00151 return AcceptConnectResult(link._retn());
00152 }
00153
00154 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00155
00156
00157 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0);
00158 return AcceptConnectResult(link._retn());
00159 }
00160
00161 GuardType connections_guard(connections_lock_);
00162
00163 add_pending_connection(client, link.in());
00164 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
00165 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00166 }
00167
00168 void
00169 TcpTransport::async_connect_failed(const PriorityKey& key)
00170 {
00171
00172 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n"));
00173 GuardType guard(links_lock_);
00174 TcpDataLink_rch link;
00175 links_.find(key, link);
00176 links_.unbind(key);
00177 guard.release();
00178
00179 if (link.in()) {
00180 link->invoke_on_start_callbacks(false);
00181 }
00182
00183 }
00184
00185
00186 bool
00187 TcpTransport::find_datalink_i(const PriorityKey& key, TcpDataLink_rch& link,
00188 TransportClient* client, const RepoId& remote_id)
00189 {
00190 DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
00191
00192 if (links_.find(key, link) == 0 ) {
00193 if (!link->add_on_start_callback(client, remote_id)) {
00194 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00195 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0);
00196
00197
00198 return true;
00199 }
00200
00201 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00202 ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0);
00203 add_pending_connection(client, link.in());
00204 link = 0;
00205 return true;
00206
00207 } else if (pending_release_links_.find(key, link) == 0 ) {
00208 if (link->cancel_release()) {
00209 link->set_release_pending(false);
00210
00211 if (pending_release_links_.unbind(key, link) == 0
00212 && links_.bind(key, link) == 0 ) {
00213 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00214 ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
00215 return true;
00216 }
00217 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00218 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
00219 } else {
00220 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00221 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
00222 }
00223 link = 0;
00224 return false;
00225 }
00226
00227 return false;
00228 }
00229
00230 TransportImpl::AcceptConnectResult
00231 TcpTransport::accept_datalink(const RemoteTransport& remote,
00232 const ConnectionAttribs& attribs,
00233 TransportClient* client)
00234 {
00235 GuidConverter remote_conv(remote.repo_id_);
00236 GuidConverter local_conv(attribs.local_id_);
00237
00238 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
00239 "accepting connection from remote %C\n",
00240 std::string(local_conv).c_str(),
00241 std::string(remote_conv).c_str()), 5);
00242
00243 GuardType guard(connections_lock_);
00244 const PriorityKey key =
00245 blob_to_key(remote.blob_, attribs.priority_, false );
00246
00247 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
00248 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_,
00249 key.address().get_host_addr(), key.address().get_port_number(),
00250 key.is_loopback(), key.is_active()), 2);
00251
00252 TcpDataLink_rch link;
00253 {
00254 GuardType guard(links_lock_);
00255
00256 if (find_datalink_i(key, link, client, remote.repo_id_)) {
00257 return link.is_nil()
00258 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00259 : AcceptConnectResult(link._retn());
00260
00261 } else {
00262 link = new TcpDataLink(key.address(), this, key.priority(),
00263 key.is_loopback(), key.is_active());
00264
00265 if (links_.bind(key, link) != 0 ) {
00266 ACE_ERROR((LM_ERROR,
00267 "(%P|%t) ERROR: TcpTransport::accept_datalink "
00268 "Unable to bind new TcpDataLink to "
00269 "TcpTransport in links_ map.\n"));
00270 return AcceptConnectResult();
00271 }
00272 }
00273 }
00274
00275 TcpConnection_rch connection;
00276 const ConnectionMap::iterator iter = connections_.find(key);
00277
00278 if (iter != connections_.end()) {
00279 connection = iter->second;
00280 connections_.erase(iter);
00281 }
00282
00283 if (connection.is_nil()) {
00284 if (!link->add_on_start_callback(client, remote.repo_id_)) {
00285 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00286 "got started link %@.\n", link.in()), 0);
00287 return AcceptConnectResult(link._retn());
00288 }
00289
00290 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00291 "no existing TcpConnection.\n"), 0);
00292
00293 add_pending_connection(client, link.in());
00294
00295
00296 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00297 }
00298
00299 guard.release();
00300
00301 if (connect_tcp_datalink(link, connection) == -1) {
00302 GuardType guard(links_lock_);
00303 links_.unbind(key);
00304 link = 0;
00305 }
00306
00307 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00308 "connected link %@.\n", link.in()), 2);
00309 return AcceptConnectResult(link._retn());
00310 }
00311
00312 void
00313 TcpTransport::stop_accepting_or_connecting(TransportClient* client,
00314 const RepoId& remote_id)
00315 {
00316 GuidConverter remote_converted(remote_id);
00317 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
00318 "stop connecting to remote: %C\n",
00319 std::string(remote_converted).c_str()), 5);
00320
00321 GuardType guard(connections_lock_);
00322 typedef std::multimap<TransportClient*, DataLink_rch>::iterator iter_t;
00323 const std::pair<iter_t, iter_t> range =
00324 pending_connections_.equal_range(client);
00325
00326 for (iter_t iter = range.first; iter != range.second; ++iter) {
00327 iter->second->remove_on_start_callback(client, remote_id);
00328 }
00329
00330 pending_connections_.erase(range.first, range.second);
00331 }
00332
00333 bool
00334 TcpTransport::configure_i(TransportInst* config)
00335 {
00336 DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
00337
00338
00339 TcpInst* tcp_config =
00340 static_cast<TcpInst*>(config);
00341
00342 if (tcp_config == 0) {
00343
00344 ACE_ERROR_RETURN((LM_ERROR,
00345 "(%P|%t) ERROR: Failed downcast from TransportInst "
00346 "to TcpInst.\n"),
00347 false);
00348 }
00349
00350 this->create_reactor_task();
00351
00352
00353 this->reactor_task_ = reactor_task();
00354
00355 connector_.open(reactor_task_->get_reactor());
00356
00357
00358 tcp_config->_add_ref();
00359 this->tcp_config_ = tcp_config;
00360
00361
00362 if (this->con_checker_->open()) {
00363 ACE_ERROR_RETURN((LM_ERROR,
00364 ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"),
00365 ACE_TEXT("open")),
00366 false);
00367 }
00368
00369
00370 if (this->tcp_config_->local_address() == ACE_INET_Addr () &&
00371 !TheServiceParticipant->default_address ().empty ()) {
00372 this->tcp_config_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00373 }
00374
00375
00376
00377
00378 if (this->acceptor_->open(this->tcp_config_->local_address(),
00379 this->reactor_task_->get_reactor()) != 0) {
00380
00381
00382
00383 TcpInst_rch cfg = this->tcp_config_._retn();
00384
00385 ACE_ERROR_RETURN((LM_ERROR,
00386 ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"),
00387 cfg->local_address().get_host_addr(),
00388 cfg->local_address().get_port_number(),
00389 ACE_TEXT("open")),
00390 false);
00391 }
00392
00393
00394 ACE_INET_Addr address;
00395
00396 if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
00397 ACE_ERROR((LM_ERROR,
00398 ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
00399 ACE_TEXT("- %p"),
00400 ACE_TEXT("cannot get local addr\n")));
00401 }
00402
00403 OPENDDS_STRING listening_addr(address.get_host_addr());
00404 VDBG_LVL((LM_DEBUG,
00405 ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"),
00406 listening_addr.c_str(), address.get_port_number()), 2);
00407
00408 unsigned short port = address.get_port_number();
00409
00410
00411
00412 if (tcp_config_->local_address().is_any()) {
00413 std::string hostname = get_fully_qualified_hostname();
00414
00415 this->tcp_config_->local_address(port, hostname.c_str());
00416 }
00417
00418
00419
00420 else if (tcp_config_->local_address().get_port_number() == 0) {
00421 tcp_config_->local_address_set_port(port);
00422 }
00423
00424
00425 return true;
00426 }
00427
00428 void
00429 TcpTransport::pre_shutdown_i()
00430 {
00431 DBG_ENTRY_LVL("TcpTransport","pre_shutdown_i",6);
00432
00433 GuardType guard(this->links_lock_);
00434
00435 AddrLinkMap::ENTRY* entry;
00436
00437 for (AddrLinkMap::ITERATOR itr(this->links_);
00438 itr.next(entry);
00439 itr.advance()) {
00440 entry->int_id_->pre_stop_i();
00441 }
00442 }
00443
00444 void
00445 TcpTransport::shutdown_i()
00446 {
00447 DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
00448
00449
00450 this->acceptor_->close();
00451 this->acceptor_->transport_shutdown();
00452
00453 this->con_checker_->close(1);
00454
00455 {
00456 GuardType guard(this->connections_lock_);
00457
00458 this->connections_.clear();
00459 this->pending_connections_.clear();
00460 }
00461
00462
00463 {
00464 GuardType guard(this->links_lock_);
00465
00466 AddrLinkMap::ENTRY* entry;
00467
00468 for (AddrLinkMap::ITERATOR itr(this->links_);
00469 itr.next(entry);
00470 itr.advance()) {
00471 entry->int_id_->transport_shutdown();
00472 }
00473
00474 this->links_.unbind_all();
00475
00476 for (AddrLinkMap::ITERATOR itr(this->pending_release_links_);
00477 itr.next(entry);
00478 itr.advance()) {
00479 entry->int_id_->transport_shutdown();
00480 }
00481
00482 this->pending_release_links_.unbind_all();
00483 }
00484
00485
00486 this->tcp_config_ = 0;
00487
00488
00489 this->reactor_task_ = 0;
00490
00491
00492
00493 this->acceptor_->transport_shutdown();
00494 }
00495
00496 bool
00497 TcpTransport::connection_info_i(TransportLocator& local_info) const
00498 {
00499 DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
00500
00501 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n",
00502 this->tcp_config_->get_public_address().c_str()), 2);
00503
00504 this->tcp_config_->populate_locator(local_info);
00505
00506 return true;
00507 }
00508
00509 void
00510 TcpTransport::release_datalink(DataLink* link)
00511 {
00512 DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
00513
00514 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00515
00516 if (tcp_link == 0) {
00517
00518 ACE_ERROR((LM_ERROR,
00519 "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
00520 "TcpDataLink.\n"));
00521 return;
00522 }
00523
00524 TcpDataLink_rch released_link;
00525
00526
00527 enum LinkAction { None, StopLink, ScheduleLinkRelease };
00528 LinkAction linkAction = None;
00529
00530
00531 GuardType guard(this->links_lock_);
00532
00533
00534 PriorityKey key(
00535 tcp_link->transport_priority(),
00536 tcp_link->remote_address(),
00537 tcp_link->is_loopback(),
00538 tcp_link->is_active());
00539
00540 VDBG_LVL((LM_DEBUG,
00541 "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
00542 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00543 link,
00544 tcp_link->transport_priority(),
00545 tcp_link->remote_address().get_host_addr(),
00546 tcp_link->remote_address().get_port_number(),
00547 (int)tcp_link->is_loopback(),
00548 (int)tcp_link->is_active()), 2);
00549
00550 if (this->links_.unbind(key, released_link) != 0) {
00551
00552 } else if (link->datalink_release_delay() > ACE_Time_Value::zero) {
00553 link->set_scheduling_release(true);
00554
00555 VDBG_LVL((LM_DEBUG,
00556 "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
00557 "is %: sec %d usec\n",
00558 link->datalink_release_delay().sec(),
00559 link->datalink_release_delay().usec()), 4);
00560
00561
00562 released_link->set_release_pending(true);
00563
00564 switch (this->pending_release_links_.bind(key, released_link)) {
00565 case -1:
00566 ACE_ERROR((LM_ERROR,
00567 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00568 "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
00569 linkAction = StopLink;
00570 break;
00571
00572 case 1:
00573 ACE_ERROR((LM_ERROR,
00574 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00575 "pending_release_links_ map: already bound\n", released_link.in()));
00576 linkAction = StopLink;
00577 break;
00578
00579 case 0:
00580 linkAction = ScheduleLinkRelease;
00581 break;
00582
00583 default:
00584 break;
00585 }
00586
00587 } else {
00588 link->set_scheduling_release(true);
00589
00590 linkAction = StopLink;
00591 }
00592
00593
00594 ACE_Time_Value cancel_now = ACE_OS::gettimeofday();
00595 switch (linkAction) {
00596 case StopLink:
00597 link->schedule_stop(cancel_now);
00598 break;
00599
00600 case ScheduleLinkRelease:
00601
00602 link->schedule_delayed_release();
00603 break;
00604
00605 case None:
00606 break;
00607 }
00608
00609 if (DCPS_debug_level > 9) {
00610 std::stringstream buffer;
00611 buffer << *link;
00612 ACE_DEBUG((LM_DEBUG,
00613 ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
00614 ACE_TEXT("link[%@] with priority %d released.\n%C"),
00615 link,
00616 link->transport_priority(),
00617 buffer.str().c_str()));
00618 }
00619 }
00620
00621 TcpInst*
00622 TcpTransport::get_configuration()
00623 {
00624 return this->tcp_config_.in();
00625 }
00626
00627
00628
00629
00630
00631
00632 void
00633 TcpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00634 const TcpConnection_rch& connection)
00635 {
00636 DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
00637
00638 const PriorityKey key(connection->transport_priority(),
00639 remote_address,
00640 remote_address == tcp_config_->local_address(),
00641 connection->is_connector());
00642
00643 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
00644 ACE_TEXT("established with %C:%d.\n"),
00645 remote_address.get_host_name(),
00646 remote_address.get_port_number()), 2);
00647
00648 GuardType connection_guard(connections_lock_);
00649 TcpDataLink_rch link;
00650 {
00651 GuardType guard(links_lock_);
00652 links_.find(key, link);
00653 }
00654
00655 if (!link.is_nil()) {
00656 connection_guard.release();
00657
00658 if (connect_tcp_datalink(link, connection) == -1) {
00659 VDBG_LVL((LM_ERROR,
00660 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5);
00661 GuardType guard(links_lock_);
00662 links_.unbind(key);
00663
00664 } else {
00665 con_checker_->add(connection);
00666 }
00667
00668 return;
00669 }
00670
00671
00672
00673
00674 VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5);
00675 const ConnectionMap::iterator where = connections_.find(key);
00676
00677 if (where != connections_.end()) {
00678 ACE_ERROR((LM_ERROR,
00679 ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ")
00680 ACE_TEXT("connection with %C:%d at priority %d already exists, ")
00681 ACE_TEXT("overwriting previously established connection.\n"),
00682 remote_address.get_host_name(),
00683 remote_address.get_port_number(),
00684 connection->transport_priority()));
00685 }
00686
00687 connections_[key] = connection;
00688 VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5);
00689
00690 con_checker_->add(connection);
00691 }
00692
00693
00694 int
00695 TcpTransport::connect_tcp_datalink(const TcpDataLink_rch& link,
00696 const TcpConnection_rch& connection)
00697 {
00698 DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
00699
00700 if (link->reuse_existing_connection(connection) == 0) {
00701 return 0;
00702 }
00703
00704 ++last_link_;
00705
00706 if (DCPS_debug_level > 4) {
00707 ACE_DEBUG((LM_DEBUG,
00708 ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
00709 ACE_TEXT("creating send strategy with priority %d.\n"),
00710 last_link_, link->transport_priority()));
00711 }
00712
00713 connection->id() = last_link_;
00714
00715 TransportSendStrategy_rch send_strategy =
00716 new TcpSendStrategy(last_link_, link, this->tcp_config_, connection,
00717 new TcpSynchResource(connection,
00718 this->tcp_config_->max_output_pause_period_),
00719 this->reactor_task_, link->transport_priority());
00720
00721 TransportStrategy_rch receive_strategy =
00722 new TcpReceiveStrategy(link, connection, this->reactor_task_);
00723
00724 if (link->connect(connection, send_strategy, receive_strategy) != 0) {
00725 return -1;
00726 }
00727
00728 return 0;
00729 }
00730
00731
00732
00733
00734 int
00735 TcpTransport::fresh_link(TcpConnection_rch connection)
00736 {
00737 DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
00738
00739 TcpDataLink_rch link;
00740 GuardType guard(this->links_lock_);
00741
00742 PriorityKey key(connection->transport_priority(),
00743 connection->get_remote_address(),
00744 connection->get_remote_address() == this->tcp_config_->local_address(),
00745 connection->is_connector());
00746
00747 if (this->links_.find(key, link) == 0) {
00748 TcpConnection_rch old_con = link->get_connection();
00749
00750
00751
00752
00753 if (old_con.is_nil()) {
00754 return 0;
00755 }
00756
00757 if (old_con.in() != connection.in())
00758
00759 {
00760 return link->reconnect(connection.in());
00761 }
00762 }
00763
00764 return 0;
00765 }
00766
00767 void
00768 TcpTransport::unbind_link(DataLink* link)
00769 {
00770 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00771
00772 if (tcp_link == 0) {
00773
00774 ACE_ERROR((LM_ERROR,
00775 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00776 "Failed to downcast DataLink to TcpDataLink.\n"));
00777 return;
00778 }
00779
00780
00781 PriorityKey key(
00782 tcp_link->transport_priority(),
00783 tcp_link->remote_address(),
00784 tcp_link->is_loopback(),
00785 tcp_link->is_active());
00786
00787 VDBG_LVL((LM_DEBUG,
00788 "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
00789 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00790 link,
00791 tcp_link->transport_priority(),
00792 tcp_link->remote_address().get_host_addr(),
00793 tcp_link->remote_address().get_port_number(),
00794 (int)tcp_link->is_loopback(),
00795 (int)tcp_link->is_active()), 2);
00796
00797 GuardType guard(this->links_lock_);
00798
00799 if (this->pending_release_links_.unbind(key) != 0 &&
00800 link->datalink_release_delay() > ACE_Time_Value::zero) {
00801 ACE_ERROR((LM_ERROR,
00802 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00803 "Failed to find link %@ tcp_link %@ PriorityKey "
00804 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00805 link,
00806 tcp_link,
00807 tcp_link->transport_priority(),
00808 tcp_link->remote_address().get_host_addr(),
00809 tcp_link->remote_address().get_port_number(),
00810 (int)tcp_link->is_loopback(),
00811 (int)tcp_link->is_active()));
00812 }
00813 }
00814
00815 }
00816 }