TcpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpTransport.h"
00010 #include "TcpConnectionReplaceTask.h"
00011 #include "TcpAcceptor.h"
00012 #include "TcpSendStrategy.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpInst.h"
00015 #include "TcpDataLink.h"
00016 #include "TcpSynchResource.h"
00017 #include "TcpConnection.h"
00018 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00019 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00020 #include "dds/DCPS/transport/framework/EntryExit.h"
00021 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00022 #include "dds/DCPS/AssociationData.h"
00023 #include "dds/DCPS/debug.h"
00024 #include "dds/DCPS/GuidConverter.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026 #include "dds/DCPS/transport/framework/TransportClient.h"
00027 
00028 #include <sstream>
00029 
00030 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00031 
00032 namespace OpenDDS {
00033 namespace DCPS {
00034 
00035 TcpTransport::TcpTransport(TcpInst& inst)
00036   : TransportImpl(inst)
00037   , acceptor_(new TcpAcceptor(this))
00038   , con_checker_(new TcpConnectionReplaceTask(this))
00039 {
00040   DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
00041 
00042   if (!(configure_i(inst) && open())) {
00043     this->shutdown();
00044     throw Transport::UnableToCreate();
00045   }
00046 
00047 }
00048 
00049 TcpTransport::~TcpTransport()
00050 {
00051   DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
00052   con_checker_->close(1);  // This could potentially fix a race condition
00053 }
00054 
00055 
00056 TcpInst&
00057 TcpTransport::config() const
00058 {
00059   return static_cast<TcpInst&>(TransportImpl::config());
00060 }
00061 
00062 PriorityKey
00063 TcpTransport::blob_to_key(const TransportBLOB& remote,
00064                           Priority priority,
00065                           bool active)
00066 {
00067   const ACE_INET_Addr remote_address =
00068     AssociationData::get_remote_address(remote);
00069   const bool is_loopback = remote_address == config().local_address();
00070   return PriorityKey(priority, remote_address, is_loopback, active);
00071 }
00072 
00073 TransportImpl::AcceptConnectResult
00074 TcpTransport::connect_datalink(const RemoteTransport& remote,
00075                                const ConnectionAttribs& attribs,
00076                                const TransportClient_rch& client)
00077 {
00078   DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
00079 
00080   const PriorityKey key =
00081     blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
00082 
00083   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
00084             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00085             key.priority(), key.address().get_host_addr(),
00086             key.address().get_port_number(), key.is_loopback(),
00087             key.is_active()), 0);
00088 
00089   TcpDataLink_rch link;
00090   {
00091     GuardType guard(links_lock_);
00092 
00093     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00094       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
00095       return link.is_nil()
00096         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00097         : AcceptConnectResult(link);
00098     }
00099 
00100     link = make_rch<TcpDataLink>(key.address(), ref(*this), attribs.priority_,
00101                                 key.is_loopback(), true /*active*/);
00102     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
00103     if (links_.bind(key, link) != 0 /*OK*/) {
00104       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00105                  "Unable to bind new TcpDataLink[%@] to "
00106                  "TcpTransport in links_ map.\n", link.in()));
00107       return AcceptConnectResult();
00108     }
00109   }
00110 
00111   TcpConnection_rch connection(
00112     make_rch<TcpConnection>(key.address(), link->transport_priority(), this->config()));
00113   connection->set_datalink(link);
00114 
00115   TcpConnection* pConn = connection.in();
00116 
00117   ACE_TCHAR str[64];
00118   key.address().addr_to_string(str,sizeof(str)/sizeof(str[0]));
00119 
00120   // Can't make this call while holding onto TransportClient::lock_
00121   const int ret =
00122     connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch);
00123 
00124   if (ret == -1 && errno != EWOULDBLOCK) {
00125 
00126     VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
00127     ACE_ERROR((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
00128     //If the connection fails and, in the interim between releasing
00129     //lock and re-acquiring to remove the failed link, another association may have found
00130     //the datalink in links_ (always using find_datalink_i) so must allow the other
00131     //association to either try to connect again (might succeed for it)
00132     //or try another transport.  If find_datalink_i was called for this datalink, an
00133     //on_start_callback will be registered and can be invoked.
00134     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
00135     {
00136       GuardType guard(links_lock_);
00137       if (links_.unbind(key, link) != 0 /*OK*/) {
00138         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00139                    "Unable to unbind failed TcpDataLink[%@] from "
00140                    "TcpTransport links_ map.\n", link.in()));
00141       }
00142     }
00143     link->invoke_on_start_callbacks(false);
00144 
00145     return AcceptConnectResult();
00146   }
00147 
00148   if (ret == 0) {
00149     // connect() completed synchronously and called TcpConnection::active_open().
00150     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
00151               "completed synchronously.\n"), 0);
00152     return AcceptConnectResult(link);
00153   }
00154 
00155   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00156     // link was started by the reactor thread before we could add a callback
00157 
00158     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0);
00159     return AcceptConnectResult(link);
00160   }
00161 
00162   GuardType connections_guard(connections_lock_);
00163 
00164   add_pending_connection(client, link);
00165   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
00166   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00167 }
00168 
00169 void
00170 TcpTransport::async_connect_failed(const PriorityKey& key)
00171 {
00172 
00173   ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n"));
00174   GuardType guard(links_lock_);
00175   TcpDataLink_rch link;
00176   links_.find(key, link);
00177   links_.unbind(key);
00178   guard.release();
00179 
00180   if (link.in()) {
00181     link->invoke_on_start_callbacks(false);
00182   }
00183 
00184 }
00185 
00186 //Called with links_lock_ held
00187 bool
00188 TcpTransport::find_datalink_i(const PriorityKey& key, TcpDataLink_rch& link,
00189                               const TransportClient_rch& client, const RepoId& remote_id)
00190 {
00191   DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
00192 
00193   if (links_.find(key, link) == 0 /*OK*/) {
00194     if (!link->add_on_start_callback(client, remote_id)) {
00195       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00196                 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0);
00197       // Since the link was already started, we won't get an "on start"
00198       // callback, and the link is immediately usable.
00199       return true;
00200     }
00201 
00202     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00203               ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0);
00204     add_pending_connection(client, link);
00205     link.reset(); // don't return link to TransportClient
00206     return true;
00207 
00208   } else if (pending_release_links_.find(key, link) == 0 /*OK*/) {
00209     if (link->cancel_release()) {
00210       link->set_release_pending(false);
00211 
00212       if (pending_release_links_.unbind(key, link) == 0 /*OK*/
00213           && links_.bind(key, link) == 0 /*OK*/) {
00214         VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00215                   ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
00216         return true;
00217       }
00218       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00219                 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
00220     } else {
00221       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00222                 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
00223     }
00224     link.reset(); // don't return link to TransportClient
00225     return false;
00226   }
00227 
00228   return false;
00229 }
00230 
00231 TransportImpl::AcceptConnectResult
00232 TcpTransport::accept_datalink(const RemoteTransport& remote,
00233                               const ConnectionAttribs& attribs,
00234                               const TransportClient_rch& client)
00235 {
00236   GuidConverter remote_conv(remote.repo_id_);
00237   GuidConverter local_conv(attribs.local_id_);
00238 
00239   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
00240             "accepting connection from remote %C\n",
00241             std::string(local_conv).c_str(),
00242             std::string(remote_conv).c_str()), 5);
00243 
00244   GuardType guard(connections_lock_);
00245   const PriorityKey key =
00246     blob_to_key(remote.blob_, attribs.priority_, false /* !active */);
00247 
00248   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
00249             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_,
00250             key.address().get_host_addr(), key.address().get_port_number(),
00251             key.is_loopback(), key.is_active()), 2);
00252 
00253   TcpDataLink_rch link;
00254   {
00255     GuardType guard(links_lock_);
00256 
00257     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00258       return link.is_nil()
00259         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00260         : AcceptConnectResult(link);
00261 
00262     } else {
00263       link = make_rch<TcpDataLink>(key.address(), ref(*this), key.priority(),
00264                                   key.is_loopback(), key.is_active());
00265 
00266       if (links_.bind(key, link) != 0 /*OK*/) {
00267         ACE_ERROR((LM_ERROR,
00268                    "(%P|%t) ERROR: TcpTransport::accept_datalink "
00269                    "Unable to bind new TcpDataLink to "
00270                    "TcpTransport in links_ map.\n"));
00271         return AcceptConnectResult();
00272       }
00273     }
00274   }
00275 
00276   TcpConnection_rch connection;
00277   const ConnectionMap::iterator iter = connections_.find(key);
00278 
00279   if (iter != connections_.end()) {
00280     connection = iter->second;
00281     connections_.erase(iter);
00282   }
00283 
00284   if (connection.is_nil()) {
00285     if (!link->add_on_start_callback(client, remote.repo_id_)) {
00286       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00287                 "got started link %@.\n", link.in()), 0);
00288       return AcceptConnectResult(link);
00289     }
00290 
00291     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00292               "no existing TcpConnection.\n"), 0);
00293 
00294     add_pending_connection(client, link);
00295 
00296     // no link ready, passive_connection will complete later
00297     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00298   }
00299 
00300   guard.release(); // connect_tcp_datalink() isn't called with connections_lock_
00301 
00302   if (connect_tcp_datalink(*link, connection) == -1) {
00303     GuardType guard(links_lock_);
00304     links_.unbind(key);
00305     link.reset();
00306   }
00307 
00308   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00309             "connected link %@.\n", link.in()), 2);
00310   return AcceptConnectResult(link);
00311 }
00312 
00313 void
00314 TcpTransport::stop_accepting_or_connecting(const TransportClient_wrch& client,
00315                                            const RepoId& remote_id)
00316 {
00317   GuidConverter remote_converted(remote_id);
00318   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
00319             "stop connecting to remote: %C\n",
00320             std::string(remote_converted).c_str()), 5);
00321 
00322   GuardType guard(connections_lock_);
00323   typedef PendConnMap::iterator iter_t;
00324   const std::pair<iter_t, iter_t> range =
00325     pending_connections_.equal_range(client);
00326 
00327   for (iter_t iter = range.first; iter != range.second; ++iter) {
00328     iter->second->remove_on_start_callback(client, remote_id);
00329   }
00330 
00331   pending_connections_.erase(range.first, range.second);
00332 }
00333 
00334 bool
00335 TcpTransport::configure_i(TcpInst& config)
00336 {
00337   DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
00338 
00339   this->create_reactor_task();
00340 
00341   connector_.open(reactor_task()->get_reactor());
00342 
00343   // Open the reconnect task
00344   if (this->con_checker_->open()) {
00345     ACE_ERROR_RETURN((LM_ERROR,
00346                       ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"),
00347                       ACE_TEXT("open")),
00348                      false);
00349   }
00350 
00351   // Override with DCPSDefaultAddress.
00352   if (config.local_address() == ACE_INET_Addr () &&
00353       !TheServiceParticipant->default_address ().empty ()) {
00354     config.local_address(0, TheServiceParticipant->default_address ().c_str ());
00355   }
00356 
00357   // Open our acceptor object so that we can accept passive connections
00358   // on our config.local_address_.
00359 
00360   if (this->acceptor_->open(config.local_address(),
00361                             this->reactor_task()->get_reactor()) != 0) {
00362 
00363     ACE_ERROR_RETURN((LM_ERROR,
00364                       ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"),
00365                       config.local_address().get_host_addr(),
00366                       config.local_address().get_port_number(),
00367                       ACE_TEXT("open")),
00368                      false);
00369   }
00370 
00371   // update the port number (incase port zero was given).
00372   ACE_INET_Addr address;
00373 
00374   if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
00375     ACE_ERROR((LM_ERROR,
00376                ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
00377                ACE_TEXT("- %p"),
00378                ACE_TEXT("cannot get local addr\n")));
00379   }
00380 
00381   OPENDDS_STRING listening_addr(address.get_host_addr());
00382   VDBG_LVL((LM_DEBUG,
00383             ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"),
00384             listening_addr.c_str(), address.get_port_number()), 2);
00385 
00386   unsigned short port = address.get_port_number();
00387 
00388   // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully
00389   // qualified hostname and actual listening port number.
00390   if (config.local_address().is_any()) {
00391     std::string hostname = get_fully_qualified_hostname();
00392 
00393     config.local_address(port, hostname.c_str());
00394   }
00395 
00396   // Now we got the actual listening port. Update the port number in the configuration
00397   // if it's 0 originally.
00398   else if (config.local_address().get_port_number() == 0) {
00399     config.local_address_set_port(port);
00400   }
00401 
00402   // Ahhh...  The sweet smell of success!
00403   return true;
00404 }
00405 
00406 
00407 void
00408 TcpTransport::shutdown_i()
00409 {
00410   DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
00411 
00412   {
00413     GuardType guard(this->links_lock_);
00414 
00415     AddrLinkMap::ENTRY* entry;
00416 
00417     for (AddrLinkMap::ITERATOR itr(this->links_);
00418          itr.next(entry);
00419          itr.advance()) {
00420       entry->int_id_->pre_stop_i();
00421     }
00422   }
00423 
00424   // Don't accept any more connections.
00425   this->acceptor_->close();
00426   this->acceptor_->transport_shutdown();
00427 
00428   this->con_checker_->close(1);
00429 
00430   {
00431     GuardType guard(this->connections_lock_);
00432 
00433     this->connections_.clear();
00434     this->pending_connections_.clear();
00435   }
00436 
00437   // Disconnect all of our DataLinks, and clear our links_ collection.
00438   {
00439     GuardType guard(this->links_lock_);
00440 
00441     AddrLinkMap::ENTRY* entry;
00442 
00443     for (AddrLinkMap::ITERATOR itr(this->links_);
00444          itr.next(entry);
00445          itr.advance()) {
00446       entry->int_id_->transport_shutdown();
00447     }
00448 
00449     this->links_.unbind_all();
00450 
00451     for (AddrLinkMap::ITERATOR itr(this->pending_release_links_);
00452          itr.next(entry);
00453          itr.advance()) {
00454       entry->int_id_->transport_shutdown();
00455     }
00456 
00457     this->pending_release_links_.unbind_all();
00458   }
00459 
00460   // Tell our acceptor about this event so that it can drop its reference
00461   // it holds to this TcpTransport object (via smart-pointer).
00462   this->acceptor_->transport_shutdown();
00463 }
00464 
00465 bool
00466 TcpTransport::connection_info_i(TransportLocator& local_info) const
00467 {
00468   DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
00469 
00470   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n",
00471             this->config().get_public_address().c_str()), 2);
00472 
00473   this->config().populate_locator(local_info);
00474 
00475   return true;
00476 }
00477 
00478 void
00479 TcpTransport::release_datalink(DataLink* link)
00480 {
00481   DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
00482 
00483   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00484 
00485   if (tcp_link == 0) {
00486     // Really an assertion failure
00487     ACE_ERROR((LM_ERROR,
00488                "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
00489                "TcpDataLink.\n"));
00490     return;
00491   }
00492 
00493   TcpDataLink_rch released_link;
00494 
00495   // Possible actions that will be taken to release the link.
00496   enum LinkAction { None, StopLink, ScheduleLinkRelease };
00497   LinkAction linkAction = None;
00498 
00499   // Scope for locking to protect the links (and pending_release) containers.
00500   GuardType guard(this->links_lock_);
00501 
00502   // Attempt to remove the TcpDataLink from our links_ map.
00503   PriorityKey key(
00504     tcp_link->transport_priority(),
00505     tcp_link->remote_address(),
00506     tcp_link->is_loopback(),
00507     tcp_link->is_active());
00508 
00509   VDBG_LVL((LM_DEBUG,
00510             "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
00511             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00512             link,
00513             tcp_link->transport_priority(),
00514             tcp_link->remote_address().get_host_addr(),
00515             tcp_link->remote_address().get_port_number(),
00516             (int)tcp_link->is_loopback(),
00517             (int)tcp_link->is_active()), 2);
00518 
00519   if (this->links_.unbind(key, released_link) != 0) {
00520     //No op
00521   } else if (link->datalink_release_delay() > ACE_Time_Value::zero) {
00522     link->set_scheduling_release(true);
00523 
00524     VDBG_LVL((LM_DEBUG,
00525               "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
00526               "is %: sec %d usec\n",
00527               link->datalink_release_delay().sec(),
00528               link->datalink_release_delay().usec()), 4);
00529 
00530     // Atomic value update, safe to perform here.
00531     released_link->set_release_pending(true);
00532 
00533     switch (this->pending_release_links_.bind(key, released_link)) {
00534     case -1:
00535       ACE_ERROR((LM_ERROR,
00536                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00537                  "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
00538       linkAction = StopLink;
00539       break;
00540 
00541     case 1:
00542       ACE_ERROR((LM_ERROR,
00543                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00544                  "pending_release_links_ map: already bound\n", released_link.in()));
00545       linkAction = StopLink;
00546       break;
00547 
00548     case 0:
00549       linkAction = ScheduleLinkRelease;
00550       break;
00551 
00552     default:
00553       break;
00554     }
00555 
00556   } else { // datalink_release_delay_ is 0
00557     link->set_scheduling_release(true);
00558 
00559     linkAction = StopLink;
00560   }
00561 
00562   // Actions are executed outside of the lock scope.
00563   ACE_Time_Value cancel_now = ACE_OS::gettimeofday();
00564   switch (linkAction) {
00565   case StopLink:
00566     link->schedule_stop(cancel_now);
00567     break;
00568 
00569   case ScheduleLinkRelease:
00570 
00571     link->schedule_delayed_release();
00572     break;
00573 
00574   case None:
00575     break;
00576   }
00577 
00578   if (DCPS_debug_level > 9) {
00579     std::stringstream buffer;
00580     buffer << *link;
00581     ACE_DEBUG((LM_DEBUG,
00582                ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
00583                ACE_TEXT("link[%@] with priority %d released.\n%C"),
00584                link,
00585                link->transport_priority(),
00586                buffer.str().c_str()));
00587   }
00588 }
00589 
00590 /// This method is called by a TcpConnection object that has been
00591 /// created and opened by our acceptor_ as a result of passively
00592 /// accepting a connection on our local address.  Ultimately, the connection
00593 /// object needs to be paired with a DataLink object that is (or will be)
00594 /// expecting this passive connection to be established.
00595 void
00596 TcpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00597                                  const TcpConnection_rch& connection)
00598 {
00599   DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
00600 
00601   const PriorityKey key(connection->transport_priority(),
00602                         remote_address,
00603                         remote_address == config().local_address(),
00604                         connection->is_connector());
00605 
00606   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
00607             ACE_TEXT("established with %C:%d.\n"),
00608             remote_address.get_host_name(),
00609             remote_address.get_port_number()), 2);
00610 
00611   GuardType connection_guard(connections_lock_);
00612   TcpDataLink_rch link;
00613   {
00614     GuardType guard(links_lock_);
00615     links_.find(key, link);
00616   }
00617 
00618   if (!link.is_nil()) {
00619     connection_guard.release();
00620 
00621     if (connect_tcp_datalink(*link, connection) == -1) {
00622       VDBG_LVL((LM_ERROR,
00623                 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5);
00624       GuardType guard(links_lock_);
00625       links_.unbind(key);
00626 
00627     } else {
00628       con_checker_->add(connection);
00629     }
00630 
00631     return;
00632   }
00633 
00634   // If we reach this point, this link was not in links_, so the
00635   // accept_datalink() call hasn't happened yet.  Store in connections_ for the
00636   // accept_datalink() method to find.
00637   VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5);
00638   const ConnectionMap::iterator where = connections_.find(key);
00639 
00640   if (where != connections_.end()) {
00641     ACE_ERROR((LM_ERROR,
00642                ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ")
00643                ACE_TEXT("connection with %C:%d at priority %d already exists, ")
00644                ACE_TEXT("overwriting previously established connection.\n"),
00645                remote_address.get_host_name(),
00646                remote_address.get_port_number(),
00647                connection->transport_priority()));
00648   }
00649 
00650   connections_[key] = connection;
00651   VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5);
00652 
00653   con_checker_->add(connection);
00654 }
00655 
00656 /// Common code used by accept_datalink(), passive_connection(), and active completion.
00657 int
00658 TcpTransport::connect_tcp_datalink(TcpDataLink& link,
00659                                    const TcpConnection_rch& connection)
00660 {
00661   DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
00662 
00663   if (link.reuse_existing_connection(connection) == 0) {
00664     return 0;
00665   }
00666 
00667   ++last_link_;
00668 
00669   if (DCPS_debug_level > 4) {
00670     ACE_DEBUG((LM_DEBUG,
00671                ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
00672                ACE_TEXT("creating send strategy with priority %d.\n"),
00673                last_link_, link.transport_priority()));
00674   }
00675 
00676   connection->id() = last_link_;
00677 
00678   TcpSendStrategy_rch send_strategy (
00679     make_rch<TcpSendStrategy>(last_link_, ref(link),
00680                              new TcpSynchResource(link,
00681                                                   this->config().max_output_pause_period_),
00682                              this->reactor_task(), link.transport_priority()));
00683 
00684   TcpReceiveStrategy_rch receive_strategy(
00685     make_rch<TcpReceiveStrategy>(ref(link), this->reactor_task()));
00686 
00687   if (link.connect(connection, send_strategy, receive_strategy) != 0) {
00688     return -1;
00689   }
00690 
00691   return 0;
00692 }
00693 
00694 /// This function is called by the TcpReconnectTask thread to check if the passively
00695 /// accepted connection is the re-established connection. If it is, then the "old" connection
00696 /// object in the datalink is replaced by the "new" connection object.
00697 int
00698 TcpTransport::fresh_link(TcpConnection_rch connection)
00699 {
00700   DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
00701 
00702   TcpDataLink_rch link;
00703   GuardType guard(this->links_lock_);
00704 
00705   PriorityKey key(connection->transport_priority(),
00706                   connection->get_remote_address(),
00707                   connection->get_remote_address() == this->config().local_address(),
00708                   connection->is_connector());
00709 
00710   if (this->links_.find(key, link) == 0) {
00711     TcpConnection_rch old_con = link->get_connection();
00712 
00713     // The connection is accepted but may not be associated with the datalink
00714     // at this point. The thread calling add_associations() will associate
00715     // the datalink with the connection in make_passive_connection().
00716     if (old_con.is_nil()) {
00717       return 0;
00718     }
00719 
00720     if (old_con.in() != connection.in())
00721       // Replace the "old" connection object with the "new" connection object.
00722     {
00723       return link->reconnect(connection);
00724     }
00725   }
00726 
00727   return 0;
00728 }
00729 
00730 void
00731 TcpTransport::unbind_link(DataLink* link)
00732 {
00733   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00734 
00735   if (tcp_link == 0) {
00736     // Really an assertion failure
00737     ACE_ERROR((LM_ERROR,
00738                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00739                "Failed to downcast DataLink to TcpDataLink.\n"));
00740     return;
00741   }
00742 
00743   // Attempt to remove the TcpDataLink from our links_ map.
00744   PriorityKey key(
00745     tcp_link->transport_priority(),
00746     tcp_link->remote_address(),
00747     tcp_link->is_loopback(),
00748     tcp_link->is_active());
00749 
00750   VDBG_LVL((LM_DEBUG,
00751             "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
00752             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00753             link,
00754             tcp_link->transport_priority(),
00755             tcp_link->remote_address().get_host_addr(),
00756             tcp_link->remote_address().get_port_number(),
00757             (int)tcp_link->is_loopback(),
00758             (int)tcp_link->is_active()), 2);
00759 
00760   GuardType guard(this->links_lock_);
00761 
00762   if (this->pending_release_links_.unbind(key) != 0 &&
00763       link->datalink_release_delay() > ACE_Time_Value::zero) {
00764     ACE_ERROR((LM_ERROR,
00765                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00766                "Failed to find link %@ tcp_link %@ PriorityKey "
00767                "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00768                link,
00769                tcp_link,
00770                tcp_link->transport_priority(),
00771                tcp_link->remote_address().get_host_addr(),
00772                tcp_link->remote_address().get_port_number(),
00773                (int)tcp_link->is_loopback(),
00774                (int)tcp_link->is_active()));
00775   }
00776 }
00777 
00778 }
00779 }
00780 
00781 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1