OpenDDS  Snapshot(2023/04/28-20:55)
TransportClient.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include <DCPS/DdsDcps_pch.h> //Only the _pch include should start with DCPS/
9 
10 #include "TransportClient.h"
11 #include "TransportConfig.h"
12 #include "TransportRegistry.h"
13 #include "TransportExceptions.h"
15 
18 #include <dds/DCPS/GuidConverter.h>
19 #include <dds/DCPS/Definitions.h>
20 #include <dds/DCPS/RTPS/ICE/Ice.h>
21 
22 #include <dds/DdsDcpsInfoUtilsC.h>
23 
25 
26 #include <algorithm>
27 #include <iterator>
28 
30 
31 namespace OpenDDS {
32 namespace DCPS {
33 
35  : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
36  , expected_transaction_id_(1)
37  , max_transaction_id_seen_(0)
38  , max_transaction_tail_(0)
39  , swap_bytes_(false)
40  , cdr_encapsulation_(false)
41  , reliable_(false)
42  , durable_(false)
43  , reverse_lock_(lock_)
44  , repo_id_(GUID_UNKNOWN)
45 {
46 }
47 
49 {
50  if (Transport_debug_level > 5) {
51  LogGuid logger(repo_id_);
53  ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
54  logger.c_str()));
55  }
56 
58 
60 
61  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end(); ++it) {
62  for (size_t i = 0; i < impls_.size(); ++i) {
63  TransportImpl_rch impl = impls_[i].lock();
64  if (impl) {
65  impl->stop_accepting_or_connecting(it->second->client_, it->second->data_.remote_id_, false, false);
66  }
67  }
68  }
69 }
70 
71 void
73 {
74  for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end();) {
75  if (it->second->safe_to_remove()) {
76  prev_pending_.erase(it++);
77  } else {
78  ++it;
79  }
80  }
81 }
82 
83 void
84 TransportClient::enable_transport(bool reliable, bool durable)
85 {
86  // Search for a TransportConfig to use:
88 
89  // 1. If this object is an Entity, check if a TransportConfig has been
90  // bound either directly to this entity or to a parent entity.
91  for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
92  ent && tc.is_nil(); ent = ent->parent()) {
93  tc = ent->transport_config();
94  }
95 
96  if (tc.is_nil()) {
98  // 2. Check for a TransportConfig that is the default for this Domain.
99  tc = reg->domain_default_config(domain_id());
100 
101  if (tc.is_nil()) {
102  // 3. Use the global_config if one has been set.
103  tc = reg->global_config();
104 
105  if (!tc.is_nil() && tc->instances_.empty()
107  // 4. Set the "fallback option" if the global_config is empty.
108  // (only applies if the user hasn't changed the global config)
109  tc = reg->fix_empty_default();
110  }
111  }
112  }
113 
114  if (tc.is_nil()) {
116  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
117  ACE_TEXT("No TransportConfig found.\n")));
118  throw Transport::NotConfigured();
119  }
120 
121  enable_transport_using_config(reliable, durable, tc);
122 }
123 
124 void
126  const TransportConfig_rch& tc)
127 {
128  config_ = tc;
129  swap_bytes_ = tc->swap_bytes_;
130  reliable_ = reliable;
131  durable_ = durable;
132  unsigned long duration = tc->passive_connect_duration_;
133  if (duration == 0) {
135  if (DCPS_debug_level) {
137  ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
138  ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
139  ACE_TEXT("default value\n")));
140  }
141  }
143 
145 
146  const size_t n = tc->instances_.size();
147 
148  for (size_t i = 0; i < n; ++i) {
149  TransportInst_rch inst = tc->instances_[i];
150 
151  if (check_transport_qos(*inst)) {
152  TransportImpl_rch impl = inst->get_or_create_impl();
153 
154  if (impl) {
155  impls_.push_back(impl);
156 
157 #if defined(OPENDDS_SECURITY)
159 #endif
160 
162  }
163  }
164  }
165 
166  if (impls_.empty()) {
168  ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
169  ACE_TEXT("No TransportImpl could be created.\n")));
170  throw Transport::NotConfigured();
171  }
172 }
173 
174 void
176 {
177  conn_info_.length(0);
178 
179  const size_t n = config_->instances_.size();
180  for (size_t i = 0; i < n; ++i) {
182  if (check_transport_qos(*inst)) {
183  TransportImpl_rch impl = inst->get_or_create_impl();
184  if (impl) {
185  const CORBA::ULong idx = DCPS::grow(conn_info_) - 1;
187  }
188  }
189  }
190 
191  if (conn_info_.length() == 0) {
193  ACE_TEXT("(%P|%t) TransportClient::populate_connection_info: ")
194  ACE_TEXT("No connection info\n")));
195  }
196 }
197 
198 bool
200 {
201  GUID_t repo_id = get_guid();
202 
203  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
204 
205  repo_id_ = repo_id;
207 
208  if (impls_.empty()) {
209  if (DCPS_debug_level) {
210  LogGuid writer_log(repo_id_);
211  LogGuid reader_log(data.remote_id_);
212  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
213  ACE_TEXT("local %C remote %C no available impls\n"),
214  writer_log.c_str(),
215  reader_log.c_str()));
216  }
217  return false;
218  }
219 
220  bool all_impls_shut_down = true;
221  for (size_t i = 0; i < impls_.size(); ++i) {
222  TransportImpl_rch impl = impls_[i].lock();
223  if (impl && !impl->is_shut_down()) {
224  all_impls_shut_down = false;
225  break;
226  }
227  }
228 
229  if (all_impls_shut_down) {
230  if (DCPS_debug_level) {
231  LogGuid writer_log(repo_id_);
232  LogGuid reader_log(data.remote_id_);
233  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
234  ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
235  writer_log.c_str(),
236  reader_log.c_str()));
237  }
238  return false;
239  }
240 
242 
243  PendingMap::iterator iter = pending_.find(data.remote_id_);
244 
245  if (iter == pending_.end()) {
246  GUID_t remote_copy(data.remote_id_);
247  PendingAssoc_rch pa = make_rch<PendingAssoc>(rchandle_from(this));
248  pa->active_ = active;
249  pa->impls_.clear();
250  pa->blob_index_ = 0;
251  pa->data_ = data;
257  pa->attribs_.max_sn_ = get_max_sn();
258  iter = pending_.insert(std::make_pair(remote_copy, pa)).first;
259 
260  LogGuid tc_assoc_log(repo_id_);
261  LogGuid remote_log(data.remote_id_);
262  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
263  "between %C and remote %C\n",
264  tc_assoc_log.c_str(),
265  remote_log.c_str()), 0);
266  } else {
267 
269  ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
270  ACE_TEXT("already associating with remote.\n")));
271 
272  return false;
273 
274  }
275 
276  PendingAssoc_rch pend = iter->second;
277 
278  if (active) {
279  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
280  pend->impls_.reserve(impls_.size());
281  std::reverse_copy(impls_.begin(), impls_.end(),
282  std::back_inserter(pend->impls_));
283 
284  return pend->initiate_connect(this, guard);
285 
286  } else { // passive
287 
288  // call accept_datalink for each impl / blob pair of the same type
289  for (size_t i = 0; i < impls_.size(); ++i) {
290  // Release the PendingAssoc object's mutex_ since the nested for-loop does not access
291  // the PendingAssoc object directly and the functions called by the nested loop can
292  // lead to a PendingAssoc object's mutex_ being acquired, which will cause deadlock if
293  // it is not released here.
295  TransportImpl_rch impl = impls_[i].lock();
296  {
297  ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
298  pend->impls_.push_back(impl);
299  attribs = pend->attribs_;
300  }
301  const OPENDDS_STRING type = impl->transport_type();
302 
303  for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
304  if (data.remote_data_[j].transport_type.in() == type) {
305  const TransportImpl::RemoteTransport remote = {
308  data.remote_reliable_, data.remote_durable_};
309 
311  {
312  // This thread acquired lock_ at the beginning of this method.
313  // Calling accept_datalink might require getting the lock for the transport's reactor.
314  // If the current thread is not an event handler for the transport's reactor, e.g.,
315  // the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
316  // Event handlers in the transport reactor may call passive_connection which calls use_datalink
317  // which acquires lock_. The locking order in this case is transport reactor lock -> lock_.
318  // To avoid deadlock, we must reverse the lock.
320  ACE_GUARD_RETURN(Reverse_Lock_t, rev_tc_guard, reverse_lock_, false);
321  res = impl->accept_datalink(remote, attribs, client);
322  }
323 
324  //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
325  iter = pending_.find(data.remote_id_);
326 
327  if (iter == pending_.end()) {
328  //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
329  //active side connection and completed, thus pend was removed from pending_. Can return true.
330  return true;
331  }
332  pend = iter->second;
333 
334  if (res.success_) {
335  if (res.link_.is_nil()) {
336  // In this case, it may be waiting for the TCP connection to be
337  // established. Just wait without trying other transports.
338  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
339  } else {
340  use_datalink_i(data.remote_id_, res.link_, guard);
341  return true;
342  }
343  }
344  }
345  }
346  }
347 
348  pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
349  }
350 
351  return true;
352 }
353 
354 void
356  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
357  client_.reset();
358 }
359 
360 bool
362  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
363  return !client_ && !scheduled_;
364 }
365 
366 int
368  const void* arg)
369 {
370  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
371 
373  {
374  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
375  client = client_.lock();
376  scheduled_ = false;
377  }
378 
379  if (client && client.get() == static_cast<TransportClient*>(const_cast<void*>(arg))) {
380  client->use_datalink(data_.remote_id_, DataLink_rch());
381  }
382  return 0;
383 }
384 
385 bool
387  TransportImpl_rch impl,
388  const TransportImpl::RemoteTransport& remote,
389  const TransportImpl::ConnectionAttribs& attribs_,
390  Guard& guard)
391 {
392  if (!guard.locked()) {
393  //don't own the lock_ so can't release it...shouldn't happen
394  LogGuid local_log(repo_id_);
395  LogGuid remote_log(remote.repo_id_);
396  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i ")
397  ACE_TEXT("between local %C and remote %C unsuccessful because ")
398  ACE_TEXT("guard was not locked\n"),
399  local_log.c_str(),
400  remote_log.c_str()), 0);
401  return false;
402  }
403 
404  {
405  //can't call connect while holding lock due to possible reactor deadlock
406  LogGuid local_log(repo_id_);
407  LogGuid remote_log(remote.repo_id_);
408  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
409  "attempt to connect_datalink between local %C and remote %C\n",
410  local_log.c_str(),
411  remote_log.c_str()), 0);
412  {
413  TransportImpl::ConnectionAttribs attribs = attribs_;
415  ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
416  result = impl->connect_datalink(remote, attribs, client);
417  }
418  if (!result.success_) {
419  if (DCPS_debug_level) {
420  LogGuid writer_log(repo_id_);
421  LogGuid reader_log(remote.repo_id_);
422  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i - ")
423  ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
424  writer_log.c_str(),
425  reader_log.c_str()));
426  }
427  return false;
428  }
429  }
430 
431  LogGuid local_log(repo_id_);
432  LogGuid remote_log(remote.repo_id_);
433  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
434  "connection between local %C and remote %C initiation successful\n",
435  local_log.c_str(),
436  remote_log.c_str()), 0);
437  return true;
438 }
439 
440 bool
442  Guard& guard)
443 {
444  LogGuid local_log(tc->repo_id_);
445  LogGuid remote_log(data_.remote_id_);
446  VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
447  "between %C and remote %C\n",
448  local_log.c_str(),
449  remote_log.c_str()), 0);
450  // find the next impl / blob entry that have matching types
451  while (!impls_.empty()) {
452  TransportImpl_rch impl = impls_.back().lock();
453  if (!impl) {
454  impls_.pop_back();
455  continue;
456  }
457  const OPENDDS_STRING type = impl->transport_type();
458 
459  for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
460  if (data_.remote_data_[blob_index_].transport_type.in() == type) {
461  const TransportImpl::RemoteTransport remote_transport = {
462  data_.remote_id_, data_.remote_data_[blob_index_].data, data_.discovery_locator_.data,
463  data_.participant_discovered_at_, data_.remote_transport_context_,
464  data_.publication_transport_priority_, data_.remote_reliable_, data_.remote_durable_};
465 
467  bool ret;
468  {
469  // Release the PendingAssoc object's mutex_ since initiate_connect_i doesn't need it.
470  Reverse_Lock_t rev_mutex(mutex_);
471  ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
472  ret = tc->initiate_connect_i(res, impl, remote_transport, attribs_, guard);
473  }
474  if (!ret) {
475  //tc init connect returned false there is no PendingAssoc left in map because use_datalink_i finished elsewhere
476  //so don't do anything further with pend and return success or failure up to tc's associate
477  if (res.success_ ) {
478  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
479  ACE_TEXT("between %C and remote %C success\n"),
480  local_log.c_str(),
481  remote_log.c_str()), 0);
482  return true;
483  }
484 
485  VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
486  "between %C and remote %C unsuccessful\n",
487  local_log.c_str(),
488  remote_log.c_str()), 0);
489  }
490 
491  if (res.success_) {
492 
493  ++blob_index_;
494 
495  if (!res.link_.is_nil()) {
496 
497  {
498  // use_datalink_i calls PendingAssoc::reset_client which needs the PendingAssoc's mutex_.
499  Reverse_Lock_t rev_mutex(mutex_);
500  ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
501  tc->use_datalink_i(data_.remote_id_, res.link_, guard);
502  }
503  } else {
504  VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
505  "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
506  local_log.c_str(),
507  remote_log.c_str()), 0);
508  }
509 
510  return true;
511  } else {
512  VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
513  "result of initiate_connect_i (local: %C to remote: %C) was not success\n",
514  local_log.c_str(),
515  remote_log.c_str()), 0);
516  }
517  }
518  }
519 
520  impls_.pop_back();
521  blob_index_ = 0;
522  }
523 
524  return false;
525 }
526 
527 void
529  const DataLink_rch& link)
530 {
532 
533  use_datalink_i(remote_id, link, guard);
534 }
535 
536 void
538  const DataLink_rch& link,
539  Guard& guard)
540 {
541  // Try to make a local copy of remote_id to use in calls
542  // because the reference could be invalidated if the caller
543  // reference location is deleted (i.e. in stop_accepting_or_connecting
544  // if use_datalink_i was called from passive_connection)
545  // Does changing this from a reference to a local affect anything going forward?
546  GUID_t remote_id(remote_id_ref);
547 
548  LogGuid peerId_log(remote_id);
549  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
550  "TransportClient(%@) using datalink[%@] from %C\n",
551  this,
552  link.in(),
553  peerId_log.c_str()), 0);
554 
555  PendingMap::iterator iter = pending_.find(remote_id);
556 
557  if (iter == pending_.end()) {
558  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
559  "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
560  this,
561  link.in(),
562  peerId_log.c_str()), 0);
563  return;
564  }
565 
566  PendingAssoc_rch pend = iter->second;
567  ACE_GUARD(ACE_Thread_Mutex, pend_guard, pend->mutex_);
568  const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
569  bool ok = false;
570 
571  if (link.is_nil()) {
572 
573  if (pend->active_ && pend->initiate_connect(this, guard)) {
574  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
575  "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
576  this,
577  link.in(),
578  peerId_log.c_str()), 0);
579  return;
580  }
581 
582  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
583  "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
584  this,
585  link.in(),
586  peerId_log.c_str()), 0);
587  } else { // link is ready to use
588  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
589  "TransportClient(%@) about to add_link[%@] to remote: %C\n",
590  this,
591  link.in(),
592  peerId_log.c_str()), 0);
593 
594  add_link(link, remote_id);
595  ok = true;
596  }
597 
598  // either link is valid or assoc failed, clean up pending object
599  for (size_t i = 0; i < pend->impls_.size(); ++i) {
600  TransportImpl_rch impl = pend->impls_[i].lock();
601  if (impl) {
602  impl->stop_accepting_or_connecting(*this, pend->data_.remote_id_, false, !ok);
603  }
604  }
605 
606  pend_guard.release();
607  pend->reset_client();
608  pending_assoc_timer_->cancel_timer(pend);
609  prev_pending_.insert(std::make_pair(iter->first, iter->second));
610  pending_.erase(iter);
611 
612  // Release TransportClient's lock as we're done updating its data.
613  guard.release();
614 
615  transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
616 }
617 
618 void
620 {
621  links_.insert_link(link);
622  data_link_index_[peer] = link;
623 
625 
627  if (trl) {
628  link->make_reservation(peer, repo_id_, trl, reliable_);
629  } else {
631  }
632 }
633 
634 void
636 {
638  for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
639  {
640  // The transport impl may have resource for a pending connection.
641  ACE_Guard<ACE_Thread_Mutex> guard(it->second->mutex_);
642  for (size_t i = 0; i < it->second->impls_.size(); ++i) {
643  TransportImpl_rch impl = it->second->impls_[i].lock();
644  if (impl) {
645  impl->stop_accepting_or_connecting(*this, it->second->data_.remote_id_, true, true);
646  }
647  }
648  }
649  it->second->reset_client();
650  pending_assoc_timer_->cancel_timer(it->second);
651  prev_pending_.insert(std::make_pair(it->first, it->second));
652  }
653  pending_.clear();
654 }
655 
656 void
658 {
660 
661  if (repos == 0 || length == 0) {
662  return;
663  } else {
664  for (CORBA::ULong i = 0; i < length; ++i) {
665  PendingMap::iterator iter = pending_.find(repos[i]);
666  if (iter != pending_.end()) {
667  {
668  // The transport impl may have resource for a pending connection.
669  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
670  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
671  TransportImpl_rch impl = iter->second->impls_[i].lock();
672  if (impl) {
673  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
674  }
675  }
676  }
677  iter->second->reset_client();
678  pending_assoc_timer_->cancel_timer(iter->second);
679  prev_pending_.insert(std::make_pair(iter->first, iter->second));
680  pending_.erase(iter);
681  }
682  }
683  }
684 }
685 
686 void
688 {
690 }
691 
692 void
694 {
695  LogGuid peerId_log(peerId);
696  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
697  "TransportClient(%@) disassociating from %C\n",
698  this,
699  peerId_log.c_str()), 5);
700 
702 
703  PendingMap::iterator iter = pending_.find(peerId);
704  if (iter != pending_.end()) {
705  {
706  // The transport impl may have resource for a pending connection.
707  ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
708  for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
709  TransportImpl_rch impl = iter->second->impls_[i].lock();
710  if (impl) {
711  impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
712  }
713  }
714  }
715  iter->second->reset_client();
716  pending_assoc_timer_->cancel_timer(iter->second);
717  prev_pending_.insert(std::make_pair(iter->first, iter->second));
718  pending_.erase(iter);
719  return;
720  }
721 
722  const DataLinkIndex::iterator found = data_link_index_.find(peerId);
723 
724  if (found == data_link_index_.end()) {
725  if (DCPS_debug_level > 4) {
726  const LogGuid log(peerId);
728  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
729  ACE_TEXT("no link for remote peer %C\n"),
730  log.c_str()));
731  }
732 
733  return;
734  }
735 
736  const DataLink_rch link = found->second;
737 
738  //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
739  //otherwise it could be removed in transport_detached()
740  data_link_index_.erase(found);
741  DataLinkSetMap released;
742 
743  if (DCPS_debug_level > 4) {
745  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
746  ACE_TEXT("about to release_reservations for link[%@]\n"),
747  link.in()));
748  }
749 
751  link->release_reservations(peerId, repo_id_, released);
752 
753  if (!released.empty()) {
754 
755  if (DCPS_debug_level > 4) {
757  ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
758  ACE_TEXT("about to remove_link[%@] from links_\n"),
759  link.in()));
760  }
761  links_.remove_link(link);
762 
763  if (DCPS_debug_level > 4) {
764  LogGuid logger(repo_id_);
766  ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
767  logger.c_str(),
768  link.in()));
769  }
770  // Datalink is no longer used for any remote peer by this TransportClient
771  link->remove_listener(repo_id_);
772 
773  }
774 }
775 
777 {
779  const ImplsType impls = impls_;
780  const GUID_t repo_id = repo_id_;
781  guard.release();
782 
783  if (repo_id == GUID_UNKNOWN) {
784  // Not associated so nothing to stop.
785  return;
786  }
787 
788  for (size_t i = 0; i < impls.size(); ++i) {
789  const TransportImpl_rch impl = impls[i].lock();
790  if (impl) {
791  impl->client_stop(repo_id);
792  }
793  }
794 }
795 
796 void
798  const GUID_t& writerid,
799  const GUID_t& readerid,
800  const TransportLocatorSeq& locators,
802 {
804  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
805  pos != limit;
806  ++pos) {
807  TransportImpl_rch impl = pos->lock();
808  if (impl) {
809  impl->register_for_reader(participant, writerid, readerid, locators, listener);
810  }
811  }
812 }
813 
814 void
816  const GUID_t& writerid,
817  const GUID_t& readerid)
818 {
820  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
821  pos != limit;
822  ++pos) {
823  TransportImpl_rch impl = pos->lock();
824  if (impl) {
825  impl->unregister_for_reader(participant, writerid, readerid);
826  }
827  }
828 }
829 
830 void
832  const GUID_t& readerid,
833  const GUID_t& writerid,
834  const TransportLocatorSeq& locators,
835  DiscoveryListener* listener)
836 {
838  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
839  pos != limit;
840  ++pos) {
841  TransportImpl_rch impl = pos->lock();
842  if (impl) {
843  impl->register_for_writer(participant, readerid, writerid, locators, listener);
844  }
845  }
846 }
847 
848 void
850  const GUID_t& readerid,
851  const GUID_t& writerid)
852 {
854  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
855  pos != limit;
856  ++pos) {
857  TransportImpl_rch impl = pos->lock();
858  if (impl) {
859  impl->unregister_for_writer(participant, readerid, writerid);
860  }
861  }
862 }
863 
864 void
866  const TransportLocatorSeq& locators)
867 {
869  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
870  pos != limit;
871  ++pos) {
872  TransportImpl_rch impl = pos->lock();
873  if (impl) {
874  impl->update_locators(remote, locators);
875  }
876  }
877 }
878 
881 {
882  // The one-to-many relationship with impls implies that this should
883  // return a set of endpoints instead of a single endpoint or null.
884  // For now, we will assume a single impl.
885 
887  for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
888  pos != limit;
889  ++pos) {
890  TransportImpl_rch impl = pos->lock();
891  if (impl) {
893  if (endpoint) { return endpoint; }
894  }
895  }
896 
898 }
899 
900 bool
902  const DataSampleHeader& header,
903  Message_Block_Ptr payload)
904 {
905  DataLinkIndex::iterator found = data_link_index_.find(peer);
906 
907  if (found == data_link_index_.end()) {
908  if (DCPS_debug_level > 4) {
909  LogGuid logger(peer);
911  ACE_TEXT("(%P|%t) TransportClient::send_response: ")
912  ACE_TEXT("no link for publication %C, ")
913  ACE_TEXT("not sending response.\n"),
914  logger.c_str()));
915  }
916 
917  return false;
918  }
919 
920  DataLinkSet singular;
921  singular.insert_link(found->second);
922  singular.send_response(peer, header, move(payload));
923  return true;
924 }
925 
926 void
928 {
929  if (send_list.head() == 0) {
930  return;
931  }
932  ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
933  send_i(send_list, transaction_id);
934 }
935 
938  const DataSampleHeader& header,
939  Message_Block_Ptr msg,
940  const GUID_t& destination)
941 {
942  ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
944  if (send_list.head()) {
945  send_i(send_list, 0);
946  }
947  return send_control_to(header, move(msg), destination);
948 }
949 
950 void
952 {
953  if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
954  if (transaction_id > max_transaction_id_seen_) {
955  max_transaction_id_seen_ = transaction_id;
956  max_transaction_tail_ = send_list.tail();
957  }
958  return;
959  } else /* transaction_id == expected_transaction_id */ {
960 
961  DataSampleElement* cur = send_list.head();
962  if (max_transaction_tail_ == 0) {
963  //Means no future transaction beat this transaction into send
964  if (transaction_id != 0)
966  // Only send this current transaction
967  max_transaction_tail_ = send_list.tail();
968  }
969  DataLinkSet send_links;
970 
971  while (cur != 0) {
972  // VERY IMPORTANT NOTE:
973  //
974  // We have to be very careful in how we deal with the current
975  // DataSampleElement. The issue is that once we have invoked
976  // data_delivered() on the send_listener_ object, or we have invoked
977  // send() on the pub_links, we can no longer access the current
978  // DataSampleElement!Thus, we need to get the next
979  // DataSampleElement (pointer) from the current element now,
980  // while it is safe.
981  DataSampleElement* next_elem;
982  if (cur != max_transaction_tail_) {
983  next_elem = cur->get_next_send_sample();
984  } else {
985  next_elem = max_transaction_tail_;
986  }
987  DataLinkSet_rch pub_links =
988  (cur->get_num_subs() > 0)
991 
992  if (pub_links.is_nil() || pub_links->empty()) {
993  // NOTE: This is the "local publisher id is not currently
994  // associated with any remote subscriber ids" case.
995 
996  if (DCPS_debug_level > 4) {
997  LogGuid logger(cur->get_pub_id());
999  ACE_TEXT("(%P|%t) TransportClient::send_i: ")
1000  ACE_TEXT("no links for publication %C, ")
1001  ACE_TEXT("not sending element %@ for transaction: %d.\n"),
1002  logger.c_str(),
1003  cur,
1004  cur->transaction_id()));
1005  }
1006 
1007  // We tell the send_listener_ that all of the remote subscriber ids
1008  // that wanted the data (all zero of them) have indeed received
1009  // the data.
1010  cur->get_send_listener()->data_delivered(cur);
1011 
1012  } else {
1013  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
1014  , cur), 5);
1015 
1016 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1017 
1018  // Content-Filtering adjustment to the pub_links:
1019  // - If the sample should be filtered out of all subscriptions on a given
1020  // DataLink, then exclude that link from the subset that we'll send to.
1021  // - If the sample should be filtered out of some (or none) of the subs,
1022  // then record that information in the DataSampleElement so that the
1023  // header's content_filter_entries_ can be marshaled before it's sent.
1024  if (cur->filter_out_.ptr()) {
1025  DataLinkSet_rch subset;
1026  DataLinkSet::GuardType guard(pub_links->lock());
1027  typedef DataLinkSet::MapType MapType;
1028  MapType& map = pub_links->map();
1029 
1030  for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
1031  size_t n_subs;
1032  GUIDSeq_var ti =
1033  itr->second->target_intersection(cur->get_pub_id(),
1034  cur->filter_out_.in(), n_subs);
1035 
1036  if (ti.ptr() == 0 || ti->length() != n_subs) {
1037  if (!subset.in()) {
1038  subset = make_rch<DataLinkSet>();
1039  }
1040 
1041  subset->insert_link(itr->second);
1042  cur->filter_per_link_[itr->first] = ti._retn();
1043 
1044  } else {
1045  VDBG((LM_DEBUG,
1046  "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
1047  itr->second.in()));
1048  }
1049  }
1050 
1051  if (!subset.in()) {
1052  guard.release();
1053  VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
1054  // similar to the "if (pub_links.is_nil())" case above, no links
1055  cur->get_send_listener()->data_delivered(cur);
1056  if (cur != max_transaction_tail_) {
1057  // Move on to the next DataSampleElement to send.
1058  cur = next_elem;
1059  continue;
1060  } else {
1061  break;
1062  }
1063  }
1064 
1065  pub_links = subset;
1066  }
1067 
1068 #endif
1069 
1070  // This will do several things, including adding to the membership
1071  // of the send_links set. Any DataLinks added to the send_links
1072  // set will be also told about the send_start() event. Those
1073  // DataLinks (in the pub_links set) that are already in the
1074  // send_links set will not be told about the send_start() event
1075  // since they heard about it when they were inserted into the
1076  // send_links set.
1077  send_links.send_start(pub_links.in());
1078  if (cur->get_header().message_id_ != SAMPLE_DATA) {
1079  pub_links->send_control(cur);
1080  } else {
1081  pub_links->send(cur);
1082  }
1083  }
1084  if (cur != max_transaction_tail_) {
1085  // Move on to the next DataSampleElement to send.
1086  cur = next_elem;
1087  } else {
1088  break;
1089  }
1090  }
1091 
1092  // This will inform each DataLink in the set about the stop_send() event.
1093  // It will then clear the send_links_ set.
1094  //
1095  // The reason that the send_links_ set is cleared is because we continually
1096  // reuse the same send_links_ object over and over for each call to this
1097  // send method.
1098  GUID_t pub_id = repo_id();
1099  send_links.send_stop(pub_id);
1100  if (transaction_id != 0) {
1102  }
1104  }
1105 }
1106 
1109 {
1110  return rchandle_from(dynamic_cast<TransportSendListener*>(this));
1111 }
1112 
1115 {
1116  return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
1117 }
1118 
1121  Message_Block_Ptr msg)
1122 {
1123  if (repo_id_ == GUID_UNKNOWN) {
1124  return SEND_CONTROL_OK;
1125  }
1126  return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
1127 }
1128 
1131  Message_Block_Ptr msg,
1132  const GUID_t& destination)
1133 {
1134  if (repo_id_ == GUID_UNKNOWN) {
1135  return SEND_CONTROL_OK;
1136  }
1137 
1138  DataLinkSet singular;
1139  {
1141  DataLinkIndex::iterator found = data_link_index_.find(destination);
1142 
1143  if (found == data_link_index_.end()) {
1144  return SEND_CONTROL_ERROR;
1145  }
1146 
1147  singular.insert_link(found->second);
1148  }
1149  return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
1150 }
1151 
1152 bool
1154 {
1155  return links_.remove_sample(sample);
1156 }
1157 
1158 bool
1160 {
1161  if (repo_id_ == GUID_UNKNOWN) {
1162  return true;
1163  }
1165 }
1166 
1168 {
1170 }
1171 
1172 bool TransportClient::associated_with(const GUID_t& remote) const
1173 {
1175  if (!guard.locked()) {
1176  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::associated_with: "
1177  "lock failed\n"));
1178  return false;
1179  }
1180  return data_link_index_.count(remote);
1181 }
1182 
1184 {
1186  if (!guard.locked()) {
1187  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::pending_association_with: "
1188  "lock failed\n"));
1189  return false;
1190  }
1191  return pending_.count(remote);
1192 }
1193 
1195 {
1196  TransportSendListener_rch send_listener;
1197  {
1199  if (!guard.locked()) {
1200  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::data_acked: "
1201  "lock failed\n"));
1202  return;
1203  }
1204  send_listener = get_send_listener();
1205  }
1206  send_listener->data_acked(remote);
1207 }
1208 
1209 bool TransportClient::is_leading(const GUID_t& reader_id) const
1210 {
1211  return links_.is_leading(get_guid(), reader_id);
1212 }
1213 
1214 
1215 } // namepsace DCPS
1216 } // namepsace OpenDDS
1217 
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
#define ACE_DEBUG(X)
bool connection_info(TransportLocator &local_info, ConnectionInfoFlags flags) const
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
TransportImpl_rch get_or_create_impl()
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const DataSampleHeader & get_header() const
virtual SequenceNumber get_max_sn() const
SendControlStatus send_control_to(const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
char message_id_
The enum MessageId.
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
virtual GUID_t get_guid() const =0
void enable_transport(bool reliable, bool durable)
TransportSendListener * get_send_listener() const
void remove_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:50
virtual bool check_transport_qos(const TransportInst &inst)=0
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
void send_final_acks(const GUID_t &readerid)
bool remove_sample(const DataSampleElement *sample)
TransportLocatorSeq remote_data_
GUIDSeq_var filter_out_
tracking for Content-Filtering data
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)=0
static const char DEFAULT_CONFIG_NAME[]
MonotonicTime_t participant_discovered_at_
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
const char * c_str() const
virtual void client_stop(const GUID_t &)
void send_stop(GUID_t repoId)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
TransportLocator discovery_locator_
ACE_CDR::ULong remote_transport_context_
static const ConnectionInfoFlags CONNINFO_ALL
void domain_default_config(DDS::DomainId_t domain, const TransportConfig_rch &cfg)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
sequence< TransportLocator > TransportLocatorSeq
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
DataSampleElement * get_next_send_sample() const
bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const
virtual void local_crypto_handle(DDS::Security::ParticipantCryptoHandle)
int release(void)
void disassociate(const GUID_t &peerId)
ACE_Guard< ACE_Thread_Mutex > lock_
TransportImpl::ConnectionAttribs attribs_
TransportConfig_rch global_config() const
TransportReceiveListener_rch get_receive_listener()
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
DataSampleElement * max_transaction_tail_
#define OPENDDS_STRING
virtual Priority get_priority_value(const AssociationData &data) const =0
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
bool remove_sample(const DataSampleElement *sample)
LM_DEBUG
bool send_response(const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
ACE_CDR::ULong ULong
int insert_link(const DataLink_rch &link)
Definition: DataLinkSet.cpp:42
static TimeDuration from_msec(const ACE_UINT64 &ms)
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
Definition: TransportImpl.h:97
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLinkSet.h:27
#define VDBG(DBG_ARGS)
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
DataLinkSet_rch select_links(const GUID_t *remoteIds, const CORBA::ULong num_targets)
Definition: DataLinkSet.cpp:64
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
bool associated_with(const GUID_t &remote) const
RcHandle< PendingAssocTimer > pending_assoc_timer_
TransportLocatorSeq conn_info_
bool pending_association_with(const GUID_t &remote) const
virtual OPENDDS_STRING transport_type() const =0
void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
bool locked(void) const
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
bool initiate_connect_i(TransportImpl::AcceptConnectResult &result, TransportImpl_rch impl, const TransportImpl::RemoteTransport &remote, const TransportImpl::ConnectionAttribs &attribs_, Guard &guard)
virtual DDS::DomainId_t domain_id() const =0
void data_acked(const GUID_t &remote)
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
LM_WARNING
virtual void transport_assoc_done(int, const GUID_t &)
DataLinkIdTypeGUIDMap filter_per_link_
OPENDDS_STRING name() const
bool is_leading(const GUID_t &reader_id) const
void use_datalink(const GUID_t &remote_id, const DataLink_rch &link)
Mix-in class for DDS entities which directly use the transport layer.
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
ACE_TEXT("TCP_Factory")
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
bool associate(const AssociationData &peer, bool active)
virtual bool requires_cdr_encapsulation() const
Does the transport require a CDR-encapsulated data payload?
unsigned long long ACE_UINT64
void remove_listener(const GUID_t &local_id)
Definition: DataLink.inl:289
TransportConfig_rch fix_empty_default()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual void unregister_for_reader(const GUID_t &, const GUID_t &, const GUID_t &)
Definition: TransportImpl.h:93
static const unsigned long DEFAULT_PASSIVE_CONNECT_DURATION
virtual void register_for_reader(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
Definition: TransportImpl.h:87
bool initiate_connect(TransportClient *tc, Guard &guard)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static TransportRegistry * instance()
Return a singleton instance of this class.
int handle_timeout(const ACE_Time_Value &time, const void *arg)
bool remove_all_msgs(const GUID_t &pub_id)
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
const OpenDDS::DCPS::GUID_t * get_sub_ids() const
void enable_transport_using_config(bool reliable, bool durable, const TransportConfig_rch &tc)
void send_control(DataSampleElement *sample)
Send a control message that is wrapped in a DataSampleElement.
Definition: DataLinkSet.inl:81
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)=0
void send_response(GUID_t sub_id, const DataSampleHeader &header, Message_Block_Ptr response)
#define TheServiceParticipant
bool success_
If false, the accept or connect has failed and link_ is ignored.
void release_reservations(GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
Definition: DataLink.cpp:512
LM_ERROR
void send_start(DataLinkSet *link_set)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
ACE_Thread_Mutex send_transaction_lock_
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)=0
virtual void data_delivered(const DataSampleElement *sample)
TransportSendListener_rch get_send_listener()
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
SendControlStatus
Return code type for send_control() operations.
void use_datalink_i(const GUID_t &remote_id, const DataLink_rch &link, Guard &guard)