OpenDDS  Snapshot(2023/04/28-20:55)
DataLink.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "DataLink.h"
10 
11 #include "ReceivedDataSample.h"
12 
13 #include "TransportImpl.h"
14 #include "TransportInst.h"
15 #include "TransportClient.h"
16 
20 #include "dds/DCPS/GuidConverter.h"
21 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
22 #include "dds/DCPS/Util.h"
23 #include "dds/DCPS/Definitions.h"
25 
26 #include "EntryExit.h"
27 #include "tao/debug.h"
28 #include "ace/Reactor.h"
29 #include "ace/SOCK.h"
30 
31 
32 #if !defined (__ACE_INLINE__)
33 #include "DataLink.inl"
34 #endif /* __ACE_INLINE__ */
35 
37 
38 namespace OpenDDS {
39 namespace DCPS {
40 
41 /// Only called by our TransportImpl object.
42 DataLink::DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback,
43  bool is_active)
44  : stopped_(false),
45  impl_(impl),
46  transport_priority_(priority),
47  scheduling_release_(false),
48  is_loopback_(is_loopback),
49  is_active_(is_active),
50  started_(false),
51  send_response_listener_("DataLink"),
52  interceptor_(impl->reactor(), impl->reactor_owner())
53 {
54  DBG_ENTRY_LVL("DataLink", "DataLink", 6);
55 
57 
59  size_t control_chunks = TransportInst::DEFAULT_DATALINK_CONTROL_CHUNKS;
60 
61  TransportInst_rch cfg = impl->config();
62  if (cfg) {
63  datalink_release_delay = cfg->datalink_release_delay_;
64  if (cfg->thread_per_connection_) {
66 
67  if (thr_per_con_send_task_->open() == -1) {
69  ACE_TEXT("(%P|%t) DataLink::DataLink: ")
70  ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
71 
72  } else if (DCPS_debug_level > 4) {
74  ACE_TEXT("(%P|%t) DataLink::DataLink - ")
75  ACE_TEXT("started new thread to send data with.\n")));
76  }
77  }
78  control_chunks = cfg->datalink_control_chunks_;
79  }
80 
81  // Initialize transport control sample allocators:
82  datalink_release_delay_ = TimeDuration::from_msec(datalink_release_delay);
83 
84  this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
85  this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
86 }
87 
89 {
90  DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
91 
92  if (!assoc_by_local_.empty()) {
94  ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
95  ACE_TEXT("link still in use by %d entities when deleted!\n"),
96  this, assoc_by_local_.size()));
97  }
98 
99  if (this->thr_per_con_send_task_ != 0) {
100  this->thr_per_con_send_task_->close(1);
101  }
102 }
103 
106 {
107  return impl_.lock();
108 }
109 
// Records that a transport client wants to be told when this link has started
// for the peer identified by `remote`.
// NOTE(review): the signature line (listing line 111) is absent from this
// listing; the body reads a weak client handle named `client` and a GUID
// named `remote` -- confirm against the header.
//
// Behaviour visible in the body:
//  - If pending_on_starts_ already holds (remote, client GUID), that entry is
//    consumed and an ImmediateStart command is passed to interceptor_.
//  - Otherwise, when the client handle can still be locked, the client is
//    stored in on_start_callbacks_[remote][client GUID].
//  - A client handle that can no longer be locked is not registered at all.
// Returns false when started_ is set and a send strategy exists ("link
// already started"), true in every other case.
110 bool
112 {
// Handle to this link that is given to the ImmediateStart command below.
113  const DataLink_rch link(this, inc_count());
114 
115  TransportClient_rch client_lock = client.lock();
116  const GUID_t client_id = client_lock ? client_lock->get_guid() : GUID_UNKNOWN;
117 
118  GuardType guard(strategy_lock_);
119 
120  if (client_lock) {
121  PendingOnStartsMap::iterator it = pending_on_starts_.find(remote);
122  if (it != pending_on_starts_.end()) {
123  RepoIdSet::iterator it2 = it->second.find(client_id);
124  if (it2 != it->second.end()) {
// A start for this (remote, client) pair was recorded earlier: consume it and
// prune the per-remote set once it becomes empty.
125  it->second.erase(it2);
126  if (it->second.empty()) {
127  pending_on_starts_.erase(it);
128  }
// The guard is released before the command is handed to the interceptor.
129  guard.release();
130  interceptor_.execute_or_enqueue(make_rch<ImmediateStart>(link, client, remote));
131  } else {
132  on_start_callbacks_[remote][client_id] = client;
133  }
134  } else {
135  on_start_callbacks_[remote][client_id] = client;
136  }
137  }
138 
139  if (started_ && !send_strategy_.is_nil()) {
140  return false; // link already started
141  }
142  return true;
143 }
144 
// Forgets all start-up bookkeeping kept for the (local, remote) pair: the
// entry in on_start_callbacks_ and the entry in pending_on_starts_.
// NOTE(review): the signature line (listing line 146) is absent from this
// listing; the body uses GUIDs named `local` and `remote`. This is presumably
// the remove_startup_callbacks() called from release_reservations() -- confirm.
//
// Both maps are modified while strategy_lock_ is held. A per-remote inner
// container that becomes empty is removed from its outer map.
145 void
147 {
148  GuardType guard(strategy_lock_);
149 
// Drop the registered callback, if any.
150  OnStartCallbackMap::iterator oit = on_start_callbacks_.find(remote);
151  if (oit != on_start_callbacks_.end()) {
152  RepoToClientMap::iterator oit2 = oit->second.find(local);
153  if (oit2 != oit->second.end()) {
154  oit->second.erase(oit2);
155  if (oit->second.empty()) {
156  on_start_callbacks_.erase(oit);
157  }
158  }
159  }
// Drop the recorded-but-unconsumed start, if any.
160  PendingOnStartsMap::iterator pit = pending_on_starts_.find(remote);
161  if (pit != pending_on_starts_.end()) {
162  RepoIdSet::iterator pit2 = pit->second.find(local);
163  if (pit2 != pit->second.end()) {
164  pit->second.erase(pit2);
165  if (pit->second.empty()) {
166  pending_on_starts_.erase(pit);
167  }
168  }
169  }
170 }
171 
// Removes the start callback registered for `client` and the peer `remote`
// from on_start_callbacks_.
// NOTE(review): the signature line (listing line 173) is absent from this
// listing; the body uses a weak client handle `client` and a GUID `remote`.
//
// Nothing happens when the weak client handle can no longer be locked, because
// the client's GUID (the inner map key) cannot be obtained in that case.
// Unlike the (local, remote) removal routine, pending_on_starts_ is not
// touched here.
172 void
174 {
175  TransportClient_rch client_lock = client.lock();
176  if (client_lock) {
177  const GUID_t id = client_lock->get_guid();
178 
// The map is only modified while strategy_lock_ is held.
179  GuardType guard(strategy_lock_);
180  OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
181  if (it != on_start_callbacks_.end()) {
182  RepoToClientMap::iterator it2 = it->second.find(id);
183  if (it2 != it->second.end()) {
184  it->second.erase(it2);
// Prune the per-remote map once its last client is gone.
185  if (it->second.empty()) {
186  on_start_callbacks_.erase(it);
187  }
188  }
189  }
190  }
191 }
192 
// Drains on_start_callbacks_ completely, one (remote, client) entry per loop
// iteration, and -- only when `success` is true -- calls use_datalink() on
// each client that can still be locked.
// NOTE(review): the signature line (listing line 194) is absent from this
// listing; the body uses a boolean named `success`.
//
// When `success` is false the entries are still removed, but no client is
// called and `link` is constructed from a null pointer.
193 void
195 {
196  const DataLink_rch link(success ? this : 0, inc_count());
197 
198  while (true) {
// strategy_lock_ is taken anew on every iteration and released again before
// the client is called (see guard.release() below).
199  GuardType guard(strategy_lock_);
200 
201  if (on_start_callbacks_.empty()) {
202  break;
203  }
204 
205  GUID_t remote = GUID_UNKNOWN;
206  TransportClient_wrch client;
// Take the first remaining entry out of the two-level map.
// NOTE(review): progress relies on inner maps never being left empty; an
// empty inner map at begin() would not be erased here -- confirm invariant.
207  OnStartCallbackMap::iterator it = on_start_callbacks_.begin();
208  if (it != on_start_callbacks_.end()) {
209  remote = it->first;
210  RepoToClientMap::iterator it2 = it->second.begin();
211  if (it2 != it->second.end()) {
212  client = it2->second;
213  it->second.erase(it2);
214  if (it->second.empty()) {
215  on_start_callbacks_.erase(it);
216  }
217  }
218  }
219 
220  guard.release();
221  if (success) {
222  TransportClient_rch client_lock = client.lock();
223  if (client_lock) {
224  client_lock->use_datalink(remote, link);
225  }
226  }
227  }
228 }
229 
230 bool DataLink::invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success)
231 {
232  const DataLink_rch link(success ? this : 0, inc_count());
233 
234  TransportClient_wrch client;
235  bool made_callback = false;
236 
237  {
238  GuardType guard(strategy_lock_);
239 
240  OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
241  if (it != on_start_callbacks_.end()) {
242  RepoToClientMap::iterator it2 = it->second.find(local);
243  if (it2 != it->second.end()) {
244  client = it2->second;
245  it->second.erase(it2);
246  if (it->second.empty()) {
247  on_start_callbacks_.erase(it);
248  }
249  } else {
250  pending_on_starts_[remote].insert(local);
251  }
252  } else {
253  pending_on_starts_[remote].insert(local);
254  }
255  }
256 
257  if (success) {
258  TransportClient_rch client_lock = client.lock();
259  if (client_lock) {
260  client_lock->use_datalink(remote, link);
261  made_callback = true;
262  }
263  }
264 
265  return made_callback;
266 }
267 
268 //Reactor invokes this after being notified in schedule_stop or cancel_release
269 int
270 DataLink::handle_exception(ACE_HANDLE /* fd */)
271 {
272  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
273 
276  if (DCPS_debug_level > 0) {
278  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
279  }
280  TransportImpl_rch impl = impl_.lock();
281  if (impl) {
283  if (reactor && reactor->cancel_timer(this) > 0) {
284  if (DCPS_debug_level > 0) {
286  ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
287  }
288  }
289  }
290  return 0;
291  } else if (scheduled_to_stop_at_ <= now) {
292  if (this->scheduling_release_) {
293  if (DCPS_debug_level > 0) {
295  ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
296  }
298  return 0;
299  }
300  if (DCPS_debug_level > 0) {
302  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
303  }
304  this->stop();
305  return 0;
306  } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
307  if (DCPS_debug_level > 0) {
309  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
310  }
311  TransportImpl_rch impl = impl_.lock();
312  if (impl) {
314  const TimeDuration future_release_time = scheduled_to_stop_at_ - now;
315  reactor->schedule_timer(this, 0, future_release_time.value());
316  }
317  }
318  return 0;
319 }
320 
321 //Allows DataLink::stop to be done on the reactor thread so that
322 //this thread avoids possibly deadlocking trying to access reactor
323 //to stop strategies or schedule timers
324 void
325 DataLink::schedule_stop(const MonotonicTimePoint& schedule_to_stop_at)
326 {
328  this->scheduled_to_stop_at_ = schedule_to_stop_at;
329  notify_reactor();
330  // reactor will invoke our DataLink::handle_exception()
331  } else {
332  if (DCPS_debug_level > 0) {
334  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
335  }
336  }
337 }
338 
// Asks the transport's reactor to call back into this DataLink by invoking
// ACE_Reactor::notify(this).
// NOTE(review): the signature line (listing line 340) is absent from this
// listing; this is presumably the notify_reactor() called from
// schedule_stop() and cancel_release(), after which the reactor invokes
// handle_exception() -- confirm.
//
// The call is silently skipped if the owning TransportImpl is gone, or if it
// has no reactor task or no reactor.
339 void
341 {
// impl_ is a weak handle; lock() yields a nil handle if the impl is gone.
342  TransportImpl_rch impl = impl_.lock();
343  if (impl) {
344  ReactorTask_rch rt(impl->reactor_task());
345  if (rt) {
346  ACE_Reactor* reactor = rt->get_reactor();
347  if (reactor) {
348  reactor->notify(this);
349  }
350  }
351  }
352 }
353 
354 void
356 {
357  pre_stop_i();
358 
359  TransportSendStrategy_rch send_strategy;
360  TransportStrategy_rch recv_strategy;
361 
362  {
363  GuardType guard(strategy_lock_);
364 
365  if (stopped_) return;
366 
367  send_strategy = send_strategy_;
369 
370  recv_strategy = receive_strategy_;
372  }
373 
374  if (!send_strategy.is_nil()) {
375  send_strategy->stop();
376  }
377 
378  if (!recv_strategy.is_nil()) {
379  recv_strategy->stop();
380  }
381 
382  stop_i();
383  stopped_ = true;
385 }
386 
387 void
389 {
391 
392  if (strategy && strategy->isDirectMode()) {
393  strategy->resume_send();
394  }
395 }
396 
397 int
398 DataLink::make_reservation(const GUID_t& remote_subscription_id,
399  const GUID_t& local_publication_id,
400  const TransportSendListener_wrch& send_listener,
401  bool reliable)
402 {
403  DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
404 
405  if (DCPS_debug_level > 9) {
406  LogGuid local_log(local_publication_id), remote_log(remote_subscription_id);
408  ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
409  ACE_TEXT("creating association local publication %C ")
410  ACE_TEXT("<--> with remote subscription %C.\n"),
411  local_log .c_str(),
412  remote_log.c_str()));
413  }
414 
416 
417  if (strategy) {
418  strategy->link_released(false);
419  }
420 
421  {
423 
424  LocalAssociationInfo& info = assoc_by_local_[local_publication_id];
425  info.reliable_ = reliable;
426  info.associated_.insert(remote_subscription_id);
427  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
428 
429  if (rls.is_nil())
430  rls = make_rch<ReceiveListenerSet>();
431  rls->insert(local_publication_id, TransportReceiveListener_rch());
432 
433  send_listeners_.insert(std::make_pair(local_publication_id, send_listener));
434  }
435  return 0;
436 }
437 
438 int
439 DataLink::make_reservation(const GUID_t& remote_publication_id,
440  const GUID_t& local_subscription_id,
441  const TransportReceiveListener_wrch& receive_listener,
442  bool reliable)
443 {
444  DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
445 
446  if (DCPS_debug_level > 9) {
447  LogGuid local(local_subscription_id), remote(remote_publication_id);
449  ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
450  ACE_TEXT("creating association local subscription %C ")
451  ACE_TEXT("<--> with remote publication %C.\n"),
452  local.c_str(), remote.c_str()));
453  }
454 
456 
457  if (strategy) {
458  strategy->link_released(false);
459  }
460 
461  {
463 
464  LocalAssociationInfo& info = assoc_by_local_[local_subscription_id];
465  info.reliable_ = reliable;
466  info.associated_.insert(remote_publication_id);
467  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
468 
469  if (rls.is_nil())
470  rls = make_rch<ReceiveListenerSet>();
471  rls->insert(local_subscription_id, receive_listener);
472 
473  recv_listeners_.insert(std::make_pair(local_subscription_id,
474  receive_listener));
475  }
476  return 0;
477 }
478 
479 template <typename Seq>
480 void set_to_seq(const RepoIdSet& rids, Seq& seq)
481 {
482  seq.length(static_cast<CORBA::ULong>(rids.size()));
483  CORBA::ULong i = 0;
484  for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
485  seq[i++] = *iter;
486  }
487 }
488 
489 GUIDSeq*
490 DataLink::peer_ids(const GUID_t& local_id) const
491 {
493 
494  const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
495 
496  if (iter == assoc_by_local_.end())
497  return 0;
498 
499  GUIDSeq_var result = new GUIDSeq;
500  set_to_seq(iter->second.associated_, static_cast<GUIDSeq&>(result));
501  return result._retn();
502 }
503 
504 /// This gets invoked when a TransportClient::remove_associations()
505 /// call has been made. Because this DataLink can be shared amongst
506 /// different TransportClient objects, and different threads could
507 /// be "managing" the different TransportClient objects, we need
508 /// to make sure that this release_reservations() works in conjunction
509 /// with a simultaneous call (in another thread) to one of this
510 /// DataLink's make_reservation() methods.
511 void
513  DataLinkSetMap& released_locals)
514 {
515  DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
516 
517  if (DCPS_debug_level > 9) {
518  GuidConverter local(local_id);
519  GuidConverter remote(remote_id);
521  ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
522  ACE_TEXT("releasing association local: %C ")
523  ACE_TEXT("<--> with remote %C.\n"),
524  OPENDDS_STRING(local).c_str(),
525  OPENDDS_STRING(remote).c_str()));
526  }
527 
528  remove_startup_callbacks(local_id, remote_id);
529 
530  //let the specific class release its reservations
531  //done this way to prevent deadlock of holding pub_sub_maps_lock_
532  //then obtaining a specific class lock in release_reservations_i
533  //which reverses lock ordering of the active send logic of needing
534  //the specific class lock before obtaining the over arching DataLink
535  //pub_sub_maps_lock_
536  this->release_reservations_i(remote_id, local_id);
537 
538  bool release_remote_required = false;
539  {
540  GuardType guard(this->pub_sub_maps_lock_);
541 
542  if (this->stopped_) return;
543 
544  ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
545  if (rls->size() == 1) {
546  assoc_by_remote_.erase(remote_id);
547  release_remote_required = true;
548  } else {
549  rls->remove(local_id);
550  }
551  RepoIdSet& ris = assoc_by_local_[local_id].associated_;
552  if (ris.size() == 1) {
553  DataLinkSet_rch& links = released_locals[local_id];
554  if (links.is_nil()) {
555  links = make_rch<DataLinkSet>();
556  }
557  links->insert_link(rchandle_from(this));
558  assoc_by_local_.erase(local_id);
559  } else {
560  ris.erase(remote_id);
561  }
562 
563  if (assoc_by_local_.empty()) {
565  ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
566  ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
567 
568  guard.release();
569  TransportImpl_rch impl = impl_.lock();
570  if (impl) {
571  impl->release_datalink(this);
572  }
573  }
574  }
575  if (release_remote_required) {
576  release_remote_i(remote_id);
577  }
578 }
579 
580 void
582 {
583  DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
584 
585  VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
586 
587  // The samples have to be removed at this point, otherwise the samples
588  // can not be delivered when new association is added and still use
589  // this connection/datalink.
591 
592  if (strategy) {
594  }
595 
597  schedule_stop(future_release_time);
598 }
599 
600 bool
602 {
603  DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
604  if (stopped_) {
605  if (DCPS_debug_level > 0) {
606  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
607  }
608  return false;
609  }
610  if (scheduling_release_) {
611  if (DCPS_debug_level > 0) {
612  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
613  }
614  this->set_scheduling_release(false);
616  notify_reactor();
617  }
618  return true;
619 }
620 
621 void
623 {
624  DBG_ENTRY_LVL("DataLink", "stop_i", 6);
625 }
626 
628 DataLink::create_control(char submessage_id,
630  Message_Block_Ptr data)
631 {
632  DBG_ENTRY_LVL("DataLink", "create_control", 6);
633 
636  header.submessage_id_ = submessage_id;
637  header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
638 
639  ACE_Message_Block* message = 0;
640  ACE_NEW_MALLOC_RETURN(message,
641  static_cast<ACE_Message_Block*>(
642  this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
645  data.release(),
646  0, // data
647  0, // allocator_strategy
648  0, // locking_strategy
652  this->db_allocator_.get(),
653  this->mb_allocator_.get()),
654  0);
655 
656  if (!(*message << header)) {
658  ACE_TEXT("(%P|%t) DataLink::create_control: ")
659  ACE_TEXT("cannot put header in message\n")));
660  ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
661  message = 0;
662  }
663 
664  return message;
665 }
666 
669 {
670  DBG_ENTRY_LVL("DataLink", "send_control", 6);
671 
672  TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
674  header, move(message));
675 
677 
678  GUID_t senderId(header.publication_id_);
679  send_start();
680  send(elem);
681  send_stop(senderId);
682 
683  return SEND_CONTROL_OK;
684 }
685 
686 /// This method will "deliver" the sample to all TransportReceiveListeners
687 /// within this DataLink that are interested in the (remote) publisher id
688 /// that sent the sample.
689 int
691  const GUID_t& readerId /* = GUID_UNKNOWN */)
692 {
694  return 0;
695 }
696 
697 void
699 {
701 }
702 
703 void
705  const GUID_t& readerId,
706  const RepoIdSet& incl_excl,
708 {
709  DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
710  // Which remote publication sent this message?
711  const GUID_t& publication_id = sample.header_.publication_id_;
712 
713  // Locate the set of TransportReceiveListeners associated with this
714  // DataLink that are interested in hearing about any samples received
715  // from the remote publisher_id.
716  if (DCPS_debug_level > 9) {
717  const GuidConverter converter(publication_id);
718  const GuidConverter reader(readerId);
720  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
721  ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
722  OPENDDS_STRING(converter).c_str(),
723  to_string(sample.header_).c_str(),
724  OPENDDS_STRING(reader).c_str(),
725  constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
726  }
727 
728  if (Transport_debug_level > 9) {
729  const GuidConverter converter(publication_id);
731  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
732  ACE_TEXT("from publication %C received sample: %C.\n"),
733  OPENDDS_STRING(converter).c_str(),
734  to_string(sample.header_).c_str()));
735  }
736 
737  ReceiveListenerSet_rch listener_set;
739  {
740  GuardType guard(this->pub_sub_maps_lock_);
741  AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
742  if (iter != assoc_by_remote_.end()) {
743  listener_set = iter->second;
744  } else {
745  listener = this->default_listener_.lock();
746  }
747  }
748 
749  if (listener_set.is_nil()) {
750  if (listener) {
751  listener->data_received(sample);
752  } else {
753  // Nobody has any interest in this message. Drop it on the floor.
754  if (Transport_debug_level > 4) {
755  const GuidConverter converter(publication_id);
757  ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
758  ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
759  OPENDDS_STRING(converter).c_str()));
760  }
761  }
762  return;
763  }
764 
765  if (readerId != GUID_UNKNOWN) {
766  listener_set->data_received(sample, readerId);
767  return;
768  }
769 
770 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
771 
772  if (sample.header_.content_filter_
773  && sample.header_.content_filter_entries_.length()) {
774  ReceiveListenerSet subset(*listener_set.in());
776  subset.data_received(sample, incl_excl, constrain);
777 
778  } else {
779 #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
780 
781  if (DCPS_debug_level > 9) {
782  // Just get the set to do our dirty work by having it iterate over its
783  // collection of TransportReceiveListeners, and invoke the data_received()
784  // method on each one.
785  OPENDDS_STRING included_ids;
786  bool first = true;
787  RepoIdSet::const_iterator iter = incl_excl.begin();
788  while(iter != incl_excl.end()) {
789  included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
790  first = false;
791  ++iter;
792  }
793  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
794  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
795  }
796  listener_set->data_received(sample, incl_excl, constrain);
797 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
798  }
799 
800 #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
801 }
802 
803 // static
806 {
807  static ACE_UINT64 next_id = 0;
808  static LockType lock;
809 
810  ACE_UINT64 id;
811  {
812  GuardType guard(lock);
813  id = next_id++;
814 
815  if (0 == next_id) {
817  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
818  ACE_TEXT("has rolled over and is reusing ids!\n")));
819  }
820  }
821 
822  return id;
823 }
824 
825 void
827 {
828  DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
829 
830  //this->cancel_release();
831  this->set_scheduling_release(false);
833 
834  {
835  TransportImpl_rch impl = impl_.lock();
836  if (impl) {
838  reactor->cancel_timer(this);
839  }
840  }
841  this->stop();
842  // this->send_listeners_.clear();
843  // this->recv_listeners_.clear();
844  // Drop our reference to the TransportImpl object
845 }
846 
847 void
849 {
850  DBG_ENTRY_LVL("DataLink", "notify", 6);
851 
852  VDBG((LM_DEBUG,
853  ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
854  this,
855  connection_notice_as_str(notice)));
856 
857  GuardType guard(this->pub_sub_maps_lock_);
858 
859  // Notify the datawriters
860  // the lost publications due to a connection problem.
861  for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
862  itr != send_listeners_.end(); ++itr) {
863 
864  TransportSendListener_rch tsl = itr->second.lock();
865 
866  if (tsl) {
867  if (Transport_debug_level > 0) {
868  GuidConverter converter(itr->first);
870  ACE_TEXT("(%P|%t) DataLink::notify: ")
871  ACE_TEXT("notify pub %C %C.\n"),
872  OPENDDS_STRING(converter).c_str(),
873  connection_notice_as_str(notice)));
874  }
875  AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
876  if (local_it == assoc_by_local_.end()) {
877  if (Transport_debug_level) {
878  GuidConverter converter(itr->first);
880  ACE_TEXT("(%P|%t) DataLink::notify: ")
881  ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
882  OPENDDS_STRING(converter).c_str(),
883  connection_notice_as_str(notice)));
884  }
885  break;
886  }
887  const RepoIdSet& rids = local_it->second.associated_;
888 
889  ReaderIdSeq subids;
890  set_to_seq(rids, subids);
891 
892  switch (notice) {
893  case DISCONNECTED:
894  tsl->notify_publication_disconnected(subids);
895  break;
896 
897  case RECONNECTED:
898  tsl->notify_publication_reconnected(subids);
899  break;
900 
901  case LOST:
902  tsl->notify_publication_lost(subids);
903  break;
904 
905  default:
907  ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
908  ACE_TEXT("unknown notice to TransportSendListener\n")));
909  break;
910  }
911 
912  } else {
913  if (Transport_debug_level > 0) {
914  GuidConverter converter(itr->first);
916  ACE_TEXT("(%P|%t) DataLink::notify: ")
917  ACE_TEXT("not notify pub %C %C\n"),
918  OPENDDS_STRING(converter).c_str(),
919  connection_notice_as_str(notice)));
920  }
921  }
922  }
923 
924  // Notify the datareaders registered with TransportImpl
925  // the lost subscriptions due to a connection problem.
926  for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
927  itr != recv_listeners_.end(); ++itr) {
928 
929  TransportReceiveListener_rch trl = itr->second.lock();
930 
931  if (trl) {
932  if (Transport_debug_level > 0) {
933  GuidConverter converter(itr->first);
935  ACE_TEXT("(%P|%t) DataLink::notify: ")
936  ACE_TEXT("notify sub %C %C.\n"),
937  OPENDDS_STRING(converter).c_str(),
938  connection_notice_as_str(notice)));
939  }
940  AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
941  if (local_it == assoc_by_local_.end()) {
942  if (Transport_debug_level) {
943  GuidConverter converter(itr->first);
945  ACE_TEXT("(%P|%t) DataLink::notify: ")
946  ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
947  OPENDDS_STRING(converter).c_str(),
948  connection_notice_as_str(notice)));
949  }
950  break;
951  }
952  const RepoIdSet& rids = local_it->second.associated_;
953 
954  WriterIdSeq pubids;
955  set_to_seq(rids, pubids);
956 
957  switch (notice) {
958  case DISCONNECTED:
960  break;
961 
962  case RECONNECTED:
963  trl->notify_subscription_reconnected(pubids);
964  break;
965 
966  case LOST:
967  trl->notify_subscription_lost(pubids);
968  break;
969 
970  default:
972  ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
973  ACE_TEXT("unknown notice to datareader.\n")));
974  break;
975  }
976 
977  } else {
978  if (Transport_debug_level > 0) {
979  GuidConverter converter(itr->first);
981  ACE_TEXT("(%P|%t) DataLink::notify: ")
982  ACE_TEXT("not notify sub %C subscription lost.\n"),
983  OPENDDS_STRING(converter).c_str()));
984  }
985 
986  }
987  }
988 }
989 
990 
991 
992 void
994 {
995  if (this->thr_per_con_send_task_ != 0) {
996  this->thr_per_con_send_task_->close(1);
997  }
998 }
999 
1000 void
1002 {
1003  DBG_ENTRY_LVL("DataLink", "release_resources", 6);
1004 
1005  this->prepare_release();
1006  TransportImpl_rch impl = impl_.lock();
1007  if (impl) {
1008  impl->release_link_resources(this);
1009  }
1010 }
1011 
1012 bool
1013 DataLink::is_target(const GUID_t& remote_id)
1014 {
1015  GuardType guard(this->pub_sub_maps_lock_);
1016  return assoc_by_remote_.count(remote_id);
1017 }
1018 
1019 GUIDSeq*
1021  size_t& n_subs)
1022 {
1023  GUIDSeq_var res;
1024  GuardType guard(this->pub_sub_maps_lock_);
1025  AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
1026 
1027  if (iter != assoc_by_local_.end()) {
1028  n_subs = iter->second.associated_.size();
1029  const CORBA::ULong len = in.length();
1030 
1031  for (CORBA::ULong i(0); i < len; ++i) {
1032  if (iter->second.associated_.count(in[i])) {
1033  if (res.ptr() == 0) {
1034  res = new GUIDSeq;
1035  }
1036 
1037  push_back(res.inout(), in[i]);
1038  }
1039  }
1040  }
1041 
1042  return res._retn();
1043 }
1044 
1046 {
1047  GuardType guard(this->pub_sub_maps_lock_);
1048 
1049  if (!assoc_releasing_.empty()) {
1051  ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
1052  ACE_TEXT("already prepared for release.\n")));
1053  return;
1054  }
1055 
1057 }
1058 
1060 {
1061  for (AssocByLocal::iterator iter = assoc_releasing_.begin();
1062  iter != assoc_releasing_.end(); ++iter) {
1063  TransportSendListener_rch tsl = send_listener_for(iter->first);
1064  if (tsl) {
1065  ReaderIdSeq sub_ids;
1066  set_to_seq(iter->second.associated_, sub_ids);
1067  tsl->remove_associations(sub_ids, false);
1068  continue;
1069  }
1071  if (trl) {
1072  WriterIdSeq pub_ids;
1073  set_to_seq(iter->second.associated_, pub_ids);
1074  trl->remove_associations(pub_ids, false);
1075  }
1076  }
1077  assoc_releasing_.clear();
1078 }
1079 
1080 int
1081 DataLink::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
1082 {
1083  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1084 
1085  if (!scheduled_to_stop_at_.is_zero()) {
1086  VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
1087  {
1088  TransportImpl_rch impl = impl_.lock();
1089  if (impl) {
1090  impl->unbind_link(this);
1091  }
1092  }
1093  if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
1094  this->stop();
1095  }
1096  }
1097  return 0;
1098 }
1099 
1100 int
1102 {
1103  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1104 
1105  if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
1106  // Reactor is shutting down with this timer still pending.
1107  // Take the same cleanup actions as if the timeout had expired.
1109  }
1110 
1111  return 0;
1112 }
1113 
1114 void
1116 {
1117  /**
1118  * The following IPV6 code was lifted in spirit from the RTCORBA
1119  * implementation of setting the DiffServ codepoint.
1120  */
1121  int result = 0;
1122 
1123  // Shift the code point up two bits, so that we only use the DS field
1124  int tos = cp << 2;
1125 
1126  const char* which = "IPV4 TOS";
1127 #if defined (ACE_HAS_IPV6)
1128  ACE_INET_Addr local_address;
1129 
1130  if (socket.get_local_addr(local_address) == -1) {
1131  return;
1132 
1133  } else if (local_address.get_type() == AF_INET6)
1134 #if !defined (IPV6_TCLASS)
1135  {
1136  if (DCPS_debug_level > 0) {
1138  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
1139  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
1140  cp));
1141  }
1142 
1143  return;
1144  }
1145 
1146 #else /* IPV6_TCLASS */
1147  {
1148  which = "IPV6 TCLASS";
1149  result = socket.set_option(
1150  IPPROTO_IPV6,
1151  IPV6_TCLASS,
1152  &tos,
1153  sizeof(tos));
1154 
1155  } else // This is a bit tricky and might be hard to follow...
1156 
1157 #endif /* IPV6_TCLASS */
1158 #endif /* ACE_HAS_IPV6 */
1159 
1160 #ifdef IP_TOS
1161  result = socket.set_option(
1162  IPPROTO_IP,
1163  IP_TOS,
1164  &tos,
1165  sizeof(tos));
1166 
1167  if ((result == -1) && (errno != ENOTSUP)
1168 #ifdef WSAEINVAL
1169  && (errno != WSAEINVAL)
1170 #endif /* WSAINVAL */
1171  ) {
1172 #endif /* IP_TOS */
1174  ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1175  ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
1176  ACE_TEXT("try running as superuser.\n"),
1177  which,
1178  cp));
1179 #ifdef IP_TOS
1180  } else if (DCPS_debug_level > 4) {
1182  ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
1183  ACE_TEXT("set %C codepoint to %d.\n"),
1184  which,
1185  cp));
1186  }
1187 #endif /* IP_TOS */
1188 }
1189 
1190 bool
1192 {
1193  element->data_delivered();
1194  return true;
1195 }
1196 
1197 bool
1199  return false;
1200 }
1201 
1202 void
1204  TransportClient_rch client_lock = client_.lock();
1205  if (client_lock) {
1206  client_lock->use_datalink(remote_, link_);
1207  }
1208 }
1209 
1210 
1211 void
1213 {
1214  IdToSendListenerMap send_listeners;
1215  IdToRecvListenerMap recv_listeners;
1216  {
1218  send_listeners = send_listeners_;
1219  recv_listeners = recv_listeners_;
1220  }
1221  for (IdToSendListenerMap::const_iterator itr = send_listeners.begin();
1222  itr != send_listeners.end(); ++itr) {
1223  TransportSendListener_rch tsl = itr->second.lock();
1224  if (tsl) {
1225  tsl->transport_discovery_change();
1226  }
1227  }
1228 
1229  for (IdToRecvListenerMap::const_iterator itr = recv_listeners.begin();
1230  itr != recv_listeners.end(); ++itr) {
1231  TransportReceiveListener_rch trl = itr->second.lock();
1232  if (trl) {
1234  }
1235  }
1236 }
1237 
1238 void
1239 DataLink::replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const
1240 {
1241  GuidConverter local(local_pub_id);
1242  GuidConverter remote(remote_sub_id);
1243  TransportSendListener_rch send_listener = send_listener_for(local_pub_id);
1244  if (send_listener) {
1245  send_listener->replay_durable_data_for(remote_sub_id);
1246  }
1247 }
1248 
1249 #ifndef OPENDDS_SAFETY_PROFILE
1250 std::ostream&
1251 operator<<(std::ostream& str, const DataLink& value)
1252 {
1253  str << " There are " << value.assoc_by_local_.size()
1254  << " local entities currently using this link";
1255 
1256  if (!value.assoc_by_local_.empty()) {
1257  str << " comprising following associations:";
1258  }
1259  str << std::endl;
1260 
1261  typedef DataLink::AssocByLocal::const_iterator assoc_iter_t;
1262  const DataLink::AssocByLocal& abl = value.assoc_by_local_;
1263  for (assoc_iter_t ait = abl.begin(); ait != abl.end(); ++ait) {
1264  const RepoIdSet& set = ait->second.associated_;
1265  for (RepoIdSet::const_iterator rit = set.begin(); rit != set.end(); ++rit) {
1266  str << GuidConverter(ait->first) << " --> "
1267  << GuidConverter(*rit) << " " << std::endl;
1268  }
1269  }
1270  return str;
1271 }
1272 #endif
1273 
1274 void
1276 {
1278 
1279  if (strategy) {
1280  strategy->terminate_send_if_suspended();
1281  }
1282 }
1283 
1284 }
1285 }
1286 
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
const TimeDuration & datalink_release_delay() const
Definition: DataLink.inl:62
const LogLevel::Value value
Definition: debug.cpp:61
static const ACE_Time_Value max_time
#define ENOTSUP
char message_id_
The enum MessageId.
void remove_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:173
unsigned long ACE_Reactor_Mask
Interceptor interceptor_
Definition: DataLink.h:469
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static const size_t DEFAULT_DATALINK_CONTROL_CHUNKS
Definition: TransportInst.h:68
void data_received_include(ReceivedDataSample &sample, const RepoIdSet &incl)
Definition: DataLink.cpp:698
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Definition: DataLink.cpp:1081
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:628
void set_scheduling_release(bool scheduling_release)
Definition: DataLink.inl:161
GuidSet RepoIdSet
Definition: GuidUtils.h:113
const char * c_str() const
TransportSendListener_rch send_listener_for(const GUID_t &pub_id) const
Definition: DataLink.inl:324
TransportSendStrategy_rch get_send_strategy()
Definition: DataLink.inl:374
void terminate_send_if_suspended()
Definition: DataLink.cpp:1275
void schedule_stop(const MonotonicTimePoint &schedule_to_stop_at)
Definition: DataLink.cpp:325
CommandPtr execute_or_enqueue(CommandPtr command)
static ACE_UINT64 get_next_datalink_id()
Used to provide unique Ids to all DataLink methods.
Definition: DataLink.cpp:805
virtual void stop_i()
Definition: DataLink.cpp:622
const ACE_Time_Value & value() const
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
RcHandle< TransportReceiveListener > TransportReceiveListener_rch
#define ACE_CDR_BYTE_ORDER
int release(void)
int set_option(int level, int option, void *optval, int optlen) const
int get_type(void) const
void send_stop(GUID_t repoId)
Definition: DataLink.inl:135
void remove_startup_callbacks(const GUID_t &local, const GUID_t &remote)
Definition: DataLink.cpp:146
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
bool is_target(const GUID_t &remote_id)
Definition: DataLink.cpp:1013
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_STRING
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
sequence< GUID_t > ReaderIdSeq
AssocByLocal assoc_releasing_
Definition: DataLink.h:428
virtual void remove_associations(const WriterIdSeq &pubids, bool notify)=0
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
friend class ThreadPerConnectionSendTask
Definition: DataLink.h:326
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
static const long DEFAULT_DATALINK_RELEASE_DELAY
Definition: TransportInst.h:67
void send(TransportQueueElement *element)
Definition: DataLink.inl:94
ACE_CDR::ULong ULong
virtual void notify_subscription_disconnected(const WriterIdSeq &pubids)=0
void notify(ConnectionNotice notice)
Definition: DataLink.cpp:848
static TimeDuration from_msec(const ACE_UINT64 &ms)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define VDBG(DBG_ARGS)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
virtual void data_received(const ReceivedDataSample &sample)=0
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
Definition: DataLink.cpp:1115
PendingOnStartsMap pending_on_starts_
Definition: DataLink.h:449
friend OpenDDS_Dcps_Export std::ostream & operator<<(std::ostream &str, const DataLink &value)
Convenience function for diagnostic information.
Definition: DataLink.cpp:1251
ACE_SYNCH_MUTEX LockType
Definition: DataLink.h:377
void replay_durable_data(const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
Definition: DataLink.cpp:1239
int get_local_addr(ACE_Addr &) const
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
sequence< GUID_t > WriterIdSeq
size_t total_length(void) const
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
Definition: DataLink.inl:205
TimeDuration datalink_release_delay_
Definition: DataLink.h:453
LM_WARNING
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:111
int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
Definition: DataLink.cpp:1101
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490
ACE_Reactor_Timer_Interface * timer() const
Interface to the transport's reactor for scheduling timers.
void set_to_seq(const RepoIdSet &rids, Seq &seq)
Definition: DataLink.cpp:480
virtual ACE_Reactor * reactor(void) const
virtual void release_remote_i(const GUID_t &)
Definition: DataLink.h:366
GUIDSeq * target_intersection(const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
Definition: DataLink.cpp:1020
char submessage_id_
Implementation-specific sub-message Ids.
#define IPPROTO_IP
ACE_TEXT("TCP_Factory")
unsigned long long ACE_UINT64
ReactorTask_rch reactor_task()
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
bool release_link_resources(DataLink *link)
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
ACE_CDR::Long Priority
const char * connection_notice_as_str(ConnectionNotice notice)
Helper function to output the enum as a string to help debugging.
Definition: DataLink.inl:267
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_control.
Definition: DataLink.h:467
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355
virtual void unbind_link(DataLink *link)
Remove any pending_release mappings.
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447
static const ACE_Time_Value zero
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
#define VDBG_LVL(DBG_ARGS, LEVEL)
void network_change() const
Definition: DataLink.cpp:1212
TransportReceiveListener_rch recv_listener_for(const GUID_t &sub_id) const
Definition: DataLink.inl:338
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: DataLink.h:457
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:668
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void pre_stop_i()
Definition: DataLink.cpp:993
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
virtual void release_reservations_i(const GUID_t &, const GUID_t &)
Definition: DataLink.h:367
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420
const char * to_string(MessageId value)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
RcHandle< T > lock() const
Definition: RcObject.h:188
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
virtual void notify_subscription_reconnected(const WriterIdSeq &pubids)=0
virtual void notify_subscription_lost(const WriterIdSeq &pubids)=0
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
void remove_all(const GUIDSeq &to_remove)
#define TheServiceParticipant
int handle_exception(ACE_HANDLE)
Reactor invokes this after being notified in schedule_stop or cancel_release.
Definition: DataLink.cpp:270
void release_reservations(GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
Definition: DataLink.cpp:512
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
void data_received_i(ReceivedDataSample &sample, const GUID_t &readerId, const RepoIdSet &incl_excl, ReceiveListenerSet::ConstrainReceiveSet constrain)
Definition: DataLink.cpp:704
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
Base wrapper class around a data/control sample to be sent.
unique_ptr< DataBlockAllocator > db_allocator_
Definition: DataLink.h:458
SendControlStatus
Return code type for send_control() operations.
virtual void release_datalink(DataLink *link)=0
TransportInst_rch config() const
virtual bool handle_send_request_ack(TransportQueueElement *element)
Definition: DataLink.cpp:1191
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194