00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "TransportClient.h"
00011 #include "TransportConfig.h"
00012 #include "TransportRegistry.h"
00013 #include "TransportExceptions.h"
00014 #include "TransportReceiveListener.h"
00015
00016 #include "dds/DdsDcpsInfoUtilsC.h"
00017
00018 #include "dds/DCPS/DataWriterImpl.h"
00019 #include "dds/DCPS/SendStateDataSampleList.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DCPS/Definitions.h"
00022 #include "dds/DCPS/Service_Participant.h"
00023
00024 #include "ace/Reactor_Timer_Interface.h"
00025
00026 #include <algorithm>
00027 #include <iterator>
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032 TransportClient::TransportClient()
00033 : pending_assoc_timer_(new PendingAssocTimer (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
00034 , expected_transaction_id_(1)
00035 , max_transaction_id_seen_(0)
00036 , max_transaction_tail_(0)
00037 , swap_bytes_(false)
00038 , cdr_encapsulation_(false)
00039 , reliable_(false)
00040 , durable_(false)
00041 , reverse_lock_(lock_)
00042 , repo_id_(GUID_UNKNOWN)
00043 {
00044 }
00045
00046 TransportClient::~TransportClient()
00047 {
00048 if (Transport_debug_level > 5) {
00049 GuidConverter converter(repo_id_);
00050 ACE_DEBUG((LM_DEBUG,
00051 ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
00052 OPENDDS_STRING(converter).c_str()));
00053 }
00054
00055 this->stop_associating();
00056
00057 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00058
00059 for (DataLinkIndex::iterator iter = links_waiting_for_on_deleted_callback_.begin();
00060 iter != links_waiting_for_on_deleted_callback_.end(); ++iter) {
00061 if (Transport_debug_level > 5) {
00062 GuidConverter converter(repo_id_);
00063 ACE_DEBUG((LM_DEBUG,
00064 ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C from link waiting for callback\n"),
00065 this,
00066 OPENDDS_STRING(converter).c_str()));
00067 }
00068 iter->second->remove_listener(repo_id_);
00069 }
00070
00071 for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00072 iter != links_.map().end(); ++iter) {
00073 if (Transport_debug_level > 5) {
00074 GuidConverter converter(repo_id_);
00075 ACE_DEBUG((LM_DEBUG,
00076 ACE_TEXT("(%P|%t) TransportClient[%@]::~TransportClient: about to remove_listener %C\n"),
00077 this,
00078 OPENDDS_STRING(converter).c_str()));
00079 }
00080 iter->second->remove_listener(repo_id_);
00081 }
00082
00083 for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
00084 for (size_t i = 0; i < impls_.size(); ++i) {
00085 impls_[i]->stop_accepting_or_connecting(this, it->second->data_.remote_id_);
00086 }
00087
00088 pending_assoc_timer_->cancel_timer(this, it->second);
00089 }
00090
00091 pending_assoc_timer_->wait();
00092 pending_assoc_timer_->destroy();
00093
00094 for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00095 it != impls_.end(); ++it) {
00096
00097 (*it)->detach_client(this);
00098 }
00099 }
00100
00101 void
00102 TransportClient::enable_transport(bool reliable, bool durable)
00103 {
00104
00105 TransportConfig_rch tc;
00106
00107
00108
00109 for (const EntityImpl* ent = dynamic_cast<const EntityImpl*>(this);
00110 ent && tc.is_nil(); ent = ent->parent()) {
00111 tc = ent->transport_config();
00112 }
00113
00114 if (tc.is_nil()) {
00115 TransportRegistry* const reg = TransportRegistry::instance();
00116
00117 tc = reg->domain_default_config(domain_id());
00118
00119 if (tc.is_nil()) {
00120
00121 tc = reg->global_config();
00122
00123 if (!tc.is_nil() && tc->instances_.empty()
00124 && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
00125
00126
00127 tc = reg->fix_empty_default();
00128 }
00129 }
00130 }
00131
00132 if (tc.is_nil()) {
00133 ACE_ERROR((LM_ERROR,
00134 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00135 ACE_TEXT("No TransportConfig found.\n")));
00136 throw Transport::NotConfigured();
00137 }
00138
00139 enable_transport_using_config(reliable, durable, tc);
00140 }
00141
00142 void
00143 TransportClient::enable_transport_using_config(bool reliable, bool durable,
00144 const TransportConfig_rch& tc)
00145 {
00146 swap_bytes_ = tc->swap_bytes_;
00147 cdr_encapsulation_ = false;
00148 reliable_ = reliable;
00149 durable_ = durable;
00150 unsigned long duration = tc->passive_connect_duration_;
00151 if (duration == 0) {
00152 duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
00153 if (DCPS_debug_level) {
00154 ACE_DEBUG((LM_WARNING,
00155 ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
00156 ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
00157 ACE_TEXT("default value\n")));
00158 }
00159 }
00160 passive_connect_duration_.set(duration / 1000, (duration % 1000) * 1000);
00161
00162 const size_t n = tc->instances_.size();
00163
00164 for (size_t i = 0; i < n; ++i) {
00165 TransportInst_rch inst = tc->instances_[i];
00166
00167 if (check_transport_qos(*inst.in())) {
00168 TransportImpl_rch impl = inst->impl();
00169
00170 if (!impl.is_nil()) {
00171 impl->attach_client(this);
00172 impls_.push_back(impl);
00173 const CORBA::ULong len = conn_info_.length();
00174 conn_info_.length(len + 1);
00175 impl->connection_info(conn_info_[len]);
00176 cdr_encapsulation_ |= inst->requires_cdr();
00177 }
00178 }
00179 }
00180
00181 if (impls_.empty()) {
00182 ACE_ERROR((LM_ERROR,
00183 ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
00184 ACE_TEXT("No TransportImpl could be created.\n")));
00185 throw Transport::NotConfigured();
00186 }
00187 }
00188
00189 void
00190 TransportClient::transport_detached(TransportImpl* which)
00191 {
00192
00193 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00194
00195
00196
00197 for (DataLinkSet::MapType::iterator iter = links_.map().begin();
00198 iter != links_.map().end();) {
00199 TransportImpl_rch impl = iter->second->impl();
00200
00201 if (impl.in() == which) {
00202 for (DataLinkIndex::iterator it2 = data_link_index_.begin();
00203 it2 != data_link_index_.end();) {
00204 if (it2->second.in() == iter->second.in()) {
00205 data_link_index_.erase(it2++);
00206
00207 } else {
00208 ++it2;
00209 }
00210 }
00211 if (DCPS_debug_level > 4) {
00212 GuidConverter converter(repo_id_);
00213 ACE_DEBUG((LM_DEBUG,
00214 ACE_TEXT("(%P|%t) TransportClient::transport_detached: calling remove_listener %C on link[%@]\n"),
00215 OPENDDS_STRING(converter).c_str(),
00216 iter->second.in()));
00217 }
00218 iter->second->remove_listener(repo_id_);
00219 links_.map().erase(iter++);
00220
00221 } else {
00222 ++iter;
00223 }
00224 }
00225
00226
00227 for (OPENDDS_VECTOR(TransportImpl_rch)::iterator it = impls_.begin();
00228 it != impls_.end(); ++it) {
00229 if (it->in() == which) {
00230 impls_.erase(it);
00231
00232 for (PendingMap::iterator it2 = pending_.begin();
00233 it2 != pending_.end(); ++it2) {
00234 which->stop_accepting_or_connecting(this, it2->first);
00235 }
00236
00237 return;
00238 }
00239 }
00240 }
00241
00242 bool
00243 TransportClient::associate(const AssociationData& data, bool active)
00244 {
00245 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
00246
00247 repo_id_ = get_repo_id();
00248
00249 if (impls_.empty()) {
00250 if (DCPS_debug_level) {
00251 GuidConverter writer_converter(this->repo_id_);
00252 GuidConverter reader_converter(data.remote_id_);
00253 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00254 ACE_TEXT("local %C remote %C no available impls\n"),
00255 OPENDDS_STRING(writer_converter).c_str(),
00256 OPENDDS_STRING(reader_converter).c_str()));
00257 }
00258 return false;
00259 }
00260
00261 bool all_impls_shut_down = true;
00262 for (size_t i = 0; i < impls_.size(); ++i) {
00263 if (!impls_.at(i)->is_shut_down()) {
00264 all_impls_shut_down = false;
00265 break;
00266 }
00267 }
00268
00269 if (all_impls_shut_down) {
00270 if (DCPS_debug_level) {
00271 GuidConverter writer_converter(this->repo_id_);
00272 GuidConverter reader_converter(data.remote_id_);
00273 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00274 ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
00275 OPENDDS_STRING(writer_converter).c_str(),
00276 OPENDDS_STRING(reader_converter).c_str()));
00277 }
00278 return false;
00279 }
00280
00281 PendingMap::iterator iter = pending_.find(data.remote_id_);
00282
00283 if (iter == pending_.end()) {
00284 RepoId remote_copy(data.remote_id_);
00285 iter = pending_.insert(std::make_pair(remote_copy, new PendingAssoc())).first;
00286
00287 GuidConverter tc_assoc(this->repo_id_);
00288 GuidConverter remote_new(data.remote_id_);
00289 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
00290 "between %C and remote %C\n",
00291 OPENDDS_STRING(tc_assoc).c_str(),
00292 OPENDDS_STRING(remote_new).c_str()), 0);
00293 } else {
00294 if (iter->second->removed_) {
00295 iter->second->removed_ = false;
00296 GuidConverter tc_assoc(this->repo_id_);
00297 GuidConverter remote_new(data.remote_id_);
00298 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate found existing PendingAssoc "
00299 "between %C and remote %C, set removed to false to continue using this pending association\n",
00300 OPENDDS_STRING(tc_assoc).c_str(),
00301 OPENDDS_STRING(remote_new).c_str()), 0);
00302 } else {
00303 ACE_ERROR((LM_ERROR,
00304 ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
00305 ACE_TEXT("already associating with remote.\n")));
00306
00307 return false;
00308 }
00309 }
00310
00311 PendingAssoc& pend = *(iter->second);
00312 pend.active_ = active;
00313 pend.impls_.clear();
00314 pend.blob_index_ = 0;
00315 pend.data_ = data;
00316 pend.attribs_.local_id_ = repo_id_;
00317 pend.attribs_.priority_ = get_priority_value(data);
00318 pend.attribs_.local_reliable_ = reliable_;
00319 pend.attribs_.local_durable_ = durable_;
00320
00321 if (active) {
00322 pend.impls_.reserve(impls_.size());
00323 std::reverse_copy(impls_.begin(), impls_.end(),
00324 std::back_inserter(pend.impls_));
00325
00326 pend.initiate_connect(this, guard);
00327
00328
00329
00330 return true;
00331
00332 } else {
00333
00334
00335 for (size_t i = 0; i < impls_.size(); ++i) {
00336 pend.impls_.push_back(impls_[i]);
00337 const OPENDDS_STRING type = impls_[i]->transport_type();
00338
00339 for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
00340 if (data.remote_data_[j].transport_type.in() == type) {
00341 const TransportImpl::RemoteTransport remote = {
00342 data.remote_id_, data.remote_data_[j].data,
00343 data.publication_transport_priority_,
00344 data.remote_reliable_, data.remote_durable_};
00345
00346 TransportImpl::AcceptConnectResult res;
00347 {
00348
00349
00350
00351
00352 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
00353 res = impls_[i]->accept_datalink(remote, pend.attribs_, this);
00354 }
00355
00356
00357 PendingMap::iterator iter_after_accept = pending_.find(data.remote_id_);
00358
00359 if (iter_after_accept == pending_.end()) {
00360
00361
00362 return true;
00363 }
00364
00365 if (res.success_ && !res.link_.is_nil()) {
00366
00367 use_datalink_i(data.remote_id_, res.link_, guard);
00368
00369 return true;
00370 }
00371 }
00372 }
00373
00374
00375 }
00376
00377 pending_assoc_timer_->schedule_timer(this, iter->second);
00378 }
00379
00380 return true;
00381 }
00382
00383 int
00384 TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
00385 const void* arg)
00386 {
00387 TransportClient* tc = static_cast<TransportClient*>(const_cast<void*>(arg));
00388
00389 tc->use_datalink(data_.remote_id_, 0);
00390
00391 return 0;
00392 }
00393
00394 bool
00395 TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00396 const TransportImpl_rch impl,
00397 const TransportImpl::RemoteTransport& remote,
00398 const TransportImpl::ConnectionAttribs& attribs_,
00399 Guard& guard)
00400 {
00401 if (!guard.locked()) {
00402
00403 GuidConverter local(repo_id_);
00404 GuidConverter remote_conv(remote.repo_id_);
00405 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00406 "guard was not locked, return false - initiate_connect_i between local %C and remote %C unsuccessful\n",
00407 OPENDDS_STRING(local).c_str(),
00408 OPENDDS_STRING(remote_conv).c_str()), 0);
00409 return false;
00410 }
00411
00412 {
00413
00414 guard.release();
00415 GuidConverter local(repo_id_);
00416 GuidConverter remote_conv(remote.repo_id_);
00417 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00418 "attempt to connect_datalink between local %C and remote %C\n",
00419 OPENDDS_STRING(local).c_str(),
00420 OPENDDS_STRING(remote_conv).c_str()), 0);
00421 result = impl->connect_datalink(remote, attribs_, this);
00422 if (!result.success_) {
00423 if (DCPS_debug_level) {
00424 GuidConverter writer_converter(repo_id_);
00425 GuidConverter reader_converter(remote.repo_id_);
00426 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
00427 ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
00428 OPENDDS_STRING(writer_converter).c_str(),
00429 OPENDDS_STRING(reader_converter).c_str()));
00430 }
00431 return false;
00432 }
00433 guard.acquire();
00434 }
00435
00436
00437
00438 PendingMap::iterator iter = pending_.find(remote.repo_id_);
00439
00440 if (iter == pending_.end()) {
00441 GuidConverter local(repo_id_);
00442 GuidConverter remote_conv(remote.repo_id_);
00443 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00444 "cannot find pending association after connecting datalink between local %C and remote %C\n",
00445 OPENDDS_STRING(local).c_str(),
00446 OPENDDS_STRING(remote_conv).c_str()), 0);
00447 return false;
00448
00449
00450
00451 } else {
00452 if (iter->second->removed_) {
00453 GuidConverter local(repo_id_);
00454 GuidConverter remote_conv(remote.repo_id_);
00455 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00456 "pending association marked for removal already, after connecting datalink between local %C and remote %C\n",
00457 OPENDDS_STRING(local).c_str(),
00458 OPENDDS_STRING(remote_conv).c_str()), 0);
00459
00460
00461
00462 return false;
00463
00464 }
00465
00466 }
00467 GuidConverter local(repo_id_);
00468 GuidConverter remote_conv(remote.repo_id_);
00469 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
00470 "connection between local %C and remote %C initiation successful\n",
00471 OPENDDS_STRING(local).c_str(),
00472 OPENDDS_STRING(remote_conv).c_str()), 0);
00473 return true;
00474 }
00475
00476 bool
00477 TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
00478 Guard& guard)
00479 {
00480 GuidConverter local(tc->repo_id_);
00481 GuidConverter remote(this->data_.remote_id_);
00482 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00483 "between %C and remote %C\n",
00484 OPENDDS_STRING(local).c_str(),
00485 OPENDDS_STRING(remote).c_str()), 0);
00486
00487 while (!impls_.empty()) {
00488 const TransportImpl_rch& impl = impls_.back();
00489 const OPENDDS_STRING type = impl->transport_type();
00490
00491 for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
00492 if (data_.remote_data_[blob_index_].transport_type.in() == type) {
00493 const TransportImpl::RemoteTransport remote = {
00494 data_.remote_id_, data_.remote_data_[blob_index_].data,
00495 data_.publication_transport_priority_,
00496 data_.remote_reliable_, data_.remote_durable_};
00497
00498 TransportImpl::AcceptConnectResult res;
00499 GuidConverter tmp_local(tc->repo_id_);
00500 GuidConverter tmp_remote(this->data_.remote_id_);
00501 if (!tc->initiate_connect_i(res, impl, remote, attribs_, guard)) {
00502
00503
00504 if (res.success_ && !this->removed_) {
00505 GuidConverter local(tc->repo_id_);
00506 GuidConverter remote(this->data_.remote_id_);
00507 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
00508 ACE_TEXT("between %C and remote %C success\n"),
00509 OPENDDS_STRING(local).c_str(),
00510 OPENDDS_STRING(remote).c_str()), 0);
00511 return true;
00512 }
00513
00514 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
00515 "between %C and remote %C unsuccessful\n",
00516 OPENDDS_STRING(tmp_local).c_str(),
00517 OPENDDS_STRING(tmp_remote).c_str()), 0);
00518 return false;
00519 }
00520
00521 if (res.success_) {
00522
00523 ++blob_index_;
00524
00525 if (!res.link_.is_nil()) {
00526
00527 tc->use_datalink_i(data_.remote_id_, res.link_, guard);
00528 } else {
00529 GuidConverter local(tc->repo_id_);
00530 GuidConverter remote(this->data_.remote_id_);
00531 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00532 "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
00533 OPENDDS_STRING(local).c_str(),
00534 OPENDDS_STRING(remote).c_str()), 0);
00535 }
00536
00537 return true;
00538 } else {
00539 GuidConverter local(tc->repo_id_);
00540 GuidConverter remote(this->data_.remote_id_);
00541 VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
00542 "result of initiate_connect_i (local: %C to remote: %C) was not success \n",
00543 OPENDDS_STRING(local).c_str(),
00544 OPENDDS_STRING(remote).c_str()), 0);
00545 }
00546 }
00547 }
00548
00549 impls_.pop_back();
00550 blob_index_ = 0;
00551 }
00552
00553 return false;
00554 }
00555
00556 void
00557 TransportClient::use_datalink(const RepoId& remote_id,
00558 const DataLink_rch& link)
00559 {
00560 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00561
00562 use_datalink_i(remote_id, link, guard);
00563 }
00564
00565 void
00566 TransportClient::use_datalink_i(const RepoId& remote_id_ref,
00567 const DataLink_rch& link,
00568 Guard& guard)
00569 {
00570
00571
00572
00573
00574
00575 RepoId remote_id(remote_id_ref);
00576
00577 GuidConverter peerId_conv(remote_id);
00578 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00579 "TransportClient(%@) using datalink[%@] from %C\n",
00580 this,
00581 link.in(),
00582 OPENDDS_STRING(peerId_conv).c_str()), 0);
00583
00584 PendingMap::iterator iter = pending_.find(remote_id);
00585
00586 if (iter == pending_.end()) {
00587 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00588 "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
00589 this,
00590 link.in(),
00591 OPENDDS_STRING(peerId_conv).c_str()), 0);
00592 return;
00593 }
00594
00595 PendingAssoc* pend = iter->second;
00596 const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
00597 bool ok = false;
00598
00599 if (pend->removed_) {
00600 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00601 "TransportClient(%@) using datalink[%@] pending association to remote %C was removed\n",
00602 this,
00603 link.in(),
00604 OPENDDS_STRING(peerId_conv).c_str()), 0);
00605 return;
00606 } else if (link.is_nil()) {
00607
00608 if (pend->active_ && pend->initiate_connect(this, guard)) {
00609 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00610 "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect\n",
00611 this,
00612 link.in(),
00613 OPENDDS_STRING(peerId_conv).c_str()), 0);
00614 return;
00615 }
00616
00617 } else {
00618 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
00619 "TransportClient(%@) about to add_link[%@] to remote: %C\n",
00620 this,
00621 link.in(),
00622 OPENDDS_STRING(peerId_conv).c_str()), 0);
00623
00624 add_link(link, remote_id);
00625 ok = true;
00626 }
00627
00628
00629
00630 if (!pend->active_) {
00631
00632 for (size_t i = 0; i < pend->impls_.size(); ++i) {
00633 pend->impls_[i]->stop_accepting_or_connecting(this, pend->data_.remote_id_);
00634 }
00635 }
00636
00637 pending_.erase(iter);
00638 pend->removed_ = true;
00639
00640 guard.release();
00641
00642 pending_assoc_timer_->cancel_timer(this, pend);
00643 pending_assoc_timer_->delete_pending_assoc(pend);
00644
00645 transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
00646 }
00647
00648 void
00649 TransportClient::add_link(const DataLink_rch& link, const RepoId& peer)
00650 {
00651 links_.insert_link(link.in());
00652 data_link_index_[peer] = link;
00653
00654 TransportReceiveListener* trl = get_receive_listener();
00655
00656 if (trl) {
00657 link->make_reservation(peer, repo_id_, trl);
00658
00659 } else {
00660 link->make_reservation(peer, repo_id_, get_send_listener());
00661 }
00662 }
00663
00664 void
00665 TransportClient::on_notification_of_connection_deletion(const RepoId& peerId)
00666 {
00667 DBG_ENTRY_LVL("TransportClient","on_notification_of_connection_deletion",6);
00668
00669 GuidConverter peerId_conv(peerId);
00670 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::on_notification_of_connection_deletion "
00671 "TransportClient(%@) connection to %C deleted\n",
00672 this,
00673 OPENDDS_STRING(peerId_conv).c_str()), 5);
00674
00675 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00676
00677 const DataLinkIndex::iterator found = links_waiting_for_on_deleted_callback_.find(peerId);
00678
00679 if (found == links_waiting_for_on_deleted_callback_.end()) {
00680 if (DCPS_debug_level > 4) {
00681 const GuidConverter converter(peerId);
00682 ACE_DEBUG((LM_DEBUG,
00683 ACE_TEXT("(%P|%t) TransportClient::on_notification_of_connection_deletion: ")
00684 ACE_TEXT("no link for remote peer %C\n"),
00685 OPENDDS_STRING(converter).c_str()));
00686 }
00687
00688 return;
00689 }
00690
00691 const DataLink_rch link = found->second;
00692
00693
00694 links_waiting_for_on_deleted_callback_.erase(found);
00695
00696 link->remove_listener(repo_id_);
00697 }
00698
00699 void
00700 TransportClient::stop_associating()
00701 {
00702 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00703
00704 PendingMap::iterator iter = pending_.begin();
00705
00706 while (iter != pending_.end()) {
00707 iter->second->removed_ = true;
00708 ++iter;
00709 }
00710 }
00711
00712 void
00713 TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
00714 {
00715 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00716
00717 if (repos == 0 || length == 0) {
00718 return;
00719 } else {
00720 for (CORBA::ULong i = 0; i < length; ++i) {
00721 PendingMap::iterator iter = pending_.find(repos[i]);
00722
00723 if (iter != pending_.end()) {
00724 iter->second->removed_ = true;
00725 }
00726 }
00727 }
00728 }
00729
00730 void
00731 TransportClient::send_final_acks()
00732 {
00733 links_.send_final_acks (get_repo_id());
00734 }
00735
00736 void
00737 TransportClient::disassociate(const RepoId& peerId)
00738 {
00739 GuidConverter peerId_conv(peerId);
00740 VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
00741 "TransportClient(%@) disassociating from %C\n",
00742 this,
00743 OPENDDS_STRING(peerId_conv).c_str()), 5);
00744
00745 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00746
00747 const PendingMap::iterator iter = pending_.find(peerId);
00748
00749 if (iter != pending_.end()) {
00750 iter->second->removed_ = true;
00751 return;
00752 }
00753
00754 const DataLinkIndex::iterator found = data_link_index_.find(peerId);
00755
00756 if (found == data_link_index_.end()) {
00757 if (DCPS_debug_level > 4) {
00758 const GuidConverter converter(peerId);
00759 ACE_DEBUG((LM_DEBUG,
00760 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00761 ACE_TEXT("no link for remote peer %C\n"),
00762 OPENDDS_STRING(converter).c_str()));
00763 }
00764
00765 return;
00766 }
00767
00768 const DataLink_rch link = found->second;
00769
00770
00771
00772 data_link_index_.erase(found);
00773 DataLinkSetMap released;
00774
00775 if (DCPS_debug_level > 4) {
00776 ACE_DEBUG((LM_DEBUG,
00777 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00778 ACE_TEXT("about to release_reservations for link[%@] \n"),
00779 link.in()));
00780 }
00781
00782 link->release_reservations(peerId, repo_id_, released);
00783
00784 if (!released.empty()) {
00785
00786 if (DCPS_debug_level > 4) {
00787 ACE_DEBUG((LM_DEBUG,
00788 ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
00789 ACE_TEXT("about to remove_link[%@] from links_\n"),
00790 link.in()));
00791 }
00792 links_.remove_link(link);
00793
00794 if (link->issues_on_deleted_callback()) {
00795 if (DCPS_debug_level > 4) {
00796 GuidConverter converter(repo_id_);
00797 ACE_DEBUG((LM_DEBUG,
00798 ACE_TEXT("(%P|%t) TransportClient::disassociate: wait for connection deleted callback for %C on link[%@]\n"),
00799 OPENDDS_STRING(converter).c_str(),
00800 link.in()));
00801 }
00802 links_waiting_for_on_deleted_callback_[peerId] = link;
00803 } else {
00804 if (DCPS_debug_level > 4) {
00805 GuidConverter converter(repo_id_);
00806 ACE_DEBUG((LM_DEBUG,
00807 ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
00808 OPENDDS_STRING(converter).c_str(),
00809 link.in()));
00810 }
00811
00812 link->remove_listener(repo_id_);
00813 }
00814 }
00815 }
00816
00817 void
00818 TransportClient::register_for_reader(const RepoId& participant,
00819 const RepoId& writerid,
00820 const RepoId& readerid,
00821 const TransportLocatorSeq& locators,
00822 OpenDDS::DCPS::DiscoveryListener* listener)
00823 {
00824 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00825 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00826 pos != limit;
00827 ++pos) {
00828 (*pos)->register_for_reader(participant, writerid, readerid, locators, listener);
00829 }
00830 }
00831
00832 void
00833 TransportClient::unregister_for_reader(const RepoId& participant,
00834 const RepoId& writerid,
00835 const RepoId& readerid)
00836 {
00837 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00838 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00839 pos != limit;
00840 ++pos) {
00841 (*pos)->unregister_for_reader(participant, writerid, readerid);
00842 }
00843 }
00844
00845 void
00846 TransportClient::register_for_writer(const RepoId& participant,
00847 const RepoId& readerid,
00848 const RepoId& writerid,
00849 const TransportLocatorSeq& locators,
00850 DiscoveryListener* listener)
00851 {
00852 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00853 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00854 pos != limit;
00855 ++pos) {
00856 (*pos)->register_for_writer(participant, readerid, writerid, locators, listener);
00857 }
00858 }
00859
00860 void
00861 TransportClient::unregister_for_writer(const RepoId& participant,
00862 const RepoId& readerid,
00863 const RepoId& writerid)
00864 {
00865 ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
00866 for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
00867 pos != limit;
00868 ++pos) {
00869 (*pos)->unregister_for_writer(participant, readerid, writerid);
00870 }
00871 }
00872
00873 bool
00874 TransportClient::send_response(const RepoId& peer,
00875 const DataSampleHeader& header,
00876 ACE_Message_Block* payload)
00877 {
00878 DataLinkIndex::iterator found = data_link_index_.find(peer);
00879
00880 if (found == data_link_index_.end()) {
00881 payload->release();
00882
00883 if (DCPS_debug_level > 4) {
00884 GuidConverter converter(peer);
00885 ACE_DEBUG((LM_DEBUG,
00886 ACE_TEXT("(%P|%t) TransportClient::send_response: ")
00887 ACE_TEXT("no link for publication %C, ")
00888 ACE_TEXT("not sending response.\n"),
00889 OPENDDS_STRING(converter).c_str()));
00890 }
00891
00892 return false;
00893 }
00894
00895 DataLinkSet singular;
00896 singular.insert_link(found->second.in());
00897 singular.send_response(peer, header, payload);
00898 return true;
00899 }
00900
00901 void
00902 TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00903 {
00904 if (send_list.head() == 0) {
00905 return;
00906 }
00907 ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
00908 send_i(send_list, transaction_id);
00909 }
00910
00911 SendControlStatus
00912 TransportClient::send_w_control(SendStateDataSampleList send_list,
00913 const DataSampleHeader& header,
00914 ACE_Message_Block* msg,
00915 const RepoId& destination)
00916 {
00917 ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
00918 send_transaction_lock_, SEND_CONTROL_ERROR);
00919 if (send_list.head()) {
00920 send_i(send_list, 0);
00921 }
00922 return send_control_to(header, msg, destination);
00923 }
00924
00925 void
00926 TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
00927 {
00928 if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
00929 if (transaction_id > max_transaction_id_seen_) {
00930 max_transaction_id_seen_ = transaction_id;
00931 max_transaction_tail_ = send_list.tail();
00932 }
00933 return;
00934 } else {
00935
00936 DataSampleElement* cur = send_list.head();
00937 if (max_transaction_tail_ == 0) {
00938
00939 if (transaction_id != 0)
00940 max_transaction_id_seen_ = expected_transaction_id_;
00941
00942 max_transaction_tail_ = send_list.tail();
00943 }
00944 DataLinkSet send_links;
00945
00946 while (true) {
00947
00948
00949
00950
00951
00952
00953
00954
00955
00956 DataSampleElement* next_elem;
00957 if (cur != max_transaction_tail_) {
00958 next_elem = cur->get_next_send_sample();
00959 } else {
00960 next_elem = max_transaction_tail_;
00961 }
00962 DataLinkSet_rch pub_links =
00963 (cur->get_num_subs() > 0)
00964 ? links_.select_links(cur->get_sub_ids(), cur->get_num_subs())
00965 : DataLinkSet_rch(&links_, false);
00966
00967 if (pub_links.is_nil() || pub_links->empty()) {
00968
00969
00970
00971 if (DCPS_debug_level > 4) {
00972 GuidConverter converter(cur->get_pub_id());
00973 ACE_DEBUG((LM_DEBUG,
00974 ACE_TEXT("(%P|%t) TransportClient::send_i: ")
00975 ACE_TEXT("no links for publication %C, ")
00976 ACE_TEXT("not sending element %@ for transaction: %d.\n"),
00977 OPENDDS_STRING(converter).c_str(),
00978 cur,
00979 cur->transaction_id()));
00980 }
00981
00982
00983
00984
00985 cur->get_send_listener()->data_delivered(cur);
00986
00987 } else {
00988 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
00989 , cur), 5);
00990
00991 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00992
00993
00994
00995
00996
00997
00998
00999 if (cur->filter_out_.ptr()) {
01000 DataLinkSet_rch subset;
01001 DataLinkSet::GuardType guard(pub_links->lock());
01002 typedef DataLinkSet::MapType MapType;
01003 MapType& map = pub_links->map();
01004
01005 for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
01006 size_t n_subs;
01007 GUIDSeq_var ti =
01008 itr->second->target_intersection(cur->get_pub_id(),
01009 cur->filter_out_.in(), n_subs);
01010
01011 if (ti.ptr() == 0 || ti->length() != n_subs) {
01012 if (!subset.in()) {
01013 subset = new DataLinkSet;
01014 }
01015
01016 subset->insert_link(itr->second.in());
01017 cur->filter_per_link_[itr->first] = ti._retn();
01018
01019 } else {
01020 VDBG((LM_DEBUG,
01021 "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
01022 itr->second.in()));
01023 }
01024 }
01025
01026 if (!subset.in()) {
01027 VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
01028
01029 cur->get_send_listener()->data_delivered(cur);
01030 if (cur != max_transaction_tail_) {
01031
01032 cur = next_elem;
01033 continue;
01034 } else {
01035 break;
01036 }
01037 }
01038
01039 pub_links = subset;
01040 }
01041
01042 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
01043
01044
01045
01046
01047
01048
01049
01050
01051 send_links.send_start(pub_links.in());
01052 if (cur->get_header().message_id_ != SAMPLE_DATA) {
01053 pub_links->send_control(cur);
01054 } else {
01055 pub_links->send(cur);
01056 }
01057 }
01058 if (cur != max_transaction_tail_) {
01059
01060 cur = next_elem;
01061 } else {
01062 break;
01063 }
01064 }
01065
01066
01067
01068
01069
01070
01071
01072 RepoId pub_id(this->repo_id_);
01073 send_links.send_stop(pub_id);
01074 if (transaction_id != 0)
01075 expected_transaction_id_ = max_transaction_id_seen_ + 1;
01076 max_transaction_tail_ = 0;
01077 }
01078 }
01079
01080 TransportSendListener*
01081 TransportClient::get_send_listener()
01082 {
01083 return dynamic_cast<TransportSendListener*>(this);
01084 }
01085
01086 TransportReceiveListener*
01087 TransportClient::get_receive_listener()
01088 {
01089 return dynamic_cast<TransportReceiveListener*>(this);
01090 }
01091
01092 SendControlStatus
01093 TransportClient::send_control(const DataSampleHeader& header,
01094 ACE_Message_Block* msg)
01095 {
01096 TransportSendListener* listener = get_send_listener();
01097
01098 return links_.send_control(repo_id_, listener, header, msg);
01099 }
01100
01101 SendControlStatus
01102 TransportClient::send_control_to(const DataSampleHeader& header,
01103 ACE_Message_Block* msg,
01104 const RepoId& destination)
01105 {
01106 DataLinkSet singular;
01107 {
01108 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
01109 DataLinkIndex::iterator found = data_link_index_.find(destination);
01110
01111 if (found == data_link_index_.end()) {
01112 msg->release();
01113 return SEND_CONTROL_ERROR;
01114 }
01115
01116 singular.insert_link(found->second.in());
01117 }
01118 return singular.send_control(repo_id_, get_send_listener(), header, msg,
01119 &links_.tsce_allocator());
01120 }
01121
01122 bool
01123 TransportClient::remove_sample(const DataSampleElement* sample)
01124 {
01125 return links_.remove_sample(sample);
01126 }
01127
01128 bool
01129 TransportClient::remove_all_msgs()
01130 {
01131 return links_.remove_all_msgs(repo_id_);
01132 }
01133
01134 }
01135 }