00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "TransportClient.h"
00011 #include "TransportConfig.h"
00012 #include "TransportRegistry.h"
00013 #include "TransportExceptions.h"
00014 #include "TransportReceiveListener.h"
00015
00016 #include "dds/DdsDcpsInfoUtilsC.h"
00017
00018 #include "dds/DCPS/DataWriterImpl.h"
00019 #include "dds/DCPS/SendStateDataSampleList.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Definitions.h"
00022
00023 #include "ace/Reactor_Timer_Interface.h"
00024
00025 #include <algorithm>
00026 #include <iterator>
00027
00028 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00029
00030 namespace OpenDDS {
00031 namespace DCPS {
00032
00033 TransportClient::TransportClient()
00034 : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00035 , expected_transaction_id_(1)
00036 , max_transaction_id_seen_(0)
00037 , max_transaction_tail_(0)
00038 , swap_bytes_(false)
00039 , cdr_encapsulation_(false)
00040 , reliable_(false)
00041 , durable_(false)
00042 , reverse_lock_(lock_)
00043 , repo_id_(GUID_UNKNOWN)
00044 {
00045 }
00046
00047 TransportClient::~TransportClient()
00048 {
00049 if (Transport_debug_level > 5) {
00050 GuidConverter converter(repo_id_);
00051 ACE_DEBUG((LM_DEBUG,
00052 ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00053 OPENDDS_STRING(converter).c_str()));
00054 }
00055
00056 stop_associating();
00057
00058 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00059
00060 for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00061 for (size_t i = 0; i < impls_.size(); ++i) {
00062 impls_[i]->stop_accepting_or_connecting(*this, it->second->data_.remote_id_);
00063 }
00064
00065 pending_assoc_timer_->cancel_timer(this, it->second);
00066 }
00067
00068 pending_assoc_timer_->wait();
00069
00070 }
00071
00072 void
00073 TransportClient::enable_transport(bool reliable, bool durable)
00074 {
00075
00076 TransportConfig_rch tc;
00077
00078
00079
00080 for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
00081 ent && tc.is_nil(); ent = ent->parent()) {
00082 tc = ent->transport_config();
00083 }
00084
00085 if (tc.is_nil()) {
00086 TransportRegistry* const reg = TransportRegistry::instance();
00087
00088 tc = reg->domain_default_config(domain_id());
00089
00090 if (tc.is_nil()) {
00091
00092 tc = reg->global_config();
00093
00094 if (!tc.is_nil() && tc->instances_.empty()
00095 && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00096
00097
00098 tc = reg->fix_empty_default();
00099 }
00100 }
00101 }
00102
00103 if (tc.is_nil()) {
00104 ACE_ERROR((LM_ERROR,
00105 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00106 ACE_TEXT("No TransportConfig found.\n")));
00107 throw Transport::NotConfigured();
00108 }
00109
00110 enable_transport_using_config(reliable, durable, tc);
00111 }
00112
00113 void
00114 TransportClient::enable_transport_using_config(bool reliable, bool durable,
00115 const TransportConfig_rch& tc)
00116 {
00117 swap_bytes_ = tc->swap_bytes_;
00118 cdr_encapsulation_ = false;
00119 reliable_ = reliable;
00120 durable_ = durable;
00121 unsigned long duration = tc->passive_connect_duration_;
00122 if (duration == 0) {
00123 duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00124 if (DCPS_debug_level) {
00125 ACE_DEBUG((LM_WARNING,
00126 ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00127 ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00128 ACE_TEXT("default value\n")));
00129 }
00130 }
00131 passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00132
00133 const size_t n = tc->instances_.size();
00134
00135 for (size_t i = 0; i < n; ++i) {
00136 TransportInst_rch inst = tc->instances_[i];
00137
00138 if (check_transport_qos(*inst)) {
00139 TransportImpl* impl = inst->impl();
00140
00141 if (impl) {
00142 impls_.push_back(impl);
00143 const CORBA::ULong len = conn_info_.length();
00144 conn_info_.length(len + 1);
00145 impl->connection_info(conn_info_[len]);
00146
00147 #if defined(OPENDDS_SECURITY)
00148 impl->local_crypto_handle(get_crypto_handle());
00149 #endif
00150
00151 cdr_encapsulation_ |= inst->requires_cdr();
00152 }
00153 }
00154 }
00155
00156 if (impls_.empty()) {
00157 ACE_ERROR((LM_ERROR,
00158 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00159 ACE_TEXT("No TransportImpl could be created.\n")));
00160 throw Transport::NotConfigured();
00161 }
00162 }
00163
00164
00165 bool
00166 TransportClient::associate(const AssociationData& data, bool active)
00167 {
00168 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00169
00170 repo_id_ = get_repo_id();
00171
00172 if (impls_.empty()) {
00173 if (DCPS_debug_level) {
00174 GuidConverter writer_converter(repo_id_);
00175 GuidConverter reader_converter(data.remote_id_);
00176 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00177 ACE_TEXT("local %C remote %C no available impls\n"),
00178 OPENDDS_STRING(writer_converter).c_str(),
00179 OPENDDS_STRING(reader_converter).c_str()));
00180 }
00181 return false;
00182 }
00183
00184 bool all_impls_shut_down = true;
00185 for (size_t i = 0; i < impls_.size(); ++i) {
00186 if (!impls_.at(i)->is_shut_down()) {
00187 all_impls_shut_down = false;
00188 break;
00189 }
00190 }
00191
00192 if (all_impls_shut_down) {
00193 if (DCPS_debug_level) {
00194 GuidConverter writer_converter(repo_id_);
00195 GuidConverter reader_converter(data.remote_id_);
00196 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00197 ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00198 OPENDDS_STRING(writer_converter).c_str(),
00199 OPENDDS_STRING(reader_converter).c_str()));
00200 }
00201 return false;
00202 }
00203
00204 PendingMap::iterator iter = pending_.find(data.remote_id_);
00205
00206 if (iter == pending_.end()) {
00207 RepoId remote_copy(data.remote_id_);
00208 iter = pending_.insert(std::make_pair(remote_copy, make_rch<PendingAssoc>())).first;
00209
00210 GuidConverter tc_assoc(repo_id_);
00211 GuidConverter remote_new(data.remote_id_);
00212 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00213 "between %C and remote %C\n",
00214 OPENDDS_STRING(tc_assoc).c_str(),
00215 OPENDDS_STRING(remote_new).c_str()), 0);
00216 } else {
00217
00218 ACE_ERROR((LM_ERROR,
00219 ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00220 ACE_TEXT("already associating with remote.\n")));
00221
00222 return false;
00223
00224 }
00225
00226 PendingAssoc_rch pend = iter->second;
00227 pend->active_ = active;
00228 pend->impls_.clear();
00229 pend->blob_index_ = 0;
00230 pend->data_ = data;
00231 pend->attribs_.local_id_ = repo_id_;
00232 pend->attribs_.priority_ = get_priority_value(data);
00233 pend->attribs_.local_reliable_ = reliable_;
00234 pend->attribs_.local_durable_ = durable_;
00235
00236 if (active) {
00237 pend->impls_.reserve(impls_.size());
00238 std::reverse_copy(impls_.begin(), impls_.end(),
00239 std::back_inserter(pend->impls_));
00240
00241 return pend->initiate_connect(this, guard);
00242
00243 } else {
00244
00245
00246 for (size_t i = 0; i < impls_.size(); ++i) {
00247 pend->impls_.push_back(impls_[i]);
00248 const OPENDDS_STRING type = impls_[i]->transport_type();
00249
00250 for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00251 if (data.remote_data_[j].transport_type.in() == type) {
00252 const TransportImpl::RemoteTransport remote = {
00253 data.remote_id_, data.remote_data_[j].data,
00254 data.publication_transport_priority_,
00255 data.remote_reliable_, data.remote_durable_};
00256
00257 TransportImpl::AcceptConnectResult res;
00258 {
00259
00260
00261
00262
00263 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00264 res = impls_[i]->accept_datalink(remote, pend->attribs_, rchandle_from(this));
00265 }
00266
00267
00268 PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00269
00270 if (iter_after_accept == pending_.end()) {
00271
00272
00273 return true;
00274 }
00275
00276 if (res.success_) {
00277 if (res.link_.is_nil()) {
00278
00279 pending_assoc_timer_->schedule_timer(this, iter->second);
00280 }
00281 else {
00282 use_datalink_i(data.remote_id_, res.link_, guard);
00283 }
00284 return true;
00285 }
00286 }
00287 }
00288
00289
00290 }
00291
00292 pending_assoc_timer_->schedule_timer(this, iter->second);
00293 }
00294
00295 return true;
00296 }
00297
00298 int
00299 TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
00300 const void* arg)
00301 {
00302 TransportClient* tc = static_cast<TransportClient*>(const_cast<void*>(arg));
00303
00304 tc->use_datalink(data_.remote_id_, DataLink_rch());
00305
00306 return 0;
00307 }
00308
00309 bool
00310 TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00311 TransportImpl* impl,
00312 const TransportImpl::RemoteTransport& remote,
00313 const TransportImpl::ConnectionAttribs& attribs_,
00314 Guard& guard)
00315 {
00316 if (!guard.locked()) {
00317
00318 GuidConverter local(repo_id_);
00319 GuidConverter remote_conv(remote.repo_id_);
00320 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00321 "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00322 OPENDDS_STRING(local).c_str(),
00323 OPENDDS_STRING(remote_conv).c_str()), 0);
00324 return false;
00325 }
00326
00327 {
00328
00329 guard.release();
00330 GuidConverter local(repo_id_);
00331 GuidConverter remote_conv(remote.repo_id_);
00332 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00333 "attempt to connect_datalink between local %C and remote %C\n",
00334 OPENDDS_STRING(local).c_str(),
00335 OPENDDS_STRING(remote_conv).c_str()), 0);
00336 result = impl->connect_datalink(remote, attribs_, rchandle_from(this));
00337 guard.acquire();
00338 if (!result.success_) {
00339 if (DCPS_debug_level) {
00340 GuidConverter writer_converter(repo_id_);
00341 GuidConverter reader_converter(remote.repo_id_);
00342 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00343 ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00344 OPENDDS_STRING(writer_converter).c_str(),
00345 OPENDDS_STRING(reader_converter).c_str()));
00346 }
00347 return false;
00348 }
00349 }
00350
00351 GuidConverter local(repo_id_);
00352 GuidConverter remote_conv(remote.repo_id_);
00353 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00354 "connection between local %C and remote %C initiation successful\n",
00355 OPENDDS_STRING(local).c_str(),
00356 OPENDDS_STRING(remote_conv).c_str()), 0);
00357 return true;
00358 }
00359
00360 bool
00361 TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
00362 Guard& guard)
00363 {
00364 GuidConverter local(tc->repo_id_);
00365 GuidConverter remote(data_.remote_id_);
00366 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00367 "between %C and remote %C\n",
00368 OPENDDS_STRING(local).c_str(),
00369 OPENDDS_STRING(remote).c_str()), 0);
00370
00371 while (!impls_.empty()) {
00372 TransportImpl* impl = impls_.back();
00373 const OPENDDS_STRING type = impl->transport_type();
00374
00375 for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
00376 if (data_.remote_data_[blob_index_].transport_type.in() == type) {
00377 const TransportImpl::RemoteTransport remote = {
00378 data_.remote_id_, data_.remote_data_[blob_index_].data,
00379 data_.publication_transport_priority_,
00380 data_.remote_reliable_, data_.remote_durable_};
00381
00382 TransportImpl::AcceptConnectResult res;
00383 GuidConverter tmp_local(tc->repo_id_);
00384 GuidConverter tmp_remote(data_.remote_id_);
00385 if (!tc->initiate_connect_i(res, impl, remote, attribs_, guard)) {
00386
00387
00388 if (res.success_ ) {
00389 GuidConverter local(tc->repo_id_);
00390 GuidConverter remote(data_.remote_id_);
00391 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
00392 ACE_TEXT("between %C and remote %C success\n"),
00393 OPENDDS_STRING(local).c_str(),
00394 OPENDDS_STRING(remote).c_str()), 0);
00395 return true;
00396 }
00397
00398 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00399 "between %C and remote %C unsuccessful\n",
00400 OPENDDS_STRING(tmp_local).c_str(),
00401 OPENDDS_STRING(tmp_remote).c_str()), 0);
00402 break;
00403 }
00404
00405 if (res.success_) {
00406
00407 ++blob_index_;
00408
00409 if (!res.link_.is_nil()) {
00410
00411 tc->use_datalink_i(data_.remote_id_, res.link_, guard);
00412 } else {
00413 GuidConverter local(tc->repo_id_);
00414 GuidConverter remote(data_.remote_id_);
00415 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00416 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
00417 OPENDDS_STRING(local).c_str(),
00418 OPENDDS_STRING(remote).c_str()), 0);
00419 }
00420
00421 return true;
00422 } else {
00423 GuidConverter local(tc->repo_id_);
00424 GuidConverter remote(data_.remote_id_);
00425 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00426 "result of initiate_connect_i (local: %C to remote: %C) was not success \n",
00427 OPENDDS_STRING(local).c_str(),
00428 OPENDDS_STRING(remote).c_str()), 0);
00429 }
00430 }
00431 }
00432
00433 impls_.pop_back();
00434 blob_index_ = 0;
00435 }
00436
00437 return false;
00438 }
00439
00440 void
00441 TransportClient::use_datalink(const RepoId& remote_id,
00442 const DataLink_rch& link)
00443 {
00444 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00445
00446 use_datalink_i(remote_id, link, guard);
00447 }
00448
00449 void
00450 TransportClient::use_datalink_i(const RepoId& remote_id_ref,
00451 const DataLink_rch& link,
00452 Guard& guard)
00453 {
00454
00455
00456
00457
00458
00459 RepoId remote_id(remote_id_ref);
00460
00461 GuidConverter peerId_conv(remote_id);
00462 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00463 "TransportClient(%@) using datalink[%@] from %C\n",
00464 this,
00465 link.in(),
00466 OPENDDS_STRING(peerId_conv).c_str()), 0);
00467
00468 PendingMap::iterator iter = pending_.find(remote_id);
00469
00470 if (iter == pending_.end()) {
00471 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00472 "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00473 this,
00474 link.in(),
00475 OPENDDS_STRING(peerId_conv).c_str()), 0);
00476 return;
00477 }
00478
00479 PendingAssoc_rch pend = iter->second;
00480 const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00481 bool ok = false;
00482
00483 if (link.is_nil()) {
00484
00485 if (pend->active_ && pend->initiate_connect(this, guard)) {
00486 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00487 "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00488 this,
00489 link.in(),
00490 OPENDDS_STRING(peerId_conv).c_str()), 0);
00491 return;
00492 }
00493
00494 } else {
00495 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00496 "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00497 this,
00498 link.in(),
00499 OPENDDS_STRING(peerId_conv).c_str()), 0);
00500
00501 add_link(link, remote_id);
00502 ok = true;
00503 }
00504
00505
00506
00507 if (!pend->active_) {
00508
00509 for (size_t i = 0; i < pend->impls_.size(); ++i) {
00510 pend->impls_[i]->stop_accepting_or_connecting(*this, pend->data_.remote_id_);
00511 }
00512 }
00513
00514 pending_.erase(iter);
00515
00516 guard.release();
00517
00518 pending_assoc_timer_->cancel_timer(this, pend);
00519
00520 transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00521 }
00522
00523 void
00524 TransportClient::add_link(const DataLink_rch& link, const RepoId& peer)
00525 {
00526 links_.insert_link(link);
00527 data_link_index_[peer] = link;
00528
00529 TransportReceiveListener_rch trl = get_receive_listener();
00530
00531 if (trl) {
00532 link->make_reservation(peer, repo_id_, trl);
00533
00534 } else {
00535 link->make_reservation(peer, repo_id_, get_send_listener());
00536 }
00537 }
00538
00539 void
00540 TransportClient::stop_associating()
00541 {
00542 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00543 pending_.clear();
00544 }
00545
00546 void
00547 TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
00548 {
00549 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00550
00551 if (repos == 0 || length == 0) {
00552 return;
00553 } else {
00554 for (CORBA::ULong i = 0; i < length; ++i) {
00555 pending_.erase(repos[i]);
00556 }
00557 }
00558 }
00559
00560 void
00561 TransportClient::send_final_acks()
00562 {
00563 links_.send_final_acks (get_repo_id());
00564 }
00565
00566 void
00567 TransportClient::disassociate(const RepoId& peerId)
00568 {
00569 GuidConverter peerId_conv(peerId);
00570 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00571 "TransportClient(%@) disassociating from %C\n",
00572 this,
00573 OPENDDS_STRING(peerId_conv).c_str()), 5);
00574
00575 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00576
00577 if (pending_.erase(peerId)) {
00578 return;
00579 }
00580
00581 const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00582
00583 if (found == data_link_index_.end()) {
00584 if (DCPS_debug_level > 4) {
00585 const GuidConverter converter(peerId);
00586 ACE_DEBUG((LM_DEBUG,
00587 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00588 ACE_TEXT("no link for remote peer %C\n"),
00589 OPENDDS_STRING(converter).c_str()));
00590 }
00591
00592 return;
00593 }
00594
00595 const DataLink_rch link = found->second;
00596
00597
00598
00599 data_link_index_.erase(found);
00600 DataLinkSetMap released;
00601
00602 if (DCPS_debug_level > 4) {
00603 ACE_DEBUG((LM_DEBUG,
00604 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00605 ACE_TEXT("about to release_reservations for link[%@] \n"),
00606 link.in()));
00607 }
00608
00609 link->release_reservations(peerId, repo_id_, released);
00610
00611 if (!released.empty()) {
00612
00613 if (DCPS_debug_level > 4) {
00614 ACE_DEBUG((LM_DEBUG,
00615 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00616 ACE_TEXT("about to remove_link[%@] from links_\n"),
00617 link.in()));
00618 }
00619 links_.remove_link(link);
00620
00621 if (DCPS_debug_level > 4) {
00622 GuidConverter converter(repo_id_);
00623 ACE_DEBUG((LM_DEBUG,
00624 ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00625 OPENDDS_STRING(converter).c_str(),
00626 link.in()));
00627 }
00628
00629 link->remove_listener(repo_id_);
00630
00631 }
00632 }
00633
00634 void
00635 TransportClient::register_for_reader(const RepoId& participant,
00636 const RepoId& writerid,
00637 const RepoId& readerid,
00638 const TransportLocatorSeq& locators,
00639 OpenDDS::DCPS::DiscoveryListener* listener)
00640 {
00641 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00642 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00643 pos != limit;
00644 ++pos) {
00645 (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00646 }
00647 }
00648
00649 void
00650 TransportClient::unregister_for_reader(const RepoId& participant,
00651 const RepoId& writerid,
00652 const RepoId& readerid)
00653 {
00654 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00655 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00656 pos != limit;
00657 ++pos) {
00658 (*pos)->unregister_for_reader(participant, writerid, readerid);
00659 }
00660 }
00661
00662 void
00663 TransportClient::register_for_writer(const RepoId& participant,
00664 const RepoId& readerid,
00665 const RepoId& writerid,
00666 const TransportLocatorSeq& locators,
00667 DiscoveryListener* listener)
00668 {
00669 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00670 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00671 pos != limit;
00672 ++pos) {
00673 (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00674 }
00675 }
00676
00677 void
00678 TransportClient::unregister_for_writer(const RepoId& participant,
00679 const RepoId& readerid,
00680 const RepoId& writerid)
00681 {
00682 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00683 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00684 pos != limit;
00685 ++pos) {
00686 (*pos)->unregister_for_writer(participant, readerid, writerid);
00687 }
00688 }
00689
00690 bool
00691 TransportClient::send_response(const RepoId& peer,
00692 const DataSampleHeader& header,
00693 Message_Block_Ptr payload)
00694 {
00695 DataLinkIndex::iterator found = data_link_index_.find(peer);
00696
00697 if (found == data_link_index_.end()) {
00698 if (DCPS_debug_level > 4) {
00699 GuidConverter converter(peer);
00700 ACE_DEBUG((LM_DEBUG,
00701 ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00702 ACE_TEXT("no link for publication %C, ")
00703 ACE_TEXT("not sending response.\n"),
00704 OPENDDS_STRING(converter).c_str()));
00705 }
00706
00707 return false;
00708 }
00709
00710 DataLinkSet singular;
00711 singular.insert_link(found->second);
00712 singular.send_response(peer, header, move(payload));
00713 return true;
00714 }
00715
00716 void
00717 TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00718 {
00719 if (send_list.head() == 0) {
00720 return;
00721 }
00722 ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00723 send_i(send_list, transaction_id);
00724 }
00725
00726 SendControlStatus
00727 TransportClient::send_w_control(SendStateDataSampleList send_list,
00728 const DataSampleHeader& header,
00729 Message_Block_Ptr msg,
00730 const RepoId& destination)
00731 {
00732 ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00733 send_transaction_lock_, SEND_CONTROL_ERROR);
00734 if (send_list.head()) {
00735 send_i(send_list, 0);
00736 }
00737 return send_control_to(header, move(msg), destination);
00738 }
00739
00740 void
00741 TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00742 {
00743 if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00744 if (transaction_id > max_transaction_id_seen_) {
00745 max_transaction_id_seen_ = transaction_id;
00746 max_transaction_tail_ = send_list.tail();
00747 }
00748 return;
00749 } else {
00750
00751 DataSampleElement* cur = send_list.head();
00752 if (max_transaction_tail_ == 0) {
00753
00754 if (transaction_id != 0)
00755 max_transaction_id_seen_ = expected_transaction_id_;
00756
00757 max_transaction_tail_ = send_list.tail();
00758 }
00759 DataLinkSet send_links;
00760
00761 while (true) {
00762
00763
00764
00765
00766
00767
00768
00769
00770
00771 DataSampleElement* next_elem;
00772 if (cur != max_transaction_tail_) {
00773 next_elem = cur->get_next_send_sample();
00774 } else {
00775 next_elem = max_transaction_tail_;
00776 }
00777 DataLinkSet_rch pub_links =
00778 (cur->get_num_subs() > 0)
00779 ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
00780 : DataLinkSet_rch(&links_, inc_count());
00781
00782 if (pub_links.is_nil() || pub_links->empty()) {
00783
00784
00785
00786 if (DCPS_debug_level > 4) {
00787 GuidConverter converter(cur->get_pub_id());
00788 ACE_DEBUG((LM_DEBUG,
00789 ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00790 ACE_TEXT("no links for publication %C, ")
00791 ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00792 OPENDDS_STRING(converter).c_str(),
00793 cur,
00794 cur->transaction_id()));
00795 }
00796
00797
00798
00799
00800 cur->get_send_listener()->data_delivered(cur);
00801
00802 } else {
00803 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00804 , cur), 5);
00805
00806 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00807
00808
00809
00810
00811
00812
00813
00814 if (cur->filter_out_.ptr()) {
00815 DataLinkSet_rch subset;
00816 DataLinkSet::GuardType guard(pub_links->lock());
00817 typedef DataLinkSet::MapType MapType;
00818 MapType& map = pub_links->map();
00819
00820 for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
00821 size_t n_subs;
00822 GUIDSeq_var ti =
00823 itr->second->target_intersection(cur->get_pub_id(),
00824 cur->filter_out_.in(), n_subs);
00825
00826 if (ti.ptr() == 0 || ti->length() != n_subs) {
00827 if (!subset.in()) {
00828 subset = make_rch<DataLinkSet>();
00829 }
00830
00831 subset->insert_link(itr->second);
00832 cur->filter_per_link_[itr->first] = ti._retn();
00833
00834 } else {
00835 VDBG((LM_DEBUG,
00836 "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
00837 itr->second.in()));
00838 }
00839 }
00840
00841 if (!subset.in()) {
00842 VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
00843
00844 cur->get_send_listener()->data_delivered(cur);
00845 if (cur != max_transaction_tail_) {
00846
00847 cur = next_elem;
00848 continue;
00849 } else {
00850 break;
00851 }
00852 }
00853
00854 pub_links = subset;
00855 }
00856
00857 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00858
00859
00860
00861
00862
00863
00864
00865
00866 send_links.send_start(pub_links.in());
00867 if (cur->get_header().message_id_ != SAMPLE_DATA) {
00868 pub_links->send_control(cur);
00869 } else {
00870 pub_links->send(cur);
00871 }
00872 }
00873 if (cur != max_transaction_tail_) {
00874
00875 cur = next_elem;
00876 } else {
00877 break;
00878 }
00879 }
00880
00881
00882
00883
00884
00885
00886
00887 RepoId pub_id(repo_id_);
00888 send_links.send_stop(pub_id);
00889 if (transaction_id != 0)
00890 expected_transaction_id_ = max_transaction_id_seen_ + 1;
00891 max_transaction_tail_ = 0;
00892 }
00893 }
00894
00895 TransportSendListener_rch
00896 TransportClient::get_send_listener()
00897 {
00898 return rchandle_from(dynamic_cast<TransportSendListener*>(this));
00899 }
00900
00901 TransportReceiveListener_rch
00902 TransportClient::get_receive_listener()
00903 {
00904 return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
00905 }
00906
00907 SendControlStatus
00908 TransportClient::send_control(const DataSampleHeader& header,
00909 Message_Block_Ptr msg)
00910 {
00911 return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
00912 }
00913
00914 SendControlStatus
00915 TransportClient::send_control_to(const DataSampleHeader& header,
00916 Message_Block_Ptr msg,
00917 const RepoId& destination)
00918 {
00919 DataLinkSet singular;
00920 {
00921 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
00922 DataLinkIndex::iterator found = data_link_index_.find(destination);
00923
00924 if (found == data_link_index_.end()) {
00925 return SEND_CONTROL_ERROR;
00926 }
00927
00928 singular.insert_link(found->second);
00929 }
00930 return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
00931 }
00932
00933 bool
00934 TransportClient::remove_sample(const DataSampleElement* sample)
00935 {
00936 return links_.remove_sample(sample);
00937 }
00938
00939 bool
00940 TransportClient::remove_all_msgs()
00941 {
00942 return links_.remove_all_msgs(repo_id_);
00943 }
00944
00945 }
00946 }
00947
00948 OPENDDS_END_VERSIONED_NAMESPACE_DECL