TransportClient.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "TransportClient.h"
00011 #include "TransportConfig.h"
00012 #include "TransportRegistry.h"
00013 #include "TransportExceptions.h"
00014 #include "TransportReceiveListener.h"
00015 
00016 #include "dds/DdsDcpsInfoUtilsC.h"
00017 
00018 #include "dds/DCPS/DataWriterImpl.h"
00019 #include "dds/DCPS/SendStateDataSampleList.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Definitions.h"
00022 
00023 #include "ace/Reactor_Timer_Interface.h"
00024 
00025 #include <algorithm>
00026 #include <iterator>
00027 
00028 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 
00033 TransportClient::TransportClient()
00034   : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00035   , expected_transaction_id_(1)
00036   , max_transaction_id_seen_(0)
00037   , max_transaction_tail_(0)
00038   , swap_bytes_(false)
00039   , cdr_encapsulation_(false)
00040   , reliable_(false)
00041   , durable_(false)
00042   , reverse_lock_(lock_)
00043   , repo_id_(GUID_UNKNOWN)
00044 {
00045 }
00046 
00047 TransportClient::~TransportClient()
00048 {
00049   if (Transport_debug_level > 5) {
00050     GuidConverter converter(repo_id_);
00051     ACE_DEBUG((LM_DEBUG,
00052                ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00053                OPENDDS_STRING(converter).c_str()));
00054   }
00055 
00056   stop_associating();
00057 
00058   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00059 
00060   for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00061     for (size_t i = 0; i < impls_.size(); ++i) {
00062       impls_[i]->stop_accepting_or_connecting(*this, it->second->data_.remote_id_);
00063     }
00064 
00065     pending_assoc_timer_->cancel_timer(this, it->second);
00066   }
00067 
00068   pending_assoc_timer_->wait();
00069 
00070 }
00071 
00072 void
00073 TransportClient::enable_transport(bool reliable, bool durable)
00074 {
00075   // Search for a TransportConfig to use:
00076   TransportConfig_rch tc;
00077 
00078   // 1. If this object is an Entity, check if a TransportConfig has been
00079   //    bound either directly to this entity or to a parent entity.
00080   for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
00081        ent && tc.is_nil(); ent = ent->parent()) {
00082     tc = ent->transport_config();
00083   }
00084 
00085   if (tc.is_nil()) {
00086     TransportRegistry* const reg = TransportRegistry::instance();
00087     // 2. Check for a TransportConfig that is the default for this Domain.
00088     tc = reg->domain_default_config(domain_id());
00089 
00090     if (tc.is_nil()) {
00091       // 3. Use the global_config if one has been set.
00092       tc = reg->global_config();
00093 
00094       if (!tc.is_nil() && tc->instances_.empty()
00095           && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00096         // 4. Set the "fallback option" if the global_config is empty.
00097         //    (only applies if the user hasn't changed the global config)
00098         tc = reg->fix_empty_default();
00099       }
00100     }
00101   }
00102 
00103   if (tc.is_nil()) {
00104     ACE_ERROR((LM_ERROR,
00105                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00106                ACE_TEXT("No TransportConfig found.\n")));
00107     throw Transport::NotConfigured();
00108   }
00109 
00110   enable_transport_using_config(reliable, durable, tc);
00111 }
00112 
00113 void
00114 TransportClient::enable_transport_using_config(bool reliable, bool durable,
00115                                                const TransportConfig_rch& tc)
00116 {
00117   swap_bytes_ = tc->swap_bytes_;
00118   cdr_encapsulation_ = false;
00119   reliable_ = reliable;
00120   durable_ = durable;
00121   unsigned long duration = tc->passive_connect_duration_;
00122   if (duration == 0) {
00123     duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00124     if (DCPS_debug_level) {
00125       ACE_DEBUG((LM_WARNING,
00126         ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00127         ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00128         ACE_TEXT("default value\n")));
00129     }
00130   }
00131   passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00132 
00133   const size_t n = tc->instances_.size();
00134 
00135   for (size_t i = 0; i < n; ++i) {
00136     TransportInst_rch inst = tc->instances_[i];
00137 
00138     if (check_transport_qos(*inst)) {
00139       TransportImpl* impl = inst->impl();
00140 
00141       if (impl) {
00142         impls_.push_back(impl);
00143         const CORBA::ULong len = conn_info_.length();
00144         conn_info_.length(len + 1);
00145         impl->connection_info(conn_info_[len]);
00146 
00147 #if defined(OPENDDS_SECURITY)
00148         impl->local_crypto_handle(get_crypto_handle());
00149 #endif
00150 
00151         cdr_encapsulation_ |= inst->requires_cdr();
00152       }
00153     }
00154   }
00155 
00156   if (impls_.empty()) {
00157     ACE_ERROR((LM_ERROR,
00158                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00159                ACE_TEXT("No TransportImpl could be created.\n")));
00160     throw Transport::NotConfigured();
00161   }
00162 }
00163 
00164 
00165 bool
00166 TransportClient::associate(const AssociationData& data, bool active)
00167 {
00168   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00169 
00170   repo_id_ = get_repo_id();
00171 
00172   if (impls_.empty()) {
00173     if (DCPS_debug_level) {
00174       GuidConverter writer_converter(repo_id_);
00175       GuidConverter reader_converter(data.remote_id_);
00176       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00177                  ACE_TEXT("local %C remote %C no available impls\n"),
00178                  OPENDDS_STRING(writer_converter).c_str(),
00179                  OPENDDS_STRING(reader_converter).c_str()));
00180     }
00181     return false;
00182   }
00183 
00184   bool all_impls_shut_down = true;
00185   for (size_t i = 0; i < impls_.size(); ++i) {
00186     if (!impls_.at(i)->is_shut_down()) {
00187       all_impls_shut_down = false;
00188       break;
00189     }
00190   }
00191 
00192   if (all_impls_shut_down) {
00193     if (DCPS_debug_level) {
00194       GuidConverter writer_converter(repo_id_);
00195       GuidConverter reader_converter(data.remote_id_);
00196       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00197                  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00198                  OPENDDS_STRING(writer_converter).c_str(),
00199                  OPENDDS_STRING(reader_converter).c_str()));
00200     }
00201     return false;
00202   }
00203 
00204   PendingMap::iterator iter = pending_.find(data.remote_id_);
00205 
00206   if (iter == pending_.end()) {
00207     RepoId remote_copy(data.remote_id_);
00208     iter = pending_.insert(std::make_pair(remote_copy, make_rch<PendingAssoc>())).first;
00209 
00210     GuidConverter tc_assoc(repo_id_);
00211     GuidConverter remote_new(data.remote_id_);
00212     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00213               "between %C and remote %C\n",
00214               OPENDDS_STRING(tc_assoc).c_str(),
00215               OPENDDS_STRING(remote_new).c_str()), 0);
00216   } else {
00217 
00218     ACE_ERROR((LM_ERROR,
00219                ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00220                ACE_TEXT("already associating with remote.\n")));
00221 
00222     return false;
00223 
00224   }
00225 
00226   PendingAssoc_rch pend = iter->second;
00227   pend->active_ = active;
00228   pend->impls_.clear();
00229   pend->blob_index_ = 0;
00230   pend->data_ = data;
00231   pend->attribs_.local_id_ = repo_id_;
00232   pend->attribs_.priority_ = get_priority_value(data);
00233   pend->attribs_.local_reliable_ = reliable_;
00234   pend->attribs_.local_durable_ = durable_;
00235 
00236   if (active) {
00237     pend->impls_.reserve(impls_.size());
00238     std::reverse_copy(impls_.begin(), impls_.end(),
00239                       std::back_inserter(pend->impls_));
00240 
00241     return pend->initiate_connect(this, guard);
00242 
00243   } else { // passive
00244 
00245     // call accept_datalink for each impl / blob pair of the same type
00246     for (size_t i = 0; i < impls_.size(); ++i) {
00247       pend->impls_.push_back(impls_[i]);
00248       const OPENDDS_STRING type = impls_[i]->transport_type();
00249 
00250       for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00251         if (data.remote_data_[j].transport_type.in() == type) {
00252           const TransportImpl::RemoteTransport remote = {
00253             data.remote_id_, data.remote_data_[j].data,
00254             data.publication_transport_priority_,
00255             data.remote_reliable_, data.remote_durable_};
00256 
00257           TransportImpl::AcceptConnectResult res;
00258           {
00259             // This thread acquired lock_ at the beginning of this method.  Calling accept_datalink might require getting the lock for the transport's reactor.
00260             // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
00261             // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_.  The locking order in this case is transport reactor lock -> lock_.
00262             // To avoid deadlock, we must reverse the lock.
00263             ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00264             res = impls_[i]->accept_datalink(remote, pend->attribs_, rchandle_from(this));
00265           }
00266 
00267           //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
00268           PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00269 
00270           if (iter_after_accept == pending_.end()) {
00271             //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
00272             //active side connection and completed, thus pend was removed from pending_.  Can return true.
00273             return true;
00274           }
00275 
00276           if (res.success_) {
00277             if (res.link_.is_nil()) {
00278                 // In this case, it may be waiting for the TCP connection to be established.  Just wait without trying other transports.
00279                 pending_assoc_timer_->schedule_timer(this, iter->second);
00280             }
00281             else {
00282                 use_datalink_i(data.remote_id_, res.link_, guard);
00283             }
00284             return true;
00285           }
00286         }
00287       }
00288 
00289       //pend->impls_.push_back(impls_[i]);
00290     }
00291 
00292     pending_assoc_timer_->schedule_timer(this, iter->second);
00293   }
00294 
00295   return true;
00296 }
00297 
00298 int
00299 TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
00300                                               const void* arg)
00301 {
00302   TransportClient* tc = static_cast<TransportClient*>(const_cast<void*>(arg));
00303 
00304   tc->use_datalink(data_.remote_id_, DataLink_rch());
00305 
00306   return 0;
00307 }
00308 
00309 bool
00310 TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00311                                     TransportImpl* impl,
00312                                     const TransportImpl::RemoteTransport& remote,
00313                                     const TransportImpl::ConnectionAttribs& attribs_,
00314                                     Guard& guard)
00315 {
00316   if (!guard.locked()) {
00317     //don't own the lock_ so can't release it...shouldn't happen
00318     GuidConverter local(repo_id_);
00319     GuidConverter remote_conv(remote.repo_id_);
00320     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00321                         "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00322                         OPENDDS_STRING(local).c_str(),
00323                         OPENDDS_STRING(remote_conv).c_str()), 0);
00324     return false;
00325   }
00326 
00327   {
00328     //can't call connect while holding lock due to possible reactor deadlock
00329     guard.release();
00330     GuidConverter local(repo_id_);
00331     GuidConverter remote_conv(remote.repo_id_);
00332     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00333                         "attempt to connect_datalink between local %C and remote %C\n",
00334                         OPENDDS_STRING(local).c_str(),
00335                         OPENDDS_STRING(remote_conv).c_str()), 0);
00336     result = impl->connect_datalink(remote, attribs_, rchandle_from(this));
00337     guard.acquire();
00338     if (!result.success_) {
00339       if (DCPS_debug_level) {
00340         GuidConverter writer_converter(repo_id_);
00341         GuidConverter reader_converter(remote.repo_id_);
00342         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00343                    ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00344                    OPENDDS_STRING(writer_converter).c_str(),
00345                    OPENDDS_STRING(reader_converter).c_str()));
00346       }
00347       return false;
00348     }
00349   }
00350 
00351   GuidConverter local(repo_id_);
00352   GuidConverter remote_conv(remote.repo_id_);
00353   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00354                       "connection between local %C and remote %C initiation successful\n",
00355                       OPENDDS_STRING(local).c_str(),
00356                       OPENDDS_STRING(remote_conv).c_str()), 0);
00357   return true;
00358 }
00359 
00360 bool
00361 TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
00362                                                 Guard& guard)
00363 {
00364   GuidConverter local(tc->repo_id_);
00365   GuidConverter remote(data_.remote_id_);
00366   VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00367                       "between %C and remote %C\n",
00368                       OPENDDS_STRING(local).c_str(),
00369                       OPENDDS_STRING(remote).c_str()), 0);
00370   // find the next impl / blob entry that have matching types
00371   while (!impls_.empty()) {
00372     TransportImpl* impl = impls_.back();
00373     const OPENDDS_STRING type = impl->transport_type();
00374 
00375     for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
00376       if (data_.remote_data_[blob_index_].transport_type.in() == type) {
00377         const TransportImpl::RemoteTransport remote = {
00378           data_.remote_id_, data_.remote_data_[blob_index_].data,
00379           data_.publication_transport_priority_,
00380           data_.remote_reliable_, data_.remote_durable_};
00381 
00382         TransportImpl::AcceptConnectResult res;
00383         GuidConverter tmp_local(tc->repo_id_);
00384         GuidConverter tmp_remote(data_.remote_id_);
00385         if (!tc->initiate_connect_i(res, impl, remote, attribs_, guard)) {
00386           //tc init connect returned false there is no PendingAssoc left in map because use_datalink_i finished elsewhere
00387           //so don't do anything further with pend and return success or failure up to tc's associate
00388           if (res.success_ ) {
00389             GuidConverter local(tc->repo_id_);
00390             GuidConverter remote(data_.remote_id_);
00391             VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
00392                                 ACE_TEXT("between %C and remote %C success\n"),
00393                                 OPENDDS_STRING(local).c_str(),
00394                                 OPENDDS_STRING(remote).c_str()), 0);
00395             return true;
00396           }
00397 
00398           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00399                               "between %C and remote %C unsuccessful\n",
00400                               OPENDDS_STRING(tmp_local).c_str(),
00401                               OPENDDS_STRING(tmp_remote).c_str()), 0);
00402           break;
00403         }
00404 
00405         if (res.success_) {
00406 
00407           ++blob_index_;
00408 
00409           if (!res.link_.is_nil()) {
00410 
00411             tc->use_datalink_i(data_.remote_id_, res.link_, guard);
00412           } else {
00413             GuidConverter local(tc->repo_id_);
00414             GuidConverter remote(data_.remote_id_);
00415             VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00416                                 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
00417                                 OPENDDS_STRING(local).c_str(),
00418                                 OPENDDS_STRING(remote).c_str()), 0);
00419           }
00420 
00421           return true;
00422         } else {
00423           GuidConverter local(tc->repo_id_);
00424           GuidConverter remote(data_.remote_id_);
00425           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00426                               "result of initiate_connect_i (local: %C to remote: %C) was not success \n",
00427                               OPENDDS_STRING(local).c_str(),
00428                               OPENDDS_STRING(remote).c_str()), 0);
00429         }
00430       }
00431     }
00432 
00433     impls_.pop_back();
00434     blob_index_ = 0;
00435   }
00436 
00437   return false;
00438 }
00439 
00440 void
00441 TransportClient::use_datalink(const RepoId& remote_id,
00442                               const DataLink_rch& link)
00443 {
00444   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00445 
00446   use_datalink_i(remote_id, link, guard);
00447 }
00448 
00449 void
00450 TransportClient::use_datalink_i(const RepoId& remote_id_ref,
00451                                 const DataLink_rch& link,
00452                                 Guard& guard)
00453 {
00454   // Try to make a local copy of remote_id to use in calls
00455   // because the reference could be invalidated if the caller
00456   // reference location is deleted (i.e. in stop_accepting_or_connecting
00457   // if use_datalink_i was called from passive_connection)
00458   // Does changing this from a reference to a local affect anything going forward?
00459   RepoId remote_id(remote_id_ref);
00460 
00461   GuidConverter peerId_conv(remote_id);
00462   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00463             "TransportClient(%@) using datalink[%@] from %C\n",
00464             this,
00465             link.in(),
00466             OPENDDS_STRING(peerId_conv).c_str()), 0);
00467 
00468   PendingMap::iterator iter = pending_.find(remote_id);
00469 
00470   if (iter == pending_.end()) {
00471     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00472                         "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00473                         this,
00474                         link.in(),
00475                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00476     return;
00477   }
00478 
00479   PendingAssoc_rch pend = iter->second;
00480   const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00481   bool ok = false;
00482 
00483   if (link.is_nil()) {
00484 
00485     if (pend->active_ && pend->initiate_connect(this, guard)) {
00486       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00487                           "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00488                           this,
00489                           link.in(),
00490                           OPENDDS_STRING(peerId_conv).c_str()), 0);
00491       return;
00492     }
00493 
00494   } else { // link is ready to use
00495     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00496               "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00497               this,
00498               link.in(),
00499               OPENDDS_STRING(peerId_conv).c_str()), 0);
00500 
00501     add_link(link, remote_id);
00502     ok = true;
00503   }
00504 
00505   // either link is valid or assoc failed, clean up pending object
00506   // for passive side processing
00507   if (!pend->active_) {
00508 
00509     for (size_t i = 0; i < pend->impls_.size(); ++i) {
00510       pend->impls_[i]->stop_accepting_or_connecting(*this, pend->data_.remote_id_);
00511     }
00512   }
00513 
00514   pending_.erase(iter);
00515 
00516   guard.release();
00517 
00518   pending_assoc_timer_->cancel_timer(this, pend);
00519 
00520   transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00521 }
00522 
00523 void
00524 TransportClient::add_link(const DataLink_rch& link, const RepoId& peer)
00525 {
00526   links_.insert_link(link);
00527   data_link_index_[peer] = link;
00528 
00529   TransportReceiveListener_rch trl = get_receive_listener();
00530 
00531   if (trl) {
00532     link->make_reservation(peer, repo_id_, trl);
00533 
00534   } else {
00535     link->make_reservation(peer, repo_id_, get_send_listener());
00536   }
00537 }
00538 
00539 void
00540 TransportClient::stop_associating()
00541 {
00542   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00543   pending_.clear();
00544 }
00545 
00546 void
00547 TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
00548 {
00549   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00550 
00551   if (repos == 0 || length == 0) {
00552     return;
00553   } else {
00554     for (CORBA::ULong i = 0; i < length; ++i) {
00555       pending_.erase(repos[i]);
00556     }
00557   }
00558 }
00559 
00560 void
00561 TransportClient::send_final_acks()
00562 {
00563   links_.send_final_acks (get_repo_id());
00564 }
00565 
00566 void
00567 TransportClient::disassociate(const RepoId& peerId)
00568 {
00569   GuidConverter peerId_conv(peerId);
00570   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00571             "TransportClient(%@) disassociating from %C\n",
00572             this,
00573             OPENDDS_STRING(peerId_conv).c_str()), 5);
00574 
00575   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00576 
00577   if (pending_.erase(peerId)) {
00578     return;
00579   }
00580 
00581   const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00582 
00583   if (found == data_link_index_.end()) {
00584     if (DCPS_debug_level > 4) {
00585       const GuidConverter converter(peerId);
00586       ACE_DEBUG((LM_DEBUG,
00587                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00588                  ACE_TEXT("no link for remote peer %C\n"),
00589                  OPENDDS_STRING(converter).c_str()));
00590     }
00591 
00592     return;
00593   }
00594 
00595   const DataLink_rch link = found->second;
00596 
00597   //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
00598   //otherwise it could be removed in transport_detached()
00599   data_link_index_.erase(found);
00600   DataLinkSetMap released;
00601 
00602     if (DCPS_debug_level > 4) {
00603       ACE_DEBUG((LM_DEBUG,
00604                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00605                  ACE_TEXT("about to release_reservations for link[%@] \n"),
00606                  link.in()));
00607     }
00608 
00609     link->release_reservations(peerId, repo_id_, released);
00610 
00611   if (!released.empty()) {
00612 
00613     if (DCPS_debug_level > 4) {
00614       ACE_DEBUG((LM_DEBUG,
00615                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00616                  ACE_TEXT("about to remove_link[%@] from links_\n"),
00617                  link.in()));
00618     }
00619     links_.remove_link(link);
00620 
00621     if (DCPS_debug_level > 4) {
00622       GuidConverter converter(repo_id_);
00623       ACE_DEBUG((LM_DEBUG,
00624                  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00625                  OPENDDS_STRING(converter).c_str(),
00626                  link.in()));
00627     }
00628     // Datalink is no longer used for any remote peer by this TransportClient
00629     link->remove_listener(repo_id_);
00630 
00631   }
00632 }
00633 
00634 void
00635 TransportClient::register_for_reader(const RepoId& participant,
00636                                      const RepoId& writerid,
00637                                      const RepoId& readerid,
00638                                      const TransportLocatorSeq& locators,
00639                                      OpenDDS::DCPS::DiscoveryListener* listener)
00640 {
00641   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00642   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00643        pos != limit;
00644        ++pos) {
00645     (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00646   }
00647 }
00648 
00649 void
00650 TransportClient::unregister_for_reader(const RepoId& participant,
00651                                        const RepoId& writerid,
00652                                        const RepoId& readerid)
00653 {
00654   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00655   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00656        pos != limit;
00657        ++pos) {
00658     (*pos)->unregister_for_reader(participant, writerid, readerid);
00659   }
00660 }
00661 
00662 void
00663 TransportClient::register_for_writer(const RepoId& participant,
00664                                      const RepoId& readerid,
00665                                      const RepoId& writerid,
00666                                      const TransportLocatorSeq& locators,
00667                                      DiscoveryListener* listener)
00668 {
00669   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00670   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00671        pos != limit;
00672        ++pos) {
00673     (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00674   }
00675 }
00676 
00677 void
00678 TransportClient::unregister_for_writer(const RepoId& participant,
00679                                        const RepoId& readerid,
00680                                        const RepoId& writerid)
00681 {
00682   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00683   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00684        pos != limit;
00685        ++pos) {
00686     (*pos)->unregister_for_writer(participant, readerid, writerid);
00687   }
00688 }
00689 
00690 bool
00691 TransportClient::send_response(const RepoId& peer,
00692                                const DataSampleHeader& header,
00693                                Message_Block_Ptr payload)
00694 {
00695   DataLinkIndex::iterator found = data_link_index_.find(peer);
00696 
00697   if (found == data_link_index_.end()) {
00698     if (DCPS_debug_level > 4) {
00699       GuidConverter converter(peer);
00700       ACE_DEBUG((LM_DEBUG,
00701                  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00702                  ACE_TEXT("no link for publication %C, ")
00703                  ACE_TEXT("not sending response.\n"),
00704                  OPENDDS_STRING(converter).c_str()));
00705     }
00706 
00707     return false;
00708   }
00709 
00710   DataLinkSet singular;
00711   singular.insert_link(found->second);
00712   singular.send_response(peer, header, move(payload));
00713   return true;
00714 }
00715 
00716 void
00717 TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00718 {
00719   if (send_list.head() == 0) {
00720     return;
00721   }
00722   ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00723   send_i(send_list, transaction_id);
00724 }
00725 
00726 SendControlStatus
00727 TransportClient::send_w_control(SendStateDataSampleList send_list,
00728                                 const DataSampleHeader& header,
00729                                 Message_Block_Ptr msg,
00730                                 const RepoId& destination)
00731 {
00732   ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00733                    send_transaction_lock_, SEND_CONTROL_ERROR);
00734   if (send_list.head()) {
00735     send_i(send_list, 0);
00736   }
00737   return send_control_to(header, move(msg), destination);
00738 }
00739 
00740 void
00741 TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00742 {
00743   if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00744     if (transaction_id > max_transaction_id_seen_) {
00745       max_transaction_id_seen_ = transaction_id;
00746       max_transaction_tail_ = send_list.tail();
00747     }
00748     return;
00749   } else /* transaction_id == expected_transaction_id */ {
00750 
00751     DataSampleElement* cur = send_list.head();
00752     if (max_transaction_tail_ == 0) {
00753       //Means no future transaction beat this transaction into send
00754       if (transaction_id != 0)
00755         max_transaction_id_seen_ = expected_transaction_id_;
00756       // Only send this current transaction
00757       max_transaction_tail_ = send_list.tail();
00758     }
00759     DataLinkSet send_links;
00760 
00761     while (true) {
00762       // VERY IMPORTANT NOTE:
00763       //
00764       // We have to be very careful in how we deal with the current
00765       // DataSampleElement.  The issue is that once we have invoked
00766       // data_delivered() on the send_listener_ object, or we have invoked
00767       // send() on the pub_links, we can no longer access the current
00768       // DataSampleElement!Thus, we need to get the next
00769       // DataSampleElement (pointer) from the current element now,
00770       // while it is safe.
00771       DataSampleElement* next_elem;
00772       if (cur != max_transaction_tail_) {
00773         next_elem = cur->get_next_send_sample();
00774       } else {
00775         next_elem = max_transaction_tail_;
00776       }
00777       DataLinkSet_rch pub_links =
00778         (cur->get_num_subs() > 0)
00779         ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
00780         : DataLinkSet_rch(&links_, inc_count());
00781 
00782       if (pub_links.is_nil() || pub_links->empty()) {
00783         // NOTE: This is the "local publisher id is not currently
00784         //       associated with any remote subscriber ids" case.
00785 
00786         if (DCPS_debug_level > 4) {
00787           GuidConverter converter(cur->get_pub_id());
00788           ACE_DEBUG((LM_DEBUG,
00789                      ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00790                      ACE_TEXT("no links for publication %C, ")
00791                      ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00792                      OPENDDS_STRING(converter).c_str(),
00793                      cur,
00794                      cur->transaction_id()));
00795         }
00796 
00797         // We tell the send_listener_ that all of the remote subscriber ids
00798         // that wanted the data (all zero of them) have indeed received
00799         // the data.
00800         cur->get_send_listener()->data_delivered(cur);
00801 
00802       } else {
00803         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00804                   , cur), 5);
00805 
00806   #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00807 
00808         // Content-Filtering adjustment to the pub_links:
00809         // - If the sample should be filtered out of all subscriptions on a given
00810         //   DataLink, then exclude that link from the subset that we'll send to.
00811         // - If the sample should be filtered out of some (or none) of the subs,
00812         //   then record that information in the DataSampleElement so that the
00813         //   header's content_filter_entries_ can be marshaled before it's sent.
00814         if (cur->filter_out_.ptr()) {
00815           DataLinkSet_rch subset;
00816           DataLinkSet::GuardType guard(pub_links->lock());
00817           typedef DataLinkSet::MapType MapType;
00818           MapType& map = pub_links->map();
00819 
00820           for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
00821             size_t n_subs;
00822             GUIDSeq_var ti =
00823               itr->second->target_intersection(cur->get_pub_id(),
00824                                                cur->filter_out_.in(), n_subs);
00825 
00826             if (ti.ptr() == 0 || ti->length() != n_subs) {
00827               if (!subset.in()) {
00828                 subset = make_rch<DataLinkSet>();
00829               }
00830 
00831               subset->insert_link(itr->second);
00832               cur->filter_per_link_[itr->first] = ti._retn();
00833 
00834             } else {
00835               VDBG((LM_DEBUG,
00836                     "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
00837                     itr->second.in()));
00838             }
00839           }
00840 
00841           if (!subset.in()) {
00842             VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
00843             // similar to the "if (pub_links.is_nil())" case above, no links
00844             cur->get_send_listener()->data_delivered(cur);
00845             if (cur != max_transaction_tail_) {
00846               // Move on to the next DataSampleElement to send.
00847               cur = next_elem;
00848               continue;
00849             } else {
00850               break;
00851             }
00852           }
00853 
00854           pub_links = subset;
00855         }
00856 
00857   #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00858 
00859         // This will do several things, including adding to the membership
00860         // of the send_links set.  Any DataLinks added to the send_links
00861         // set will be also told about the send_start() event.  Those
00862         // DataLinks (in the pub_links set) that are already in the
00863         // send_links set will not be told about the send_start() event
00864         // since they heard about it when they were inserted into the
00865         // send_links set.
00866         send_links.send_start(pub_links.in());
00867         if (cur->get_header().message_id_ != SAMPLE_DATA) {
00868           pub_links->send_control(cur);
00869         } else {
00870           pub_links->send(cur);
00871         }
00872       }
00873       if (cur != max_transaction_tail_) {
00874         // Move on to the next DataSampleElement to send.
00875         cur = next_elem;
00876       } else {
00877         break;
00878       }
00879     }
00880 
00881     // This will inform each DataLink in the set about the stop_send() event.
00882     // It will then clear the send_links_ set.
00883     //
00884     // The reason that the send_links_ set is cleared is because we continually
00885     // reuse the same send_links_ object over and over for each call to this
00886     // send method.
00887     RepoId pub_id(repo_id_);
00888     send_links.send_stop(pub_id);
00889     if (transaction_id != 0)
00890       expected_transaction_id_ = max_transaction_id_seen_ + 1;
00891     max_transaction_tail_ = 0;
00892   }
00893 }
00894 
00895 TransportSendListener_rch
00896 TransportClient::get_send_listener()
00897 {
00898   return rchandle_from(dynamic_cast<TransportSendListener*>(this));
00899 }
00900 
00901 TransportReceiveListener_rch
00902 TransportClient::get_receive_listener()
00903 {
00904   return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
00905 }
00906 
00907 SendControlStatus
00908 TransportClient::send_control(const DataSampleHeader& header,
00909                               Message_Block_Ptr msg)
00910 {
00911   return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
00912 }
00913 
00914 SendControlStatus
00915 TransportClient::send_control_to(const DataSampleHeader& header,
00916                                  Message_Block_Ptr msg,
00917                                  const RepoId& destination)
00918 {
00919   DataLinkSet singular;
00920   {
00921     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
00922     DataLinkIndex::iterator found = data_link_index_.find(destination);
00923 
00924     if (found == data_link_index_.end()) {
00925       return SEND_CONTROL_ERROR;
00926     }
00927 
00928     singular.insert_link(found->second);
00929   }
00930   return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
00931 }
00932 
00933 bool
00934 TransportClient::remove_sample(const DataSampleElement* sample)
00935 {
00936   return links_.remove_sample(sample);
00937 }
00938 
00939 bool
00940 TransportClient::remove_all_msgs()
00941 {
00942   return links_.remove_all_msgs(repo_id_);
00943 }
00944 
00945 }
00946 }
00947 
00948 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1