TcpTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpTransport.h"
00010 #include "TcpConnectionReplaceTask.h"
00011 #include "TcpAcceptor.h"
00012 #include "TcpSendStrategy.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpInst.h"
00015 #include "TcpDataLink.h"
00016 #include "TcpSynchResource.h"
00017 #include "TcpConnection.h"
00018 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00019 #include "dds/DCPS/transport/framework/TransportReactorTask.h"
00020 #include "dds/DCPS/transport/framework/EntryExit.h"
00021 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00022 #include "dds/DCPS/AssociationData.h"
00023 #include "dds/DCPS/debug.h"
00024 #include "dds/DCPS/GuidConverter.h"
00025 #include "dds/DCPS/Service_Participant.h"
00026 
00027 #include <sstream>
00028 
00029 namespace OpenDDS {
00030 namespace DCPS {
00031 
00032 TcpTransport::TcpTransport(const TransportInst_rch& inst)
00033   : acceptor_(new TcpAcceptor(this)),
00034     con_checker_(new TcpConnectionReplaceTask(this))
00035 {
00036   DBG_ENTRY_LVL("TcpTransport","TcpTransport",6);
00037 
00038   if (!inst.is_nil()) {
00039     if (!configure(inst.in())) {
00040       delete con_checker_;
00041       delete acceptor_;
00042       throw Transport::UnableToCreate();
00043     }
00044   }
00045 }
00046 
00047 TcpTransport::~TcpTransport()
00048 {
00049   DBG_ENTRY_LVL("TcpTransport","~TcpTransport",6);
00050   delete acceptor_;
00051 
00052   con_checker_->close(1);  // This could potentially fix a race condition
00053   delete con_checker_;
00054 }
00055 
00056 PriorityKey
00057 TcpTransport::blob_to_key(const TransportBLOB& remote,
00058                           Priority priority,
00059                           bool active)
00060 {
00061   const ACE_INET_Addr remote_address =
00062     AssociationData::get_remote_address(remote);
00063   const bool is_loopback = remote_address == tcp_config_->local_address();
00064   return PriorityKey(priority, remote_address, is_loopback, active);
00065 }
00066 
00067 TransportImpl::AcceptConnectResult
00068 TcpTransport::connect_datalink(const RemoteTransport& remote,
00069                                const ConnectionAttribs& attribs,
00070                                TransportClient* client)
00071 {
00072   DBG_ENTRY_LVL("TcpTransport", "connect_datalink", 6);
00073 
00074   const PriorityKey key =
00075     blob_to_key(remote.blob_, attribs.priority_, true /*active*/);
00076 
00077   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink PriorityKey "
00078             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00079             key.priority(), key.address().get_host_addr(),
00080             key.address().get_port_number(), key.is_loopback(),
00081             key.is_active()), 0);
00082 
00083   TcpDataLink_rch link;
00084   {
00085     GuardType guard(links_lock_);
00086 
00087     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00088       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink found datalink link[%@]\n", link.in()), 0);
00089       return link.is_nil()
00090         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00091         : AcceptConnectResult(link._retn());
00092     }
00093 
00094     link = new TcpDataLink(key.address(), this, attribs.priority_,
00095                            key.is_loopback(), true /*active*/);
00096     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink create new link[%@]\n", link.in()), 0);
00097     if (links_.bind(key, link) != 0 /*OK*/) {
00098       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00099                  "Unable to bind new TcpDataLink[%@] to "
00100                  "TcpTransport in links_ map.\n", link.in()));
00101       return AcceptConnectResult();
00102     }
00103   }
00104 
00105   TcpConnection_rch connection =
00106     new TcpConnection(key.address(), link->transport_priority(), tcp_config_);
00107   connection->set_datalink(link.in());
00108 
00109   TcpConnection* pConn = connection.in();
00110   TcpConnection_rch reactor_refcount(connection); // increment for reactor callback
00111 
00112   ACE_TCHAR str[64];
00113   key.address().addr_to_string(str,sizeof(str)/sizeof(str[0]));
00114 
00115   // Can't make this call while holding onto TransportClient::lock_
00116   const int ret =
00117     connector_.connect(pConn, key.address(), ACE_Synch_Options::asynch);
00118 
00119   if (ret == -1 && errno != EWOULDBLOCK) {
00120 
00121     VDBG_LVL((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"), 2);
00122     ACE_DEBUG((LM_ERROR, "(%P|%t) TcpTransport::connect_datalink error %m.\n"));
00123     //If the connection fails and, in the interim between releasing
00124     //lock and re-acquiring to remove the failed link, another association may have found
00125     //the datalink in links_ (always using find_datalink_i) so must allow the other
00126     //association to either try to connect again (might succeed for it)
00127     //or try another transport.  If find_datalink_i was called for this datalink, an
00128     //on_start_callback will be registered and can be invoked.
00129     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink connect failed, remove link[%@]\n", link.in()), 0);
00130     {
00131       GuardType guard(links_lock_);
00132       if (links_.unbind(key, link) != 0 /*OK*/) {
00133         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TcpTransport::connect_datalink "
00134                    "Unable to unbind failed TcpDataLink[%@] from "
00135                    "TcpTransport links_ map.\n", link.in()));
00136       }
00137     }
00138     link->invoke_on_start_callbacks(false);
00139 
00140     return AcceptConnectResult();
00141   }
00142 
00143   // Don't decrement count when reactor_refcount goes out of scope, see
00144   // TcpConnection::open()
00145   (void) reactor_refcount._retn();
00146 
00147   if (ret == 0) {
00148     // connect() completed synchronously and called TcpConnection::active_open().
00149     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink "
00150               "completed synchronously.\n"), 0);
00151     return AcceptConnectResult(link._retn());
00152   }
00153 
00154   if (!link->add_on_start_callback(client, remote.repo_id_)) {
00155     // link was started by the reactor thread before we could add a callback
00156 
00157     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink got link.\n"), 0);
00158     return AcceptConnectResult(link._retn());
00159   }
00160 
00161   GuardType connections_guard(connections_lock_);
00162 
00163   add_pending_connection(client, link.in());
00164   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::connect_datalink pending.\n"), 0);
00165   return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00166 }
00167 
00168 void
00169 TcpTransport::async_connect_failed(const PriorityKey& key)
00170 {
00171 
00172   ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Failed to make active connection.\n"));
00173   GuardType guard(links_lock_);
00174   TcpDataLink_rch link;
00175   links_.find(key, link);
00176   links_.unbind(key);
00177   guard.release();
00178 
00179   if (link.in()) {
00180     link->invoke_on_start_callbacks(false);
00181   }
00182 
00183 }
00184 
00185 //Called with links_lock_ held
00186 bool
00187 TcpTransport::find_datalink_i(const PriorityKey& key, TcpDataLink_rch& link,
00188                               TransportClient* client, const RepoId& remote_id)
00189 {
00190   DBG_ENTRY_LVL("TcpTransport", "find_datalink_i", 6);
00191 
00192   if (links_.find(key, link) == 0 /*OK*/) {
00193     if (!link->add_on_start_callback(client, remote_id)) {
00194       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00195                 ACE_TEXT("link[%@] found, already started.\n"), link.in()), 0);
00196       // Since the link was already started, we won't get an "on start"
00197       // callback, and the link is immediately usable.
00198       return true;
00199     }
00200 
00201     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00202               ACE_TEXT("link[%@] found, add to pending connections.\n"), link.in()), 0);
00203     add_pending_connection(client, link.in());
00204     link = 0; // don't return link to TransportClient
00205     return true;
00206 
00207   } else if (pending_release_links_.find(key, link) == 0 /*OK*/) {
00208     if (link->cancel_release()) {
00209       link->set_release_pending(false);
00210 
00211       if (pending_release_links_.unbind(key, link) == 0 /*OK*/
00212           && links_.bind(key, link) == 0 /*OK*/) {
00213         VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00214                   ACE_TEXT("found link[%@] in pending release list, cancelled release and moved back to links_.\n"), link.in()), 0);
00215         return true;
00216       }
00217       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00218                 ACE_TEXT("found link[%@] in pending release list but was unable to shift back to links_.\n"), link.in()), 0);
00219     } else {
00220       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::find_datalink_i ")
00221                 ACE_TEXT("found link[%@] in pending release list but was unable to cancel release.\n"), link.in()), 0);
00222     }
00223     link = 0; // don't return link to TransportClient
00224     return false;
00225   }
00226 
00227   return false;
00228 }
00229 
00230 TransportImpl::AcceptConnectResult
00231 TcpTransport::accept_datalink(const RemoteTransport& remote,
00232                               const ConnectionAttribs& attribs,
00233                               TransportClient* client)
00234 {
00235   GuidConverter remote_conv(remote.repo_id_);
00236   GuidConverter local_conv(attribs.local_id_);
00237 
00238   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink local %C "
00239             "accepting connection from remote %C\n",
00240             std::string(local_conv).c_str(),
00241             std::string(remote_conv).c_str()), 5);
00242 
00243   GuardType guard(connections_lock_);
00244   const PriorityKey key =
00245     blob_to_key(remote.blob_, attribs.priority_, false /* !active */);
00246 
00247   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink PriorityKey "
00248             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n", attribs.priority_,
00249             key.address().get_host_addr(), key.address().get_port_number(),
00250             key.is_loopback(), key.is_active()), 2);
00251 
00252   TcpDataLink_rch link;
00253   {
00254     GuardType guard(links_lock_);
00255 
00256     if (find_datalink_i(key, link, client, remote.repo_id_)) {
00257       return link.is_nil()
00258         ? AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS)
00259         : AcceptConnectResult(link._retn());
00260 
00261     } else {
00262       link = new TcpDataLink(key.address(), this, key.priority(),
00263                              key.is_loopback(), key.is_active());
00264 
00265       if (links_.bind(key, link) != 0 /*OK*/) {
00266         ACE_ERROR((LM_ERROR,
00267                    "(%P|%t) ERROR: TcpTransport::accept_datalink "
00268                    "Unable to bind new TcpDataLink to "
00269                    "TcpTransport in links_ map.\n"));
00270         return AcceptConnectResult();
00271       }
00272     }
00273   }
00274 
00275   TcpConnection_rch connection;
00276   const ConnectionMap::iterator iter = connections_.find(key);
00277 
00278   if (iter != connections_.end()) {
00279     connection = iter->second;
00280     connections_.erase(iter);
00281   }
00282 
00283   if (connection.is_nil()) {
00284     if (!link->add_on_start_callback(client, remote.repo_id_)) {
00285       VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00286                 "got started link %@.\n", link.in()), 0);
00287       return AcceptConnectResult(link._retn());
00288     }
00289 
00290     VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00291               "no existing TcpConnection.\n"), 0);
00292 
00293     add_pending_connection(client, link.in());
00294 
00295     // no link ready, passive_connection will complete later
00296     return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
00297   }
00298 
00299   guard.release(); // connect_tcp_datalink() isn't called with connections_lock_
00300 
00301   if (connect_tcp_datalink(link, connection) == -1) {
00302     GuardType guard(links_lock_);
00303     links_.unbind(key);
00304     link = 0;
00305   }
00306 
00307   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::accept_datalink "
00308             "connected link %@.\n", link.in()), 2);
00309   return AcceptConnectResult(link._retn());
00310 }
00311 
00312 void
00313 TcpTransport::stop_accepting_or_connecting(TransportClient* client,
00314                                            const RepoId& remote_id)
00315 {
00316   GuidConverter remote_converted(remote_id);
00317   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport::stop_accepting_or_connecting "
00318             "stop connecting to remote: %C\n",
00319             std::string(remote_converted).c_str()), 5);
00320 
00321   GuardType guard(connections_lock_);
00322   typedef std::multimap<TransportClient*, DataLink_rch>::iterator iter_t;
00323   const std::pair<iter_t, iter_t> range =
00324     pending_connections_.equal_range(client);
00325 
00326   for (iter_t iter = range.first; iter != range.second; ++iter) {
00327     iter->second->remove_on_start_callback(client, remote_id);
00328   }
00329 
00330   pending_connections_.erase(range.first, range.second);
00331 }
00332 
00333 bool
00334 TcpTransport::configure_i(TransportInst* config)
00335 {
00336   DBG_ENTRY_LVL("TcpTransport", "configure_i", 6);
00337 
00338   // Downcast the config argument to a TcpInst*
00339   TcpInst* tcp_config =
00340     static_cast<TcpInst*>(config);
00341 
00342   if (tcp_config == 0) {
00343     // The downcast failed.
00344     ACE_ERROR_RETURN((LM_ERROR,
00345                       "(%P|%t) ERROR: Failed downcast from TransportInst "
00346                       "to TcpInst.\n"),
00347                      false);
00348   }
00349 
00350   this->create_reactor_task();
00351 
00352   // Ask our base class for a "copy" of the reference to the reactor task.
00353   this->reactor_task_ = reactor_task();
00354 
00355   connector_.open(reactor_task_->get_reactor());
00356 
00357   // Make a "copy" of the reference for ourselves.
00358   tcp_config->_add_ref();
00359   this->tcp_config_ = tcp_config;
00360 
00361   // Open the reconnect task
00362   if (this->con_checker_->open()) {
00363     ACE_ERROR_RETURN((LM_ERROR,
00364                       ACE_TEXT("(%P|%t) ERROR: connection checker failed to open : %p\n"),
00365                       ACE_TEXT("open")),
00366                      false);
00367   }
00368 
00369   // Override with DCPSDefaultAddress.
00370   if (this->tcp_config_->local_address() == ACE_INET_Addr () &&
00371       !TheServiceParticipant->default_address ().empty ()) {
00372     this->tcp_config_->local_address(0, TheServiceParticipant->default_address ().c_str ());
00373   }
00374 
00375   // Open our acceptor object so that we can accept passive connections
00376   // on our this->tcp_config_->local_address_.
00377 
00378   if (this->acceptor_->open(this->tcp_config_->local_address(),
00379                             this->reactor_task_->get_reactor()) != 0) {
00380     // Remember to drop our reference to the tcp_config_ object since
00381     // we are about to return -1 here, which means we are supposed to
00382     // keep a copy after all.
00383     TcpInst_rch cfg = this->tcp_config_._retn();
00384 
00385     ACE_ERROR_RETURN((LM_ERROR,
00386                       ACE_TEXT("(%P|%t) ERROR: Acceptor failed to open %C:%d: %p\n"),
00387                       cfg->local_address().get_host_addr(),
00388                       cfg->local_address().get_port_number(),
00389                       ACE_TEXT("open")),
00390                      false);
00391   }
00392 
00393   // update the port number (incase port zero was given).
00394   ACE_INET_Addr address;
00395 
00396   if (this->acceptor_->acceptor().get_local_addr(address) != 0) {
00397     ACE_ERROR((LM_ERROR,
00398                ACE_TEXT("(%P|%t) ERROR: TcpTransport::configure_i ")
00399                ACE_TEXT("- %p"),
00400                ACE_TEXT("cannot get local addr\n")));
00401   }
00402 
00403   OPENDDS_STRING listening_addr(address.get_host_addr());
00404   VDBG_LVL((LM_DEBUG,
00405             ACE_TEXT("(%P|%t) TcpTransport::configure_i listening on %C:%hu\n"),
00406             listening_addr.c_str(), address.get_port_number()), 2);
00407 
00408   unsigned short port = address.get_port_number();
00409 
00410   // As default, the acceptor will be listening on INADDR_ANY but advertise with the fully
00411   // qualified hostname and actual listening port number.
00412   if (tcp_config_->local_address().is_any()) {
00413     std::string hostname = get_fully_qualified_hostname();
00414 
00415     this->tcp_config_->local_address(port, hostname.c_str());
00416   }
00417 
00418   // Now we got the actual listening port. Update the port number in the configuration
00419   // if it's 0 originally.
00420   else if (tcp_config_->local_address().get_port_number() == 0) {
00421     tcp_config_->local_address_set_port(port);
00422   }
00423 
00424   // Ahhh...  The sweet smell of success!
00425   return true;
00426 }
00427 
00428 void
00429 TcpTransport::pre_shutdown_i()
00430 {
00431   DBG_ENTRY_LVL("TcpTransport","pre_shutdown_i",6);
00432 
00433   GuardType guard(this->links_lock_);
00434 
00435   AddrLinkMap::ENTRY* entry;
00436 
00437   for (AddrLinkMap::ITERATOR itr(this->links_);
00438        itr.next(entry);
00439        itr.advance()) {
00440     entry->int_id_->pre_stop_i();
00441   }
00442 }
00443 
00444 void
00445 TcpTransport::shutdown_i()
00446 {
00447   DBG_ENTRY_LVL("TcpTransport","shutdown_i",6);
00448 
00449   // Don't accept any more connections.
00450   this->acceptor_->close();
00451   this->acceptor_->transport_shutdown();
00452 
00453   this->con_checker_->close(1);
00454 
00455   {
00456     GuardType guard(this->connections_lock_);
00457 
00458     this->connections_.clear();
00459     this->pending_connections_.clear();
00460   }
00461 
00462   // Disconnect all of our DataLinks, and clear our links_ collection.
00463   {
00464     GuardType guard(this->links_lock_);
00465 
00466     AddrLinkMap::ENTRY* entry;
00467 
00468     for (AddrLinkMap::ITERATOR itr(this->links_);
00469          itr.next(entry);
00470          itr.advance()) {
00471       entry->int_id_->transport_shutdown();
00472     }
00473 
00474     this->links_.unbind_all();
00475 
00476     for (AddrLinkMap::ITERATOR itr(this->pending_release_links_);
00477          itr.next(entry);
00478          itr.advance()) {
00479       entry->int_id_->transport_shutdown();
00480     }
00481 
00482     this->pending_release_links_.unbind_all();
00483   }
00484 
00485   // Drop our reference to the TcpInst object.
00486   this->tcp_config_ = 0;
00487 
00488   // Drop our reference to the TransportReactorTask
00489   this->reactor_task_ = 0;
00490 
00491   // Tell our acceptor about this event so that it can drop its reference
00492   // it holds to this TcpTransport object (via smart-pointer).
00493   this->acceptor_->transport_shutdown();
00494 }
00495 
00496 bool
00497 TcpTransport::connection_info_i(TransportLocator& local_info) const
00498 {
00499   DBG_ENTRY_LVL("TcpTransport", "connection_info_i", 6);
00500 
00501   VDBG_LVL((LM_DEBUG, "(%P|%t) TcpTransport public address str %C\n",
00502             this->tcp_config_->get_public_address().c_str()), 2);
00503 
00504   this->tcp_config_->populate_locator(local_info);
00505 
00506   return true;
00507 }
00508 
00509 void
00510 TcpTransport::release_datalink(DataLink* link)
00511 {
00512   DBG_ENTRY_LVL("TcpTransport", "release_datalink", 6);
00513 
00514   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00515 
00516   if (tcp_link == 0) {
00517     // Really an assertion failure
00518     ACE_ERROR((LM_ERROR,
00519                "(%P|%t) INTERNAL ERROR - Failed to downcast DataLink to "
00520                "TcpDataLink.\n"));
00521     return;
00522   }
00523 
00524   TcpDataLink_rch released_link;
00525 
00526   // Possible actions that will be taken to release the link.
00527   enum LinkAction { None, StopLink, ScheduleLinkRelease };
00528   LinkAction linkAction = None;
00529 
00530   // Scope for locking to protect the links (and pending_release) containers.
00531   GuardType guard(this->links_lock_);
00532 
00533   // Attempt to remove the TcpDataLink from our links_ map.
00534   PriorityKey key(
00535     tcp_link->transport_priority(),
00536     tcp_link->remote_address(),
00537     tcp_link->is_loopback(),
00538     tcp_link->is_active());
00539 
00540   VDBG_LVL((LM_DEBUG,
00541             "(%P|%t) TcpTransport::release_datalink link[%@] PriorityKey "
00542             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00543             link,
00544             tcp_link->transport_priority(),
00545             tcp_link->remote_address().get_host_addr(),
00546             tcp_link->remote_address().get_port_number(),
00547             (int)tcp_link->is_loopback(),
00548             (int)tcp_link->is_active()), 2);
00549 
00550   if (this->links_.unbind(key, released_link) != 0) {
00551     //No op
00552   } else if (link->datalink_release_delay() > ACE_Time_Value::zero) {
00553     link->set_scheduling_release(true);
00554 
00555     VDBG_LVL((LM_DEBUG,
00556               "(%P|%t) TcpTransport::release_datalink datalink_release_delay "
00557               "is %: sec %d usec\n",
00558               link->datalink_release_delay().sec(),
00559               link->datalink_release_delay().usec()), 4);
00560 
00561     // Atomic value update, safe to perform here.
00562     released_link->set_release_pending(true);
00563 
00564     switch (this->pending_release_links_.bind(key, released_link)) {
00565     case -1:
00566       ACE_ERROR((LM_ERROR,
00567                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00568                  "pending_release_links_ map: %p\n", released_link.in(), ACE_TEXT("bind")));
00569       linkAction = StopLink;
00570       break;
00571 
00572     case 1:
00573       ACE_ERROR((LM_ERROR,
00574                  "(%P|%t) ERROR: Unable to bind released TcpDataLink[%@] to "
00575                  "pending_release_links_ map: already bound\n", released_link.in()));
00576       linkAction = StopLink;
00577       break;
00578 
00579     case 0:
00580       linkAction = ScheduleLinkRelease;
00581       break;
00582 
00583     default:
00584       break;
00585     }
00586 
00587   } else { // datalink_release_delay_ is 0
00588     link->set_scheduling_release(true);
00589 
00590     linkAction = StopLink;
00591   }
00592 
00593   // Actions are executed outside of the lock scope.
00594   ACE_Time_Value cancel_now = ACE_OS::gettimeofday();
00595   switch (linkAction) {
00596   case StopLink:
00597     link->schedule_stop(cancel_now);
00598     break;
00599 
00600   case ScheduleLinkRelease:
00601 
00602     link->schedule_delayed_release();
00603     break;
00604 
00605   case None:
00606     break;
00607   }
00608 
00609   if (DCPS_debug_level > 9) {
00610     std::stringstream buffer;
00611     buffer << *link;
00612     ACE_DEBUG((LM_DEBUG,
00613                ACE_TEXT("(%P|%t) TcpTransport::release_datalink() - ")
00614                ACE_TEXT("link[%@] with priority %d released.\n%C"),
00615                link,
00616                link->transport_priority(),
00617                buffer.str().c_str()));
00618   }
00619 }
00620 
00621 TcpInst*
00622 TcpTransport::get_configuration()
00623 {
00624   return this->tcp_config_.in();
00625 }
00626 
00627 /// This method is called by a TcpConnection object that has been
00628 /// created and opened by our acceptor_ as a result of passively
00629 /// accepting a connection on our local address.  Ultimately, the connection
00630 /// object needs to be paired with a DataLink object that is (or will be)
00631 /// expecting this passive connection to be established.
00632 void
00633 TcpTransport::passive_connection(const ACE_INET_Addr& remote_address,
00634                                  const TcpConnection_rch& connection)
00635 {
00636   DBG_ENTRY_LVL("TcpTransport", "passive_connection", 6);
00637 
00638   const PriorityKey key(connection->transport_priority(),
00639                         remote_address,
00640                         remote_address == tcp_config_->local_address(),
00641                         connection->is_connector());
00642 
00643   VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TcpTransport::passive_connection() - ")
00644             ACE_TEXT("established with %C:%d.\n"),
00645             remote_address.get_host_name(),
00646             remote_address.get_port_number()), 2);
00647 
00648   GuardType connection_guard(connections_lock_);
00649   TcpDataLink_rch link;
00650   {
00651     GuardType guard(links_lock_);
00652     links_.find(key, link);
00653   }
00654 
00655   if (!link.is_nil()) {
00656     connection_guard.release();
00657 
00658     if (connect_tcp_datalink(link, connection) == -1) {
00659       VDBG_LVL((LM_ERROR,
00660                 ACE_TEXT("(%P|%t) ERROR: connect_tcp_datalink failed\n")), 5);
00661       GuardType guard(links_lock_);
00662       links_.unbind(key);
00663 
00664     } else {
00665       con_checker_->add(connection);
00666     }
00667 
00668     return;
00669   }
00670 
00671   // If we reach this point, this link was not in links_, so the
00672   // accept_datalink() call hasn't happened yet.  Store in connections_ for the
00673   // accept_datalink() method to find.
00674   VDBG_LVL((LM_DEBUG, "(%P|%t) # of bef connections: %d\n", connections_.size()), 5);
00675   const ConnectionMap::iterator where = connections_.find(key);
00676 
00677   if (where != connections_.end()) {
00678     ACE_ERROR((LM_ERROR,
00679                ACE_TEXT("(%P|%t) ERROR: TcpTransport::passive_connection() - ")
00680                ACE_TEXT("connection with %C:%d at priority %d already exists, ")
00681                ACE_TEXT("overwriting previously established connection.\n"),
00682                remote_address.get_host_name(),
00683                remote_address.get_port_number(),
00684                connection->transport_priority()));
00685   }
00686 
00687   connections_[key] = connection;
00688   VDBG_LVL((LM_DEBUG, "(%P|%t) # of after connections: %d\n", connections_.size()), 5);
00689 
00690   con_checker_->add(connection);
00691 }
00692 
00693 /// Common code used by accept_datalink(), passive_connection(), and active completion.
00694 int
00695 TcpTransport::connect_tcp_datalink(const TcpDataLink_rch& link,
00696                                    const TcpConnection_rch& connection)
00697 {
00698   DBG_ENTRY_LVL("TcpTransport", "connect_tcp_datalink", 6);
00699 
00700   if (link->reuse_existing_connection(connection) == 0) {
00701     return 0;
00702   }
00703 
00704   ++last_link_;
00705 
00706   if (DCPS_debug_level > 4) {
00707     ACE_DEBUG((LM_DEBUG,
00708                ACE_TEXT("(%P|%t) TcpTransport::connect_tcp_datalink() [%d] - ")
00709                ACE_TEXT("creating send strategy with priority %d.\n"),
00710                last_link_, link->transport_priority()));
00711   }
00712 
00713   connection->id() = last_link_;
00714 
00715   TransportSendStrategy_rch send_strategy =
00716     new TcpSendStrategy(last_link_, link, this->tcp_config_, connection,
00717                         new TcpSynchResource(connection,
00718                                              this->tcp_config_->max_output_pause_period_),
00719                         this->reactor_task_, link->transport_priority());
00720 
00721   TransportStrategy_rch receive_strategy =
00722     new TcpReceiveStrategy(link, connection, this->reactor_task_);
00723 
00724   if (link->connect(connection, send_strategy, receive_strategy) != 0) {
00725     return -1;
00726   }
00727 
00728   return 0;
00729 }
00730 
00731 /// This function is called by the TcpReconnectTask thread to check if the passively
00732 /// accepted connection is the re-established connection. If it is, then the "old" connection
00733 /// object in the datalink is replaced by the "new" connection object.
00734 int
00735 TcpTransport::fresh_link(TcpConnection_rch connection)
00736 {
00737   DBG_ENTRY_LVL("TcpTransport","fresh_link",6);
00738 
00739   TcpDataLink_rch link;
00740   GuardType guard(this->links_lock_);
00741 
00742   PriorityKey key(connection->transport_priority(),
00743                   connection->get_remote_address(),
00744                   connection->get_remote_address() == this->tcp_config_->local_address(),
00745                   connection->is_connector());
00746 
00747   if (this->links_.find(key, link) == 0) {
00748     TcpConnection_rch old_con = link->get_connection();
00749 
00750     // The connection is accepted but may not be associated with the datalink
00751     // at this point. The thread calling add_associations() will associate
00752     // the datalink with the connection in make_passive_connection().
00753     if (old_con.is_nil()) {
00754       return 0;
00755     }
00756 
00757     if (old_con.in() != connection.in())
00758       // Replace the "old" connection object with the "new" connection object.
00759     {
00760       return link->reconnect(connection.in());
00761     }
00762   }
00763 
00764   return 0;
00765 }
00766 
00767 void
00768 TcpTransport::unbind_link(DataLink* link)
00769 {
00770   TcpDataLink* tcp_link = static_cast<TcpDataLink*>(link);
00771 
00772   if (tcp_link == 0) {
00773     // Really an assertion failure
00774     ACE_ERROR((LM_ERROR,
00775                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00776                "Failed to downcast DataLink to TcpDataLink.\n"));
00777     return;
00778   }
00779 
00780   // Attempt to remove the TcpDataLink from our links_ map.
00781   PriorityKey key(
00782     tcp_link->transport_priority(),
00783     tcp_link->remote_address(),
00784     tcp_link->is_loopback(),
00785     tcp_link->is_active());
00786 
00787   VDBG_LVL((LM_DEBUG,
00788             "(%P|%t) TcpTransport::unbind_link link %@ PriorityKey "
00789             "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00790             link,
00791             tcp_link->transport_priority(),
00792             tcp_link->remote_address().get_host_addr(),
00793             tcp_link->remote_address().get_port_number(),
00794             (int)tcp_link->is_loopback(),
00795             (int)tcp_link->is_active()), 2);
00796 
00797   GuardType guard(this->links_lock_);
00798 
00799   if (this->pending_release_links_.unbind(key) != 0 &&
00800       link->datalink_release_delay() > ACE_Time_Value::zero) {
00801     ACE_ERROR((LM_ERROR,
00802                "(%P|%t) TcpTransport::unbind_link INTERNAL ERROR - "
00803                "Failed to find link %@ tcp_link %@ PriorityKey "
00804                "prio=%d, addr=%C:%hu, is_loopback=%d, is_active=%d\n",
00805                link,
00806                tcp_link,
00807                tcp_link->transport_priority(),
00808                tcp_link->remote_address().get_host_addr(),
00809                tcp_link->remote_address().get_port_number(),
00810                (int)tcp_link->is_loopback(),
00811                (int)tcp_link->is_active()));
00812   }
00813 }
00814 
00815 }
00816 }

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7