LCOV - code coverage report
Current view: top level - DCPS/transport/framework - TransportClient.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 23 613 3.8 %
Date: 2023-04-30 01:32:43 Functions: 3 44 6.8 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include <DCPS/DdsDcps_pch.h> //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "TransportClient.h"
      11             : #include "TransportConfig.h"
      12             : #include "TransportRegistry.h"
      13             : #include "TransportExceptions.h"
      14             : #include "TransportReceiveListener.h"
      15             : 
      16             : #include <dds/DCPS/DataWriterImpl.h>
      17             : #include <dds/DCPS/SendStateDataSampleList.h>
      18             : #include <dds/DCPS/GuidConverter.h>
      19             : #include <dds/DCPS/Definitions.h>
      20             : #include <dds/DCPS/RTPS/ICE/Ice.h>
      21             : 
      22             : #include <dds/DdsDcpsInfoUtilsC.h>
      23             : 
      24             : #include <ace/Reactor_Timer_Interface.h>
      25             : 
      26             : #include <algorithm>
      27             : #include <iterator>
      28             : 
      29             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      30             : 
      31             : namespace OpenDDS {
      32             : namespace DCPS {
      33             : // Default constructor: builds the pending-association timer from the service participant's reactor and reactor owner; every other member gets the fixed default listed below.
      34           3 : TransportClient::TransportClient()
      35           3 :   : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
      36           3 :   , expected_transaction_id_(1)
      37           3 :   , max_transaction_id_seen_(0)
      38           3 :   , max_transaction_tail_(0)
      39           3 :   , swap_bytes_(false) // overwritten from the TransportConfig in enable_transport_using_config()
      40           3 :   , cdr_encapsulation_(false) // may be switched on in enable_transport_using_config()
      41           3 :   , reliable_(false) // assigned in enable_transport_using_config()
      42           3 :   , durable_(false) // assigned in enable_transport_using_config()
      43           3 :   , reverse_lock_(lock_) // constructed over lock_; guarded elsewhere in this file around calls that must not run with lock_ held
      44           6 :   , repo_id_(GUID_UNKNOWN) // replaced with the result of get_guid() in associate()
      45             : {
      46           3 : }
      47             : // Destructor: calls stop_associating(), then, with lock_ held, asks every impl that can still be locked to stop accepting or connecting for each entry left in prev_pending_.
      48           3 : TransportClient::~TransportClient()
      49             : {
      50           3 :   if (Transport_debug_level > 5) {
      51           0 :     LogGuid logger(repo_id_);
      52           0 :     ACE_DEBUG((LM_DEBUG,
      53             :                ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
      54             :                logger.c_str()));
      55           0 :   }
      56             :   // stop_associating() is called before lock_ is acquired below.
      57           3 :   stop_associating();
      58             : 
      59           3 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
      60             :   // impls_[i].lock() can yield an empty handle; those impls are skipped.
      61           3 :   for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end(); ++it) {
      62           0 :     for (size_t i = 0; i < impls_.size(); ++i) {
      63           0 :       TransportImpl_rch impl = impls_[i].lock();
      64           0 :       if (impl) {
      65           0 :         impl->stop_accepting_or_connecting(it->second->client_, it->second->data_.remote_id_, false, false);
      66             :       }
      67           0 :     }
      68             :   }
      69           6 : }
      70             : // Erases every prev_pending_ entry whose PendingAssoc reports safe_to_remove(). Takes no lock itself; the call in associate() is made with lock_ held.
      71             : void
      72           0 : TransportClient::clean_prev_pending()
      73             : {
      74           0 :   for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end();) {
      75           0 :     if (it->second->safe_to_remove()) {
      76           0 :       prev_pending_.erase(it++); // it++ advances the loop iterator and hands the old position to erase()
      77             :     } else {
      78           0 :       ++it;
      79             :     }
      80             :   }
      81           0 : }
      82             : // Finds the TransportConfig to use (entity or parent binding, then domain default, then global config) and passes it to enable_transport_using_config(); throws Transport::NotConfigured when none is found.
      83             : void
      84           0 : TransportClient::enable_transport(bool reliable, bool durable)
      85             : {
      86             :   // Search for a TransportConfig to use:
      87           0 :   TransportConfig_rch tc;
      88             : 
      89             :   // 1. If this object is an Entity, check if a TransportConfig has been
      90             :   //    bound either directly to this entity or to a parent entity.
      91           0 :   for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
      92           0 :        ent && tc.is_nil(); ent = ent->parent()) { // stops at the first config found or when ent becomes empty
      93           0 :     tc = ent->transport_config();
      94           0 :   }
      95             : 
      96           0 :   if (tc.is_nil()) {
      97           0 :     TransportRegistry* const reg = TransportRegistry::instance();
      98             :     // 2. Check for a TransportConfig that is the default for this Domain.
      99           0 :     tc = reg->domain_default_config(domain_id());
     100             : 
     101           0 :     if (tc.is_nil()) {
     102             :       // 3. Use the global_config if one has been set.
     103           0 :       tc = reg->global_config();
     104             : 
     105           0 :       if (!tc.is_nil() && tc->instances_.empty()
     106           0 :           && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
     107             :         // 4. Set the "fallback option" if the global_config is empty.
     108             :         //    (only applies if the user hasn't changed the global config)
     109           0 :         tc = reg->fix_empty_default();
     110             :       }
     111             :     }
     112             :   }
     113             :   // Every lookup above came back nil: log and throw.
     114           0 :   if (tc.is_nil()) {
     115           0 :     ACE_ERROR((LM_ERROR,
     116             :                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
     117             :                ACE_TEXT("No TransportConfig found.\n")));
     118           0 :     throw Transport::NotConfigured();
     119             :   }
     120             :   // reliable and durable are passed through unchanged.
     121           0 :   enable_transport_using_config(reliable, durable, tc);
     122           0 : }
     123             : // Stores the given config and the reliable/durable flags, refreshes conn_info_, and collects a TransportImpl from each config instance that passes check_transport_qos(); throws Transport::NotConfigured when impls_ ends up empty.
     124             : void
     125           0 : TransportClient::enable_transport_using_config(bool reliable, bool durable,
     126             :                                                const TransportConfig_rch& tc)
     127             : {
     128           0 :   config_ = tc;
     129           0 :   swap_bytes_ = tc->swap_bytes_;
     130           0 :   reliable_ = reliable;
     131           0 :   durable_ = durable;
     132           0 :   unsigned long duration = tc->passive_connect_duration_; // handed to TimeDuration::from_msec() below
     133           0 :   if (duration == 0) { // 0 is replaced by the default; the warning is only logged when DCPS_debug_level is non-zero
     134           0 :     duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
     135           0 :     if (DCPS_debug_level) {
     136           0 :       ACE_DEBUG((LM_WARNING,
     137             :         ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
     138             :         ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
     139             :         ACE_TEXT("default value\n")));
     140             :     }
     141             :   }
     142           0 :   passive_connect_duration_ = TimeDuration::from_msec(duration);
     143             :   // populate_connection_info() reads config_, which was assigned at the top of this function.
     144           0 :   populate_connection_info();
     145             : 
     146           0 :   const size_t n = tc->instances_.size();
     147             :   // Keep an impl for every instance that passes the QoS check and can provide one.
     148           0 :   for (size_t i = 0; i < n; ++i) {
     149           0 :     TransportInst_rch inst = tc->instances_[i];
     150             : 
     151           0 :     if (check_transport_qos(*inst)) {
     152           0 :       TransportImpl_rch impl = inst->get_or_create_impl();
     153             : 
     154           0 :       if (impl) {
     155           0 :         impls_.push_back(impl);
     156             : 
     157             : #if defined(OPENDDS_SECURITY)
     158           0 :         impl->local_crypto_handle(get_crypto_handle());
     159             : #endif
     160             : 
     161           0 :         cdr_encapsulation_ |= inst->requires_cdr_encapsulation(); // stays true once any accepted instance requires it
     162             :       }
     163           0 :     }
     164           0 :   }
     165             :   // NOTE(review): the message below names enable_transport rather than enable_transport_using_config - confirm whether that is intended.
     166           0 :   if (impls_.empty()) {
     167           0 :     ACE_ERROR((LM_ERROR,
     168             :                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
     169             :                ACE_TEXT("No TransportImpl could be created.\n")));
     170           0 :     throw Transport::NotConfigured();
     171             :   }
     172           0 : }
     173             : // Rebuilds conn_info_ from config_: one entry per instance that passes check_transport_qos() and provides an impl; logs an error when the result is empty.
     174             : void
     175           0 : TransportClient::populate_connection_info()
     176             : {
     177           0 :   conn_info_.length(0); // start from an empty sequence
     178             : 
     179           0 :   const size_t n = config_->instances_.size();
     180           0 :   for (size_t i = 0; i < n; ++i) {
     181           0 :     TransportInst_rch inst = config_->instances_[i];
     182           0 :     if (check_transport_qos(*inst)) {
     183           0 :       TransportImpl_rch impl = inst->get_or_create_impl();
     184           0 :       if (impl) {
     185           0 :         const CORBA::ULong idx = DCPS::grow(conn_info_) - 1; // presumably grow() appends one slot and returns the new length - confirm
     186           0 :         impl->connection_info(conn_info_[idx], CONNINFO_ALL);
     187             :       }
     188           0 :     }
     189           0 :   }
     190             :   // An empty result is only logged here; nothing is thrown from this function.
     191           0 :   if (conn_info_.length() == 0) {
     192           0 :     ACE_ERROR((LM_ERROR,
     193             :                ACE_TEXT("(%P|%t) TransportClient::populate_connection_info: ")
     194             :                ACE_TEXT("No connection info\n")));
     195             :   }
     196           0 : }
     197             : // Starts an association with the remote described by data: the active side initiates a connection, the passive side asks each impl to accept a datalink. Returns false when no usable impl exists, the remote is already pending, a guard cannot be acquired, or the active initiate_connect() fails.
     198             : bool
     199           0 : TransportClient::associate(const AssociationData& data, bool active)
     200             : {
     201           0 :   GUID_t repo_id = get_guid(); // fetched before lock_ is taken; stored into repo_id_ under the lock
     202             : 
     203           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
     204             : 
     205           0 :   repo_id_ = repo_id;
     206           0 :   OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
     207             :   // impls_ is filled by enable_transport_using_config(); with no impl there is nothing to associate through.
     208           0 :   if (impls_.empty()) {
     209           0 :     if (DCPS_debug_level) {
     210           0 :       LogGuid writer_log(repo_id_);
     211           0 :       LogGuid reader_log(data.remote_id_);
     212           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
     213             :                  ACE_TEXT("local %C remote %C no available impls\n"),
     214             :                  writer_log.c_str(),
     215             :                  reader_log.c_str()));
     216           0 :     }
     217           0 :     return false;
     218             :   }
     219             :   // Look for at least one impl that can still be locked and is not shut down.
     220           0 :   bool all_impls_shut_down = true;
     221           0 :   for (size_t i = 0; i < impls_.size(); ++i) {
     222           0 :     TransportImpl_rch impl = impls_[i].lock();
     223           0 :     if (impl && !impl->is_shut_down()) {
     224           0 :       all_impls_shut_down = false;
     225           0 :       break;
     226             :     }
     227           0 :   }
     228             : 
     229           0 :   if (all_impls_shut_down) {
     230           0 :     if (DCPS_debug_level) {
     231           0 :       LogGuid writer_log(repo_id_);
     232           0 :       LogGuid reader_log(data.remote_id_);
     233           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
     234             :                  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
     235             :                  writer_log.c_str(),
     236             :                  reader_log.c_str()));
     237           0 :     }
     238           0 :     return false;
     239             :   }
     240             :   // Drop prev_pending_ entries that report safe_to_remove() before adding new pending state.
     241           0 :   clean_prev_pending();
     242             : 
     243           0 :   PendingMap::iterator iter = pending_.find(data.remote_id_);
     244             :   // Create a PendingAssoc for this remote if none exists; an existing entry is treated as an error.
     245           0 :   if (iter == pending_.end()) {
     246           0 :     GUID_t remote_copy(data.remote_id_);
     247           0 :     PendingAssoc_rch pa = make_rch<PendingAssoc>(rchandle_from(this));
     248           0 :     pa->active_ = active;
     249           0 :     pa->impls_.clear();
     250           0 :     pa->blob_index_ = 0;
     251           0 :     pa->data_ = data;
     252           0 :     OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
     253           0 :     pa->attribs_.local_id_ = repo_id_;
     254           0 :     pa->attribs_.priority_ = get_priority_value(data);
     255           0 :     pa->attribs_.local_reliable_ = reliable_;
     256           0 :     pa->attribs_.local_durable_ = durable_;
     257           0 :     pa->attribs_.max_sn_ = get_max_sn();
     258           0 :     iter = pending_.insert(std::make_pair(remote_copy, pa)).first;
     259             : 
     260           0 :     LogGuid tc_assoc_log(repo_id_);
     261           0 :     LogGuid remote_log(data.remote_id_);
     262           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
     263             :               "between %C and remote %C\n",
     264             :               tc_assoc_log.c_str(),
     265             :               remote_log.c_str()), 0);
     266           0 :   } else {
     267             : 
     268           0 :     ACE_ERROR((LM_ERROR,
     269             :                ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
     270             :                ACE_TEXT("already associating with remote.\n")));
     271             : 
     272           0 :     return false;
     273             : 
     274             :   }
     275             : 
     276           0 :   PendingAssoc_rch pend = iter->second;
     277             :   // Active side: hand the impl list to the PendingAssoc and start connecting while holding its mutex_.
     278           0 :   if (active) {
     279           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
     280           0 :     pend->impls_.reserve(impls_.size());
     281           0 :     std::reverse_copy(impls_.begin(), impls_.end(), // reversed because initiate_connect() consumes from back(), so impls_ is tried in its original order
     282           0 :                       std::back_inserter(pend->impls_));
     283             : 
     284           0 :     return pend->initiate_connect(this, guard);
     285             : 
     286           0 :   } else { // passive
     287             : 
     288             :     // call accept_datalink for each impl / blob pair of the same type
     289           0 :     for (size_t i = 0; i < impls_.size(); ++i) {
     290             :       // Release the PendingAssoc object's mutex_ since the nested for-loop does not access
     291             :       // the PendingAssoc object directly and the functions called by the nested loop can
     292             :       // lead to a PendingAssoc object's mutex_ being acquired, which will cause deadlock if
     293             :       // it is not released here.
     294           0 :       TransportImpl::ConnectionAttribs attribs;
     295           0 :       TransportImpl_rch impl = impls_[i].lock(); // NOTE(review): impl is used below without a nil check, unlike the other lock() call sites in this file - confirm lock() cannot come back empty here
     296             :       {
     297           0 :         ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
     298           0 :         pend->impls_.push_back(impl);
     299           0 :         attribs = pend->attribs_; // copied so it can be used after pend->mutex_ is released
     300           0 :       }
     301           0 :       const OPENDDS_STRING type = impl->transport_type();
     302             :       // Try every remote blob whose transport_type matches this impl.
     303           0 :       for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
     304           0 :         if (data.remote_data_[j].transport_type.in() == type) {
     305             :           const TransportImpl::RemoteTransport remote = {
     306           0 :             data.remote_id_, data.remote_data_[j].data, data.discovery_locator_.data, data.participant_discovered_at_, data.remote_transport_context_,
     307           0 :             data.publication_transport_priority_,
     308           0 :             data.remote_reliable_, data.remote_durable_};
     309             : 
     310           0 :           TransportImpl::AcceptConnectResult res;
     311             :           {
     312             :             // This thread acquired lock_ at the beginning of this method.
     313             :             // Calling accept_datalink might require getting the lock for the transport's reactor.
     314             :             // If the current thread is not an event handler for the transport's reactor, e.g.,
     315             :             // the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
     316             :             // Event handlers in the transport reactor may call passive_connection which calls use_datalink
     317             :             // which acquires lock_.  The locking order in this case is transport reactor lock -> lock_.
     318             :             // To avoid deadlock, we must reverse the lock.
     319           0 :             RcHandle<TransportClient> client = rchandle_from(this);
     320           0 :             ACE_GUARD_RETURN(Reverse_Lock_t, rev_tc_guard, reverse_lock_, false);
     321           0 :             res = impl->accept_datalink(remote, attribs, client);
     322           0 :           }
     323             : 
     324             :           // lock_ was released during accept_datalink, so check that the PendingAssoc is still in pending_ now that lock_ is held again.
     325           0 :           iter = pending_.find(data.remote_id_);
     326             : 
     327           0 :           if (iter == pending_.end()) {
     328             :             //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
     329             :             //active side connection and completed, thus pend was removed from pending_.  Can return true.
     330           0 :             return true;
     331             :           }
     332           0 :           pend = iter->second;
     333             : 
     334           0 :           if (res.success_) {
     335           0 :             if (res.link_.is_nil()) {
     336             :               // In this case, it may be waiting for the TCP connection to be
     337             :               // established.  Just wait without trying other transports. NOTE(review): no break/return follows, so both loops keep going - confirm this matches the intent.
     338           0 :               pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
     339             :             } else {
     340           0 :               use_datalink_i(data.remote_id_, res.link_, guard);
     341           0 :               return true;
     342             :             }
     343             :           }
     344           0 :         }
     345             :       }
     346           0 :     }
     347             :     // No link was obtained synchronously: schedule the pending-association timer for this entry (its callback is PendingAssoc::handle_timeout).
     348           0 :     pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
     349             :   }
     350             : 
     351           0 :   return true;
     352           0 : }
     353             : // Resets client_ while holding this PendingAssoc's mutex_.
     354             : void
     355           0 : TransportClient::PendingAssoc::reset_client() {
     356           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     357           0 :   client_.reset();
     358           0 : }
     359             : // Returns true when, with mutex_ held, client_ tests false and scheduled_ is false.
     360             : bool
     361           0 : TransportClient::PendingAssoc::safe_to_remove() {
     362           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     363           0 :   return !client_ && !scheduled_;
     364           0 : }
     365             : // Timer callback: clears scheduled_ and, if the client can still be locked and is the object passed as arg, reports a nil datalink to it through use_datalink(). Always returns 0.
     366             : int
     367           0 : TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
     368             :                                               const void* arg)
     369             : {
     370           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
     371             :   // Take a strong handle and clear scheduled_ under mutex_; the scope below releases mutex_ before calling the client.
     372           0 :   RcHandle<TransportClient> client;
     373             :   {
     374           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     375           0 :     client = client_.lock();
     376           0 :     scheduled_ = false;
     377           0 :   }
     378             :   // arg is compared by address with the locked client; nothing is done on a mismatch.
     379           0 :   if (client && client.get() == static_cast<TransportClient*>(const_cast<void*>(arg))) {
     380           0 :     client->use_datalink(data_.remote_id_, DataLink_rch());
     381             :   }
     382           0 :   return 0;
     383           0 : }
     384             : // Calls impl->connect_datalink() for the given remote with lock_ released through reverse_lock_ and stores the outcome in result. Returns false when guard is not locked, the reverse guard cannot be acquired, or result.success_ is false; true otherwise.
     385             : bool
     386           0 : TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
     387             :                                     TransportImpl_rch impl,
     388             :                                     const TransportImpl::RemoteTransport& remote,
     389             :                                     const TransportImpl::ConnectionAttribs& attribs_,
     390             :                                     Guard& guard)
     391             : {
     392           0 :   if (!guard.locked()) {
     393             :     //don't own the lock_ so can't release it...shouldn't happen
     394           0 :     LogGuid local_log(repo_id_);
     395           0 :     LogGuid remote_log(remote.repo_id_);
     396           0 :     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i ")
     397             :                         ACE_TEXT("between local %C and remote %C unsuccessful because ")
     398             :                         ACE_TEXT("guard was not locked\n"),
     399             :                         local_log.c_str(),
     400             :                         remote_log.c_str()), 0);
     401           0 :     return false;
     402           0 :   }
     403             : 
     404             :   {
     405             :     //can't call connect while holding lock due to possible reactor deadlock
     406           0 :     LogGuid local_log(repo_id_);
     407           0 :     LogGuid remote_log(remote.repo_id_);
     408           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
     409             :                         "attempt to connect_datalink between local %C and remote %C\n",
     410             :                         local_log.c_str(),
     411             :                         remote_log.c_str()), 0);
     412             :     {
     413           0 :       TransportImpl::ConnectionAttribs attribs = attribs_; // local copy taken while lock_ is still held
     414           0 :       RcHandle<TransportClient> client = rchandle_from(this);
     415           0 :       ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
     416           0 :       result = impl->connect_datalink(remote, attribs, client);
     417           0 :     }
     418           0 :     if (!result.success_) {
     419           0 :       if (DCPS_debug_level) {
     420           0 :         LogGuid writer_log(repo_id_);
     421           0 :         LogGuid reader_log(remote.repo_id_);
     422           0 :         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i - ")
     423             :                    ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
     424             :                    writer_log.c_str(),
     425             :                    reader_log.c_str()));
     426           0 :       }
     427           0 :       return false;
     428             :     }
     429           0 :   }
     430             :   // Reached only when connect_datalink reported success.
     431           0 :   LogGuid local_log(repo_id_);
     432           0 :   LogGuid remote_log(remote.repo_id_);
     433           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
     434             :                       "connection between local %C and remote %C initiation successful\n",
     435             :                       local_log.c_str(),
     436             :                       remote_log.c_str()), 0);
     437           0 :   return true;
     438           0 : }
     439             : // Walks impls_ from the back and, for each impl, tries the remote blobs whose transport_type matches it through tc->initiate_connect_i(). Returns true as soon as an attempt reports success, and false when impls_ is exhausted or a reverse guard cannot be acquired.
     440             : bool
     441           0 : TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
     442             :                                                 Guard& guard)
     443             : {
     444           0 :   LogGuid local_log(tc->repo_id_);
     445           0 :   LogGuid remote_log(data_.remote_id_);
     446           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
     447             :                       "between %C and remote %C\n",
     448             :                       local_log.c_str(),
     449             :                       remote_log.c_str()), 0);
     450             :   // find the next impl / blob entry that have matching types
     451           0 :   while (!impls_.empty()) {
     452           0 :     TransportImpl_rch impl = impls_.back().lock();
     453           0 :     if (!impl) { // the impl could not be locked: discard it and look at the next one
     454           0 :       impls_.pop_back();
     455           0 :       continue;
     456             :     }
     457           0 :     const OPENDDS_STRING type = impl->transport_type();
     458             :     // blob_index_ is a member: it is advanced before a successful return and reset to 0 only when moving to the next impl, so a later call resumes after the last blob tried.
     459           0 :     for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
     460           0 :       if (data_.remote_data_[blob_index_].transport_type.in() == type) {
     461             :         const TransportImpl::RemoteTransport remote_transport = {
     462           0 :           data_.remote_id_, data_.remote_data_[blob_index_].data, data_.discovery_locator_.data,
     463           0 :           data_.participant_discovered_at_, data_.remote_transport_context_,
     464           0 :           data_.publication_transport_priority_, data_.remote_reliable_, data_.remote_durable_};
     465             : 
     466           0 :         TransportImpl::AcceptConnectResult res;
     467             :         bool ret;
     468             :         {
     469             :           // Release the PendingAssoc object's mutex_ since initiate_connect_i doesn't need it.
     470           0 :           Reverse_Lock_t rev_mutex(mutex_);
     471           0 :           ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
     472           0 :           ret = tc->initiate_connect_i(res, impl, remote_transport, attribs_, guard);
     473           0 :         }
     474           0 :         if (!ret) {
     475             :           // tc's initiate_connect_i returned false: there is no PendingAssoc left in the map because use_datalink_i finished elsewhere,
     476             :           // so don't do anything further with pend and return success or failure up to tc's associate.
     477           0 :           if (res.success_ ) {
     478           0 :             VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
     479             :                                 ACE_TEXT("between %C and remote %C success\n"),
     480             :                                 local_log.c_str(),
     481             :                                 remote_log.c_str()), 0);
     482           0 :             return true;
     483             :           }
     484             :           // NOTE(review): when res.success_ is false nothing is returned here; execution falls through to the check below and the blob loop continues - confirm this matches the comment above.
     485           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
     486             :                               "between %C and remote %C unsuccessful\n",
     487             :                               local_log.c_str(),
     488             :                               remote_log.c_str()), 0);
     489             :         }
     490             : 
     491           0 :         if (res.success_) {
     492             :           // Step past this blob before returning so it is not tried again on a later call.
     493           0 :           ++blob_index_;
     494             : 
     495           0 :           if (!res.link_.is_nil()) {
     496             : 
     497             :             {
     498             :               // use_datalink_i calls PendingAssoc::reset_client which needs the PendingAssoc's mutex_.
     499           0 :               Reverse_Lock_t rev_mutex(mutex_);
     500           0 :               ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
     501           0 :               tc->use_datalink_i(data_.remote_id_, res.link_, guard);
     502           0 :             }
     503             :           } else { // NOTE(review): "intiate_connect" in the log text below looks like a typo for "initiate_connect"
     504           0 :             VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
     505             :                                 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
     506             :                                 local_log.c_str(),
     507             :                                 remote_log.c_str()), 0);
     508             :           }
     509             : 
     510           0 :           return true;
     511             :         } else { // NOTE(review): "intiate_connect" in the log text below looks like a typo for "initiate_connect"
     512           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
     513             :                               "result of initiate_connect_i (local: %C to remote: %C) was not success\n",
     514             :                               local_log.c_str(),
     515             :                               remote_log.c_str()), 0);
     516             :         }
     517           0 :       }
     518             :     }
     519             :     // No blob matched, or every attempt for this impl failed: drop it and start over at blob 0 for the next impl.
     520           0 :     impls_.pop_back();
     521           0 :     blob_index_ = 0;
     522           0 :   }
     523             : 
     524           0 :   return false;
     525           0 : }
     526             : // Acquires lock_ and forwards remote_id and link to use_datalink_i(), passing the guard along.
     527             : void
     528           0 : TransportClient::use_datalink(const GUID_t& remote_id,
     529             :                               const DataLink_rch& link)
     530             : {
     531           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     532             : 
     533           0 :   use_datalink_i(remote_id, link, guard);
     534           0 : }
     535             : 
                       // Resolves the pending association for remote_id_ref using the given link.
                       //
                       // - No entry in pending_ for the remote: logs and returns.
                       // - link is nil and the pending association is active: tries
                       //   initiate_connect() again; if that returns true the pending entry is
                       //   kept and this function returns without reporting a result.
                       // - link is nil otherwise: the association is treated as failed.
                       // - link is non-nil: the link is added via add_link() and the association
                       //   is treated as successful.
                       //
                       // In the last two cases the pending entry is torn down (impls told to stop
                       // accepting/connecting, timer cancelled, entry moved from pending_ to
                       // prev_pending_), guard is released, and transport_assoc_done() is called
                       // with the ASSOC_ACTIVE / ASSOC_OK flags.
                       //
                       // guard is the caller's guard (use_datalink() passes its guard on lock_);
                       // it is released here before transport_assoc_done() is invoked.
     536             : void
     537           0 : TransportClient::use_datalink_i(const GUID_t& remote_id_ref,
     538             :                                 const DataLink_rch& link,
     539             :                                 Guard& guard)
     540             : {
     541             :   // Try to make a local copy of remote_id to use in calls
     542             :   // because the reference could be invalidated if the caller
     543             :   // reference location is deleted (i.e. in stop_accepting_or_connecting
     544             :   // if use_datalink_i was called from passive_connection)
     545             :   // Does changing this from a reference to a local affect anything going forward?
     546           0 :   GUID_t remote_id(remote_id_ref);
     547             : 
     548           0 :   LogGuid peerId_log(remote_id);
     549           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
     550             :             "TransportClient(%@) using datalink[%@] from %C\n",
     551             :             this,
     552             :             link.in(),
     553             :             peerId_log.c_str()), 0);
     554             : 
     555           0 :   PendingMap::iterator iter = pending_.find(remote_id);
     556             : 
     557           0 :   if (iter == pending_.end()) {
     558           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
     559             :                         "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
     560             :                         this,
     561             :                         link.in(),
     562             :                         peerId_log.c_str()), 0);
     563           0 :     return;
     564             :   }
     565             : 
                         // Keep our own handle on the pending association and hold its mutex
                         // while its state (active_, impls_, data_) is read below.
     566           0 :   PendingAssoc_rch pend = iter->second;
     567           0 :   ACE_GUARD(ACE_Thread_Mutex, pend_guard, pend->mutex_);
                         // Captured now because it is still needed after pend_guard is released.
     568           0 :   const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
     569           0 :   bool ok = false;
     570             : 
     571           0 :   if (link.is_nil()) {
     572             : 
     573           0 :     if (pend->active_ && pend->initiate_connect(this, guard)) {
     574           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
     575             :                           "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
     576             :                           this,
     577             :                           link.in(),
     578             :                           peerId_log.c_str()), 0);
     579           0 :       return;
     580             :     }
     581             : 
                           // NOTE(review): this point is reached both on the passive side and on
                           // the active side when initiate_connect() returned false, so the
                           // "passive side ... timed out" wording below does not cover every case.
     582           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
     583             :               "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
     584             :               this,
     585             :               link.in(),
     586             :               peerId_log.c_str()), 0);
     587             :   } else { // link is ready to use
     588           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
     589             :               "TransportClient(%@) about to add_link[%@] to remote: %C\n",
     590             :               this,
     591             :               link.in(),
     592             :               peerId_log.c_str()), 0);
     593             : 
     594           0 :     add_link(link, remote_id);
     595           0 :     ok = true;
     596             :   }
     597             : 
     598             :   // either link is valid or assoc failed, clean up pending object
     599           0 :   for (size_t i = 0; i < pend->impls_.size(); ++i) {
                           // impls_ holds weak handles; skip any impl that no longer exists.
     600           0 :     TransportImpl_rch impl = pend->impls_[i].lock();
     601           0 :     if (impl) {
                             // The last argument is true only when the association failed (!ok).
     602           0 :       impl->stop_accepting_or_connecting(*this, pend->data_.remote_id_, false, !ok);
     603             :     }
     604           0 :   }
     605             : 
                         // Done reading the pending association's guarded state.
     606           0 :   pend_guard.release();
     607           0 :   pend->reset_client();
     608           0 :   pending_assoc_timer_->cancel_timer(pend);
                         // Move the entry from pending_ to prev_pending_.
     609           0 :   prev_pending_.insert(std::make_pair(iter->first, iter->second));
     610           0 :   pending_.erase(iter);
     611             : 
     612             :   // Release TransportClient's lock as we're done updating its data.
     613           0 :   guard.release();
     614             : 
                         // Report the outcome: ASSOC_ACTIVE if this was the active side, ASSOC_OK
                         // if a usable link was added.
     615           0 :   transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
     616           0 : }
     617             : 
                       // Records link as the DataLink used to reach peer and reserves it.
                       //
                       // The link is inserted into links_ and stored in data_link_index_ under
                       // peer, then make_reservation() is called on it for (peer, repo_id_) with
                       // this object's receive listener if it has one, otherwise with its send
                       // listener. repo_id_ is asserted to be set (not GUID_UNKNOWN).
                       //
                       // This function takes no lock itself. NOTE(review): the visible caller,
                       // use_datalink_i(), calls it before releasing the guard it was given.
     618             : void
     619           0 : TransportClient::add_link(const DataLink_rch& link, const GUID_t& peer)
     620             : {
     621           0 :   links_.insert_link(link);
     622           0 :   data_link_index_[peer] = link;
     623             : 
     624           0 :   TransportReceiveListener_rch trl = get_receive_listener();
     625             : 
     626           0 :   OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
                         // trl is the result of a dynamic_cast of this object, so it is null when
                         // this client is not a TransportReceiveListener.
     627           0 :   if (trl) {
     628           0 :     link->make_reservation(peer, repo_id_, trl, reliable_);
     629             :   } else {
     630           0 :     link->make_reservation(peer, repo_id_, get_send_listener(), reliable_);
     631             :   }
     632           0 : }
     633             : 
                       // Abandons every pending association.
                       //
                       // With lock_ held, each entry in pending_ has its transport impls told to
                       // stop accepting/connecting, its client reset, its timer cancelled, and is
                       // copied into prev_pending_; pending_ is then cleared in one step.
     634             : void
     635           3 : TransportClient::stop_associating()
     636             : {
     637           3 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     638           3 :   for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
     639             :     {
     640             :       // The transport impl may have resource for a pending connection.
                             // This inner guard (on the pending association's own mutex_) shadows
                             // the function-level guard on lock_; the braces limit its lifetime to
                             // the impl loop.
     641           0 :       ACE_Guard<ACE_Thread_Mutex> guard(it->second->mutex_);
     642           0 :       for (size_t i = 0; i < it->second->impls_.size(); ++i) {
                               // impls_ holds weak handles; skip impls that no longer exist.
     643           0 :         TransportImpl_rch impl = it->second->impls_[i].lock();
     644           0 :         if (impl) {
     645           0 :           impl->stop_accepting_or_connecting(*this, it->second->data_.remote_id_, true, true);
     646             :         }
     647           0 :       }
     648           0 :     }
     649           0 :     it->second->reset_client();
     650           0 :     pending_assoc_timer_->cancel_timer(it->second);
     651           0 :     prev_pending_.insert(std::make_pair(it->first, it->second));
     652             :   }
                         // Entries were copied to prev_pending_ above, so drop them all at once.
     653           3 :   pending_.clear();
     654           3 : }
     655             : 
                       // Abandons the pending associations for the given remote ids only.
                       //
                       // repos points to length GUIDs. A null repos or a zero length is a no-op
                       // (checked after lock_ has been acquired). For each id that has an entry
                       // in pending_, the same teardown as the no-argument overload is applied
                       // and the entry is moved from pending_ to prev_pending_. Ids with no
                       // pending entry are ignored.
     656             : void
     657           0 : TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
     658             : {
     659           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     660             : 
     661           0 :   if (repos == 0 || length == 0) {
     662           0 :     return;
     663             :   } else {
     664           0 :     for (CORBA::ULong i = 0; i < length; ++i) {
     665           0 :       PendingMap::iterator iter = pending_.find(repos[i]);
     666           0 :       if (iter != pending_.end()) {
     667             :         {
     668             :           // The transport impl may have resource for a pending connection.
                                 // Both `guard` and the loop index `i` below shadow the outer
                                 // names of this function; they only live inside this brace scope.
     669           0 :           ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
     670           0 :           for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
     671           0 :             TransportImpl_rch impl = iter->second->impls_[i].lock();
     672           0 :             if (impl) {
     673           0 :               impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
     674             :             }
     675           0 :           }
     676           0 :         }
     677           0 :         iter->second->reset_client();
     678           0 :         pending_assoc_timer_->cancel_timer(iter->second);
     679           0 :         prev_pending_.insert(std::make_pair(iter->first, iter->second));
     680           0 :         pending_.erase(iter);
     681             :       }
     682             :     }
     683             :   }
     684           0 : }
     685             : 
                       // Forwards to links_.send_final_acks() with this client's GUID
                       // (get_guid()). No lock is taken in this function.
     686             : void
     687           0 : TransportClient::send_final_acks()
     688             : {
     689           0 :   links_.send_final_acks(get_guid());
     690           0 : }
     691             : 
                       // Ends this client's association (pending or established) with peerId.
                       //
                       // With lock_ held:
                       // - If peerId still has an entry in pending_, that pending association is
                       //   torn down (impls told to stop, client reset, timer cancelled, entry
                       //   moved to prev_pending_) and the function returns.
                       // - Otherwise the link is looked up in data_link_index_. If there is none,
                       //   the function returns (logging at debug level > 4).
                       // - Otherwise the index entry is erased and the reservation for
                       //   (peerId, repo_id_) is released on the link. If release_reservations()
                       //   reports anything in `released`, the link is also removed from links_
                       //   and this client's listener is removed from the link.
     692             : void
     693           0 : TransportClient::disassociate(const GUID_t& peerId)
     694             : {
     695           0 :   LogGuid peerId_log(peerId);
     696           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
     697             :             "TransportClient(%@) disassociating from %C\n",
     698             :             this,
     699             :             peerId_log.c_str()), 5);
     700             : 
     701           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     702             : 
                         // Case 1: the association never completed and is still pending.
     703           0 :   PendingMap::iterator iter = pending_.find(peerId);
     704           0 :   if (iter != pending_.end()) {
     705             :     {
     706             :       // The transport impl may have resource for a pending connection.
                             // This inner guard shadows the function-level guard on lock_ and is
                             // scoped to the impl loop only.
     707           0 :       ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
     708           0 :       for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
     709           0 :         TransportImpl_rch impl = iter->second->impls_[i].lock();
     710           0 :         if (impl) {
     711           0 :           impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
     712             :         }
     713           0 :       }
     714           0 :     }
     715           0 :     iter->second->reset_client();
     716           0 :     pending_assoc_timer_->cancel_timer(iter->second);
     717           0 :     prev_pending_.insert(std::make_pair(iter->first, iter->second));
     718           0 :     pending_.erase(iter);
     719           0 :     return;
     720             :   }
     721             : 
                         // Case 2: look for an established link to the peer.
     722           0 :   const DataLinkIndex::iterator found = data_link_index_.find(peerId);
     723             : 
     724           0 :   if (found == data_link_index_.end()) {
     725           0 :     if (DCPS_debug_level > 4) {
     726           0 :       const LogGuid log(peerId);
     727           0 :       ACE_DEBUG((LM_DEBUG,
     728             :                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
     729             :                  ACE_TEXT("no link for remote peer %C\n"),
     730             :                  log.c_str()));
     731           0 :     }
     732             : 
     733           0 :     return;
     734             :   }
     735             : 
     736           0 :   const DataLink_rch link = found->second;
     737             : 
     738             :   //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
     739             :   //otherwise it could be removed in transport_detached()
     740           0 :   data_link_index_.erase(found);
                         // Filled in by release_reservations() below; only its emptiness is
                         // inspected here.
     741           0 :   DataLinkSetMap released;
     742             : 
     743           0 :   if (DCPS_debug_level > 4) {
     744           0 :     ACE_DEBUG((LM_DEBUG,
     745             :                ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
     746             :                ACE_TEXT("about to release_reservations for link[%@]\n"),
     747             :                link.in()));
     748             :   }
     749             : 
     750           0 :   OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
     751           0 :   link->release_reservations(peerId, repo_id_, released);
     752             : 
     753           0 :   if (!released.empty()) {
     754             : 
     755           0 :     if (DCPS_debug_level > 4) {
     756           0 :       ACE_DEBUG((LM_DEBUG,
     757             :                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
     758             :                  ACE_TEXT("about to remove_link[%@] from links_\n"),
     759             :                  link.in()));
     760             :     }
     761           0 :     links_.remove_link(link);
     762             : 
     763           0 :     if (DCPS_debug_level > 4) {
     764           0 :       LogGuid logger(repo_id_);
     765           0 :       ACE_DEBUG((LM_DEBUG,
     766             :                  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
     767             :                  logger.c_str(),
     768             :                  link.in()));
     769           0 :     }
     770             :     // Datalink is no longer used for any remote peer by this TransportClient
     771           0 :     link->remove_listener(repo_id_);
     772             : 
     773             :   }
     774           0 : }
     775             : 
                       // Tells every transport impl used by this client that the client is
                       // stopping, by calling client_stop(repo_id) on each impl that still exists.
                       //
                       // impls_ and repo_id_ are copied while lock_ is held and the lock is
                       // released before any impl is called. Does nothing further if repo_id_ is
                       // GUID_UNKNOWN.
     776           0 : void TransportClient::transport_stop()
     777             : {
     778           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
                         // Snapshot the state under the lock so the calls below run without it.
     779           0 :   const ImplsType impls = impls_;
     780           0 :   const GUID_t repo_id = repo_id_;
     781           0 :   guard.release();
     782             : 
     783           0 :   if (repo_id == GUID_UNKNOWN) {
     784             :     // Not associated so nothing to stop.
     785           0 :     return;
     786             :   }
     787             : 
     788           0 :   for (size_t i = 0; i < impls.size(); ++i) {
                           // Entries are weak handles; lock() yields null for impls that are gone.
     789           0 :     const TransportImpl_rch impl = impls[i].lock();
     790           0 :     if (impl) {
     791           0 :       impl->client_stop(repo_id);
     792             :     }
     793           0 :   }
     794           0 : }
     795             : 
                       // With lock_ held, forwards register_for_reader() with the same arguments
                       // to every transport impl in impls_ that still exists.
     796             : void
     797           0 : TransportClient::register_for_reader(const GUID_t& participant,
     798             :                                      const GUID_t& writerid,
     799             :                                      const GUID_t& readerid,
     800             :                                      const TransportLocatorSeq& locators,
     801             :                                      OpenDDS::DCPS::DiscoveryListener* listener)
     802             : {
     803           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     804           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     805           0 :        pos != limit;
     806           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     807           0 :     TransportImpl_rch impl = pos->lock();
     808           0 :     if (impl) {
     809           0 :       impl->register_for_reader(participant, writerid, readerid, locators, listener);
     810             :     }
     811           0 :   }
     812           0 : }
     813             : 
                       // With lock_ held, forwards unregister_for_reader() with the same
                       // arguments to every transport impl in impls_ that still exists.
     814             : void
     815           0 : TransportClient::unregister_for_reader(const GUID_t& participant,
     816             :                                        const GUID_t& writerid,
     817             :                                        const GUID_t& readerid)
     818             : {
     819           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     820           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     821           0 :        pos != limit;
     822           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     823           0 :     TransportImpl_rch impl = pos->lock();
     824           0 :     if (impl) {
     825           0 :       impl->unregister_for_reader(participant, writerid, readerid);
     826             :     }
     827           0 :   }
     828           0 : }
     829             : 
                       // With lock_ held, forwards register_for_writer() with the same arguments
                       // to every transport impl in impls_ that still exists.
     830             : void
     831           0 : TransportClient::register_for_writer(const GUID_t& participant,
     832             :                                      const GUID_t& readerid,
     833             :                                      const GUID_t& writerid,
     834             :                                      const TransportLocatorSeq& locators,
     835             :                                      DiscoveryListener* listener)
     836             : {
     837           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     838           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     839           0 :        pos != limit;
     840           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     841           0 :     TransportImpl_rch impl = pos->lock();
     842           0 :     if (impl) {
     843           0 :       impl->register_for_writer(participant, readerid, writerid, locators, listener);
     844             :     }
     845           0 :   }
     846           0 : }
     847             : 
                       // With lock_ held, forwards unregister_for_writer() with the same
                       // arguments to every transport impl in impls_ that still exists.
     848             : void
     849           0 : TransportClient::unregister_for_writer(const GUID_t& participant,
     850             :                                        const GUID_t& readerid,
     851             :                                        const GUID_t& writerid)
     852             : {
     853           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     854           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     855           0 :        pos != limit;
     856           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     857           0 :     TransportImpl_rch impl = pos->lock();
     858           0 :     if (impl) {
     859           0 :       impl->unregister_for_writer(participant, readerid, writerid);
     860             :     }
     861           0 :   }
     862           0 : }
     863             : 
                       // With lock_ held, forwards update_locators(remote, locators) to every
                       // transport impl in impls_ that still exists.
     864             : void
     865           0 : TransportClient::update_locators(const GUID_t& remote,
     866             :                                  const TransportLocatorSeq& locators)
     867             : {
     868           0 :   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
     869           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     870           0 :        pos != limit;
     871           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     872           0 :     TransportImpl_rch impl = pos->lock();
     873           0 :     if (impl) {
     874           0 :       impl->update_locators(remote, locators);
     875             :     }
     876           0 :   }
     877           0 : }
     878             : 
                       // Returns the ICE endpoint of the first transport impl (in impls_ order)
                       // that reports a non-empty one, or an empty handle if no impl does or if
                       // lock_ cannot be acquired.
     879             : WeakRcHandle<ICE::Endpoint>
     880           0 : TransportClient::get_ice_endpoint()
     881             : {
     882             :   // The one-to-many relationship with impls implies that this should
     883             :   // return a set of endpoints instead of a single endpoint or null.
     884             :   // For now, we will assume a single impl.
     885             : 
     886           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, WeakRcHandle<ICE::Endpoint>());
     887           0 :   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
     888           0 :        pos != limit;
     889           0 :        ++pos) {
                           // impls_ holds weak handles; skip impls that no longer exist.
     890           0 :     TransportImpl_rch impl = pos->lock();
     891           0 :     if (impl) {
     892           0 :       WeakRcHandle<ICE::Endpoint> endpoint = impl->get_ice_endpoint();
                             // First impl with an endpoint wins.
     893           0 :       if (endpoint) { return endpoint; }
     894           0 :     }
     895           0 :   }
     896             : 
     897           0 :   return WeakRcHandle<ICE::Endpoint>();
     898           0 : }
     899             : 
                       // Sends a response (header + payload) to a single peer over the link
                       // recorded for it in data_link_index_.
                       //
                       // Returns false, without sending, when no link is recorded for peer.
                       // Returns true once the response has been handed to the link; the outcome
                       // of the underlying send_response() call is not inspected.
                       //
                       // NOTE(review): data_link_index_ is read here without taking lock_, unlike
                       // send_control_to() and disassociate() -- confirm that callers serialize
                       // access or that this is otherwise safe.
     900             : bool
     901           0 : TransportClient::send_response(const GUID_t& peer,
     902             :                                const DataSampleHeader& header,
     903             :                                Message_Block_Ptr payload)
     904             : {
     905           0 :   DataLinkIndex::iterator found = data_link_index_.find(peer);
     906             : 
     907           0 :   if (found == data_link_index_.end()) {
     908           0 :     if (DCPS_debug_level > 4) {
     909           0 :       LogGuid logger(peer);
     910           0 :       ACE_DEBUG((LM_DEBUG,
     911             :                  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
     912             :                  ACE_TEXT("no link for publication %C, ")
     913             :                  ACE_TEXT("not sending response.\n"),
     914             :                  logger.c_str()));
     915           0 :     }
     916             : 
     917           0 :     return false;
     918             :   }
     919             : 
                         // Wrap the one link in a temporary DataLinkSet so the set-level
                         // send_response() can be used for a single destination.
     920           0 :   DataLinkSet singular;
     921           0 :   singular.insert_link(found->second);
     922           0 :   singular.send_response(peer, header, move(payload));
     923           0 :   return true;
     924           0 : }
     925             : 
                       // Sends the samples in send_list as transaction transaction_id.
                       //
                       // An empty list (null head) is a no-op. Otherwise send_transaction_lock_ is
                       // held around the call to send_i(), which does the actual work.
     926             : void
     927           0 : TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
     928             : {
     929           0 :   if (send_list.head() == 0) {
     930           0 :     return;
     931             :   }
     932           0 :   ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
     933           0 :   send_i(send_list, transaction_id);
     934           0 : }
     935             : 
                       // Sends send_list (if non-empty) and then a control message to destination,
                       // both while holding send_transaction_lock_.
                       //
                       // The samples are sent through send_i() with transaction id 0. Returns the
                       // result of send_control_to(), or SEND_CONTROL_ERROR if the lock cannot be
                       // acquired.
     936             : SendControlStatus
     937           0 : TransportClient::send_w_control(SendStateDataSampleList send_list,
     938             :                                 const DataSampleHeader& header,
     939             :                                 Message_Block_Ptr msg,
     940             :                                 const GUID_t& destination)
     941             : {
     942           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
     943             :                    send_transaction_lock_, SEND_CONTROL_ERROR);
     944           0 :   if (send_list.head()) {
                           // Transaction id 0 makes send_i() skip its expected-id check and its
                           // expected_transaction_id_ update.
     945           0 :     send_i(send_list, 0);
     946             :   }
     947           0 :   return send_control_to(header, move(msg), destination);
     948           0 : }
     949             : 
                       // Sends the samples of send_list over the appropriate DataLinks, honoring
                       // transaction ordering.
                       //
                       // - transaction_id != 0 and != expected_transaction_id_: nothing is sent.
                       //   If the id is the largest seen so far, it and the list's tail are
                       //   remembered in max_transaction_id_seen_ / max_transaction_tail_.
                       // - Otherwise (id is 0 or the expected one): elements are walked from
                       //   send_list.head() via get_next_send_sample() up to and including
                       //   max_transaction_tail_, which is this list's own tail unless a later
                       //   transaction was remembered earlier. NOTE(review): this presumably
                       //   relies on the remembered tail being reachable from this list's head --
                       //   confirm against the callers.
                       //
                       // For each element: if there are no links to send on (or, with content
                       // filtering, every link is filtered out) the element's send listener is
                       // told data_delivered(); otherwise the element is sent on the selected
                       // links with send() for SAMPLE_DATA or send_control() for anything else.
                       //
                       // Afterwards send_stop() is issued on the links used, and for a non-zero
                       // transaction_id expected_transaction_id_ becomes
                       // max_transaction_id_seen_ + 1. max_transaction_tail_ is reset to 0.
                       //
                       // Both visible callers (send(), send_w_control()) hold
                       // send_transaction_lock_ while calling this.
     950             : void
     951           0 : TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
     952             : {
     953           0 :   if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
                           // Not this transaction's turn: remember the furthest one seen so the
                           // expected transaction can cover it when it arrives.
     954           0 :     if (transaction_id > max_transaction_id_seen_) {
     955           0 :       max_transaction_id_seen_ = transaction_id;
     956           0 :       max_transaction_tail_ = send_list.tail();
     957             :     }
     958           0 :     return;
     959             :   } else /* transaction_id == expected_transaction_id */ {
                           // Also reached when transaction_id == 0 (see the condition above).
     960             : 
     961           0 :     DataSampleElement* cur = send_list.head();
     962           0 :     if (max_transaction_tail_ == 0) {
     963             :       //Means no future transaction beat this transaction into send
     964           0 :       if (transaction_id != 0)
     965           0 :         max_transaction_id_seen_ = expected_transaction_id_;
     966             :       // Only send this current transaction
     967           0 :       max_transaction_tail_ = send_list.tail();
     968             :     }
                           // Accumulates the links used during this call so send_stop() can be
                           // issued on them at the end.
     969           0 :     DataLinkSet send_links;
     970             : 
     971           0 :     while (cur != 0) {
     972             :       // VERY IMPORTANT NOTE:
     973             :       //
     974             :       // We have to be very careful in how we deal with the current
     975             :       // DataSampleElement.  The issue is that once we have invoked
     976             :       // data_delivered() on the send_listener_ object, or we have invoked
     977             :       // send() on the pub_links, we can no longer access the current
     978             :       // DataSampleElement!  Thus, we need to get the next
     979             :       // DataSampleElement (pointer) from the current element now,
     980             :       // while it is safe.
     981             :       DataSampleElement* next_elem;
     982           0 :       if (cur != max_transaction_tail_) {
     983           0 :         next_elem = cur->get_next_send_sample();
     984             :       } else {
                               // cur is the last element to send; next_elem is not used to
                               // advance in this case because the loop breaks on the tail below.
     985           0 :         next_elem = max_transaction_tail_;
     986             :       }
                             // Elements that name specific subscriptions go to the links selected
                             // for those ids; all others go to the full links_ set.
     987             :       DataLinkSet_rch pub_links =
     988           0 :         (cur->get_num_subs() > 0)
     989             :         ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
     990           0 :         : DataLinkSet_rch(&links_, inc_count());
     991             : 
     992           0 :       if (pub_links.is_nil() || pub_links->empty()) {
     993             :         // NOTE: This is the "local publisher id is not currently
     994             :         //       associated with any remote subscriber ids" case.
     995             : 
     996           0 :         if (DCPS_debug_level > 4) {
     997           0 :           LogGuid logger(cur->get_pub_id());
     998           0 :           ACE_DEBUG((LM_DEBUG,
     999             :                      ACE_TEXT("(%P|%t) TransportClient::send_i: ")
    1000             :                      ACE_TEXT("no links for publication %C, ")
    1001             :                      ACE_TEXT("not sending element %@ for transaction: %d.\n"),
    1002             :                      logger.c_str(),
    1003             :                      cur,
    1004             :                      cur->transaction_id()));
    1005           0 :         }
    1006             : 
    1007             :         // We tell the send_listener_ that all of the remote subscriber ids
    1008             :         // that wanted the data (all zero of them) have indeed received
    1009             :         // the data.
    1010           0 :         cur->get_send_listener()->data_delivered(cur);
    1011             : 
    1012             :       } else {
    1013           0 :         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
    1014             :                   , cur), 5);
    1015             : 
    1016             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
    1017             : 
    1018             :         // Content-Filtering adjustment to the pub_links:
    1019             :         // - If the sample should be filtered out of all subscriptions on a given
    1020             :         //   DataLink, then exclude that link from the subset that we'll send to.
    1021             :         // - If the sample should be filtered out of some (or none) of the subs,
    1022             :         //   then record that information in the DataSampleElement so that the
    1023             :         //   header's content_filter_entries_ can be marshaled before it's sent.
    1024           0 :         if (cur->filter_out_.ptr()) {
    1025           0 :           DataLinkSet_rch subset;
                                 // Hold the set's own lock while walking its map.
    1026           0 :           DataLinkSet::GuardType guard(pub_links->lock());
    1027             :           typedef DataLinkSet::MapType MapType;
    1028           0 :           MapType& map = pub_links->map();
    1029             : 
    1030           0 :           for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
    1031             :             size_t n_subs;
    1032             :             GUIDSeq_var ti =
    1033           0 :               itr->second->target_intersection(cur->get_pub_id(),
    1034           0 :                                                cur->filter_out_.in(), n_subs);
    1035             : 
                                   // Keep the link unless the returned sequence has exactly n_subs
                                   // entries (the "completely filtered-out" case logged below).
    1036           0 :             if (ti.ptr() == 0 || ti->length() != n_subs) {
    1037           0 :               if (!subset.in()) {
    1038           0 :                 subset = make_rch<DataLinkSet>();
    1039             :               }
    1040             : 
    1041           0 :               subset->insert_link(itr->second);
    1042           0 :               cur->filter_per_link_[itr->first] = ti._retn();
    1043             : 
    1044             :             } else {
    1045           0 :               VDBG((LM_DEBUG,
    1046             :                     "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
    1047             :                     itr->second.in()));
    1048             :             }
    1049           0 :           }
    1050             : 
    1051           0 :           if (!subset.in()) {
                                   // Release the set's lock before notifying the listener.
    1052           0 :             guard.release();
    1053           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
    1054             :             // similar to the "if (pub_links.is_nil())" case above, no links
    1055           0 :             cur->get_send_listener()->data_delivered(cur);
                                   // Only the pointer value of cur is compared from here on; the
                                   // element itself is not accessed after data_delivered().
    1056           0 :             if (cur != max_transaction_tail_) {
    1057             :               // Move on to the next DataSampleElement to send.
    1058           0 :               cur = next_elem;
    1059           0 :               continue;
    1060             :             } else {
    1061           0 :               break;
    1062             :             }
    1063             :           }
    1064             : 
    1065           0 :           pub_links = subset;
    1066           0 :         }
    1067             : 
    1068             : #endif
    1069             : 
    1070             :         // This will do several things, including adding to the membership
    1071             :         // of the send_links set.  Any DataLinks added to the send_links
    1072             :         // set will be also told about the send_start() event.  Those
    1073             :         // DataLinks (in the pub_links set) that are already in the
    1074             :         // send_links set will not be told about the send_start() event
    1075             :         // since they heard about it when they were inserted into the
    1076             :         // send_links set.
    1077           0 :         send_links.send_start(pub_links.in());
                               // Anything that is not SAMPLE_DATA goes out as a control message.
    1078           0 :         if (cur->get_header().message_id_ != SAMPLE_DATA) {
    1079           0 :           pub_links->send_control(cur);
    1080             :         } else {
    1081           0 :           pub_links->send(cur);
    1082             :         }
    1083             :       }
                             // Pointer comparison only; cur may no longer be dereferenced here.
    1084           0 :       if (cur != max_transaction_tail_) {
    1085             :         // Move on to the next DataSampleElement to send.
    1086           0 :         cur = next_elem;
    1087             :       } else {
    1088           0 :         break;
    1089             :       }
    1090           0 :     }
    1091             : 
    1092             :     // This will inform each DataLink in the set about the stop_send() event.
    1093             :     // It will then clear the send_links_ set.
    1094             :     //
    1095             :     // The reason that the send_links_ set is cleared is because we continually
    1096             :     // reuse the same send_links_ object over and over for each call to this
    1097             :     // send method.
                           // NOTE(review): in this function send_links is a local DataLinkSet
                           // constructed on every call, so the "reuse" rationale above appears to
                           // be stale.
    1098           0 :     GUID_t pub_id = repo_id();
    1099           0 :     send_links.send_stop(pub_id);
    1100           0 :     if (transaction_id != 0) {
                             // Skip past every transaction covered by this call.
    1101           0 :       expected_transaction_id_ = max_transaction_id_seen_ + 1;
    1102             :     }
    1103           0 :     max_transaction_tail_ = 0;
    1104           0 :   }
    1105             : }
    1106             : 
                       // Returns a reference-counted handle to this object viewed as a
                       // TransportSendListener. The view is obtained with dynamic_cast, so the
                       // pointer passed to rchandle_from() is null if this object is not a
                       // TransportSendListener.
    1107             : TransportSendListener_rch
    1108           0 : TransportClient::get_send_listener()
    1109             : {
    1110           0 :   return rchandle_from(dynamic_cast<TransportSendListener*>(this));
    1111             : }
    1112             : 
                       // Returns a reference-counted handle to this object viewed as a
                       // TransportReceiveListener. The view is obtained with dynamic_cast, so the
                       // pointer passed to rchandle_from() is null if this object is not a
                       // TransportReceiveListener; add_link() tests the result before using it.
    1113             : TransportReceiveListener_rch
    1114           0 : TransportClient::get_receive_listener()
    1115             : {
    1116           0 :   return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
    1117             : }
    1118             : 
                       // Sends a control message (header + msg) on all of this client's links via
                       // links_.send_control(), with this object as the send listener.
                       //
                       // Returns SEND_CONTROL_OK without sending anything when repo_id_ is still
                       // GUID_UNKNOWN; otherwise returns the status from links_.send_control().
    1119             : SendControlStatus
    1120           0 : TransportClient::send_control(const DataSampleHeader& header,
    1121             :                               Message_Block_Ptr msg)
    1122             : {
    1123           0 :   if (repo_id_ == GUID_UNKNOWN) {
    1124           0 :     return SEND_CONTROL_OK;
    1125             :   }
    1126           0 :   return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
    1127             : }
    1128             : 
    1129             : SendControlStatus // Sends a control message only over the link indexed under `destination`; returns the send status, or SEND_CONTROL_ERROR when no such link is indexed.
    1130           0 : TransportClient::send_control_to(const DataSampleHeader& header,
    1131             :                                  Message_Block_Ptr msg,
    1132             :                                  const GUID_t& destination)
    1133             : {
    1134           0 :   if (repo_id_ == GUID_UNKNOWN) { // no id assigned to this client: nothing is sent
    1135           0 :     return SEND_CONTROL_OK; // the skipped send is still reported as OK
    1136             :   }
    1137             : // Build a local, single-entry link set so the message goes to one destination only.
    1138           0 :   DataLinkSet singular;
    1139             :   { // scope limits how long lock_ is held: lookup and insert only
    1140           0 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR); // returns SEND_CONTROL_ERROR if lock_ cannot be acquired
    1141           0 :     DataLinkIndex::iterator found = data_link_index_.find(destination);
    1142             : 
    1143           0 :     if (found == data_link_index_.end()) { // destination has no entry in data_link_index_
    1144           0 :       return SEND_CONTROL_ERROR;
    1145             :     }
    1146             : 
    1147           0 :     singular.insert_link(found->second);
    1148           0 :   } // guard goes out of scope here, so the send below runs without lock_ held
    1149           0 :   return singular.send_control(repo_id_, get_send_listener(), header, move(msg)); // msg is moved into the call
    1150           0 : }
    1151             : 
    1152             : bool // Delegates removal of `sample` to links_ and returns its result unchanged.
    1153           0 : TransportClient::remove_sample(const DataSampleElement* sample)
    1154             : {
    1155           0 :   return links_.remove_sample(sample);
    1156             : }
    1157             : 
    1158             : bool // Asks links_ to remove all messages for repo_id_; returns true immediately when repo_id_ is GUID_UNKNOWN.
    1159           0 : TransportClient::remove_all_msgs()
    1160             : {
    1161           0 :   if (repo_id_ == GUID_UNKNOWN) { // no id assigned: links_ is not consulted
    1162           0 :     return true;
    1163             :   }
    1164           0 :   return links_.remove_all_msgs(repo_id_);
    1165             : }
    1166             : 
    1167           0 : void TransportClient::terminate_send_if_suspended() // Pure delegation: forwards the request to links_.
    1168             : {
    1169           0 :   links_.terminate_send_if_suspended();
    1170           0 : }
    1171             : 
    1172           0 : bool TransportClient::associated_with(const GUID_t& remote) const // Reports whether data_link_index_ holds an entry for `remote`; the lookup is done under lock_.
    1173             : {
    1174           0 :   ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    1175           0 :   if (!guard.locked()) { // lock_ could not be acquired: log and answer "not associated"
    1176           0 :     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::associated_with: "
    1177             :       "lock failed\n"));
    1178           0 :     return false;
    1179             :   }
    1180           0 :   return data_link_index_.count(remote); // count is implicitly converted to bool: true when an entry exists for `remote`
    1181           0 : }
    1182             : 
    1183           0 : bool TransportClient::pending_association_with(const GUID_t& remote) const // Reports whether pending_ holds an entry for `remote`; the lookup is done under lock_.
    1184             : {
    1185           0 :   ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    1186           0 :   if (!guard.locked()) { // lock_ could not be acquired: log and answer "not pending"
    1187           0 :     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::pending_association_with: "
    1188             :       "lock failed\n"));
    1189           0 :     return false;
    1190             :   }
    1191           0 :   return pending_.count(remote); // count is implicitly converted to bool: true when an entry exists for `remote`
    1192           0 : }
    1193             : 
    1194           0 : void TransportClient::data_acked(const GUID_t& remote) // Relays a data_acked(remote) notification to this client's send listener.
    1195             : {
    1196           0 :   TransportSendListener_rch send_listener;
    1197             :   { // lock_ is held only while the listener handle is fetched
    1198           0 :     ACE_Guard<ACE_Thread_Mutex> guard(lock_);
    1199           0 :     if (!guard.locked()) { // lock_ could not be acquired: log and drop the notification
    1200           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::data_acked: "
    1201             :         "lock failed\n"));
    1202           0 :       return;
    1203             :     }
    1204           0 :     send_listener = get_send_listener();
    1205           0 :   } // guard released here; the callback below runs without lock_ held
    1206           0 :   send_listener->data_acked(remote); // NOTE(review): dereferenced without a null check; presumably callers are always TransportSendListener instances - confirm
    1207           0 : }
    1208             : 
    1209           0 : bool TransportClient::is_leading(const GUID_t& reader_id) const // Delegates to links_.is_leading() with this client's GUID (get_guid()) and `reader_id`; returns its result unchanged.
    1210             : {
    1211           0 :   return links_.is_leading(get_guid(), reader_id);
    1212             : }
    1213             : 
    1214             : 
    1215             : } // namespace DCPS
    1216             : } // namespace OpenDDS
    1217             : 
    1218             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16