Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include <DCPS/DdsDcps_pch.h> //Only the _pch include should start with DCPS/
9 :
10 : #include "TransportClient.h"
11 : #include "TransportConfig.h"
12 : #include "TransportRegistry.h"
13 : #include "TransportExceptions.h"
14 : #include "TransportReceiveListener.h"
15 :
16 : #include <dds/DCPS/DataWriterImpl.h>
17 : #include <dds/DCPS/SendStateDataSampleList.h>
18 : #include <dds/DCPS/GuidConverter.h>
19 : #include <dds/DCPS/Definitions.h>
20 : #include <dds/DCPS/RTPS/ICE/Ice.h>
21 :
22 : #include <dds/DdsDcpsInfoUtilsC.h>
23 :
24 : #include <ace/Reactor_Timer_Interface.h>
25 :
26 : #include <algorithm>
27 : #include <iterator>
28 :
29 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
30 :
31 : namespace OpenDDS {
32 : namespace DCPS {
33 :
34 3 : TransportClient::TransportClient()
35 3 : : pending_assoc_timer_(make_rch<PendingAssocTimer> (TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner()))
36 3 : , expected_transaction_id_(1)
37 3 : , max_transaction_id_seen_(0)
38 3 : , max_transaction_tail_(0)
39 3 : , swap_bytes_(false)
40 3 : , cdr_encapsulation_(false)
41 3 : , reliable_(false)
42 3 : , durable_(false)
43 3 : , reverse_lock_(lock_)
44 6 : , repo_id_(GUID_UNKNOWN)
45 : {
46 3 : }
47 :
48 3 : TransportClient::~TransportClient()
49 : {
50 3 : if (Transport_debug_level > 5) {
51 0 : LogGuid logger(repo_id_);
52 0 : ACE_DEBUG((LM_DEBUG,
53 : ACE_TEXT("(%P|%t) TransportClient::~TransportClient: %C\n"),
54 : logger.c_str()));
55 0 : }
56 :
57 3 : stop_associating();
58 :
59 3 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
60 :
61 3 : for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end(); ++it) {
62 0 : for (size_t i = 0; i < impls_.size(); ++i) {
63 0 : TransportImpl_rch impl = impls_[i].lock();
64 0 : if (impl) {
65 0 : impl->stop_accepting_or_connecting(it->second->client_, it->second->data_.remote_id_, false, false);
66 : }
67 0 : }
68 : }
69 6 : }
70 :
71 : void
72 0 : TransportClient::clean_prev_pending()
73 : {
74 0 : for (PrevPendingMap::iterator it = prev_pending_.begin(); it != prev_pending_.end();) {
75 0 : if (it->second->safe_to_remove()) {
76 0 : prev_pending_.erase(it++);
77 : } else {
78 0 : ++it;
79 : }
80 : }
81 0 : }
82 :
83 : void
84 0 : TransportClient::enable_transport(bool reliable, bool durable)
85 : {
86 : // Search for a TransportConfig to use:
87 0 : TransportConfig_rch tc;
88 :
89 : // 1. If this object is an Entity, check if a TransportConfig has been
90 : // bound either directly to this entity or to a parent entity.
91 0 : for (RcHandle<EntityImpl> ent = rchandle_from(dynamic_cast<EntityImpl*>(this));
92 0 : ent && tc.is_nil(); ent = ent->parent()) {
93 0 : tc = ent->transport_config();
94 0 : }
95 :
96 0 : if (tc.is_nil()) {
97 0 : TransportRegistry* const reg = TransportRegistry::instance();
98 : // 2. Check for a TransportConfig that is the default for this Domain.
99 0 : tc = reg->domain_default_config(domain_id());
100 :
101 0 : if (tc.is_nil()) {
102 : // 3. Use the global_config if one has been set.
103 0 : tc = reg->global_config();
104 :
105 0 : if (!tc.is_nil() && tc->instances_.empty()
106 0 : && tc->name() == TransportRegistry::DEFAULT_CONFIG_NAME) {
107 : // 4. Set the "fallback option" if the global_config is empty.
108 : // (only applies if the user hasn't changed the global config)
109 0 : tc = reg->fix_empty_default();
110 : }
111 : }
112 : }
113 :
114 0 : if (tc.is_nil()) {
115 0 : ACE_ERROR((LM_ERROR,
116 : ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
117 : ACE_TEXT("No TransportConfig found.\n")));
118 0 : throw Transport::NotConfigured();
119 : }
120 :
121 0 : enable_transport_using_config(reliable, durable, tc);
122 0 : }
123 :
124 : void
125 0 : TransportClient::enable_transport_using_config(bool reliable, bool durable,
126 : const TransportConfig_rch& tc)
127 : {
128 0 : config_ = tc;
129 0 : swap_bytes_ = tc->swap_bytes_;
130 0 : reliable_ = reliable;
131 0 : durable_ = durable;
132 0 : unsigned long duration = tc->passive_connect_duration_;
133 0 : if (duration == 0) {
134 0 : duration = TransportConfig::DEFAULT_PASSIVE_CONNECT_DURATION;
135 0 : if (DCPS_debug_level) {
136 0 : ACE_DEBUG((LM_WARNING,
137 : ACE_TEXT("(%P|%t) TransportClient::enable_transport_using_config ")
138 : ACE_TEXT("passive_connect_duration_ configured as 0, changing to ")
139 : ACE_TEXT("default value\n")));
140 : }
141 : }
142 0 : passive_connect_duration_ = TimeDuration::from_msec(duration);
143 :
144 0 : populate_connection_info();
145 :
146 0 : const size_t n = tc->instances_.size();
147 :
148 0 : for (size_t i = 0; i < n; ++i) {
149 0 : TransportInst_rch inst = tc->instances_[i];
150 :
151 0 : if (check_transport_qos(*inst)) {
152 0 : TransportImpl_rch impl = inst->get_or_create_impl();
153 :
154 0 : if (impl) {
155 0 : impls_.push_back(impl);
156 :
157 : #if defined(OPENDDS_SECURITY)
158 0 : impl->local_crypto_handle(get_crypto_handle());
159 : #endif
160 :
161 0 : cdr_encapsulation_ |= inst->requires_cdr_encapsulation();
162 : }
163 0 : }
164 0 : }
165 :
166 0 : if (impls_.empty()) {
167 0 : ACE_ERROR((LM_ERROR,
168 : ACE_TEXT("(%P|%t) ERROR: TransportClient::enable_transport ")
169 : ACE_TEXT("No TransportImpl could be created.\n")));
170 0 : throw Transport::NotConfigured();
171 : }
172 0 : }
173 :
174 : void
175 0 : TransportClient::populate_connection_info()
176 : {
177 0 : conn_info_.length(0);
178 :
179 0 : const size_t n = config_->instances_.size();
180 0 : for (size_t i = 0; i < n; ++i) {
181 0 : TransportInst_rch inst = config_->instances_[i];
182 0 : if (check_transport_qos(*inst)) {
183 0 : TransportImpl_rch impl = inst->get_or_create_impl();
184 0 : if (impl) {
185 0 : const CORBA::ULong idx = DCPS::grow(conn_info_) - 1;
186 0 : impl->connection_info(conn_info_[idx], CONNINFO_ALL);
187 : }
188 0 : }
189 0 : }
190 :
191 0 : if (conn_info_.length() == 0) {
192 0 : ACE_ERROR((LM_ERROR,
193 : ACE_TEXT("(%P|%t) TransportClient::populate_connection_info: ")
194 : ACE_TEXT("No connection info\n")));
195 : }
196 0 : }
197 :
198 : bool
199 0 : TransportClient::associate(const AssociationData& data, bool active)
200 : {
201 0 : GUID_t repo_id = get_guid();
202 :
203 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, false);
204 :
205 0 : repo_id_ = repo_id;
206 0 : OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
207 :
208 0 : if (impls_.empty()) {
209 0 : if (DCPS_debug_level) {
210 0 : LogGuid writer_log(repo_id_);
211 0 : LogGuid reader_log(data.remote_id_);
212 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
213 : ACE_TEXT("local %C remote %C no available impls\n"),
214 : writer_log.c_str(),
215 : reader_log.c_str()));
216 0 : }
217 0 : return false;
218 : }
219 :
220 0 : bool all_impls_shut_down = true;
221 0 : for (size_t i = 0; i < impls_.size(); ++i) {
222 0 : TransportImpl_rch impl = impls_[i].lock();
223 0 : if (impl && !impl->is_shut_down()) {
224 0 : all_impls_shut_down = false;
225 0 : break;
226 : }
227 0 : }
228 :
229 0 : if (all_impls_shut_down) {
230 0 : if (DCPS_debug_level) {
231 0 : LogGuid writer_log(repo_id_);
232 0 : LogGuid reader_log(data.remote_id_);
233 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::associate - ")
234 : ACE_TEXT("local %C remote %C all available impls previously shutdown\n"),
235 : writer_log.c_str(),
236 : reader_log.c_str()));
237 0 : }
238 0 : return false;
239 : }
240 :
241 0 : clean_prev_pending();
242 :
243 0 : PendingMap::iterator iter = pending_.find(data.remote_id_);
244 :
245 0 : if (iter == pending_.end()) {
246 0 : GUID_t remote_copy(data.remote_id_);
247 0 : PendingAssoc_rch pa = make_rch<PendingAssoc>(rchandle_from(this));
248 0 : pa->active_ = active;
249 0 : pa->impls_.clear();
250 0 : pa->blob_index_ = 0;
251 0 : pa->data_ = data;
252 0 : OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
253 0 : pa->attribs_.local_id_ = repo_id_;
254 0 : pa->attribs_.priority_ = get_priority_value(data);
255 0 : pa->attribs_.local_reliable_ = reliable_;
256 0 : pa->attribs_.local_durable_ = durable_;
257 0 : pa->attribs_.max_sn_ = get_max_sn();
258 0 : iter = pending_.insert(std::make_pair(remote_copy, pa)).first;
259 :
260 0 : LogGuid tc_assoc_log(repo_id_);
261 0 : LogGuid remote_log(data.remote_id_);
262 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::associate added PendingAssoc "
263 : "between %C and remote %C\n",
264 : tc_assoc_log.c_str(),
265 : remote_log.c_str()), 0);
266 0 : } else {
267 :
268 0 : ACE_ERROR((LM_ERROR,
269 : ACE_TEXT("(%P|%t) ERROR: TransportClient::associate ")
270 : ACE_TEXT("already associating with remote.\n")));
271 :
272 0 : return false;
273 :
274 : }
275 :
276 0 : PendingAssoc_rch pend = iter->second;
277 :
278 0 : if (active) {
279 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
280 0 : pend->impls_.reserve(impls_.size());
281 0 : std::reverse_copy(impls_.begin(), impls_.end(),
282 0 : std::back_inserter(pend->impls_));
283 :
284 0 : return pend->initiate_connect(this, guard);
285 :
286 0 : } else { // passive
287 :
288 : // call accept_datalink for each impl / blob pair of the same type
289 0 : for (size_t i = 0; i < impls_.size(); ++i) {
290 : // Release the PendingAssoc object's mutex_ since the nested for-loop does not access
291 : // the PendingAssoc object directly and the functions called by the nested loop can
292 : // lead to a PendingAssoc object's mutex_ being acquired, which will cause deadlock if
293 : // it is not released here.
294 0 : TransportImpl::ConnectionAttribs attribs;
295 0 : TransportImpl_rch impl = impls_[i].lock();
296 : {
297 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, pend_guard, pend->mutex_, false);
298 0 : pend->impls_.push_back(impl);
299 0 : attribs = pend->attribs_;
300 0 : }
301 0 : const OPENDDS_STRING type = impl->transport_type();
302 :
303 0 : for (CORBA::ULong j = 0; j < data.remote_data_.length(); ++j) {
304 0 : if (data.remote_data_[j].transport_type.in() == type) {
305 : const TransportImpl::RemoteTransport remote = {
306 0 : data.remote_id_, data.remote_data_[j].data, data.discovery_locator_.data, data.participant_discovered_at_, data.remote_transport_context_,
307 0 : data.publication_transport_priority_,
308 0 : data.remote_reliable_, data.remote_durable_};
309 :
310 0 : TransportImpl::AcceptConnectResult res;
311 : {
312 : // This thread acquired lock_ at the beginning of this method.
313 : // Calling accept_datalink might require getting the lock for the transport's reactor.
314 : // If the current thread is not an event handler for the transport's reactor, e.g.,
315 : // the ORB's thread, then the order of acquired locks will be lock_ -> transport reactor lock.
316 : // Event handlers in the transport reactor may call passive_connection which calls use_datalink
317 : // which acquires lock_. The locking order in this case is transport reactor lock -> lock_.
318 : // To avoid deadlock, we must reverse the lock.
319 0 : RcHandle<TransportClient> client = rchandle_from(this);
320 0 : ACE_GUARD_RETURN(Reverse_Lock_t, rev_tc_guard, reverse_lock_, false);
321 0 : res = impl->accept_datalink(remote, attribs, client);
322 0 : }
323 :
324 : //NEED to check that pend is still valid here after you re-acquire the lock_ after accepting the datalink
325 0 : iter = pending_.find(data.remote_id_);
326 :
327 0 : if (iter == pending_.end()) {
328 : //If Pending Assoc is no longer in pending_ then use_datalink_i has been called from an
329 : //active side connection and completed, thus pend was removed from pending_. Can return true.
330 0 : return true;
331 : }
332 0 : pend = iter->second;
333 :
334 0 : if (res.success_) {
335 0 : if (res.link_.is_nil()) {
336 : // In this case, it may be waiting for the TCP connection to be
337 : // established. Just wait without trying other transports.
338 0 : pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
339 : } else {
340 0 : use_datalink_i(data.remote_id_, res.link_, guard);
341 0 : return true;
342 : }
343 : }
344 0 : }
345 : }
346 0 : }
347 :
348 0 : pending_assoc_timer_->schedule_timer(rchandle_from(this), iter->second);
349 : }
350 :
351 0 : return true;
352 0 : }
353 :
354 : void
355 0 : TransportClient::PendingAssoc::reset_client() {
356 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
357 0 : client_.reset();
358 0 : }
359 :
360 : bool
361 0 : TransportClient::PendingAssoc::safe_to_remove() {
362 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
363 0 : return !client_ && !scheduled_;
364 0 : }
365 :
366 : int
367 0 : TransportClient::PendingAssoc::handle_timeout(const ACE_Time_Value&,
368 : const void* arg)
369 : {
370 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
371 :
372 0 : RcHandle<TransportClient> client;
373 : {
374 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
375 0 : client = client_.lock();
376 0 : scheduled_ = false;
377 0 : }
378 :
379 0 : if (client && client.get() == static_cast<TransportClient*>(const_cast<void*>(arg))) {
380 0 : client->use_datalink(data_.remote_id_, DataLink_rch());
381 : }
382 0 : return 0;
383 0 : }
384 :
385 : bool
386 0 : TransportClient::initiate_connect_i(TransportImpl::AcceptConnectResult& result,
387 : TransportImpl_rch impl,
388 : const TransportImpl::RemoteTransport& remote,
389 : const TransportImpl::ConnectionAttribs& attribs_,
390 : Guard& guard)
391 : {
392 0 : if (!guard.locked()) {
393 : //don't own the lock_ so can't release it...shouldn't happen
394 0 : LogGuid local_log(repo_id_);
395 0 : LogGuid remote_log(remote.repo_id_);
396 0 : VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i ")
397 : ACE_TEXT("between local %C and remote %C unsuccessful because ")
398 : ACE_TEXT("guard was not locked\n"),
399 : local_log.c_str(),
400 : remote_log.c_str()), 0);
401 0 : return false;
402 0 : }
403 :
404 : {
405 : //can't call connect while holding lock due to possible reactor deadlock
406 0 : LogGuid local_log(repo_id_);
407 0 : LogGuid remote_log(remote.repo_id_);
408 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
409 : "attempt to connect_datalink between local %C and remote %C\n",
410 : local_log.c_str(),
411 : remote_log.c_str()), 0);
412 : {
413 0 : TransportImpl::ConnectionAttribs attribs = attribs_;
414 0 : RcHandle<TransportClient> client = rchandle_from(this);
415 0 : ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, reverse_lock_, false);
416 0 : result = impl->connect_datalink(remote, attribs, client);
417 0 : }
418 0 : if (!result.success_) {
419 0 : if (DCPS_debug_level) {
420 0 : LogGuid writer_log(repo_id_);
421 0 : LogGuid reader_log(remote.repo_id_);
422 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) TransportClient::initiate_connect_i - ")
423 : ACE_TEXT("connect_datalink between local %C remote %C not successful\n"),
424 : writer_log.c_str(),
425 : reader_log.c_str()));
426 0 : }
427 0 : return false;
428 : }
429 0 : }
430 :
431 0 : LogGuid local_log(repo_id_);
432 0 : LogGuid remote_log(remote.repo_id_);
433 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::initiate_connect_i - "
434 : "connection between local %C and remote %C initiation successful\n",
435 : local_log.c_str(),
436 : remote_log.c_str()), 0);
437 0 : return true;
438 0 : }
439 :
440 : bool
441 0 : TransportClient::PendingAssoc::initiate_connect(TransportClient* tc,
442 : Guard& guard)
443 : {
444 0 : LogGuid local_log(tc->repo_id_);
445 0 : LogGuid remote_log(data_.remote_id_);
446 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
447 : "between %C and remote %C\n",
448 : local_log.c_str(),
449 : remote_log.c_str()), 0);
450 : // find the next impl / blob entry that have matching types
451 0 : while (!impls_.empty()) {
452 0 : TransportImpl_rch impl = impls_.back().lock();
453 0 : if (!impl) {
454 0 : impls_.pop_back();
455 0 : continue;
456 : }
457 0 : const OPENDDS_STRING type = impl->transport_type();
458 :
459 0 : for (; blob_index_ < data_.remote_data_.length(); ++blob_index_) {
460 0 : if (data_.remote_data_[blob_index_].transport_type.in() == type) {
461 : const TransportImpl::RemoteTransport remote_transport = {
462 0 : data_.remote_id_, data_.remote_data_[blob_index_].data, data_.discovery_locator_.data,
463 0 : data_.participant_discovered_at_, data_.remote_transport_context_,
464 0 : data_.publication_transport_priority_, data_.remote_reliable_, data_.remote_durable_};
465 :
466 0 : TransportImpl::AcceptConnectResult res;
467 : bool ret;
468 : {
469 : // Release the PendingAssoc object's mutex_ since initiate_connect_i doesn't need it.
470 0 : Reverse_Lock_t rev_mutex(mutex_);
471 0 : ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
472 0 : ret = tc->initiate_connect_i(res, impl, remote_transport, attribs_, guard);
473 0 : }
474 0 : if (!ret) {
475 : //tc init connect returned false there is no PendingAssoc left in map because use_datalink_i finished elsewhere
476 : //so don't do anything further with pend and return success or failure up to tc's associate
477 0 : if (res.success_ ) {
478 0 : VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) PendingAssoc::initiate_connect - ")
479 : ACE_TEXT("between %C and remote %C success\n"),
480 : local_log.c_str(),
481 : remote_log.c_str()), 0);
482 0 : return true;
483 : }
484 :
485 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::initiate_connect - "
486 : "between %C and remote %C unsuccessful\n",
487 : local_log.c_str(),
488 : remote_log.c_str()), 0);
489 : }
490 :
491 0 : if (res.success_) {
492 :
493 0 : ++blob_index_;
494 :
495 0 : if (!res.link_.is_nil()) {
496 :
497 : {
498 : // use_datalink_i calls PendingAssoc::reset_client which needs the PendingAssoc's mutex_.
499 0 : Reverse_Lock_t rev_mutex(mutex_);
500 0 : ACE_GUARD_RETURN(Reverse_Lock_t, rev_pend_guard, rev_mutex, false);
501 0 : tc->use_datalink_i(data_.remote_id_, res.link_, guard);
502 0 : }
503 : } else {
504 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
505 : "resulting link from initiate_connect_i (local: %C to remote: %C) was nil\n",
506 : local_log.c_str(),
507 : remote_log.c_str()), 0);
508 : }
509 :
510 0 : return true;
511 : } else {
512 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) PendingAssoc::intiate_connect - "
513 : "result of initiate_connect_i (local: %C to remote: %C) was not success\n",
514 : local_log.c_str(),
515 : remote_log.c_str()), 0);
516 : }
517 0 : }
518 : }
519 :
520 0 : impls_.pop_back();
521 0 : blob_index_ = 0;
522 0 : }
523 :
524 0 : return false;
525 0 : }
526 :
527 : void
528 0 : TransportClient::use_datalink(const GUID_t& remote_id,
529 : const DataLink_rch& link)
530 : {
531 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
532 :
533 0 : use_datalink_i(remote_id, link, guard);
534 0 : }
535 :
// Finishes (or retries) the pending association with a remote endpoint.
//
// remote_id_ref: id of the remote endpoint; copied locally before use.
// link:          the datalink to use; a nil link means no link was obtained.
// guard:         the caller's guard, released by this method before
//                transport_assoc_done() is invoked.
//
// Outcomes:
//  - no entry for the remote in pending_: nothing is done;
//  - nil link on an active PendingAssoc whose initiate_connect() returns true:
//    returns early and the entry stays pending;
//  - otherwise the pending entry is moved to prev_pending_ and
//    transport_assoc_done() is called with ASSOC_OK set only if a non-nil
//    link was added.
536 : void
537 0 : TransportClient::use_datalink_i(const GUID_t& remote_id_ref,
538 : const DataLink_rch& link,
539 : Guard& guard)
540 : {
541 : // Try to make a local copy of remote_id to use in calls
542 : // because the reference could be invalidated if the caller
543 : // reference location is deleted (i.e. in stop_accepting_or_connecting
544 : // if use_datalink_i was called from passive_connection)
545 : // Does changing this from a reference to a local affect anything going forward?
546 0 : GUID_t remote_id(remote_id_ref);
547 :
548 0 : LogGuid peerId_log(remote_id);
549 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
550 : "TransportClient(%@) using datalink[%@] from %C\n",
551 : this,
552 : link.in(),
553 : peerId_log.c_str()), 0);
554 :
555 0 : PendingMap::iterator iter = pending_.find(remote_id);
556 :
// Without a pending entry there is nothing to complete.
557 0 : if (iter == pending_.end()) {
558 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
559 : "TransportClient(%@) using datalink[%@] did not find Pending Association to remote %C\n",
560 : this,
561 : link.in(),
562 : peerId_log.c_str()), 0);
563 0 : return;
564 : }
565 :
// Hold the PendingAssoc's own mutex_ while its state is read and while
// initiate_connect() runs.
566 0 : PendingAssoc_rch pend = iter->second;
567 0 : ACE_GUARD(ACE_Thread_Mutex, pend_guard, pend->mutex_);
568 0 : const int active_flag = pend->active_ ? ASSOC_ACTIVE : 0;
569 0 : bool ok = false;
570 :
571 0 : if (link.is_nil()) {
572 :
// Active side: try again through initiate_connect(); if that returns true
// the association stays pending.
573 0 : if (pend->active_ && pend->initiate_connect(this, guard)) {
574 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
575 : "TransportClient(%@) using datalink[%@] link is nil, since this is active side, initiate_connect to remote %C\n",
576 : this,
577 : link.in(),
578 : peerId_log.c_str()), 0);
579 0 : return;
580 : }
581 :
// NOTE(review): this message is also emitted on the active side when
// initiate_connect() returned false, although its text only mentions the
// passive side.
582 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
583 : "TransportClient(%@) using datalink[%@] link is nil, since this is passive side, connection to remote %C timed out\n",
584 : this,
585 : link.in(),
586 : peerId_log.c_str()), 0);
587 : } else { // link is ready to use
588 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::use_datalink_i "
589 : "TransportClient(%@) about to add_link[%@] to remote: %C\n",
590 : this,
591 : link.in(),
592 : peerId_log.c_str()), 0);
593 :
594 0 : add_link(link, remote_id);
595 0 : ok = true;
596 : }
597 :
598 : // either link is valid or assoc failed, clean up pending object
// The last argument is !ok, i.e. true only when the association failed.
599 0 : for (size_t i = 0; i < pend->impls_.size(); ++i) {
600 0 : TransportImpl_rch impl = pend->impls_[i].lock();
601 0 : if (impl) {
602 0 : impl->stop_accepting_or_connecting(*this, pend->data_.remote_id_, false, !ok);
603 : }
604 0 : }
605 :
// reset_client() acquires pend->mutex_ itself, so the guard on it has to be
// released first.
606 0 : pend_guard.release();
607 0 : pend->reset_client();
608 0 : pending_assoc_timer_->cancel_timer(pend);
// Park the finished entry in prev_pending_ before removing it from pending_.
609 0 : prev_pending_.insert(std::make_pair(iter->first, iter->second));
610 0 : pending_.erase(iter);
611 :
612 : // Release TransportClient's lock as we're done updating its data.
613 0 : guard.release();
614 :
// Report the outcome using the local copy of the remote id.
615 0 : transport_assoc_done(active_flag | (ok ? ASSOC_OK : 0), remote_id);
616 0 : }
617 :
618 : void
619 0 : TransportClient::add_link(const DataLink_rch& link, const GUID_t& peer)
620 : {
621 0 : links_.insert_link(link);
622 0 : data_link_index_[peer] = link;
623 :
624 0 : TransportReceiveListener_rch trl = get_receive_listener();
625 :
626 0 : OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
627 0 : if (trl) {
628 0 : link->make_reservation(peer, repo_id_, trl, reliable_);
629 : } else {
630 0 : link->make_reservation(peer, repo_id_, get_send_listener(), reliable_);
631 : }
632 0 : }
633 :
634 : void
635 3 : TransportClient::stop_associating()
636 : {
637 3 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
638 3 : for (PendingMap::iterator it = pending_.begin(); it != pending_.end(); ++it) {
639 : {
640 : // The transport impl may have resource for a pending connection.
641 0 : ACE_Guard<ACE_Thread_Mutex> guard(it->second->mutex_);
642 0 : for (size_t i = 0; i < it->second->impls_.size(); ++i) {
643 0 : TransportImpl_rch impl = it->second->impls_[i].lock();
644 0 : if (impl) {
645 0 : impl->stop_accepting_or_connecting(*this, it->second->data_.remote_id_, true, true);
646 : }
647 0 : }
648 0 : }
649 0 : it->second->reset_client();
650 0 : pending_assoc_timer_->cancel_timer(it->second);
651 0 : prev_pending_.insert(std::make_pair(it->first, it->second));
652 : }
653 3 : pending_.clear();
654 3 : }
655 :
656 : void
657 0 : TransportClient::stop_associating(const GUID_t* repos, CORBA::ULong length)
658 : {
659 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
660 :
661 0 : if (repos == 0 || length == 0) {
662 0 : return;
663 : } else {
664 0 : for (CORBA::ULong i = 0; i < length; ++i) {
665 0 : PendingMap::iterator iter = pending_.find(repos[i]);
666 0 : if (iter != pending_.end()) {
667 : {
668 : // The transport impl may have resource for a pending connection.
669 0 : ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
670 0 : for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
671 0 : TransportImpl_rch impl = iter->second->impls_[i].lock();
672 0 : if (impl) {
673 0 : impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
674 : }
675 0 : }
676 0 : }
677 0 : iter->second->reset_client();
678 0 : pending_assoc_timer_->cancel_timer(iter->second);
679 0 : prev_pending_.insert(std::make_pair(iter->first, iter->second));
680 0 : pending_.erase(iter);
681 : }
682 : }
683 : }
684 0 : }
685 :
686 : void
687 0 : TransportClient::send_final_acks()
688 : {
689 0 : links_.send_final_acks(get_guid());
690 0 : }
691 :
692 : void
693 0 : TransportClient::disassociate(const GUID_t& peerId)
694 : {
695 0 : LogGuid peerId_log(peerId);
696 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) TransportClient::disassociate "
697 : "TransportClient(%@) disassociating from %C\n",
698 : this,
699 : peerId_log.c_str()), 5);
700 :
701 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
702 :
703 0 : PendingMap::iterator iter = pending_.find(peerId);
704 0 : if (iter != pending_.end()) {
705 : {
706 : // The transport impl may have resource for a pending connection.
707 0 : ACE_Guard<ACE_Thread_Mutex> guard(iter->second->mutex_);
708 0 : for (size_t i = 0; i < iter->second->impls_.size(); ++i) {
709 0 : TransportImpl_rch impl = iter->second->impls_[i].lock();
710 0 : if (impl) {
711 0 : impl->stop_accepting_or_connecting(*this, iter->second->data_.remote_id_, true, true);
712 : }
713 0 : }
714 0 : }
715 0 : iter->second->reset_client();
716 0 : pending_assoc_timer_->cancel_timer(iter->second);
717 0 : prev_pending_.insert(std::make_pair(iter->first, iter->second));
718 0 : pending_.erase(iter);
719 0 : return;
720 : }
721 :
722 0 : const DataLinkIndex::iterator found = data_link_index_.find(peerId);
723 :
724 0 : if (found == data_link_index_.end()) {
725 0 : if (DCPS_debug_level > 4) {
726 0 : const LogGuid log(peerId);
727 0 : ACE_DEBUG((LM_DEBUG,
728 : ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
729 : ACE_TEXT("no link for remote peer %C\n"),
730 : log.c_str()));
731 0 : }
732 :
733 0 : return;
734 : }
735 :
736 0 : const DataLink_rch link = found->second;
737 :
738 : //now that an _rch is created for the link, remove the iterator from data_link_index_ while still holding lock
739 : //otherwise it could be removed in transport_detached()
740 0 : data_link_index_.erase(found);
741 0 : DataLinkSetMap released;
742 :
743 0 : if (DCPS_debug_level > 4) {
744 0 : ACE_DEBUG((LM_DEBUG,
745 : ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
746 : ACE_TEXT("about to release_reservations for link[%@]\n"),
747 : link.in()));
748 : }
749 :
750 0 : OPENDDS_ASSERT(repo_id_ != GUID_UNKNOWN);
751 0 : link->release_reservations(peerId, repo_id_, released);
752 :
753 0 : if (!released.empty()) {
754 :
755 0 : if (DCPS_debug_level > 4) {
756 0 : ACE_DEBUG((LM_DEBUG,
757 : ACE_TEXT("(%P|%t) TransportClient::disassociate: ")
758 : ACE_TEXT("about to remove_link[%@] from links_\n"),
759 : link.in()));
760 : }
761 0 : links_.remove_link(link);
762 :
763 0 : if (DCPS_debug_level > 4) {
764 0 : LogGuid logger(repo_id_);
765 0 : ACE_DEBUG((LM_DEBUG,
766 : ACE_TEXT("(%P|%t) TransportClient::disassociate: calling remove_listener %C on link[%@]\n"),
767 : logger.c_str(),
768 : link.in()));
769 0 : }
770 : // Datalink is no longer used for any remote peer by this TransportClient
771 0 : link->remove_listener(repo_id_);
772 :
773 : }
774 0 : }
775 :
776 0 : void TransportClient::transport_stop()
777 : {
778 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
779 0 : const ImplsType impls = impls_;
780 0 : const GUID_t repo_id = repo_id_;
781 0 : guard.release();
782 :
783 0 : if (repo_id == GUID_UNKNOWN) {
784 : // Not associated so nothing to stop.
785 0 : return;
786 : }
787 :
788 0 : for (size_t i = 0; i < impls.size(); ++i) {
789 0 : const TransportImpl_rch impl = impls[i].lock();
790 0 : if (impl) {
791 0 : impl->client_stop(repo_id);
792 : }
793 0 : }
794 0 : }
795 :
796 : void
797 0 : TransportClient::register_for_reader(const GUID_t& participant,
798 : const GUID_t& writerid,
799 : const GUID_t& readerid,
800 : const TransportLocatorSeq& locators,
801 : OpenDDS::DCPS::DiscoveryListener* listener)
802 : {
803 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
804 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
805 0 : pos != limit;
806 0 : ++pos) {
807 0 : TransportImpl_rch impl = pos->lock();
808 0 : if (impl) {
809 0 : impl->register_for_reader(participant, writerid, readerid, locators, listener);
810 : }
811 0 : }
812 0 : }
813 :
814 : void
815 0 : TransportClient::unregister_for_reader(const GUID_t& participant,
816 : const GUID_t& writerid,
817 : const GUID_t& readerid)
818 : {
819 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
820 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
821 0 : pos != limit;
822 0 : ++pos) {
823 0 : TransportImpl_rch impl = pos->lock();
824 0 : if (impl) {
825 0 : impl->unregister_for_reader(participant, writerid, readerid);
826 : }
827 0 : }
828 0 : }
829 :
830 : void
831 0 : TransportClient::register_for_writer(const GUID_t& participant,
832 : const GUID_t& readerid,
833 : const GUID_t& writerid,
834 : const TransportLocatorSeq& locators,
835 : DiscoveryListener* listener)
836 : {
837 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
838 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
839 0 : pos != limit;
840 0 : ++pos) {
841 0 : TransportImpl_rch impl = pos->lock();
842 0 : if (impl) {
843 0 : impl->register_for_writer(participant, readerid, writerid, locators, listener);
844 : }
845 0 : }
846 0 : }
847 :
848 : void
849 0 : TransportClient::unregister_for_writer(const GUID_t& participant,
850 : const GUID_t& readerid,
851 : const GUID_t& writerid)
852 : {
853 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
854 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
855 0 : pos != limit;
856 0 : ++pos) {
857 0 : TransportImpl_rch impl = pos->lock();
858 0 : if (impl) {
859 0 : impl->unregister_for_writer(participant, readerid, writerid);
860 : }
861 0 : }
862 0 : }
863 :
864 : void
865 0 : TransportClient::update_locators(const GUID_t& remote,
866 : const TransportLocatorSeq& locators)
867 : {
868 0 : ACE_GUARD(ACE_Thread_Mutex, guard, lock_);
869 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
870 0 : pos != limit;
871 0 : ++pos) {
872 0 : TransportImpl_rch impl = pos->lock();
873 0 : if (impl) {
874 0 : impl->update_locators(remote, locators);
875 : }
876 0 : }
877 0 : }
878 :
879 : WeakRcHandle<ICE::Endpoint>
880 0 : TransportClient::get_ice_endpoint()
881 : {
882 : // The one-to-many relationship with impls implies that this should
883 : // return a set of endpoints instead of a single endpoint or null.
884 : // For now, we will assume a single impl.
885 :
886 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, WeakRcHandle<ICE::Endpoint>());
887 0 : for (ImplsType::iterator pos = impls_.begin(), limit = impls_.end();
888 0 : pos != limit;
889 0 : ++pos) {
890 0 : TransportImpl_rch impl = pos->lock();
891 0 : if (impl) {
892 0 : WeakRcHandle<ICE::Endpoint> endpoint = impl->get_ice_endpoint();
893 0 : if (endpoint) { return endpoint; }
894 0 : }
895 0 : }
896 :
897 0 : return WeakRcHandle<ICE::Endpoint>();
898 0 : }
899 :
900 : bool
901 0 : TransportClient::send_response(const GUID_t& peer,
902 : const DataSampleHeader& header,
903 : Message_Block_Ptr payload)
904 : {
905 0 : DataLinkIndex::iterator found = data_link_index_.find(peer);
906 :
907 0 : if (found == data_link_index_.end()) {
908 0 : if (DCPS_debug_level > 4) {
909 0 : LogGuid logger(peer);
910 0 : ACE_DEBUG((LM_DEBUG,
911 : ACE_TEXT("(%P|%t) TransportClient::send_response: ")
912 : ACE_TEXT("no link for publication %C, ")
913 : ACE_TEXT("not sending response.\n"),
914 : logger.c_str()));
915 0 : }
916 :
917 0 : return false;
918 : }
919 :
920 0 : DataLinkSet singular;
921 0 : singular.insert_link(found->second);
922 0 : singular.send_response(peer, header, move(payload));
923 0 : return true;
924 0 : }
925 :
926 : void
927 0 : TransportClient::send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
928 : {
929 0 : if (send_list.head() == 0) {
930 0 : return;
931 : }
932 0 : ACE_GUARD(ACE_Thread_Mutex, send_transaction_guard, send_transaction_lock_);
933 0 : send_i(send_list, transaction_id);
934 0 : }
935 :
936 : SendControlStatus
937 0 : TransportClient::send_w_control(SendStateDataSampleList send_list,
938 : const DataSampleHeader& header,
939 : Message_Block_Ptr msg,
940 : const GUID_t& destination)
941 : {
942 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, send_transaction_guard,
943 : send_transaction_lock_, SEND_CONTROL_ERROR);
944 0 : if (send_list.head()) {
945 0 : send_i(send_list, 0);
946 : }
947 0 : return send_control_to(header, move(msg), destination);
948 0 : }
949 :
950 : void
951 0 : TransportClient::send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id)
952 : {
953 0 : if (transaction_id != 0 && transaction_id != expected_transaction_id_) {
954 0 : if (transaction_id > max_transaction_id_seen_) {
955 0 : max_transaction_id_seen_ = transaction_id;
956 0 : max_transaction_tail_ = send_list.tail();
957 : }
958 0 : return;
959 : } else /* transaction_id == expected_transaction_id */ {
960 :
961 0 : DataSampleElement* cur = send_list.head();
962 0 : if (max_transaction_tail_ == 0) {
963 : //Means no future transaction beat this transaction into send
964 0 : if (transaction_id != 0)
965 0 : max_transaction_id_seen_ = expected_transaction_id_;
966 : // Only send this current transaction
967 0 : max_transaction_tail_ = send_list.tail();
968 : }
969 0 : DataLinkSet send_links;
970 :
971 0 : while (cur != 0) {
972 : // VERY IMPORTANT NOTE:
973 : //
974 : // We have to be very careful in how we deal with the current
975 : // DataSampleElement. The issue is that once we have invoked
976 : // data_delivered() on the send_listener_ object, or we have invoked
977 : // send() on the pub_links, we can no longer access the current
978 : // DataSampleElement!Thus, we need to get the next
979 : // DataSampleElement (pointer) from the current element now,
980 : // while it is safe.
981 : DataSampleElement* next_elem;
982 0 : if (cur != max_transaction_tail_) {
983 0 : next_elem = cur->get_next_send_sample();
984 : } else {
985 0 : next_elem = max_transaction_tail_;
986 : }
987 : DataLinkSet_rch pub_links =
988 0 : (cur->get_num_subs() > 0)
989 : ? DataLinkSet_rch(links_.select_links(cur->get_sub_ids(), cur->get_num_subs()))
990 0 : : DataLinkSet_rch(&links_, inc_count());
991 :
992 0 : if (pub_links.is_nil() || pub_links->empty()) {
993 : // NOTE: This is the "local publisher id is not currently
994 : // associated with any remote subscriber ids" case.
995 :
996 0 : if (DCPS_debug_level > 4) {
997 0 : LogGuid logger(cur->get_pub_id());
998 0 : ACE_DEBUG((LM_DEBUG,
999 : ACE_TEXT("(%P|%t) TransportClient::send_i: ")
1000 : ACE_TEXT("no links for publication %C, ")
1001 : ACE_TEXT("not sending element %@ for transaction: %d.\n"),
1002 : logger.c_str(),
1003 : cur,
1004 : cur->transaction_id()));
1005 0 : }
1006 :
1007 : // We tell the send_listener_ that all of the remote subscriber ids
1008 : // that wanted the data (all zero of them) have indeed received
1009 : // the data.
1010 0 : cur->get_send_listener()->data_delivered(cur);
1011 :
1012 : } else {
1013 0 : VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: Found DataLinkSet. Sending element %@.\n"
1014 : , cur), 5);
1015 :
1016 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
1017 :
1018 : // Content-Filtering adjustment to the pub_links:
1019 : // - If the sample should be filtered out of all subscriptions on a given
1020 : // DataLink, then exclude that link from the subset that we'll send to.
1021 : // - If the sample should be filtered out of some (or none) of the subs,
1022 : // then record that information in the DataSampleElement so that the
1023 : // header's content_filter_entries_ can be marshaled before it's sent.
1024 0 : if (cur->filter_out_.ptr()) {
1025 0 : DataLinkSet_rch subset;
1026 0 : DataLinkSet::GuardType guard(pub_links->lock());
1027 : typedef DataLinkSet::MapType MapType;
1028 0 : MapType& map = pub_links->map();
1029 :
1030 0 : for (MapType::iterator itr = map.begin(); itr != map.end(); ++itr) {
1031 : size_t n_subs;
1032 : GUIDSeq_var ti =
1033 0 : itr->second->target_intersection(cur->get_pub_id(),
1034 0 : cur->filter_out_.in(), n_subs);
1035 :
1036 0 : if (ti.ptr() == 0 || ti->length() != n_subs) {
1037 0 : if (!subset.in()) {
1038 0 : subset = make_rch<DataLinkSet>();
1039 : }
1040 :
1041 0 : subset->insert_link(itr->second);
1042 0 : cur->filter_per_link_[itr->first] = ti._retn();
1043 :
1044 : } else {
1045 0 : VDBG((LM_DEBUG,
1046 : "(%P|%t) DBG: DataLink completely filtered-out %@.\n",
1047 : itr->second.in()));
1048 : }
1049 0 : }
1050 :
1051 0 : if (!subset.in()) {
1052 0 : guard.release();
1053 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: filtered-out of all DataLinks.\n"));
1054 : // similar to the "if (pub_links.is_nil())" case above, no links
1055 0 : cur->get_send_listener()->data_delivered(cur);
1056 0 : if (cur != max_transaction_tail_) {
1057 : // Move on to the next DataSampleElement to send.
1058 0 : cur = next_elem;
1059 0 : continue;
1060 : } else {
1061 0 : break;
1062 : }
1063 : }
1064 :
1065 0 : pub_links = subset;
1066 0 : }
1067 :
1068 : #endif
1069 :
1070 : // This will do several things, including adding to the membership
1071 : // of the send_links set. Any DataLinks added to the send_links
1072 : // set will be also told about the send_start() event. Those
1073 : // DataLinks (in the pub_links set) that are already in the
1074 : // send_links set will not be told about the send_start() event
1075 : // since they heard about it when they were inserted into the
1076 : // send_links set.
1077 0 : send_links.send_start(pub_links.in());
1078 0 : if (cur->get_header().message_id_ != SAMPLE_DATA) {
1079 0 : pub_links->send_control(cur);
1080 : } else {
1081 0 : pub_links->send(cur);
1082 : }
1083 : }
1084 0 : if (cur != max_transaction_tail_) {
1085 : // Move on to the next DataSampleElement to send.
1086 0 : cur = next_elem;
1087 : } else {
1088 0 : break;
1089 : }
1090 0 : }
1091 :
1092 : // This will inform each DataLink in the set about the stop_send() event.
1093 : // It will then clear the send_links_ set.
1094 : //
1095 : // The reason that the send_links_ set is cleared is because we continually
1096 : // reuse the same send_links_ object over and over for each call to this
1097 : // send method.
1098 0 : GUID_t pub_id = repo_id();
1099 0 : send_links.send_stop(pub_id);
1100 0 : if (transaction_id != 0) {
1101 0 : expected_transaction_id_ = max_transaction_id_seen_ + 1;
1102 : }
1103 0 : max_transaction_tail_ = 0;
1104 0 : }
1105 : }
1106 :
1107 : TransportSendListener_rch
1108 0 : TransportClient::get_send_listener()
1109 : {
1110 0 : return rchandle_from(dynamic_cast<TransportSendListener*>(this));
1111 : }
1112 :
1113 : TransportReceiveListener_rch
1114 0 : TransportClient::get_receive_listener()
1115 : {
1116 0 : return rchandle_from(dynamic_cast<TransportReceiveListener*>(this));
1117 : }
1118 :
1119 : SendControlStatus
1120 0 : TransportClient::send_control(const DataSampleHeader& header,
1121 : Message_Block_Ptr msg)
1122 : {
1123 0 : if (repo_id_ == GUID_UNKNOWN) {
1124 0 : return SEND_CONTROL_OK;
1125 : }
1126 0 : return links_.send_control(repo_id_, get_send_listener(), header, move(msg));
1127 : }
1128 :
1129 : SendControlStatus
1130 0 : TransportClient::send_control_to(const DataSampleHeader& header,
1131 : Message_Block_Ptr msg,
1132 : const GUID_t& destination)
1133 : {
1134 0 : if (repo_id_ == GUID_UNKNOWN) {
1135 0 : return SEND_CONTROL_OK;
1136 : }
1137 :
1138 0 : DataLinkSet singular;
1139 : {
1140 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, lock_, SEND_CONTROL_ERROR);
1141 0 : DataLinkIndex::iterator found = data_link_index_.find(destination);
1142 :
1143 0 : if (found == data_link_index_.end()) {
1144 0 : return SEND_CONTROL_ERROR;
1145 : }
1146 :
1147 0 : singular.insert_link(found->second);
1148 0 : }
1149 0 : return singular.send_control(repo_id_, get_send_listener(), header, move(msg));
1150 0 : }
1151 :
1152 : bool
1153 0 : TransportClient::remove_sample(const DataSampleElement* sample)
1154 : {
1155 0 : return links_.remove_sample(sample);
1156 : }
1157 :
1158 : bool
1159 0 : TransportClient::remove_all_msgs()
1160 : {
1161 0 : if (repo_id_ == GUID_UNKNOWN) {
1162 0 : return true;
1163 : }
1164 0 : return links_.remove_all_msgs(repo_id_);
1165 : }
1166 :
1167 0 : void TransportClient::terminate_send_if_suspended()
1168 : {
1169 0 : links_.terminate_send_if_suspended();
1170 0 : }
1171 :
1172 0 : bool TransportClient::associated_with(const GUID_t& remote) const
1173 : {
1174 0 : ACE_Guard<ACE_Thread_Mutex> guard(lock_);
1175 0 : if (!guard.locked()) {
1176 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::associated_with: "
1177 : "lock failed\n"));
1178 0 : return false;
1179 : }
1180 0 : return data_link_index_.count(remote);
1181 0 : }
1182 :
1183 0 : bool TransportClient::pending_association_with(const GUID_t& remote) const
1184 : {
1185 0 : ACE_Guard<ACE_Thread_Mutex> guard(lock_);
1186 0 : if (!guard.locked()) {
1187 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::pending_association_with: "
1188 : "lock failed\n"));
1189 0 : return false;
1190 : }
1191 0 : return pending_.count(remote);
1192 0 : }
1193 :
1194 0 : void TransportClient::data_acked(const GUID_t& remote)
1195 : {
1196 0 : TransportSendListener_rch send_listener;
1197 : {
1198 0 : ACE_Guard<ACE_Thread_Mutex> guard(lock_);
1199 0 : if (!guard.locked()) {
1200 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportClient::data_acked: "
1201 : "lock failed\n"));
1202 0 : return;
1203 : }
1204 0 : send_listener = get_send_listener();
1205 0 : }
1206 0 : send_listener->data_acked(remote);
1207 0 : }
1208 :
1209 0 : bool TransportClient::is_leading(const GUID_t& reader_id) const
1210 : {
1211 0 : return links_.is_leading(get_guid(), reader_id);
1212 : }
1213 :
1214 :
1215 : } // namespace DCPS
1216 : } // namespace OpenDDS
1217 :
1218 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|