Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "DataLink.h"
10 :
11 : #include "ReceivedDataSample.h"
12 :
13 : #include "TransportImpl.h"
14 : #include "TransportInst.h"
15 : #include "TransportClient.h"
16 :
17 : #include "dds/DCPS/DataWriterImpl.h"
18 : #include "dds/DCPS/DataReaderImpl.h"
19 : #include "dds/DCPS/Service_Participant.h"
20 : #include "dds/DCPS/GuidConverter.h"
21 : #include "dds/DdsDcpsGuidTypeSupportImpl.h"
22 : #include "dds/DCPS/Util.h"
23 : #include "dds/DCPS/Definitions.h"
24 : #include "dds/DCPS/SafetyProfileStreams.h"
25 :
26 : #include "EntryExit.h"
27 : #include "tao/debug.h"
28 : #include "ace/Reactor.h"
29 : #include "ace/SOCK.h"
30 :
31 :
32 : #if !defined (__ACE_INLINE__)
33 : #include "DataLink.inl"
34 : #endif /* __ACE_INLINE__ */
35 :
36 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
37 :
38 : namespace OpenDDS {
39 : namespace DCPS {
40 :
41 : /// Only called by our TransportImpl object.
42 0 : DataLink::DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback,
43 0 : bool is_active)
44 0 : : stopped_(false),
45 0 : impl_(impl),
46 0 : transport_priority_(priority),
47 0 : scheduling_release_(false),
48 0 : is_loopback_(is_loopback),
49 0 : is_active_(is_active),
50 0 : started_(false),
51 0 : send_response_listener_("DataLink"),
52 0 : interceptor_(impl->reactor(), impl->reactor_owner())
53 : {
54 : DBG_ENTRY_LVL("DataLink", "DataLink", 6);
55 :
56 0 : id_ = DataLink::get_next_datalink_id();
57 :
58 0 : long datalink_release_delay = TransportInst::DEFAULT_DATALINK_RELEASE_DELAY;
59 0 : size_t control_chunks = TransportInst::DEFAULT_DATALINK_CONTROL_CHUNKS;
60 :
61 0 : TransportInst_rch cfg = impl->config();
62 0 : if (cfg) {
63 0 : datalink_release_delay = cfg->datalink_release_delay_;
64 0 : if (cfg->thread_per_connection_) {
65 0 : thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this));
66 :
67 0 : if (thr_per_con_send_task_->open() == -1) {
68 0 : ACE_ERROR((LM_ERROR,
69 : ACE_TEXT("(%P|%t) DataLink::DataLink: ")
70 : ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
71 :
72 0 : } else if (DCPS_debug_level > 4) {
73 0 : ACE_DEBUG((LM_DEBUG,
74 : ACE_TEXT("(%P|%t) DataLink::DataLink - ")
75 : ACE_TEXT("started new thread to send data with.\n")));
76 : }
77 : }
78 0 : control_chunks = cfg->datalink_control_chunks_;
79 : }
80 :
81 : // Initialize transport control sample allocators:
82 0 : datalink_release_delay_ = TimeDuration::from_msec(datalink_release_delay);
83 :
84 0 : this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
85 0 : this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
86 0 : }
87 :
88 0 : DataLink::~DataLink()
89 : {
90 : DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
91 :
92 0 : if (!assoc_by_local_.empty()) {
93 0 : ACE_DEBUG((LM_WARNING,
94 : ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
95 : ACE_TEXT("link still in use by %d entities when deleted!\n"),
96 : this, assoc_by_local_.size()));
97 : }
98 :
99 0 : if (this->thr_per_con_send_task_ != 0) {
100 0 : this->thr_per_con_send_task_->close(1);
101 : }
102 0 : }
103 :
104 : TransportImpl_rch
105 0 : DataLink::impl() const
106 : {
107 0 : return impl_.lock();
108 : }
109 :
110 : bool
111 0 : DataLink::add_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote)
112 : {
113 0 : const DataLink_rch link(this, inc_count());
114 :
115 0 : TransportClient_rch client_lock = client.lock();
116 0 : const GUID_t client_id = client_lock ? client_lock->get_guid() : GUID_UNKNOWN;
117 :
118 0 : GuardType guard(strategy_lock_);
119 :
120 0 : if (client_lock) {
121 0 : PendingOnStartsMap::iterator it = pending_on_starts_.find(remote);
122 0 : if (it != pending_on_starts_.end()) {
123 0 : RepoIdSet::iterator it2 = it->second.find(client_id);
124 0 : if (it2 != it->second.end()) {
125 0 : it->second.erase(it2);
126 0 : if (it->second.empty()) {
127 0 : pending_on_starts_.erase(it);
128 : }
129 0 : guard.release();
130 0 : interceptor_.execute_or_enqueue(make_rch<ImmediateStart>(link, client, remote));
131 : } else {
132 0 : on_start_callbacks_[remote][client_id] = client;
133 : }
134 : } else {
135 0 : on_start_callbacks_[remote][client_id] = client;
136 : }
137 : }
138 :
139 0 : if (started_ && !send_strategy_.is_nil()) {
140 0 : return false; // link already started
141 : }
142 0 : return true;
143 0 : }
144 :
145 : void
146 0 : DataLink::remove_startup_callbacks(const GUID_t& local, const GUID_t& remote)
147 : {
148 0 : GuardType guard(strategy_lock_);
149 :
150 0 : OnStartCallbackMap::iterator oit = on_start_callbacks_.find(remote);
151 0 : if (oit != on_start_callbacks_.end()) {
152 0 : RepoToClientMap::iterator oit2 = oit->second.find(local);
153 0 : if (oit2 != oit->second.end()) {
154 0 : oit->second.erase(oit2);
155 0 : if (oit->second.empty()) {
156 0 : on_start_callbacks_.erase(oit);
157 : }
158 : }
159 : }
160 0 : PendingOnStartsMap::iterator pit = pending_on_starts_.find(remote);
161 0 : if (pit != pending_on_starts_.end()) {
162 0 : RepoIdSet::iterator pit2 = pit->second.find(local);
163 0 : if (pit2 != pit->second.end()) {
164 0 : pit->second.erase(pit2);
165 0 : if (pit->second.empty()) {
166 0 : pending_on_starts_.erase(pit);
167 : }
168 : }
169 : }
170 0 : }
171 :
172 : void
173 0 : DataLink::remove_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote)
174 : {
175 0 : TransportClient_rch client_lock = client.lock();
176 0 : if (client_lock) {
177 0 : const GUID_t id = client_lock->get_guid();
178 :
179 0 : GuardType guard(strategy_lock_);
180 0 : OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
181 0 : if (it != on_start_callbacks_.end()) {
182 0 : RepoToClientMap::iterator it2 = it->second.find(id);
183 0 : if (it2 != it->second.end()) {
184 0 : it->second.erase(it2);
185 0 : if (it->second.empty()) {
186 0 : on_start_callbacks_.erase(it);
187 : }
188 : }
189 : }
190 0 : }
191 0 : }
192 :
193 : void
194 0 : DataLink::invoke_on_start_callbacks(bool success)
195 : {
196 0 : const DataLink_rch link(success ? this : 0, inc_count());
197 :
198 : while (true) {
199 0 : GuardType guard(strategy_lock_);
200 :
201 0 : if (on_start_callbacks_.empty()) {
202 0 : break;
203 : }
204 :
205 0 : GUID_t remote = GUID_UNKNOWN;
206 0 : TransportClient_wrch client;
207 0 : OnStartCallbackMap::iterator it = on_start_callbacks_.begin();
208 0 : if (it != on_start_callbacks_.end()) {
209 0 : remote = it->first;
210 0 : RepoToClientMap::iterator it2 = it->second.begin();
211 0 : if (it2 != it->second.end()) {
212 0 : client = it2->second;
213 0 : it->second.erase(it2);
214 0 : if (it->second.empty()) {
215 0 : on_start_callbacks_.erase(it);
216 : }
217 : }
218 : }
219 :
220 0 : guard.release();
221 0 : if (success) {
222 0 : TransportClient_rch client_lock = client.lock();
223 0 : if (client_lock) {
224 0 : client_lock->use_datalink(remote, link);
225 : }
226 0 : }
227 0 : }
228 0 : }
229 :
230 0 : bool DataLink::invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success)
231 : {
232 0 : const DataLink_rch link(success ? this : 0, inc_count());
233 :
234 0 : TransportClient_wrch client;
235 0 : bool made_callback = false;
236 :
237 : {
238 0 : GuardType guard(strategy_lock_);
239 :
240 0 : OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
241 0 : if (it != on_start_callbacks_.end()) {
242 0 : RepoToClientMap::iterator it2 = it->second.find(local);
243 0 : if (it2 != it->second.end()) {
244 0 : client = it2->second;
245 0 : it->second.erase(it2);
246 0 : if (it->second.empty()) {
247 0 : on_start_callbacks_.erase(it);
248 : }
249 : } else {
250 0 : pending_on_starts_[remote].insert(local);
251 : }
252 : } else {
253 0 : pending_on_starts_[remote].insert(local);
254 : }
255 0 : }
256 :
257 0 : if (success) {
258 0 : TransportClient_rch client_lock = client.lock();
259 0 : if (client_lock) {
260 0 : client_lock->use_datalink(remote, link);
261 0 : made_callback = true;
262 : }
263 0 : }
264 :
265 0 : return made_callback;
266 0 : }
267 :
268 : //Reactor invokes this after being notified in schedule_stop or cancel_release
269 : int
270 0 : DataLink::handle_exception(ACE_HANDLE /* fd */)
271 : {
272 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
273 :
274 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
275 0 : if (scheduled_to_stop_at_.is_zero()) {
276 0 : if (DCPS_debug_level > 0) {
277 0 : ACE_DEBUG((LM_DEBUG,
278 : ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
279 : }
280 0 : TransportImpl_rch impl = impl_.lock();
281 0 : if (impl) {
282 0 : ACE_Reactor_Timer_Interface* reactor = impl->timer();
283 0 : if (reactor && reactor->cancel_timer(this) > 0) {
284 0 : if (DCPS_debug_level > 0) {
285 0 : ACE_DEBUG((LM_DEBUG,
286 : ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
287 : }
288 : }
289 : }
290 0 : return 0;
291 0 : } else if (scheduled_to_stop_at_ <= now) {
292 0 : if (this->scheduling_release_) {
293 0 : if (DCPS_debug_level > 0) {
294 0 : ACE_DEBUG((LM_DEBUG,
295 : ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
296 : }
297 0 : this->handle_timeout(ACE_Time_Value::zero, 0);
298 0 : return 0;
299 : }
300 0 : if (DCPS_debug_level > 0) {
301 0 : ACE_DEBUG((LM_DEBUG,
302 : ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
303 : }
304 0 : this->stop();
305 0 : return 0;
306 : } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
307 0 : if (DCPS_debug_level > 0) {
308 0 : ACE_DEBUG((LM_DEBUG,
309 : ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
310 : }
311 0 : TransportImpl_rch impl = impl_.lock();
312 0 : if (impl) {
313 0 : ACE_Reactor_Timer_Interface* reactor = impl->timer();
314 0 : const TimeDuration future_release_time = scheduled_to_stop_at_ - now;
315 0 : reactor->schedule_timer(this, 0, future_release_time.value());
316 0 : }
317 0 : }
318 0 : return 0;
319 0 : }
320 :
321 : //Allows DataLink::stop to be done on the reactor thread so that
322 : //this thread avoids possibly deadlocking trying to access reactor
323 : //to stop strategies or schedule timers
324 : void
325 0 : DataLink::schedule_stop(const MonotonicTimePoint& schedule_to_stop_at)
326 : {
327 0 : if (!stopped_ && scheduled_to_stop_at_.is_zero()) {
328 0 : this->scheduled_to_stop_at_ = schedule_to_stop_at;
329 0 : notify_reactor();
330 : // reactor will invoke our DataLink::handle_exception()
331 : } else {
332 0 : if (DCPS_debug_level > 0) {
333 0 : ACE_DEBUG((LM_DEBUG,
334 : ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
335 : }
336 : }
337 0 : }
338 :
339 : void
340 0 : DataLink::notify_reactor()
341 : {
342 0 : TransportImpl_rch impl = impl_.lock();
343 0 : if (impl) {
344 0 : ReactorTask_rch rt(impl->reactor_task());
345 0 : if (rt) {
346 0 : ACE_Reactor* reactor = rt->get_reactor();
347 0 : if (reactor) {
348 0 : reactor->notify(this);
349 : }
350 : }
351 0 : }
352 0 : }
353 :
354 : void
355 0 : DataLink::stop()
356 : {
357 0 : pre_stop_i();
358 :
359 0 : TransportSendStrategy_rch send_strategy;
360 0 : TransportStrategy_rch recv_strategy;
361 :
362 : {
363 0 : GuardType guard(strategy_lock_);
364 :
365 0 : if (stopped_) return;
366 :
367 0 : send_strategy = send_strategy_;
368 0 : send_strategy_.reset();
369 :
370 0 : recv_strategy = receive_strategy_;
371 0 : receive_strategy_.reset();
372 0 : }
373 :
374 0 : if (!send_strategy.is_nil()) {
375 0 : send_strategy->stop();
376 : }
377 :
378 0 : if (!recv_strategy.is_nil()) {
379 0 : recv_strategy->stop();
380 : }
381 :
382 0 : stop_i();
383 0 : stopped_ = true;
384 0 : scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
385 0 : }
386 :
387 : void
388 0 : DataLink::resume_send()
389 : {
390 0 : TransportSendStrategy_rch strategy = get_send_strategy();
391 :
392 0 : if (strategy && strategy->isDirectMode()) {
393 0 : strategy->resume_send();
394 : }
395 0 : }
396 :
397 : int
398 0 : DataLink::make_reservation(const GUID_t& remote_subscription_id,
399 : const GUID_t& local_publication_id,
400 : const TransportSendListener_wrch& send_listener,
401 : bool reliable)
402 : {
403 : DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
404 :
405 0 : if (DCPS_debug_level > 9) {
406 0 : LogGuid local_log(local_publication_id), remote_log(remote_subscription_id);
407 0 : ACE_DEBUG((LM_DEBUG,
408 : ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
409 : ACE_TEXT("creating association local publication %C ")
410 : ACE_TEXT("<--> with remote subscription %C.\n"),
411 : local_log .c_str(),
412 : remote_log.c_str()));
413 0 : }
414 :
415 0 : TransportSendStrategy_rch strategy = get_send_strategy();
416 :
417 0 : if (strategy) {
418 0 : strategy->link_released(false);
419 : }
420 :
421 : {
422 0 : GuardType guard(pub_sub_maps_lock_);
423 :
424 0 : LocalAssociationInfo& info = assoc_by_local_[local_publication_id];
425 0 : info.reliable_ = reliable;
426 0 : info.associated_.insert(remote_subscription_id);
427 0 : ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
428 :
429 0 : if (rls.is_nil())
430 0 : rls = make_rch<ReceiveListenerSet>();
431 0 : rls->insert(local_publication_id, TransportReceiveListener_rch());
432 :
433 0 : send_listeners_.insert(std::make_pair(local_publication_id, send_listener));
434 0 : }
435 0 : return 0;
436 0 : }
437 :
438 : int
439 0 : DataLink::make_reservation(const GUID_t& remote_publication_id,
440 : const GUID_t& local_subscription_id,
441 : const TransportReceiveListener_wrch& receive_listener,
442 : bool reliable)
443 : {
444 : DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
445 :
446 0 : if (DCPS_debug_level > 9) {
447 0 : LogGuid local(local_subscription_id), remote(remote_publication_id);
448 0 : ACE_DEBUG((LM_DEBUG,
449 : ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
450 : ACE_TEXT("creating association local subscription %C ")
451 : ACE_TEXT("<--> with remote publication %C.\n"),
452 : local.c_str(), remote.c_str()));
453 0 : }
454 :
455 0 : TransportSendStrategy_rch strategy = get_send_strategy();
456 :
457 0 : if (strategy) {
458 0 : strategy->link_released(false);
459 : }
460 :
461 : {
462 0 : GuardType guard(pub_sub_maps_lock_);
463 :
464 0 : LocalAssociationInfo& info = assoc_by_local_[local_subscription_id];
465 0 : info.reliable_ = reliable;
466 0 : info.associated_.insert(remote_publication_id);
467 0 : ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
468 :
469 0 : if (rls.is_nil())
470 0 : rls = make_rch<ReceiveListenerSet>();
471 0 : rls->insert(local_subscription_id, receive_listener);
472 :
473 0 : recv_listeners_.insert(std::make_pair(local_subscription_id,
474 : receive_listener));
475 0 : }
476 0 : return 0;
477 0 : }
478 :
479 : template <typename Seq>
480 0 : void set_to_seq(const RepoIdSet& rids, Seq& seq)
481 : {
482 0 : seq.length(static_cast<CORBA::ULong>(rids.size()));
483 0 : CORBA::ULong i = 0;
484 0 : for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
485 0 : seq[i++] = *iter;
486 : }
487 0 : }
488 :
489 : GUIDSeq*
490 0 : DataLink::peer_ids(const GUID_t& local_id) const
491 : {
492 0 : GuardType guard(pub_sub_maps_lock_);
493 :
494 0 : const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
495 :
496 0 : if (iter == assoc_by_local_.end())
497 0 : return 0;
498 :
499 0 : GUIDSeq_var result = new GUIDSeq;
500 0 : set_to_seq(iter->second.associated_, static_cast<GUIDSeq&>(result));
501 0 : return result._retn();
502 0 : }
503 :
504 : /// This gets invoked when a TransportClient::remove_associations()
505 : /// call has been made. Because this DataLink can be shared amongst
506 : /// different TransportClient objects, and different threads could
507 : /// be "managing" the different TransportClient objects, we need
508 : /// to make sure that this release_reservations() works in conjunction
509 : /// with a simultaneous call (in another thread) to one of this
510 : /// DataLink's make_reservation() methods.
511 : void
512 0 : DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,
513 : DataLinkSetMap& released_locals)
514 : {
515 : DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
516 :
517 0 : if (DCPS_debug_level > 9) {
518 0 : GuidConverter local(local_id);
519 0 : GuidConverter remote(remote_id);
520 0 : ACE_DEBUG((LM_DEBUG,
521 : ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
522 : ACE_TEXT("releasing association local: %C ")
523 : ACE_TEXT("<--> with remote %C.\n"),
524 : OPENDDS_STRING(local).c_str(),
525 : OPENDDS_STRING(remote).c_str()));
526 0 : }
527 :
528 0 : remove_startup_callbacks(local_id, remote_id);
529 :
530 : //let the specific class release its reservations
531 : //done this way to prevent deadlock of holding pub_sub_maps_lock_
532 : //then obtaining a specific class lock in release_reservations_i
533 : //which reverses lock ordering of the active send logic of needing
534 : //the specific class lock before obtaining the over arching DataLink
535 : //pub_sub_maps_lock_
536 0 : this->release_reservations_i(remote_id, local_id);
537 :
538 0 : bool release_remote_required = false;
539 : {
540 0 : GuardType guard(this->pub_sub_maps_lock_);
541 :
542 0 : if (this->stopped_) return;
543 :
544 0 : ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
545 0 : if (rls->size() == 1) {
546 0 : assoc_by_remote_.erase(remote_id);
547 0 : release_remote_required = true;
548 : } else {
549 0 : rls->remove(local_id);
550 : }
551 0 : RepoIdSet& ris = assoc_by_local_[local_id].associated_;
552 0 : if (ris.size() == 1) {
553 0 : DataLinkSet_rch& links = released_locals[local_id];
554 0 : if (links.is_nil()) {
555 0 : links = make_rch<DataLinkSet>();
556 : }
557 0 : links->insert_link(rchandle_from(this));
558 0 : assoc_by_local_.erase(local_id);
559 : } else {
560 0 : ris.erase(remote_id);
561 : }
562 :
563 0 : if (assoc_by_local_.empty()) {
564 0 : VDBG_LVL((LM_DEBUG,
565 : ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
566 : ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
567 :
568 0 : guard.release();
569 0 : TransportImpl_rch impl = impl_.lock();
570 0 : if (impl) {
571 0 : impl->release_datalink(this);
572 : }
573 0 : }
574 0 : }
575 0 : if (release_remote_required) {
576 0 : release_remote_i(remote_id);
577 : }
578 : }
579 :
580 : void
581 0 : DataLink::schedule_delayed_release()
582 : {
583 : DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
584 :
585 0 : VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
586 :
587 : // The samples have to be removed at this point, otherwise the samples
588 : // can not be delivered when new association is added and still use
589 : // this connection/datalink.
590 0 : TransportSendStrategy_rch strategy = get_send_strategy();
591 :
592 0 : if (strategy) {
593 0 : strategy->clear(TransportSendStrategy::MODE_DIRECT);
594 : }
595 :
596 0 : const MonotonicTimePoint future_release_time(MonotonicTimePoint::now() + datalink_release_delay_);
597 0 : schedule_stop(future_release_time);
598 0 : }
599 :
600 : bool
601 0 : DataLink::cancel_release()
602 : {
603 : DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
604 0 : if (stopped_) {
605 0 : if (DCPS_debug_level > 0) {
606 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
607 : }
608 0 : return false;
609 : }
610 0 : if (scheduling_release_) {
611 0 : if (DCPS_debug_level > 0) {
612 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
613 : }
614 0 : this->set_scheduling_release(false);
615 0 : scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
616 0 : notify_reactor();
617 : }
618 0 : return true;
619 : }
620 :
621 : void
622 0 : DataLink::stop_i()
623 : {
624 : DBG_ENTRY_LVL("DataLink", "stop_i", 6);
625 0 : }
626 :
627 : ACE_Message_Block*
628 0 : DataLink::create_control(char submessage_id,
629 : DataSampleHeader& header,
630 : Message_Block_Ptr data)
631 : {
632 : DBG_ENTRY_LVL("DataLink", "create_control", 6);
633 :
634 0 : header.byte_order_ = ACE_CDR_BYTE_ORDER;
635 0 : header.message_id_ = TRANSPORT_CONTROL;
636 0 : header.submessage_id_ = submessage_id;
637 0 : header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
638 :
639 0 : ACE_Message_Block* message = 0;
640 0 : ACE_NEW_MALLOC_RETURN(message,
641 : static_cast<ACE_Message_Block*>(
642 : this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
643 : ACE_Message_Block(header.get_max_serialized_size(),
644 : ACE_Message_Block::MB_DATA,
645 : data.release(),
646 : 0, // data
647 : 0, // allocator_strategy
648 : 0, // locking_strategy
649 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
650 : ACE_Time_Value::zero,
651 : ACE_Time_Value::max_time,
652 : this->db_allocator_.get(),
653 : this->mb_allocator_.get()),
654 : 0);
655 :
656 0 : if (!(*message << header)) {
657 0 : ACE_ERROR((LM_ERROR,
658 : ACE_TEXT("(%P|%t) DataLink::create_control: ")
659 : ACE_TEXT("cannot put header in message\n")));
660 0 : ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
661 0 : message = 0;
662 : }
663 :
664 0 : return message;
665 : }
666 :
667 : SendControlStatus
668 0 : DataLink::send_control(const DataSampleHeader& header, Message_Block_Ptr message)
669 : {
670 : DBG_ENTRY_LVL("DataLink", "send_control", 6);
671 :
672 : TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
673 : GUID_UNKNOWN, &send_response_listener_,
674 0 : header, move(message));
675 :
676 0 : send_response_listener_.track_message();
677 :
678 0 : GUID_t senderId(header.publication_id_);
679 0 : send_start();
680 0 : send(elem);
681 0 : send_stop(senderId);
682 :
683 0 : return SEND_CONTROL_OK;
684 : }
685 :
686 : /// This method will "deliver" the sample to all TransportReceiveListeners
687 : /// within this DataLink that are interested in the (remote) publisher id
688 : /// that sent the sample.
689 : int
690 0 : DataLink::data_received(ReceivedDataSample& sample,
691 : const GUID_t& readerId /* = GUID_UNKNOWN */)
692 : {
693 0 : data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
694 0 : return 0;
695 : }
696 :
697 : void
698 0 : DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
699 : {
700 0 : data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
701 0 : }
702 :
703 : void
704 0 : DataLink::data_received_i(ReceivedDataSample& sample,
705 : const GUID_t& readerId,
706 : const RepoIdSet& incl_excl,
707 : ReceiveListenerSet::ConstrainReceiveSet constrain)
708 : {
709 : DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
710 : // Which remote publication sent this message?
711 0 : const GUID_t& publication_id = sample.header_.publication_id_;
712 :
713 : // Locate the set of TransportReceiveListeners associated with this
714 : // DataLink that are interested in hearing about any samples received
715 : // from the remote publisher_id.
716 0 : if (DCPS_debug_level > 9) {
717 0 : const GuidConverter converter(publication_id);
718 0 : const GuidConverter reader(readerId);
719 0 : ACE_DEBUG((LM_DEBUG,
720 : ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
721 : ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
722 : OPENDDS_STRING(converter).c_str(),
723 : to_string(sample.header_).c_str(),
724 : OPENDDS_STRING(reader).c_str(),
725 : constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
726 0 : }
727 :
728 0 : if (Transport_debug_level > 9) {
729 0 : const GuidConverter converter(publication_id);
730 0 : ACE_DEBUG((LM_DEBUG,
731 : ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
732 : ACE_TEXT("from publication %C received sample: %C.\n"),
733 : OPENDDS_STRING(converter).c_str(),
734 : to_string(sample.header_).c_str()));
735 0 : }
736 :
737 0 : ReceiveListenerSet_rch listener_set;
738 0 : TransportReceiveListener_rch listener;
739 : {
740 0 : GuardType guard(this->pub_sub_maps_lock_);
741 0 : AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
742 0 : if (iter != assoc_by_remote_.end()) {
743 0 : listener_set = iter->second;
744 : } else {
745 0 : listener = this->default_listener_.lock();
746 : }
747 0 : }
748 :
749 0 : if (listener_set.is_nil()) {
750 0 : if (listener) {
751 0 : listener->data_received(sample);
752 : } else {
753 : // Nobody has any interest in this message. Drop it on the floor.
754 0 : if (Transport_debug_level > 4) {
755 0 : const GuidConverter converter(publication_id);
756 0 : ACE_DEBUG((LM_DEBUG,
757 : ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
758 : ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
759 : OPENDDS_STRING(converter).c_str()));
760 0 : }
761 : }
762 0 : return;
763 : }
764 :
765 0 : if (readerId != GUID_UNKNOWN) {
766 0 : listener_set->data_received(sample, readerId);
767 0 : return;
768 : }
769 :
770 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
771 :
772 0 : if (sample.header_.content_filter_
773 0 : && sample.header_.content_filter_entries_.length()) {
774 0 : ReceiveListenerSet subset(*listener_set.in());
775 0 : subset.remove_all(sample.header_.content_filter_entries_);
776 0 : subset.data_received(sample, incl_excl, constrain);
777 :
778 0 : } else {
779 : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
780 :
781 0 : if (DCPS_debug_level > 9) {
782 : // Just get the set to do our dirty work by having it iterate over its
783 : // collection of TransportReceiveListeners, and invoke the data_received()
784 : // method on each one.
785 0 : OPENDDS_STRING included_ids;
786 0 : bool first = true;
787 0 : RepoIdSet::const_iterator iter = incl_excl.begin();
788 0 : while(iter != incl_excl.end()) {
789 0 : included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
790 0 : first = false;
791 0 : ++iter;
792 : }
793 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
794 : constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
795 0 : }
796 0 : listener_set->data_received(sample, incl_excl, constrain);
797 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
798 : }
799 :
800 : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
801 0 : }
802 :
803 : // static
804 : ACE_UINT64
805 0 : DataLink::get_next_datalink_id()
806 : {
807 : static ACE_UINT64 next_id = 0;
808 0 : static LockType lock;
809 :
810 : ACE_UINT64 id;
811 : {
812 0 : GuardType guard(lock);
813 0 : id = next_id++;
814 :
815 0 : if (0 == next_id) {
816 0 : ACE_ERROR((LM_ERROR,
817 : ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
818 : ACE_TEXT("has rolled over and is reusing ids!\n")));
819 : }
820 0 : }
821 :
822 0 : return id;
823 : }
824 :
825 : void
826 0 : DataLink::transport_shutdown()
827 : {
828 : DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
829 :
830 : //this->cancel_release();
831 0 : this->set_scheduling_release(false);
832 0 : scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
833 :
834 : {
835 0 : TransportImpl_rch impl = impl_.lock();
836 0 : if (impl) {
837 0 : ACE_Reactor_Timer_Interface* reactor = impl->timer();
838 0 : reactor->cancel_timer(this);
839 : }
840 0 : }
841 0 : this->stop();
842 : // this->send_listeners_.clear();
843 : // this->recv_listeners_.clear();
844 : // Drop our reference to the TransportImpl object
845 0 : }
846 :
847 : void
848 0 : DataLink::notify(ConnectionNotice notice)
849 : {
850 : DBG_ENTRY_LVL("DataLink", "notify", 6);
851 :
852 0 : VDBG((LM_DEBUG,
853 : ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
854 : this,
855 : connection_notice_as_str(notice)));
856 :
857 0 : GuardType guard(this->pub_sub_maps_lock_);
858 :
859 : // Notify the datawriters
860 : // the lost publications due to a connection problem.
861 0 : for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
862 0 : itr != send_listeners_.end(); ++itr) {
863 :
864 0 : TransportSendListener_rch tsl = itr->second.lock();
865 :
866 0 : if (tsl) {
867 0 : if (Transport_debug_level > 0) {
868 0 : GuidConverter converter(itr->first);
869 0 : ACE_DEBUG((LM_DEBUG,
870 : ACE_TEXT("(%P|%t) DataLink::notify: ")
871 : ACE_TEXT("notify pub %C %C.\n"),
872 : OPENDDS_STRING(converter).c_str(),
873 : connection_notice_as_str(notice)));
874 0 : }
875 0 : AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
876 0 : if (local_it == assoc_by_local_.end()) {
877 0 : if (Transport_debug_level) {
878 0 : GuidConverter converter(itr->first);
879 0 : ACE_DEBUG((LM_DEBUG,
880 : ACE_TEXT("(%P|%t) DataLink::notify: ")
881 : ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
882 : OPENDDS_STRING(converter).c_str(),
883 : connection_notice_as_str(notice)));
884 0 : }
885 0 : break;
886 : }
887 0 : const RepoIdSet& rids = local_it->second.associated_;
888 :
889 0 : ReaderIdSeq subids;
890 0 : set_to_seq(rids, subids);
891 :
892 0 : switch (notice) {
893 0 : case DISCONNECTED:
894 0 : tsl->notify_publication_disconnected(subids);
895 0 : break;
896 :
897 0 : case RECONNECTED:
898 0 : tsl->notify_publication_reconnected(subids);
899 0 : break;
900 :
901 0 : case LOST:
902 0 : tsl->notify_publication_lost(subids);
903 0 : break;
904 :
905 0 : default:
906 0 : ACE_ERROR((LM_ERROR,
907 : ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
908 : ACE_TEXT("unknown notice to TransportSendListener\n")));
909 0 : break;
910 : }
911 :
912 0 : } else {
913 0 : if (Transport_debug_level > 0) {
914 0 : GuidConverter converter(itr->first);
915 0 : ACE_DEBUG((LM_DEBUG,
916 : ACE_TEXT("(%P|%t) DataLink::notify: ")
917 : ACE_TEXT("not notify pub %C %C\n"),
918 : OPENDDS_STRING(converter).c_str(),
919 : connection_notice_as_str(notice)));
920 0 : }
921 : }
922 0 : }
923 :
924 : // Notify the datareaders registered with TransportImpl
925 : // the lost subscriptions due to a connection problem.
926 0 : for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
927 0 : itr != recv_listeners_.end(); ++itr) {
928 :
929 0 : TransportReceiveListener_rch trl = itr->second.lock();
930 :
931 0 : if (trl) {
932 0 : if (Transport_debug_level > 0) {
933 0 : GuidConverter converter(itr->first);
934 0 : ACE_DEBUG((LM_DEBUG,
935 : ACE_TEXT("(%P|%t) DataLink::notify: ")
936 : ACE_TEXT("notify sub %C %C.\n"),
937 : OPENDDS_STRING(converter).c_str(),
938 : connection_notice_as_str(notice)));
939 0 : }
940 0 : AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
941 0 : if (local_it == assoc_by_local_.end()) {
942 0 : if (Transport_debug_level) {
943 0 : GuidConverter converter(itr->first);
944 0 : ACE_DEBUG((LM_DEBUG,
945 : ACE_TEXT("(%P|%t) DataLink::notify: ")
946 : ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
947 : OPENDDS_STRING(converter).c_str(),
948 : connection_notice_as_str(notice)));
949 0 : }
950 0 : break;
951 : }
952 0 : const RepoIdSet& rids = local_it->second.associated_;
953 :
954 0 : WriterIdSeq pubids;
955 0 : set_to_seq(rids, pubids);
956 :
957 0 : switch (notice) {
958 0 : case DISCONNECTED:
959 0 : trl->notify_subscription_disconnected(pubids);
960 0 : break;
961 :
962 0 : case RECONNECTED:
963 0 : trl->notify_subscription_reconnected(pubids);
964 0 : break;
965 :
966 0 : case LOST:
967 0 : trl->notify_subscription_lost(pubids);
968 0 : break;
969 :
970 0 : default:
971 0 : ACE_ERROR((LM_ERROR,
972 : ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
973 : ACE_TEXT("unknown notice to datareader.\n")));
974 0 : break;
975 : }
976 :
977 0 : } else {
978 0 : if (Transport_debug_level > 0) {
979 0 : GuidConverter converter(itr->first);
980 0 : ACE_DEBUG((LM_DEBUG,
981 : ACE_TEXT("(%P|%t) DataLink::notify: ")
982 : ACE_TEXT("not notify sub %C subscription lost.\n"),
983 : OPENDDS_STRING(converter).c_str()));
984 0 : }
985 :
986 : }
987 0 : }
988 0 : }
989 :
990 :
991 :
992 : void
993 0 : DataLink::pre_stop_i()
994 : {
995 0 : if (this->thr_per_con_send_task_ != 0) {
996 0 : this->thr_per_con_send_task_->close(1);
997 : }
998 0 : }
999 :
1000 : void
1001 0 : DataLink::release_resources()
1002 : {
1003 : DBG_ENTRY_LVL("DataLink", "release_resources", 6);
1004 :
1005 0 : this->prepare_release();
1006 0 : TransportImpl_rch impl = impl_.lock();
1007 0 : if (impl) {
1008 0 : impl->release_link_resources(this);
1009 : }
1010 0 : }
1011 :
1012 : bool
1013 0 : DataLink::is_target(const GUID_t& remote_id)
1014 : {
1015 0 : GuardType guard(this->pub_sub_maps_lock_);
1016 0 : return assoc_by_remote_.count(remote_id);
1017 0 : }
1018 :
1019 : GUIDSeq*
1020 0 : DataLink::target_intersection(const GUID_t& pub_id, const GUIDSeq& in,
1021 : size_t& n_subs)
1022 : {
1023 0 : GUIDSeq_var res;
1024 0 : GuardType guard(this->pub_sub_maps_lock_);
1025 0 : AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
1026 :
1027 0 : if (iter != assoc_by_local_.end()) {
1028 0 : n_subs = iter->second.associated_.size();
1029 0 : const CORBA::ULong len = in.length();
1030 :
1031 0 : for (CORBA::ULong i(0); i < len; ++i) {
1032 0 : if (iter->second.associated_.count(in[i])) {
1033 0 : if (res.ptr() == 0) {
1034 0 : res = new GUIDSeq;
1035 : }
1036 :
1037 0 : push_back(res.inout(), in[i]);
1038 : }
1039 : }
1040 : }
1041 :
1042 0 : return res._retn();
1043 0 : }
1044 :
1045 0 : void DataLink::prepare_release()
1046 : {
1047 0 : GuardType guard(this->pub_sub_maps_lock_);
1048 :
1049 0 : if (!assoc_releasing_.empty()) {
1050 0 : ACE_ERROR((LM_ERROR,
1051 : ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
1052 : ACE_TEXT("already prepared for release.\n")));
1053 0 : return;
1054 : }
1055 :
1056 0 : assoc_releasing_ = assoc_by_local_;
1057 0 : }
1058 :
1059 0 : void DataLink::clear_associations()
1060 : {
1061 0 : for (AssocByLocal::iterator iter = assoc_releasing_.begin();
1062 0 : iter != assoc_releasing_.end(); ++iter) {
1063 0 : TransportSendListener_rch tsl = send_listener_for(iter->first);
1064 0 : if (tsl) {
1065 0 : ReaderIdSeq sub_ids;
1066 0 : set_to_seq(iter->second.associated_, sub_ids);
1067 0 : tsl->remove_associations(sub_ids, false);
1068 0 : continue;
1069 0 : }
1070 0 : TransportReceiveListener_rch trl = recv_listener_for(iter->first);
1071 0 : if (trl) {
1072 0 : WriterIdSeq pub_ids;
1073 0 : set_to_seq(iter->second.associated_, pub_ids);
1074 0 : trl->remove_associations(pub_ids, false);
1075 0 : }
1076 0 : }
1077 0 : assoc_releasing_.clear();
1078 0 : }
1079 :
1080 : int
1081 0 : DataLink::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
1082 : {
1083 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1084 :
1085 0 : if (!scheduled_to_stop_at_.is_zero()) {
1086 0 : VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
1087 : {
1088 0 : TransportImpl_rch impl = impl_.lock();
1089 0 : if (impl) {
1090 0 : impl->unbind_link(this);
1091 : }
1092 0 : }
1093 0 : if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
1094 0 : this->stop();
1095 : }
1096 : }
1097 0 : return 0;
1098 0 : }
1099 :
1100 : int
1101 0 : DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
1102 : {
1103 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1104 :
1105 0 : if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
1106 : // Reactor is shutting down with this timer still pending.
1107 : // Take the same cleanup actions as if the timeout had expired.
1108 0 : handle_timeout(ACE_Time_Value::zero, 0);
1109 : }
1110 :
1111 0 : return 0;
1112 0 : }
1113 :
1114 : void
1115 0 : DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
1116 : {
1117 : /**
1118 : * The following IPV6 code was lifted in spirit from the RTCORBA
1119 : * implementation of setting the DiffServ codepoint.
1120 : */
1121 0 : int result = 0;
1122 :
1123 : // Shift the code point up to bits, so that we only use the DS field
1124 0 : int tos = cp << 2;
1125 :
1126 0 : const char* which = "IPV4 TOS";
1127 : #if defined (ACE_HAS_IPV6)
1128 : ACE_INET_Addr local_address;
1129 :
1130 : if (socket.get_local_addr(local_address) == -1) {
1131 : return;
1132 :
1133 : } else if (local_address.get_type() == AF_INET6)
1134 : #if !defined (IPV6_TCLASS)
1135 : {
1136 : if (DCPS_debug_level > 0) {
1137 : ACE_ERROR((LM_ERROR,
1138 : ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
1139 : ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
1140 : cp));
1141 : }
1142 :
1143 : return;
1144 : }
1145 :
1146 : #else /* IPV6_TCLASS */
1147 : {
1148 : which = "IPV6 TCLASS";
1149 : result = socket.set_option(
1150 : IPPROTO_IPV6,
1151 : IPV6_TCLASS,
1152 : &tos,
1153 : sizeof(tos));
1154 :
1155 : } else // This is a bit tricky and might be hard to follow...
1156 :
1157 : #endif /* IPV6_TCLASS */
1158 : #endif /* ACE_HAS_IPV6 */
1159 :
1160 : #ifdef IP_TOS
1161 0 : result = socket.set_option(
1162 : IPPROTO_IP,
1163 : IP_TOS,
1164 : &tos,
1165 : sizeof(tos));
1166 :
1167 0 : if ((result == -1) && (errno != ENOTSUP)
1168 : #ifdef WSAEINVAL
1169 : && (errno != WSAEINVAL)
1170 : #endif /* WSAINVAL */
1171 : ) {
1172 : #endif /* IP_TOS */
1173 0 : ACE_DEBUG((LM_DEBUG,
1174 : ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1175 : ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
1176 : ACE_TEXT("try running as superuser.\n"),
1177 : which,
1178 : cp));
1179 : #ifdef IP_TOS
1180 0 : } else if (DCPS_debug_level > 4) {
1181 0 : ACE_DEBUG((LM_DEBUG,
1182 : ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1183 : ACE_TEXT("set %C codepoint to %d.\n"),
1184 : which,
1185 : cp));
1186 : }
1187 : #endif /* IP_TOS */
1188 0 : }
1189 :
1190 : bool
1191 0 : DataLink::handle_send_request_ack(TransportQueueElement* element)
1192 : {
1193 0 : element->data_delivered();
1194 0 : return true;
1195 : }
1196 :
1197 : bool
1198 0 : DataLink::Interceptor::reactor_is_shut_down() const {
1199 0 : return false;
1200 : }
1201 :
1202 : void
1203 0 : DataLink::ImmediateStart::execute() {
1204 0 : TransportClient_rch client_lock = client_.lock();
1205 0 : if (client_lock) {
1206 0 : client_lock->use_datalink(remote_, link_);
1207 : }
1208 0 : }
1209 :
1210 :
1211 : void
1212 0 : DataLink::network_change() const
1213 : {
1214 0 : IdToSendListenerMap send_listeners;
1215 0 : IdToRecvListenerMap recv_listeners;
1216 : {
1217 0 : GuardType guard(pub_sub_maps_lock_);
1218 0 : send_listeners = send_listeners_;
1219 0 : recv_listeners = recv_listeners_;
1220 0 : }
1221 0 : for (IdToSendListenerMap::const_iterator itr = send_listeners.begin();
1222 0 : itr != send_listeners.end(); ++itr) {
1223 0 : TransportSendListener_rch tsl = itr->second.lock();
1224 0 : if (tsl) {
1225 0 : tsl->transport_discovery_change();
1226 : }
1227 0 : }
1228 :
1229 0 : for (IdToRecvListenerMap::const_iterator itr = recv_listeners.begin();
1230 0 : itr != recv_listeners.end(); ++itr) {
1231 0 : TransportReceiveListener_rch trl = itr->second.lock();
1232 0 : if (trl) {
1233 0 : trl->transport_discovery_change();
1234 : }
1235 0 : }
1236 0 : }
1237 :
1238 : void
1239 0 : DataLink::replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const
1240 : {
1241 0 : GuidConverter local(local_pub_id);
1242 0 : GuidConverter remote(remote_sub_id);
1243 0 : TransportSendListener_rch send_listener = send_listener_for(local_pub_id);
1244 0 : if (send_listener) {
1245 0 : send_listener->replay_durable_data_for(remote_sub_id);
1246 : }
1247 0 : }
1248 :
1249 : #ifndef OPENDDS_SAFETY_PROFILE
1250 : std::ostream&
1251 0 : operator<<(std::ostream& str, const DataLink& value)
1252 : {
1253 0 : str << " There are " << value.assoc_by_local_.size()
1254 0 : << " local entities currently using this link";
1255 :
1256 0 : if (!value.assoc_by_local_.empty()) {
1257 0 : str << " comprising following associations:";
1258 : }
1259 0 : str << std::endl;
1260 :
1261 : typedef DataLink::AssocByLocal::const_iterator assoc_iter_t;
1262 0 : const DataLink::AssocByLocal& abl = value.assoc_by_local_;
1263 0 : for (assoc_iter_t ait = abl.begin(); ait != abl.end(); ++ait) {
1264 0 : const RepoIdSet& set = ait->second.associated_;
1265 0 : for (RepoIdSet::const_iterator rit = set.begin(); rit != set.end(); ++rit) {
1266 0 : str << GuidConverter(ait->first) << " --> "
1267 0 : << GuidConverter(*rit) << " " << std::endl;
1268 : }
1269 : }
1270 0 : return str;
1271 : }
1272 : #endif
1273 :
1274 : void
1275 0 : DataLink::terminate_send_if_suspended()
1276 : {
1277 0 : TransportSendStrategy_rch strategy = get_send_strategy();
1278 :
1279 0 : if (strategy) {
1280 0 : strategy->terminate_send_if_suspended();
1281 : }
1282 0 : }
1283 :
1284 : }
1285 : }
1286 :
1287 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|