#include <TcpTransport.h>
Inheritance diagram for OpenDDS::DCPS::TcpTransport:
Public Member Functions | |
TcpTransport (const TransportInst_rch &inst) | |
virtual | ~TcpTransport () |
TcpInst * | get_configuration () |
int | fresh_link (TcpConnection_rch connection) |
virtual void | unbind_link (DataLink *link) |
Remove any pending_release mappings. | |
Private Types | |
typedef ACE_Hash_Map_Manager_Ex< PriorityKey, TcpDataLink_rch, ACE_Hash< PriorityKey >, ACE_Equal_To< PriorityKey >, ACE_Null_Mutex > | AddrLinkMap |
Map Type: (key) PriorityKey to (value) TcpDataLink_rch. | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
Private Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual void | stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id) |
virtual bool | configure_i (TransportInst *config) |
virtual void | shutdown_i () |
virtual void | pre_shutdown_i () |
virtual bool | connection_info_i (TransportLocator &local_info) const |
virtual void | release_datalink (DataLink *link) |
Called by the DataLink to release itself. | |
virtual std::string | transport_type () const |
void | async_connect_failed (const PriorityKey &key) |
void | passive_connection (const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection) |
bool | find_datalink_i (const PriorityKey &key, TcpDataLink_rch &link, TransportClient *client, const RepoId &remote_id) |
int | connect_tcp_datalink (const TcpDataLink_rch &link, const TcpConnection_rch &connection) |
Common code used by accept_datalink(), passive_connection(), and active completion. | |
PriorityKey | blob_to_key (const TransportBLOB &remote, Priority priority, bool active) |
typedef | OPENDDS_MAP (PriorityKey, TcpDataLink_rch) LinkMap |
typedef | OPENDDS_MAP (PriorityKey, TcpConnection_rch) ConnectionMap |
Private Attributes | |
TcpAcceptor * | acceptor_ |
Used to accept passive connections on our local_address_. | |
ACE_Connector< TcpConnection, ACE_SOCK_Connector > | connector_ |
Open TcpConnections using non-blocking connect. | |
TcpInst_rch | tcp_config_ |
Our configuration object, supplied to us in configure_i(). | |
AddrLinkMap | links_ |
This is the map of connected DataLinks. | |
AddrLinkMap | pending_release_links_ |
LockType | links_lock_ |
This lock is used to protect the links_ data member. | |
ConnectionMap | connections_ |
LockType | connections_lock_ |
TransportReactorTask_rch | reactor_task_ |
We need the reactor for our Acceptor. | |
TcpConnectionReplaceTask * | con_checker_ |
Friends | |
class | TcpConnection |
class | TcpDataLink |
Notes about object ownership: 1) This transport owns the datalink objects, the passive connection objects, the acceptor object, and the TcpConnectionReplaceTask object (used during reconnecting). 2) It holds a reference to the TransportReactorTask object, which is owned by the base class.
Definition at line 44 of file TcpTransport.h.
typedef ACE_Hash_Map_Manager_Ex<PriorityKey, TcpDataLink_rch, ACE_Hash<PriorityKey>, ACE_Equal_To<PriorityKey>, ACE_Null_Mutex> OpenDDS::DCPS::TcpTransport::AddrLinkMap [private] |
Map Type: (key) PriorityKey to (value) TcpDataLink_rch.
Definition at line 115 of file TcpTransport.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::TcpTransport::ConditionType [private] |
Definition at line 122 of file TcpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpTransport::GuardType [private] |
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpTransport::LockType [private] |
OpenDDS::DCPS::TcpTransport::TcpTransport | ( | const TransportInst_rch & | inst | ) | [explicit] |
Definition at line 32 of file TcpTransport.cpp.
References acceptor_, con_checker_, OpenDDS::DCPS::TransportImpl::configure(), DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
00033 : acceptor_(new TcpAcceptor(this)), 00034 con_checker_(new TcpConnectionReplaceTask(this)) 00035 { 00036 DBG_ENTRY_LVL("TcpTransport","TcpTransport",6); 00037 00038 if (!inst.is_nil()) { 00039 if (!configure(inst.in())) { 00040 delete con_checker_; 00041 delete acceptor_; 00042 throw Transport::UnableToCreate(); 00043 } 00044 } 00045 }
OpenDDS::DCPS::TcpTransport::~TcpTransport | ( | ) | [virtual] |
Definition at line 47 of file TcpTransport.cpp.
References acceptor_, OpenDDS::DCPS::QueueTaskBase< T >::close(), con_checker_, and DBG_ENTRY_LVL.
00048 { 00049 DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6); 00050 delete acceptor_; 00051 00052 con_checker_->close(1); // This could potentially fix a race condition 00053 delete con_checker_; 00054 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [private, virtual] |
Definition at line 231 of file TcpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), blob_to_key(), connect_tcp_datalink(), connections_, connections_lock_, find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportClient::repo_id_, TcpDataLink, and VDBG_LVL.
00234 { 00235 GuidConverter remote_conv(remote.repo_id_); 00236 GuidConverter local_conv(attribs.local_id_); 00237 00238 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C " 00239 "accepting connection from remote %C\n", 00240 std::string(local_conv).c_str(), 00241 std::string(remote_conv).c_str()), 5); 00242 00243 GuardType guard(connections_lock_); 00244 const PriorityKey key = 00245 blob_to_key(remote.blob_, attribs.priority_, false /* !active */); 00246 00247 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey " 00248 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_, 00249 key.address().get_host_addr(), key.address().get_port_number(), 00250 key.is_loopback(), key.is_active()), 2); 00251 00252 TcpDataLink_rch link; 00253 { 00254 GuardType guard(links_lock_); 00255 00256 if (find_datalink_i(key, link, client, remote.repo_id_)) { 00257 return link.is_nil() 00258 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS) 00259 : AcceptConnectResult(link._retn()); 00260 00261 } else { 00262 link = new TcpDataLink(key.address(), this, key.priority(), 00263 key.is_loopback(), key.is_active()); 00264 00265 if (links_.bind(key, link) != 0 /*OK*/) { 00266 ACE_ERROR((LM_ERROR, 00267 "(%P|%t) ERROR: TcpTransport::accept_datalink " 00268 "Unable to bind new TcpDataLink to " 00269 "TcpTransport in links_ map.\n")); 00270 return AcceptConnectResult(); 00271 } 00272 } 00273 } 00274 00275 TcpConnection_rch connection; 00276 const ConnectionMap::iterator iter = connections_.find(key); 00277 00278 if (iter != connections_.end()) { 00279 connection = iter->second; 00280 connections_.erase(iter); 00281 } 00282 00283 if (connection.is_nil()) { 00284 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00285 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink " 00286 "got started link %@.\n", link.in()), 0); 00287 return AcceptConnectResult(link._retn()); 00288 } 00289 00290 VDBG_LVL((LM_DEBUG, "(%P|%t) 
TcpTransport::accept_datalink " 00291 "no existing TcpConnection.\n"), 0); 00292 00293 add_pending_connection(client, link.in()); 00294 00295 // no link ready, passive_connection will complete later 00296 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00297 } 00298 00299 guard.release(); // connect_tcp_datalink() isn't called with connections_lock_ 00300 00301 if (connect_tcp_datalink(link, connection) == -1) { 00302 GuardType guard(links_lock_); 00303 links_.unbind(key); 00304 link = 0; 00305 } 00306 00307 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink " 00308 "connected link %@.\n", link.in()), 2); 00309 return AcceptConnectResult(link._retn()); 00310 }
void OpenDDS::DCPS::TcpTransport::async_connect_failed | ( | const PriorityKey & | key | ) | [private] |
Definition at line 169 of file TcpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::in(), links_, and links_lock_.
00170 { 00171 00172 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n")); 00173 GuardType guard(links_lock_); 00174 TcpDataLink_rch link; 00175 links_.find(key, link); 00176 links_.unbind(key); 00177 guard.release(); 00178 00179 if (link.in()) { 00180 link->invoke_on_start_callbacks(false); 00181 } 00182 00183 }
PriorityKey OpenDDS::DCPS::TcpTransport::blob_to_key | ( | const TransportBLOB & | remote, | |
Priority | priority, | |||
bool | active | |||
) | [private] |
Definition at line 57 of file TcpTransport.cpp.
References OpenDDS::DCPS::AssociationData::get_remote_address(), and tcp_config_.
Referenced by accept_datalink(), and connect_datalink().
00060 { 00061 const ACE_INET_Addr remote_address = 00062 AssociationData::get_remote_address(remote); 00063 const bool is_loopback = remote_address == tcp_config_->local_address(); 00064 return PriorityKey(priority, remote_address, is_loopback, active); 00065 }
bool OpenDDS::DCPS::TcpTransport::configure_i | ( | TransportInst * | config | ) | [private, virtual] |
Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 334 of file TcpTransport.cpp.
References OpenDDS::DCPS::RcObject< T >::_add_ref(), OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::config(), connector_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), DBG_ENTRY_LVL, OpenDDS::DCPS::get_fully_qualified_hostname(), OpenDDS::DCPS::TcpInst::local_address(), OPENDDS_STRING, OpenDDS::DCPS::TransportImpl::reactor_task(), reactor_task_, tcp_config_, TheServiceParticipant, and VDBG_LVL.
00335 { 00336 DBG_ENTRY_LVL("TcpTransport", "configure_i", 6); 00337 00338 // Downcast the config argument to a TcpInst* 00339 TcpInst* tcp_config = 00340 static_cast<TcpInst*>(config); 00341 00342 if (tcp_config == 0) { 00343 // The downcast failed. 00344 ACE_ERROR_RETURN((LM_ERROR, 00345 "(%P|%t) ERROR: Failed downcast from TransportInst " 00346 "to TcpInst.\n"), 00347 false); 00348 } 00349 00350 this->create_reactor_task(); 00351 00352 // Ask our base class for a "copy" of the reference to the reactor task. 00353 this->reactor_task_ = reactor_task(); 00354 00355 connector_.open(reactor_task_->get_reactor()); 00356 00357 // Make a "copy" of the reference for ourselves. 00358 tcp_config->_add_ref(); 00359 this->tcp_config_ = tcp_config; 00360 00361 // Open the reconnect task 00362 if (this->con_checker_->open()) { 00363 ACE_ERROR_RETURN((LM_ERROR, 00364 ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"), 00365 ACE_TEXT("open")), 00366 false); 00367 } 00368 00369 // Override with DCPSDefaultAddress. 00370 if (this->tcp_config_->local_address() == ACE_INET_Addr () && 00371 !TheServiceParticipant->default_address ().empty ()) { 00372 this->tcp_config_->local_address(0, TheServiceParticipant->default_address ().c_str ()); 00373 } 00374 00375 // Open our acceptor object so that we can accept passive connections 00376 // on our this->tcp_config_->local_address_. 00377 00378 if (this->acceptor_->open(this->tcp_config_->local_address(), 00379 this->reactor_task_->get_reactor()) != 0) { 00380 // Remember to drop our reference to the tcp_config_ object since 00381 // we are about to return -1 here, which means we are supposed to 00382 // keep a copy after all. 
00383 TcpInst_rch cfg = this->tcp_config_._retn(); 00384 00385 ACE_ERROR_RETURN((LM_ERROR, 00386 ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"), 00387 cfg->local_address().get_host_addr(), 00388 cfg->local_address().get_port_number(), 00389 ACE_TEXT("open")), 00390 false); 00391 } 00392 00393 // update the port number (incase port zero was given). 00394 ACE_INET_Addr address; 00395 00396 if (this->acceptor_->acceptor().get_local_addr(address) != 0) { 00397 ACE_ERROR((LM_ERROR, 00398 ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ") 00399 ACE_TEXT("- %p"), 00400 ACE_TEXT("cannot get local addr\n"))); 00401 } 00402 00403 OPENDDS_STRING listening_addr(address.get_host_addr()); 00404 VDBG_LVL((LM_DEBUG, 00405 ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"), 00406 listening_addr.c_str(), address.get_port_number()), 2); 00407 00408 unsigned short port = address.get_port_number(); 00409 00410 // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully 00411 // qualified hostname and actual listening port number. 00412 if (tcp_config_->local_address().is_any()) { 00413 std::string hostname = get_fully_qualified_hostname(); 00414 00415 this->tcp_config_->local_address(port, hostname.c_str()); 00416 } 00417 00418 // Now we got the actual listening port. Update the port number in the configuration 00419 // if it's 0 originally. 00420 else if (tcp_config_->local_address().get_port_number() == 0) { 00421 tcp_config_->local_address_set_port(port); 00422 } 00423 00424 // Ahhh... The sweet smell of success! 00425 return true; 00426 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [private, virtual] |
Definition at line 68 of file TcpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), blob_to_key(), connections_lock_, connector_, DBG_ENTRY_LVL, find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportClient::repo_id_, tcp_config_, TcpConnection, TcpDataLink, and VDBG_LVL.
00071 { 00072 DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6); 00073 00074 const PriorityKey key = 00075 blob_to_key(remote.blob_, attribs.priority_, true /*active*/); 00076 00077 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey " 00078 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00079 key.priority(), key.address().get_host_addr(), 00080 key.address().get_port_number(), key.is_loopback(), 00081 key.is_active()), 0); 00082 00083 TcpDataLink_rch link; 00084 { 00085 GuardType guard(links_lock_); 00086 00087 if (find_datalink_i(key, link, client, remote.repo_id_)) { 00088 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0); 00089 return link.is_nil() 00090 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS) 00091 : AcceptConnectResult(link._retn()); 00092 } 00093 00094 link = new TcpDataLink(key.address(), this, attribs.priority_, 00095 key.is_loopback(), true /*active*/); 00096 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0); 00097 if (links_.bind(key, link) != 0 /*OK*/) { 00098 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink " 00099 "Unable to bind new TcpDataLink[%@] to " 00100 "TcpTransport in links_ map.\n", link.in())); 00101 return AcceptConnectResult(); 00102 } 00103 } 00104 00105 TcpConnection_rch connection = 00106 new TcpConnection(key.address(), link->transport_priority(), tcp_config_); 00107 connection->set_datalink(link.in()); 00108 00109 TcpConnection* pConn = connection.in(); 00110 TcpConnection_rch reactor_refcount(connection); // increment for reactor callback 00111 00112 ACE_TCHAR str[64]; 00113 key.address().addr_to_string(str,sizeof(str)/sizeof(str[0])); 00114 00115 // Can't make this call while holding onto TransportClient::lock_ 00116 const int ret = 00117 connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch); 00118 00119 if (ret == -1 && errno != EWOULDBLOCK) { 00120 
00121 VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2); 00122 ACE_DEBUG((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n")); 00123 //If the connection fails and, in the interim between releasing 00124 //lock and re-acquiring to remove the failed link, another association may have found 00125 //the datalink in links_ (always using find_datalink_i) so must allow the other 00126 //association to either try to connect again (might succeed for it) 00127 //or try another transport. If find_datalink_i was called for this datalink, an 00128 //on_start_callback will be registered and can be invoked. 00129 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0); 00130 { 00131 GuardType guard(links_lock_); 00132 if (links_.unbind(key, link) != 0 /*OK*/) { 00133 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink " 00134 "Unable to unbind failed TcpDataLink[%@] from " 00135 "TcpTransport links_ map.\n", link.in())); 00136 } 00137 } 00138 link->invoke_on_start_callbacks(false); 00139 00140 return AcceptConnectResult(); 00141 } 00142 00143 // Don't decrement count when reactor_refcount goes out of scope, see 00144 // TcpConnection::open() 00145 (void) reactor_refcount._retn(); 00146 00147 if (ret == 0) { 00148 // connect() completed synchronously and called TcpConnection::active_open(). 
00149 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink " 00150 "completed synchronously.\n"), 0); 00151 return AcceptConnectResult(link._retn()); 00152 } 00153 00154 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00155 // link was started by the reactor thread before we could add a callback 00156 00157 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0); 00158 return AcceptConnectResult(link._retn()); 00159 } 00160 00161 GuardType connections_guard(connections_lock_); 00162 00163 add_pending_connection(client, link.in()); 00164 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0); 00165 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00166 }
int OpenDDS::DCPS::TcpTransport::connect_tcp_datalink | ( | const TcpDataLink_rch & | link, | |
const TcpConnection_rch & | connection | |||
) | [private] |
Common code used by accept_datalink(), passive_connection(), and active completion.
Code common to make_active_connection() and make_passive_connection().
Definition at line 695 of file TcpTransport.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::DCPS::TransportImpl::last_link_.
Referenced by accept_datalink(), and passive_connection().
00697 { 00698 DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6); 00699 00700 if (link->reuse_existing_connection(connection) == 0) { 00701 return 0; 00702 } 00703 00704 ++last_link_; 00705 00706 if (DCPS_debug_level > 4) { 00707 ACE_DEBUG((LM_DEBUG, 00708 ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ") 00709 ACE_TEXT("creating send strategy with priority %d.\n"), 00710 last_link_, link->transport_priority())); 00711 } 00712 00713 connection->id() = last_link_; 00714 00715 TransportSendStrategy_rch send_strategy = 00716 new TcpSendStrategy(last_link_, link, this->tcp_config_, connection, 00717 new TcpSynchResource(connection, 00718 this->tcp_config_->max_output_pause_period_), 00719 this->reactor_task_, link->transport_priority()); 00720 00721 TransportStrategy_rch receive_strategy = 00722 new TcpReceiveStrategy(link, connection, this->reactor_task_); 00723 00724 if (link->connect(connection, send_strategy, receive_strategy) != 0) { 00725 return -1; 00726 } 00727 00728 return 0; 00729 }
bool OpenDDS::DCPS::TcpTransport::connection_info_i | ( | TransportLocator & | local_info | ) | const [private, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 497 of file TcpTransport.cpp.
References DBG_ENTRY_LVL, tcp_config_, and VDBG_LVL.
00498 { 00499 DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6); 00500 00501 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n", 00502 this->tcp_config_->get_public_address().c_str()), 2); 00503 00504 this->tcp_config_->populate_locator(local_info); 00505 00506 return true; 00507 }
bool OpenDDS::DCPS::TcpTransport::find_datalink_i | ( | const PriorityKey & | key, | |
TcpDataLink_rch & | link, | |||
TransportClient * | client, | |||
const RepoId & | remote_id | |||
) | [private] |
Definition at line 187 of file TcpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::add_pending_connection(), DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), links_, pending_release_links_, and VDBG_LVL.
Referenced by accept_datalink(), and connect_datalink().
00189 { 00190 DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6); 00191 00192 if (links_.find(key, link) == 0 /*OK*/) { 00193 if (!link->add_on_start_callback(client, remote_id)) { 00194 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00195 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0); 00196 // Since the link was already started, we won't get an "on start" 00197 // callback, and the link is immediately usable. 00198 return true; 00199 } 00200 00201 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00202 ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0); 00203 add_pending_connection(client, link.in()); 00204 link = 0; // don't return link to TransportClient 00205 return true; 00206 00207 } else if (pending_release_links_.find(key, link) == 0 /*OK*/) { 00208 if (link->cancel_release()) { 00209 link->set_release_pending(false); 00210 00211 if (pending_release_links_.unbind(key, link) == 0 /*OK*/ 00212 && links_.bind(key, link) == 0 /*OK*/) { 00213 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00214 ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0); 00215 return true; 00216 } 00217 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00218 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0); 00219 } else { 00220 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00221 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0); 00222 } 00223 link = 0; // don't return link to TransportClient 00224 return false; 00225 } 00226 00227 return false; 00228 }
int OpenDDS::DCPS::TcpTransport::fresh_link | ( | TcpConnection_rch | connection | ) |
This function is called by the TcpConnectionReplaceTask thread to check whether the passively accepted connection is a re-established connection. If it is, then the "old" connection object in the datalink is replaced by the "new" connection object.
Definition at line 735 of file TcpTransport.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), and tcp_config_.
Referenced by OpenDDS::DCPS::TcpConnectionReplaceTask::execute().
00736 { 00737 DBG_ENTRY_LVL("TcpTransport","fresh_link",6); 00738 00739 TcpDataLink_rch link; 00740 GuardType guard(this->links_lock_); 00741 00742 PriorityKey key(connection->transport_priority(), 00743 connection->get_remote_address(), 00744 connection->get_remote_address() == this->tcp_config_->local_address(), 00745 connection->is_connector()); 00746 00747 if (this->links_.find(key, link) == 0) { 00748 TcpConnection_rch old_con = link->get_connection(); 00749 00750 // The connection is accepted but may not be associated with the datalink 00751 // at this point. The thread calling add_associations() will associate 00752 // the datalink with the connection in make_passive_connection(). 00753 if (old_con.is_nil()) { 00754 return 0; 00755 } 00756 00757 if (old_con.in() != connection.in()) 00758 // Replace the "old" connection object with the "new" connection object. 00759 { 00760 return link->reconnect(connection.in()); 00761 } 00762 } 00763 00764 return 0; 00765 }
TcpInst * OpenDDS::DCPS::TcpTransport::get_configuration | ( | ) |
Definition at line 622 of file TcpTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::in(), and tcp_config_.
00623 { 00624 return this->tcp_config_.in(); 00625 }
typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
TcpConnection_rch | ||||
) | [private] |
typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP | ( | PriorityKey | , | |
TcpDataLink_rch | ||||
) | [private] |
void OpenDDS::DCPS::TcpTransport::passive_connection | ( | const ACE_INET_Addr & | remote_address, | |
const TcpConnection_rch & | connection | |||
) | [private] |
Called by the TcpConnection object when it has been created by the acceptor and needs to be attached to a DataLink. The DataLink may or may not already be created and waiting for this passive connection to appear.
Definition at line 633 of file TcpTransport.cpp.
References OpenDDS::DCPS::QueueTaskBase< T >::add(), con_checker_, connect_tcp_datalink(), connections_, connections_lock_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, tcp_config_, and VDBG_LVL.
00635 { 00636 DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6); 00637 00638 const PriorityKey key(connection->transport_priority(), 00639 remote_address, 00640 remote_address == tcp_config_->local_address(), 00641 connection->is_connector()); 00642 00643 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ") 00644 ACE_TEXT("established with %C:%d.\n"), 00645 remote_address.get_host_name(), 00646 remote_address.get_port_number()), 2); 00647 00648 GuardType connection_guard(connections_lock_); 00649 TcpDataLink_rch link; 00650 { 00651 GuardType guard(links_lock_); 00652 links_.find(key, link); 00653 } 00654 00655 if (!link.is_nil()) { 00656 connection_guard.release(); 00657 00658 if (connect_tcp_datalink(link, connection) == -1) { 00659 VDBG_LVL((LM_ERROR, 00660 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5); 00661 GuardType guard(links_lock_); 00662 links_.unbind(key); 00663 00664 } else { 00665 con_checker_->add(connection); 00666 } 00667 00668 return; 00669 } 00670 00671 // If we reach this point, this link was not in links_, so the 00672 // accept_datalink() call hasn't happened yet. Store in connections_ for the 00673 // accept_datalink() method to find. 00674 VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5); 00675 const ConnectionMap::iterator where = connections_.find(key); 00676 00677 if (where != connections_.end()) { 00678 ACE_ERROR((LM_ERROR, 00679 ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ") 00680 ACE_TEXT("connection with %C:%d at priority %d already exists, ") 00681 ACE_TEXT("overwriting previously established connection.\n"), 00682 remote_address.get_host_name(), 00683 remote_address.get_port_number(), 00684 connection->transport_priority())); 00685 } 00686 00687 connections_[key] = connection; 00688 VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5); 00689 00690 con_checker_->add(connection); 00691 }
void OpenDDS::DCPS::TcpTransport::pre_shutdown_i | ( | ) | [private, virtual] |
Called before the transport is shut down to let the concrete transport do anything necessary.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 429 of file TcpTransport.cpp.
References DBG_ENTRY_LVL.
00430 { 00431 DBG_ENTRY_LVL("TcpTransport","pre_shutdown_i",6); 00432 00433 GuardType guard(this->links_lock_); 00434 00435 AddrLinkMap::ENTRY* entry; 00436 00437 for (AddrLinkMap::ITERATOR itr(this->links_); 00438 itr.next(entry); 00439 itr.advance()) { 00440 entry->int_id_->pre_stop_i(); 00441 } 00442 }
void OpenDDS::DCPS::TcpTransport::release_datalink | ( | DataLink * | link | ) | [private, virtual] |
Called by the DataLink to release itself.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 510 of file TcpTransport.cpp.
References OpenDDS::DCPS::DataLink::datalink_release_delay(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), OpenDDS::DCPS::DataLink::schedule_stop(), OpenDDS::DCPS::DataLink::set_scheduling_release(), OpenDDS::DCPS::DataLink::transport_priority(), and VDBG_LVL.
00511 { 00512 DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6); 00513 00514 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link); 00515 00516 if (tcp_link == 0) { 00517 // Really an assertion failure 00518 ACE_ERROR((LM_ERROR, 00519 "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to " 00520 "TcpDataLink.\n")); 00521 return; 00522 } 00523 00524 TcpDataLink_rch released_link; 00525 00526 // Possible actions that will be taken to release the link. 00527 enum LinkAction { None, StopLink, ScheduleLinkRelease }; 00528 LinkAction linkAction = None; 00529 00530 // Scope for locking to protect the links (and pending_release) containers. 00531 GuardType guard(this->links_lock_); 00532 00533 // Attempt to remove the TcpDataLink from our links_ map. 00534 PriorityKey key( 00535 tcp_link->transport_priority(), 00536 tcp_link->remote_address(), 00537 tcp_link->is_loopback(), 00538 tcp_link->is_active()); 00539 00540 VDBG_LVL((LM_DEBUG, 00541 "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey " 00542 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00543 link, 00544 tcp_link->transport_priority(), 00545 tcp_link->remote_address().get_host_addr(), 00546 tcp_link->remote_address().get_port_number(), 00547 (int)tcp_link->is_loopback(), 00548 (int)tcp_link->is_active()), 2); 00549 00550 if (this->links_.unbind(key, released_link) != 0) { 00551 //No op 00552 } else if (link->datalink_release_delay() > ACE_Time_Value::zero) { 00553 link->set_scheduling_release(true); 00554 00555 VDBG_LVL((LM_DEBUG, 00556 "(%P|%t) TcpTransport::release_datalink datalink_release_delay " 00557 "is %: sec %d usec\n", 00558 link->datalink_release_delay().sec(), 00559 link->datalink_release_delay().usec()), 4); 00560 00561 // Atomic value update, safe to perform here. 
00562 released_link->set_release_pending(true); 00563 00564 switch (this->pending_release_links_.bind(key, released_link)) { 00565 case -1: 00566 ACE_ERROR((LM_ERROR, 00567 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 00568 "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind"))); 00569 linkAction = StopLink; 00570 break; 00571 00572 case 1: 00573 ACE_ERROR((LM_ERROR, 00574 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 00575 "pending_release_links_ map: already bound\n", released_link.in())); 00576 linkAction = StopLink; 00577 break; 00578 00579 case 0: 00580 linkAction = ScheduleLinkRelease; 00581 break; 00582 00583 default: 00584 break; 00585 } 00586 00587 } else { // datalink_release_delay_ is 0 00588 link->set_scheduling_release(true); 00589 00590 linkAction = StopLink; 00591 } 00592 00593 // Actions are executed outside of the lock scope. 00594 ACE_Time_Value cancel_now = ACE_OS::gettimeofday(); 00595 switch (linkAction) { 00596 case StopLink: 00597 link->schedule_stop(cancel_now); 00598 break; 00599 00600 case ScheduleLinkRelease: 00601 00602 link->schedule_delayed_release(); 00603 break; 00604 00605 case None: 00606 break; 00607 } 00608 00609 if (DCPS_debug_level > 9) { 00610 std::stringstream buffer; 00611 buffer << *link; 00612 ACE_DEBUG((LM_DEBUG, 00613 ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ") 00614 ACE_TEXT("link[%@] with priority %d released.\n%C"), 00615 link, 00616 link->transport_priority(), 00617 buffer.str().c_str())); 00618 } 00619 }
void OpenDDS::DCPS::TcpTransport::shutdown_i | ( | ) | [private, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 445 of file TcpTransport.cpp.
References acceptor_, OpenDDS::DCPS::QueueTaskBase< T >::close(), con_checker_, connections_, DBG_ENTRY_LVL, links_, pending_release_links_, reactor_task_, tcp_config_, and OpenDDS::DCPS::TcpAcceptor::transport_shutdown().
00446 { 00447 DBG_ENTRY_LVL("TcpTransport","shutdown_i",6); 00448 00449 // Don't accept any more connections. 00450 this->acceptor_->close(); 00451 this->acceptor_->transport_shutdown(); 00452 00453 this->con_checker_->close(1); 00454 00455 { 00456 GuardType guard(this->connections_lock_); 00457 00458 this->connections_.clear(); 00459 this->pending_connections_.clear(); 00460 } 00461 00462 // Disconnect all of our DataLinks, and clear our links_ collection. 00463 { 00464 GuardType guard(this->links_lock_); 00465 00466 AddrLinkMap::ENTRY* entry; 00467 00468 for (AddrLinkMap::ITERATOR itr(this->links_); 00469 itr.next(entry); 00470 itr.advance()) { 00471 entry->int_id_->transport_shutdown(); 00472 } 00473 00474 this->links_.unbind_all(); 00475 00476 for (AddrLinkMap::ITERATOR itr(this->pending_release_links_); 00477 itr.next(entry); 00478 itr.advance()) { 00479 entry->int_id_->transport_shutdown(); 00480 } 00481 00482 this->pending_release_links_.unbind_all(); 00483 } 00484 00485 // Drop our reference to the TcpInst object. 00486 this->tcp_config_ = 0; 00487 00488 // Drop our reference to the TransportReactorTask 00489 this->reactor_task_ = 0; 00490 00491 // Tell our acceptor about this event so that it can drop its reference 00492 // it holds to this TcpTransport object (via smart-pointer). 00493 this->acceptor_->transport_shutdown(); 00494 }
void OpenDDS::DCPS::TcpTransport::stop_accepting_or_connecting | ( | TransportClient * | client, | |
const RepoId & | remote_id | |||
) | [private, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting or connecting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 313 of file TcpTransport.cpp.
References connections_lock_, and VDBG_LVL.
00315 { 00316 GuidConverter remote_converted(remote_id); 00317 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting " 00318 "stop connecting to remote: %C\n", 00319 std::string(remote_converted).c_str()), 5); 00320 00321 GuardType guard(connections_lock_); 00322 typedef std::multimap<TransportClient*, DataLink_rch>::iterator iter_t; 00323 const std::pair<iter_t, iter_t> range = 00324 pending_connections_.equal_range(client); 00325 00326 for (iter_t iter = range.first; iter != range.second; ++iter) { 00327 iter->second->remove_on_start_callback(client, remote_id); 00328 } 00329 00330 pending_connections_.erase(range.first, range.second); 00331 }
virtual std::string OpenDDS::DCPS::TcpTransport::transport_type | ( | ) | const [inline, private, virtual] |
void OpenDDS::DCPS::TcpTransport::unbind_link | ( | DataLink * | link | ) | [virtual] |
Remove any pending_release mappings.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 768 of file TcpTransport.cpp.
References OpenDDS::DCPS::DataLink::datalink_release_delay(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::transport_priority(), and VDBG_LVL.
00769 { 00770 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link); 00771 00772 if (tcp_link == 0) { 00773 // Really an assertion failure 00774 ACE_ERROR((LM_ERROR, 00775 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 00776 "Failed to downcast DataLink to TcpDataLink.\n")); 00777 return; 00778 } 00779 00780 // Attempt to remove the TcpDataLink from our links_ map. 00781 PriorityKey key( 00782 tcp_link->transport_priority(), 00783 tcp_link->remote_address(), 00784 tcp_link->is_loopback(), 00785 tcp_link->is_active()); 00786 00787 VDBG_LVL((LM_DEBUG, 00788 "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey " 00789 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00790 link, 00791 tcp_link->transport_priority(), 00792 tcp_link->remote_address().get_host_addr(), 00793 tcp_link->remote_address().get_port_number(), 00794 (int)tcp_link->is_loopback(), 00795 (int)tcp_link->is_active()), 2); 00796 00797 GuardType guard(this->links_lock_); 00798 00799 if (this->pending_release_links_.unbind(key) != 0 && 00800 link->datalink_release_delay() > ACE_Time_Value::zero) { 00801 ACE_ERROR((LM_ERROR, 00802 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 00803 "Failed to find link %@ tcp_link %@ PriorityKey " 00804 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00805 link, 00806 tcp_link, 00807 tcp_link->transport_priority(), 00808 tcp_link->remote_address().get_host_addr(), 00809 tcp_link->remote_address().get_port_number(), 00810 (int)tcp_link->is_loopback(), 00811 (int)tcp_link->is_active())); 00812 } 00813 }
friend class TcpConnection [friend] |
The TcpConnection is our friend. It tells us when it has been created (by our acceptor_), and is seeking the DataLink that should be (or will be) expecting the passive connection.
Definition at line 87 of file TcpTransport.h.
Referenced by connect_datalink().
friend class TcpDataLink [friend] |
Definition at line 88 of file TcpTransport.h.
Referenced by accept_datalink(), and connect_datalink().
Used to accept passive connections on our local_address_.
Definition at line 138 of file TcpTransport.h.
Referenced by shutdown_i(), TcpTransport(), and ~TcpTransport().
This task is used to resolve a deadlock situation during reconnecting. TODO: reuse the reconnect_task in the TcpConnection for new connection checking.
Definition at line 168 of file TcpTransport.h.
Referenced by passive_connection(), shutdown_i(), TcpTransport(), and ~TcpTransport().
ConnectionMap OpenDDS::DCPS::TcpTransport::connections_ [private] |
Map of passive connection objects that need to be paired with a DataLink.
Definition at line 155 of file TcpTransport.h.
Referenced by accept_datalink(), passive_connection(), and shutdown_i().
This protects the connections_ and the pending_connections_ data members.
Definition at line 159 of file TcpTransport.h.
Referenced by accept_datalink(), connect_datalink(), passive_connection(), and stop_accepting_or_connecting().
ACE_Connector<TcpConnection, ACE_SOCK_Connector> OpenDDS::DCPS::TcpTransport::connector_ [private] |
Open TcpConnections using non-blocking connect.
Definition at line 141 of file TcpTransport.h.
Referenced by configure_i(), and connect_datalink().
This is the map of connected DataLinks.
Definition at line 147 of file TcpTransport.h.
Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), find_datalink_i(), passive_connection(), and shutdown_i().
This lock is used to protect the links_ and pending_release_links_ data members.
Definition at line 151 of file TcpTransport.h.
Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), and passive_connection().
We need the reactor for our Acceptor.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 162 of file TcpTransport.h.
Referenced by configure_i(), and shutdown_i().
Our configuration object, supplied to us in configure_i().
Definition at line 144 of file TcpTransport.h.
Referenced by blob_to_key(), configure_i(), connect_datalink(), connection_info_i(), fresh_link(), get_configuration(), passive_connection(), and shutdown_i().