TransportClient.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "TransportClient.h"
00011 #include "TransportConfig.h"
00012 #include "TransportRegistry.h"
00013 #include "TransportExceptions.h"
00014 #include "TransportReceiveListener.h"
00015 
00016 #include "dds/DdsDcpsInfoUtilsC.h"
00017 
00018 #include "dds/DCPS/DataWriterImpl.h"
00019 #include "dds/DCPS/SendStateDataSampleList.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Definitions.h"
00022 #include "dds/DCPS/Service_Participant.h"
00023 
00024 #include "ace/Reactor_Timer_Interface.h"
00025 
00026 #include <algorithm>
00027 #include <iterator>
00028 
00029 namespace OpenDDS {
00030 namespace DCPS {
00031 
00032 TransportClient::TransportClient()
00033   : pending_assoc_timer_(new PendingAssocTimer (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00034   , expected_transaction_id_(1)
00035   , max_transaction_id_seen_(0)
00036   , max_transaction_tail_(0)
00037   , swap_bytes_(false)
00038   , cdr_encapsulation_(false)
00039   , reliable_(false)
00040   , durable_(false)
00041   , reverse_lock_(lock_)
00042   , repo_id_(GUID_UNKNOWN)
00043 {
00044 }
00045 
00046 TransportClient::~TransportClient()
00047 {
00048   if (Transport_debug_level > 5) {
00049     GuidConverter converter(repo_id_);
00050     ACE_DEBUG((LM_DEBUG,
00051                ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00052                OPENDDS_STRING(converter).c_str()));
00053   }
00054 
00055   this->stop_associating();
00056 
00057   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00058 
00059   for (DataLinkIndex::iterator iter = links_waiting_for_on_deleted_callback_.begin();
00060        iter != links_waiting_for_on_deleted_callback_.end(); ++iter) {
00061     if (Transport_debug_level > 5) {
00062       GuidConverter converter(repo_id_);
00063       ACE_DEBUG((LM_DEBUG,
00064                  ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C from link waiting for callback\n"),
00065                  this,
00066                  OPENDDS_STRING(converter).c_str()));
00067     }
00068     iter->second->remove_listener(repo_id_);
00069   }
00070 
00071   for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00072        iter != links_.map().end(); ++iter) {
00073     if (Transport_debug_level > 5) {
00074       GuidConverter converter(repo_id_);
00075       ACE_DEBUG((LM_DEBUG,
00076                  ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C\n"),
00077                  this,
00078                  OPENDDS_STRING(converter).c_str()));
00079     }
00080     iter->second->remove_listener(repo_id_);
00081   }
00082 
00083   for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00084     for (size_t i = 0; i < impls_.size(); ++i) {
00085       impls_[i]->stop_accepting_or_connecting(this, it->second->data_.remote_id_);
00086     }
00087 
00088     pending_assoc_timer_->cancel_timer(this, it->second);
00089   }
00090 
00091   pending_assoc_timer_->wait();
00092   pending_assoc_timer_->destroy();
00093 
00094   for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00095        it != impls_.end(); ++it) {
00096 
00097     (*it)->detach_client(this);
00098   }
00099 }
00100 
00101 void
00102 TransportClient::enable_transport(bool reliable, bool durable)
00103 {
00104   // Search for a TransportConfig to use:
00105   TransportConfig_rch tc;
00106 
00107   // 1. If this object is an Entity, check if a TransportConfig has been
00108   //    bound either directly to this entity or to a parent entity.
00109   for (const EntityImpl* ent = dynamic_cast<const EntityImpl*>(this);
00110        ent && tc.is_nil(); ent = ent->parent()) {
00111     tc = ent->transport_config();
00112   }
00113 
00114   if (tc.is_nil()) {
00115     TransportRegistry* const reg = TransportRegistry::instance();
00116     // 2. Check for a TransportConfig that is the default for this Domain.
00117     tc = reg->domain_default_config(domain_id());
00118 
00119     if (tc.is_nil()) {
00120       // 3. Use the global_config if one has been set.
00121       tc = reg->global_config();
00122 
00123       if (!tc.is_nil() && tc->instances_.empty()
00124           && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00125         // 4. Set the "fallback option" if the global_config is empty.
00126         //    (only applies if the user hasn't changed the global config)
00127         tc = reg->fix_empty_default();
00128       }
00129     }
00130   }
00131 
00132   if (tc.is_nil()) {
00133     ACE_ERROR((LM_ERROR,
00134                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00135                ACE_TEXT("No TransportConfig found.\n")));
00136     throw Transport::NotConfigured();
00137   }
00138 
00139   enable_transport_using_config(reliable, durable, tc);
00140 }
00141 
00142 void
00143 TransportClient::enable_transport_using_config(bool reliable, bool durable,
00144                                                const TransportConfig_rch& tc)
00145 {
00146   swap_bytes_ = tc->swap_bytes_;
00147   cdr_encapsulation_ = false;
00148   reliable_ = reliable;
00149   durable_ = durable;
00150   unsigned long duration = tc->passive_connect_duration_;
00151   if (duration == 0) {
00152     duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00153     if (DCPS_debug_level) {
00154       ACE_DEBUG((LM_WARNING,
00155         ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00156         ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00157         ACE_TEXT("default value\n")));
00158     }
00159   }
00160   passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00161 
00162   const size_t n = tc->instances_.size();
00163 
00164   for (size_t i = 0; i < n; ++i) {
00165     TransportInst_rch inst = tc->instances_[i];
00166 
00167     if (check_transport_qos(*inst.in())) {
00168       TransportImpl_rch impl = inst->impl();
00169 
00170       if (!impl.is_nil()) {
00171         impl->attach_client(this);
00172         impls_.push_back(impl);
00173         const CORBA::ULong len = conn_info_.length();
00174         conn_info_.length(len + 1);
00175         impl->connection_info(conn_info_[len]);
00176         cdr_encapsulation_ |= inst->requires_cdr();
00177       }
00178     }
00179   }
00180 
00181   if (impls_.empty()) {
00182     ACE_ERROR((LM_ERROR,
00183                ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00184                ACE_TEXT("No TransportImpl could be created.\n")));
00185     throw Transport::NotConfigured();
00186   }
00187 }
00188 
00189 void
00190 TransportClient::transport_detached(TransportImpl* which)
00191 {
00192 
00193   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00194 
00195   // Remove any DataLinks created by the 'which' TransportImpl from our local
00196   // data structures (both links_ and data_link_index_).
00197   for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00198        iter != links_.map().end();) {
00199     TransportImpl_rch impl = iter->second->impl();
00200 
00201     if (impl.in() == which) {
00202       for (DataLinkIndex::iterator it2 = data_link_index_.begin();
00203            it2 != data_link_index_.end();) {
00204         if (it2->second.in() == iter->second.in()) {
00205           data_link_index_.erase(it2++);
00206 
00207         } else {
00208           ++it2;
00209         }
00210       }
00211       if (DCPS_debug_level > 4) {
00212         GuidConverter converter(repo_id_);
00213         ACE_DEBUG((LM_DEBUG,
00214                    ACE_TEXT("(%P|%t) TransportClient::transport_detached: calling remove_listener %C on link[%@]\n"),
00215                    OPENDDS_STRING(converter).c_str(),
00216                    iter->second.in()));
00217       }
00218       iter->second->remove_listener(repo_id_);
00219       links_.map().erase(iter++);
00220 
00221     } else {
00222       ++iter;
00223     }
00224   }
00225 
00226   // Remove the 'which' TransportImpl from the impls_ list
00227   for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00228        it != impls_.end(); ++it) {
00229     if (it->in() == which) {
00230       impls_.erase(it);
00231 
00232       for (PendingMap::iterator it2 = pending_.begin();
00233            it2 != pending_.end(); ++it2) {
00234         which->stop_accepting_or_connecting(this, it2->first);
00235       }
00236 
00237       return;
00238     }
00239   }
00240 }
00241 
00242 bool
00243 TransportClient::associate(const AssociationData& data, bool active)
00244 {
00245   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00246 
00247   repo_id_ = get_repo_id();
00248 
00249   if (impls_.empty()) {
00250     if (DCPS_debug_level) {
00251       GuidConverter writer_converter(this->repo_id_);
00252       GuidConverter reader_converter(data.remote_id_);
00253       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00254                  ACE_TEXT("local %C remote %C no available impls\n"),
00255                  OPENDDS_STRING(writer_converter).c_str(),
00256                  OPENDDS_STRING(reader_converter).c_str()));
00257     }
00258     return false;
00259   }
00260 
00261   bool all_impls_shut_down = true;
00262   for (size_t i = 0; i < impls_.size(); ++i) {
00263     if (!impls_.at(i)->is_shut_down()) {
00264       all_impls_shut_down = false;
00265       break;
00266     }
00267   }
00268 
00269   if (all_impls_shut_down) {
00270     if (DCPS_debug_level) {
00271       GuidConverter writer_converter(this->repo_id_);
00272       GuidConverter reader_converter(data.remote_id_);
00273       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00274                  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00275                  OPENDDS_STRING(writer_converter).c_str(),
00276                  OPENDDS_STRING(reader_converter).c_str()));
00277     }
00278     return false;
00279   }
00280 
00281   PendingMap::iterator iter = pending_.find(data.remote_id_);
00282 
00283   if (iter == pending_.end()) {
00284     RepoId remote_copy(data.remote_id_);
00285     iter = pending_.insert(std::make_pair(remote_copy, new PendingAssoc())).first;
00286 
00287     GuidConverter tc_assoc(this->repo_id_);
00288     GuidConverter remote_new(data.remote_id_);
00289     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00290               "between %C and remote %C\n",
00291               OPENDDS_STRING(tc_assoc).c_str(),
00292               OPENDDS_STRING(remote_new).c_str()), 0);
00293   } else {
00294     if (iter->second->removed_) {
00295       iter->second->removed_ = false;
00296       GuidConverter tc_assoc(this->repo_id_);
00297       GuidConverter remote_new(data.remote_id_);
00298       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate found existing PendingAssoc "
00299                 "between %C and remote %C, set removed to false to continue using this pending association\n",
00300                 OPENDDS_STRING(tc_assoc).c_str(),
00301                 OPENDDS_STRING(remote_new).c_str()), 0);
00302     } else {
00303       ACE_ERROR((LM_ERROR,
00304                  ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00305                  ACE_TEXT("already associating with remote.\n")));
00306 
00307       return false;
00308     }
00309   }
00310 
00311   PendingAssoc& pend = *(iter->second);
00312   pend.active_ = active;
00313   pend.impls_.clear();
00314   pend.blob_index_ = 0;
00315   pend.data_ = data;
00316   pend.attribs_.local_id_ = repo_id_;
00317   pend.attribs_.priority_ = get_priority_value(data);
00318   pend.attribs_.local_reliable_ = reliable_;
00319   pend.attribs_.local_durable_ = durable_;
00320 
00321   if (active) {
00322     pend.impls_.reserve(impls_.size());
00323     std::reverse_copy(impls_.begin(), impls_.end(),
00324                       std::back_inserter(pend.impls_));
00325 
00326     pend.initiate_connect(this, guard);
00327 
00328     //Revisit if this should be used instead of always returning true.
00329     //return pend.initiate_connect(this, guard);
00330     return true;
00331 
00332   } else { // passive
00333 
00334     // call accept_datalink for each impl / blob pair of the same type
00335     for (size_t i = 0; i < impls_.size(); ++i) {
00336       pend.impls_.push_back(impls_[i]);
00337       const OPENDDS_STRING type = impls_[i]->transport_type();
00338 
00339       for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00340         if (data.remote_data_[j].transport_type.in() == type) {
00341           const TransportImpl::RemoteTransport remote = {
00342             data.remote_id_, data.remote_data_[j].data,
00343             data.publication_transport_priority_,
00344             data.remote_reliable_, data.remote_durable_};
00345 
00346           TransportImpl::AcceptConnectResult res;
00347           {
00348             // This thread acquired lock_ at the beginning of this method.  Calling accept_datalink might require getting the lock for the transport's reactor.
00349             // If the current thread is not an event handler for the transport's reactor, e.g., the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
00350             // Event handlers in the transport reactor may call passive_connection which calls use_datalink which acquires lock_.  The locking order in this case is transport reactor lock -> lock_.
00351             // To avoid deadlock, we must reverse the lock.
00352             ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00353             res = impls_[i]->accept_datalink(remote, pend.attribs_, this);
00354           }
00355 
00356           //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
00357           PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00358 
00359           if (iter_after_accept == pending_.end()) {
00360             //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
00361             //active side connection and completed, thus pend was removed from pending_.  Can return true.
00362             return true;
00363           }
00364 
00365           if (res.success_ && !res.link_.is_nil()) {
00366 
00367             use_datalink_i(data.remote_id_, res.link_, guard);
00368 
00369             return true;
00370           }
00371         }
00372       }
00373 
00374       //pend.impls_.push_back(impls_[i]);
00375     }
00376 
00377     pending_assoc_timer_->schedule_timer(this, iter->second);
00378   }
00379 
00380   return true;
00381 }
00382 
00383 int
00384 TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
00385                                               const void* arg)
00386 {
00387   TransportClient* tc = static_cast<TransportClient*>(const_cast<void*>(arg));
00388 
00389   tc->use_datalink(data_.remote_id_, 0);
00390 
00391   return 0;
00392 }
00393 
00394 bool
00395 TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00396                                     const TransportImpl_rch impl,
00397                                     const TransportImpl::RemoteTransport& remote,
00398                                     const TransportImpl::ConnectionAttribs& attribs_,
00399                                     Guard& guard)
00400 {
00401   if (!guard.locked()) {
00402     //don't own the lock_ so can't release it...shouldn't happen
00403     GuidConverter local(repo_id_);
00404     GuidConverter remote_conv(remote.repo_id_);
00405     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00406                         "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00407                         OPENDDS_STRING(local).c_str(),
00408                         OPENDDS_STRING(remote_conv).c_str()), 0);
00409     return false;
00410   }
00411 
00412   {
00413     //can't call connect while holding lock due to possible reactor deadlock
00414     guard.release();
00415     GuidConverter local(repo_id_);
00416     GuidConverter remote_conv(remote.repo_id_);
00417     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00418                         "attempt to connect_datalink between local %C and remote %C\n",
00419                         OPENDDS_STRING(local).c_str(),
00420                         OPENDDS_STRING(remote_conv).c_str()), 0);
00421     result = impl->connect_datalink(remote, attribs_, this);
00422     if (!result.success_) {
00423       if (DCPS_debug_level) {
00424         GuidConverter writer_converter(repo_id_);
00425         GuidConverter reader_converter(remote.repo_id_);
00426         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00427                    ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00428                    OPENDDS_STRING(writer_converter).c_str(),
00429                    OPENDDS_STRING(reader_converter).c_str()));
00430       }
00431       return false;
00432     }
00433     guard.acquire();
00434   }
00435 
00436   //Check to make sure the pending assoc still exists in the map and hasn't been slated for removal
00437   //figure out how to respond to these possible results that occurred while lock was released to connect
00438   PendingMap::iterator iter = pending_.find(remote.repo_id_);
00439 
00440   if (iter == pending_.end()) {
00441     GuidConverter local(repo_id_);
00442     GuidConverter remote_conv(remote.repo_id_);
00443     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00444                         "cannot find pending association after connecting datalink between local %C and remote %C\n",
00445                         OPENDDS_STRING(local).c_str(),
00446                         OPENDDS_STRING(remote_conv).c_str()), 0);
00447     return false;
00448     //log some sort of error message...
00449     //PendingAssoc's are only erased from pending_ in use_datalink_i after
00450 
00451   } else {
00452     if (iter->second->removed_) {
00453       GuidConverter local(repo_id_);
00454       GuidConverter remote_conv(remote.repo_id_);
00455       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00456                           "pending association marked for removal already, after connecting datalink between local %C and remote %C\n",
00457                           OPENDDS_STRING(local).c_str(),
00458                           OPENDDS_STRING(remote_conv).c_str()), 0);
00459       //this occurs if the transport client was told to disassociate while connecting
00460       //disassociate cleans up everything except this local AcceptConnectResult whose destructor
00461       //should take care of it because link has not been shifted into links_ by use_datalink_i
00462       return false;
00463 
00464     }
00465 
00466   }
00467   GuidConverter local(repo_id_);
00468   GuidConverter remote_conv(remote.repo_id_);
00469   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00470                       "connection between local %C and remote %C initiation successful\n",
00471                       OPENDDS_STRING(local).c_str(),
00472                       OPENDDS_STRING(remote_conv).c_str()), 0);
00473   return true;
00474 }
00475 
00476 bool
00477 TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
00478                                                 Guard& guard)
00479 {
00480   GuidConverter local(tc->repo_id_);
00481   GuidConverter remote(this->data_.remote_id_);
00482   VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00483                       "between %C and remote %C\n",
00484                       OPENDDS_STRING(local).c_str(),
00485                       OPENDDS_STRING(remote).c_str()), 0);
00486   // find the next impl / blob entry that have matching types
00487   while (!impls_.empty()) {
00488     const TransportImpl_rch& impl = impls_.back();
00489     const OPENDDS_STRING type = impl->transport_type();
00490 
00491     for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
00492       if (data_.remote_data_[blob_index_].transport_type.in() == type) {
00493         const TransportImpl::RemoteTransport remote = {
00494           data_.remote_id_, data_.remote_data_[blob_index_].data,
00495           data_.publication_transport_priority_,
00496           data_.remote_reliable_, data_.remote_durable_};
00497 
00498         TransportImpl::AcceptConnectResult res;
00499         GuidConverter tmp_local(tc->repo_id_);
00500         GuidConverter tmp_remote(this->data_.remote_id_);
00501         if (!tc->initiate_connect_i(res, impl, remote, attribs_, guard)) {
00502           //tc init connect returned false there is no PendingAssoc left in map because use_datalink_i finished elsewhere
00503           //so don't do anything further with pend and return success or failure up to tc's associate
00504           if (res.success_ && !this->removed_) {
00505             GuidConverter local(tc->repo_id_);
00506             GuidConverter remote(this->data_.remote_id_);
00507             VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
00508                                 ACE_TEXT("between %C and remote %C success\n"),
00509                                 OPENDDS_STRING(local).c_str(),
00510                                 OPENDDS_STRING(remote).c_str()), 0);
00511             return true;
00512           }
00513 
00514           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00515                               "between %C and remote %C unsuccessful\n",
00516                               OPENDDS_STRING(tmp_local).c_str(),
00517                               OPENDDS_STRING(tmp_remote).c_str()), 0);
00518           return false;
00519         }
00520 
00521         if (res.success_) {
00522 
00523           ++blob_index_;
00524 
00525           if (!res.link_.is_nil()) {
00526 
00527             tc->use_datalink_i(data_.remote_id_, res.link_, guard);
00528           } else {
00529             GuidConverter local(tc->repo_id_);
00530             GuidConverter remote(this->data_.remote_id_);
00531             VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00532                                 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
00533                                 OPENDDS_STRING(local).c_str(),
00534                                 OPENDDS_STRING(remote).c_str()), 0);
00535           }
00536 
00537           return true;
00538         } else {
00539           GuidConverter local(tc->repo_id_);
00540           GuidConverter remote(this->data_.remote_id_);
00541           VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00542                               "result of initiate_connect_i (local: %C to remote: %C) was not success \n",
00543                               OPENDDS_STRING(local).c_str(),
00544                               OPENDDS_STRING(remote).c_str()), 0);
00545         }
00546       }
00547     }
00548 
00549     impls_.pop_back();
00550     blob_index_ = 0;
00551   }
00552 
00553   return false;
00554 }
00555 
00556 void
00557 TransportClient::use_datalink(const RepoId& remote_id,
00558                               const DataLink_rch& link)
00559 {
00560   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00561 
00562   use_datalink_i(remote_id, link, guard);
00563 }
00564 
00565 void
00566 TransportClient::use_datalink_i(const RepoId& remote_id_ref,
00567                                 const DataLink_rch& link,
00568                                 Guard& guard)
00569 {
00570   //try to make a local copy of remote_id to use in calls
00571   //because the reference could be invalidated if the caller
00572   //reference location is deleted (i.e. in stop_accepting_or_connecting
00573   //if use_datalink_i was called from passive_connection)
00574   //Does changing this from a reference to a local affect anything going forward?
00575   RepoId remote_id(remote_id_ref);
00576 
00577   GuidConverter peerId_conv(remote_id);
00578   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00579             "TransportClient(%@) using datalink[%@] from %C\n",
00580             this,
00581             link.in(),
00582             OPENDDS_STRING(peerId_conv).c_str()), 0);
00583 
00584   PendingMap::iterator iter = pending_.find(remote_id);
00585 
00586   if (iter == pending_.end()) {
00587     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00588                         "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00589                         this,
00590                         link.in(),
00591                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00592     return;
00593   }
00594 
00595   PendingAssoc* pend = iter->second;
00596   const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00597   bool ok = false;
00598 
00599   if (pend->removed_) { // no-op
00600     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00601                         "TransportClient(%@) using datalink[%@] pending association to remote %C was removed\n",
00602                         this,
00603                         link.in(),
00604                         OPENDDS_STRING(peerId_conv).c_str()), 0);
00605     return;
00606   } else if (link.is_nil()) {
00607 
00608     if (pend->active_ && pend->initiate_connect(this, guard)) {
00609       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00610                           "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00611                           this,
00612                           link.in(),
00613                           OPENDDS_STRING(peerId_conv).c_str()), 0);
00614       return;
00615     }
00616 
00617   } else { // link is ready to use
00618     VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00619               "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00620               this,
00621               link.in(),
00622               OPENDDS_STRING(peerId_conv).c_str()), 0);
00623 
00624     add_link(link, remote_id);
00625     ok = true;
00626   }
00627 
00628   // either link is valid or assoc failed, clean up pending object
00629   // for passive side processing
00630   if (!pend->active_) {
00631 
00632     for (size_t i = 0; i < pend->impls_.size(); ++i) {
00633       pend->impls_[i]->stop_accepting_or_connecting(this, pend->data_.remote_id_);
00634     }
00635   }
00636 
00637   pending_.erase(iter);
00638   pend->removed_ = true;
00639 
00640   guard.release();
00641 
00642   pending_assoc_timer_->cancel_timer(this, pend);
00643   pending_assoc_timer_->delete_pending_assoc(pend);
00644 
00645   transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00646 }
00647 
00648 void
00649 TransportClient::add_link(const DataLink_rch& link, const RepoId& peer)
00650 {
00651   links_.insert_link(link.in());
00652   data_link_index_[peer] = link;
00653 
00654   TransportReceiveListener* trl = get_receive_listener();
00655 
00656   if (trl) {
00657     link->make_reservation(peer, repo_id_, trl);
00658 
00659   } else {
00660     link->make_reservation(peer, repo_id_, get_send_listener());
00661   }
00662 }
00663 
00664 void
00665 TransportClient::on_notification_of_connection_deletion(const RepoId& peerId)
00666 {
00667   DBG_ENTRY_LVL("TransportClient","on_notification_of_connection_deletion",6);
00668 
00669   GuidConverter peerId_conv(peerId);
00670   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::on_notification_of_connection_deletion "
00671             "TransportClient(%@) connection to %C deleted\n",
00672             this,
00673             OPENDDS_STRING(peerId_conv).c_str()), 5);
00674 
00675   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00676 
00677   const DataLinkIndex::iterator found = links_waiting_for_on_deleted_callback_.find(peerId);
00678 
00679   if (found == links_waiting_for_on_deleted_callback_.end()) {
00680     if (DCPS_debug_level > 4) {
00681       const GuidConverter converter(peerId);
00682       ACE_DEBUG((LM_DEBUG,
00683                  ACE_TEXT("(%P|%t) TransportClient::on_notification_of_connection_deletion: ")
00684                  ACE_TEXT("no link for remote peer %C\n"),
00685                  OPENDDS_STRING(converter).c_str()));
00686     }
00687 
00688     return;
00689   }
00690 
00691   const DataLink_rch link = found->second;
00692 
00693   //now that an _rch is created for the link, remove the iterator from links_waiting_for_on_deleted_callback_ while still holding lock
00694   links_waiting_for_on_deleted_callback_.erase(found);
00695 
00696   link->remove_listener(repo_id_);
00697 }
00698 
00699 void
00700 TransportClient::stop_associating()
00701 {
00702   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00703 
00704   PendingMap::iterator iter = pending_.begin();
00705 
00706   while (iter != pending_.end()) {
00707     iter->second->removed_ = true;
00708     ++iter;
00709   }
00710 }
00711 
00712 void
00713 TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
00714 {
00715   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00716 
00717   if (repos == 0 || length == 0) {
00718     return;
00719   } else {
00720     for (CORBA::ULong i = 0; i < length; ++i) {
00721       PendingMap::iterator iter = pending_.find(repos[i]);
00722 
00723       if (iter != pending_.end()) {
00724         iter->second->removed_ = true;
00725       }
00726     }
00727   }
00728 }
00729 
00730 void
00731 TransportClient::send_final_acks()
00732 {
00733   links_.send_final_acks (get_repo_id());
00734 }
00735 
00736 void
00737 TransportClient::disassociate(const RepoId& peerId)
00738 {
00739   GuidConverter peerId_conv(peerId);
00740   VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00741             "TransportClient(%@) disassociating from %C\n",
00742             this,
00743             OPENDDS_STRING(peerId_conv).c_str()), 5);
00744 
00745   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00746 
00747   const PendingMap::iterator iter = pending_.find(peerId);
00748 
00749   if (iter != pending_.end()) {
00750     iter->second->removed_ = true;
00751     return;
00752   }
00753 
00754   const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00755 
00756   if (found == data_link_index_.end()) {
00757     if (DCPS_debug_level > 4) {
00758       const GuidConverter converter(peerId);
00759       ACE_DEBUG((LM_DEBUG,
00760                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00761                  ACE_TEXT("no link for remote peer %C\n"),
00762                  OPENDDS_STRING(converter).c_str()));
00763     }
00764 
00765     return;
00766   }
00767 
00768   const DataLink_rch link = found->second;
00769 
00770   //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
00771   //otherwise it could be removed in transport_detached()
00772   data_link_index_.erase(found);
00773   DataLinkSetMap released;
00774 
00775     if (DCPS_debug_level > 4) {
00776       ACE_DEBUG((LM_DEBUG,
00777                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00778                  ACE_TEXT("about to release_reservations for link[%@] \n"),
00779                  link.in()));
00780     }
00781 
00782     link->release_reservations(peerId, repo_id_, released);
00783 
00784   if (!released.empty()) {
00785 
00786     if (DCPS_debug_level > 4) {
00787       ACE_DEBUG((LM_DEBUG,
00788                  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00789                  ACE_TEXT("about to remove_link[%@] from links_\n"),
00790                  link.in()));
00791     }
00792     links_.remove_link(link);
00793 
00794     if (link->issues_on_deleted_callback()) {
00795       if (DCPS_debug_level > 4) {
00796         GuidConverter converter(repo_id_);
00797         ACE_DEBUG((LM_DEBUG,
00798                    ACE_TEXT("(%P|%t) TransportClient::disassociate: wait for connection deleted callback for %C on link[%@]\n"),
00799                    OPENDDS_STRING(converter).c_str(),
00800                    link.in()));
00801       }
00802       links_waiting_for_on_deleted_callback_[peerId] = link;
00803     } else {
00804       if (DCPS_debug_level > 4) {
00805         GuidConverter converter(repo_id_);
00806         ACE_DEBUG((LM_DEBUG,
00807                    ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00808                    OPENDDS_STRING(converter).c_str(),
00809                    link.in()));
00810       }
00811       // Datalink is no longer used for any remote peer by this TransportClient
00812       link->remove_listener(repo_id_);
00813     }
00814   }
00815 }
00816 
00817 void
00818 TransportClient::register_for_reader(const RepoId& participant,
00819                                      const RepoId& writerid,
00820                                      const RepoId& readerid,
00821                                      const TransportLocatorSeq& locators,
00822                                      OpenDDS::DCPS::DiscoveryListener* listener)
00823 {
00824   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00825   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00826        pos != limit;
00827        ++pos) {
00828     (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00829   }
00830 }
00831 
00832 void
00833 TransportClient::unregister_for_reader(const RepoId& participant,
00834                                        const RepoId& writerid,
00835                                        const RepoId& readerid)
00836 {
00837   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00838   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00839        pos != limit;
00840        ++pos) {
00841     (*pos)->unregister_for_reader(participant, writerid, readerid);
00842   }
00843 }
00844 
00845 void
00846 TransportClient::register_for_writer(const RepoId& participant,
00847                                      const RepoId& readerid,
00848                                      const RepoId& writerid,
00849                                      const TransportLocatorSeq& locators,
00850                                      DiscoveryListener* listener)
00851 {
00852   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00853   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00854        pos != limit;
00855        ++pos) {
00856     (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00857   }
00858 }
00859 
00860 void
00861 TransportClient::unregister_for_writer(const RepoId& participant,
00862                                        const RepoId& readerid,
00863                                        const RepoId& writerid)
00864 {
00865   ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00866   for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00867        pos != limit;
00868        ++pos) {
00869     (*pos)->unregister_for_writer(participant, readerid, writerid);
00870   }
00871 }
00872 
00873 bool
00874 TransportClient::send_response(const RepoId& peer,
00875                                const DataSampleHeader& header,
00876                                ACE_Message_Block* payload)
00877 {
00878   DataLinkIndex::iterator found = data_link_index_.find(peer);
00879 
00880   if (found == data_link_index_.end()) {
00881     payload->release();
00882 
00883     if (DCPS_debug_level > 4) {
00884       GuidConverter converter(peer);
00885       ACE_DEBUG((LM_DEBUG,
00886                  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00887                  ACE_TEXT("no link for publication %C, ")
00888                  ACE_TEXT("not sending response.\n"),
00889                  OPENDDS_STRING(converter).c_str()));
00890     }
00891 
00892     return false;
00893   }
00894 
00895   DataLinkSet singular;
00896   singular.insert_link(found->second.in());
00897   singular.send_response(peer, header, payload);
00898   return true;
00899 }
00900 
00901 void
00902 TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00903 {
00904   if (send_list.head() == 0) {
00905     return;
00906   }
00907   ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00908   send_i(send_list, transaction_id);
00909 }
00910 
00911 SendControlStatus
00912 TransportClient::send_w_control(SendStateDataSampleList send_list,
00913                                 const DataSampleHeader& header,
00914                                 ACE_Message_Block* msg,
00915                                 const RepoId& destination)
00916 {
00917   ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00918                    send_transaction_lock_, SEND_CONTROL_ERROR);
00919   if (send_list.head()) {
00920     send_i(send_list, 0);
00921   }
00922   return send_control_to(header, msg, destination);
00923 }
00924 
00925 void
00926 TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00927 {
00928   if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00929     if (transaction_id > max_transaction_id_seen_) {
00930       max_transaction_id_seen_ = transaction_id;
00931       max_transaction_tail_ = send_list.tail();
00932     }
00933     return;
00934   } else /* transaction_id == expected_transaction_id */ {
00935 
00936     DataSampleElement* cur = send_list.head();
00937     if (max_transaction_tail_ == 0) {
00938       //Means no future transaction beat this transaction into send
00939       if (transaction_id != 0)
00940         max_transaction_id_seen_ = expected_transaction_id_;
00941       // Only send this current transaction
00942       max_transaction_tail_ = send_list.tail();
00943     }
00944     DataLinkSet send_links;
00945 
00946     while (true) {
00947       // VERY IMPORTANT NOTE:
00948       //
00949       // We have to be very careful in how we deal with the current
00950       // DataSampleElement.  The issue is that once we have invoked
00951       // data_delivered() on the send_listener_ object, or we have invoked
00952       // send() on the pub_links, we can no longer access the current
00953       // DataSampleElement!Thus, we need to get the next
00954       // DataSampleElement (pointer) from the current element now,
00955       // while it is safe.
00956       DataSampleElement* next_elem;
00957       if (cur != max_transaction_tail_) {
00958         next_elem = cur->get_next_send_sample();
00959       } else {
00960         next_elem = max_transaction_tail_;
00961       }
00962       DataLinkSet_rch pub_links =
00963         (cur->get_num_subs() > 0)
00964         ? links_.select_links(cur->get_sub_ids(), cur->get_num_subs())
00965         : DataLinkSet_rch(&links_, false);
00966 
00967       if (pub_links.is_nil() || pub_links->empty()) {
00968         // NOTE: This is the "local publisher id is not currently
00969         //       associated with any remote subscriber ids" case.
00970 
00971         if (DCPS_debug_level > 4) {
00972           GuidConverter converter(cur->get_pub_id());
00973           ACE_DEBUG((LM_DEBUG,
00974                      ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00975                      ACE_TEXT("no links for publication %C, ")
00976                      ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00977                      OPENDDS_STRING(converter).c_str(),
00978                      cur,
00979                      cur->transaction_id()));
00980         }
00981 
00982         // We tell the send_listener_ that all of the remote subscriber ids
00983         // that wanted the data (all zero of them) have indeed received
00984         // the data.
00985         cur->get_send_listener()->data_delivered(cur);
00986 
00987       } else {
00988         VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00989                   , cur), 5);
00990 
00991   #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00992 
00993         // Content-Filtering adjustment to the pub_links:
00994         // - If the sample should be filtered out of all subscriptions on a given
00995         //   DataLink, then exclude that link from the subset that we'll send to.
00996         // - If the sample should be filtered out of some (or none) of the subs,
00997         //   then record that information in the DataSampleElement so that the
00998         //   header's content_filter_entries_ can be marshaled before it's sent.
00999         if (cur->filter_out_.ptr()) {
01000           DataLinkSet_rch subset;
01001           DataLinkSet::GuardType guard(pub_links->lock());
01002           typedef DataLinkSet::MapType MapType;
01003           MapType& map = pub_links->map();
01004 
01005           for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
01006             size_t n_subs;
01007             GUIDSeq_var ti =
01008               itr->second->target_intersection(cur->get_pub_id(),
01009                                                cur->filter_out_.in(), n_subs);
01010 
01011             if (ti.ptr() == 0 || ti->length() != n_subs) {
01012               if (!subset.in()) {
01013                 subset = new DataLinkSet;
01014               }
01015 
01016               subset->insert_link(itr->second.in());
01017               cur->filter_per_link_[itr->first] = ti._retn();
01018 
01019             } else {
01020               VDBG((LM_DEBUG,
01021                     "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
01022                     itr->second.in()));
01023             }
01024           }
01025 
01026           if (!subset.in()) {
01027             VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
01028             // similar to the "if (pub_links.is_nil())" case above, no links
01029             cur->get_send_listener()->data_delivered(cur);
01030             if (cur != max_transaction_tail_) {
01031               // Move on to the next DataSampleElement to send.
01032               cur = next_elem;
01033               continue;
01034             } else {
01035               break;
01036             }
01037           }
01038 
01039           pub_links = subset;
01040         }
01041 
01042   #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
01043 
01044         // This will do several things, including adding to the membership
01045         // of the send_links set.  Any DataLinks added to the send_links
01046         // set will be also told about the send_start() event.  Those
01047         // DataLinks (in the pub_links set) that are already in the
01048         // send_links set will not be told about the send_start() event
01049         // since they heard about it when they were inserted into the
01050         // send_links set.
01051         send_links.send_start(pub_links.in());
01052         if (cur->get_header().message_id_ != SAMPLE_DATA) {
01053           pub_links->send_control(cur);
01054         } else {
01055           pub_links->send(cur);
01056         }
01057       }
01058       if (cur != max_transaction_tail_) {
01059         // Move on to the next DataSampleElement to send.
01060         cur = next_elem;
01061       } else {
01062         break;
01063       }
01064     }
01065 
01066     // This will inform each DataLink in the set about the stop_send() event.
01067     // It will then clear the send_links_ set.
01068     //
01069     // The reason that the send_links_ set is cleared is because we continually
01070     // reuse the same send_links_ object over and over for each call to this
01071     // send method.
01072     RepoId pub_id(this->repo_id_);
01073     send_links.send_stop(pub_id);
01074     if (transaction_id != 0)
01075       expected_transaction_id_ = max_transaction_id_seen_ + 1;
01076     max_transaction_tail_ = 0;
01077   }
01078 }
01079 
01080 TransportSendListener*
01081 TransportClient::get_send_listener()
01082 {
01083   return dynamic_cast<TransportSendListener*>(this);
01084 }
01085 
01086 TransportReceiveListener*
01087 TransportClient::get_receive_listener()
01088 {
01089   return dynamic_cast<TransportReceiveListener*>(this);
01090 }
01091 
01092 SendControlStatus
01093 TransportClient::send_control(const DataSampleHeader& header,
01094                               ACE_Message_Block* msg)
01095 {
01096   TransportSendListener* listener = get_send_listener();
01097 
01098   return links_.send_control(repo_id_, listener, header, msg);
01099 }
01100 
01101 SendControlStatus
01102 TransportClient::send_control_to(const DataSampleHeader& header,
01103                                  ACE_Message_Block* msg,
01104                                  const RepoId& destination)
01105 {
01106   DataLinkSet singular;
01107   {
01108     ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
01109     DataLinkIndex::iterator found = data_link_index_.find(destination);
01110 
01111     if (found == data_link_index_.end()) {
01112       msg->release();
01113       return SEND_CONTROL_ERROR;
01114     }
01115 
01116     singular.insert_link(found->second.in());
01117   }
01118   return singular.send_control(repo_id_, get_send_listener(), header, msg,
01119                                &links_.tsce_allocator());
01120 }
01121 
01122 bool
01123 TransportClient::remove_sample(const DataSampleElement* sample)
01124 {
01125   return links_.remove_sample(sample);
01126 }
01127 
01128 bool
01129 TransportClient::remove_all_msgs()
01130 {
01131   return links_.remove_all_msgs(repo_id_);
01132 }
01133 
01134 }
01135 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7