OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpDataLink.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "RtpsUdpDataLink.h"
7 
8 #include "RtpsUdpTransport.h"
9 #include "RtpsUdpInst.h"
10 #include "RtpsUdpSendStrategy.h"
11 #include "RtpsUdpReceiveStrategy.h"
12 
13 #include <dds/DCPS/LogAddr.h>
14 #include <dds/DCPS/Definitions.h>
15 #include <dds/DCPS/Util.h>
16 #include <dds/DCPS/Logging.h>
18 #include <dds/DCPS/Qos_Helper.h>
25 #ifdef OPENDDS_SECURITY
27 #endif
28 
29 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h>
30 #include <dds/DdsDcpsCoreTypeSupportImpl.h>
31 
32 #include <ace/Default_Constants.h>
33 #include <ace/Log_Msg.h>
34 #include <ace/Message_Block.h>
35 #include <ace/Reactor.h>
36 
37 #include <string.h>
38 
39 #ifndef __ACE_INLINE__
40 # include "RtpsUdpDataLink.inl"
41 #endif /* __ACE_INLINE__ */
42 
43 namespace {
44 
45 bool compare_and_update_counts(CORBA::Long incoming, CORBA::Long& existing) {
46  static const CORBA::Long ONE_QUARTER_MAX_POSITIVE = 0x20000000;
47  static const CORBA::Long THREE_QUARTER_MAX_POSITIVE = 0x60000000;
48  if (incoming <= existing &&
49  !(incoming < ONE_QUARTER_MAX_POSITIVE && existing > THREE_QUARTER_MAX_POSITIVE)) {
50  return false;
51  }
52  existing = incoming;
53  return true;
54 }
55 
56 }
57 
59 
60 namespace OpenDDS {
61 namespace DCPS {
62 
65 
// Presumably used to configure the send strategy to queue one sample per
// packet (RTPS bundling is done by this link itself) — TODO confirm at usage.
66 const size_t ONE_SAMPLE_PER_PACKET = 1;
67 
// Constructor (NOTE(review): doxygen extract — the line naming the
// constructor and its first parameter, embedded line 68, is missing).
// Sets up allocators sized by the participant's association chunk
// multiplier, the sporadic/periodic events used for send-queue flushing and
// heartbeats, and (with security enabled) the handle registry for this
// participant's GUID.
69  const GuidPrefix_t& local_prefix,
70  const RtpsUdpInst_rch& config,
71  const ReactorTask_rch& reactor_task,
72  InternalTransportStatistics& transport_statistics,
73  ACE_Thread_Mutex& transport_statistics_mutex)
74  : DataLink(transport, // 3 data link "attributes", below, are unused
75  0, // priority
76  false, // is_loopback
77  false) // is_active
78  , reactor_task_(reactor_task)
79  , job_queue_(make_rch<JobQueue>(reactor_task->get_reactor()))
80  , event_dispatcher_(transport->event_dispatcher())
81  , mb_allocator_(TheServiceParticipant->association_chunk_multiplier())
82  , db_allocator_(TheServiceParticipant->association_chunk_multiplier())
83  , custom_allocator_(TheServiceParticipant->association_chunk_multiplier() * config->anticipated_fragments_, RtpsSampleHeader::FRAG_SIZE)
84  , bundle_allocator_(TheServiceParticipant->association_chunk_multiplier(), config->max_message_size_)
85  , db_lock_pool_(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())))
86  , multi_buff_(this, config->nak_depth_)
87  , fsq_vec_size_(0)
88  , harvest_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::harvest_send_queue)))
89  , flush_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::flush_send_queue)))
90  , best_effort_heartbeat_count_(0)
91  , heartbeat_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::send_heartbeats)))
92  , heartbeatchecker_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::check_heartbeats)))
93  , max_bundle_size_(config->max_message_size_ - RTPS::RTPSHDR_SZ) // default maximum bundled message size is max udp message size (see TransportStrategy) minus RTPS header
94  , transport_statistics_(transport_statistics)
95  , transport_statistics_mutex_(transport_statistics_mutex)
96 #ifdef OPENDDS_SECURITY
97  , security_config_(Security::SecurityRegistry::instance()->default_config())
98  , local_crypto_handle_(DDS::HANDLE_NIL)
99  , ice_agent_(ICE::Agent::instance())
100 #endif
101  , network_interface_address_reader_(make_rch<InternalDataReader<NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), rchandle_from(this)))
102 {
103 #ifdef OPENDDS_SECURITY
104  const GUID_t guid = make_id(local_prefix, ENTITYID_PARTICIPANT);
105  handle_registry_ = security_config_->get_handle_registry(guid);
106 #endif
107 
// Send/receive strategies are created here but started later in open().
108  send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
109  receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix, ref(TheServiceParticipant->get_thread_status_manager()));
110  assign(local_prefix_, local_prefix);
111 
112  this->job_queue(job_queue_);
113 }
114 
// Destructor (signature line missing from this extract; embedded line 117 is
// also missing — likely cancels harvest_send_queue_sporadic_ as well; verify
// against the full source).  Cancels pending send-queue flush events.
116 {
118  flush_send_queue_sporadic_->cancel();
119 }
120 
// NOTE(review): empty body of a member whose signature (embedded line 122)
// and any contents (line 124) are missing from this extract — cannot
// determine which function this is; consult the full source.
123 {
125 }
126 
// Looks up the RtpsWriter for this element's publication and hands the
// element to it; returns false when no such writer exists.  Appears to be
// RtpsUdpDataLink::add_delayed_notification — TODO confirm; the signature
// (embedded line 128) and the writers_lock_ guard (line 130, creating 'g')
// are missing from this extract.
127 bool
129 {
131  RtpsWriter_rch writer;
132  RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
133  if (iter != writers_.end()) {
134  writer = iter->second;
135  }
136 
// Release the map lock before calling into the writer.
137  g.release();
138 
139  if (writer) {
140  writer->add_elem_awaiting_ack(element);
141  return true;
142  }
143  return false;
144 }
145 
148 {
149  GUID_t pub_id = sample->get_pub_id();
150 
152  RtpsWriter_rch writer;
153  RtpsWriterMap::iterator iter = writers_.find(pub_id);
154  if (iter != writers_.end()) {
155  writer = iter->second;
156  }
157 
158  g.release();
159 
160  if (writer) {
161  return writer->remove_sample(sample);
162  }
163  return REMOVE_NOT_FOUND;
164 }
165 
167 {
169  RtpsWriter_rch writer;
170  RtpsWriterMap::iterator iter = writers_.find(pub_id);
171  if (iter != writers_.end()) {
172  writer = iter->second;
173  }
174 
175  g.release();
176 
177  if (writer) {
178  writer->remove_all_msgs();
179  }
180 }
181 
184 {
185  bool found = false;
186  SequenceNumber to_release;
187  TransportQueueElement* tqe = 0;
188 
189  const SequenceNumber& seq = sample->get_header().sequence_;
190  const char* const payload = sample->get_sample()->cont()->rd_ptr();
191  const TransportQueueElement::MatchOnDataPayload modp(payload);
192  SingleSendBuffer::BufferVec removed;
193 
195 
196  RtpsUdpDataLink_rch link = link_.lock();
197  if (!link) {
198  return REMOVE_NOT_FOUND;
199  }
200 
202  {
203  GuardType guard(link->strategy_lock_);
204  if (link->send_strategy_) {
207  result = link->send_strategy_->remove_sample(sample);
208  guard.release();
209  }
210  }
211 
212  ACE_Guard<ACE_Thread_Mutex> g2(elems_not_acked_mutex_);
213 
214  if (!elems_not_acked_.empty()) {
215  typedef SnToTqeMap::iterator iter_t;
216  for (std::pair<iter_t, iter_t> er = elems_not_acked_.equal_range(seq); er.first != er.second; ++er.first) {
217  if (modp.matches(*er.first->second)) {
218  found = true;
219  to_release = seq;
220  tqe = er.first->second;
221  elems_not_acked_.erase(er.first);
222  break;
223  }
224  }
225  }
226 
227  g2.release();
228 
229  if (found) {
230  send_buff_->remove_acked(to_release, removed);
231  }
232 
233  g.release();
234 
235  if (found) {
236  for (size_t i = 0; i < removed.size(); ++i) {
237  RemoveAllVisitor visitor;
238  removed[i].first->accept_remove_visitor(visitor);
239  delete removed[i].first;
240  removed[i].second->release();
241  }
242  removed.clear();
243  tqe->data_dropped(true);
244  result = REMOVE_FOUND;
245  }
246  return result;
247 }
248 
// RtpsWriter::remove_all_msgs: drop every element still awaiting
// acknowledgment and release the corresponding send-buffer entries.
// NOTE(review): the signature (embedded line 250), the guard 'g'
// (line 252), a send-strategy call inside the strategy_lock_ block
// (line 266) and the declaration of 'prev' (line 277) are missing from
// this extract.
249 void
251 {
253 
254  RtpsUdpDataLink_rch link = link_.lock();
255  if (!link) {
256  return;
257  }
258 
259  send_buff_->retain_all(id_);
260 
261  {
264  GuardType guard(link->strategy_lock_);
265  if (link->send_strategy_) {
267  }
268  }
269 
270  ACE_GUARD(ACE_Thread_Mutex, g2, elems_not_acked_mutex_);
271 
// Take ownership of the pending map so callbacks run without the lock.
272  SnToTqeMap sn_tqe_map;
273  sn_tqe_map.swap(elems_not_acked_);
274 
275  g2.release();
276 
// Release each distinct sequence number once (the map may hold duplicates).
278  typedef SnToTqeMap::iterator iter_t;
279  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
280  if (it->first != prev) {
281  send_buff_->release_acked(it->first);
282  prev = it->first;
283  }
284  }
285 
286  g.release();
287 
288  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
289  it->second->data_dropped(true);
290  }
291 }
292 
// RtpsUdpDataLink::open: configure the already-created sockets (multicast
// TTL, SO_SNDBUF/SO_RCVBUF per configuration), attach the multi-writer send
// buffer, and start the send/receive strategies.  Returns false on any
// configuration failure.
// NOTE(review): this extract is missing the signature line taking the IPv4
// unicast socket (embedded line 294), its assignment (line 300), the macOS
// multicast option lines (lines 312-316), and the ACE_ERROR((...)) opener
// lines before each logged failure (lines 324, 335, 348, 354, 368, 380,
// 386, 400, 416) — the IPv4 set_option calls on lines 349/381 therefore
// appear truncated here.
293 bool
295 #ifdef ACE_HAS_IPV6
296  , const ACE_SOCK_Dgram& ipv6_unicast_socket
297 #endif
298  )
299 {
301 #ifdef ACE_HAS_IPV6
302  ipv6_unicast_socket_ = ipv6_unicast_socket;
303 #endif
304 
305  RtpsUdpInst_rch cfg = config();
306  if (!cfg) {
307  return false;
308  }
309 
310  if (cfg->use_multicast_) {
311 #ifdef ACE_HAS_MAC_OSX
314 #ifdef ACE_HAS_IPV6
315  ipv6_multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
317 #endif
318 #endif
319  }
320 
// Apply the configured multicast TTL to both address families.
321  if (cfg->use_multicast_) {
322  if (!set_socket_multicast_ttl(unicast_socket_, cfg->ttl_)) {
323  if (DCPS_debug_level > 0) {
325  ACE_TEXT("(%P|%t) ERROR: ")
326  ACE_TEXT("RtpsUdpDataLink::open: ")
327  ACE_TEXT("failed to set TTL: %d\n"),
328  cfg->ttl_));
329  }
330  return false;
331  }
332 #ifdef ACE_HAS_IPV6
333  if (!set_socket_multicast_ttl(ipv6_unicast_socket_, cfg->ttl_)) {
334  if (DCPS_debug_level > 0) {
336  ACE_TEXT("(%P|%t) ERROR: ")
337  ACE_TEXT("RtpsUdpDataLink::open: ")
338  ACE_TEXT("failed to set TTL: %d\n"),
339  cfg->ttl_));
340  }
341  return false;
342  }
343 #endif
344  }
345 
// ENOTSUP is tolerated: some platforms reject these socket options.
346  if (cfg->send_buffer_size_ > 0) {
347  const int snd_size = cfg->send_buffer_size_;
349  SO_SNDBUF,
350  (void *) &snd_size,
351  sizeof(snd_size)) < 0
352  && errno != ENOTSUP) {
353  if (DCPS_debug_level > 0) {
355  ACE_TEXT("(%P|%t) ERROR: ")
356  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
357  snd_size));
358  }
359  return false;
360  }
361 #ifdef ACE_HAS_IPV6
362  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
363  SO_SNDBUF,
364  (void *) &snd_size,
365  sizeof(snd_size)) < 0
366  && errno != ENOTSUP) {
367  if (DCPS_debug_level > 0) {
369  ACE_TEXT("(%P|%t) ERROR: ")
370  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
371  snd_size));
372  }
373  return false;
374  }
375 #endif
376  }
377 
378  if (cfg->rcv_buffer_size_ > 0) {
379  const int rcv_size = cfg->rcv_buffer_size_;
381  SO_RCVBUF,
382  (void *) &rcv_size,
383  sizeof(int)) < 0
384  && errno != ENOTSUP) {
385  if (DCPS_debug_level > 0) {
387  ACE_TEXT("(%P|%t) ERROR: ")
388  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
389  rcv_size));
390  }
391  return false;
392  }
393 #ifdef ACE_HAS_IPV6
394  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
395  SO_RCVBUF,
396  (void *) &rcv_size,
397  sizeof(int)) < 0
398  && errno != ENOTSUP) {
399  if (DCPS_debug_level > 0) {
401  ACE_TEXT("(%P|%t) ERROR: ")
402  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
403  rcv_size));
404  }
405  return false;
406  }
407 #endif
408  }
409 
410  send_strategy()->send_buffer(&multi_buff_);
411 
412  if (start(send_strategy_,
413  receive_strategy_, false) != 0) {
414  stop_i();
415  if (DCPS_debug_level > 0) {
417  ACE_TEXT("(%P|%t) ERROR: ")
418  ACE_TEXT("UdpDataLink::open: start failed!\n")));
419  }
420  return false;
421  }
422 
// Subscribe to network-interface-address updates for multicast join/leave.
423  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
424 
425  return true;
426 }
427 
429 {
431  InternalSampleInfoSequence infos;
432 
434 
435  RtpsUdpInst_rch cfg = config();
436  if (!cfg || !cfg->use_multicast_) {
437  return;
438  }
439 
440  multicast_manager_.process(samples,
441  infos,
442  cfg->multicast_interface_,
443  get_reactor(),
444  receive_strategy().in(),
445  cfg->multicast_group_address(),
447 #ifdef ACE_HAS_IPV6
448  , cfg->ipv6_multicast_group_address(),
449  ipv6_multicast_socket_
450 #endif
451  );
452 
453  if (!samples.empty()) {
454  // FUTURE: This is propagating info to discovery. Write instead.
455  network_change();
456  }
457 }
458 
// Drop cached locator/bundling state for a departed remote (signature line,
// embedded line 460, missing from this extract).
459 void
461 {
462  locator_cache_.remove_id(remote_id);
463  bundling_cache_.remove_id(remote_id);
464 }
465 
468 {
470  const RemoteInfoMap::const_iterator pos = locators_.find(remote_id);
471  if (pos != locators_.end()) {
472  RtpsUdpInst_rch cfg = config();
473  const TimeDuration threshold = cfg ? cfg->receive_address_duration_ : TimeDuration();
474  const bool valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= threshold;
475  return valid_last_recv_addr ? pos->second.last_recv_addr_ : NetworkAddress();
476  }
477  return NetworkAddress();
478 }
479 
// RtpsUdpDataLink::update_locators: store the remote's unicast/multicast
// address sets (swapping them in), optionally bump the locator ref count,
// and log address changes at high debug levels.
// NOTE(review): the signature line (embedded line 481), cache-invalidation
// line(s) (line 493), the locators_lock_ guard 'g' (line 495) and the
// requires_inline_qos_ assignment (line 502) are missing from this extract.
// After the swap, the caller's sets hold the PREVIOUS addresses, yet the
// change logs below print them as the new ones — verify against full source.
480 void
482  AddrSet& unicast_addresses,
483  AddrSet& multicast_addresses,
484  bool requires_inline_qos,
485  bool add_ref)
486 {
487  if (unicast_addresses.empty() && multicast_addresses.empty()) {
488  if (DCPS_debug_level > 0) {
489  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::update_locators: no addresses for %C\n"), LogGuid(remote_id).c_str()));
490  }
491  }
492 
494 
496 
497  RemoteInfo& info = locators_[remote_id];
498  const bool log_unicast_change = DCPS_debug_level > 3 && info.unicast_addrs_ != unicast_addresses;
499  const bool log_multicast_change = DCPS_debug_level > 3 && info.multicast_addrs_ != multicast_addresses;
500  info.unicast_addrs_.swap(unicast_addresses);
501  info.multicast_addrs_.swap(multicast_addresses);
503  if (add_ref) {
504  ++info.ref_count_;
505  }
506 
507  g.release();
508 
509  if (log_unicast_change) {
510  for (AddrSet::const_iterator pos = unicast_addresses.begin(), limit = unicast_addresses.end();
511  pos != limit; ++pos) {
512  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
513  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
514  }
515  }
516  if (log_multicast_change) {
517  for (AddrSet::const_iterator pos = multicast_addresses.begin(), limit = multicast_addresses.end();
518  pos != limit; ++pos) {
519  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
520  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
521  }
522  }
523 }
524 
526 {
528  const GUID_t& writer = ds.header_.publication_id_;
529  const SequenceNumber& seq = ds.header_.sequence_;
530  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(writer);
531  if (w != writer_to_seq_best_effort_readers_.end()) {
532  if (w->second.seq < seq) {
533  w->second.seq = seq;
534  selected.insert(w->second.readers.begin(), w->second.readers.end());
535  } else {
536  withheld.insert(w->second.readers.begin(), w->second.readers.end());
537  }
538  } // else the writer is not associated with best effort readers
539 }
540 
// RtpsUdpDataLink::make_reservation: record a reliable local reader as
// pending (until associated) before delegating to the base class.
// NOTE(review): parameter lines for 'rpi'/'trl' (embedded lines 542, 544)
// and the guard declaration (line 547) are missing from this extract.
541 int
543  const GUID_t& lsi,
545  bool reliable)
546 {
548  if (reliable) {
549  RtpsReaderMap::iterator rr = readers_.find(lsi);
550  if (rr == readers_.end()) {
551  pending_reliable_readers_.insert(lsi);
552  }
553  }
554  guard.release();
555  return DataLink::make_reservation(rpi, lsi, trl, reliable);
556 }
557 
// RtpsUdpDataLink::associated: wire up a local/remote endpoint pair.
// Best-effort local readers are tracked per-writer for sequence filtering;
// reliable writers get (or create) an RtpsWriter seeded with any stashed
// heartbeat count; reliable readers get (or create) an RtpsReader.  Returns
// true when the association is complete immediately, false when completion
// is deferred to the reader's on_start callback.
// NOTE(review): doxygen extract — missing lines include the
// writer_to_seq_best_effort_readers_ guard (embedded line 582), log_progress
// guard conditions (lines 596, 623) and writers_/readers_ lock guards
// (lines 601, 633); closing braces at lines 620/650 imply scopes opened on
// those missing lines.
558 bool
559 RtpsUdpDataLink::associated(const GUID_t& local_id, const GUID_t& remote_id,
560  bool local_reliable, bool remote_reliable,
561  bool local_durable, bool remote_durable,
562  const MonotonicTime_t& participant_discovered_at,
563  ACE_CDR::ULong participant_flags,
564  SequenceNumber max_sn,
565  const TransportClient_rch& client,
566  AddrSet& unicast_addresses,
567  AddrSet& multicast_addresses,
568  const NetworkAddress& last_addr_hint,
569  bool requires_inline_qos)
570 {
571  sq_.ignore(local_id, remote_id);
572 
573  update_locators(remote_id, unicast_addresses, multicast_addresses, requires_inline_qos, true);
574  if (last_addr_hint) {
575  update_last_recv_addr(remote_id, last_addr_hint);
576  }
577 
578  const GuidConverter conv(local_id);
579 
// Best-effort local endpoints need no reliability bookkeeping.
580  if (!local_reliable) {
581  if (conv.isReader()) {
583  WriterToSeqReadersMap::iterator i = writer_to_seq_best_effort_readers_.find(remote_id);
584  if (i == writer_to_seq_best_effort_readers_.end()) {
585  writer_to_seq_best_effort_readers_.insert(WriterToSeqReadersMap::value_type(remote_id, SeqReaders(local_id)));
586  } else if (i->second.readers.find(local_id) == i->second.readers.end()) {
587  i->second.readers.insert(local_id);
588  }
589  }
590  return true;
591  }
592 
593  bool associated = true;
594 
595  if (conv.isWriter()) {
597  log_progress("RTPS writer/reader association", local_id, remote_id, participant_discovered_at);
598  }
599 
600  if (remote_reliable) {
602  // Insert count if not already there.
603  RtpsWriterMap::iterator rw = writers_.find(local_id);
604  if (rw == writers_.end()) {
605  RtpsUdpDataLink_rch link(this, inc_count());
606  CORBA::Long hb_start = 0;
607  CountMapType::iterator hbc_it = heartbeat_counts_.find(local_id.entityId);
608  if (hbc_it != heartbeat_counts_.end()) {
609  hb_start = hbc_it->second;
610  heartbeat_counts_.erase(hbc_it);
611  }
612  RtpsWriter_rch writer = make_rch<RtpsWriter>(client, link, local_id, local_durable,
613  max_sn, hb_start, multi_buff_.capacity())
614  rw = writers_.insert(RtpsWriterMap::value_type(local_id, writer)).first;
615  }
616  RtpsWriter_rch writer = rw->second;
617  g.release();
618  const SequenceNumber writer_max_sn = writer->update_max_sn(remote_id, max_sn);
619  writer->add_reader(make_rch<ReaderInfo>(remote_id, remote_durable, participant_discovered_at, participant_flags, writer_max_sn + 1));
620  }
621  invoke_on_start_callbacks(local_id, remote_id, true);
622  } else if (conv.isReader()) {
624  log_progress("RTPS reader/writer association", local_id, remote_id, participant_discovered_at);
625  }
// A (re)associated writer must not leave stale partial fragments behind.
626  {
627  GuardType guard(strategy_lock_);
628  if (receive_strategy()) {
629  receive_strategy()->clear_completed_fragments(remote_id);
630  }
631  }
632  if (remote_reliable) {
634  RtpsReaderMap::iterator rr = readers_.find(local_id);
635  if (rr == readers_.end()) {
636  pending_reliable_readers_.erase(local_id);
637  RtpsUdpDataLink_rch link(this, inc_count());
638  RtpsReader_rch reader = make_rch<RtpsReader>(link, local_id);
639  rr = readers_.insert(RtpsReaderMap::value_type(local_id, reader)).first;
640  }
641  RtpsReader_rch reader = rr->second;
642  readers_of_writer_.insert(RtpsReaderMultiMap::value_type(remote_id, rr->second));
643  g.release();
// Completion is deferred until the writer is heard from (on_start).
644  add_on_start_callback(client, remote_id);
645  reader->add_writer(make_rch<WriterInfo>(remote_id, participant_discovered_at, participant_flags));
646  associated = false;
647  } else {
648  invoke_on_start_callbacks(local_id, remote_id, true);
649  }
650  }
651 
652  return associated;
653 }
654 
// RtpsUdpDataLink::disassociated: release reservations for the pair and
// drop one reference on the remote's locator entry, erasing it at zero.
// NOTE(review): the signature's first parameter line (embedded line 656),
// a cache-invalidation line (line 660) and the locators_lock_ guard
// (line 663) are missing from this extract.
655 void
657  const GUID_t& remote_id)
658 {
659  release_reservations_i(remote_id, local_id);
661  sq_.ignore_remote(remote_id);
662 
664 
665  RemoteInfoMap::iterator pos = locators_.find(remote_id);
666  if (pos != locators_.end()) {
667  OPENDDS_ASSERT(pos->second.ref_count_ > 0);
668 
669  --pos->second.ref_count_;
670  if (pos->second.ref_count_ == 0) {
671  locators_.erase(pos);
672  }
673  } else if (Transport_debug_level > 3) {
674  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n", LogGuid(local_id).c_str()));
675  }
676 }
677 
// RtpsUdpDataLink::register_for_reader: track an "interesting" remote reader
// (discovery-level interest, before full association), seed its heartbeat
// count, and start periodic heartbeats when this is the first entry.
// NOTE(review): the signature's first parameter line (embedded line 679)
// and the lock guard 'g' (line 684) are missing from this extract.
678 void
680  const GUID_t& readerid,
681  const AddrSet& addresses,
682  DiscoveryListener* listener)
683 {
685  const bool enableheartbeat = interesting_readers_.empty();
686  interesting_readers_.insert(
687  InterestingRemoteMapType::value_type(
688  readerid,
689  InterestingRemote(writerid, addresses, listener)));
690  if (heartbeat_counts_.find(writerid.entityId) == heartbeat_counts_.end()) {
691  heartbeat_counts_[writerid.entityId] = 0;
692  }
693  g.release();
694  if (enableheartbeat) {
695  RtpsUdpInst_rch cfg = config();
696  heartbeat_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
697  }
698 }
699 
// RtpsUdpDataLink::unregister_for_reader: remove the (readerid -> writerid)
// interest entries registered above.
// NOTE(review): the signature's first parameter line (embedded line 701)
// and the lock guard (line 704) are missing from this extract.
700 void
702  const GUID_t& readerid)
703 {
705  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
706  limit = interesting_readers_.upper_bound(readerid);
707  pos != limit;
708  ) {
709  if (pos->second.localid == writerid) {
710  interesting_readers_.erase(pos++);
711  } else {
712  ++pos;
713  }
714  }
715 }
716 
// RtpsUdpDataLink::register_for_writer: track an "interesting" remote
// writer and start the heartbeat-liveness checker when this is the first
// such entry.
// NOTE(review): the signature's first parameter line (embedded line 718)
// and the lock guard 'g' (line 723) are missing from this extract.
717 void
719  const GUID_t& writerid,
720  const AddrSet& addresses,
721  DiscoveryListener* listener)
722 {
724  bool enableheartbeatchecker = interesting_writers_.empty();
725  interesting_writers_.insert(
726  InterestingRemoteMapType::value_type(
727  writerid,
728  InterestingRemote(readerid, addresses, listener)));
729  g.release();
730  if (enableheartbeatchecker) {
731  RtpsUdpInst_rch cfg = config();
732  heartbeatchecker_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
733  }
734 }
735 
// RtpsUdpDataLink::unregister_for_writer: remove the (writerid -> readerid)
// interest entries registered above.
// NOTE(review): the signature's first parameter line (embedded line 737)
// and the lock guard (line 740) are missing from this extract.
736 void
738  const GUID_t& writerid)
739 {
741  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
742  limit = interesting_writers_.upper_bound(writerid);
743  pos != limit;
744  ) {
745  if (pos->second.localid == readerid) {
746  interesting_writers_.erase(pos++);
747  } else {
748  ++pos;
749  }
750  }
751 }
752 
754 {
755  const GuidConverter conv(localId);
756 
757  if (conv.isReader()) {
759  RtpsReaderMap::iterator rr = readers_.find(localId);
760  if (rr != readers_.end()) {
761  for (RtpsReaderMultiMap::iterator iter = readers_of_writer_.begin();
762  iter != readers_of_writer_.end();) {
763  if (iter->second->id() == localId) {
764  readers_of_writer_.erase(iter++);
765  } else {
766  ++iter;
767  }
768  }
769 
770  RtpsReader_rch reader = rr->second;
771  readers_.erase(rr);
772  gr.release();
773 
774  reader->pre_stop_helper();
775 
776  } else {
777  for (WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.begin();
779  RepoIdSet::iterator r = w->second.readers.find(localId);
780  if (r != w->second.readers.end()) {
781  w->second.readers.erase(r);
782  if (w->second.readers.empty()) {
784  continue;
785  }
786  }
787  ++w;
788  }
789  }
790 
791  } else {
792  RtpsWriter_rch writer;
793  {
794  // Don't hold the writers lock when destroying a writer.
796  RtpsWriterMap::iterator pos = writers_.find(localId);
797  if (pos != writers_.end()) {
798  writer = pos->second;
799  writers_.erase(pos);
800  }
801  }
802 
803  if (writer) {
804  TqeVector to_drop;
805  writer->pre_stop_helper(to_drop, true);
806 
807  TqeVector::iterator drop_it = to_drop.begin();
808  while (drop_it != to_drop.end()) {
809  (*drop_it)->data_dropped(true);
810  ++drop_it;
811  }
812  writer->remove_all_msgs();
813  } else {
814  GuardType guard(strategy_lock_);
815  if (send_strategy_) {
817  }
818  }
819  }
820  sq_.ignore_local(localId);
821 }
822 
// RtpsWriter::pre_stop_helper: collect all not-yet-acked elements into
// 'to_drop' for the caller to report as dropped, release their send-buffer
// entries, and (on a true stop) cancel this writer's heartbeat and NACK
// response timers.
// NOTE(review): the outer mutex guard 'g' (embedded line 828) is missing
// from this extract.
823 void
824 RtpsUdpDataLink::RtpsWriter::pre_stop_helper(TqeVector& to_drop, bool true_stop)
825 {
826  typedef SnToTqeMap::iterator iter_t;
827 
829  ACE_GUARD(ACE_Thread_Mutex, g2, elems_not_acked_mutex_);
830 
831  stopping_ = true_stop;
832 
833  if (!elems_not_acked_.empty()) {
// Drain the multimap; sequence numbers are released once each via the set.
834  OPENDDS_SET(SequenceNumber) sns_to_release;
835  iter_t iter = elems_not_acked_.begin();
836  while (iter != elems_not_acked_.end()) {
837  to_drop.push_back(iter->second);
838  sns_to_release.insert(iter->first);
839  elems_not_acked_.erase(iter);
840  iter = elems_not_acked_.begin();
841  }
842  OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
843  while (sns_it != sns_to_release.end()) {
844  send_buff_->release_acked(*sns_it);
845  ++sns_it;
846  }
847  }
848 
849  send_buff_->pre_clear();
850 
851  g2.release();
852  g.release();
853 
854  if (stopping_) {
855  heartbeat_->cancel();
856  nack_response_->cancel();
857  }
858 }
859 
// RtpsUdpDataLink::pre_stop_i: stop every writer (dropping its unacked
// elements) and every reader before the link shuts down.
// NOTE(review): the signature (embedded line 861), base-class pre_stop_i
// call (line 864) and the writers_/readers_ lock guards (lines 869, 890)
// are missing from this extract.
860 void
862 {
863  DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
865  TqeVector to_drop;
866 
// Swap the writer map out under the lock, then stop writers without it.
867  RtpsWriterMap writers;
868  {
870  writers_.swap(writers);
871  for (RtpsWriterMap::const_iterator it = writers.begin(); it != writers.end(); ++it) {
872  heartbeat_counts_.erase(it->first.entityId);
873  }
874  }
875 
876  RtpsWriterMap::iterator w_iter = writers.begin();
877  while (w_iter != writers.end()) {
878  w_iter->second->pre_stop_helper(to_drop, true);
879  writers.erase(w_iter++);
880  }
881 
882  TqeVector::iterator drop_it = to_drop.begin();
883  while (drop_it != to_drop.end()) {
884  (*drop_it)->data_dropped(true);
885  ++drop_it;
886  }
887 
888  RtpsReaderMap readers;
889  {
891  readers = readers_;
892  }
893 
894  RtpsReaderMap::iterator r_iter = readers.begin();
895  while (r_iter != readers.end()) {
896  r_iter->second->pre_stop_helper();
897  ++r_iter;
898  }
899 }
900 
// RtpsUdpDataLink::release_reservations_i: undo one local/remote pairing.
// For a local writer: remove the remote reader, and stop the writer when it
// has no readers left.  For a local reader: remove the remote writer and
// the readers_of_writer_ entries; best-effort readers are removed from the
// per-writer map.
// NOTE(review): missing lines include the signature's first parameter
// (embedded line 902), lock guards (lines 909, 923), the erase of an empty
// best-effort map entry (line 949) and a line before the final drop loop
// (line 956).
901 void
903  const GUID_t& local_id)
904 {
905  TqeVector to_drop;
906  using std::pair;
907  const GuidConverter conv(local_id);
908  if (conv.isWriter()) {
910  RtpsWriterMap::iterator rw = writers_.find(local_id);
911 
912  if (rw != writers_.end()) {
913  RtpsWriter_rch writer = rw->second;
914  g.release();
915  writer->remove_reader(remote_id);
916  if (writer->reader_count() == 0) {
917  writer->pre_stop_helper(to_drop, false);
918  }
919  writer->process_acked_by_all();
920  }
921 
922  } else if (conv.isReader()) {
924  RtpsReaderMap::iterator rr = readers_.find(local_id);
925 
926  if (rr != readers_.end()) {
927  for (pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> iters =
928  readers_of_writer_.equal_range(remote_id);
929  iters.first != iters.second;) {
930  if (iters.first->second->id() == local_id) {
931  readers_of_writer_.erase(iters.first++);
932  } else {
933  ++iters.first;
934  }
935  }
936 
937  RtpsReader_rch reader = rr->second;
938  g.release();
939 
940  reader->remove_writer(remote_id);
941 
942  } else {
943  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(remote_id);
944  if (w != writer_to_seq_best_effort_readers_.end()) {
945  RepoIdSet::iterator r = w->second.readers.find(local_id);
946  if (r != w->second.readers.end()) {
947  w->second.readers.erase(r);
948  if (w->second.readers.empty()) {
950  }
951  }
952  }
953  }
954  }
955 
957 
958  for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
959  (*drop_it)->data_dropped(true);
960  }
961 }
962 
// RtpsUdpDataLink::stop_i: disconnect from network-address updates, disable
// the heartbeat timers, and close the sockets.
// NOTE(review): the signature (embedded line 964) and the IPv4 socket close
// lines (lines 970-971) are missing from this extract.
963 void
965 {
966  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
967 
968  heartbeat_->disable();
969  heartbeatchecker_->disable();
972 #ifdef ACE_HAS_IPV6
973  ipv6_unicast_socket_.close();
974  ipv6_multicast_socket_.close();
975 #endif
976 }
977 
980 {
983 
984  const RtpsWriterMap::iterator wi = writers_.find(pub_id);
985  if (wi != writers_.end()) {
986  result = wi->second->get_send_buff();
987  }
988  return result;
989 }
990 
991 // Implementing MultiSendBuffer nested class
992 
993 void
996  ACE_Message_Block* chain)
997 {
998  // Called from TransportSendStrategy::send_packet().
999  // RtpsUdpDataLink is not locked at this point, and is only locked
1000  // to grab the appropriate writer send buffer via get_writer_send_buffer()
// NOTE(review): this extract is missing the method signature (embedded
// lines 994-995) and the early-return condition after the sequence is read
// (line 1003) — presumably a SEQUENCENUMBER_UNKNOWN check; confirm in the
// full source.
1001  const TransportQueueElement* const tqe = q->peek();
1002  const SequenceNumber seq = tqe->sequence();
1004  return;
1005  }
1006 
1007  const GUID_t pub_id = tqe->publication_id();
1008 
1009  RcHandle<SingleSendBuffer> send_buff = outer_->get_writer_send_buffer(pub_id);
1010  if (send_buff.is_nil()) {
1011  return;
1012  }
1013 
1014  if (Transport_debug_level > 5) {
1015  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
1016  "pub_id %C seq %q frag %d\n", LogGuid(pub_id).c_str(), seq.getValue(),
1017  (int)tqe->is_fragment()));
1018  }
1019 
// Fragments carry their fragment number via RtpsCustomizedElement.
1020  if (tqe->is_fragment()) {
1021  const RtpsCustomizedElement* const rce =
1022  dynamic_cast<const RtpsCustomizedElement*>(tqe);
1023  if (rce) {
1024  send_buff->insert_fragment(seq, rce->last_fragment(), rce->is_last_fragment(), q, chain);
1025  } else if (Transport_debug_level) {
1026  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
1027  " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
1028  LogGuid(pub_id).c_str(), seq.getValue()));
1029  }
1030  } else {
1031  send_buff->insert(seq, q, chain);
1032  }
1033 }
1034 
// Allocate an ACE_Message_Block of 'size' using the link's pooled
// allocators and a lock from the data-block lock pool.
// NOTE(review): the return-type line (embedded line 1035) and several
// ACE_Message_Block constructor argument lines (lines 1040, 1042,
// 1047-1049) are missing from this extract.
1036 RtpsUdpDataLink::alloc_msgblock(size_t size, ACE_Allocator* data_allocator) {
1037  ACE_Message_Block* result;
1038  ACE_NEW_MALLOC_RETURN(result,
1039  static_cast<ACE_Message_Block*>(
1041  ACE_Message_Block(size,
1043  0, // cont
1044  0, // data
1045  data_allocator,
1046  db_lock_pool_->get_lock(), // locking_strategy
1050  &db_allocator_,
1051  &mb_allocator_),
1052  0);
1053  return result;
1054 }
1055 
// RtpsUdpDataLink::submsgs_to_msgblock (signature, embedded lines
// 1056-1057, missing): serialize a sequence of RTPS submessages into a
// freshly allocated message block, aligning each submessage to the
// submessage-header size.
// NOTE(review): the 'encoding' declaration (line 1060) and the allocation
// of 'hdr' (line 1067, presumably via alloc_msgblock(size, ...)) are
// missing from this extract.
1058 {
1059  // byte swapping is handled in the operator<<() implementation
// First pass: compute the total serialized size with alignment.
1061  size_t size = 0;
1062  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1063  encoding.align(size, RTPS::SMHDR_SZ);
1064  serialized_size(encoding, size, subm[i]);
1065  }
1066 
1068 
// Second pass: serialize each submessage, padding to the next boundary.
1069  Serializer ser(hdr, encoding);
1070  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1071  ser << subm[i];
1072  ser.align_w(RTPS::SMHDR_SZ);
1073  }
1074  return hdr;
1075 }
1076 
// RtpsWriter::customize_queue_element_helper (name lines, embedded
// 1077-1078, missing): convert an OpenDDS queue element (control message,
// plain data, or content-filtered data) into an RtpsCustomizedElement whose
// header chain holds the generated RTPS submessages.  Control messages such
// as END_HISTORIC_SAMPLES / REQUEST_ACK / DATAWRITER_LIVELINESS are handled
// in place and return 0; durable data destined for a specific reader is
// stashed in that reader's durable_data_ instead of being sent now.
// NOTE(review): this extract is also missing the mutex guard 'g'
// (line 1084), a condition line before the max_sn_ update (line 1099),
// the declarations of 'tce'/'tsce' (lines 1114-1117 partially), several
// submessage-generation calls (lines 1128, 1131, 1158, 1168), the
// append-submessages condition (line 1193) and an ACE_DEBUG opener
// (line 1212).
1079  TransportQueueElement* element,
1080  bool requires_inline_qos,
1081  MetaSubmessageVec& meta_submessages,
1082  bool& deliver_after_send)
1083 {
1085 
1086  RtpsUdpDataLink_rch link = link_.lock();
1087  if (stopping_ || !link) {
1088  g.release();
1089  element->data_dropped(true);
1090  return 0;
1091  }
1092 
1093  OPENDDS_ASSERT(element->publication_id() == id_);
1094 
1095  const SequenceNumber previous_max_sn = max_sn_;
1096  RTPS::SubmessageSeq subm;
1097 
1098  const SequenceNumber seq = element->sequence();
1100  if (!element->is_fragment() || element->is_last_fragment()) {
1101  max_sn_ = std::max(max_sn_, seq);
1102  }
// A gap submessage covers sequence numbers skipped for this writer.
1103  if (!durable_ && !is_pvs_writer() &&
1104  element->subscription_id() == GUID_UNKNOWN &&
1105  previous_max_sn < max_sn_.previous()) {
1106  add_gap_submsg_i(subm, previous_max_sn + 1);
1107  }
1108  }
1109 
1110  make_leader_lagger(element->subscription_id(), previous_max_sn);
1111  check_leader_lagger();
1112 
1113  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1115  dynamic_cast<TransportCustomizedElement*>(element);
1117  dynamic_cast<TransportSendControlElement*>(element);
1118 
1119  Message_Block_Ptr data;
1120  bool durable = false;
1121 
1122  const ACE_Message_Block* msg = element->msg();
1123  const GUID_t pub_id = element->publication_id();
1124 
1125  // Based on the type of 'element', find and duplicate the data payload
1126  // continuation block.
1127  if (tsce) { // Control message
1129  data.reset(msg->cont()->duplicate());
1130  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1132  subm, *tsce, requires_inline_qos);
1133  record_directed(element->subscription_id(), seq);
1134  } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
1135  end_historic_samples_i(tsce->header(), msg->cont(), meta_submessages);
1136  g.release();
1137  element->data_delivered();
1138  return 0;
1139  } else if (tsce->header().message_id_ == REQUEST_ACK) {
1140  request_ack_i(tsce->header(), msg->cont(), meta_submessages);
1141  deliver_after_send = true;
1142  return 0;
1143  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
1144  send_heartbeats_manual_i(meta_submessages);
1145  deliver_after_send = true;
1146  return 0;
1147  } else {
1148  g.release();
1149  element->data_dropped(true /*dropped_by_transport*/);
1150  return 0;
1151  }
1152 
1153  } else if (tse) { // Basic data message
1154  // {DataSampleHeader} -> {Data Payload}
1155  data.reset(msg->cont()->duplicate());
1156  const DataSampleElement* dsle = tse->sample();
1157  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1159  subm, *dsle, requires_inline_qos);
1160  record_directed(element->subscription_id(), seq);
1161  durable = dsle->get_header().historic_sample_;
1162 
1163  } else if (tce) { // Customized data message
1164  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1165  data.reset(msg->cont()->cont()->duplicate());
1166  const DataSampleElement* dsle = tce->original_send_element()->sample();
1167  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1169  subm, *dsle, requires_inline_qos);
1170  record_directed(element->subscription_id(), seq);
1171  durable = dsle->get_header().historic_sample_;
1172 
1173  } else {
1174  send_buff_->pre_insert(seq);
1175  return element;
1176  }
1177 
1178 #ifdef OPENDDS_SECURITY
1179  {
1180  GuardType guard(link->strategy_lock_);
1181  if (link->send_strategy_) {
1182  link->send_strategy()->encode_payload(pub_id, data, subm);
1183  }
1184  }
1185 #endif
1186 
1187  if (stopping_) {
1188  g.release();
1189  element->data_dropped(true);
1190  return 0;
1191  }
1192 
1194  link->send_strategy()->append_submessages(subm);
1195  }
1196 
1197  Message_Block_Ptr hdr(link->submsgs_to_msgblock(subm));
1198  hdr->cont(data.release());
1199  RtpsCustomizedElement* rtps =
1200  new RtpsCustomizedElement(element, move(hdr));
1201 
1202  // Handle durability resends
1203  if (durable) {
1204  const GUID_t sub = element->subscription_id();
1205  if (sub != GUID_UNKNOWN) {
1206  ReaderInfoMap::iterator ri = remote_readers_.find(sub);
1207  if (ri != remote_readers_.end()) {
1208  ri->second->durable_data_[rtps->sequence()] = rtps;
1209  ri->second->durable_timestamp_.set_to_now();
1210  if (Transport_debug_level > 3) {
1211  const LogGuid conv(pub_id), sub_conv(sub);
1213  "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
1214  "storing durable data for local %C remote %C seq %q\n",
1215  conv.c_str(), sub_conv.c_str(),
1216  rtps->sequence().getValue()));
1217  }
1218  return 0;
1219  }
1220  }
1221  }
1222 
1223  send_buff_->pre_insert(seq);
1224  return rtps;
1225 }
1226 
// Converts an outgoing OpenDDS element (control, basic data, or
// content-filtered data) into RTPS submessage(s) plus the duplicated
// payload continuation block, on the non-reliable (no RtpsWriter) path.
// NOTE(review): the function-name line and a lock-guard parameter line
// were lost in this extraction; the visible parameters are below.
1229  TransportQueueElement* element,
1230  bool requires_inline_qos,
1231  MetaSubmessageVec& meta_submessages,
1232  bool& deliver_after_send,
1234 {
1235  RTPS::SubmessageSeq subm;
1236 
1237  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1239  dynamic_cast<TransportCustomizedElement*>(element);
1241  dynamic_cast<TransportSendControlElement*>(element);
1242 
1243  Message_Block_Ptr data;
1244 
1245  const ACE_Message_Block* msg = element->msg();
1246 
1247  // Based on the type of 'element', find and duplicate the data payload
1248  // continuation block.
1249  if (tsce) { // Control message
// NOTE(review): an inner dispatch on tsce->header().message_id_ opens
// here; its condition line is not visible in this extraction.
1251  data.reset(msg->cont()->duplicate());
1252  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1254  subm, *tsce, requires_inline_qos);
1255  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
// Manual-liveliness control message: emit heartbeats instead of data and
// have the caller deliver the element once the send completes.
1256  send_heartbeats_manual_i(tsce, meta_submessages);
1257  deliver_after_send = true;
1258  return 0;
1259  } else {
// Any other control message id is dropped by the transport.
1260  guard.release();
1261  element->data_dropped(true /*dropped_by_transport*/);
1262  return 0;
1263  }
1264 
1265  } else if (tse) { // Basic data message
1266  // {DataSampleHeader} -> {Data Payload}
1267  data.reset(msg->cont()->duplicate());
1268  const DataSampleElement* dsle = tse->sample();
1269  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1271  subm, *dsle, requires_inline_qos);
1272 
1273  } else if (tce) { // Customized data message
1274  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1275  data.reset(msg->cont()->cont()->duplicate());
1276  const DataSampleElement* dsle = tce->original_send_element()->sample();
1277  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1279  subm, *dsle, requires_inline_qos);
1280 
1281  } else {
// Unrecognized element type: hand it back to the caller untouched.
1282  return element;
1283  }
1284 
1285 #ifdef OPENDDS_SECURITY
1286  const GUID_t pub_id = element->publication_id();
1287 
1288  {
// Payload encryption is performed under the strategy lock.
1289  GuardType guard(strategy_lock_);
1290  if (send_strategy_) {
1291  send_strategy()->encode_payload(pub_id, data, subm);
1292  }
1293  }
1294 #endif
1295 
1297  send_strategy()->append_submessages(subm);
1298  }
1299 
// Chain the payload behind the serialized submessage header block and
// wrap both in an RtpsCustomizedElement for the send strategy.
1301  hdr->cont(data.release());
1302  return new RtpsCustomizedElement(element, move(hdr));
1303 }
1304 
// Entry point for customizing an outgoing queue element: routes it to the
// reliable RtpsWriter helper when one exists for the publication, or to
// the non-reliable path otherwise, then flushes any generated submessages
// and (for some control messages) signals delivery to the caller.
// NOTE(review): the signature line is not visible in this extraction.
1307 {
1308  const ACE_Message_Block* msg = element->msg();
1309  if (!msg) {
1310  return element;
1311  }
1312 
1313  const GUID_t pub_id = element->publication_id();
1314  GUIDSeq_var peers = peer_ids(pub_id);
1315 
1316  bool require_iq = requires_inline_qos(peers);
1317 
1319 
1320  const RtpsWriterMap::iterator rw = writers_.find(pub_id);
1321  MetaSubmessageVec meta_submessages;
1322  RtpsWriter_rch writer;
1323  TransportQueueElement* result;
1324  bool deliver_after_send = false;
1325  if (rw != writers_.end()) {
// Reliable path: the writer-level helper does the customization.
1326  writer = rw->second;
1327  guard.release();
1328  result = writer->customize_queue_element_helper(element, require_iq, meta_submessages, deliver_after_send);
1329  } else {
1330  guard.release();
1331  result = customize_queue_element_non_reliable_i(element, require_iq, meta_submessages, deliver_after_send, guard);
1332  }
1333 
1334  queue_submessages(meta_submessages);
1335 
1336  if (deliver_after_send) {
1337  element->data_delivered();
1338  }
1339 
1340  return result;
1341 }
1342 
1343 void
// Handles an END_HISTORIC_SAMPLES control message: stamps
// durable_timestamp_ on the matching durable reader(s) — all readers when
// the message body carries no reader GUID — and, for a single directed
// reader, also sends a directed heartbeat.  NOTE(review): the
// name/leading-parameter lines were lost in this extraction; the body's
// log strings identify it as end_historic_samples handling.
1346  MetaSubmessageVec& meta_submessages)
1347 {
1348  // Set the ReaderInfo::durable_timestamp_ for the case where no
1349  // durable samples exist in the DataWriter.
1350  if (durable_) {
// Extract the optional destination reader GUID from the message body.
1352  GUID_t sub = GUID_UNKNOWN;
1353  if (body && header.message_length_ >= sizeof(sub)) {
1354  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1355  }
1356  typedef ReaderInfoMap::iterator iter_t;
1357  if (sub == GUID_UNKNOWN) {
1358  if (Transport_debug_level > 3) {
1359  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
1360  "local %C all readers\n", LogGuid(id_).c_str()));
1361  }
// No specific reader: stamp every durable remote reader.
1362  for (iter_t iter = remote_readers_.begin();
1363  iter != remote_readers_.end(); ++iter) {
1364  if (iter->second->durable_) {
1365  iter->second->durable_timestamp_ = now;
1367  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1368  }
1369  }
1370  }
1371  } else {
// Directed: stamp only the named reader and heartbeat it directly.
1372  iter_t iter = remote_readers_.find(sub);
1373  if (iter != remote_readers_.end()) {
1374  if (iter->second->durable_) {
1375  iter->second->durable_timestamp_ = now;
1377  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1378  }
1379  const SingleSendBuffer::Proxy proxy(*send_buff_);
1380  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1381  initialize_heartbeat(proxy, meta_submessage);
1382  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1383  if (Transport_debug_level > 3) {
1384  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
1385  " local %C remote %C\n", LogGuid(id_).c_str(), LogGuid(sub).c_str()));
1386  }
1387  }
1388  }
1389  }
1390  }
1391 }
1392 
1393 void
// Handles a REQUEST_ACK control message: gathers heartbeats (directed
// when the message body names a specific reader) so readers respond with
// ACKNACKs.  NOTE(review): name/leading-parameter lines were lost in this
// extraction; the log strings identify it as request_ack handling.
1396  MetaSubmessageVec& meta_submessages)
1397 {
1398  // Set the ReaderInfo::durable_timestamp_ for the case where no
1399  // durable samples exist in the DataWriter.
1400  GUID_t sub = GUID_UNKNOWN;
1401  if (body && header.message_length_ >= sizeof(sub)) {
1402  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1403  }
1404  typedef ReaderInfoMap::iterator iter_t;
1405  if (sub == GUID_UNKNOWN) {
// No target reader in the body: heartbeat all readers.
1406  gather_heartbeats_i(meta_submessages);
1407  if (Transport_debug_level > 3) {
1408  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack "
1409  "local %C all readers\n", LogGuid(id_).c_str()));
1410  }
1411  } else {
1412  iter_t iter = remote_readers_.find(sub);
1413  if (iter != remote_readers_.end()) {
1414  const SingleSendBuffer::Proxy proxy(*send_buff_);
1415  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1416  initialize_heartbeat(proxy, meta_submessage);
1417  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1418  if (Transport_debug_level > 3) {
1419  const LogGuid conv(id_), sub_conv(sub);
1420  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack"
1421  " local %C remote %C\n", conv.c_str(), sub_conv.c_str()));
1422  }
1423  }
1424  }
1425 }
1426 
// Returns true when RTPS inline QoS must accompany data sent to 'peers':
// either forced by configuration (testing aid) or because at least one
// known peer locator entry is flagged as requiring it.
1427 bool RtpsUdpDataLink::requires_inline_qos(const GUIDSeq_var& peers)
1428 {
1429  if (force_inline_qos_) {
1430  // Force true for testing purposes
1431  return true;
1432  } else {
1433  if (!peers.ptr()) {
1434  return false;
1435  }
// NOTE(review): a line between here and the loop (likely a lock guard
// protecting locators_) is not visible in this extraction — confirm
// against the original source before editing.
1437  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
1438  const RemoteInfoMap::const_iterator iter = locators_.find(peers[i]);
1439  if (iter != locators_.end() && iter->second.requires_inline_qos_) {
1440  return true;
1441  }
1442  }
1443  return false;
1444  }
1445 }
1446 
1448 
1449 void
// Writer-side periodic heartbeat callback (name line not visible in this
// extraction): gathers heartbeats and reschedules itself with a growing
// backoff while any reader is still preassociation or lagging.
1451 {
1453 
1454  if (stopping_) {
1455  return;
1456  }
1457 
1458  RtpsUdpDataLink_rch link = link_.lock();
1459 
1460  if (!link) {
1461  return;
1462  }
1463 
1464  MetaSubmessageVec meta_submessages;
1465  gather_heartbeats_i(meta_submessages);
1466 
// Keep heartbeating with an advancing fallback interval until every
// reader has associated and caught up; then reset the interval.
1467  if (!preassociation_readers_.empty() || !lagging_readers_.empty()) {
1468  heartbeat_->schedule(fallback_.get());
1469  fallback_.advance();
1470  } else {
1471  fallback_.set(initial_fallback_);
1472  }
1473 
1474  g.release();
1475 
// Queue the gathered submessages outside the lock.
1476  link->queue_submessages(meta_submessages);
1477 }
1478 
1479 void
// Event callback (name line not visible in this extraction) that builds
// replies to received NACKs under the lock and queues them afterwards.
1481 {
1482  RtpsUdpDataLink_rch link = link_.lock();
1483  if (!link) {
1484  return;
1485  }
1486 
1487  MetaSubmessageVec meta_submessages;
1488  {
1490 
1491  if (stopping_) {
1492  return;
1493  }
1494 
1495  gather_nack_replies_i(meta_submessages);
1496  }
1497 
// Queue outside the lock scope above.
1498  link->queue_submessages(meta_submessages);
1499 }
1500 
1346 void
// Builds a GAP submessage covering [gap_start, max_sn_) and appends it to
// the outgoing submessage sequence 'msg'.  NOTE(review): the name line
// and the first parameter line were lost in this extraction.
1503  SequenceNumber gap_start)
1504 {
1505  // These are the GAP submessages that we'll send directly in-line with the
1506  // DATA when we notice that the DataWriter has deliberately skipped seq #s.
1507  // There are other GAP submessages generated in meta_submessage to reader ACKNACKS,
1508  // see send_nack_replies().
1509  using namespace OpenDDS::RTPS;
1510 
1511  const LongSeq8 bitmap;
1512 
1513  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
1514  // [gapStart, gapListBase) and those in the SNSet.
1515  GapSubmessage gap = {
1516  {GAP, FLAG_E, 0 /*length determined below*/},
1517  ENTITYID_UNKNOWN, // readerId: applies to all matched readers
1518  id_.entityId,
1519  to_rtps_seqnum(gap_start),
1520  {to_rtps_seqnum(max_sn_), 0, bitmap}
1521  };
1522  OPENDDS_ASSERT(gap_start < max_sn_);
1523 
// Patch the submessage length from the serialized size (header excluded).
// NOTE(review): the assignment target line (1525) is not visible here.
1524  const size_t size = serialized_size(Encoding(Encoding::KIND_XCDR1), gap);
1526  static_cast<CORBA::UShort>(size) - SMHDR_SZ;
1527 
1528  const CORBA::ULong idx = grow(msg) - 1;
1529  msg[idx].gap_sm(gap);
1530 }
1531 
// Updates the last-received network address bookkeeping for remote 'src'
// (signature line not visible in this extraction; callers pass src, addr
// and optionally now).  Traffic from the configured RTPS relay is ignored.
1533 {
1534  RtpsUdpInst_rch cfg = config();
1535 
1536  if (!cfg) {
1537  return;
1538  }
1539 
1540  if (addr == cfg->rtps_relay_address()) {
1541  return;
1542  }
1543 
1544  bool remove_cache = false;
1545  {
1547  const RemoteInfoMap::iterator pos = locators_.find(src);
1548  if (pos != locators_.end()) {
// Allow the recorded address to change only when the old one has expired,
// is unchanged, or the new address is "more local" than the old one.
1549  const bool expired = cfg->receive_address_duration_ < (MonotonicTimePoint::now() - pos->second.last_recv_time_);
1550  const bool allow_update = expired ||
1551  pos->second.last_recv_addr_ == addr ||
1552  is_more_local(pos->second.last_recv_addr_, addr);
1553  if (allow_update) {
1554  remove_cache = pos->second.last_recv_addr_ != addr;
1555  pos->second.last_recv_addr_ = addr;
1556  pos->second.last_recv_time_ = now;
1557  }
1558  }
1559  }
1560  if (remove_cache) {
// NOTE(review): the cache-invalidation call on line 1561 was lost in
// this extraction.
1562  }
1563 }
1564 
1565 // DataReader's side of Reliability
1566 
1407 void
// Dispatches an incoming DATA submessage to the matching local reader(s).
// An ENTITYID_UNKNOWN readerId fans out to every reader of the writer;
// data for still-pending reliable readers is withheld by the receive
// strategy.  NOTE(review): the name line and first parameter line were
// lost in this extraction.
1569  const GuidPrefix_t& src_prefix,
1570  const NetworkAddress& remote_addr)
1571 {
1572  const GUID_t local = make_id(local_prefix_, data.readerId);
1573  const GUID_t src = make_id(src_prefix, data.writerId);
1574 
1575  update_last_recv_addr(src, remote_addr);
1576 
1577  OPENDDS_VECTOR(RtpsReader_rch) to_call;
1578  {
1580  if (local.entityId == ENTITYID_UNKNOWN) {
// Undirected data: collect every reader associated with this writer.
1581  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
1582  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
1583  to_call.push_back(iters.first->second);
1584  }
1585  if (!pending_reliable_readers_.empty()) {
1586  GuardType guard(strategy_lock_);
1588  if (trs) {
1589  for (RepoIdSet::const_iterator it = pending_reliable_readers_.begin();
1590  it != pending_reliable_readers_.end(); ++it)
1591  {
1592  trs->withhold_data_from(*it);
1593  }
1594  }
1595  }
1596  } else {
// Directed data: a single reader, or withhold if it is still pending.
1597  const RtpsReaderMap::iterator rr = readers_.find(local);
1598  if (rr != readers_.end()) {
1599  to_call.push_back(rr->second);
1600  } else if (pending_reliable_readers_.count(local)) {
1601  GuardType guard(strategy_lock_);
1603  if (trs) {
1604  trs->withhold_data_from(local);
1605  }
1606  }
1607  }
1608  }
// Process outside the lock, then flush any generated submessages.
1609  MetaSubmessageVec meta_submessages;
1610  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
1611  (*it)->process_data_i(data, src, meta_submessages);
1612  }
1613  queue_submessages(meta_submessages);
1614 }
1615 
1616 void
// RtpsReader shutdown preparation: marks the reader stopping, forgets
// preassociation writers, clears all held samples, and cancels the
// preassociation ACKNACK task.  (Name grounded by the
// log_remote_counts("pre_stop_helper") call below.)
1618 {
1620 
1621  if (stopping_) {
1622  return;
1623  }
1624 
1625  stopping_ = true;
1626 
1627  preassociation_writers_.clear();
1628  log_remote_counts("pre_stop_helper");
1629 
1630  RtpsUdpDataLink_rch link = link_.lock();
1631 
1632  if (!link) {
1633  return;
1634  }
1635 
1636  GuardType guard(link->strategy_lock_);
1637  if (link->receive_strategy() == 0) {
1638  return;
1639  }
1640 
// Drop any data being held for in-order delivery.
1641  for (WriterInfoMap::iterator it = remote_writers_.begin(); it != remote_writers_.end(); ++it) {
1642  it->second->held_.clear();
1643  }
1644 
1645  guard.release();
1646  g.release();
1647 
1648  preassociation_task_->cancel();
1649 }
1650 
// RtpsReader constructor (signature line not visible in this extraction):
// records the owning link and reader GUID, and sets up the sporadic task
// that sends preassociation ACKNACKs plus the heartbeat period from the
// link config.  NOTE(review): 'link' is dereferenced unconditionally for
// event_dispatcher() but null-checked for heartbeat_period_ — confirm the
// intended precondition against the header.
1652  : link_(link)
1653  , id_(id)
1654  , stopping_(false)
1655  , nackfrag_count_(0)
1656  , preassociation_task_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsReader> >(rchandle_from(this), &RtpsUdpDataLink::RtpsReader::send_preassociation_acknacks)))
1657  , heartbeat_period_(link ? link->config()->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC))
1658 {
1659 }
1660 
// Empty destructor body; the signature line was lost in this extraction.
1662 {
1663 }
1664 
1665 bool
// RtpsReader::process_data_i (name grounded by the log strings below):
// decides per incoming DATA sample whether to deliver it, hold it for
// in-order delivery, or drop it as a duplicate/unknown-writer sample.
// NOTE(review): several lines (a log-statement opener and the
// transport_debug.log_dropped_messages guards) were lost in this
// extraction, so some visible braces lack their opening 'if' lines.
1667  const GUID_t& src,
1668  MetaSubmessageVec&)
1669 {
1671 
1672  if (stopping_) {
1673  return false;
1674  }
1675 
1676  RtpsUdpDataLink_rch link = link_.lock();
1677 
1678  if (!link) {
1679  return false;
1680  }
1681 
1682  GuardType guard(link->strategy_lock_);
1683  if (link->receive_strategy() == 0) {
1684  return false;
1685  }
1686 
1687  const SequenceNumber seq = to_opendds_seqnum(data.writerSN);
1688  DeliverHeldData dhd;
1689  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1690  if (wi != remote_writers_.end()) {
1691  const WriterInfo_rch& writer = wi->second;
1692 
// Arm held-data delivery for this writer once the locks are released.
1693  DeliverHeldData dhd2(rchandle_from(this), src);
1694  std::swap(dhd, dhd2);
1695 
1696  writer->frags_.erase(seq);
1697 
// No heartbeat seen yet: withhold and hold the sample until one arrives.
1698  if (writer->recvd_.empty()) {
1699  if (Transport_debug_level > 5) {
1701  ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1702  ACE_TEXT(" data seq: %q from %C being from %C expecting heartbeat\n"),
1703  seq.getValue(),
1704  LogGuid(src).c_str(),
1705  LogGuid(id_).c_str()));
1706  }
1707  const ReceivedDataSample* sample =
1708  link->receive_strategy()->withhold_data_from(id_);
1709  writer->held_.insert(std::make_pair(seq, *sample));
1710 
// Already-received sequence number: drop as a duplicate.
1711  } else if (writer->recvd_.contains(seq)) {
1713  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C duplicate sample\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1714  }
1715  if (Transport_debug_level > 5) {
1716  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1717  ACE_TEXT(" data seq: %q from %C being DROPPED from %C because it's ALREADY received\n"),
1718  seq.getValue(),
1719  LogGuid(src).c_str(),
1720  LogGuid(id_).c_str()));
1721  }
1722  link->receive_strategy()->withhold_data_from(id_);
1723 
// Samples are already being held: keep holding to preserve order.
1724  } else if (!writer->held_.empty()) {
1725  const ReceivedDataSample* sample =
1726  link->receive_strategy()->withhold_data_from(id_);
1727  if (Transport_debug_level > 5) {
1728  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) WITHHOLD %q\n", seq.getValue()));
1729  writer->recvd_.dump();
1730  }
1731  writer->held_.insert(std::make_pair(seq, *sample));
1732  writer->recvd_.insert(seq);
1733 
// Out-of-order arrival: hold until the gap before it is filled.
1734  } else if (writer->recvd_.disjoint() || writer->recvd_.cumulative_ack() != seq.previous()) {
1735  if (Transport_debug_level > 5) {
1736  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1737  ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because it's EXPECTING more data\n"),
1738  seq.getValue(),
1739  LogGuid(src).c_str(),
1740  LogGuid(id_).c_str()));
1741  }
1742  const ReceivedDataSample* sample =
1743  link->receive_strategy()->withhold_data_from(id_);
1744  writer->held_.insert(std::make_pair(seq, *sample));
1745  writer->recvd_.insert(seq);
1746 
// In-order arrival: record it and let the receive strategy deliver.
1747  } else {
1748  if (Transport_debug_level > 5) {
1749  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1750  ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
1751  seq.getValue(),
1752  LogGuid(src).c_str(),
1753  LogGuid(id_).c_str()));
1754  }
1755  writer->recvd_.insert(seq);
1756  link->receive_strategy()->do_not_withhold_data_from(id_);
1757  }
1758 
// Unknown remote writer: log and withhold (drop) the sample.
1759  } else {
1761  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1762  }
1763  if (Transport_debug_level > 5) {
1764  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1765  ACE_TEXT(" data seq: %q from %C to %C dropped because of unknown writer\n"),
1767  LogGuid(src).c_str(),
1768  LogGuid(id_).c_str()));
1769  }
1770  link->receive_strategy()->withhold_data_from(id_);
1771  }
1772 
1773  // Release for delivering held data.
1774  guard.release();
1775  g.release();
1776 
1777  return false;
1778 }
1779 
1606 void
// Entry point for an incoming GAP submessage: refreshes the sender's
// last-received address and dispatches to RtpsReader::process_gap_i.
// NOTE(review): the name line and first parameter line were lost in this
// extraction.
1782  const GuidPrefix_t& src_prefix,
1783  bool directed,
1784  const NetworkAddress& remote_addr)
1785 {
1786  update_last_recv_addr(make_id(src_prefix, gap.writerId), remote_addr);
1787  datareader_dispatch(gap, src_prefix, directed, &RtpsReader::process_gap_i);
1788 }
1789 
1615 void
// RtpsReader::process_gap_i (name grounded by the log strings below):
// applies a GAP submessage — the range [start, base) plus the bitmap set —
// to the writer's received-set, discards held samples and fragments that
// fall inside the gap, then delivers any now-contiguous held data.
// NOTE(review): some lines (the 'start' declaration and the
// log_dropped_messages guards) were lost in this extraction.
1792  const GUID_t& src,
1793  bool /*directed*/,
1794  MetaSubmessageVec&)
1795 {
1797 
1798  RtpsUdpDataLink_rch link = link_.lock();
1799 
1800  if (!link) {
1801  return;
1802  }
1803 
1804  GuardType guard(link->strategy_lock_);
1805  if (link->receive_strategy() == 0) {
1806  return;
1807  }
1808 
1809  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1810  if (wi == remote_writers_.end()) {
1812  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1813  }
1814  return;
1815  }
1816 
1817  const WriterInfo_rch& writer = wi->second;
1818 
// Ignore GAPs from writers we have not yet heard a heartbeat from.
1819  if (writer->recvd_.empty()) {
1821  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C preassociation writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1822  }
1823  return;
1824  }
1825 
1827  const SequenceNumber base = to_opendds_seqnum(gap.gapList.bitmapBase);
1828 
// Mark [start, base) received; an inverted pair is reported and ignored.
1829  if (start < base) {
1830  writer->recvd_.insert(SequenceRange(start, base.previous()));
1831  } else if (start != base) {
1832  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_gap_i: ERROR - Incoming GAP has inverted start (%q) & base (%q) values, ignoring start value\n", start.getValue(), base.getValue()));
1833  }
1834  writer->recvd_.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1835 
// Build the full gapped set to prune held samples and fragments.
1836  DisjointSequence gaps;
1837  if (start < base) {
1838  gaps.insert(SequenceRange(start, base.previous()));
1839  }
1840  gaps.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1841 
1842  if (!gaps.empty()) {
1843  for (WriterInfo::HeldMap::iterator pos = writer->held_.lower_bound(gaps.low()),
1844  limit = writer->held_.upper_bound(gaps.high()); pos != limit;) {
1845  if (gaps.contains(pos->first)) {
1846  writer->held_.erase(pos++);
1847  } else {
1848  ++pos;
1849  }
1850  }
1851  }
1852 
1853  const OPENDDS_VECTOR(SequenceRange) psr = gaps.present_sequence_ranges();
1854  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
1855  link->receive_strategy()->remove_fragments(*pos, writer->id_);
1856  }
1857 
// Release locks before DeliverHeldData's destructor delivers held data.
1858  guard.release();
1859  g.release();
1860 
1861  DeliverHeldData dhd(rchandle_from(this), src);
1862 }
1863 
1684 void
// Entry point for an incoming HEARTBEAT submessage: refreshes activity for
// "interesting" writers (discovery), fires writer_exists callbacks for
// ones newly seen, then dispatches to RtpsReader::process_heartbeat_i.
// NOTE(review): name line and first parameter line were lost in this
// extraction.
1866  const GuidPrefix_t& src_prefix,
1867  bool directed,
1868  const NetworkAddress& remote_addr)
1869 {
1870  const GUID_t src = make_id(src_prefix, heartbeat.writerId);
1872 
1873  update_last_recv_addr(src, remote_addr, now);
1874 
1875  MetaSubmessageVec meta_submessages;
1876  OPENDDS_VECTOR(InterestingRemote) callbacks;
1877  {
1879 
1880  // We received a heartbeat from a writer.
1881  // We should ACKNACK if the writer is interesting and there is no association.
1882 
1883  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
1884  limit = interesting_writers_.upper_bound(src);
1885  pos != limit;
1886  ++pos) {
1887  pos->second.last_activity = now;
1888  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
1889  callbacks.push_back(pos->second);
1890  pos->second.status = InterestingRemote::EXISTS;
1891  }
1892  }
1893  }
1894  queue_submessages(meta_submessages);
1895 
// Listener callbacks run outside the lock scope above.
1896  for (size_t i = 0; i < callbacks.size(); ++i) {
1897  callbacks[i].listener->writer_exists(src, callbacks[i].localid);
1898  }
1899 
1900  datareader_dispatch(heartbeat, src_prefix, directed, &RtpsReader::process_heartbeat_i);
1901 }
1902 
1720 void
// RtpsReader::process_heartbeat_i (name grounded by the log strings
// below): validates an incoming HEARTBEAT, discards stale/duplicate
// counts, completes preassociation on the first valid heartbeat, applies
// the advertised range to the received-set, and gathers ACKNACK replies.
// NOTE(review): several lock-guard/log-guard lines were lost in this
// extraction, so some visible braces lack their opening 'if' lines.
1905  const GUID_t& src,
1906  bool directed,
1907  MetaSubmessageVec& meta_submessages)
1908 {
1909  // TODO: Delay responses by heartbeat_response_delay_.
1911 
1912  RtpsUdpDataLink_rch link = link_.lock();
1913 
1914  if (!link) {
1915  return;
1916  }
1917 
1918  GuardType guard(link->strategy_lock_);
1919  if (link->receive_strategy() == 0) {
1920  return;
1921  }
1922 
1923  // Heartbeat Sequence Range
1924  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
1925  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
1926 
1927  if (Transport_debug_level > 5) {
1928  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1929  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1930  }
1931 
1932  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1933  if (wi == remote_writers_.end()) {
1935  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1936  }
1937  return;
1938  }
1939 
1940  const WriterInfo_rch& writer = wi->second;
1941 
// Wrap-around-aware count check (see compare_and_update_counts at the
// top of this file): stale or duplicate heartbeats are dropped.
1942  if (!compare_and_update_counts(heartbeat.count.value, writer->heartbeat_recvd_count_)) {
1944  const GUID_t dst = heartbeat.readerId == DCPS::ENTITYID_UNKNOWN ? GUID_UNKNOWN : id_;
1945  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C stale/duplicate message (%d vs %d)\n",
1946  LogGuid(src).c_str(), LogGuid(dst).c_str(), heartbeat.count.value, writer->heartbeat_recvd_count_));
1947  }
1948  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_i "
1949  "WARNING Count indicates duplicate, dropping\n"));
1950  return;
1951  }
1952 
1953  const bool is_final = heartbeat.smHeader.flags & RTPS::FLAG_F;
1954 
1955  static const SequenceNumber one, zero = SequenceNumber::ZERO();
1956 
1957  bool first_ever_hb = false;
1958 
1959  if (!is_final && transport_debug.log_nonfinal_messages) {
1960  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1961  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1962  }
1963 
1964  // Only valid heartbeats (see spec) will be "fully" applied to writer info
1965  if (!(hb_first < 1 || hb_last < 0 || hb_last < hb_first.previous())) {
// First applicable heartbeat: the writer leaves the preassociation set.
1966  if (writer->recvd_.empty() && (directed || !writer->sends_directed_hb())) {
1967  OPENDDS_ASSERT(preassociation_writers_.count(writer));
1968  preassociation_writers_.erase(writer);
1970  log_progress("RTPS reader/writer association complete", id_, writer->id_, writer->participant_discovered_at_);
1971  }
1972  log_remote_counts("process_heartbeat_i");
1973 
// Everything before hb_first is implicitly received; purge held samples
// and fragments that precede it.
1974  const SequenceRange sr(zero, hb_first.previous());
1975  writer->recvd_.insert(sr);
1976  while (!writer->held_.empty() && writer->held_.begin()->first <= sr.second) {
1977  writer->held_.erase(writer->held_.begin());
1978  }
1979  for (WriterInfo::HeldMap::const_iterator it = writer->held_.begin(); it != writer->held_.end(); ++it) {
1980  writer->recvd_.insert(it->first);
1981  }
1982  link->receive_strategy()->remove_fragments(sr, writer->id_);
1983  first_ever_hb = true;
1984  }
1985 
1986  ACE_CDR::ULong cumulative_bits_added = 0;
1987  if (!writer->recvd_.empty()) {
1988  writer->hb_last_ = std::max(writer->hb_last_, hb_last);
1989  gather_ack_nacks_i(writer, link, !is_final, meta_submessages, cumulative_bits_added);
1990  }
// Optional per-reader NACK statistics accounting.
1991  if (cumulative_bits_added) {
1992  RtpsUdpInst_rch cfg = link->config();
1993  if (cfg && cfg->count_messages()) {
1995  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
1996  }
1997  }
1998  } else {
1999  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C - INVALID - first %q last %q count %d\n", LogGuid(writer->id_).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
2000  }
2001 
2002  guard.release();
2003  g.release();
2004 
2005  if (first_ever_hb) {
2006  link->invoke_on_start_callbacks(id_, src, true);
2007  }
2008 
2009  DeliverHeldData dhd(rchandle_from(this), src);
2010 
2011  //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
2012  return;
2013 }
2014 
1826 bool
// WriterInfo::should_nack (name line not visible in this extraction):
// true when this reader is missing samples below the writer's last
// advertised heartbeat sequence.
2017 {
2018  if (recvd_.empty() || (recvd_.disjoint() && recvd_.cumulative_ack() < hb_last_)) {
2019  return true;
2020  }
2021  if (!recvd_.empty()) {
2022  return recvd_.high() < hb_last_;
2023  }
// NOTE(review): unreachable — if recvd_ were empty the first 'if' would
// already have returned, so the second 'if' always returns above.
2024  return false;
2025 }
2026 
2028 {
2029  return participant_flags_ & RTPS::PFLAGS_DIRECTED_HEARTBEAT;
2030 }
2031 
1841 bool
// RtpsWriter::add_reader (name grounded by log_remote_counts("add_reader")
// below): registers a new remote reader, starts the heartbeat fallback
// schedule, and sends an initial directed heartbeat to non-durable readers
// (durable readers are heartbeated by end-historic-samples handling).
// Returns true if the reader was newly added.
2034 {
2036  OPENDDS_ASSERT(!reader->durable_ || durable_);
2037 
2038  if (stopping_) {
2039  return false;
2040  }
2041 
2042  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader->id_);
2043  if (iter == remote_readers_.end()) {
2044 #ifdef OPENDDS_SECURITY
2045  if (is_pvs_writer_) {
2046  reader->max_pvs_sn_ = max_sn_;
2047  }
2048 #endif
2049  remote_readers_.insert(ReaderInfoMap::value_type(reader->id_, reader));
2050  update_remote_guids_cache_i(true, reader->id_);
2051  preassociation_readers_.insert(reader);
2052  preassociation_reader_start_sns_.insert(reader->start_sn_);
2053  log_remote_counts("add_reader");
2054 
2055  RtpsUdpDataLink_rch link = link_.lock();
2056  if (!link) {
2057  return false;
2058  }
2059 
2060  fallback_.set(initial_fallback_);
2061  heartbeat_->schedule(fallback_.get());
2062  // Durable readers will get their heartbeat from end historic samples.
2063  if (!reader->durable_) {
2064  MetaSubmessageVec meta_submessages;
2065  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
2066  const SingleSendBuffer::Proxy proxy(*send_buff_);
2067  initialize_heartbeat(proxy, meta_submessage);
2068  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
2069  g.release();
2070  link->queue_submessages(meta_submessages);
2071  }
2072 
2073  return true;
2074  }
2075  return false;
2076 }
2077 
1885 bool
// Membership test for a remote reader GUID (name/guard lines not visible
// in this extraction).
2080 {
2082  return remote_readers_.count(id) != 0;
2083 }
2084 
1890 bool
// RtpsWriter::remove_reader (name grounded by
// log_remote_counts("remove_reader") below): detaches a remote reader,
// reclaims its durable data, removes it from all leader/lagger and
// expectation sets, and drops any queued elements that were outstanding
// only for this reader.  Returns true if the reader was found and removed.
2087 {
2089  TqeSet to_drop;
2090 
2091  bool result = false;
2092  {
2094  ReaderInfoMap::iterator it = remote_readers_.find(id);
2095  if (it != remote_readers_.end()) {
2096  const ReaderInfo_rch& reader = it->second;
2097  reader->swap_durable_data(dd);
2098  remove_preassociation_reader(reader);
2099  const SequenceNumber acked_sn = reader->acked_sn();
2100  const SequenceNumber max_sn = expected_max_sn(reader);
2101  readers_expecting_data_.erase(reader);
2102  readers_expecting_heartbeat_.erase(reader);
2103  snris_erase(acked_sn == max_sn ? leading_readers_ : lagging_readers_, acked_sn, reader);
2104  check_leader_lagger();
2105 
2106 #ifdef OPENDDS_SECURITY
// Participant-volatile-secure writer: release every send-buffer entry
// still outstanding solely for this departing reader.
2107  if (is_pvs_writer_ &&
2108  !reader->pvs_outstanding_.empty()) {
2109  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
2110  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
2111  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, elems_not_acked_mutex_, result);
2112  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
2113  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
2114  if (iter != elems_not_acked_.end()) {
2115  send_buff_->release_acked(iter->first);
2116  to_drop.insert(iter->second);
2117  elems_not_acked_.erase(iter);
2118  }
2119  }
2120  }
2121  }
2122 #endif
2123 
2124  remote_readers_.erase(it);
2125  update_remote_guids_cache_i(false, id);
2126  result = true;
2127  log_remote_counts("remove_reader");
2128  }
2129  }
// Drop callbacks run outside the lock scope above.
2130  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
2131  for (iter_t it = dd.begin(); it != dd.end(); ++it) {
2132  it->second->data_dropped();
2133  }
2134 
2135  for (TqeSet::iterator pos = to_drop.begin(), limit = to_drop.end(); pos != limit; ++pos) {
2136  (*pos)->data_dropped();
2137  }
2138 
2139  return result;
2140 }
2141 
1944 size_t
// Number of currently-known remote readers (name/guard lines not visible
// in this extraction).
2144 {
2146  return remote_readers_.size();
2147 }
2148 
1949 bool
// RtpsReader::add_writer (name grounded by log_remote_counts("add_writer")
// below): registers a new remote writer as preassociation and immediately
// queues a preassociation ACKNACK to solicit a heartbeat from it.
// Returns true if the writer was newly added.
2151 {
2153 
2154  if (stopping_) {
2155  return false;
2156  }
2157 
2158  WriterInfoMap::const_iterator iter = remote_writers_.find(writer->id_);
2159  if (iter == remote_writers_.end()) {
2160  remote_writers_[writer->id_] = writer;
2161  preassociation_writers_.insert(writer);
2162  log_remote_counts("add_writer");
2163 
2164  RtpsUdpDataLink_rch link = link_.lock();
2165  if (!link) {
2166  return false;
2167  }
2168 
2170  MetaSubmessageVec meta_submessages;
2171  gather_preassociation_acknack_i(meta_submessages, writer);
2172  g.release();
2173  link->queue_submessages(meta_submessages);
2174 
2175  return true;
2176  }
2177  return false;
2178 }
2179 
1977 bool
// Membership test for a remote writer GUID (name/guard lines not visible
// in this extraction).
2182 {
2184  return remote_writers_.count(id) != 0;
2185 }
2186 
1982 bool
// RtpsReader::remove_writer (name grounded by
// log_remote_counts("remove_writer") below): forgets a remote writer and
// its preassociation state.  Returns true if the writer was found.
2189 {
2191  WriterInfoMap::iterator pos = remote_writers_.find(id);
2192  if (pos != remote_writers_.end()) {
2193  preassociation_writers_.erase(pos->second);
2194  remote_writers_.erase(pos);
2195  log_remote_counts("remove_writer");
2196  return true;
2197  }
2198 
2199  return false;
2200 }
2201 
1995 size_t
// Number of currently-known remote writers (name/guard lines not visible
// in this extraction).
2204 {
2206  return remote_writers_.size();
2207 }
2208 
2000 bool
// should_nack_fragments (name line lost in extraction; called below with
// (link, writer)): true when this writer has incomplete fragmented
// samples, either tracked directly in frags_ or held by the receive
// strategy within the not-yet-acknowledged range.
2211  const WriterInfo_rch& info)
2212 {
2213  if (!info->frags_.empty()) {
2214  return true;
2215  }
2216 
2217  if (!info->recvd_.empty()) {
2218  const SequenceRange range(info->recvd_.cumulative_ack() + 1, info->hb_last_);
2219  if (link->receive_strategy()->has_fragments(range, info->id_)) {
2220  return true;
2221  }
2222  }
2223 
2224  return false;
2225 }
2226 
2017 void
// RtpsReader::send_preassociation_acknacks (registered as the sporadic
// preassociation task in the constructor): sends an ACKNACK to every
// writer that has not yet sent a heartbeat, soliciting one.
// NOTE(review): the line after queue_submessages (likely the task
// reschedule) was lost in this extraction.
2229 {
2230  RtpsUdpDataLink_rch link = link_.lock();
2231  if (!link) {
2232  return;
2233  }
2234 
2235  MetaSubmessageVec meta_submessages;
2236  {
2238 
2239  if (stopping_ || preassociation_writers_.empty()) {
2240  return;
2241  }
2242 
2243  // We want a heartbeat from these writers.
2244  meta_submessages.reserve(preassociation_writers_.size());
2245  for (WriterInfoSet::const_iterator pos = preassociation_writers_.begin(), limit = preassociation_writers_.end();
2246  pos != limit; ++pos) {
2247  gather_preassociation_acknack_i(meta_submessages, *pos);
2248  }
2249  }
2250 
2251  link->queue_submessages(meta_submessages);
2252 
2254 }
2255 
2256 void
// Builds a preassociation ACKNACK (empty bitmap, bitmapBase {0,1}) for one
// writer and appends it to meta_submessages.  Asserts nothing has been
// received from the writer yet.  Caller holds the appropriate lock ("_i").
// NOTE(review): the first signature line and the flags initializer line
// (2272, presumably FLAG_E) were dropped by the doc extraction.
2258  const WriterInfo_rch& writer)
2259 {
2260  using namespace OpenDDS::RTPS;
2261 
2262  OPENDDS_ASSERT(writer->recvd_.empty());
2263  const CORBA::ULong num_bits = 0;
2264  const LongSeq8 bitmap;
2265  const EntityId_t reader_id = id_.entityId;
2266  const EntityId_t writer_id = writer->id_.entityId;
2267 
2268  MetaSubmessage meta_submessage(id_, writer->id_);
2269 
2270  AckNackSubmessage acknack = {
2271  {ACKNACK,
2273  0 /*length*/},
2274  reader_id,
2275  writer_id,
2276  { // SequenceNumberSet: acking bitmapBase - 1
2277  {0, 1},
2278  num_bits, bitmap
2279  },
2280  {writer->heartbeat_recvd_count_}
2281  };
2282  meta_submessage.sm_.acknack_sm(acknack);
2283  meta_submessages.push_back(meta_submessage);
2284 }
2285 
2286 void
// Generates the ACKNACK (and possibly NACK_FRAG) reply for one writer.
// Two paths:
//  1. If anything needs nacking (gaps in recvd_ or missing fragments), build
//     a full SequenceNumberSet bitmap covering the missing ranges, remove
//     fragment-covered numbers (they go in NACK_FRAGs instead), and append
//     the ACKNACK plus any NACK_FRAGs.
//  2. Otherwise, if the triggering heartbeat was non-final, reply with a
//     plain "pure ack" ACKNACK (empty bitmap) as required by RTPS.
// Caller holds the reader lock ("_i" convention).
// NOTE(review): the first signature line (writer parameter) and the
// submessage-flags lines (2359/2388, presumably FLAG_E [| FLAG_F]) were
// dropped by the doc extraction — confirm against the repository source.
2288  const RtpsUdpDataLink_rch& link,
2289  bool heartbeat_was_non_final,
2290  MetaSubmessageVec& meta_submessages,
2291  ACE_CDR::ULong& cumulative_bits_added)
2292 {
2293  const bool should_nack_frags = should_nack_fragments(link, writer);
2294  if (writer->should_nack() ||
2295  should_nack_frags) {
2296  using namespace OpenDDS::RTPS;
2297  const EntityId_t reader_id = id_.entityId;
2298  const EntityId_t writer_id = writer->id_.entityId;
2299  MetaSubmessage meta_submessage(id_, writer->id_);
2300 
2301  const DisjointSequence& recvd = writer->recvd_;
2302  const SequenceNumber& hb_high = writer->hb_last_;
      // bitmapBase: one past the cumulative ack (or 1 if nothing received).
2303  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2304  const SequenceNumber::Value ack_val = ack.getValue();
2305  CORBA::ULong num_bits = 0;
2306  LongSeq8 bitmap;
2307 
      // Encode the holes within recvd_ into the bitmap.
2308  if (recvd.disjoint()) {
2309  bitmap.length(DisjointSequence::bitmap_num_longs(ack, recvd.last_ack().previous()));
2310  if (bitmap.length() > 0) {
2311  (void)recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
2312  num_bits, cumulative_bits_added, true);
2313  }
2314  }
2315 
      // Also nack from just past recvd.high() up to the heartbeat high-water
      // mark, capped at 255 bits past the base (RTPS bitmap limit is 256).
2316  if (!recvd.empty() && hb_high > recvd.high()) {
2317  const SequenceNumber eff_high =
2318  (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
2319  const SequenceNumber::Value eff_high_val = eff_high.getValue();
2320  // Nack the range between the received high and the effective high.
2321  const CORBA::ULong old_len = bitmap.length(),
2322  new_len = DisjointSequence::bitmap_num_longs(ack, eff_high);
2323  if (new_len > old_len) {
2324  bitmap.length(new_len);
2325  for (CORBA::ULong i = old_len; i < new_len; ++i) {
2326  bitmap[i] = 0;
2327  }
2328  }
2329  const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
2330  idx_recv_high = recvd.disjoint() ?
2331  CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
2332  DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
2333  bitmap.get_buffer(), new_len,
2334  num_bits, cumulative_bits_added);
2335  }
2336 
2337  // If the receive strategy is holding any fragments, those should
2338  // not be "nacked" in the ACKNACK reply. They will be accounted for
2339  // in the NACK_FRAG(s) instead.
2340  const bool frags_modified =
2341  link->receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
2342  num_bits, ack, writer->id_, cumulative_bits_added);
      // NOTE(review): this loop scans for any bit still set after fragment
      // removal but its result is unused as shown — a line (or intent, e.g.
      // downgrading to a final ACKNACK) may have been lost in extraction.
2343  if (frags_modified) {
2344  for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
2345  if ((i + 1) * 32 <= num_bits) {
2346  if (bitmap[i]) {
2347  break;
2348  }
2349  } else {
2350  if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
2351  break;
2352  }
2353  }
2354  }
2355  }
2356 
2357  AckNackSubmessage acknack = {
2358  {ACKNACK,
2360  0 /*length*/},
2361  reader_id,
2362  writer_id,
2363  { // SequenceNumberSet: acking bitmapBase - 1
2364  to_rtps_seqnum(ack),
2365  num_bits, bitmap
2366  },
2367  {writer->heartbeat_recvd_count_}
2368  };
2369  meta_submessage.sm_.acknack_sm(acknack);
2370  meta_submessages.push_back(meta_submessage);
2371 
2372  if (should_nack_frags) {
2373  generate_nack_frags_i(meta_submessages, writer, reader_id, writer_id, cumulative_bits_added);
2374  }
2375  } else if (heartbeat_was_non_final) {
      // Nothing to nack, but a non-final heartbeat requires an acknowledging
      // reply: send an ACKNACK with an empty bitmap.
2376  using namespace OpenDDS::RTPS;
2377  const DisjointSequence& recvd = writer->recvd_;
2378  const CORBA::ULong num_bits = 0;
2379  const LongSeq8 bitmap;
2380  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2381  const EntityId_t reader_id = id_.entityId;
2382  const EntityId_t writer_id = writer->id_.entityId;
2383 
2384  MetaSubmessage meta_submessage(id_, writer->id_);
2385 
2386  AckNackSubmessage acknack = {
2387  {ACKNACK,
2389  0 /*length*/},
2390  reader_id,
2391  writer_id,
2392  { // SequenceNumberSet: acking bitmapBase - 1
2393  to_rtps_seqnum(ack),
2394  num_bits, bitmap
2395  },
2396  {writer->heartbeat_recvd_count_}
2397  };
2398  meta_submessage.sm_.acknack_sm(acknack);
2399  meta_submessages.push_back(meta_submessage);
2400  }
2401 }
2402 
2403 #ifdef OPENDDS_SECURITY
2404 namespace {
  // Sentinel address inserted into an AddrSet to force a submessage into its
  // own bundle under full-message protection; stripped out again in
  // bundle_mapped_meta_submessages (see the insert site below).
2405  const NetworkAddress BUNDLING_PLACEHOLDER;
2406 }
2407 #endif
2408 
2409 void
// Groups outgoing meta-submessages by (address set, destination GUID) so they
// can be bundled into as few RTPS messages as possible.  Address sets are
// resolved through the bundling cache (hit/miss stats are logged at the end);
// cache misses compute the set either for a directed destination or for all
// addresses of the source.  Under security, a placeholder address marks
// sources whose submessages must not be combined with others.
// NOTE(review): several extraction-dropped lines here — the 'now' timestamp
// initialization (2418), and an entry-value reset/initialization line (2431).
2410 RtpsUdpDataLink::build_meta_submessage_map(MetaSubmessageVec& meta_submessages, AddrDestMetaSubmessageMap& addr_map)
2411 {
2412  size_t cache_hits = 0;
2413  size_t cache_misses = 0;
2414  size_t addrset_min_size = std::numeric_limits<size_t>::max();
2415  size_t addrset_max_size = 0;
2416 
2417  BundlingCache::ScopedAccess global_access(bundling_cache_);
2419 
2420  // Sort meta_submessages by address set and destination
2421  for (MetaSubmessageVec::iterator it = meta_submessages.begin(), limit = meta_submessages.end(); it != limit; ++it) {
2422  if (it->ignore_) {
2423  continue;
2424  }
2425 
2426  const BundlingCacheKey key(it->src_guid_, it->dst_guid_);
2427  BundlingCache::ScopedAccess entry(bundling_cache_, key, false, now);
2428  if (entry.is_new_) {
2429 
2430  AddrSet& addrs = entry.value().addrs_;
2432 
2433  const bool directed = it->dst_guid_ != GUID_UNKNOWN;
2434  if (directed) {
2435  accumulate_addresses(it->src_guid_, it->dst_guid_, addrs, true);
2436  } else {
2437  addrs = get_addresses_i(it->src_guid_);
2438  }
2439 #ifdef OPENDDS_SECURITY
2440  if (local_crypto_handle() != DDS::HANDLE_NIL && separate_message(it->src_guid_.entityId)) {
2441  addrs.insert(BUNDLING_PLACEHOLDER); // removed in bundle_mapped_meta_submessages
2442  }
2443 #endif
2444 #if defined ACE_HAS_CPP11
2445  entry.recalculate_hash();
2446 #endif
2447  ++cache_misses;
2448  } else {
2449  ++cache_hits;
2450  }
2451 
2452  const BundlingCache::ScopedAccess& const_entry = entry;
2453  const AddrSet& addrs = const_entry.value().addrs_;
2454  addrset_min_size = std::min(addrset_min_size, static_cast<size_t>(addrs.size()));
2455  addrset_max_size = std::max(addrset_max_size, static_cast<size_t>(addrs.size()));
2456  if (addrs.empty()) {
2457  continue;
2458 #ifdef OPENDDS_SECURITY
       // Only the placeholder remains: no real destinations, skip.
2459  } else if (addrs.size() == 1 && *addrs.begin() == BUNDLING_PLACEHOLDER) {
2460  continue;
2461 #endif
2462  }
2463 
      // Key destinations by participant (GUID with entity cleared) so all
      // submessages to the same participant can share one INFO_DST.
2464  DestMetaSubmessageMap& dest_map = addr_map[AddressCacheEntryProxy(const_entry.rch_)];
2465  if (std::memcmp(&(it->dst_guid_.guidPrefix), &GUIDPREFIX_UNKNOWN, sizeof(GuidPrefix_t)) != 0) {
2466  MetaSubmessageIterVec& vec = dest_map[make_unknown_guid(it->dst_guid_.guidPrefix)];
2467  vec.reserve(meta_submessages.size());
2468  vec.push_back(it);
2469  } else {
2470  MetaSubmessageIterVec& vec = dest_map[GUID_UNKNOWN];
2471  vec.reserve(meta_submessages.size());
2472  vec.push_back(it);
2473  }
2474  }
2475 
2476  VDBG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::build_meta_submessage_map()"
2477  "- Bundling Cache Stats: hits = %B, misses = %B, min = %B, max = %B\n",
2478  cache_hits, cache_misses, addrset_min_size, addrset_max_size));
2479 }
2480 
2481 #ifdef OPENDDS_SECURITY
// NOTE(review): the signature line (2482) and the entire return expression
// (2488-2491) were dropped by the doc extraction.  Presumably this is
// RtpsUdpDataLink::separate_message(EntityId_t entity), returning true for
// the built-in entity ids whose submessages are rtps_protected — confirm
// against the repository source.
2483 {
2484  // submessages generated by these entities may not be combined
2485  // with other submessages when using full-message protection
2486  // DDS Security v1.1 8.4.2.4 Table 27 is_rtps_protected
2487  using namespace RTPS;
2492 }
2493 #endif
2494 
2495 namespace {
2496 
// Tracks the running serialized size of the bundle being assembled and
// decides when a submessage would overflow max_bundle_size_, in which case
// the current bundle is closed and the overflowing submessage's size is
// carried into the next one.  Size bookkeeping only — no serialization here.
2497 struct BundleHelper {
  // Initial per-bundle overhead; under security this is a non-zero constant
  // (its initializer line, 2500, was dropped by the doc extraction).
2498  static const size_t initial_size =
2499 #ifdef OPENDDS_SECURITY
2501 #else
2502  0;
2503 #endif
2504 
2505  BundleHelper(
2506  const Encoding& encoding, size_t max_bundle_size,
2507  RtpsUdpDataLink::BundleVec& bundles)
2508  : encoding_(encoding)
2509  , max_bundle_size_(max_bundle_size)
2510  , size_(initial_size)
2511  , bundles_(bundles)
2512  {
2513  }
2514 
  // Record the final size of the current (last) bundle and reset the
  // running size for the next one.
2515  void end_bundle()
2516  {
2517  bundles_.back().size_ = size_;
2518  size_ = initial_size;
2519  }
2520 
  // Add a submessage's serialized size to the running total, setting its
  // header submessageLength along the way.  Returns false when the bundle
  // limit is exceeded; in that case the current bundle is finalized at its
  // previous size and the new submessage's size seeds the next bundle.
  // NOTE(review): several security-related size-adjustment lines (2527,
  // 2535-2536, 2542) were dropped by the doc extraction.
2521  template <typename T>
2522  bool add_to_bundle(T& submessage)
2523  {
2524  const size_t prev_size = size_;
2525 #ifdef OPENDDS_SECURITY
2526  // Could be an encoded submessage (encoding happens later)
2528 #endif
2529  const size_t submessage_size = serialized_size(encoding_, submessage);
2530  submessage.smHeader.submessageLength = static_cast<CORBA::UShort>(submessage_size - RTPS::SMHDR_SZ);
2532  size_ += submessage_size;
2533 #ifdef OPENDDS_SECURITY
2534  // Could be an encoded submessage (encoding happens later)
2537 #endif
2538  size_t compare_size = size_;
2539 #ifdef OPENDDS_SECURITY
2540  // Could be an encoded rtps message (encoding happens later)
2541  align(compare_size, RTPS::SM_ALIGN);
2543 #endif
2544  if (compare_size > max_bundle_size_) {
2545  const size_t chunk_size = size_ - prev_size;
2546  bundles_.back().size_ = prev_size;
2547  size_ = initial_size + chunk_size;
2548  return false;
2549  }
2550  return true;
2551  }
2552 
2553  const Encoding& encoding_;
2554  const size_t max_bundle_size_;
2555  size_t size_;
2556  RtpsUdpDataLink::BundleVec& bundles_;
2557 };
2558 
2559 }
2560 
2561 void
// Walks the (address set -> destination -> submessages) map produced by
// build_meta_submessage_map and partitions the submessages into size-bounded
// Bundles.  Inserts a virtual INFO_DST whenever the destination changes
// within a bundle, and records heartbeat/nackfrag counts in 'counts' so
// bundle_and_send_submessages can remap them consistently later.
// NOTE(review): extraction dropped the signature line (with the encoding
// parameter), the INFO_DST header initializer (2571), and assorted blank-ish
// lines — confirm against the repository source.
2563  AddrDestMetaSubmessageMap& addr_map,
2564  BundleVec& bundles,
2565  CountKeeper& counts)
2566 {
2567  using namespace RTPS;
2568 
2569  // Reusable INFO_DST
2570  InfoDestinationSubmessage idst = {
2572  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2573  };
2574 
2575  RtpsUdpInst_rch cfg = config();
2576 
  // When relaying through the RtpsRelay, each destination guid gets its own
  // message so the relay can route them individually.
2577  const bool new_bundle_per_dest_guid = cfg && cfg->rtps_relay_only();
2578 
2579  BundleHelper helper(encoding, max_bundle_size_, bundles);
2580  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2581  for (AddrDestMetaSubmessageMap::iterator addr_it = addr_map.begin(), limit = addr_map.end(); addr_it != limit; ++addr_it) {
2582 
2583  // A new address set always starts a new bundle
2584  bundles.push_back(Bundle(addr_it->first));
2585 
2586  prev_dst = GUID_UNKNOWN;
2587 
2588  for (DestMetaSubmessageMap::iterator dest_it = addr_it->second.begin(), limit2 = addr_it->second.end(); dest_it != limit2; ++dest_it) {
2589 
2590  if (dest_it->second.empty()) {
2591  continue;
2592  }
2593 
2594  // Check to see if we're sending separate messages per destination guid
2595  if (new_bundle_per_dest_guid && bundles.back().submessages_.size()) {
2596  helper.end_bundle();
2597  bundles.push_back(Bundle(addr_it->first));
2598  prev_dst = GUID_UNKNOWN;
2599  }
2600 
2601  for (MetaSubmessageIterVec::iterator resp_it = dest_it->second.begin(), limit3 = dest_it->second.end(); resp_it != limit3; ++resp_it) {
2602 
2603  // Check before every meta_submessage to see if we need to prefix a INFO_DST
2604  if (dest_it->first != prev_dst) {
2605  // If adding an INFO_DST prefix bumped us over the limit, push the
2606  // size difference into the next bundle, reset prev_dst, and keep going
2607  if (!helper.add_to_bundle(idst)) {
2608  bundles.push_back(Bundle(addr_it->first));
2609  }
2610  }
2611 
2612  // Attempt to add the submessage meta_submessage to the bundle
2613  bool result = false, unique = false;
2614  ACE_UNUSED_ARG(unique);
2615  MetaSubmessage& res = **resp_it;
2616  switch (res.sm_._d()) {
2617  case HEARTBEAT: {
2618  const EntityId_t id = res.sm_.heartbeat_sm().writerId;
2619  result = helper.add_to_bundle(res.sm_.heartbeat_sm());
       // Register this heartbeat count for later remapping; undirected
       // heartbeats are flagged so they keep their relative ordering.
2620  CountMapPair& map_pair = counts.heartbeat_counts_[id].map_[res.sm_.heartbeat_sm().count.value];
2621  if (res.dst_guid_ == GUID_UNKNOWN) {
2622  map_pair.undirected_ = true;
2623  }
2624  break;
2625  }
2626  case ACKNACK: {
2627  result = helper.add_to_bundle(res.sm_.acknack_sm());
2628  break;
2629  }
2630  case GAP: {
2631  result = helper.add_to_bundle(res.sm_.gap_sm());
2632  break;
2633  }
2634  case NACK_FRAG: {
2635  const EntityId_t id = res.sm_.nack_frag_sm().readerId;
       // NackFrag counts must be unique per reader; assert it.
2636  unique = counts.nackfrag_counts_[id].insert(res.sm_.nack_frag_sm().count.value).second;
2637  OPENDDS_ASSERT(unique);
2638  result = helper.add_to_bundle(res.sm_.nack_frag_sm());
2639  break;
2640  }
2641  default: {
2642  break;
2643  }
2644  }
2645  prev_dst = dest_it->first;
2646 
2647  // If adding the submessage bumped us over the limit, push the size
2648  // difference into the next bundle, reset prev_dst, and keep going
2649  if (!result) {
2650  bundles.push_back(Bundle(addr_it->first));
2651  prev_dst = GUID_UNKNOWN;
2652  }
2653  bundles.back().submessages_.push_back(*resp_it);
2654  }
2655  }
2656  helper.end_bundle();
2657  }
2658 }
2659 
2660 void
// NOTE(review): the signature line was dropped by the doc extraction.
// Marks the send queue ready and disables the response queue (flushing it,
// per the 'true' argument) — presumably a send-queue flush entry point;
// confirm the exact name against the repository source.
2662 {
2664  sq_.ready_to_send();
2665 
2666  disable_response_queue(true);
2667 }
2668 
2669 void
// NOTE(review): both the signature line and the entire body (2672-2673) were
// dropped by the doc extraction — nothing can be documented from this view;
// consult the repository source.
2671 {
2674 }
2675 
2676 void
// Drains the staged flush queues: deduplicates each pending vector, bundles
// and sends its submessages, clears it (keeping capacity for reuse), and
// finally resets the active-queue count.
// NOTE(review): signature line dropped by the doc extraction.
2678 {
2679  for (size_t idx = 0; idx != fsq_vec_size_; ++idx) {
2680  dedup(fsq_vec_[idx]);
2681  bundle_and_send_submessages(fsq_vec_[idx]);
2682  fsq_vec_[idx].clear();
2683  }
2684  fsq_vec_size_ = 0;
2685 }
2686 
2687 void
// NOTE(review): signature line and body (2690) dropped by the doc extraction
// — nothing can be documented from this view; consult the repository source.
2689 {
2691 }
2692 
2693 void
// Ends the current send-queue transaction, moving any accumulated
// submessages into the next free slot of fsq_vec_ (growing it if all slots
// are in use).  If anything is pending, it is then either flushed
// immediately or deferred, depending on send_immediately.
// NOTE(review): the signature line, a lock-guard line (2697), and the two
// dispatch lines inside the if/else (2708, 2710) were dropped by the doc
// extraction — confirm against the repository source.
2695 {
2696  MetaSubmessageVec vec;
2698  sq_.end_transaction(vec);
2699  if (!vec.empty()) {
2700  if (fsq_vec_.size() == fsq_vec_size_) {
2701  fsq_vec_.resize(fsq_vec_.size() + 1);
2702  }
     // swap avoids copying the submessage vector into the slot.
2703  fsq_vec_[fsq_vec_size_++].swap(vec);
2704  }
2705 
2706  if (fsq_vec_size_) {
2707  if (send_immediately) {
2709  } else {
2711  }
2712  }
2713 
2714 }
2715 
2716 void
// Enqueues a batch of meta-submessages onto the send queue; when the
// enqueue reports the queue transitioned to non-empty, schedules the
// sporadic harvest task after the configured send delay.
// NOTE(review): the signature line (with the 'in' parameter) was dropped by
// the doc extraction.
2718 {
2719  if (in.empty()) {
2720  return;
2721  }
2722 
2723  if (sq_.enqueue(in)) {
2724  RtpsUdpInst_rch cfg = config();
2725  if (cfg) {
2726  harvest_send_queue_sporadic_->schedule(cfg->send_delay_);
2727  }
2728  }
2729 }
2730 
2731 void
// Records the minimum ACKNACK count this writer will accept from the given
// remote reader (used to discard stale reflected acknacks).  No-op if the
// reader is unknown.
// NOTE(review): signature line and a lock-guard line (2734) dropped by the
// doc extraction.
2733 {
2735  ReaderInfoMap::iterator ri = remote_readers_.find(id);
2736  if (ri != remote_readers_.end()) {
2737  ri->second->required_acknack_count_ = current;
2738  }
2739 }
2740 
2741 void
// Link-level forwarder: looks up the local RtpsWriter under the writers lock,
// then delegates to its update_required_acknack_count outside the lock.
// NOTE(review): the signature line and the inner lock-guard line (2746) were
// dropped by the doc extraction.
2743 {
2744  RtpsWriter_rch writer;
2745  {
2747  RtpsWriterMap::iterator rw = writers_.find(local_id);
2748  if (rw != writers_.end()) {
2749  writer = rw->second;
2750  }
2751  }
   // Call outside the lock scope to avoid holding it during the update.
2752  if (writer) {
2753  writer->update_required_acknack_count(remote_id, current);
2754  }
2755 }
2756 
2757 void
// Top-level send path for control submessages: maps them by address set and
// destination, packs them into size-bounded bundles, remaps HEARTBEAT and
// NACK_FRAG counts so each goes out with a consistent, monotonic value, then
// serializes each bundle (inserting INFO_DST on destination changes) and
// hands it to the send strategy.
// NOTE(review): multiple lines were dropped by the doc extraction: the
// encoding declaration (2766), INFO_DST header initializer (2777/2527-style),
// security-related message assembly (2805, 2869), transport-debug log guards
// (2833, 2843, 2857, and warning lines 2822-2826), and the send-strategy
// acquisition (2874) — confirm against the repository source.
2758 RtpsUdpDataLink::bundle_and_send_submessages(MetaSubmessageVec& meta_submessages)
2759 {
2760  using namespace RTPS;
2761 
2762  // Sort meta_submessages based on both locator IPs and INFO_DST GUID destination/s
2763  AddrDestMetaSubmessageMap addr_map;
2764  build_meta_submessage_map(meta_submessages, addr_map);
2765 
2767 
2768  // Build reasonably-sized submessage bundles based on our destination map
2769  BundleVec bundles;
2770  bundles.reserve(meta_submessages.size());
2771 
2772  CountKeeper counts;
2773  bundle_mapped_meta_submessages(encoding, addr_map, bundles, counts);
2774 
2775  // Reusable INFO_DST
2776  InfoDestinationSubmessage idst = {
2778  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2779  };
2780 
  // Partition heartbeat counts: directed heartbeats are assigned from past
  // the undirected entries, preserving relative order within each class.
2781  for (IdCountMapping::iterator it = counts.heartbeat_counts_.begin(), limit = counts.heartbeat_counts_.end(); it != limit; ++it) {
2782  it->second.next_directed_unassigned_ = it->second.map_.begin();
2783  it->second.next_undirected_unassigned_ = it->second.map_.begin();
2784  for (CountMap::iterator it2 = it->second.map_.begin(), limit2 = it->second.map_.end(); it2 != limit2; ++it2) {
2785  if (it2->second.undirected_) {
2786  ++(it->second.next_directed_unassigned_);
2787  }
2788  }
2789  }
2790 
2791  // Allocate buffers, seralize, and send bundles
2792  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2793  for (size_t i = 0; i < bundles.size(); ++i) {
2794  RTPS::Message rtps_message;
2795  prev_dst = GUID_UNKNOWN;
2796  Message_Block_Ptr mb_bundle(alloc_msgblock(bundles[i].size_, &bundle_allocator_));
2797  Serializer ser(mb_bundle.get(), encoding);
2798  const MetaSubmessageIterVec& bundle_vec = bundles[i].submessages_;
2799  for (MetaSubmessageIterVec::const_iterator it = bundle_vec.begin(), limit = bundle_vec.end(); it != limit; ++it) {
2800  MetaSubmessage& res = **it;
     // Destination at participant granularity (entity id cleared).
2801  const GUID_t dst = make_unknown_guid(res.dst_guid_);
2802  if (dst != prev_dst) {
2803  assign(idst.guidPrefix, dst.guidPrefix);
2804  ser << idst;
2806  append_submessage(rtps_message, idst);
2807  }
2808  }
2809  switch (res.sm_._d()) {
2810  case HEARTBEAT: {
      // Rewrite the heartbeat count from the precomputed mapping so the
      // sequence of counts on the wire stays monotonic per writer.
2811  CountMapping& mapping = counts.heartbeat_counts_[res.sm_.heartbeat_sm().writerId];
2812  CountMapPair& map_pair = mapping.map_[res.sm_.heartbeat_sm().count.value];
2813  if (!map_pair.is_new_assigned_) {
2814  if (map_pair.undirected_) {
2815  OPENDDS_ASSERT(mapping.next_undirected_unassigned_ != mapping.map_.end());
2816  map_pair.new_ = mapping.next_undirected_unassigned_->first;
2817  ++mapping.next_undirected_unassigned_;
2818  } else {
2819  OPENDDS_ASSERT(mapping.next_directed_unassigned_ != mapping.map_.end());
2820  map_pair.new_ = mapping.next_directed_unassigned_->first;
2821  ++mapping.next_directed_unassigned_;
2823  if (res.sm_.heartbeat_sm().count.value != map_pair.new_) {
2825  }
2827  }
2828  }
2829  map_pair.is_new_assigned_ = true;
2830  }
2831  res.sm_.heartbeat_sm().count.value = map_pair.new_;
2832  const HeartBeatSubmessage& heartbeat = res.sm_.heartbeat_sm();
2834  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
2835  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
2836  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: HEARTBEAT: %C -> %C first %q last %q count %d\n",
2837  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
2838  }
2839  break;
2840  }
2841  case ACKNACK: {
2842  const AckNackSubmessage& acknack = res.sm_.acknack_sm();
2844  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
2845  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: ACKNACK: %C -> %C base %q bits %u count %d\n",
2846  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
2847  }
2848  break;
2849  }
2850  case NACK_FRAG: {
      // Assign the next (smallest) reserved count for this reader.
2851  CountSet& set = counts.nackfrag_counts_[res.sm_.nack_frag_sm().readerId];
2852  OPENDDS_ASSERT(!set.empty());
2853  res.sm_.nack_frag_sm().count.value = *set.begin();
2854  set.erase(set.begin());
2855  const NackFragSubmessage& nackfrag = res.sm_.nack_frag_sm();
2856  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
2858  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
2859  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: NACKFRAG: %C -> %C seq %q base %u bits %u\n",
2860  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
2861  }
2862  break;
2863  }
2864  default: {
2865  break;
2866  }
2867  }
2868  ser << res.sm_;
2870  push_back(rtps_message.submessages, res.sm_);
2871  }
2872  prev_dst = dst;
2873  }
2875  if (ss) {
2876  ss->send_rtps_control(rtps_message, *(mb_bundle.get()), bundles[i].proxy_.addrs());
2877  }
2878  }
2879 }
2880 
2881 void
// Builds NACK_FRAG submessages for every sequence number with missing
// fragments from the given writer.  Sources of missing fragments:
//  1.  partially-received samples inside the gaps of recvd_,
//  1b. samples above recvd_.high() but at/below the last heartbeat's lastSN,
//  2.  samples outside recvd_'s gaps for which only a HeartbeatFrag was seen
//      (entire sample missing — nack all fragments up to the advertised
//      last fragment number, capped at 256 bits).
// Caller holds the reader lock ("_i"); each NACK_FRAG gets a fresh count.
2882 RtpsUdpDataLink::RtpsReader::generate_nack_frags_i(MetaSubmessageVec& meta_submessages,
2883  const WriterInfo_rch& wi,
2884  EntityId_t reader_id,
2885  EntityId_t writer_id,
2886  ACE_CDR::ULong& cumulative_bits_added)
2887 {
2888  typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
2889  typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
2890  RtpsUdpReceiveStrategy::FragmentInfo frag_info;
2891 
2892  // This is an internal method, locks already locked,
2893  // we just need a local handle to the link
2894  RtpsUdpDataLink_rch link = link_.lock();
2895 
2896  // Populate frag_info with two possible sources of NackFrags:
2897  // 1. sequence #s in the reception gaps that we have partially received
2898  OPENDDS_VECTOR(SequenceRange) missing = wi->recvd_.missing_sequence_ranges();
2899  for (size_t i = 0; i < missing.size(); ++i) {
2900  link->receive_strategy()->has_fragments(missing[i], wi->id_, &frag_info);
2901  }
2902  // 1b. larger than the last received seq# but less than the heartbeat.lastSN
2903  if (!wi->recvd_.empty() && wi->recvd_.high() < wi->hb_last_) {
2904  const SequenceRange range(wi->recvd_.high() + 1, wi->hb_last_);
2905  link->receive_strategy()->has_fragments(range, wi->id_, &frag_info);
2906  }
2907  for (size_t i = 0; i < frag_info.size(); ++i) {
2908  // If we've received a HeartbeatFrag, we know the last (available) frag #
2909  const iter_t heartbeat_frag = wi->frags_.find(frag_info[i].first);
2910  if (heartbeat_frag != wi->frags_.end()) {
2911  extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value, cumulative_bits_added);
2912  }
2913  }
2914 
2915  // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
2916  const iter_t low = wi->frags_.lower_bound(wi->recvd_.cumulative_ack()),
2917  high = wi->frags_.upper_bound(wi->recvd_.last_ack()),
2918  end = wi->frags_.end();
2919  for (iter_t iter = wi->frags_.begin(); iter != end; ++iter) {
2920  if (iter == low) {
2921  // skip over the range covered by step #1 above
2922  if (high == end) {
2923  break;
2924  }
2925  iter = high;
2926  }
2927 
2928  const SequenceRange range(iter->first, iter->first);
2929  if (!link->receive_strategy()->has_fragments(range, wi->id_, &frag_info)) {
2930  // it was not in the recv strategy, so the entire range is "missing"
2931  frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
2932  RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
2933  fnSet.bitmapBase.value = 1;
     // RTPS FragmentNumberSet bitmaps are limited to 256 bits.
2934  fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
2935  fnSet.bitmap.length((fnSet.numBits + 31) / 32);
2936  for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
2937  fnSet.bitmap[i] = 0xFFFFFFFF;
2938  }
2939  }
2940  }
2941 
2942  if (frag_info.empty()) {
2943  return;
2944  }
2945 
2946  const RTPS::NackFragSubmessage nackfrag_prototype = {
2947  {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */},
2948  reader_id,
2949  writer_id,
2950  {0, 0}, // writerSN set below
2951  RTPS::FragmentNumberSet(), // fragmentNumberState set below
2952  {0} // count set below
2953  };
2954 
  // One NACK_FRAG per sample with missing fragments.
2955  meta_submessages.reserve(meta_submessages.size() + frag_info.size());
2956  for (size_t i = 0; i < frag_info.size(); ++i) {
2957  MetaSubmessage meta_submessage(id_, wi->id_);
2958  meta_submessage.sm_.nack_frag_sm(nackfrag_prototype);
2959  RTPS::NackFragSubmessage& nackfrag = meta_submessage.sm_.nack_frag_sm();
2960  nackfrag.writerSN = to_rtps_seqnum(frag_info[i].first);
2961  nackfrag.fragmentNumberState = frag_info[i].second;
2962  nackfrag.count.value = ++nackfrag_count_;
2963  meta_submessages.push_back(meta_submessage);
2964  }
2965 }
2966 
2967 void
// Grows a FragmentNumberSet so it covers fragment numbers up to 'extent'
// (capped at 256 bits past the base, per RTPS), marking the newly-covered
// fragments as missing.  'samples_requested' accumulates the number of bits
// newly set, for nack statistics.
2968 RtpsUdpDataLink::extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
2969  CORBA::ULong extent,
2970  ACE_CDR::ULong& samples_requested)
2971 {
2972  if (extent < fnSet.bitmapBase.value) {
2973  return; // can't extend to some number under the base
2974  }
2975  // calculate the index to the extent to determine the new_num_bits
2976  const CORBA::ULong new_num_bits = std::min(CORBA::ULong(256),
2977  extent - fnSet.bitmapBase.value + 1),
2978  len = (new_num_bits + 31) / 32;
2979  if (new_num_bits < fnSet.numBits) {
2980  return; // bitmap already extends past "extent"
2981  }
2982  fnSet.bitmap.length(len);
2983  // We are missing from one past old bitmap end to the new end
2984  DisjointSequence::fill_bitmap_range(fnSet.numBits, new_num_bits,
2985  fnSet.bitmap.get_buffer(), len,
2986  fnSet.numBits, samples_requested);
2987 }
2988 
2989 void
// Entry point for an incoming HEARTBEAT_FRAG submessage: records the
// sender's last-seen address, then dispatches to each matching local
// reader's process_heartbeat_frag_i.
// NOTE(review): the first signature line (with the hb_frag parameter) was
// dropped by the doc extraction.
2991  const GuidPrefix_t& src_prefix,
2992  bool directed,
2993  const NetworkAddress& remote_addr)
2994 {
2995  update_last_recv_addr(make_id(src_prefix, hb_frag.writerId), remote_addr);
2996  datareader_dispatch(hb_frag, src_prefix, directed, &RtpsReader::process_heartbeat_frag_i);
2997 }
2998 
2999 void
// Handles a HEARTBEAT_FRAG from one remote writer: drops stale/duplicate
// messages by count, records the advertised last fragment number for the
// sample when it is still incomplete, and replies with ACKNACK/NACK_FRAG as
// needed.
// NOTE(review): the doc extraction dropped the first signature line, a lock
// guard (3005), and the `if (transport_debug.log_dropped_messages) {` lines
// that open the blocks closed at 3023 and 3032 — the braces as shown are
// unbalanced because of this; confirm against the repository source.
3001  const GUID_t& src,
3002  bool /*directed*/,
3003  MetaSubmessageVec& meta_submessages)
3004 {
3006 
3007  RtpsUdpDataLink_rch link = link_.lock();
3008 
3009  if (!link) {
3010  return;
3011  }
3012 
3013  GuardType guard(link->strategy_lock_);
3014  if (link->receive_strategy() == 0) {
3015  return;
3016  }
3017 
3018  const WriterInfoMap::iterator wi = remote_writers_.find(src);
3019  if (wi == remote_writers_.end()) {
3020  // we may not be associated yet, even if the writer thinks we are
3022  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3023  }
3024  return;
3025  }
3026 
3027  const WriterInfo_rch& writer = wi->second;
3028 
  // Counts must advance (with wrap tolerance) or the message is stale.
3029  if (!compare_and_update_counts(hb_frag.count.value, writer->hb_frag_recvd_count_)) {
3031  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3032  }
3033  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_frag_i "
3034  "WARNING Count indicates duplicate, dropping\n"));
3035  return;
3036  }
3037 
3038  // If seq is outside the heartbeat range or we haven't completely received
3039  // it yet, send a NackFrag along with the AckNack. The heartbeat range needs
3040  // to be checked first because recvd_ contains the numbers below the
3041  // heartbeat range (so that we don't NACK those).
3042  const SequenceNumber seq = to_opendds_seqnum(hb_frag.writerSN);
3043  if (seq > writer->hb_last_ || !writer->recvd_.contains(seq)) {
3044  writer->frags_[seq] = hb_frag.lastFragmentNum;
3045  ACE_CDR::ULong cumulative_bits_added = 0;
3046  gather_ack_nacks_i(writer, link, !(hb_frag.smHeader.flags & RTPS::FLAG_F), meta_submessages, cumulative_bits_added);
3047  if (cumulative_bits_added) {
3048  RtpsUdpInst_rch cfg = link->config();
3049  if (cfg && cfg->count_messages()) {
3051  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
3052  }
3053  }
3054 
3055  }
3056 }
3057 
3058 
3059 // DataWriter's side of Reliability
3060 
3061 void
3063  const GuidPrefix_t& src_prefix,
3064  const NetworkAddress& remote_addr)
3065 {
3066  // local side is DW
3067  const GUID_t local = make_id(local_prefix_, acknack.writerId); // can't be ENTITYID_UNKNOWN
3068  const GUID_t remote = make_id(src_prefix, acknack.readerId);
3070 
3071  update_last_recv_addr(remote, remote_addr, now);
3072 
3073  OPENDDS_VECTOR(DiscoveryListener*) callbacks;
3074 
3075  {
3077  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
3078  limit = interesting_readers_.upper_bound(remote);
3079  pos != limit;
3080  ++pos) {
3081  pos->second.last_activity = now;
3082  // Ensure the acknack was for the writer.
3083  if (local == pos->second.localid) {
3084  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
3085  callbacks.push_back(pos->second.listener);
3086  pos->second.status = InterestingRemote::EXISTS;
3087  }
3088  }
3089  }
3090  }
3091 
3092  for (size_t i = 0; i < callbacks.size(); ++i) {
3093  callbacks[i]->reader_exists(remote, local);
3094  }
3095 
3096  datawriter_dispatch(acknack, src_prefix, &RtpsWriter::process_acknack);
3097 }
3098 
3099 void
// Builds a GAP submessage describing the given set of irrelevant sequence
// numbers for one reader (or for all readers when 'reader' is null, which is
// only legal for non-durable writers).  Per RTPS 8.3.7.4 the GAP covers
// [gapStart, gapListBase) plus the numbers set in the bitmap.
// NOTE(review): the first signature line (reader parameter) was dropped by
// the doc extraction.
3101  const DisjointSequence& gaps,
3102  MetaSubmessageVec& meta_submessages)
3103 {
3104  using namespace RTPS;
3105 
3106  OPENDDS_ASSERT(reader || !durable_);
3107 
3108  if (gaps.empty()) {
3109  return;
3110  }
3111 
3112  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
3113  // [gapStart, gapListBase) and those in the SNSet.
3114  const SequenceNumber firstMissing = gaps.low(),
3115  base = ++SequenceNumber(gaps.cumulative_ack());
3116  const SequenceNumber_t gapStart = to_rtps_seqnum(firstMissing);
3117  const SequenceNumber_t gapListBase = to_rtps_seqnum(base);
3118  CORBA::ULong num_bits = 0;
3119  LongSeq8 bitmap;
3120 
3121  if (gaps.disjoint()) {
3122  bitmap.length(DisjointSequence::bitmap_num_longs(base, gaps.high()));
3123  if (bitmap.length() > 0) {
3124  ACE_CDR::ULong cumulative_bits_added = 0;
3125  (void)gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits, cumulative_bits_added);
3126  }
3127  }
3128 
3129  MetaSubmessage meta_submessage(id_, reader ? reader->id_ : GUID_UNKNOWN);
3130  GapSubmessage gap = {
3131  {GAP, FLAG_E, 0 /*length determined later*/},
3132  reader ? reader->id_.entityId : ENTITYID_UNKNOWN,
3133  id_.entityId,
3134  gapStart,
3135  {gapListBase, num_bits, bitmap}
3136  };
3137  OPENDDS_ASSERT(firstMissing < base);
3138  meta_submessage.sm_.gap_sm(gap);
3139 
3140  if (Transport_debug_level > 5) {
3141  const LogGuid conv(id_);
3142  SequenceRange sr;
3143  sr.first = to_opendds_seqnum(gap.gapStart);
3144  const SequenceNumber srbase = to_opendds_seqnum(gap.gapList.bitmapBase);
3145  sr.second = srbase.previous();
3146  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_gaps_i "
3147  "GAP with range [%q, %q] from %C\n",
3148  sr.first.getValue(), sr.second.getValue(),
3149  conv.c_str()));
3150  }
3151 
3152  meta_submessages.push_back(meta_submessage);
3153 }
3154 
3155 void
3157  const GUID_t& src,
3158  MetaSubmessageVec&)
3159 {
3161 
3162  if (stopping_) {
3163  return;
3164  }
3165 
3166  RtpsUdpDataLink_rch link = link_.lock();
3167 
3168  if (!link) {
3169  return;
3170  }
3171 
3172  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
3173 
3174  if (Transport_debug_level > 5) {
3175  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3176  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3177  }
3178 
3179  ReaderInfoMap::iterator ri = remote_readers_.find(src);
3180  if (ri == remote_readers_.end()) {
3182  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C unknown remote reader\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3183  }
3184  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3185  "WARNING ReaderInfo not found\n"));
3186  return;
3187  }
3188 
3189  const ReaderInfo_rch& reader = ri->second;
3190 
3191  SequenceNumber previous_acked_sn = reader->acked_sn();
3192  const bool count_is_not_zero = acknack.count.value != 0;
3193  const CORBA::Long previous_count = reader->acknack_recvd_count_;
3194  bool dont_schedule_nack_response = false;
3195 
3196  if (count_is_not_zero) {
3197  if (!compare_and_update_counts(acknack.count.value, reader->acknack_recvd_count_) &&
3198  (!reader->reflects_heartbeat_count() || acknack.count.value != 0 || reader->acknack_recvd_count_ != 0)) {
3200  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3201  }
3202  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3203  "WARNING Count indicates duplicate, dropping\n"));
3204  return;
3205  }
3206 
3207  if (reader->reflects_heartbeat_count()) {
3208  if (acknack.count.value < reader->required_acknack_count_) {
3210  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale message (reflect %d < %d)\n", LogGuid(src).c_str(), LogGuid(id_).c_str(), acknack.count.value, reader->required_acknack_count_));
3211  }
3212  dont_schedule_nack_response = true;
3213  }
3214  }
3215  }
3216 
3217  fallback_.set(initial_fallback_);
3218 
3219  const bool is_final = acknack.smHeader.flags & RTPS::FLAG_F;
3220  const bool is_postassociation = count_is_not_zero && (is_final || bitmapNonEmpty(acknack.readerSNState) || ack != 1);
3221 
3222  if (preassociation_readers_.count(reader)) {
3223  if (is_postassociation) {
3224  remove_preassociation_reader(reader);
3226  log_progress("RTPS writer/reader association complete", id_, reader->id_, reader->participant_discovered_at_);
3227  }
3228  log_remote_counts("process_acknack");
3229 
3230  const SequenceNumber max_sn = expected_max_sn(reader);
3231  const SequenceNumber acked_sn = reader->acked_sn();
3232  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3233  check_leader_lagger();
3234  // Heartbeat is already scheduled.
3235  }
3236  }
3237 
3238  OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
3239 
3240  if (!is_final && transport_debug.log_nonfinal_messages) {
3241  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3242  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3243  }
3244 
3245  // Process the ack.
3246  bool inform_send_listener = false;
3248 
3249  if (ack >= reader->cur_cumulative_ack_) {
3250  reader->cur_cumulative_ack_ = ack;
3251  inform_send_listener = true;
3252  } else if (count_is_not_zero) {
3253  // Count increased but ack decreased. Reset.
3254  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING RtpsUdpDataLink::RtpsWriter::process_acknack: "
3255  "%C -> %C reset detected count %d > %d ack %q < %q\n",
3256  LogGuid(id_).c_str(), LogGuid(reader->id_).c_str(),
3257  acknack.count.value, previous_count, ack.getValue(), reader->cur_cumulative_ack_.getValue()));
3258  const SequenceNumber max_sn = expected_max_sn(reader);
3259  snris_erase(previous_acked_sn == max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3260  reader->cur_cumulative_ack_ = ack;
3261  const SequenceNumber acked_sn = reader->acked_sn();
3262  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3263  previous_acked_sn = acked_sn;
3264  check_leader_lagger();
3265  heartbeat_->schedule(fallback_.get());
3266 
3267  if (reader->durable_) {
3268  if (Transport_debug_level > 5) {
3269  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: enqueuing ReplayDurableData\n"));
3270  }
3271  reader->durable_data_.swap(pendingCallbacks);
3272  link->event_dispatcher()->dispatch(make_rch<ReplayDurableData>(link_, id_, src));
3273  reader->durable_timestamp_ = MonotonicTimePoint::zero_value;
3274  }
3275  }
3276 
3277  if (!reader->durable_data_.empty()) {
3278  if (Transport_debug_level > 5) {
3279  const LogGuid local_conv(id_), remote_conv(src);
3280  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3281  "local %C has durable for remote %C\n",
3282  local_conv.c_str(),
3283  remote_conv.c_str()));
3284  }
3285  const SequenceNumber& dd_last = reader->durable_data_.rbegin()->first;
3286  if (Transport_debug_level > 5) {
3287  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3288  "check base %q against last durable %q\n",
3289  ack.getValue(), dd_last.getValue()));
3290  }
3291  if (ack > dd_last) {
3293  log_progress("durable delivered", id_, reader->id_, reader->participant_discovered_at_);
3294  }
3295  // Reader acknowledges durable data, we no longer need to store it
3296  if (Transport_debug_level > 5) {
3297  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3298  "durable data acked\n"));
3299  }
3300  reader->durable_data_.swap(pendingCallbacks);
3301  } else {
3302  for (OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator pos = reader->durable_data_.begin(),
3303  limit = reader->durable_data_.end(); pos != limit && pos->first < ack;) {
3304  pendingCallbacks.insert(*pos);
3305  reader->durable_data_.erase(pos++);
3306  }
3307  }
3308  }
3309  } else {
3310  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C invalid acknack\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3311  }
3312 
3313  // Process the nack.
3314  bool schedule_nack_response = false;
3315  if (!dont_schedule_nack_response) {
3316  if (count_is_not_zero) {
3317  reader->requests_.reset();
3318  {
3319  const SingleSendBuffer::Proxy proxy(*send_buff_);
3320  if ((acknack.readerSNState.numBits == 0 ||
3321  (acknack.readerSNState.numBits == 1 && !(acknack.readerSNState.bitmap[0] & 1)))
3322  && ack == max_data_seq(proxy, reader)) {
3323  // Since there is an entry in requested_changes_, the DR must have
3324  // sent a non-final AckNack. If the base value is the high end of
3325  // the heartbeat range, treat it as a request for that seq#.
3326  if (reader->durable_data_.count(ack) || proxy.contains(ack) || proxy.pre_contains(ack)) {
3327  reader->requests_.insert(ack);
3328  }
3329  } else {
3330  reader->requests_.insert(ack, acknack.readerSNState.numBits, acknack.readerSNState.bitmap.get_buffer());
3331  }
3332 
3333  if (!reader->requests_.empty()) {
3334  readers_expecting_data_.insert(reader);
3335  schedule_nack_response = true;
3336  } else if (reader->requested_frags_.empty()) {
3337  readers_expecting_data_.erase(reader);
3338  }
3339  }
3340  }
3341 
3342  if (!is_final) {
3343  readers_expecting_heartbeat_.insert(reader);
3344  schedule_nack_response = true;
3345  }
3346  }
3347 
3348  if (preassociation_readers_.count(reader) == 0) {
3349  make_lagger_leader(reader, previous_acked_sn);
3350  check_leader_lagger();
3351  }
3352 
3353  TqeSet to_deliver;
3354  acked_by_all_helper_i(to_deliver);
3355 
3356 #ifdef OPENDDS_SECURITY
3357  if (is_pvs_writer_ &&
3358  !reader->pvs_outstanding_.empty() &&
3359  reader->pvs_outstanding_.low() < reader->cur_cumulative_ack_) {
3360  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
3361  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end();
3362  pos != limit && pos->first < reader->cur_cumulative_ack_; ++pos) {
3363  ACE_GUARD(ACE_Thread_Mutex, g, elems_not_acked_mutex_);
3364  for (SequenceNumber seq = pos->first; seq <= pos->second && seq < reader->cur_cumulative_ack_; ++seq) {
3365  reader->pvs_outstanding_.erase(seq);
3366  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
3367  if (iter != elems_not_acked_.end()) {
3368  send_buff_->release_acked(iter->first);
3369  to_deliver.insert(iter->second);
3370  elems_not_acked_.erase(iter);
3371  }
3372  }
3373  }
3374  }
3375 #endif
3376 
3377  if (!dont_schedule_nack_response && schedule_nack_response) {
3378  RtpsUdpInst_rch cfg = link->config();
3379  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3380  }
3381 
3382  TransportClient_rch client = client_.lock();
3383 
3384  g.release();
3385 
3386  if (inform_send_listener && client) {
3387  client->data_acked(src);
3388  }
3389 
3390  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
3391  for (iter_t it = pendingCallbacks.begin();
3392  it != pendingCallbacks.end(); ++it) {
3393  it->second->data_delivered();
3394  }
3395 
3396  TqeSet::iterator deliver_iter = to_deliver.begin();
3397  while (deliver_iter != to_deliver.end()) {
3398  (*deliver_iter)->data_delivered();
3399  ++deliver_iter;
3400  }
3401 }
3402 
// Entry point for a received RTPS NACK_FRAG submessage on this link.
// Records the sender's most recent network address for the reader GUID and
// then routes the submessage to the owning RtpsWriter via
// datawriter_dispatch() (which ends up calling RtpsWriter::process_nackfrag).
// NOTE(review): this listing is a doxygen extraction; source line 3404 (the
// line naming the function and declaring the first parameter, presumably
// "RtpsUdpDataLink::received(const RTPS::NackFragSubmessage& nackfrag,") was
// dropped, so the signature below is missing its first line.
3403 void
3405  const GuidPrefix_t& src_prefix,
3406  const NetworkAddress& remote_addr)
3407 {
// Remember where this reader was last heard from, for addressing replies.
3408  update_last_recv_addr(make_id(src_prefix, nackfrag.readerId), remote_addr);
// Hand off to the matching RtpsWriter (presumably selected by the
// submessage's writerId inside datawriter_dispatch — confirm in its impl).
3409  datawriter_dispatch(nackfrag, src_prefix, &RtpsWriter::process_nackfrag);
3410 }
3411 
// RtpsWriter::process_nackfrag: handle a NACK_FRAG from reader `src`.
// Drops the message if the writer is stopping, the link is gone, the reader
// is unknown, or the submessage count is stale/duplicate; otherwise records
// the requested fragment bitmap for the nacked sample and schedules the
// nack-response sweep.
// NOTE(review): doxygen extraction dropped several hyperlinked lines here:
// 3412 (function-name line of the signature), 3416 (presumably a mutex guard),
// and 3435/3445/3455 (presumably "if (transport_debug.log_...) {" lines —
// inferred from the "{transport_debug...}" tags in the log text below, which
// is why some "}" below appear unbalanced in this listing).
3413  const GUID_t& src,
3414  MetaSubmessageVec& /*meta_submessages*/)
3415 {
3417 
3418  if (stopping_) {
3419  return;
3420  }
3421 
3422  RtpsUdpDataLink_rch link = link_.lock();
3423 
3424  if (!link) {
3425  return;
3426  }
3427 
3428  if (Transport_debug_level > 5) {
3429  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C\n",
3430  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3431  }
3432 
// Unknown readers cannot be serviced; log (when enabled) and drop.
3433  const ReaderInfoMap::iterator ri = remote_readers_.find(src);
3434  if (ri == remote_readers_.end()) {
3436  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C unknown remote reader\n",
3437  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3438  }
3439  return;
3440  }
3441 
3442  const ReaderInfo_rch& reader = ri->second;
3443 
// Duplicate/stale detection via the NackFrag count field (wrap-aware; see
// compare_and_update_counts in the anonymous namespace at the top of file).
3444  if (!compare_and_update_counts(nackfrag.count.value, reader->nackfrag_recvd_count_)) {
3446  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C stale/duplicate message\n",
3447  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3448  }
3449  return;
3450  }
3451 
3452  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
3453 
3454  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
3456  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C seq %q base %u bits %u\n",
3457  LogGuid(src).c_str(), LogGuid(id_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
3458  }
3459 
// Record the requested fragment set, keyed by sample sequence number then
// fragment bitmap base, and arrange for a reply after the configured delay.
3460  reader->requested_frags_[seq][nackfrag.fragmentNumberState.bitmapBase.value] = nackfrag.fragmentNumberState;
3461  readers_expecting_data_.insert(reader);
3462  RtpsUdpInst_rch cfg = link->config();
3463  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3464 }
3465 
// RtpsWriter::gather_nack_replies_i: answer all outstanding reader NACKs.
// For each reader in readers_expecting_data_ this either resends the
// requested samples/fragments (directed or consolidated) or emits GAPs for
// sequence numbers the writer can no longer (or will never) provide, then
// finishes with directed heartbeats for readers expecting data/heartbeats.
// NOTE(review): this listing is a doxygen extraction; hyperlinked source
// lines 3579, 3588, 3624, 3633, 3686, 3700, 3710 and 3718 were dropped, so a
// few statements below are visibly truncated (e.g. the ACE_DEBUG( opener for
// the string at 3719). Consult the pristine file before editing logic here.
3466 void
3467 RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i(MetaSubmessageVec& meta_submessages)
3468 {
3469  RtpsUdpDataLink_rch link = link_.lock();
3470 
3471  if (!link) {
3472  return;
3473  }
3474 
3475  // Process naks requests from each reader replying with either data or a gap.
3476  // Requests for directed data are answered with a directed reply.
3477  // Requests for undirected data are answered with an undirected and consolidated reply.
3478  // Directed data includes things like the PVS writer.
3479  // The requests_ for a reader should not contain requests for durable data.
3480 
3481  typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
3482 
3483  // Consolidated non-directed requests and address sets to be sent together at the end, after directed replies
3484  typedef OPENDDS_MAP(SequenceNumber, AddrSet) RecipientMap;
3485  typedef OPENDDS_MAP(SequenceNumber, RepoIdSet) ReaderMap;
3486  DisjointSequence consolidated_requests;
3487  ReaderMap consolidated_request_readers;
3488  RecipientMap consolidated_recipients_unicast;
3489  RecipientMap consolidated_recipients_multicast;
3490  FragmentInfo consolidated_fragment_requests;
3491  ReaderMap consolidated_fragment_request_readers;
3492  RecipientMap consolidated_fragment_recipients_unicast;
3493  RecipientMap consolidated_fragment_recipients_multicast;
3494  DisjointSequence consolidated_gaps;
3495 
// Hold the send-buffer strategy lock for the whole sweep; all resend_i /
// resend_fragments_i calls below go through this proxy.
3496  ACE_GUARD(TransportSendBuffer::LockType, guard, send_buff_->strategy_lock());
3497  SingleSendBuffer::Proxy proxy(*send_buff_);
3498 
3499  size_t cumulative_send_count = 0;
3500 
3501  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3502  pos != limit; ++pos) {
3503 
3504  const ReaderInfo_rch& reader = *pos;
3505  const AddrSet addrs = link->get_addresses(id_, reader->id_);
3506 
3507  DisjointSequence gaps;
3508 
// Durable readers still replaying historic data are serviced entirely from
// the reader's durable_data_ store and never from the live send buffer.
3509  if (reader->expecting_durable_data()) {
3510 
3511  if (!reader->requests_.empty() && !reader->durable_data_.empty()) {
3512  const SequenceNumber dd_first = reader->durable_data_.begin()->first;
3513  const SequenceNumber dd_last = reader->durable_data_.rbegin()->first;
3514 
3515  if (reader->requests_.high() < dd_first) {
3516  gaps.insert(SequenceRange(reader->requests_.low(), dd_first.previous()));
3517  reader->requests_.reset();
3518  } else {
3519  const OPENDDS_VECTOR(SequenceRange) psr = reader->requests_.present_sequence_ranges();
3520  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = psr.begin(), limit = psr.end();
3521  iter != limit && iter->first <= dd_last; ++iter) {
3522  for (SequenceNumber s = iter->first; s <= iter->second; ++s) {
3523  if (s <= dd_last) {
3524  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(s);
3525  if (dd_iter != reader->durable_data_.end()) {
3526  link->durability_resend(dd_iter->second, cumulative_send_count);
3527  } else {
3528  gaps.insert(s);
3529  }
3530  reader->requests_.erase(s);
3531  }
3532  }
3533  }
3534  }
3535  }
3536 
3537  typedef RequestedFragSeqMap::const_iterator rfs_iter;
3538  for (rfs_iter rfs = reader->requested_frags_.begin(), limit = reader->requested_frags_.end(); rfs != limit; ++rfs) {
3539  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(rfs->first);
3540  if (dd_iter != reader->durable_data_.end()) {
3541  for (RequestedFragMap::const_iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3542  link->durability_resend(dd_iter->second, rf->second, cumulative_send_count);
3543  }
3544  } else if ((!reader->durable_data_.empty() && rfs->first < reader->durable_data_.begin()->first)) {
3545  gaps.insert(rfs->first);
3546  }
3547  }
3548 
3549  gather_gaps_i(reader, gaps, meta_submessages);
3550 
3551  // The writer may not be done replaying durable data.
3552  // Do not send a gap.
3553  // TODO: If we have all of the durable data, adjust the request so that we can answer the non-durable part.
3554  reader->requests_.reset();
3555  reader->requested_frags_.clear();
3556  continue;
3557  }
3558 
// Non-durable path: anything requested below first_sn can never be sent —
// answer with a catch-up GAP.
3559  const SequenceNumber first_sn = std::max(non_durable_first_sn(proxy), reader->start_sn_);
3560  if (!reader->requests_.empty() &&
3561  reader->requests_.high() < first_sn) {
3562  // The reader is not going to get any data.
3563  // Send a gap that is going to to catch them up.
3564  gaps.insert(SequenceRange(reader->requests_.low(), first_sn.previous()));
3565  reader->requests_.reset();
3566  }
3567 
// Whole-sample requests: resend directed samples inline, queue undirected
// ones for the consolidated pass, GAP what cannot or must not be answered.
3568  const OPENDDS_VECTOR(SequenceRange) ranges = reader->requests_.present_sequence_ranges();
3569  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = ranges.begin(), limit = ranges.end();
3570  iter != limit; ++iter) {
3571  for (SequenceNumber seq = iter->first; seq <= iter->second; ++seq) {
3572  GUID_t destination;
3573  if (proxy.contains(seq, destination)) {
3574  if (destination == GUID_UNKNOWN) {
3575  // Not directed.
3576  consolidated_requests.insert(seq);
3577  consolidated_request_readers[seq].insert(reader->id_);
3578  consolidated_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3580  link->accumulate_addresses(id_, reader->id_, consolidated_recipients_multicast[seq], false);
3581  continue;
3582  } else if (destination != reader->id_) {
3583  // Directed at another reader.
3584  gaps.insert(seq);
3585  continue;
3586  } else {
3587  // Directed at the reader.
3589  link->send_strategy()->override_destinations(addrs);
3590  proxy.resend_i(SequenceRange(seq, seq), 0, reader->id_);
3591  ++cumulative_send_count;
3592  continue;
3593  }
3594  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3595  // Can't answer, don't gap.
3596  continue;
3597  }
3598 
3599  if (durable_ || is_pvs_writer()) {
3600  // Must send directed gap.
3601  gaps.insert(seq);
3602  } else {
3603  // Non-directed gap.
3604  consolidated_gaps.insert(seq);
3605  }
3606  }
3607  }
3608 
3609  reader->requests_.reset();
3610 
// Fragment-level requests: same directed/undirected/GAP triage as above.
3611  typedef RequestedFragSeqMap::iterator rfs_iter;
3612  const rfs_iter rfs_end = reader->requested_frags_.end();
3613  for (rfs_iter rfs = reader->requested_frags_.begin(); rfs != rfs_end; ++rfs) {
3614  const SequenceNumber& seq = rfs->first;
3615  GUID_t destination;
3616  if (proxy.contains(seq, destination)) {
3617  if (destination == GUID_UNKNOWN) {
3618  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3619  consolidated_fragment_requests[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
3620  rf->second.bitmap.get_buffer());
3621  }
3622  consolidated_fragment_request_readers[seq].insert(reader->id_);
3623  consolidated_fragment_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3625  link->accumulate_addresses(id_, reader->id_, consolidated_fragment_recipients_multicast[seq], false);
3626  continue;
3627  } else if (destination != reader->id_) {
3628  // Directed at another reader.
3629  gaps.insert(seq);
3630  continue;
3631  } else {
3632  // Directed at the reader.
3634  link->send_strategy()->override_destinations(addrs);
3635  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3636  DisjointSequence x;
3637  x.insert(rf->second.bitmapBase.value, rf->second.numBits,
3638  rf->second.bitmap.get_buffer());
3639  proxy.resend_fragments_i(seq, x, cumulative_send_count);
3640  }
3641  continue;
3642  }
3643  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3644  // Can't answer, don't gap.
3645  continue;
3646  }
3647 
3648  if (durable_ || is_pvs_writer()) {
3649  // Must send directed gap.
3650  gaps.insert(seq);
3651  } else {
3652  // Non-directed gap.
3653  consolidated_gaps.insert(seq);
3654  }
3655  }
3656 
3657  reader->requested_frags_.clear();
3658 
3659  // Gather directed gaps.
3660  gather_gaps_i(reader, gaps, meta_submessages);
3661  }
3662 
3663  {
3664  // Send the consolidated requests.
3665  const OPENDDS_VECTOR(SequenceRange) ranges = consolidated_requests.present_sequence_ranges();
3666  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = ranges.begin(), limit = ranges.end();
3667  pos != limit; ++pos) {
3668  if (Transport_debug_level > 5) {
3669  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3670  "resend data %q-%q\n", pos->first.getValue(),
3671  pos->second.getValue()));
3672  }
3673  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
3674  const AddrSet& uni = consolidated_recipients_unicast[seq];
3675  const AddrSet& multi = consolidated_recipients_multicast[seq];
3676  const RepoIdSet& readers = consolidated_request_readers[seq];
3677 
// Samples that exist as fragments are folded into the fragment pass below.
3678  if (proxy.has_frags(seq)) {
3679  if (consolidated_fragment_requests.find(seq) == consolidated_fragment_requests.end()) {
3680  consolidated_fragment_requests[seq].insert(1);
3681  }
3682  consolidated_fragment_recipients_unicast[seq].insert(uni.begin(), uni.end());
3683  consolidated_fragment_recipients_multicast[seq].insert(multi.begin(), multi.end());
3684  consolidated_fragment_request_readers[seq].insert(readers.begin(), readers.end());
3685  } else {
// Heuristic: when more than half the readers asked, multicast the resend.
3687  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3688 
3689  proxy.resend_i(SequenceRange(seq, seq));
3690  ++cumulative_send_count;
3691  }
3692  }
3693  }
3694 
3695  for (FragmentInfo::const_iterator pos = consolidated_fragment_requests.begin(),
3696  limit = consolidated_fragment_requests.end(); pos != limit; ++pos) {
3697  const AddrSet& uni = consolidated_fragment_recipients_unicast[pos->first];
3698  const AddrSet& multi = consolidated_fragment_recipients_multicast[pos->first];
3699  const RepoIdSet& readers = consolidated_fragment_request_readers[pos->first];
3701  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3702 
3703  proxy.resend_fragments_i(pos->first, pos->second, cumulative_send_count);
3704  }
3705  }
3706 
3707  if (cumulative_send_count) {
3708  RtpsUdpInst_rch cfg = link->config();
3709  if (cfg && cfg->count_messages()) {
3711  link->transport_statistics_.writer_resend_count[id_] += static_cast<ACE_CDR::ULong>(cumulative_send_count);
3712  }
3713  }
3714 
3715  // Gather the consolidated gaps.
3716  if (!consolidated_gaps.empty()) {
3717  if (Transport_debug_level > 5) {
// NOTE(review): line 3718 (presumably the "ACE_DEBUG((LM_DEBUG," opener for
// the string literal below) was dropped by the extraction.
3719  "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: GAPs:\n"));
3720  consolidated_gaps.dump();
3721  }
3722  gather_gaps_i(ReaderInfo_rch(), consolidated_gaps, meta_submessages);
3723  } else if (Transport_debug_level > 5) {
3724  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3725  "no GAPs to send\n"));
3726  }
3727 
3728  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
3729  initialize_heartbeat(proxy, meta_submessage);
3730 
3731  // Directed, non-final.
3732  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3733  pos != limit; ++pos) {
3734  const ReaderInfo_rch& reader = *pos;
3735  readers_expecting_heartbeat_.erase(reader);
3736  if (reader->reflects_heartbeat_count()) {
// OPENDDS_FLAG_R is set only for the duration of this reader's heartbeat so
// the reflected acknack count can be matched (see required_acknack_count_).
3737  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::OPENDDS_FLAG_R;
3738  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3739  reader->required_acknack_count_ = heartbeat_count_;
3740  meta_submessage.sm_.heartbeat_sm().smHeader.flags &= ~RTPS::OPENDDS_FLAG_R;
3741  } else {
3742  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3743  }
3744  }
3745  readers_expecting_data_.clear();
3746 
3747  // Directed, final.
3748  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::FLAG_F;
3749  meta_submessages.reserve(meta_submessages.size() + readers_expecting_heartbeat_.size());
3750  for (ReaderInfoSet::const_iterator pos = readers_expecting_heartbeat_.begin(), limit = readers_expecting_heartbeat_.end();
3751  pos != limit; ++pos) {
3752  const ReaderInfo_rch& reader = *pos;
3753  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3754  }
3755  readers_expecting_heartbeat_.clear();
3756 }
3757 
// expected_max_sn: the highest sequence number this reader is expected to
// acknowledge — per-reader max_pvs_sn_ for a participant-volatile-secure
// (PVS) writer under OPENDDS_SECURITY, otherwise the writer-wide max_sn_.
// NOTE(review): doxygen extraction dropped source lines 3758-3759 (the
// return type and name line of this member function, presumably
// "SequenceNumber RtpsUdpDataLink::RtpsWriter::expected_max_sn(const
// ReaderInfo_rch& reader) const"); only the body survives below.
3760 {
// ACE_UNUSED_ARG covers the non-security build where `reader` is not read.
3761  ACE_UNUSED_ARG(reader);
3762 #ifdef OPENDDS_SECURITY
3763  if (is_pvs_writer_) {
3764  return reader->max_pvs_sn_;
3765  } else {
3766 #endif
3767  return max_sn_;
3768 #ifdef OPENDDS_SECURITY
3769  }
3770 #endif
3771 }
3772 
3773 void
3774 RtpsUdpDataLink::RtpsWriter::snris_insert(RtpsUdpDataLink::SNRIS& snris,
3775  const ReaderInfo_rch& reader)
3776 {
3777  const SequenceNumber sn = reader->acked_sn();
3778  SNRIS::iterator pos = snris.find(sn);
3779  if (pos == snris.end()) {
3780  pos = snris.insert(RtpsUdpDataLink::SNRIS::value_type(sn, make_rch<ReaderInfoSetHolder>())).first;
3781  }
3782  pos->second->readers.insert(reader);
3783 }
3784 
3785 void
3786 RtpsUdpDataLink::RtpsWriter::snris_erase(RtpsUdpDataLink::SNRIS& snris,
3787  const SequenceNumber sn,
3788  const ReaderInfo_rch& reader)
3789 {
3790  SNRIS::iterator pos = snris.find(sn);
3791  if (pos != snris.end()) {
3792  pos->second->readers.erase(reader);
3793  if (pos->second->readers.empty()) {
3794  snris.erase(pos);
3795  }
3796  }
3797 }
3798 
// Demote readers from "leading" to "lagging" after the writer's max sequence
// number advances. In the normal (non-PVS) case all readers bucketed at
// previous_max_sn move wholesale; for a PVS writer only the single reader
// identified by reader_id moves, since each PVS reader tracks its own max.
// A heartbeat is (re)scheduled whenever someone becomes lagging.
// NOTE(review): doxygen extraction dropped source line 3800 (the name/first-
// parameter line, presumably "RtpsUdpDataLink::RtpsWriter::make_leader_lagger(
// const GUID_t& reader_id," — the reader_id parameter is evidenced by
// ACE_UNUSED_ARG(reader_id) below).
3799 void
3801  SequenceNumber previous_max_sn)
3802 {
// reader_id is only read in the OPENDDS_SECURITY branch.
3803  ACE_UNUSED_ARG(reader_id);
3804 
3805  if (stopping_) {
3806  return;
3807  }
3808 
3809  RtpsUdpDataLink_rch link = link_.lock();
3810  if (!link) {
3811  return;
3812  }
3813 
3814 #ifdef OPENDDS_SECURITY
3815  if (!is_pvs_writer_) {
3816 #endif
3817  if (previous_max_sn != max_sn_) {
3818  // All readers that have acked previous_max_sn are now lagging.
3819  // Move leader_readers_[previous_max_sn] to lagging_readers_[previous_max_sn].
3820  SNRIS::iterator leading_pos = leading_readers_.find(previous_max_sn);
3821  SNRIS::iterator lagging_pos = lagging_readers_.find(previous_max_sn);
3822  if (leading_pos != leading_readers_.end()) {
3823  if (lagging_pos != lagging_readers_.end()) {
3824  lagging_pos->second->readers.insert(leading_pos->second->readers.begin(), leading_pos->second->readers.end());
3825  } else {
// No existing lagging bucket at this SN: transfer the whole holder.
3826  lagging_readers_[previous_max_sn] = leading_pos->second;
3827  }
3828  leading_readers_.erase(leading_pos);
3829  heartbeat_->schedule(fallback_.get());
3830  }
3831  }
3832 #ifdef OPENDDS_SECURITY
3833  } else {
3834  // Move a specific reader.
3835  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3836  if (iter == remote_readers_.end()) {
3837  return;
3838  }
3839 
3840  const ReaderInfo_rch& reader = iter->second;
// For PVS, the caller's previous_max_sn is replaced by this reader's own
// per-reader max before it is advanced to the writer's max_sn_.
3841  previous_max_sn = reader->max_pvs_sn_;
3842  reader->max_pvs_sn_ = max_sn_;
3843  if (preassociation_readers_.count(reader)) {
3844  // Will be inserted once association is complete.
3845  return;
3846  }
3847 
3848  const SequenceNumber acked_sn = reader->acked_sn();
3849  if (acked_sn == previous_max_sn && previous_max_sn != max_sn_) {
3850  snris_erase(leading_readers_, acked_sn, reader);
3851  snris_insert(lagging_readers_, reader);
3852  heartbeat_->schedule(fallback_.get());
3853  }
3854  }
3855 #endif
3856 }
3857 
// Re-bucket a single reader after its acknowledged sequence number changed:
// remove it from whichever of leading_/lagging_readers_ held it at
// previous_acked_sn and insert it according to its new acked_sn. Schedules
// a heartbeat if the reader is still behind its expected max.
// NOTE(review): doxygen extraction dropped source line 3859 (the name line,
// presumably "RtpsUdpDataLink::RtpsWriter::make_lagger_leader(const
// ReaderInfo_rch& reader," — `reader` is used throughout the body).
3858 void
3860  const SequenceNumber previous_acked_sn)
3861 {
3862  RtpsUdpDataLink_rch link = link_.lock();
3863  if (!link) {
3864  return;
3865  }
3866 
3867  const SequenceNumber acked_sn = reader->acked_sn();
// No movement needed if the ack did not change.
3868  if (previous_acked_sn == acked_sn) { return; }
3869  const SequenceNumber previous_max_sn = expected_max_sn(reader);
3870 #ifdef OPENDDS_SECURITY
// A PVS reader that acked beyond its recorded max advances that max, which
// changes what expected_max_sn() returns on the next line after the #endif.
3871  if (is_pvs_writer_ && acked_sn > previous_max_sn) {
3872  reader->max_pvs_sn_ = acked_sn;
3873  }
3874 #endif
3875  const SequenceNumber max_sn = expected_max_sn(reader);
3876 
// Leading iff acked == expected max; otherwise lagging.
3877  snris_erase(previous_acked_sn == previous_max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3878  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3879  if (acked_sn != max_sn) {
3880  heartbeat_->schedule(fallback_.get());
3881  }
3882 }
3883 
// True when the reader has not yet acknowledged its expected max sequence
// number (i.e. it belongs in lagging_readers_).
// NOTE(review): doxygen extraction dropped source line 3885 (the name line,
// presumably "RtpsUdpDataLink::RtpsWriter::is_lagging(const ReaderInfo_rch&
// reader) const").
3884 bool
3886 {
3887  return reader->acked_sn() != expected_max_sn(reader);
3888 }
3889 
// True when the reader has acknowledged exactly its expected max sequence
// number — the logical complement of is_lagging() above.
// NOTE(review): doxygen extraction dropped source line 3891 (the name line,
// presumably "RtpsUdpDataLink::RtpsWriter::is_leading(const ReaderInfo_rch&
// reader) const").
3890 bool
3892 {
3893  return reader->acked_sn() == expected_max_sn(reader);
3894 }
3895 
// Debug-only invariant checker for the leading/lagging bookkeeping.
// Compiled to a no-op under OPENDDS_SAFETY_PROFILE or NDEBUG. Asserts that:
//  * every reader sits in the bucket matching its acked_sn();
//  * lagging readers are strictly below their expected max (or at the
//    "never acked" sentinel, SequenceNumber::ZERO().previous());
//  * leading readers are exactly at their expected max;
//  * preassociation readers appear in neither structure.
// NOTE(review): doxygen extraction dropped source line 3897 (the name line,
// presumably "RtpsUdpDataLink::RtpsWriter::check_leader_lagger() const").
3896 void
3898 {
3899 #ifndef OPENDDS_SAFETY_PROFILE
3900 #ifndef NDEBUG
3901  static const SequenceNumber negative_one = SequenceNumber::ZERO().previous();
3902  for (SNRIS::const_iterator pos1 = lagging_readers_.begin(), limit = lagging_readers_.end();
3903  pos1 != limit; ++pos1) {
3904  const SequenceNumber& sn = pos1->first;
3905  const ReaderInfoSetHolder_rch& readers = pos1->second;
3906  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3907  pos2 != limit; ++pos2) {
3908  const ReaderInfo_rch& reader = *pos2;
3909  OPENDDS_ASSERT(reader->acked_sn() == sn);
3910  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3911  OPENDDS_ASSERT(sn == negative_one || sn < expect_max_sn);
3912  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3913  }
3914  }
3915 
3916  for (SNRIS::const_iterator pos1 = leading_readers_.begin(), limit = leading_readers_.end();
3917  pos1 != limit; ++pos1) {
3918  const SequenceNumber& sn = pos1->first;
3919  const ReaderInfoSetHolder_rch& readers = pos1->second;
3920  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3921  pos2 != limit; ++pos2) {
3922  const ReaderInfo_rch& reader = *pos2;
3923  OPENDDS_ASSERT(reader->acked_sn() == sn);
3924  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3925  OPENDDS_ASSERT(sn == expect_max_sn);
3926  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3927  }
3928  }
3929 #endif
3930 #endif
3931 }
3932 
// Record sequence number `seq` as outstanding for the reader identified by
// reader_id, but only for a participant-volatile-secure (PVS) writer; the
// sample is tracked in the reader's pvs_outstanding_ set until acked (see
// the pvs_outstanding_ drain in process_acknack earlier in this file).
// Compiles to a no-op when OPENDDS_SECURITY is not defined.
// NOTE(review): doxygen extraction dropped source line 3934 (the name line
// of this member function), so its exact name/signature is not visible here;
// the parameters reader_id and seq are evidenced by the ACE_UNUSED_ARG calls.
3933 void
3935 {
3936  ACE_UNUSED_ARG(reader_id);
3937  ACE_UNUSED_ARG(seq);
3938 #ifdef OPENDDS_SECURITY
3939  if (!is_pvs_writer_) {
3940  return;
3941  }
3942 
3943  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3944  if (iter == remote_readers_.end()) {
3945  return;
3946  }
3947 
3948  const ReaderInfo_rch& reader = iter->second;
3949  reader->pvs_outstanding_.insert(seq);
3950 #endif
3951 }
3952 
// Collect every queued element now acknowledged by all readers (via
// acked_by_all_helper_i) and deliver the data_delivered() callbacks outside
// the inner scope, so no writer lock is held during the callbacks.
// NOTE(review): doxygen extraction dropped source lines 3954 (the name line
// of this member function) and 3958 (presumably the lock guard that the
// braced scope around acked_by_all_helper_i exists for — confirm in the
// pristine source).
3953 void
3955 {
3956  TqeSet to_deliver;
3957  {
3959  acked_by_all_helper_i(to_deliver);
3960  }
3961 
// Callbacks are invoked after the scope above is exited.
3962  TqeSet::iterator deliver_iter = to_deliver.begin();
3963  while (deliver_iter != to_deliver.end()) {
3964  (*deliver_iter)->data_delivered();
3965  ++deliver_iter;
3966  }
3967 }
3968 
// Compute the lowest sequence number acknowledged by ALL readers
// (preassociation, lagging and leading), then release every element in
// elems_not_acked_ strictly below it: each is removed from the send buffer
// via release_acked() and queued in to_deliver for the caller to invoke
// data_delivered() on, outside any writer lock.
// NOTE(review): doxygen extraction dropped source line 3970 (the name line,
// presumably "RtpsUdpDataLink::RtpsWriter::acked_by_all_helper_i(TqeSet&
// to_deliver)").
3969 void
3971 {
3972  using namespace OpenDDS::RTPS;
3973  typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
// NOTE(review): `to_check` is declared but never used in this body — looks
// like dead code; candidate for removal in the pristine source.
3974  OPENDDS_VECTOR(GUID_t) to_check;
3975 
3976  RtpsUdpDataLink_rch link = link_.lock();
3977 
3978  if (!link) {
3979  return;
3980  }
3981 
3982  //start with the max sequence number writer knows about and decrease
3983  //by what the min over all readers is
3984  SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
// Preassociation readers have acked nothing yet; bound by their start SNs.
3985  if (!preassociation_readers_.empty()) {
3986  all_readers_ack = std::min(all_readers_ack, *preassociation_reader_start_sns_.begin());
3987  }
// lagging_readers_ is keyed by acked SN, so begin() is the minimum; +1
// converts "highest acked" into "first not-yet-acked".
3988  if (!lagging_readers_.empty()) {
3989  all_readers_ack = std::min(all_readers_ack, lagging_readers_.begin()->first + 1);
3990  }
3991  if (!leading_readers_.empty()) {
3992  // When is_pvs_writer_ is true, the leading_readers_ will all be
3993  // at different sequence numbers. The minimum could actually be
3994  // before the preassociation readers or lagging readers. Use the
3995  // largest sequence number to avoid holding onto samples.
3996  all_readers_ack = std::min(all_readers_ack, leading_readers_.rbegin()->first + 1);
3997  }
3998 
// No readers at all: nothing can be released.
3999  if (all_readers_ack == SequenceNumber::MAX_VALUE) {
4000  return;
4001  }
4002 
4003  ACE_GUARD(ACE_Thread_Mutex, g2, elems_not_acked_mutex_);
4004 
4005  if (!elems_not_acked_.empty()) {
4006  for (iter_t it = elems_not_acked_.begin(), limit = elems_not_acked_.end();
4007  it != limit && it->first < all_readers_ack;) {
4008  send_buff_->release_acked(it->first);
4009  to_deliver.insert(it->second);
// Post-increment erase keeps the multimap iterator valid.
4010  elems_not_acked_.erase(it++);
4011  }
4012  }
4013 }
4014 
// Convenience overload: resend a durable element in full by delegating to
// the three-argument durability_resend with an empty FragmentNumberSet
// (numBits == 0), which the fragment filter below treats as "send all".
// NOTE(review): doxygen extraction dropped source line 4015 (the name/first-
// parameter line, presumably "RtpsUdpDataLink::durability_resend(
// TransportQueueElement* element," — `element` is used in the body).
// NOTE(review): the function-local `static CORBA::Long buffer[8]` plus a
// static FragmentNumberSet built from it is shared mutable state — looks
// thread-unsafe if this can run concurrently; confirm the locking context.
4016  size_t& cumulative_send_count)
4017 {
4018  static CORBA::Long buffer[8];
4019  static const RTPS::FragmentNumberSet none = { {0}, 0, RTPS::LongSeq8(0, buffer) };
4020  durability_resend(element, none, cumulative_send_count);
4021 }
4022 
// Resend a durable sample (optionally only the fragments named in
// fragmentSet) directly to the subscribing reader's addresses, bypassing the
// normal send queue. Each datagram actually sent bumps
// cumulative_send_count, which feeds the writer_resend_count statistic.
// NOTE(review): doxygen extraction dropped source lines 4023 (the name/first-
// parameter line, presumably "RtpsUdpDataLink::durability_resend(
// TransportQueueElement* element,") and 4033 (presumably the
// "ACE_ERROR((LM_ERROR," opener for the error-message strings below).
4024  const RTPS::FragmentNumberSet& fragmentSet,
4025  size_t& cumulative_send_count)
4026 {
4027  if (Transport_debug_level > 5) {
4028  ACE_DEBUG((LM_DEBUG, "(%P|%t) TRACK RtpsUdpDataLink::durability_resend %q\n", element->sequence().getValue()));
4029  }
// Without a locator for the remote reader there is nowhere to send.
4030  const AddrSet addrs = get_addresses(element->publication_id(), element->subscription_id());
4031  if (addrs.empty()) {
4032  const LogGuid conv(element->subscription_id());
4034  "(%P|%t) ERROR: RtpsUdpDataLink::durability_resend() - "
4035  "no locator for remote %C\n", conv.c_str()));
4036  return;
4037  }
4038 
// Split the element into wire-sized pieces; bail out if that fails.
4039  TqeVector to_send;
4040  if (!send_strategy()->fragmentation_helper(element, to_send)) {
4041  return;
4042  }
4043 
4044  DisjointSequence fragments;
4045  fragments.insert(fragmentSet.bitmapBase.value, fragmentSet.numBits,
4046  fragmentSet.bitmap.get_buffer());
4047  SequenceNumber lastFragment = 0;
4048 
// An empty fragment set means "resend everything"; otherwise
// include_fragment() filters pieces against the requested fragment ranges.
4049  const TqeVector::iterator end = to_send.end();
4050  for (TqeVector::iterator i = to_send.begin(); i != end; ++i) {
4051  if (fragments.empty() || include_fragment(**i, fragments, lastFragment)) {
4052  RTPS::Message message;
4053  send_strategy()->send_rtps_control(message, *const_cast<ACE_Message_Block*>((*i)->msg()), addrs);
4054  ++cumulative_send_count;
4055  }
4056 
// Every piece is released regardless of whether it was sent.
4057  (*i)->data_delivered();
4058  }
4059 }
4060 
// Decide whether a fragmented piece of a durability resend overlaps the
// reader-requested fragment numbers. Non-fragment elements (and elements
// that are not RtpsCustomizedElement) are always included. `lastFragment`
// is carried across calls so each piece's fragment range can be derived as
// (previous last + 1 .. this piece's last_fragment()).
// NOTE(review): doxygen extraction dropped source line 4061 (the return
// type/name line, presumably "bool RtpsUdpDataLink::include_fragment(const
// TransportQueueElement& element," — matching the call in durability_resend).
4062  const DisjointSequence& fragments,
4063  SequenceNumber& lastFragment)
4064 {
4065  if (!element.is_fragment()) {
4066  return true;
4067  }
4068 
4069  const RtpsCustomizedElement* const rce = dynamic_cast<const RtpsCustomizedElement*>(&element);
4070  if (!rce) {
4071  return true;
4072  }
4073 
4074  const SequenceRange thisElement(lastFragment + 1, rce->last_fragment());
4075  lastFragment = thisElement.second;
// Include the piece if any of its fragment numbers were requested.
4076  return fragments.contains_any(thisElement);
4077 }
4078 
 4079 void
// Periodic heartbeat pass over "interesting" readers (discovery-style remotes).
// NOTE(review): extraction dropped the signature (doc 4080, presumably
// RtpsUdpDataLink::send_heartbeats(const MonotonicTimePoint& now)), a guard line
// (doc 4089), the WtaMap typedef (doc 4094) and several HeartBeatSubmessage
// initializer fields (docs 4127, 4130-4131) — verify against repository source.
 4081 {
 4082  OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
 4083 
 4084  RtpsUdpInst_rch cfg = config();
 4085 
 4086  MetaSubmessageVec meta_submessages;
 4087 
 4088  {
 4090 
// Two thresholds: 3 periods of silence => re-advertise; 10 periods => declare gone.
 4091  const MonotonicTimePoint tv = now - 10 * cfg->heartbeat_period_;
 4092  const MonotonicTimePoint tv3 = now - 3 * cfg->heartbeat_period_;
 4093 
 4095  WtaMap writers_to_advertise;
 4096 
 4097  for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
 4098  limit = interesting_readers_.end();
 4099  pos != limit;
 4100  ++pos) {
// Collect readers that still need to hear from our local writer.
 4101  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
 4102  (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
 4103  RcHandle<ConstSharedRepoIdSet>& tg = writers_to_advertise[pos->second.localid];
 4104  if (!tg) {
 4105  tg = make_rch<ConstSharedRepoIdSet>();
 4106  }
 4107  const_cast<RepoIdSet&>(tg->guids_).insert(pos->first);
 4108  }
// Silence beyond the long threshold: queue the does-not-exist callback.
 4109  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
 4110  CallbackType callback(pos->first, pos->second);
 4111  readerDoesNotExistCallbacks.push_back(callback);
 4112  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
 4113  }
 4114  }
 4115 
 4116  for (WtaMap::const_iterator pos = writers_to_advertise.begin(),
 4117  limit = writers_to_advertise.end();
 4118  pos != limit;
 4119  ++pos) {
// Reliable writers build their own (directed) heartbeats; otherwise synthesize one.
 4120  RtpsWriterMap::const_iterator wpos = writers_.find(pos->first);
 4121  if (wpos != writers_.end()) {
 4122  wpos->second->gather_heartbeats(pos->second, meta_submessages);
 4123  } else {
 4124  using namespace OpenDDS::RTPS;
 4125  const int count = ++heartbeat_counts_[pos->first.entityId];
// NOTE(review): smHeader and firstSN/lastSN initializers were lost in extraction.
 4126  const HeartBeatSubmessage hb = {
 4128  ENTITYID_UNKNOWN, // any matched reader may be interested in this
 4129  pos->first.entityId,
 4132  {count}
 4133  };
 4134 
 4135  for (RepoIdSet::const_iterator it = pos->second->guids_.begin(),
 4136  limit = pos->second->guids_.end(); it != limit; ++it) {
 4137 
 4138  MetaSubmessage meta_submessage(pos->first, *it);
 4139  meta_submessage.sm_.heartbeat_sm(hb);
 4140  meta_submessages.push_back(meta_submessage);
 4141  }
 4142  }
 4143  }
 4144  }
 4145 
 4146  queue_submessages(meta_submessages);
 4147 
// Invoke listener callbacks outside the (released) lock scope above.
 4148  for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
 4149  iter != readerDoesNotExistCallbacks.end(); ++iter) {
 4150  const InterestingRemote& remote = iter->second;
 4151  remote.listener->reader_does_not_exist(iter->first, remote.localid);
 4152  }
 4153 }
4154 
 4155 void
// Fill a MetaSubmessage with a template HEARTBEAT (count 0, reader unknown);
// callers then patch readerId/count/firstSN/lastSN per destination.
// NOTE(review): signature line (doc 4156) lost; per the member index this is
// RtpsWriter::initialize_heartbeat(const SingleSendBuffer::Proxy& proxy, ...).
 4157  MetaSubmessage& meta_submessage)
 4158 {
 4159  using namespace OpenDDS::RTPS;
 4160 
 4161  // Assume no samples are available.
// Durable writers always advertise from 1 so late joiners request history.
 4162  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
 4163  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
 4164 
 4165  const HeartBeatSubmessage hb = {
 4166  {HEARTBEAT,
 4167  FLAG_E,
 4168  HEARTBEAT_SZ},
 4169  ENTITYID_UNKNOWN, // any matched reader may be interested in this
 4170  id_.entityId,
 4171  to_rtps_seqnum(firstSN),
 4172  to_rtps_seqnum(max_sn_),
 4173  {0}
 4174  };
 4175 
 4176  meta_submessage.sm_.heartbeat_sm(hb);
 4177 }
4178 
 4179 void
// Emit one directed HEARTBEAT for a single reader, patching the template built
// by initialize_heartbeat(). NOTE(review): signature line (doc 4180) lost in
// extraction — presumably RtpsWriter::gather_directed_heartbeat_i(proxy, ...).
 4181  MetaSubmessageVec& meta_submessages,
 4182  MetaSubmessage& meta_submessage,
 4183  const ReaderInfo_rch& reader)
 4184 {
// Durable readers are told history starts at 1; others from the writer's window.
 4185  const SequenceNumber first_sn = reader->durable_ ? 1 : std::max(non_durable_first_sn(proxy), reader->start_sn_);
 4186  SequenceNumber last_sn = expected_max_sn(reader);
 4187 #ifdef OPENDDS_SECURITY
 4188  if (is_pvs_writer_ && last_sn < first_sn.previous()) {
 4189  // This can happen if the reader gets reset.
 4190  // Adjust the heartbeat to be valid.
 4191  // make_leader_lagger will eventually correct the problem.
 4192  last_sn = first_sn.previous();
 4193  }
 4194 #endif
 4195  meta_submessage.dst_guid_ = reader->id_;
 4196  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
 4197  meta_submessage.sm_.heartbeat_sm().readerId = reader->id_.entityId;
 4198  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(first_sn);
 4199  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(last_sn);
// Sanity: firstSN >= 1 and lastSN >= firstSN - 1 (empty heartbeat is allowed).
 4200  OPENDDS_ASSERT(!(first_sn < 1 || last_sn < 0 || last_sn < first_sn.previous()));
 4201  meta_submessages.push_back(meta_submessage);
// Clear the destination so the shared template can be reused for the next reader.
 4202  meta_submessage.reset_destination();
 4203 }
 4204 void
// Add or remove one reader GUID in the writer's shared remote-reader set using
// copy-on-write. NOTE(review): signature (doc 4205) and a trailing statement
// (doc 4228) were lost in extraction — confirm name/params against repo source.
 4206 {
 4207  RtpsUdpDataLink_rch link = link_.lock();
 4208  if (!link) {
 4209  return;
 4210  }
 4211 
 4212  {
 4213  ACE_Guard<ACE_Thread_Mutex> rrg_guard(remote_reader_guids_mutex_);
 4214 
 4215  // We make a new RcHandle to prevent changing what's being pointed to by existing references in the send queue (i.e. to preserve historic values)
 4216  RcHandle<ConstSharedRepoIdSet> temp = make_rch<ConstSharedRepoIdSet>();
 4217  if (remote_reader_guids_) {
 4218  const_cast<RepoIdSet&>(temp->guids_) = remote_reader_guids_->guids_;
 4219  }
 4220  if (add) {
 4221  const_cast<RepoIdSet&>(temp->guids_).insert(guid);
 4222  } else {
 4223  const_cast<RepoIdSet&>(temp->guids_).erase(guid);
 4224  }
// Publish the new immutable snapshot; old snapshots stay valid for in-flight sends.
 4225  remote_reader_guids_ = temp;
 4226  }
 4227 
 4229 }
4230 
 4231 void
// Build HEARTBEATs for this writer: one directed heartbeat per preassociation
// reader, then either a single undirected heartbeat (when every reader lags and
// there is more than one) or directed heartbeats per lagging reader.
// Caller must hold the writer lock (lock-order-sensitive; left byte-identical).
 4232 RtpsUdpDataLink::RtpsWriter::gather_heartbeats_i(MetaSubmessageVec& meta_submessages)
 4233 {
 4234  if (preassociation_readers_.empty() && lagging_readers_.empty()) {
 4235  return;
 4236  }
 4237 
 4238  check_leader_lagger();
 4239 
 4240  RtpsUdpDataLink_rch link = link_.lock();
 4241  if (!link) {
 4242  return;
 4243  }
 4244 
 4245  using namespace OpenDDS::RTPS;
 4246 
 4247  const SingleSendBuffer::Proxy proxy(*send_buff_);
 4248 
 4249  // Assume no samples are available.
 4250  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
 4251  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
 4252  const SequenceNumber lastSN = max_sn_;
 4253 
// Shared template heartbeat; per-destination fields are patched below.
 4254  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
 4255  initialize_heartbeat(proxy, meta_submessage);
 4256 
 4257  // Directed, non-final.
 4258  if (!preassociation_readers_.empty()) {
 4259  meta_submessages.reserve(meta_submessages.size() + preassociation_readers_.size());
 4260  for (ReaderInfoSet::const_iterator pos = preassociation_readers_.begin(), limit = preassociation_readers_.end();
 4261  pos != limit; ++pos) {
 4262  const ReaderInfo_rch& reader = *pos;
 4263  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
 4264  }
 4265  }
 4266 
 4267  if (!lagging_readers_.empty()) {
 4268  if (leading_readers_.empty() && remote_readers_.size() > 1
 4269 #ifdef OPENDDS_SECURITY
// Security builtin writers always use directed heartbeats.
 4270  && !is_pvs_writer_
 4271  && !is_ps_writer_
 4272 #endif
 4273  ) {
 4274  // Every reader is lagging and there is more than one.
 4275  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
 4276  meta_submessage.sm_.heartbeat_sm().readerId = ENTITYID_UNKNOWN;
 4277  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(firstSN);
 4278  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(lastSN);
 4279 
 4280  meta_submessages.push_back(meta_submessage);
 4281  meta_submessage.reset_destination();
 4282  } else {
// Otherwise heartbeat each lagging reader individually, bucketed by acked SN.
 4283  for (SNRIS::const_iterator snris_pos = lagging_readers_.begin(), snris_limit = lagging_readers_.end();
 4284  snris_pos != snris_limit; ++snris_pos) {
 4285  meta_submessages.reserve(meta_submessages.size() + snris_pos->second->readers.size());
 4286  for (ReaderInfoSet::const_iterator pos = snris_pos->second->readers.begin(),
 4287  limit = snris_pos->second->readers.end();
 4288  pos != limit; ++pos) {
 4289  const ReaderInfo_rch& reader = *pos;
 4290  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
 4291  }
 4292  }
 4293  }
 4294  }
 4295 }
4296 
 4297 void
// Emit semi-directed heartbeats (INFO_DST set, readerId left ENTITYID_UNKNOWN)
// for an extra set of reader GUIDs supplied by the link-level heartbeat pass.
// NOTE(review): signature line (doc 4298) lost — presumably
// RtpsWriter::gather_heartbeats(const RcHandle<ConstSharedRepoIdSet>& additional_guids, ...).
 4299  MetaSubmessageVec& meta_submessages)
 4300 {
 4301  OPENDDS_ASSERT(!additional_guids->guids_.empty());
 4302 
 4304 
 4305  RtpsUdpDataLink_rch link = link_.lock();
 4306  if (!link) {
 4307  return;
 4308  }
 4309 
 4310  const SingleSendBuffer::Proxy proxy(*send_buff_);
 4311 
 4312  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
 4313  initialize_heartbeat(proxy, meta_submessage);
 4314 
 4315  for (RepoIdSet::const_iterator it = additional_guids->guids_.begin(),
 4316  limit = additional_guids->guids_.end(); it != limit; ++it) {
 4317 
 4318  // Semi-directed (INFO_DST but ENTITYID_UNKNOWN, non-final.
 4319  meta_submessage.dst_guid_ = *it;
 4320  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
 4321  meta_submessages.push_back(meta_submessage);
 4322  meta_submessage.reset_destination();
 4323  }
 4324 }
4325 
 4326 void
// Time out "interesting" remote writers that have been silent for ten heartbeat
// periods and notify their discovery listeners. NOTE(review): signature (doc 4327,
// presumably RtpsUdpDataLink::check_heartbeats(const MonotonicTimePoint& now))
// and a guard line (doc 4336) lost in extraction.
 4328 {
 4329  OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
 4330 
 4331  RtpsUdpInst_rch cfg = config();
 4332 
 4333  // Have any interesting writers timed out?
 4334  const MonotonicTimePoint tv(now - 10 * (cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC)));
 4335  {
 4337 
 4338  for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
 4339  pos != limit;
 4340  ++pos) {
 4341  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
 4342  CallbackType callback(pos->first, pos->second);
 4343  writerDoesNotExistCallbacks.push_back(callback);
 4344  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
 4345  }
 4346  }
 4347  }
 4348 
// Callbacks run after leaving the scoped block so listeners aren't invoked under the lock.
 4349  OPENDDS_VECTOR(CallbackType)::iterator iter;
 4350  for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
 4351  const GUID_t& rid = iter->first;
 4352  const InterestingRemote& remote = iter->second;
 4353  remote.listener->writer_does_not_exist(rid, remote.localid);
 4354  }
 4355 }
4356 
 4357 void
// Build a manual-liveliness HEARTBEAT for a best-effort writer (no RtpsWriter
// object exists for it). NOTE(review): the flags line (doc 4365), firstSN (doc
// 4370) and count (doc 4372) initializers were dropped by the extraction.
 4358 RtpsUdpDataLink::send_heartbeats_manual_i(const TransportSendControlElement* tsce, MetaSubmessageVec& meta_submessages)
 4359 {
 4360  using namespace OpenDDS::RTPS;
 4361 
 4362  const GUID_t pub_id = tsce->publication_id();
 4363  const HeartBeatSubmessage hb = {
 4364  {HEARTBEAT,
 4366  HEARTBEAT_SZ},
 4367  ENTITYID_UNKNOWN, // any matched reader may be interested in this
 4368  pub_id.entityId,
 4369  // This liveliness heartbeat is from a best-effort Writer, the sequence numbers are not used
 4371  to_rtps_seqnum(tsce->sequence()),
 4373  };
 4374 
 4375  MetaSubmessage meta_submessage(pub_id, GUID_UNKNOWN);
 4376  meta_submessage.sm_.heartbeat_sm(hb);
 4377  meta_submessages.push_back(meta_submessage);
 4378 }
4379 
 4380 void
// Manual-liveliness HEARTBEAT for a reliable writer: advertises the writer's
// real [firstSN, lastSN] window with a fresh count.
// NOTE(review): the flags initializer line (doc 4397) was lost in extraction.
 4381 RtpsUdpDataLink::RtpsWriter::send_heartbeats_manual_i(MetaSubmessageVec& meta_submessages)
 4382 {
 4383  using namespace OpenDDS::RTPS;
 4384 
 4385  RtpsUdpDataLink_rch link = link_.lock();
 4386  if (!link) {
 4387  return;
 4388  }
 4389 
 4390  const SingleSendBuffer::Proxy proxy(*send_buff_);
 4391  const SequenceNumber firstSN = durable_ ? 1 : non_durable_first_sn(proxy);
 4392  const SequenceNumber lastSN = max_sn_;
 4393  const int counter = ++heartbeat_count_;
 4394 
 4395  const HeartBeatSubmessage hb = {
 4396  {HEARTBEAT,
 4398  HEARTBEAT_SZ},
 4399  ENTITYID_UNKNOWN, // any matched reader may be interested in this
 4400  id_.entityId,
 4401  to_rtps_seqnum(firstSN),
 4402  to_rtps_seqnum(lastSN),
 4403  {counter}
 4404  };
 4405 
 4406  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
 4407  meta_submessage.sm_.heartbeat_sm(hb);
 4408 
 4409  meta_submessages.push_back(meta_submessage);
 4410 }
4411 
// NOTE(review): signature (doc 4412) lost — presumably ReaderInfo's destructor.
// Drops any durable data never sent to this reader so the queue elements are freed.
 4413 {
 4414  expunge_durable_data();
 4415 }
4416 
 4417 void
// Exchange this reader's pending durable-data map with the caller's map (O(1)).
// NOTE(review): signature line (doc 4418) lost; per the member index this is
// swap_durable_data(OPENDDS_MAP(SequenceNumber, TransportQueueElement*)& dd).
 4419 {
 4420  durable_data_.swap(dd);
 4421 }
4422 
 4423 void
// Report every queued durable sample as dropped (frees the queue elements).
// NOTE(review): signature line (doc 4424) lost — expunge_durable_data(), per dtor use.
 4425 {
 4426  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
 4427  for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
 4428  it->second->data_dropped();
 4429  }
 4430 }
4431 
 4432 bool
// True while a durable reader still has historic data pending: either the
// DataWriter has not resent yet, or resent data has not all gone to the reader.
// NOTE(review): signature line (doc 4433) lost — expecting_durable_data() const.
 4434 {
 4435  return durable_ &&
 4436  (durable_timestamp_.is_zero() // DW hasn't resent yet
 4437  || !durable_data_.empty()); // DW resent, not sent to reader
 4438 }
4439 
 4440 bool
// True when the remote participant echoes heartbeat counts in acknacks
// (OpenDDS extension flag PFLAGS_REFLECT_HEARTBEAT_COUNT).
 4442 {
 4443  return participant_flags_ & RTPS::PFLAGS_REFLECT_HEARTBEAT_COUNT;
 4444 }
4445 
// RtpsWriter constructor. NOTE(review): the leading signature line (doc 4446)
// and one member-initializer line (doc 4461, presumably a SporadicEvent like
// nack_response_) were lost in extraction — confirm against repository source.
// SEQUENCENUMBER_UNKNOWN is normalized to ZERO so max_sn_ ordering works.
 4447  const GUID_t& id,
 4448  bool durable, SequenceNumber max_sn, int heartbeat_count, size_t capacity)
 4449  : send_buff_(make_rch<SingleSendBuffer>(capacity, ONE_SAMPLE_PER_PACKET))
 4450  , max_sn_(max_sn == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? SequenceNumber::ZERO() : max_sn)
 4451  , client_(client)
 4452  , link_(link)
 4453  , id_(id)
 4454  , durable_(durable)
 4455  , stopping_(false)
 4456  , heartbeat_count_(heartbeat_count)
 4457 #ifdef OPENDDS_SECURITY
 4458  , is_pvs_writer_(id_.entityId == RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER)
 4459  , is_ps_writer_(id_.entityId == RTPS::ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER)
 4460 #endif
 4462  , nack_response_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsWriter> >(rchandle_from(this), &RtpsWriter::send_nack_responses)))
 4463  , initial_fallback_(link->config()->heartbeat_period_)
 4464  , fallback_(initial_fallback_)
 4465 {
// The send buffer reports released samples through the link's send strategy.
 4466  send_buff_->bind(link->send_strategy().in());
 4467 }
4468 
// RtpsWriter destructor (signature line, doc 4469, lost in extraction).
// Warns if samples were never fully acknowledged by all readers.
 4470 {
 4471  if (!elems_not_acked_.empty()) {
 4472  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: RtpsWriter::~RtpsWriter - ")
 4473  ACE_TEXT("deleting with %d elements left not fully acknowledged\n"),
 4474  elems_not_acked_.size()));
 4475  }
 4476 }
4477 
// NOTE(review): signature (doc 4478) and a guard line (doc 4480) were lost —
// presumably a locked accessor that bumps and returns the heartbeat count.
 4479 {
 4481  return ++heartbeat_count_;
 4482 }
4483 
// Highest sequence number known for a reader: max over its pending durable
// data, the send buffer's pre-queue, and the send buffer proper (0 if all empty).
// NOTE(review): leading signature lines (docs 4484-4485) lost; per the member
// index this is max_data_seq(const SingleSendBuffer::Proxy& proxy, ...).
 4486  const ReaderInfo_rch& ri) const
 4487 {
 4488  const SequenceNumber durable_max = ri->durable_data_.empty() ? 0 : ri->durable_data_.rbegin()->first;
 4489  const SequenceNumber pre_max = proxy.pre_empty() ? 0 : proxy.pre_high();
 4490  const SequenceNumber data_max = proxy.empty() ? 0 : proxy.high();
 4491  return std::max(durable_max, std::max(pre_max, data_max));
 4492 }
4493 
// Raise max_sn_ to at least `seq` and reclassify readers that are now behind.
// NOTE(review): signature (docs 4494-4495), a guard (doc 4497) and one statement
// (doc 4501) were lost in extraction — confirm against repository source.
 4496 {
 4498  SequenceNumber previous_max_sn = max_sn_;
 4499  max_sn_ = std::max(max_sn_, seq);
 4500  make_leader_lagger(reader, previous_max_sn);
 4502  return max_sn_;
 4503 }
4504 
 4505 void
// Track an element until all readers acknowledge its sequence number.
// NOTE(review): signature (doc 4506, add_elem_awaiting_ack(TransportQueueElement*)
// per the member index) and a guard line (doc 4508) lost in extraction.
 4507 {
 4509  elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
 4510 }
4511 
 4512 bool
// Look up a remote reader by GUID and report whether it is a "leading" reader
// (fully caught up). Unknown readers are not leading.
// NOTE(review): signature (doc 4513) and a guard line (doc 4515) lost in extraction.
 4514 {
 4516 
 4517  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader_id);
 4518  if (iter != remote_readers_.end()) {
 4519  return is_leading(iter->second);
 4520  }
 4521 
 4522  return false;
 4523 }
4524 
 4525 void
// Deliver samples held for ordering once the writer's cumulative ack covers them.
// NOTE(review): signature (doc 4526) and the guard declaration `g` (doc 4528)
// were lost in extraction; g.release() below unlocks before the delivery loop
// so data_received() runs without the reader lock held.
 4527 {
 4529 
 4530  if (stopping_) {
 4531  return;
 4532  }
 4533 
 4534  RtpsUdpDataLink_rch link = link_.lock();
 4535 
 4536  if (!link) {
 4537  return;
 4538  }
 4539 
 4540  OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
 4541 
// Move every held sample at or below the cumulative ack into the local vector.
 4542  const WriterInfoMap::iterator wi = remote_writers_.find(src);
 4543  if (wi != remote_writers_.end()) {
 4544  const SequenceNumber ca = wi->second->recvd_.cumulative_ack();
 4545  const WriterInfo::HeldMap::iterator end = wi->second->held_.upper_bound(ca);
 4546  for (WriterInfo::HeldMap::iterator it = wi->second->held_.begin(); it != end; /*increment in loop body*/) {
 4547  to_deliver.push_back(it->second);
 4548  wi->second->held_.erase(it++);
 4549  }
 4550  }
 4551 
 4552  const GUID_t dst = id_;
 4553 
 4554  g.release();
 4555 
 4556  for (OPENDDS_VECTOR(ReceivedDataSample)::iterator it = to_deliver.begin(); it != to_deliver.end(); ++it) {
 4557  if (Transport_debug_level > 5) {
 4558  LogGuid reader(dst);
 4559  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::DeliverHeldData::~DeliverHeldData -")
 4560  ACE_TEXT(" deliver sequence: %q to %C\n"),
 4561  it->header_.sequence_.getValue(),
 4562  reader.c_str()));
 4563  }
 4564  link->data_received(*it, dst);
 4565  }
 4566 }
4567 
// NOTE(review): signature (doc 4568) lost — per the debug text above this is
// DeliverHeldData's destructor; RAII trigger that flushes held samples on scope exit.
 4569 {
 4570  if (reader_) {
 4571  reader_->deliver_held_data(writer_id_);
 4572  }
 4573 }
4574 
4577 {
4579 }
4580 
4583 {
4585 }
4586 
4589 {
4591 }
4592 
 4593 AddrSet
// Locked wrapper over get_addresses_i(local, remote).
// NOTE(review): a guard line (doc 4596) was lost in extraction.
 4594 RtpsUdpDataLink::get_addresses(const GUID_t& local, const GUID_t& remote) const
 4595 {
 4597  return get_addresses_i(local, remote);
 4598 }
4599 
 4600 AddrSet
// Locked wrapper over get_addresses_i(local). NOTE(review): the signature line
// (doc 4601, get_addresses(const GUID_t& local) const) and a guard (doc 4603)
// were lost in extraction.
 4602 {
 4604  return get_addresses_i(local);
 4605 }
4606 
4607 AddrSet
4608 RtpsUdpDataLink::get_addresses_i(const GUID_t& local, const GUID_t& remote) const
4609 {
4610  AddrSet retval;
4611 
4612  accumulate_addresses(local, remote, retval, true);
4613 
4614  return retval;
4615 }
4616 
 4617 AddrSet
// Collect addresses for every peer of `local`. NOTE(review): the signature line
// (doc 4618, get_addresses_i(const GUID_t& local) const) and a guard line
// (doc 4627) were lost in extraction.
 4619 {
 4620  AddrSet retval;
 4621  bool use_peers = true;
 4622 
 4623  // For reliable writers, use remote_reader_guids()
 4624  const GuidConverter conv(local);
 4625  if (conv.isWriter()) {
 4626  RtpsWriter_rch writer;
 4628  RtpsWriterMap::const_iterator pos = writers_.find(local);
 4629  if (pos != writers_.end()) {
 4630  writer = pos->second;
 4631  }
 4632  guard.release();
// Accumulate per known remote reader; skip the generic peer list if we had a set.
 4633  if (writer) {
 4634  RcHandle<ConstSharedRepoIdSet> addr_guids = writer->get_remote_reader_guids();
 4635  if (addr_guids) {
 4636  for (RepoIdSet::const_iterator it = addr_guids->guids_.begin(),
 4637  limit = addr_guids->guids_.end(); it != limit; ++it) {
 4638  accumulate_addresses(local, *it, retval);
 4639 
 4640  }
 4641  use_peers = false;
 4642  }
 4643  }
 4644  }
 4645 
// Fallback: enumerate discovery-provided peer ids.
 4646  if (use_peers) {
 4647  const GUIDSeq_var peers = peer_ids(local);
 4648  if (peers.ptr()) {
 4649  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
 4650  accumulate_addresses(local, peers[i], retval);
 4651  }
 4652  }
 4653  }
 4654 
 4655  return retval;
 4656 }
4657 
// Insert the unicast address matching the last-received address family/bytes
// into `aset`; returns false when no last-received address (or no match) exists.
// NOTE(review): signature (doc 4658) lost — per the call sites below this is
// RemoteInfo::insert_recv_addr(AddrSet& aset) const.
 4659 {
 4660  if (!last_recv_addr_) {
 4661  return false;
 4662  }
// Scan only unicast addresses of the same address family as last_recv_addr_.
 4663  const ACE_INT16 last_addr_type = last_recv_addr_.get_type();
 4664  NetworkAddress limit;
 4665  limit.set_type(last_addr_type);
 4666  AddrSet::const_iterator it = unicast_addrs_.lower_bound(limit);
 4667  while (it != unicast_addrs_.end() && it->get_type() == last_addr_type) {
// Compare address bytes only (port may differ between entries).
 4668  if (it->addr_bytes_equal(last_recv_addr_)) {
 4669  aset.insert(*it);
 4670  return true;
 4671  }
 4672  ++it;
 4673  }
 4674  return false;
 4675 }
4676 
 4677 void
// Core address-selection routine: resolve the set of addresses used to reach
// `remote` from `local`, consulting the locator cache, known locators, the
// "interesting" discovery maps, ICE, and the RtpsRelay, in that order.
// NOTE(review): the leading signature line (doc 4678), two guard lines (docs
// 4741, 4747) and the ICE endpoint declaration (doc 4756) were lost in the
// doxygen extraction — confirm against the repository source.
 4679  AddrSet& addresses, bool prefer_unicast) const
 4680 {
 4681  OPENDDS_ASSERT(local != GUID_UNKNOWN);
 4682  OPENDDS_ASSERT(remote != GUID_UNKNOWN);
 4683 
// Fast path: reuse a cached resolution for this (remote, local, prefer_unicast) key.
 4684  const LocatorCacheKey key(remote, local, prefer_unicast);
 4685  LocatorCache::ScopedAccess entry(locator_cache_, key);
 4686  if (!entry.is_new_) {
 4687  addresses.insert(entry.value().addrs_.begin(), entry.value().addrs_.end());
 4688  return;
 4689  }
 4690 
 4691  RtpsUdpInst_rch cfg = config();
 4692  if (!cfg) {
 4693  return;
 4694  }
 4695 
// Relay-only mode: all traffic to other participants goes through the RtpsRelay.
 4696  if (cfg->rtps_relay_only() && std::memcmp(&local.guidPrefix, &remote.guidPrefix, sizeof(GuidPrefix_t)) != 0) {
 4697  if (NetworkAddress() != cfg->rtps_relay_address()) {
 4698  addresses.insert(cfg->rtps_relay_address());
 4699  entry.value().addrs_.insert(cfg->rtps_relay_address());
 4700  }
 4701  return;
 4702  }
 4703 
 4704  AddrSet normal_addrs;
 4705  MonotonicTimePoint normal_addrs_expires = MonotonicTimePoint::max_value;
 4706  NetworkAddress ice_addr;
 4707  bool valid_last_recv_addr = false;
 4708  static const NetworkAddress NO_ADDR;
 4709 
 4710  const RemoteInfoMap::const_iterator pos = locators_.find(remote);
 4711  if (pos != locators_.end()) {
// Preference order: last-received unicast (time-limited), then configured
// unicast, then multicast (family-matched under IPv6), then last-received again.
 4712  if (prefer_unicast && pos->second.insert_recv_addr(normal_addrs)) {
 4713  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
 4714  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
 4715  } else if (prefer_unicast && !pos->second.unicast_addrs_.empty()) {
 4716  normal_addrs = pos->second.unicast_addrs_;
 4717  } else if (!pos->second.multicast_addrs_.empty()) {
 4718 #ifdef ACE_HAS_IPV6
// With IPv6 enabled, keep only multicast addresses of the family we last heard from.
 4719  if (pos->second.last_recv_addr_ != NO_ADDR) {
 4720  const AddrSet& mc_addrs = pos->second.multicast_addrs_;
 4721  for (AddrSet::const_iterator it = mc_addrs.begin(); it != mc_addrs.end(); ++it) {
 4722  if (it->get_type() == pos->second.last_recv_addr_.get_type()) {
 4723  normal_addrs.insert(*it);
 4724  }
 4725  }
 4726  } else {
 4727  normal_addrs = pos->second.multicast_addrs_;
 4728  }
 4729 #else
 4730  normal_addrs = pos->second.multicast_addrs_;
 4731 #endif
 4732  } else if (pos->second.insert_recv_addr(normal_addrs)) {
 4733  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
 4734  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
 4735  } else {
 4736  normal_addrs = pos->second.unicast_addrs_;
 4737  }
 4738  } else {
// No locator record yet: fall back to the discovery "interesting" maps.
 4739  const GuidConverter conv(remote);
 4740  if (conv.isReader()) {
 4742  InterestingRemoteMapType::const_iterator ipos = interesting_readers_.find(remote);
 4743  if (ipos != interesting_readers_.end()) {
 4744  normal_addrs = ipos->second.addresses;
 4745  }
 4746  } else if (conv.isWriter()) {
 4748  InterestingRemoteMapType::const_iterator ipos = interesting_writers_.find(remote);
 4749  if (ipos != interesting_writers_.end()) {
 4750  normal_addrs = ipos->second.addresses;
 4751  }
 4752  }
 4753  }
 4754 
 4755 #ifdef OPENDDS_SECURITY
 4757  if (endpoint) {
 4758  ice_addr = ice_agent_->get_address(endpoint, local, remote);
 4759  }
 4760 #endif
 4761 
// No ICE address: use normal addresses, plus the relay while the last-received
// address is stale (the remote may only be reachable through the relay).
 4762  if (ice_addr == NO_ADDR) {
 4763  addresses.insert(normal_addrs.begin(), normal_addrs.end());
 4764  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
 4765  entry.value().expires_ = normal_addrs_expires;
 4766  const NetworkAddress relay_addr = cfg->rtps_relay_address();
 4767  if (!valid_last_recv_addr && relay_addr != NO_ADDR) {
 4768  addresses.insert(relay_addr);
 4769  entry.value().addrs_.insert(relay_addr);
 4770  }
 4771  return;
 4772  }
 4773 
// ICE produced a new address: prefer it exclusively until it appears in normal_addrs.
 4774  if (normal_addrs.count(ice_addr) == 0) {
 4775  addresses.insert(ice_addr);
 4776  entry.value().addrs_.insert(ice_addr);
 4777  return;
 4778  }
 4779 
 4780  addresses.insert(normal_addrs.begin(), normal_addrs.end());
 4781  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
 4782  entry.value().expires_ = normal_addrs_expires;
 4783 }
4784 
 4785 #ifdef OPENDDS_SECURITY
// Accessor for the link's ICE agent. NOTE(review): signature lines (docs
// 4786-4787) lost in extraction — presumably returns ice_agent_'s handle type.
 4788 {
 4789  return ice_agent_;
 4790 }
 4791 #endif
4792 
// Return the transport's ICE endpoint, or an empty handle if the transport is gone.
// NOTE(review): signature lines (docs 4793-4794) lost; per the member index this is
// DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint() const.
 4795 {
 4796  TransportImpl_rch ti = impl();
 4797  return ti ? ti->get_ice_endpoint() : WeakRcHandle<ICE::Endpoint>();
 4798 }
4799 
// Link-level query: is reader_id a leading (caught-up) reader of writer_id?
// Unknown writers yield false. NOTE(review): a guard line (doc 4806) protecting
// the writers_ lookup was lost in extraction.
 4800 bool RtpsUdpDataLink::is_leading(const GUID_t& writer_id,
 4801  const GUID_t& reader_id) const
 4802 {
 4803  RtpsWriterMap::mapped_type writer;
 4804 
 4805  {
 4807  RtpsWriterMap::const_iterator pos = writers_.find(writer_id);
 4808  if (pos == writers_.end()) {
 4809  return false;
 4810  }
 4811  writer = pos->second;
 4812  }
 4813 
// Delegate outside the scoped block so the writer's own lock is taken lock-free here.
 4814  return writer->is_leading(reader_id);
 4815 }
4816 
// Debug logging of the writer's preassociation/associated reader counts.
// NOTE(review): signature (doc 4817, log_remote_counts(const char* funcname) per
// the format string) and the guarding `if (transport_debug.log_remote_counts) {`
// line (doc 4819) were lost in extraction.
 4818 {
 4820  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
 4821  "RtpsUdpDataLink::RtpsWriter::%C: "
 4822  "%C pre: %b assoc: %b\n",
 4823  funcname, LogGuid(id_).c_str(),
 4824  preassociation_readers_.size(), remote_readers_.size()));
 4825  }
 4826 }
4827 
// Debug logging of the reader's preassociation/associated writer counts.
// NOTE(review): signature (doc 4828) and the guarding
// `if (transport_debug.log_remote_counts) {` line (doc 4830) lost in extraction.
 4829 {
 4831  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
 4832  "RtpsUdpDataLink::RtpsReader::%C: "
 4833  "%C pre: %b assoc: %b\n",
 4834  funcname, LogGuid(id_).c_str(),
 4835  preassociation_writers_.size(), remote_writers_.size()));
 4836  }
 4837 }
4838 
4839 } // namespace DCPS
4840 } // namespace OpenDDS
4841 
bool add_writer(const WriterInfo_rch &info)
SequenceNumberSet readerSNState
Definition: RtpsCore.idl:567
void remove_all_msgs(const GUID_t &pub_id)
DataSampleHeader header_
The demarshalled sample header.
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:885
void swap(MessageBlock &lhs, MessageBlock &rhs)
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
ACE_CDR::Long Long
void ready_to_send()
Indicate that the queue is ready to send after all pending transactions are complete.
void send_heartbeats(const MonotonicTimePoint &now)
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void send_heartbeats(const MonotonicTimePoint &now)
virtual SequenceNumber sequence() const
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
static const size_t MaxSecureSubmessageLeadingSize
EventDispatcher_rch event_dispatcher()
bool requires_inline_qos(const GUIDSeq_var &peers)
int make_reservation(const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
static const size_t MaxSecureFullMessageFollowingSize
virtual bool matches(const TransportQueueElement &candidate) const
const DataSampleHeader & get_header() const
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
ACE_Message_Block * submsgs_to_msgblock(const RTPS::SubmessageSeq &subm)
#define ENOTSUP
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
const octet FLAG_E
Definition: RtpsCore.idl:521
bool process_data_i(const RTPS::DataSubmessage &data, const GUID_t &src, MetaSubmessageVec &meta_submessages)
char message_id_
The enum MessageId.
AckNackSubmessage acknack_sm
Definition: RtpsCore.idl:839
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
void unregister_for_reader(const GUID_t &writerid, const GUID_t &readerid)
void register_for_writer(const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
SequenceNumber previous() const
virtual bool is_last_fragment() const
Is this QueueElement the last result of fragmentation?
LM_INFO
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
RtpsUdpDataLink(const RtpsUdpTransport_rch &transport, const GuidPrefix_t &local_prefix, const RtpsUdpInst_rch &config, const ReactorTask_rch &reactor_task, InternalTransportStatistics &transport_statistics, ACE_Thread_Mutex &transport_statistics_mutex)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
bool contains(SequenceNumber seq) const
const TransportSendElement * original_send_element() const
void bundle_and_send_submessages(MetaSubmessageVec &meta_submessages)
virtual bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const
RemoveResult remove_sample(const DataSampleElement *sample)
bool add_reader(const ReaderInfo_rch &reader)
SequenceNumber cumulative_ack() const
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool data_dropped(bool dropped_by_transport=false)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
RtpsUdpTransport_rch transport()
bool isReader() const
Returns true if the GUID represents a reader entity.
sequence< octet > key
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
static bool include_fragment(const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
void process_heartbeat_frag_i(const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
DataBlockAllocator db_allocator_
TransportSendStrategy::LockType LockType
bool is_more_local(const NetworkAddress &current, const NetworkAddress &incoming)
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
void gather_nack_replies_i(MetaSubmessageVec &meta_submessages)
const char * c_str() const
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
void opts(int opts)
RtpsUdpReceiveStrategy_rch receive_strategy()
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
void send_nack_responses(const MonotonicTimePoint &now)
void end_historic_samples_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108
const DataSampleElement * sample() const
Original sample from send listener.
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
const SampleStateMask ANY_SAMPLE_STATE
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:228
virtual GUID_t publication_id() const
Accessor for the publisher id.
DataBlockLock * get_lock()
static bool control_message_supported(char message_id)
void update_required_acknack_count(const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
void process_nackfrag(const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
#define OPENDDS_MULTIMAP(K, T)
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
bool contains_any(const SequenceRange &range) const
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
ACE_Thread_Mutex & transport_statistics_mutex_
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void queue_submessages(MetaSubmessageVec &meta_submessages)
int release(void)
int set_option(int level, int option, void *optval, int optlen) const
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
void disable_response_queue(bool send_immediately)
void swap_durable_data(OPENDDS_MAP(SequenceNumber, TransportQueueElement *)&dd)
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
Definition: RtpsUdpInst.h:34
char * rd_ptr(void) const
const size_t max_bundle_size_
RtpsReaderMultiMap readers_of_writer_
void process_acknack(const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
bool is_lagging(const ReaderInfo_rch &reader) const
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
void register_for_reader(const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
#define SOL_SOCKET
RcHandle< SingleSendBuffer > send_buff_
void generate_nack_frags_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
const octet FLAG_F
Definition: RtpsCore.idl:523
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
RtpsUdpInst_rch config() const
void process_heartbeat_i(const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
typedef OPENDDS_MAP_CMP(GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
bool bitmapNonEmpty(const SequenceNumberSet &snSet)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
SequenceNumber max_data_seq(const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
void send_heartbeats_manual_i(MetaSubmessageVec &meta_submessages)
const GuidPrefix_t & local_prefix() const
void set_type(ACE_INT16 type)
InternalTransportStatistics & transport_statistics_
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void disassociated(const GUID_t &local, const GUID_t &remote)
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:576
void remove_id(const GUID_t &val)
Definition: AddressCache.h:221
RtpsReader(const RtpsUdpDataLink_rch &link, const GUID_t &id)
#define SO_SNDBUF
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
void add_elem_awaiting_ack(TransportQueueElement *element)
const OpenDDSParticipantFlagsBits_t PFLAGS_DIRECTED_HEARTBEAT
Definition: RtpsCore.idl:324
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
RemoveResult remove_sample(const DataSampleElement *sample)
LM_DEBUG
virtual bool is_fragment() const
Is this QueueElement the result of fragmentation?
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual TransportQueueElement * customize_queue_element(TransportQueueElement *element)
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
bool is_leading(const ReaderInfo_rch &reader) const
bool pre_contains(SequenceNumber sequence) const
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
ACE_CDR::ULong ULong
NetworkAddress get_last_recv_address(const GUID_t &remote_id)
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
void client_stop(const GUID_t &localId)
#define VDBG(DBG_ARGS)
void send_preassociation_acknacks(const MonotonicTimePoint &now)
static const void * body(MD5_CTX *ctx, const void *data, unsigned long size)
Definition: Hash.cpp:115
InterestingRemoteMapType interesting_readers_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
Holds a data sample received by the transport.
void update_required_acknack_count(const GUID_t &id, CORBA::Long current)
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const octet FLAG_L
Definition: RtpsCore.idl:527
const InstanceStateMask ANY_INSTANCE_STATE
void durability_resend(TransportQueueElement *element, size_t &cumulative_send_count)
void ignore(const GUID_t &local, const GUID_t &remote)
Mark all queued submessage with the given source and destination as ignored.
TransactionalRtpsSendQueue sq_
void pre_stop_helper(TqeVector &to_drop, bool true_stop)
RcHandle< SporadicEvent > preassociation_task_
void process_gap_i(const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
const ViewStateMask ANY_VIEW_STATE
ACE_Message_Block * cont(void) const
void gather_gaps_i(const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
FragmentNumberSet fragmentNumberState
Definition: RtpsCore.idl:598
const char * c_str() const
Definition: LogAddr.h:32
size_t size_
virtual ACE_Message_Block * duplicate(void) const
const size_t ONE_SAMPLE_PER_PACKET
ACE_CDR::UShort UShort
SequenceNumber last_ack() const
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
Data structure representing an "interesting" remote entity for static discovery.
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
void swap(RcHandle &rhs)
Definition: RcHandle_T.h:102
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:674
bool contains(SequenceNumber value) const
static SequenceNumber ZERO()
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
size_t dedup(MetaSubmessageVec &vec)
LM_WARNING
RcHandle< JobQueue > job_queue_
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:111
void build_meta_submessage_map(MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
ACE_UINT32 ULong
void ignore_remote(const GUID_t &id)
Mark all queued submessage with the given destination (dst_guid_) as ignored.
The End User API.
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490
OpenDDS_Dcps_Export void align(size_t &value, size_t by)
Align "value" by "by" if it&#39;s not already.
Definition: Serializer.inl:23
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
Definition: MessageTypes.h:83
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
DiscoveryListener * listener
Callback to invoke.
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
Security::SecurityConfig_rch security_config_
RemoveResult remove_sample(const DataSampleElement *sample)
DCPS::EntityId_t readerId
Definition: RtpsCore.idl:673
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
SequenceNumber_t gapStart
Definition: RtpsCore.idl:577
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
ACE_TEXT("TCP_Factory")
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.
static const TimeDuration zero_value
Definition: TimeDuration.h:31
void unregister_for_writer(const GUID_t &readerid, const GUID_t &writerid)
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)
Security::HandleRegistry_rch handle_registry_
void check_heartbeats(const MonotonicTimePoint &now)
std::pair< SequenceNumber, SequenceNumber > SequenceRange
bool open(const ACE_SOCK_Dgram &unicast_socket)
ACE_SOCK_Dgram_Mcast multicast_socket_
MessageBlockAllocator mb_allocator_
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
virtual bool is_last_fragment() const
Is this QueueElement the last result of fragmentation?
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
RcHandle< ICE::Agent > ice_agent_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
Definition: MessageTypes.h:86
HeartBeatSubmessage heartbeat_sm
Definition: RtpsCore.idl:842
virtual void reader_does_not_exist(const GUID_t &readerid, const GUID_t &writerid)=0
static bool force_inline_qos_
static member used by testing code to force inline qos
std::pair< GUID_t, InterestingRemote > CallbackType
virtual void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
bool align_w(size_t alignment)
Definition: Serializer.inl:830
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
GUID_t localid
id of local entity that is interested in this remote.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< SporadicEvent > flush_send_queue_sporadic_
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
void add_gap_submsg_i(RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
static void extend_bitmap_range(RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
Sequence number abstraction. Only allows positive 64 bit values.
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
void send_heartbeats_manual_i(const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER
Definition: MessageTypes.h:84
virtual GUID_t subscription_id() const
Accessor for the subscription id, if sent the sample is sent to 1 sub.
static const ACE_Time_Value zero
bool to_bitmap(ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added, bool invert=false) const
WeakRcHandle< RtpsUdpDataLink > link_
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
static const Value MAX_VALUE
void network_change() const
Definition: DataLink.cpp:1212
Adapt the TransportReceiveStrategy for RTPS&#39;s "sample" (submessage) Header.
ACE_CDR::Octet Octet
bool log_messages
Log all RTPS messages sent or recieved.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
SequenceNumber update_max_sn(const GUID_t &reader, SequenceNumber seq)
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
InterestingRemoteMapType interesting_writers_
void gather_heartbeats(RcHandle< ConstSharedRepoIdSet > additional_guids, MetaSubmessageVec &meta_submessages)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
void make_lagger_leader(const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
void update_remote_guids_cache_i(bool add, const GUID_t &guid)
virtual void pre_stop_i()
Definition: DataLink.cpp:993
RtpsWriter(const TransportClient_rch &client, const RtpsUdpDataLink_rch &link, const GUID_t &id, bool durable, SequenceNumber max_sn, CORBA::Long heartbeat_count, size_t capacity)
typedef OPENDDS_SET(ReaderInfo_rch) ReaderInfoSet
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
const octet OPENDDS_FLAG_R
Definition: RtpsCore.idl:529
void harvest_send_queue(const MonotonicTimePoint &now)
static const size_t MaxSecureFullMessageLeadingSize
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420
void flush_send_queue(const MonotonicTimePoint &now)
RcHandle< SingleSendBuffer > get_writer_send_buffer(const GUID_t &pub_id)
NackFragSubmessage nack_frag_sm
Definition: RtpsCore.idl:863
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER
Definition: MessageTypes.h:87
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
#define SO_RCVBUF
void filterBestEffortReaders(const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
const long LENGTH_UNLIMITED
RtpsUdpSendStrategy_rch send_strategy()
virtual bool dispatch(EventBase_rch event)=0
static bool separate_message(EntityId_t entity)
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
bool should_nack_fragments(const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
bool isWriter() const
Returns true if the GUID represents a writer entity.
RcHandle< PeriodicEvent > heartbeatchecker_
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
RtpsUdpDataLink::BundleVec & bundles_
static const size_t initial_size
void request_ack_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
LM_ERROR
bool add_delayed_notification(TransportQueueElement *element)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
void on_data_available(RcHandle< InternalDataReader< NetworkInterfaceAddress > > reader)
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
void received(const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
int close(void)
SubmessageSeq submessages
Definition: RtpsCore.idl:903
static const size_t MaxSecureSubmessageFollowingSize
unique_ptr< DataBlockLockPool > db_lock_pool_
void ignore_local(const GUID_t &id)
Mark all queued submessage with the given source (src_guid_) as ignored.
RcHandle< PeriodicEvent > heartbeat_
bool log_progress
Log progress for RTPS entity discovery and association.
Base wrapper class around a data/control sample to be sent.
void record_directed(const GUID_t &reader, SequenceNumber seq)
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
TransportQueueElement * customize_queue_element_non_reliable_i(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)
void bundle_mapped_meta_submessages(const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)
TransportQueueElement * customize_queue_element_helper(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
virtual void writer_does_not_exist(const GUID_t &writerid, const GUID_t &readerid)=0
SequenceNumberSet gapList
Definition: RtpsCore.idl:578
const size_t SM_ALIGN
Alignment of RTPS Submessage.
Definition: MessageTypes.h:113
const OpenDDSParticipantFlagsBits_t PFLAGS_REFLECT_HEARTBEAT_COUNT
Definition: RtpsCore.idl:326
SubmessageHeader smHeader
Definition: RtpsCore.idl:574
void align(size_t &value, size_t by=(std::numeric_limits< size_t >::max)()) const
Align "value" to "by" and according to the stream&#39;s alignment.
Definition: Serializer.inl:118
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)
const Encoding & encoding_
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194