#include <TcpTransport.h>
This class provides the "Tcp" transport-specific implementation. It creates the acceptor that listens for incoming requests using TCP and maintains a collection of TCP-specific connections/datalinks.
Notes about object ownership: 1) This class owns the datalink objects, the passive connection objects, the acceptor object, and the TcpConnectionReplaceTask object (used during reconnecting). 2) It holds a reference to the TransportReactorTask object, which is owned by the base class.
Definition at line 47 of file TcpTransport.h.
typedef ACE_Hash_Map_Manager_Ex<PriorityKey, TcpDataLink_rch, ACE_Hash<PriorityKey>, ACE_Equal_To<PriorityKey>, ACE_Null_Mutex> OpenDDS::DCPS::TcpTransport::AddrLinkMap [private]
Map Type: (key) PriorityKey to (value) TcpDataLink_rch.
Definition at line 117 of file TcpTransport.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::TcpTransport::ConditionType [private]
Definition at line 124 of file TcpTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpTransport::GuardType [private]
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 123 of file TcpTransport.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpTransport::LockType [private]
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 122 of file TcpTransport.h.
OpenDDS::DCPS::TcpTransport::TcpTransport(TcpInst & inst) [explicit]
Definition at line 35 of file TcpTransport.cpp.
References configure_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::open(), and OpenDDS::DCPS::TransportImpl::shutdown().
00036 : TransportImpl(inst) 00037 , acceptor_(new TcpAcceptor(this)) 00038 , con_checker_(new TcpConnectionReplaceTask(this)) 00039 { 00040 DBG_ENTRY_LVL("TcpTransport","TcpTransport",6); 00041 00042 if (!(configure_i(inst) && open())) { 00043 this->shutdown(); 00044 throw Transport::UnableToCreate(); 00045 } 00046 00047 }
OpenDDS::DCPS::TcpTransport::~TcpTransport() [virtual]
Definition at line 49 of file TcpTransport.cpp.
References con_checker_, and DBG_ENTRY_LVL.
00050 { 00051 DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6); 00052 con_checker_->close(1); // This could potentially fix a race condition 00053 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::accept_datalink(const RemoteTransport & remote, const ConnectionAttribs & attribs, const TransportClient_rch & client) [private, virtual]
accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use; otherwise the transport passively waits for a physical connection from the active side (either in the form of a connection event or a handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 232 of file TcpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), connect_tcp_datalink(), connections_, connections_lock_, find_datalink_i(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::ref(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, OpenDDS::DCPS::RcHandle< T >::reset(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.
00235 { 00236 GuidConverter remote_conv(remote.repo_id_); 00237 GuidConverter local_conv(attribs.local_id_); 00238 00239 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C " 00240 "accepting connection from remote %C\n", 00241 std::string(local_conv).c_str(), 00242 std::string(remote_conv).c_str()), 5); 00243 00244 GuardType guard(connections_lock_); 00245 const PriorityKey key = 00246 blob_to_key(remote.blob_, attribs.priority_, false /* !active */); 00247 00248 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey " 00249 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_, 00250 key.address().get_host_addr(), key.address().get_port_number(), 00251 key.is_loopback(), key.is_active()), 2); 00252 00253 TcpDataLink_rch link; 00254 { 00255 GuardType guard(links_lock_); 00256 00257 if (find_datalink_i(key, link, client, remote.repo_id_)) { 00258 return link.is_nil() 00259 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS) 00260 : AcceptConnectResult(link); 00261 00262 } else { 00263 link = make_rch<TcpDataLink>(key.address(), ref(*this), key.priority(), 00264 key.is_loopback(), key.is_active()); 00265 00266 if (links_.bind(key, link) != 0 /*OK*/) { 00267 ACE_ERROR((LM_ERROR, 00268 "(%P|%t) ERROR: TcpTransport::accept_datalink " 00269 "Unable to bind new TcpDataLink to " 00270 "TcpTransport in links_ map.\n")); 00271 return AcceptConnectResult(); 00272 } 00273 } 00274 } 00275 00276 TcpConnection_rch connection; 00277 const ConnectionMap::iterator iter = connections_.find(key); 00278 00279 if (iter != connections_.end()) { 00280 connection = iter->second; 00281 connections_.erase(iter); 00282 } 00283 00284 if (connection.is_nil()) { 00285 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00286 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink " 00287 "got started link %@.\n", link.in()), 0); 00288 return AcceptConnectResult(link); 00289 } 00290 00291 VDBG_LVL((LM_DEBUG, "(%P|%t) 
TcpTransport::accept_datalink " 00292 "no existing TcpConnection.\n"), 0); 00293 00294 add_pending_connection(client, link); 00295 00296 // no link ready, passive_connection will complete later 00297 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00298 } 00299 00300 guard.release(); // connect_tcp_datalink() isn't called with connections_lock_ 00301 00302 if (connect_tcp_datalink(*link, connection) == -1) { 00303 GuardType guard(links_lock_); 00304 links_.unbind(key); 00305 link.reset(); 00306 } 00307 00308 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink " 00309 "connected link %@.\n", link.in()), 2); 00310 return AcceptConnectResult(link); 00311 }
void OpenDDS::DCPS::TcpTransport::async_connect_failed(const PriorityKey & key) [private]
Definition at line 170 of file TcpTransport.cpp.
References ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::in(), links_, links_lock_, LM_ERROR, ACE_Guard< ACE_LOCK >::release(), and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind().
Referenced by OpenDDS::DCPS::TcpConnection::open().
00171 { 00172 00173 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n")); 00174 GuardType guard(links_lock_); 00175 TcpDataLink_rch link; 00176 links_.find(key, link); 00177 links_.unbind(key); 00178 guard.release(); 00179 00180 if (link.in()) { 00181 link->invoke_on_start_callbacks(false); 00182 } 00183 00184 }
PriorityKey OpenDDS::DCPS::TcpTransport::blob_to_key(const TransportBLOB & remote, Priority priority, bool active) [private]
Definition at line 63 of file TcpTransport.cpp.
References config(), OpenDDS::DCPS::AssociationData::get_remote_address(), and OpenDDS::DCPS::TcpInst::local_address().
Referenced by accept_datalink(), and connect_datalink().
00066 { 00067 const ACE_INET_Addr remote_address = 00068 AssociationData::get_remote_address(remote); 00069 const bool is_loopback = remote_address == config().local_address(); 00070 return PriorityKey(priority, remote_address, is_loopback, active); 00071 }
TcpInst & OpenDDS::DCPS::TcpTransport::config() const
Expose the configuration information so others can see what we can do.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 57 of file TcpTransport.cpp.
Referenced by blob_to_key(), connect_datalink(), connect_tcp_datalink(), connection_info_i(), fresh_link(), OpenDDS::DCPS::TcpAcceptor::get_configuration(), and passive_connection().
00058 { 00059 return static_cast<TcpInst&>(TransportImpl::config()); 00060 }
bool OpenDDS::DCPS::TcpTransport::configure_i(TcpInst & config) [private, virtual]
Definition at line 335 of file TcpTransport.cpp.
References acceptor_, ACE_TEXT(), con_checker_, connector_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), DBG_ENTRY_LVL, OpenDDS::DCPS::get_fully_qualified_hostname(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), hostname(), ACE_INET_Addr::is_any(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TcpInst::local_address(), OpenDDS::DCPS::TcpInst::local_address_set_port(), ACE_Connector< class, class >::open(), OPENDDS_STRING, OpenDDS::DCPS::TransportImpl::reactor_task(), TheServiceParticipant, and VDBG_LVL.
Referenced by TcpTransport().
00336 { 00337 DBG_ENTRY_LVL("TcpTransport", "configure_i", 6); 00338 00339 this->create_reactor_task(); 00340 00341 connector_.open(reactor_task()->get_reactor()); 00342 00343 // Open the reconnect task 00344 if (this->con_checker_->open()) { 00345 ACE_ERROR_RETURN((LM_ERROR, 00346 ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"), 00347 ACE_TEXT("open")), 00348 false); 00349 } 00350 00351 // Override with DCPSDefaultAddress. 00352 if (config.local_address() == ACE_INET_Addr () && 00353 !TheServiceParticipant->default_address ().empty ()) { 00354 config.local_address(0, TheServiceParticipant->default_address ().c_str ()); 00355 } 00356 00357 // Open our acceptor object so that we can accept passive connections 00358 // on our config.local_address_. 00359 00360 if (this->acceptor_->open(config.local_address(), 00361 this->reactor_task()->get_reactor()) != 0) { 00362 00363 ACE_ERROR_RETURN((LM_ERROR, 00364 ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"), 00365 config.local_address().get_host_addr(), 00366 config.local_address().get_port_number(), 00367 ACE_TEXT("open")), 00368 false); 00369 } 00370 00371 // update the port number (incase port zero was given). 00372 ACE_INET_Addr address; 00373 00374 if (this->acceptor_->acceptor().get_local_addr(address) != 0) { 00375 ACE_ERROR((LM_ERROR, 00376 ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ") 00377 ACE_TEXT("- %p"), 00378 ACE_TEXT("cannot get local addr\n"))); 00379 } 00380 00381 OPENDDS_STRING listening_addr(address.get_host_addr()); 00382 VDBG_LVL((LM_DEBUG, 00383 ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"), 00384 listening_addr.c_str(), address.get_port_number()), 2); 00385 00386 unsigned short port = address.get_port_number(); 00387 00388 // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully 00389 // qualified hostname and actual listening port number. 
00390 if (config.local_address().is_any()) { 00391 std::string hostname = get_fully_qualified_hostname(); 00392 00393 config.local_address(port, hostname.c_str()); 00394 } 00395 00396 // Now we got the actual listening port. Update the port number in the configuration 00397 // if it's 0 originally. 00398 else if (config.local_address().get_port_number() == 0) { 00399 config.local_address_set_port(port); 00400 } 00401 00402 // Ahhh... The sweet smell of success! 00403 return true; 00404 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::connect_datalink(const RemoteTransport & remote, const ConnectionAttribs & attribs, const TransportClient_rch & client) [private, virtual]
connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use; otherwise the transport initiates a connection to the passive side and returns from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 74 of file TcpTransport.cpp.
References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::add_pending_connection(), ACE_INET_Addr::addr_to_string(), OpenDDS::DCPS::PriorityKey::address(), ACE_Synch_Options::asynch, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), config(), ACE_Connector< class, class >::connect(), connections_lock_, connector_, DBG_ENTRY_LVL, find_datalink_i(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::priority_, OpenDDS::DCPS::ref(), OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, str, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.
00077 { 00078 DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6); 00079 00080 const PriorityKey key = 00081 blob_to_key(remote.blob_, attribs.priority_, true /*active*/); 00082 00083 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey " 00084 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00085 key.priority(), key.address().get_host_addr(), 00086 key.address().get_port_number(), key.is_loopback(), 00087 key.is_active()), 0); 00088 00089 TcpDataLink_rch link; 00090 { 00091 GuardType guard(links_lock_); 00092 00093 if (find_datalink_i(key, link, client, remote.repo_id_)) { 00094 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0); 00095 return link.is_nil() 00096 ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS) 00097 : AcceptConnectResult(link); 00098 } 00099 00100 link = make_rch<TcpDataLink>(key.address(), ref(*this), attribs.priority_, 00101 key.is_loopback(), true /*active*/); 00102 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0); 00103 if (links_.bind(key, link) != 0 /*OK*/) { 00104 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink " 00105 "Unable to bind new TcpDataLink[%@] to " 00106 "TcpTransport in links_ map.\n", link.in())); 00107 return AcceptConnectResult(); 00108 } 00109 } 00110 00111 TcpConnection_rch connection( 00112 make_rch<TcpConnection>(key.address(), link->transport_priority(), this->config())); 00113 connection->set_datalink(link); 00114 00115 TcpConnection* pConn = connection.in(); 00116 00117 ACE_TCHAR str[64]; 00118 key.address().addr_to_string(str,sizeof(str)/sizeof(str[0])); 00119 00120 // Can't make this call while holding onto TransportClient::lock_ 00121 const int ret = 00122 connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch); 00123 00124 if (ret == -1 && errno != EWOULDBLOCK) { 00125 00126 VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 
2); 00127 ACE_ERROR((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n")); 00128 //If the connection fails and, in the interim between releasing 00129 //lock and re-acquiring to remove the failed link, another association may have found 00130 //the datalink in links_ (always using find_datalink_i) so must allow the other 00131 //association to either try to connect again (might succeed for it) 00132 //or try another transport. If find_datalink_i was called for this datalink, an 00133 //on_start_callback will be registered and can be invoked. 00134 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0); 00135 { 00136 GuardType guard(links_lock_); 00137 if (links_.unbind(key, link) != 0 /*OK*/) { 00138 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink " 00139 "Unable to unbind failed TcpDataLink[%@] from " 00140 "TcpTransport links_ map.\n", link.in())); 00141 } 00142 } 00143 link->invoke_on_start_callbacks(false); 00144 00145 return AcceptConnectResult(); 00146 } 00147 00148 if (ret == 0) { 00149 // connect() completed synchronously and called TcpConnection::active_open(). 00150 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink " 00151 "completed synchronously.\n"), 0); 00152 return AcceptConnectResult(link); 00153 } 00154 00155 if (!link->add_on_start_callback(client, remote.repo_id_)) { 00156 // link was started by the reactor thread before we could add a callback 00157 00158 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0); 00159 return AcceptConnectResult(link); 00160 } 00161 00162 GuardType connections_guard(connections_lock_); 00163 00164 add_pending_connection(client, link); 00165 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0); 00166 return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS); 00167 }
int OpenDDS::DCPS::TcpTransport::connect_tcp_datalink(TcpDataLink & link, const TcpConnection_rch & connection) [private]
Common code used by accept_datalink(), passive_connection(), and active connection completion (TcpConnection::open()).
Code common to make_active_connection() and make_passive_connection().
Definition at line 658 of file TcpTransport.cpp.
References ACE_TEXT(), config(), OpenDDS::DCPS::TcpDataLink::connect(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportImpl::last_link_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::ref(), OpenDDS::DCPS::TcpDataLink::reuse_existing_connection(), and OpenDDS::DCPS::DataLink::transport_priority().
Referenced by accept_datalink(), OpenDDS::DCPS::TcpConnection::open(), and passive_connection().
00660 { 00661 DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6); 00662 00663 if (link.reuse_existing_connection(connection) == 0) { 00664 return 0; 00665 } 00666 00667 ++last_link_; 00668 00669 if (DCPS_debug_level > 4) { 00670 ACE_DEBUG((LM_DEBUG, 00671 ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ") 00672 ACE_TEXT("creating send strategy with priority %d.\n"), 00673 last_link_, link.transport_priority())); 00674 } 00675 00676 connection->id() = last_link_; 00677 00678 TcpSendStrategy_rch send_strategy ( 00679 make_rch<TcpSendStrategy>(last_link_, ref(link), 00680 new TcpSynchResource(link, 00681 this->config().max_output_pause_period_), 00682 this->reactor_task(), link.transport_priority())); 00683 00684 TcpReceiveStrategy_rch receive_strategy( 00685 make_rch<TcpReceiveStrategy>(ref(link), this->reactor_task())); 00686 00687 if (link.connect(connection, send_strategy, receive_strategy) != 0) { 00688 return -1; 00689 } 00690 00691 return 0; 00692 }
bool OpenDDS::DCPS::TcpTransport::connection_info_i(TransportLocator & local_info) const [private, virtual]
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 466 of file TcpTransport.cpp.
References config(), DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::TcpInst::populate_locator(), and VDBG_LVL.
00467 { 00468 DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6); 00469 00470 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n", 00471 this->config().get_public_address().c_str()), 2); 00472 00473 this->config().populate_locator(local_info); 00474 00475 return true; 00476 }
bool OpenDDS::DCPS::TcpTransport::find_datalink_i(const PriorityKey & key, TcpDataLink_rch & link, const TransportClient_rch & client, const RepoId & remote_id) [private]
Definition at line 188 of file TcpTransport.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::in(), links_, LM_DEBUG, pending_release_links_, OpenDDS::DCPS::RcHandle< T >::reset(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.
Referenced by accept_datalink(), and connect_datalink().
00190 { 00191 DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6); 00192 00193 if (links_.find(key, link) == 0 /*OK*/) { 00194 if (!link->add_on_start_callback(client, remote_id)) { 00195 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00196 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0); 00197 // Since the link was already started, we won't get an "on start" 00198 // callback, and the link is immediately usable. 00199 return true; 00200 } 00201 00202 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00203 ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0); 00204 add_pending_connection(client, link); 00205 link.reset(); // don't return link to TransportClient 00206 return true; 00207 00208 } else if (pending_release_links_.find(key, link) == 0 /*OK*/) { 00209 if (link->cancel_release()) { 00210 link->set_release_pending(false); 00211 00212 if (pending_release_links_.unbind(key, link) == 0 /*OK*/ 00213 && links_.bind(key, link) == 0 /*OK*/) { 00214 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00215 ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0); 00216 return true; 00217 } 00218 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00219 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0); 00220 } else { 00221 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ") 00222 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0); 00223 } 00224 link.reset(); // don't return link to TransportClient 00225 return false; 00226 } 00227 00228 return false; 00229 }
int OpenDDS::DCPS::TcpTransport::fresh_link(TcpConnection_rch connection)
This function is called by the TcpConnectionReplaceTask thread to check whether the passively accepted connection is a re-established connection. If it is, then the "old" connection object in the datalink is replaced by the "new" connection object.
Definition at line 698 of file TcpTransport.cpp.
References config(), DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, and OpenDDS::DCPS::TcpInst::local_address().
Referenced by OpenDDS::DCPS::TcpConnectionReplaceTask::execute().
00699 { 00700 DBG_ENTRY_LVL("TcpTransport","fresh_link",6); 00701 00702 TcpDataLink_rch link; 00703 GuardType guard(this->links_lock_); 00704 00705 PriorityKey key(connection->transport_priority(), 00706 connection->get_remote_address(), 00707 connection->get_remote_address() == this->config().local_address(), 00708 connection->is_connector()); 00709 00710 if (this->links_.find(key, link) == 0) { 00711 TcpConnection_rch old_con = link->get_connection(); 00712 00713 // The connection is accepted but may not be associated with the datalink 00714 // at this point. The thread calling add_associations() will associate 00715 // the datalink with the connection in make_passive_connection(). 00716 if (old_con.is_nil()) { 00717 return 0; 00718 } 00719 00720 if (old_con.in() != connection.in()) 00721 // Replace the "old" connection object with the "new" connection object. 00722 { 00723 return link->reconnect(connection); 00724 } 00725 } 00726 00727 return 0; 00728 }
typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP(PriorityKey, TcpConnection_rch) [private]
typedef OpenDDS::DCPS::TcpTransport::OPENDDS_MAP(PriorityKey, TcpDataLink_rch) [private]
void OpenDDS::DCPS::TcpTransport::passive_connection(const ACE_INET_Addr & remote_address, const TcpConnection_rch & connection) [private]
Called by the TcpConnection object when it has been created by the acceptor and needs to be attached to a DataLink. The DataLink may or may not already be created and waiting for this passive connection to appear.
This method is called by a TcpConnection object that has been created and opened by our acceptor_ as a result of passively accepting a connection on our local address. Ultimately, the connection object needs to be paired with a DataLink object that is (or will be) expecting this passive connection to be established.
Definition at line 596 of file TcpTransport.cpp.
References ACE_TEXT(), con_checker_, config(), connect_tcp_datalink(), connections_, connections_lock_, DBG_ENTRY_LVL, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::find(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TcpInst::local_address(), ACE_Guard< ACE_LOCK >::release(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::TcpConnection::handle_setup_input().
00598 { 00599 DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6); 00600 00601 const PriorityKey key(connection->transport_priority(), 00602 remote_address, 00603 remote_address == config().local_address(), 00604 connection->is_connector()); 00605 00606 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ") 00607 ACE_TEXT("established with %C:%d.\n"), 00608 remote_address.get_host_name(), 00609 remote_address.get_port_number()), 2); 00610 00611 GuardType connection_guard(connections_lock_); 00612 TcpDataLink_rch link; 00613 { 00614 GuardType guard(links_lock_); 00615 links_.find(key, link); 00616 } 00617 00618 if (!link.is_nil()) { 00619 connection_guard.release(); 00620 00621 if (connect_tcp_datalink(*link, connection) == -1) { 00622 VDBG_LVL((LM_ERROR, 00623 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5); 00624 GuardType guard(links_lock_); 00625 links_.unbind(key); 00626 00627 } else { 00628 con_checker_->add(connection); 00629 } 00630 00631 return; 00632 } 00633 00634 // If we reach this point, this link was not in links_, so the 00635 // accept_datalink() call hasn't happened yet. Store in connections_ for the 00636 // accept_datalink() method to find. 00637 VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5); 00638 const ConnectionMap::iterator where = connections_.find(key); 00639 00640 if (where != connections_.end()) { 00641 ACE_ERROR((LM_ERROR, 00642 ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ") 00643 ACE_TEXT("connection with %C:%d at priority %d already exists, ") 00644 ACE_TEXT("overwriting previously established connection.\n"), 00645 remote_address.get_host_name(), 00646 remote_address.get_port_number(), 00647 connection->transport_priority())); 00648 } 00649 00650 connections_[key] = connection; 00651 VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5); 00652 00653 con_checker_->add(connection); 00654 }
void OpenDDS::DCPS::TcpTransport::release_datalink(DataLink * link) [private, virtual]
Called by the DataLink to release itself.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 479 of file TcpTransport.cpp.
References ACE_TEXT(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::bind(), OpenDDS::DCPS::DataLink::datalink_release_delay(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), links_, links_lock_, LM_DEBUG, LM_ERROR, pending_release_links_, OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), OpenDDS::DCPS::DataLink::schedule_stop(), ACE_Time_Value::sec(), OpenDDS::DCPS::DataLink::set_scheduling_release(), OpenDDS::DCPS::DataLink::transport_priority(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), ACE_Time_Value::usec(), VDBG_LVL, and ACE_Time_Value::zero.
00480 { 00481 DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6); 00482 00483 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link); 00484 00485 if (tcp_link == 0) { 00486 // Really an assertion failure 00487 ACE_ERROR((LM_ERROR, 00488 "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to " 00489 "TcpDataLink.\n")); 00490 return; 00491 } 00492 00493 TcpDataLink_rch released_link; 00494 00495 // Possible actions that will be taken to release the link. 00496 enum LinkAction { None, StopLink, ScheduleLinkRelease }; 00497 LinkAction linkAction = None; 00498 00499 // Scope for locking to protect the links (and pending_release) containers. 00500 GuardType guard(this->links_lock_); 00501 00502 // Attempt to remove the TcpDataLink from our links_ map. 00503 PriorityKey key( 00504 tcp_link->transport_priority(), 00505 tcp_link->remote_address(), 00506 tcp_link->is_loopback(), 00507 tcp_link->is_active()); 00508 00509 VDBG_LVL((LM_DEBUG, 00510 "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey " 00511 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00512 link, 00513 tcp_link->transport_priority(), 00514 tcp_link->remote_address().get_host_addr(), 00515 tcp_link->remote_address().get_port_number(), 00516 (int)tcp_link->is_loopback(), 00517 (int)tcp_link->is_active()), 2); 00518 00519 if (this->links_.unbind(key, released_link) != 0) { 00520 //No op 00521 } else if (link->datalink_release_delay() > ACE_Time_Value::zero) { 00522 link->set_scheduling_release(true); 00523 00524 VDBG_LVL((LM_DEBUG, 00525 "(%P|%t) TcpTransport::release_datalink datalink_release_delay " 00526 "is %: sec %d usec\n", 00527 link->datalink_release_delay().sec(), 00528 link->datalink_release_delay().usec()), 4); 00529 00530 // Atomic value update, safe to perform here. 
00531 released_link->set_release_pending(true); 00532 00533 switch (this->pending_release_links_.bind(key, released_link)) { 00534 case -1: 00535 ACE_ERROR((LM_ERROR, 00536 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 00537 "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind"))); 00538 linkAction = StopLink; 00539 break; 00540 00541 case 1: 00542 ACE_ERROR((LM_ERROR, 00543 "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to " 00544 "pending_release_links_ map: already bound\n", released_link.in())); 00545 linkAction = StopLink; 00546 break; 00547 00548 case 0: 00549 linkAction = ScheduleLinkRelease; 00550 break; 00551 00552 default: 00553 break; 00554 } 00555 00556 } else { // datalink_release_delay_ is 0 00557 link->set_scheduling_release(true); 00558 00559 linkAction = StopLink; 00560 } 00561 00562 // Actions are executed outside of the lock scope. 00563 ACE_Time_Value cancel_now = ACE_OS::gettimeofday(); 00564 switch (linkAction) { 00565 case StopLink: 00566 link->schedule_stop(cancel_now); 00567 break; 00568 00569 case ScheduleLinkRelease: 00570 00571 link->schedule_delayed_release(); 00572 break; 00573 00574 case None: 00575 break; 00576 } 00577 00578 if (DCPS_debug_level > 9) { 00579 std::stringstream buffer; 00580 buffer << *link; 00581 ACE_DEBUG((LM_DEBUG, 00582 ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ") 00583 ACE_TEXT("link[%@] with priority %d released.\n%C"), 00584 link, 00585 link->transport_priority(), 00586 buffer.str().c_str())); 00587 } 00588 }
void OpenDDS::DCPS::TcpTransport::shutdown_i | ( | ) | [private, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 408 of file TcpTransport.cpp.
References acceptor_, con_checker_, connections_, connections_lock_, DBG_ENTRY_LVL, ACE_Hash_Map_Entry< EXT_ID, INT_ID >::int_id_, links_, links_lock_, OpenDDS::DCPS::TransportImpl::pending_connections_, pending_release_links_, and ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind_all().
00409 { 00410 DBG_ENTRY_LVL("TcpTransport","shutdown_i",6); 00411 00412 { 00413 GuardType guard(this->links_lock_); 00414 00415 AddrLinkMap::ENTRY* entry; 00416 00417 for (AddrLinkMap::ITERATOR itr(this->links_); 00418 itr.next(entry); 00419 itr.advance()) { 00420 entry->int_id_->pre_stop_i(); 00421 } 00422 } 00423 00424 // Don't accept any more connections. 00425 this->acceptor_->close(); 00426 this->acceptor_->transport_shutdown(); 00427 00428 this->con_checker_->close(1); 00429 00430 { 00431 GuardType guard(this->connections_lock_); 00432 00433 this->connections_.clear(); 00434 this->pending_connections_.clear(); 00435 } 00436 00437 // Disconnect all of our DataLinks, and clear our links_ collection. 00438 { 00439 GuardType guard(this->links_lock_); 00440 00441 AddrLinkMap::ENTRY* entry; 00442 00443 for (AddrLinkMap::ITERATOR itr(this->links_); 00444 itr.next(entry); 00445 itr.advance()) { 00446 entry->int_id_->transport_shutdown(); 00447 } 00448 00449 this->links_.unbind_all(); 00450 00451 for (AddrLinkMap::ITERATOR itr(this->pending_release_links_); 00452 itr.next(entry); 00453 itr.advance()) { 00454 entry->int_id_->transport_shutdown(); 00455 } 00456 00457 this->pending_release_links_.unbind_all(); 00458 } 00459 00460 // Tell our acceptor about this event so that it can drop its reference 00461 // it holds to this TcpTransport object (via smart-pointer). 00462 this->acceptor_->transport_shutdown(); 00463 }
void OpenDDS::DCPS::TcpTransport::stop_accepting_or_connecting | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote_id | |||
) | [private, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 314 of file TcpTransport.cpp.
References connections_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::pending_connections_, and VDBG_LVL.
00316 { 00317 GuidConverter remote_converted(remote_id); 00318 VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting " 00319 "stop connecting to remote: %C\n", 00320 std::string(remote_converted).c_str()), 5); 00321 00322 GuardType guard(connections_lock_); 00323 typedef PendConnMap::iterator iter_t; 00324 const std::pair<iter_t, iter_t> range = 00325 pending_connections_.equal_range(client); 00326 00327 for (iter_t iter = range.first; iter != range.second; ++iter) { 00328 iter->second->remove_on_start_callback(client, remote_id); 00329 } 00330 00331 pending_connections_.erase(range.first, range.second); 00332 }
virtual std::string OpenDDS::DCPS::TcpTransport::transport_type | ( | ) | const [inline, private, virtual] |
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 81 of file TcpTransport.h.
void OpenDDS::DCPS::TcpTransport::unbind_link | ( | DataLink * | link | ) | [virtual] |
Remove any pending_release mappings.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 731 of file TcpTransport.cpp.
References OpenDDS::DCPS::DataLink::datalink_release_delay(), ACE_INET_Addr::get_host_addr(), ACE_INET_Addr::get_port_number(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), links_lock_, LM_DEBUG, LM_ERROR, pending_release_links_, OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::transport_priority(), ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::unbind(), VDBG_LVL, and ACE_Time_Value::zero.
00732 { 00733 TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link); 00734 00735 if (tcp_link == 0) { 00736 // Really an assertion failure 00737 ACE_ERROR((LM_ERROR, 00738 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 00739 "Failed to downcast DataLink to TcpDataLink.\n")); 00740 return; 00741 } 00742 00743 // Attempt to remove the TcpDataLink from our links_ map. 00744 PriorityKey key( 00745 tcp_link->transport_priority(), 00746 tcp_link->remote_address(), 00747 tcp_link->is_loopback(), 00748 tcp_link->is_active()); 00749 00750 VDBG_LVL((LM_DEBUG, 00751 "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey " 00752 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00753 link, 00754 tcp_link->transport_priority(), 00755 tcp_link->remote_address().get_host_addr(), 00756 tcp_link->remote_address().get_port_number(), 00757 (int)tcp_link->is_loopback(), 00758 (int)tcp_link->is_active()), 2); 00759 00760 GuardType guard(this->links_lock_); 00761 00762 if (this->pending_release_links_.unbind(key) != 0 && 00763 link->datalink_release_delay() > ACE_Time_Value::zero) { 00764 ACE_ERROR((LM_ERROR, 00765 "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - " 00766 "Failed to find link %@ tcp_link %@ PriorityKey " 00767 "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", 00768 link, 00769 tcp_link, 00770 tcp_link->transport_priority(), 00771 tcp_link->remote_address().get_host_addr(), 00772 tcp_link->remote_address().get_port_number(), 00773 (int)tcp_link->is_loopback(), 00774 (int)tcp_link->is_active())); 00775 } 00776 }
friend class TcpConnection [friend] |
The TcpConnection is our friend. It tells us when it has been created (by our acceptor_), and is seeking the DataLink that should be (or will be) expecting the passive connection.
Definition at line 89 of file TcpTransport.h.
friend class TcpDataLink [friend] |
Definition at line 90 of file TcpTransport.h.
Used to accept passive connections on our local_address_.
Definition at line 140 of file TcpTransport.h.
Referenced by configure_i(), and shutdown_i().
This task is used to resolve a deadlock situation that can arise during reconnecting. TODO: reuse the reconnect_task in the TcpConnection for new connection checking.
Definition at line 164 of file TcpTransport.h.
Referenced by configure_i(), passive_connection(), shutdown_i(), and ~TcpTransport().
ConnectionMap OpenDDS::DCPS::TcpTransport::connections_ [private] |
Map of passive connection objects that need to be paired with a DataLink.
Definition at line 154 of file TcpTransport.h.
Referenced by accept_datalink(), passive_connection(), and shutdown_i().
This protects the connections_ and the pending_connections_ data members.
Definition at line 158 of file TcpTransport.h.
Referenced by accept_datalink(), connect_datalink(), passive_connection(), shutdown_i(), and stop_accepting_or_connecting().
Open TcpConnections using non-blocking connect.
Definition at line 143 of file TcpTransport.h.
Referenced by configure_i(), and connect_datalink().
This is the map of connected DataLinks.
Definition at line 146 of file TcpTransport.h.
Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), find_datalink_i(), fresh_link(), passive_connection(), release_datalink(), and shutdown_i().
This lock is used to protect the links_ and pending_release_links_ data members.
Definition at line 150 of file TcpTransport.h.
Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), fresh_link(), passive_connection(), release_datalink(), shutdown_i(), and unbind_link().
Definition at line 147 of file TcpTransport.h.
Referenced by find_datalink_i(), release_datalink(), shutdown_i(), and unbind_link().