OpenDDS::DCPS::TcpTransport Class Reference

#include <TcpTransport.h>

Public Member Functions

 TcpTransport (const TransportInst_rch &inst)
virtual ~TcpTransport ()
TcpInst * get_configuration ()
int fresh_link (TcpConnection_rch connection)
virtual void unbind_link (DataLink *link)
 Remove any pending_release mappings.

Private Types

typedef ACE_Hash_Map_Manager_Ex< PriorityKey, TcpDataLink_rch, ACE_Hash< PriorityKey >, ACE_Equal_To< PriorityKey >, ACE_Null_Mutex > AddrLinkMap
 Map Type: (key) PriorityKey to (value) TcpDataLink_rch.
typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef ACE_Condition< LockType > ConditionType

Private Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual void stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id)
virtual bool configure_i (TransportInst *config)
virtual void shutdown_i ()
virtual void pre_shutdown_i ()
virtual bool connection_info_i (TransportLocator &local_info) const
virtual void release_datalink (DataLink *link)
 Called by the DataLink to release itself.
virtual std::string transport_type () const
void async_connect_failed (const PriorityKey &key)
void passive_connection (const ACE_INET_Addr &remote_address, const TcpConnection_rch &connection)
bool find_datalink_i (const PriorityKey &key, TcpDataLink_rch &link, TransportClient *client, const RepoId &remote_id)
int connect_tcp_datalink (const TcpDataLink_rch &link, const TcpConnection_rch &connection)
 Common code used by accept_datalink(), passive_connection(), and active completion.
PriorityKey blob_to_key (const TransportBLOB &remote, Priority priority, bool active)
typedef OPENDDS_MAP (PriorityKey, TcpDataLink_rch) LinkMap
typedef OPENDDS_MAP (PriorityKey, TcpConnection_rch) ConnectionMap

Private Attributes

TcpAcceptor * acceptor_
 Used to accept passive connections on our local_address_.
ACE_Connector< TcpConnection, ACE_SOCK_Connector > connector_
 Open TcpConnections using non-blocking connect.
TcpInst_rch tcp_config_
 Our configuration object, supplied to us in configure_i().
AddrLinkMap links_
 This is the map of connected DataLinks.
AddrLinkMap pending_release_links_
LockType links_lock_
 This lock is used to protect the links_ data member.
ConnectionMap connections_
LockType connections_lock_
TransportReactorTask_rch reactor_task_
 We need the reactor for our Acceptor.
TcpConnectionReplaceTask * con_checker_

Friends

class TcpConnection
class TcpDataLink

Detailed Description

This class provides the "Tcp" transport-specific implementation. It creates the acceptor that listens for incoming TCP connection requests and maintains a collection of TCP-specific connections and datalinks.

Notes about object ownership: 1) This class owns the datalink objects, the passive connection objects, the acceptor object, and the TcpConnectionReplaceTask object (used during reconnecting). 2) It holds a reference to the TransportReactorTask object, which is owned by the base class.

Definition at line 44 of file TcpTransport.h.


Member Typedef Documentation

typedef ACE_Hash_Map_Manager_Ex<PriorityKey, TcpDataLink_rch, ACE_Hash<PriorityKey>, ACE_Equal_To<PriorityKey>, ACE_Null_Mutex> OpenDDS::DCPS::TcpTransport::AddrLinkMap [private]

Map Type: (key) PriorityKey to (value) TcpDataLink_rch.

Definition at line 115 of file TcpTransport.h.

typedef ACE_Condition<LockType> OpenDDS::DCPS::TcpTransport::ConditionType [private]

Definition at line 122 of file TcpTransport.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::TcpTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 121 of file TcpTransport.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TcpTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 120 of file TcpTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TcpTransport::TcpTransport ( const TransportInst_rch &  inst  )  [explicit]

Definition at line 32 of file TcpTransport.cpp.

References acceptor_, con_checker_, OpenDDS::DCPS::TransportImpl::configure(), DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

00033   : acceptor_(new TcpAcceptor(this)),
00034     con_checker_(new TcpConnectionReplaceTask(this))
00035 {
00036   DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
00037 
00038   if (!inst.is_nil()) {
00039     if (!configure(inst.in())) {
00040       delete con_checker_;
00041       delete acceptor_;
00042       throw Transport::UnableToCreate();
00043     }
00044   }
00045 }

OpenDDS::DCPS::TcpTransport::~TcpTransport (  )  [virtual]

Definition at line 47 of file TcpTransport.cpp.

References acceptor_, OpenDDS::DCPS::QueueTaskBase< T >::close(), con_checker_, and DBG_ENTRY_LVL.

00048 {
00049   DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
00050   delete acceptor_;
00051 
00052   con_checker_->close(1);  // This could potentially fix a race condition
00053   delete con_checker_;
00054 }


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::accept_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [private, virtual]

Definition at line 231 of file TcpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), blob_to_key(), connect_tcp_datalink(), connections_, connections_lock_, find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportClient::repo_id_, TcpDataLink, and VDBG_LVL.

00234 {
00235   GuidConverter remote_conv(remote.repo_id_);
00236   GuidConverter local_conv(attribs.local_id_);
00237 
00238   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
00239             "accepting connection from remote %C\n",
00240             std::string(local_conv).c_str(),
00241             std::string(remote_conv).c_str()), 5);
00242 
00243   GuardType guard(connections_lock_);
00244   const PriorityKey key =
00245     blob_to_key(remote.blob_, attribs.priority_, false /* !active */);
00246 
00247   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
00248             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_,
00249             key.address().get_host_addr(), key.address().get_port_number(),
00250             key.is_loopback(), key.is_active()), 2);
00251 
00252   TcpDataLink_rch link;
00253   {
00254     GuardType guard(links_lock_);
00255 
00256     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00257       return link.is_nil()
00258         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00259         : AcceptConnectResult(link._retn());
00260 
00261     } else {
00262       link = new TcpDataLink(key.address(), this, key.priority(),
00263                              key.is_loopback(), key.is_active());
00264 
00265       if (links_.bind(key, link) != 0 /*OK*/) {
00266         ACE_ERROR((LM_ERROR,
00267                    "(%P|%t) ERROR: TcpTransport::accept_datalink "
00268                    "Unable to bind new TcpDataLink to "
00269                    "TcpTransport in links_ map.\n"));
00270         return AcceptConnectResult();
00271       }
00272     }
00273   }
00274 
00275   TcpConnection_rch connection;
00276   const ConnectionMap::iterator iter = connections_.find(key);
00277 
00278   if (iter != connections_.end()) {
00279     connection = iter->second;
00280     connections_.erase(iter);
00281   }
00282 
00283   if (connection.is_nil()) {
00284     if (!link->add_on_start_callback(client, remote.repo_id_)) {
00285       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00286                 "got started link %@.\n", link.in()), 0);
00287       return AcceptConnectResult(link._retn());
00288     }
00289 
00290     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00291               "no existing TcpConnection.\n"), 0);
00292 
00293     add_pending_connection(client, link.in());
00294 
00295     // no link ready, passive_connection will complete later
00296     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00297   }
00298 
00299   guard.release(); // connect_tcp_datalink() isn't called with connections_lock_
00300 
00301   if (connect_tcp_datalink(link, connection) == -1) {
00302     GuardType guard(links_lock_);
00303     links_.unbind(key);
00304     link = 0;
00305   }
00306 
00307   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00308             "connected link %@.\n", link.in()), 2);
00309   return AcceptConnectResult(link._retn());
00310 }

void OpenDDS::DCPS::TcpTransport::async_connect_failed ( const PriorityKey &  key  )  [private]

Definition at line 169 of file TcpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::in(), links_, and links_lock_.

00170 {
00171 
00172   ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n"));
00173   GuardType guard(links_lock_);
00174   TcpDataLink_rch link;
00175   links_.find(key, link);
00176   links_.unbind(key);
00177   guard.release();
00178 
00179   if (link.in()) {
00180     link->invoke_on_start_callbacks(false);
00181   }
00182 
00183 }

PriorityKey OpenDDS::DCPS::TcpTransport::blob_to_key ( const TransportBLOB &  remote,
Priority  priority,
bool  active 
) [private]

Definition at line 57 of file TcpTransport.cpp.

References OpenDDS::DCPS::AssociationData::get_remote_address(), and tcp_config_.

Referenced by accept_datalink(), and connect_datalink().

00060 {
00061   const ACE_INET_Addr remote_address =
00062     AssociationData::get_remote_address(remote);
00063   const bool is_loopback = remote_address == tcp_config_->local_address();
00064   return PriorityKey(priority, remote_address, is_loopback, active);
00065 }
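
The key construction above can be illustrated with a minimal, self-contained sketch. It assumes simplified stand-ins: `SketchAddr` (in place of ACE_INET_Addr), `SketchKey` (in place of PriorityKey) and `make_key` are hypothetical names, and the ordering over all four fields is an assumption made for illustration, not the documented PriorityKey comparison. What it shows is that the loopback flag is derived by comparing the remote address with the local listening address, and that the active and passive sides of the same peer yield distinct keys.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <tuple>

// Hypothetical stand-in for ACE_INET_Addr: a host string and a port.
struct SketchAddr {
  std::string host;
  unsigned short port;
  bool operator==(const SketchAddr& o) const {
    return host == o.host && port == o.port;
  }
};

// Hypothetical stand-in for PriorityKey, holding the four values that
// blob_to_key() passes to the PriorityKey constructor.
struct SketchKey {
  int priority;
  SketchAddr addr;
  bool is_loopback;
  bool is_active;
  // Assumed ordering over every field so the key can index a std::map.
  bool operator<(const SketchKey& o) const {
    return std::tie(priority, addr.host, addr.port, is_loopback, is_active)
         < std::tie(o.priority, o.addr.host, o.addr.port, o.is_loopback,
                    o.is_active);
  }
};

// Same shape as blob_to_key(): the remote endpoint is a loopback peer
// when its address equals the local listening address.
inline SketchKey make_key(const SketchAddr& remote, const SketchAddr& local,
                          int priority, bool active) {
  return SketchKey{priority, remote, remote == local, active};
}
```

Under these assumptions, a map keyed by `SketchKey` can hold separate entries for the same peer at different priorities, and for the connecting and accepting directions.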

bool OpenDDS::DCPS::TcpTransport::configure_i ( TransportInst *  config  )  [private, virtual]

Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 334 of file TcpTransport.cpp.

References OpenDDS::DCPS::RcObject< T >::_add_ref(), OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::config(), connector_, OpenDDS::DCPS::TransportImpl::create_reactor_task(), DBG_ENTRY_LVL, OpenDDS::DCPS::get_fully_qualified_hostname(), OpenDDS::DCPS::TcpInst::local_address(), OPENDDS_STRING, OpenDDS::DCPS::TransportImpl::reactor_task(), reactor_task_, tcp_config_, TheServiceParticipant, and VDBG_LVL.

00335 {
00336   DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
00337 
00338   // Downcast the config argument to a TcpInst*
00339   TcpInst* tcp_config =
00340     static_cast<TcpInst*>(config);
00341 
00342   if (tcp_config == 0) {
00343     // The downcast failed.
00344     ACE_ERROR_RETURN((LM_ERROR,
00345                       "(%P|%t) ERROR: Failed downcast from TransportInst "
00346                       "to TcpInst.\n"),
00347                      false);
00348   }
00349 
00350   this->create_reactor_task();
00351 
00352   // Ask our base class for a "copy" of the reference to the reactor task.
00353   this->reactor_task_ = reactor_task();
00354 
00355   connector_.open(reactor_task_->get_reactor());
00356 
00357   // Make a "copy" of the reference for ourselves.
00358   tcp_config->_add_ref();
00359   this->tcp_config_ = tcp_config;
00360 
00361   // Open the reconnect task
00362   if (this->con_checker_->open()) {
00363     ACE_ERROR_RETURN((LM_ERROR,
00364                       ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"),
00365                       ACE_TEXT("open")),
00366                      false);
00367   }
00368 
00369   // Override with DCPSDefaultAddress.
00370   if (this->tcp_config_->local_address() == ACE_INET_Addr () &&
00371       !TheServiceParticipant->default_address ().empty ()) {
00372     this->tcp_config_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00373   }
00374 
00375   // Open our acceptor object so that we can accept passive connections
00376   // on our this->tcp_config_->local_address_.
00377 
00378   if (this->acceptor_->open(this->tcp_config_->local_address(),
00379                             this->reactor_task_->get_reactor()) != 0) {
00380     // Remember to drop our reference to the tcp_config_ object since
00381     // we are about to return -1 here, which means we are supposed to
00382     // keep a copy after all.
00383     TcpInst_rch cfg = this->tcp_config_._retn();
00384 
00385     ACE_ERROR_RETURN((LM_ERROR,
00386                       ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"),
00387                       cfg->local_address().get_host_addr(),
00388                       cfg->local_address().get_port_number(),
00389                       ACE_TEXT("open")),
00390                      false);
00391   }
00392 
00393   // update the port number (incase port zero was given).
00394   ACE_INET_Addr address;
00395 
00396   if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
00397     ACE_ERROR((LM_ERROR,
00398                ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
00399                ACE_TEXT("- %p"),
00400                ACE_TEXT("cannot get local addr\n")));
00401   }
00402 
00403   OPENDDS_STRING listening_addr(address.get_host_addr());
00404   VDBG_LVL((LM_DEBUG,
00405             ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"),
00406             listening_addr.c_str(), address.get_port_number()), 2);
00407 
00408   unsigned short port = address.get_port_number();
00409 
00410   // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully
00411   // qualified hostname and actual listening port number.
00412   if (tcp_config_->local_address().is_any()) {
00413     std::string hostname = get_fully_qualified_hostname();
00414 
00415     this->tcp_config_->local_address(port, hostname.c_str());
00416   }
00417 
00418   // Now we got the actual listening port. Update the port number in the configuration
00419   // if it's 0 originally.
00420   else if (tcp_config_->local_address().get_port_number() == 0) {
00421     tcp_config_->local_address_set_port(port);
00422   }
00423 
00424   // Ahhh...  The sweet smell of success!
00425   return true;
00426 }
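
The final address fix-up in the listing above (lines 410 to 422) can be summarised by a small sketch. It is a simplification under stated assumptions: `SketchEndpoint` and `advertised_endpoint` are hypothetical names, the "any" address is modelled as an empty host or "0.0.0.0", and the bound port and fully qualified hostname are passed in rather than queried from the acceptor.

```cpp
#include <cassert>
#include <string>

// Hypothetical stand-in for the configured local address.
struct SketchEndpoint {
  std::string host;
  unsigned short port;
};

// Decide which endpoint to advertise once the acceptor is listening.
// bound_port is the port the OS actually assigned to the acceptor.
inline SketchEndpoint advertised_endpoint(const SketchEndpoint& configured,
                                          unsigned short bound_port,
                                          const std::string& fq_hostname) {
  assert(bound_port != 0);  // a listening socket always has a real port

  // Assumption: "any" is modelled as an empty host or 0.0.0.0.
  const bool is_any = configured.host.empty() || configured.host == "0.0.0.0";

  if (is_any) {
    // Listen on every interface, but advertise the hostname and real port.
    return SketchEndpoint{fq_hostname, bound_port};
  }
  if (configured.port == 0) {
    // Explicit host with an ephemeral port: fill in the assigned port.
    return SketchEndpoint{configured.host, bound_port};
  }
  return configured;  // fully specified, nothing to adjust
}
```

The three branches correspond to the `is_any()` case, the port-zero case, and the fall-through in the listing.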

TransportImpl::AcceptConnectResult OpenDDS::DCPS::TcpTransport::connect_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient *  client 
) [private, virtual]

Definition at line 68 of file TcpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::TransportImpl::add_pending_connection(), OpenDDS::DCPS::PriorityKey::address(), blob_to_key(), connections_lock_, connector_, DBG_ENTRY_LVL, find_datalink_i(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PriorityKey::is_active(), OpenDDS::DCPS::PriorityKey::is_loopback(), OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, OpenDDS::DCPS::PriorityKey::priority(), OpenDDS::DCPS::TransportClient::repo_id_, tcp_config_, TcpConnection, TcpDataLink, and VDBG_LVL.

00071 {
00072   DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
00073 
00074   const PriorityKey key =
00075     blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
00076 
00077   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
00078             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00079             key.priority(), key.address().get_host_addr(),
00080             key.address().get_port_number(), key.is_loopback(),
00081             key.is_active()), 0);
00082 
00083   TcpDataLink_rch link;
00084   {
00085     GuardType guard(links_lock_);
00086 
00087     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00088       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
00089       return link.is_nil()
00090         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00091         : AcceptConnectResult(link._retn());
00092     }
00093 
00094     link = new TcpDataLink(key.address(), this, attribs.priority_,
00095                            key.is_loopback(), true /*active*/);
00096     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
00097     if (links_.bind(key, link) != 0 /*OK*/) {
00098       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00099                  "Unable to bind new TcpDataLink[%@] to "
00100                  "TcpTransport in links_ map.\n", link.in()));
00101       return AcceptConnectResult();
00102     }
00103   }
00104 
00105   TcpConnection_rch connection =
00106     new TcpConnection(key.address(), link->transport_priority(), tcp_config_);
00107   connection->set_datalink(link.in());
00108 
00109   TcpConnection* pConn = connection.in();
00110   TcpConnection_rch reactor_refcount(connection); // increment for reactor callback
00111 
00112   ACE_TCHAR str[64];
00113   key.address().addr_to_string(str,sizeof(str)/sizeof(str[0]));
00114 
00115   // Can't make this call while holding onto TransportClient::lock_
00116   const int ret =
00117     connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch);
00118 
00119   if (ret == -1 && errno != EWOULDBLOCK) {
00120 
00121     VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
00122     ACE_DEBUG((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
00123     //If the connection fails and, in the interim between releasing
00124     //lock and re-acquiring to remove the failed link, another association may have found
00125     //the datalink in links_ (always using find_datalink_i) so must allow the other
00126     //association to either try to connect again (might succeed for it)
00127     //or try another transport.  If find_datalink_i was called for this datalink, an
00128     //on_start_callback will be registered and can be invoked.
00129     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
00130     {
00131       GuardType guard(links_lock_);
00132       if (links_.unbind(key, link) != 0 /*OK*/) {
00133         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00134                    "Unable to unbind failed TcpDataLink[%@] from "
00135                    "TcpTransport links_ map.\n", link.in()));
00136       }
00137     }
00138     link->invoke_on_start_callbacks(false);
00139 
00140     return AcceptConnectResult();
00141   }
00142 
00143   // Don't decrement count when reactor_refcount goes out of scope, see
00144   // TcpConnection::open()
00145   (void) reactor_refcount._retn();
00146 
00147   if (ret == 0) {
00148     // connect() completed synchronously and called TcpConnection::active_open().
00149     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
00150               "completed synchronously.\n"), 0);
00151     return AcceptConnectResult(link._retn());
00152   }
00153 
00154   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00155     // link was started by the reactor thread before we could add a callback
00156 
00157     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0);
00158     return AcceptConnectResult(link._retn());
00159   }
00160 
00161   GuardType connections_guard(connections_lock_);
00162 
00163   add_pending_connection(client, link.in());
00164   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
00165   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00166 }

int OpenDDS::DCPS::TcpTransport::connect_tcp_datalink ( const TcpDataLink_rch &  link,
const TcpConnection_rch &  connection 
) [private]

Common code used by accept_datalink(), passive_connection(), and active completion.

Code common to make_active_connection() and make_passive_connection().

Definition at line 695 of file TcpTransport.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::DCPS::TransportImpl::last_link_.

Referenced by accept_datalink(), and passive_connection().

00697 {
00698   DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
00699 
00700   if (link->reuse_existing_connection(connection) == 0) {
00701     return 0;
00702   }
00703 
00704   ++last_link_;
00705 
00706   if (DCPS_debug_level > 4) {
00707     ACE_DEBUG((LM_DEBUG,
00708                ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
00709                ACE_TEXT("creating send strategy with priority %d.\n"),
00710                last_link_, link->transport_priority()));
00711   }
00712 
00713   connection->id() = last_link_;
00714 
00715   TransportSendStrategy_rch send_strategy =
00716     new TcpSendStrategy(last_link_, link, this->tcp_config_, connection,
00717                         new TcpSynchResource(connection,
00718                                              this->tcp_config_->max_output_pause_period_),
00719                         this->reactor_task_, link->transport_priority());
00720 
00721   TransportStrategy_rch receive_strategy =
00722     new TcpReceiveStrategy(link, connection, this->reactor_task_);
00723 
00724   if (link->connect(connection, send_strategy, receive_strategy) != 0) {
00725     return -1;
00726   }
00727 
00728   return 0;
00729 }

bool OpenDDS::DCPS::TcpTransport::connection_info_i ( TransportLocator &  local_info  )  const [private, virtual]

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 497 of file TcpTransport.cpp.

References DBG_ENTRY_LVL, tcp_config_, and VDBG_LVL.

00498 {
00499   DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
00500 
00501   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n",
00502             this->tcp_config_->get_public_address().c_str()), 2);
00503 
00504   this->tcp_config_->populate_locator(local_info);
00505 
00506   return true;
00507 }

bool OpenDDS::DCPS::TcpTransport::find_datalink_i ( const PriorityKey &  key,
TcpDataLink_rch &  link,
TransportClient *  client,
const RepoId &  remote_id 
) [private]

Definition at line 187 of file TcpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::add_pending_connection(), DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), links_, pending_release_links_, and VDBG_LVL.

Referenced by accept_datalink(), and connect_datalink().

00189 {
00190   DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
00191 
00192   if (links_.find(key, link) == 0 /*OK*/) {
00193     if (!link->add_on_start_callback(client, remote_id)) {
00194       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00195                 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0);
00196       // Since the link was already started, we won't get an "on start"
00197       // callback, and the link is immediately usable.
00198       return true;
00199     }
00200 
00201     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00202               ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0);
00203     add_pending_connection(client, link.in());
00204     link = 0; // don't return link to TransportClient
00205     return true;
00206 
00207   } else if (pending_release_links_.find(key, link) == 0 /*OK*/) {
00208     if (link->cancel_release()) {
00209       link->set_release_pending(false);
00210 
00211       if (pending_release_links_.unbind(key, link) == 0 /*OK*/
00212           && links_.bind(key, link) == 0 /*OK*/) {
00213         VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00214                   ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
00215         return true;
00216       }
00217       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00218                 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
00219     } else {
00220       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00221                 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
00222     }
00223     link = 0; // don't return link to TransportClient
00224     return false;
00225   }
00226 
00227   return false;
00228 }
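
The lookup order in the listing above (active links first, then links parked for release) can be sketched as follows. This is a simplified model, not the OpenDDS implementation: `SketchLink`, `SketchTransport` and `find_link` are hypothetical names, string keys replace PriorityKey, `std::shared_ptr` replaces the `_rch` handles, and locking and `add_pending_connection()` are left out.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for TcpDataLink.
struct SketchLink {
  bool started = false;
  bool release_pending = false;
  bool release_cancellable = true;
  int pending_callbacks = 0;

  // Returns false when the link is already started (no callback needed).
  bool add_on_start_callback() {
    if (started) return false;
    ++pending_callbacks;
    return true;
  }
  bool cancel_release() { return release_cancellable; }
};

using LinkPtr = std::shared_ptr<SketchLink>;
using LinkTable = std::map<std::string, LinkPtr>;

struct SketchTransport {
  LinkTable links;            // connected or connecting links
  LinkTable pending_release;  // links waiting for a delayed release

  // Returns true when a link for key exists. 'link' is set only when the
  // caller may use it right away; it stays null while still connecting.
  bool find_link(const std::string& key, LinkPtr& link) {
    auto it = links.find(key);
    if (it != links.end()) {
      link = it->second;
      if (!link->add_on_start_callback()) {
        return true;  // already started, immediately usable
      }
      link.reset();   // caller is notified later through the callback
      return true;
    }

    auto parked = pending_release.find(key);
    if (parked != pending_release.end()) {
      LinkPtr candidate = parked->second;
      if (candidate->cancel_release()) {
        candidate->release_pending = false;
        pending_release.erase(parked);
        assert(links.count(key) == 0);  // checked above
        links.emplace(key, candidate);  // move back to the active table
        link = candidate;
        return true;
      }
      link.reset();
      return false;   // release could not be cancelled
    }
    return false;     // unknown key, caller creates a new link
  }
};
```

The point of the second table in this model is that a link scheduled for delayed release can be revived without tearing down and re-establishing its connection.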

int OpenDDS::DCPS::TcpTransport::fresh_link ( TcpConnection_rch  connection  ) 

This function is called by the TcpConnectionReplaceTask thread to check whether a passively accepted connection is a re-established connection. If it is, the "old" connection object in the datalink is replaced by the "new" connection object.

Definition at line 735 of file TcpTransport.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::in(), and tcp_config_.

Referenced by OpenDDS::DCPS::TcpConnectionReplaceTask::execute().

00736 {
00737   DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
00738 
00739   TcpDataLink_rch link;
00740   GuardType guard(this->links_lock_);
00741 
00742   PriorityKey key(connection->transport_priority(),
00743                   connection->get_remote_address(),
00744                   connection->get_remote_address() == this->tcp_config_->local_address(),
00745                   connection->is_connector());
00746 
00747   if (this->links_.find(key, link) == 0) {
00748     TcpConnection_rch old_con = link->get_connection();
00749 
00750     // The connection is accepted but may not be associated with the datalink
00751     // at this point. The thread calling add_associations() will associate
00752     // the datalink with the connection in make_passive_connection().
00753     if (old_con.is_nil()) {
00754       return 0;
00755     }
00756 
00757     if (old_con.in() != connection.in())
00758       // Replace the "old" connection object with the "new" connection object.
00759     {
00760       return link->reconnect(connection.in());
00761     }
00762   }
00763 
00764   return 0;
00765 }
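
The replacement decision made by fresh_link() can be reduced to a short sketch. The names `SketchConnection`, `SketchDataLink` and `refresh_link` are hypothetical, `std::shared_ptr` stands in for the reference-counted handles, and the map lookup and locking from the listing are omitted; a null link pointer models the case where no datalink is found for the key.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for TcpConnection.
struct SketchConnection {
  int id;
};

// Hypothetical stand-in for TcpDataLink.
struct SketchDataLink {
  std::shared_ptr<SketchConnection> connection;
  int reconnect_count = 0;

  // Swap in the new connection; 0 signals success in this sketch.
  int reconnect(const std::shared_ptr<SketchConnection>& fresh) {
    connection = fresh;
    ++reconnect_count;
    return 0;
  }
};

// Replace the link's connection only when the link already has one and
// the incoming connection is a different object.
inline int refresh_link(SketchDataLink* link,
                        const std::shared_ptr<SketchConnection>& incoming) {
  assert(incoming);                  // the caller passes a live connection
  if (link == nullptr) return 0;     // no datalink registered for this key
  if (!link->connection) return 0;   // not attached yet, nothing to replace
  if (link->connection.get() != incoming.get()) {
    return link->reconnect(incoming);
  }
  return 0;                          // same object, nothing to do
}
```

In this model a first-time connection and a repeated notification for the same connection are both no-ops; only a distinct connection for an already attached link triggers a reconnect.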

TcpInst * OpenDDS::DCPS::TcpTransport::get_configuration (  ) 

Definition at line 622 of file TcpTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::in(), and tcp_config_.

00623 {
00624   return this->tcp_config_.in();
00625 }

typedef OPENDDS_MAP (PriorityKey, TcpConnection_rch) OpenDDS::DCPS::TcpTransport::ConnectionMap [private]

typedef OPENDDS_MAP (PriorityKey, TcpDataLink_rch) OpenDDS::DCPS::TcpTransport::LinkMap [private]

void OpenDDS::DCPS::TcpTransport::passive_connection ( const ACE_INET_Addr &  remote_address,
const TcpConnection_rch &  connection 
) [private]

Called by the TcpConnection object when it has been created by the acceptor and needs to be attached to a DataLink. The DataLink may or may not already be created and waiting for this passive connection to appear.

Definition at line 633 of file TcpTransport.cpp.

References OpenDDS::DCPS::QueueTaskBase< T >::add(), con_checker_, connect_tcp_datalink(), connections_, connections_lock_, DBG_ENTRY_LVL, OpenDDS::DCPS::RcHandle< T >::is_nil(), links_, links_lock_, tcp_config_, and VDBG_LVL.

00635 {
00636   DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
00637 
00638   const PriorityKey key(connection->transport_priority(),
00639                         remote_address,
00640                         remote_address == tcp_config_->local_address(),
00641                         connection->is_connector());
00642 
00643   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
00644             ACE_TEXT("established with %C:%d.\n"),
00645             remote_address.get_host_name(),
00646             remote_address.get_port_number()), 2);
00647 
00648   GuardType connection_guard(connections_lock_);
00649   TcpDataLink_rch link;
00650   {
00651     GuardType guard(links_lock_);
00652     links_.find(key, link);
00653   }
00654 
00655   if (!link.is_nil()) {
00656     connection_guard.release();
00657 
00658     if (connect_tcp_datalink(link, connection) == -1) {
00659       VDBG_LVL((LM_ERROR,
00660                 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5);
00661       GuardType guard(links_lock_);
00662       links_.unbind(key);
00663 
00664     } else {
00665       con_checker_->add(connection);
00666     }
00667 
00668     return;
00669   }
00670 
00671   // If we reach this point, this link was not in links_, so the
00672   // accept_datalink() call hasn't happened yet.  Store in connections_ for the
00673   // accept_datalink() method to find.
00674   VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5);
00675   const ConnectionMap::iterator where = connections_.find(key);
00676 
00677   if (where != connections_.end()) {
00678     ACE_ERROR((LM_ERROR,
00679                ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ")
00680                ACE_TEXT("connection with %C:%d at priority %d already exists, ")
00681                ACE_TEXT("overwriting previously established connection.\n"),
00682                remote_address.get_host_name(),
00683                remote_address.get_port_number(),
00684                connection->transport_priority()));
00685   }
00686 
00687   connections_[key] = connection;
00688   VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5);
00689 
00690   con_checker_->add(connection);
00691 }
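
passive_connection() and accept_datalink() form a rendezvous: whichever runs first leaves state behind for the other to pick up. The sketch below models only that ordering logic. `SketchRendezvous` and its members are hypothetical names, string keys replace PriorityKey, a boolean replaces the datalink, an integer replaces the connection handle, and the two locks are omitted.

```cpp
#include <cassert>
#include <map>
#include <string>

struct SketchRendezvous {
  std::map<std::string, bool> links;  // key -> link has a connection
  std::map<std::string, int> parked;  // key -> connection awaiting a link

  // Acceptor side. Returns true when the connection was attached to an
  // existing link, false when it was parked for a later accept call.
  bool on_passive_connection(const std::string& key, int conn_id) {
    auto it = links.find(key);
    if (it != links.end()) {
      it->second = true;
      return true;
    }
    parked[key] = conn_id;  // overwrites an earlier parked connection
    return false;
  }

  // Association side. Creates the link entry if needed and returns true
  // when a parked connection was waiting and has now been attached.
  bool on_accept_datalink(const std::string& key) {
    bool& connected = links[key];
    auto it = parked.find(key);
    if (it == parked.end()) {
      return false;  // the connection will arrive later
    }
    parked.erase(it);
    connected = true;
    return true;
  }
};
```

Either call order ends in the same state in this model: the link is marked connected and nothing remains parked.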

void OpenDDS::DCPS::TcpTransport::pre_shutdown_i (  )  [private, virtual]

Called before the transport is shut down to let the concrete transport do anything necessary.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 429 of file TcpTransport.cpp.

References DBG_ENTRY_LVL.

00430 {
00431   DBG_ENTRY_LVL("TcpTransport","pre_shutdown_i",6);
00432 
00433   GuardType guard(this->links_lock_);
00434 
00435   AddrLinkMap::ENTRY* entry;
00436 
00437   for (AddrLinkMap::ITERATOR itr(this->links_);
00438        itr.next(entry);
00439        itr.advance()) {
00440     entry->int_id_->pre_stop_i();
00441   }
00442 }

void OpenDDS::DCPS::TcpTransport::release_datalink ( DataLink *  link  )  [private, virtual]

Called by the DataLink to release itself.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 510 of file TcpTransport.cpp.

References OpenDDS::DCPS::DataLink::datalink_release_delay(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), OpenDDS::DCPS::DataLink::schedule_stop(), OpenDDS::DCPS::DataLink::set_scheduling_release(), OpenDDS::DCPS::DataLink::transport_priority(), and VDBG_LVL.

00511 {
00512   DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
00513 
00514   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00515 
00516   if (tcp_link == 0) {
00517     // Really an assertion failure
00518     ACE_ERROR((LM_ERROR,
00519                "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
00520                "TcpDataLink.\n"));
00521     return;
00522   }
00523 
00524   TcpDataLink_rch released_link;
00525 
00526   // Possible actions that will be taken to release the link.
00527   enum LinkAction { None, StopLink, ScheduleLinkRelease };
00528   LinkAction linkAction = None;
00529 
00530   // Scope for locking to protect the links (and pending_release) containers.
00531   GuardType guard(this->links_lock_);
00532 
00533   // Attempt to remove the TcpDataLink from our links_ map.
00534   PriorityKey key(
00535     tcp_link->transport_priority(),
00536     tcp_link->remote_address(),
00537     tcp_link->is_loopback(),
00538     tcp_link->is_active());
00539 
00540   VDBG_LVL((LM_DEBUG,
00541             "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
00542             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00543             link,
00544             tcp_link->transport_priority(),
00545             tcp_link->remote_address().get_host_addr(),
00546             tcp_link->remote_address().get_port_number(),
00547             (int)tcp_link->is_loopback(),
00548             (int)tcp_link->is_active()), 2);
00549 
00550   if (this->links_.unbind(key, released_link) != 0) {
00551     //No op
00552   } else if (link->datalink_release_delay() > ACE_Time_Value::zero) {
00553     link->set_scheduling_release(true);
00554 
00555     VDBG_LVL((LM_DEBUG,
00556               "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
00557               "is %: sec %d usec\n",
00558               link->datalink_release_delay().sec(),
00559               link->datalink_release_delay().usec()), 4);
00560 
00561     // Atomic value update, safe to perform here.
00562     released_link->set_release_pending(true);
00563 
00564     switch (this->pending_release_links_.bind(key, released_link)) {
00565     case -1:
00566       ACE_ERROR((LM_ERROR,
00567                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00568                  "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
00569       linkAction = StopLink;
00570       break;
00571 
00572     case 1:
00573       ACE_ERROR((LM_ERROR,
00574                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00575                  "pending_release_links_ map: already bound\n", released_link.in()));
00576       linkAction = StopLink;
00577       break;
00578 
00579     case 0:
00580       linkAction = ScheduleLinkRelease;
00581       break;
00582 
00583     default:
00584       break;
00585     }
00586 
00587   } else { // datalink_release_delay_ is 0
00588     link->set_scheduling_release(true);
00589 
00590     linkAction = StopLink;
00591   }
00592 
00593   // Actions are executed outside of the lock scope.
00594   ACE_Time_Value cancel_now = ACE_OS::gettimeofday();
00595   switch (linkAction) {
00596   case StopLink:
00597     link->schedule_stop(cancel_now);
00598     break;
00599 
00600   case ScheduleLinkRelease:
00601 
00602     link->schedule_delayed_release();
00603     break;
00604 
00605   case None:
00606     break;
00607   }
00608 
00609   if (DCPS_debug_level > 9) {
00610     std::stringstream buffer;
00611     buffer << *link;
00612     ACE_DEBUG((LM_DEBUG,
00613                ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
00614                ACE_TEXT("link[%@] with priority %d released.\n%C"),
00615                link,
00616                link->transport_priority(),
00617                buffer.str().c_str()));
00618   }
00619 }
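
The listing above chooses a LinkAction while links_lock_ is held and only then acts on it. The following is a minimal sketch of that "decide under the lock, act afterwards" shape using only the C++ standard library. LinkRegistry, Link, and release_delay_ms are hypothetical stand-ins for illustration, not OpenDDS types, and in this sketch the lock is released when release() returns.

```cpp
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration; none of these names come from OpenDDS.
enum class LinkAction { None, StopLink, ScheduleLinkRelease };

struct Link {
  int release_delay_ms = 0;  // plays the role of datalink_release_delay()
};

class LinkRegistry {
public:
  void add(const std::string& key, std::shared_ptr<Link> link) {
    std::lock_guard<std::mutex> guard(lock_);
    links_[key] = std::move(link);
  }

  // Decide what to do with the link while holding the lock. The caller
  // carries out the returned action after the lock has been released.
  LinkAction release(const std::string& key) {
    std::lock_guard<std::mutex> guard(lock_);
    auto it = links_.find(key);
    if (it == links_.end()) {
      return LinkAction::None;  // not in the active map: nothing to do
    }
    std::shared_ptr<Link> released = it->second;
    links_.erase(it);
    if (released->release_delay_ms <= 0) {
      return LinkAction::StopLink;  // no delay configured: stop right away
    }
    // insert() reports failure when the key is already bound.
    const bool inserted = pending_.insert(std::make_pair(key, released)).second;
    return inserted ? LinkAction::ScheduleLinkRelease : LinkAction::StopLink;
  }

  std::size_t active_count() const {
    std::lock_guard<std::mutex> guard(lock_);
    return links_.size();
  }

  std::size_t pending_count() const {
    std::lock_guard<std::mutex> guard(lock_);
    return pending_.size();
  }

private:
  mutable std::mutex lock_;
  std::map<std::string, std::shared_ptr<Link>> links_;
  std::map<std::string, std::shared_ptr<Link>> pending_;
};
```

Returning the action instead of performing it inside release() keeps the critical section short and means no link callback runs while the registry lock is held.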

void OpenDDS::DCPS::TcpTransport::shutdown_i (  )  [private, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 445 of file TcpTransport.cpp.

References acceptor_, OpenDDS::DCPS::QueueTaskBase< T >::close(), con_checker_, connections_, DBG_ENTRY_LVL, links_, pending_release_links_, reactor_task_, tcp_config_, and OpenDDS::DCPS::TcpAcceptor::transport_shutdown().

00446 {
00447   DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
00448 
00449   // Don't accept any more connections.
00450   this->acceptor_->close();
00451   this->acceptor_->transport_shutdown();
00452 
00453   this->con_checker_->close(1);
00454 
00455   {
00456     GuardType guard(this->connections_lock_);
00457 
00458     this->connections_.clear();
00459     this->pending_connections_.clear();
00460   }
00461 
00462   // Disconnect all of our DataLinks, and clear our links_ collection.
00463   {
00464     GuardType guard(this->links_lock_);
00465 
00466     AddrLinkMap::ENTRY* entry;
00467 
00468     for (AddrLinkMap::ITERATOR itr(this->links_);
00469          itr.next(entry);
00470          itr.advance()) {
00471       entry->int_id_->transport_shutdown();
00472     }
00473 
00474     this->links_.unbind_all();
00475 
00476     for (AddrLinkMap::ITERATOR itr(this->pending_release_links_);
00477          itr.next(entry);
00478          itr.advance()) {
00479       entry->int_id_->transport_shutdown();
00480     }
00481 
00482     this->pending_release_links_.unbind_all();
00483   }
00484 
00485   // Drop our reference to the TcpInst object.
00486   this->tcp_config_ = 0;
00487 
00488   // Drop our reference to the TransportReactorTask
00489   this->reactor_task_ = 0;
00490 
00491   // Tell our acceptor about this event so that it can drop its reference
00492   // it holds to this TcpTransport object (via smart-pointer).
00493   this->acceptor_->transport_shutdown();
00494 }

void OpenDDS::DCPS::TcpTransport::stop_accepting_or_connecting ( TransportClient * client,
const RepoId & remote_id
) [private, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 313 of file TcpTransport.cpp.

References connections_lock_, and VDBG_LVL.

00315 {
00316   GuidConverter remote_converted(remote_id);
00317   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
00318             "stop connecting to remote: %C\n",
00319             std::string(remote_converted).c_str()), 5);
00320 
00321   GuardType guard(connections_lock_);
00322   typedef std::multimap<TransportClient*, DataLink_rch>::iterator iter_t;
00323   const std::pair<iter_t, iter_t> range =
00324     pending_connections_.equal_range(client);
00325 
00326   for (iter_t iter = range.first; iter != range.second; ++iter) {
00327     iter->second->remove_on_start_callback(client, remote_id);
00328   }
00329 
00330   pending_connections_.erase(range.first, range.second);
00331 }
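
The listing above visits every pending entry registered for one client with equal_range() and then erases that whole range in a single call. A minimal standalone sketch of the same multimap idiom follows. PendingTable and Client are hypothetical names, and a std::string stands in for the DataLink_rch value.

```cpp
#include <cstddef>
#include <map>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for illustration; these names are not part of OpenDDS.
struct Client {};

class PendingTable {
public:
  void add(Client* client, const std::string& link_name) {
    std::lock_guard<std::mutex> guard(lock_);
    pending_.insert(std::make_pair(client, link_name));
  }

  // Visit every entry registered for `client`, then erase the whole range.
  // Returns the names visited so the caller can see what was cancelled.
  std::vector<std::string> cancel(Client* client) {
    std::lock_guard<std::mutex> guard(lock_);
    typedef std::multimap<Client*, std::string>::iterator iter_t;
    const std::pair<iter_t, iter_t> range = pending_.equal_range(client);

    std::vector<std::string> cancelled;
    for (iter_t it = range.first; it != range.second; ++it) {
      cancelled.push_back(it->second);
    }

    // Erasing an empty range is a no-op, so an unknown client is harmless.
    pending_.erase(range.first, range.second);
    return cancelled;
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock_);
    return pending_.size();
  }

private:
  mutable std::mutex lock_;
  std::multimap<Client*, std::string> pending_;
};
```

Because the per-entry work happens before the erase, the iterators in the range stay valid for the whole loop. Entries belonging to other clients are untouched.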

virtual std::string OpenDDS::DCPS::TcpTransport::transport_type (  )  const [inline, private, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 79 of file TcpTransport.h.

00079 { return "tcp"; }

void OpenDDS::DCPS::TcpTransport::unbind_link ( DataLink * link )  [virtual]

Remove any pending_release mappings.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 768 of file TcpTransport.cpp.

References OpenDDS::DCPS::DataLink::datalink_release_delay(), OpenDDS::DCPS::DataLink::is_active(), OpenDDS::DCPS::DataLink::is_loopback(), OpenDDS::DCPS::TcpDataLink::remote_address(), OpenDDS::DCPS::DataLink::transport_priority(), and VDBG_LVL.

00769 {
00770   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00771 
00772   if (tcp_link == 0) {
00773     // Really an assertion failure
00774     ACE_ERROR((LM_ERROR,
00775                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00776                "Failed to downcast DataLink to TcpDataLink.\n"));
00777     return;
00778   }
00779 
00780   // Attempt to remove the TcpDataLink from our links_ map.
00781   PriorityKey key(
00782     tcp_link->transport_priority(),
00783     tcp_link->remote_address(),
00784     tcp_link->is_loopback(),
00785     tcp_link->is_active());
00786 
00787   VDBG_LVL((LM_DEBUG,
00788             "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
00789             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00790             link,
00791             tcp_link->transport_priority(),
00792             tcp_link->remote_address().get_host_addr(),
00793             tcp_link->remote_address().get_port_number(),
00794             (int)tcp_link->is_loopback(),
00795             (int)tcp_link->is_active()), 2);
00796 
00797   GuardType guard(this->links_lock_);
00798 
00799   if (this->pending_release_links_.unbind(key) != 0 &&
00800       link->datalink_release_delay() > ACE_Time_Value::zero) {
00801     ACE_ERROR((LM_ERROR,
00802                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00803                "Failed to find link %@ tcp_link %@ PriorityKey "
00804                "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00805                link,
00806                tcp_link,
00807                tcp_link->transport_priority(),
00808                tcp_link->remote_address().get_host_addr(),
00809                tcp_link->remote_address().get_port_number(),
00810                (int)tcp_link->is_loopback(),
00811                (int)tcp_link->is_active()));
00812   }
00813 }
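
Both release_datalink() and unbind_link() rebuild a key from four properties of the link (priority, remote address, loopback flag, active flag) and use it to unbind a map entry. As a rough illustration of why all four fields matter, here is a sketch of a composite key for std::unordered_map. LinkKey, LinkKeyHash, PendingMap, and unbind() are hypothetical; the real PriorityKey and its ACE hash map are defined elsewhere in OpenDDS and may differ.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <unordered_map>

// Hypothetical composite key for illustration; not the OpenDDS PriorityKey.
struct LinkKey {
  int priority;
  std::string host;
  unsigned short port;
  bool is_loopback;
  bool is_active;

  // Two keys match only if every field matches.
  bool operator==(const LinkKey& other) const {
    return priority == other.priority && host == other.host &&
           port == other.port && is_loopback == other.is_loopback &&
           is_active == other.is_active;
  }
};

// Simple multiplicative combination of the field hashes.
struct LinkKeyHash {
  std::size_t operator()(const LinkKey& k) const {
    std::size_t h = std::hash<std::string>()(k.host);
    h = h * 31 + std::hash<int>()(k.priority);
    h = h * 31 + k.port;
    h = h * 31 + (k.is_loopback ? 1 : 0);
    h = h * 31 + (k.is_active ? 1 : 0);
    return h;
  }
};

typedef std::unordered_map<LinkKey, std::string, LinkKeyHash> PendingMap;

// Returns true if a mapping for `key` existed and was removed.
inline bool unbind(PendingMap& map, const LinkKey& key) {
  return map.erase(key) == 1;
}
```

With a key like this, an active link and a passive link to the same address and priority occupy separate map entries, so unbinding one does not disturb the other.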


Friends And Related Function Documentation

friend class TcpConnection [friend]

The TcpConnection is our friend. It tells us when it has been created (by our acceptor_), and is seeking the DataLink that should be (or will be) expecting the passive connection.

Definition at line 87 of file TcpTransport.h.

Referenced by connect_datalink().

friend class TcpDataLink [friend]

Definition at line 88 of file TcpTransport.h.

Referenced by accept_datalink(), and connect_datalink().


Member Data Documentation

TcpAcceptor* OpenDDS::DCPS::TcpTransport::acceptor_ [private]

Used to accept passive connections on our local_address_.

Definition at line 138 of file TcpTransport.h.

Referenced by shutdown_i(), TcpTransport(), and ~TcpTransport().

TcpConnectionReplaceTask* OpenDDS::DCPS::TcpTransport::con_checker_ [private]

This task is used to resolve deadlock situations that can arise during reconnection. TODO: reuse the reconnect_task in the TcpConnection for new connection checking.

Definition at line 168 of file TcpTransport.h.

Referenced by passive_connection(), shutdown_i(), TcpTransport(), and ~TcpTransport().

ConnectionMap OpenDDS::DCPS::TcpTransport::connections_ [private]

Map of passive connection objects that need to be paired with a DataLink.

Definition at line 155 of file TcpTransport.h.

Referenced by accept_datalink(), passive_connection(), and shutdown_i().

LockType OpenDDS::DCPS::TcpTransport::connections_lock_ [private]

This protects the connections_ and the pending_connections_ data members.

Definition at line 159 of file TcpTransport.h.

Referenced by accept_datalink(), connect_datalink(), passive_connection(), and stop_accepting_or_connecting().

ACE_Connector<TcpConnection, ACE_SOCK_Connector> OpenDDS::DCPS::TcpTransport::connector_ [private]

Open TcpConnections using non-blocking connect.

Definition at line 141 of file TcpTransport.h.

Referenced by configure_i(), and connect_datalink().

AddrLinkMap OpenDDS::DCPS::TcpTransport::links_ [private]

This is the map of connected DataLinks.

Definition at line 147 of file TcpTransport.h.

Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), find_datalink_i(), passive_connection(), and shutdown_i().

LockType OpenDDS::DCPS::TcpTransport::links_lock_ [private]

This lock is used to protect the links_ and pending_release_links_ data members.

Definition at line 151 of file TcpTransport.h.

Referenced by accept_datalink(), async_connect_failed(), connect_datalink(), and passive_connection().

AddrLinkMap OpenDDS::DCPS::TcpTransport::pending_release_links_ [private]

Definition at line 148 of file TcpTransport.h.

Referenced by find_datalink_i(), and shutdown_i().

TransportReactorTask_rch OpenDDS::DCPS::TcpTransport::reactor_task_ [private]

We need the reactor for our Acceptor.

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 162 of file TcpTransport.h.

Referenced by configure_i(), and shutdown_i().

TcpInst_rch OpenDDS::DCPS::TcpTransport::tcp_config_ [private]

Our configuration object, supplied to us in configure_i().

Definition at line 144 of file TcpTransport.h.

Referenced by blob_to_key(), configure_i(), connect_datalink(), connection_info_i(), fresh_link(), get_configuration(), passive_connection(), and shutdown_i().


The documentation for this class was generated from the following files:
TcpTransport.h
TcpTransport.cpp

Generated on Fri Feb 12 20:06:39 2016 for OpenDDS by doxygen 1.4.7