OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpDataLink.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPDATALINK_H
7 #define OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPDATALINK_H
8 
9 #include "Rtps_Udp_Export.h"
10 #include "BundlingCacheKey.h"
11 #include "LocatorCacheKey.h"
12 #include "RtpsCustomizedElement.h"
13 #include "RtpsUdpDataLink_rch.h"
16 #include "RtpsUdpTransport_rch.h"
18 
20 #include <dds/DCPS/ReactorTask.h>
27 #include <dds/DCPS/GuidConverter.h>
29 #include <dds/DCPS/PoolAllocator.h>
33 #include <dds/DCPS/JobQueue.h>
35 #include <dds/DCPS/AddressCache.h>
36 #include <dds/DCPS/Hash.h>
39 #include <dds/DCPS/SporadicEvent.h>
40 #include <dds/DCPS/PeriodicEvent.h>
41 
42 #ifdef OPENDDS_SECURITY
45 # include <dds/DCPS/RTPS/ICE/Ice.h>
46 #endif
47 
48 #ifdef OPENDDS_SECURITY
49 # include <dds/DdsSecurityCoreC.h>
50 #endif
51 
52 #include <ace/Basic_Types.h>
53 #include <ace/SOCK_Dgram.h>
54 #include <ace/SOCK_Dgram_Mcast.h>
55 
56 #ifdef ACE_HAS_CPP11
57 # include <functional>
58 #endif
59 
60 class DDS_TEST;
61 
63 
64 namespace OpenDDS {
65 
66 namespace ICE {
67  class Endpoint;
68 }
69 
70 namespace DCPS {
71 
72 class RtpsUdpInst;
76 
/// Value type of WriterToSeqReadersMap: a `seq` value together with a
/// `readers` collection of GUIDs.
/// NOTE(review): the declarations of `seq` and `readers` are not visible in
/// this listing — confirm their exact types against the real header.
77 struct SeqReaders {
/// Starts `seq` at 0 and records @a id as the first entry of `readers`.
80  SeqReaders(const GUID_t& id) : seq(0) { readers.insert(id); }
81 };
82 
85 
86 typedef OPENDDS_MAP_CMP(GUID_t, SeqReaders, GUID_tKeyLessThan) WriterToSeqReadersMap;
87 
88 const size_t initial_bundle_size = 32;
89 
91  : public virtual DataLink
92  , public virtual InternalDataReaderListener<NetworkInterfaceAddress>
93 {
94 public:
95  RtpsUdpDataLink(const RtpsUdpTransport_rch& transport,
96  const GuidPrefix_t& local_prefix,
97  const RtpsUdpInst_rch& config,
98  const ReactorTask_rch& reactor_task,
99  InternalTransportStatistics& transport_statistics,
100  ACE_Thread_Mutex& transport_statistics_mutex);
101 
102  ~RtpsUdpDataLink();
103 
104  bool add_delayed_notification(TransportQueueElement* element);
105 
106  RemoveResult remove_sample(const DataSampleElement* sample);
107  void remove_all_msgs(const GUID_t& pub_id);
108 
109  RtpsUdpInst_rch config() const;
110 
111  ACE_Reactor* get_reactor();
112  ReactorInterceptor_rch get_reactor_interceptor() const;
113  bool reactor_is_shut_down();
114 
115  ACE_SOCK_Dgram& unicast_socket();
116  ACE_SOCK_Dgram_Mcast& multicast_socket();
117 #ifdef ACE_HAS_IPV6
118  ACE_SOCK_Dgram& ipv6_unicast_socket();
119  ACE_SOCK_Dgram_Mcast& ipv6_multicast_socket();
120 #endif
121 
122  bool open(const ACE_SOCK_Dgram& unicast_socket
123 #ifdef ACE_HAS_IPV6
124  , const ACE_SOCK_Dgram& ipv6_unicast_socket
125 #endif
126  );
127 
128  void received(const RTPS::DataSubmessage& data,
129  const GuidPrefix_t& src_prefix,
130  const NetworkAddress& remote_addr);
131 
132  void received(const RTPS::GapSubmessage& gap,
133  const GuidPrefix_t& src_prefix,
134  bool directed,
135  const NetworkAddress& remote_addr);
136 
137  void received(const RTPS::HeartBeatSubmessage& heartbeat,
138  const GuidPrefix_t& src_prefix,
139  bool directed,
140  const NetworkAddress& remote_addr);
141 
142  void received(const RTPS::HeartBeatFragSubmessage& hb_frag,
143  const GuidPrefix_t& src_prefix,
144  bool directed,
145  const NetworkAddress& remote_addr);
146 
147  void received(const RTPS::AckNackSubmessage& acknack,
148  const GuidPrefix_t& src_prefix,
149  const NetworkAddress& remote_addr);
150 
151  void received(const RTPS::NackFragSubmessage& nackfrag,
152  const GuidPrefix_t& src_prefix,
153  const NetworkAddress& remote_addr);
154 
/// Read-only access to the stored `local_prefix_` (returned by const reference).
155  const GuidPrefix_t& local_prefix() const { return local_prefix_; }
156 
157  void remove_locator_and_bundling_cache(const GUID_t& remote_id);
158 
159  NetworkAddress get_last_recv_address(const GUID_t& remote_id);
160 
161  void update_locators(const GUID_t& remote_id,
162  AddrSet& unicast_addresses,
163  AddrSet& multicast_addresses,
164  bool requires_inline_qos,
165  bool add_ref);
166 
167  /// Given a 'local' id and a 'remote' id of a publication or
168  /// subscription, return the set of addresses of the remote peers.
169  AddrSet get_addresses(const GUID_t& local, const GUID_t& remote) const;
170  /// Given a 'local' id, return the set of addresses for all remote peers.
171  AddrSet get_addresses(const GUID_t& local) const;
172 
173  void filterBestEffortReaders(const ReceivedDataSample& ds, RepoIdSet& selected, RepoIdSet& withheld);
174 
175  int make_reservation(const GUID_t& remote_publication_id,
176  const GUID_t& local_subscription_id,
177  const TransportReceiveListener_wrch& receive_listener,
178  bool reliable);
179 
180  bool associated(const GUID_t& local, const GUID_t& remote,
181  bool local_reliable, bool remote_reliable,
182  bool local_durable, bool remote_durable,
183  const MonotonicTime_t& participant_discovered_at,
184  ACE_CDR::ULong participant_flags,
185  SequenceNumber max_sn,
186  const TransportClient_rch& client,
187  AddrSet& unicast_addresses,
188  AddrSet& multicast_addresses,
189  const NetworkAddress& last_addr_hint,
190  bool requires_inline_qos);
191 
192  void disassociated(const GUID_t& local, const GUID_t& remote);
193 
194  void register_for_reader(const GUID_t& writerid,
195  const GUID_t& readerid,
196  const AddrSet& addresses,
197  DiscoveryListener* listener);
198 
199  void unregister_for_reader(const GUID_t& writerid,
200  const GUID_t& readerid);
201 
202  void register_for_writer(const GUID_t& readerid,
203  const GUID_t& writerid,
204  const AddrSet& addresses,
205  DiscoveryListener* listener);
206 
207  void unregister_for_writer(const GUID_t& readerid,
208  const GUID_t& writerid);
209 
210  void client_stop(const GUID_t& localId);
211 
212  virtual void pre_stop_i();
213 
214 #ifdef OPENDDS_SECURITY
215  DCPS::RcHandle<ICE::Agent> get_ice_agent() const;
216 #endif
217  virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint() const;
218 
219  virtual bool is_leading(const GUID_t& writer_id,
220  const GUID_t& reader_id) const;
221 
222 #ifdef OPENDDS_SECURITY
224  {
225  ACE_Guard<ACE_Thread_Mutex> guard(security_mutex_);
226  return security_config_;
227  }
228 
230  {
231  ACE_Guard<ACE_Thread_Mutex> guard(security_mutex_);
232  return handle_registry_;
233  }
234 
235  DDS::Security::ParticipantCryptoHandle local_crypto_handle() const;
236  void local_crypto_handle(DDS::Security::ParticipantCryptoHandle pch);
237 
238  static bool separate_message(EntityId_t entity);
239 #endif
240 
241  RtpsUdpTransport_rch transport();
242 
243  void enable_response_queue();
244  void disable_response_queue(bool send_immediately);
245 
246  bool requires_inline_qos(const GUIDSeq_var& peers);
247 
/// Returns a copy of the `event_dispatcher_` handle held by this link.
248  EventDispatcher_rch event_dispatcher() { return event_dispatcher_; }
/// Returns a copy of the `job_queue_` handle held by this link.
249  RcHandle<JobQueue> get_job_queue() const { return job_queue_; }
250 
251 private:
252  void on_data_available(RcHandle<InternalDataReader<NetworkInterfaceAddress> > reader);
253 
254  // Internal non-locking versions of the above
255  AddrSet get_addresses_i(const GUID_t& local, const GUID_t& remote) const;
256  AddrSet get_addresses_i(const GUID_t& local) const;
257 
258  virtual void stop_i();
259 
260  virtual TransportQueueElement* customize_queue_element(
261  TransportQueueElement* element);
262 
263  virtual void release_reservations_i(const GUID_t& remote_id,
264  const GUID_t& local_id);
265 
266  friend class ::DDS_TEST;
267  /// static member used by testing code to force inline qos
268  static bool force_inline_qos_;
269 
273 
274  RtpsUdpSendStrategy_rch send_strategy();
275  RtpsUdpReceiveStrategy_rch receive_strategy();
276 
278 
/// Per-remote record stored in `locators_` (RemoteInfoMap, keyed by GUID_t):
/// unicast and multicast address sets, an inline-QoS flag and a reference
/// count.
/// NOTE(review): some member declarations (listing lines 284-287) are not
/// visible here — confirm the full member list against the real header.
279  struct RemoteInfo {
/// Default state: empty address sets, inline QoS not required, ref count 0.
280  RemoteInfo() : unicast_addrs_(), multicast_addrs_(), requires_inline_qos_(false), ref_count_(0) {}
/// Copies the given address sets and inline-QoS flag; the ref count starts at 0.
281  RemoteInfo(const AddrSet& unicast_addrs, const AddrSet& multicast_addrs, bool iqos)
282  : unicast_addrs_(unicast_addrs), multicast_addrs_(multicast_addrs), requires_inline_qos_(iqos), ref_count_(0) {}
283  AddrSet unicast_addrs_;
288  size_t ref_count_;
/// Defined out of line; by its name it adds a last-received address to
/// @a aset and reports whether it did — TODO confirm in the .cpp.
289  bool insert_recv_addr(AddrSet& aset) const;
290  };
291 
292 #ifdef ACE_HAS_CPP11
293  typedef OPENDDS_UNORDERED_MAP(GUID_t, RemoteInfo) RemoteInfoMap;
294 #else
295  typedef OPENDDS_MAP_CMP(GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap;
296 #endif
297  RemoteInfoMap locators_;
298 
299  void update_last_recv_addr(const GUID_t& src, const NetworkAddress& addr, const MonotonicTimePoint& now = MonotonicTimePoint::now());
300 
301  mutable LocatorCache locator_cache_;
302  mutable BundlingCache bundling_cache_;
303 
306 #ifdef ACE_HAS_IPV6
307  ACE_SOCK_Dgram ipv6_unicast_socket_;
308  ACE_SOCK_Dgram_Mcast ipv6_multicast_socket_;
309 #endif
310 
316 
317  ACE_Message_Block* alloc_msgblock(size_t size, ACE_Allocator* data_allocator);
318  ACE_Message_Block* submsgs_to_msgblock(const RTPS::SubmessageSeq& subm);
319 
320  RcHandle<SingleSendBuffer> get_writer_send_buffer(const GUID_t& pub_id);
321 
323 
324  MultiSendBuffer(RtpsUdpDataLink* outer, size_t capacity)
325  : TransportSendBuffer(capacity)
326  , outer_(outer)
327  {}
328 
329  void insert(SequenceNumber sequence,
331  ACE_Message_Block* chain);
332 
334 
335  } multi_buff_;
336 
337  // RTPS reliability support for local writers:
338 
340  typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap;
341  typedef OPENDDS_MAP(SequenceNumber, RequestedFragMap) RequestedFragSeqMap;
342 
/// Reference-counted state kept for one remote reader; instances are held
/// through ReaderInfo_rch in ReaderInfoMap / ReaderInfoSet.
/// NOTE(review): several member declarations are missing from this listing
/// (e.g. participant_discovered_at_, cur_cumulative_ack_, participant_flags_,
/// required_acknack_count_, start_sn_, max_pvs_sn_); they are only visible
/// through the constructor's initializer list below.
343  struct ReaderInfo : public virtual RcObject {
344  const GUID_t id_;
/// Counters that the constructor starts at 0.
346  CORBA::Long acknack_recvd_count_, nackfrag_recvd_count_;
/// Per-sequence-number map of requested fragments (see RequestedFragSeqMap).
348  RequestedFragSeqMap requested_frags_;
350  const bool durable_;
356 #ifdef OPENDDS_SECURITY
359 #endif
360 
/// Stores the identifying arguments and zeroes all counters.
/// @param id                         stored in the const member id_
/// @param durable                    stored in the const member durable_
/// @param participant_discovered_at  stored as participant_discovered_at_
/// @param participant_flags          stored as participant_flags_
/// @param start_sn                   stored as start_sn_
361  ReaderInfo(const GUID_t& id,
362  bool durable,
363  const MonotonicTime_t& participant_discovered_at,
364  ACE_CDR::ULong participant_flags,
365  const SequenceNumber& start_sn)
366  : id_(id)
367  , participant_discovered_at_(participant_discovered_at)
368  , acknack_recvd_count_(0)
369  , nackfrag_recvd_count_(0)
370  , cur_cumulative_ack_(SequenceNumber::ZERO()) // Starting at zero instead of unknown makes the logic cleaner.
371  , durable_(durable)
372  , participant_flags_(participant_flags)
373  , required_acknack_count_(0)
374  , start_sn_(start_sn)
375 #ifdef OPENDDS_SECURITY
// Only initialized in security builds.
376  , max_pvs_sn_(SequenceNumber::ZERO())
377 #endif
378  {}
379  ~ReaderInfo();
380  void swap_durable_data(OPENDDS_MAP(SequenceNumber, TransportQueueElement*)& dd);
381  void expunge_durable_data();
382  bool expecting_durable_data() const;
/// The sequence number just before cur_cumulative_ack_.
383  SequenceNumber acked_sn() const { return cur_cumulative_ack_.previous(); }
384  bool reflects_heartbeat_count() const;
385  };
386 
388 #ifdef ACE_HAS_CPP11
389  typedef OPENDDS_UNORDERED_MAP(GUID_t, ReaderInfo_rch) ReaderInfoMap;
390 #else
391  typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo_rch, GUID_tKeyLessThan) ReaderInfoMap;
392 #endif
393  typedef OPENDDS_SET(ReaderInfo_rch) ReaderInfoSet;
395  ReaderInfoSet readers;
396  };
398  typedef OPENDDS_MAP(SequenceNumber, ReaderInfoSetHolder_rch) SNRIS;
399 
/// Event (EventBase subclass) that asks the data link to replay durable data
/// for one local publication / remote subscription pair. The link is held
/// through a weak handle, so the event does nothing if the link is gone by
/// the time it runs.
/// NOTE(review): the private member declarations (listing lines 409-411) are
/// not visible here; their names come from the initializer list.
400  class ReplayDurableData : public EventBase {
401  public:
/// Stores the weak link handle and both GUIDs for later use in handle_event().
402  ReplayDurableData(WeakRcHandle<RtpsUdpDataLink> link, const GUID_t& local_pub_id, const GUID_t& remote_sub_id)
403  : link_(link)
404  , local_pub_id_(local_pub_id)
405  , remote_sub_id_(remote_sub_id)
406  {}
407 
408  private:
412 
/// Locks the weak handle and, if the link still exists, forwards to
/// replay_durable_data() with the stored GUIDs.
413  void handle_event() {
414  RtpsUdpDataLink_rch link = link_.lock();
415 
// A null handle means the link could not be locked; nothing to replay.
416  if (!link) {
417  return;
418  }
419 
420  link->replay_durable_data(local_pub_id_, remote_sub_id_);
421  }
422  };
423 
424  class RtpsWriter : public virtual RcObject {
425  private:
426  ReaderInfoMap remote_readers_;
428  /// Preassociation readers require a non-final heartbeat.
429  ReaderInfoSet preassociation_readers_;
430  typedef OPENDDS_MULTISET(OpenDDS::DCPS::SequenceNumber) SequenceNumberMultiset;
431  SequenceNumberMultiset preassociation_reader_start_sns_;
432  /// These readers have not acked everything they are supposed to have
433  /// acked.
435  /// These readers have acked everything they are supposed to have acked.
437  /// These readers have sent a nack and are expecting data.
438  ReaderInfoSet readers_expecting_data_;
439  /// These readers have sent a non-final ack and are expecting a heartbeat.
445  SnToTqeMap elems_not_acked_;
448  const GUID_t id_;
449  const bool durable_;
450  bool stopping_;
452 #ifdef OPENDDS_SECURITY
453  /// Participant Volatile Secure writer
454  const bool is_pvs_writer_;
455  /// Participant Secure (Reliable SPDP) writer
456  const bool is_ps_writer_;
457 #endif
461 
464 
467 
468  void send_heartbeats(const MonotonicTimePoint& now);
469  void send_nack_responses(const MonotonicTimePoint& now);
470  void add_gap_submsg_i(RTPS::SubmessageSeq& msg,
471  SequenceNumber gap_start);
472  void end_historic_samples_i(const DataSampleHeader& header,
474  MetaSubmessageVec& meta_submessages);
475  void request_ack_i(const DataSampleHeader& header,
477  MetaSubmessageVec& meta_submessages);
478  void send_heartbeats_manual_i(MetaSubmessageVec& meta_submessages);
479  void gather_gaps_i(const ReaderInfo_rch& reader,
480  const DisjointSequence& gaps,
481  MetaSubmessageVec& meta_submessages);
482  void acked_by_all_helper_i(TqeSet& to_deliver);
483  SequenceNumber expected_max_sn(const ReaderInfo_rch& reader) const;
484  static void snris_insert(RtpsUdpDataLink::SNRIS& snris, const ReaderInfo_rch& reader);
485  static void snris_erase(RtpsUdpDataLink::SNRIS& snris, const SequenceNumber sn, const ReaderInfo_rch& reader);
486  void make_leader_lagger(const GUID_t& reader, SequenceNumber previous_max_sn);
487  void make_lagger_leader(const ReaderInfo_rch& reader, const SequenceNumber previous_acked_sn);
488  bool is_lagging(const ReaderInfo_rch& reader) const;
489  bool is_leading(const ReaderInfo_rch& reader) const;
490  void check_leader_lagger() const;
491  void record_directed(const GUID_t& reader, SequenceNumber seq);
492  void update_remote_guids_cache_i(bool add, const GUID_t& guid);
493 
/// Whether this is the Participant Volatile Secure writer. Security builds
/// report the stored is_pvs_writer_ flag; builds without OPENDDS_SECURITY
/// always report false.
494 #ifdef OPENDDS_SECURITY
495  bool is_pvs_writer() const { return is_pvs_writer_; }
496 #else
497  bool is_pvs_writer() const { return false; }
498 #endif
499 
501  {
502  if (!proxy.empty()) {
503  return proxy.low();
504  }
505  if (!proxy.pre_empty()) {
506  return proxy.pre_low();
507  }
508  return max_sn_ + 1;
509  }
510 
/// Removes @a reader from preassociation_readers_ and, only if it was
/// actually present, removes one matching entry for reader->start_sn_ from
/// preassociation_reader_start_sns_.
511  void remove_preassociation_reader(const ReaderInfo_rch& reader)
512  {
// erase() yields a non-zero count only when the reader was in the set.
513  if (preassociation_readers_.erase(reader)) {
514  SequenceNumberMultiset::iterator pos = preassociation_reader_start_sns_.find(reader->start_sn_);
// A reader in the set is expected to have its start_sn_ recorded as well.
515  OPENDDS_ASSERT(pos != preassociation_reader_start_sns_.end());
// Erase through the iterator rather than by value; assuming
// OPENDDS_MULTISET has std::multiset semantics, this drops a single entry
// and leaves any equal start_sn_ values of other readers in place.
516  preassociation_reader_start_sns_.erase(pos);
517  }
518  }
519 
520  void initialize_heartbeat(const SingleSendBuffer::Proxy& proxy,
521  MetaSubmessage& meta_submessage);
522  void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy& proxy,
523  MetaSubmessageVec& meta_submessages,
524  MetaSubmessage& meta_submessage,
525  const ReaderInfo_rch& reader);
526 
527  void log_remote_counts(const char* funcname);
528 
529  public:
530  RtpsWriter(const TransportClient_rch& client, const RtpsUdpDataLink_rch& link,
531  const GUID_t& id, bool durable,
532  SequenceNumber max_sn, CORBA::Long heartbeat_count, size_t capacity);
533  virtual ~RtpsWriter();
534 
535  SequenceNumber max_data_seq(const SingleSendBuffer::Proxy& proxy,
536  const ReaderInfo_rch&) const;
537  SequenceNumber update_max_sn(const GUID_t& reader, SequenceNumber seq);
538  void add_elem_awaiting_ack(TransportQueueElement* element);
539 
540  RemoveResult remove_sample(const DataSampleElement* sample);
541  void remove_all_msgs();
542 
543  bool add_reader(const ReaderInfo_rch& reader);
544  bool has_reader(const GUID_t& id) const;
545  bool is_leading(const GUID_t& id) const;
546  bool remove_reader(const GUID_t& id);
547  size_t reader_count() const;
548  CORBA::Long inc_heartbeat_count();
549 
550  void pre_stop_helper(TqeVector& to_drop, bool true_stop);
551  TransportQueueElement* customize_queue_element_helper(TransportQueueElement* element,
552  bool requires_inline_qos,
553  MetaSubmessageVec& meta_submessages,
554  bool& deliver_after_send);
555 
556  void process_acknack(const RTPS::AckNackSubmessage& acknack,
557  const GUID_t& src,
558  MetaSubmessageVec& meta_submessages);
559  void process_nackfrag(const RTPS::NackFragSubmessage& nackfrag,
560  const GUID_t& src,
561  MetaSubmessageVec& meta_submessages);
562  void process_acked_by_all();
563  void gather_nack_replies_i(MetaSubmessageVec& meta_submessages);
564  void gather_heartbeats_i(MetaSubmessageVec& meta_submessages);
565  void gather_heartbeats(RcHandle<ConstSharedRepoIdSet> additional_guids,
566  MetaSubmessageVec& meta_submessages);
567  void update_required_acknack_count(const GUID_t& id, CORBA::Long current);
568 
/// Returns a copy of the `send_buff_` handle (RcHandle<SingleSendBuffer>).
569  RcHandle<SingleSendBuffer> get_send_buff() { return send_buff_; }
571  {
572  ACE_Guard<ACE_Thread_Mutex> guard(remote_reader_guids_mutex_);
573  return remote_reader_guids_;
574  }
575  };
577 
578 #ifdef ACE_HAS_CPP11
579  typedef OPENDDS_UNORDERED_MAP(GUID_t, RtpsWriter_rch) RtpsWriterMap;
580 #else
581  typedef OPENDDS_MAP_CMP(GUID_t, RtpsWriter_rch, GUID_tKeyLessThan) RtpsWriterMap;
582 #endif
583  RtpsWriterMap writers_;
584 
585 
586  // RTPS reliability support for local readers:
587 
/// Reference-counted state kept for one remote writer; instances are held
/// through WriterInfo_rch in WriterInfoMap / WriterInfoSet.
/// NOTE(review): several member declarations are missing from this listing
/// (e.g. participant_discovered_at_, hb_last_, participant_flags_); they are
/// only visible through the constructor's initializer list below.
588  struct WriterInfo : RcObject {
589  const GUID_t id_;
593  HeldMap held_;
/// Counters that the constructor starts at 0.
596  CORBA::Long heartbeat_recvd_count_, hb_frag_recvd_count_;
598 
/// Stores the identifying arguments, sets hb_last_ to SequenceNumber::ZERO()
/// and zeroes both received-count members.
/// @param id                         stored in the const member id_
/// @param participant_discovered_at  stored as participant_discovered_at_
/// @param participant_flags          stored as participant_flags_
599  WriterInfo(const GUID_t& id,
600  const MonotonicTime_t& participant_discovered_at,
601  ACE_CDR::ULong participant_flags)
602  : id_(id)
603  , participant_discovered_at_(participant_discovered_at)
604  , hb_last_(SequenceNumber::ZERO())
605  , heartbeat_recvd_count_(0)
606  , hb_frag_recvd_count_(0)
607  , participant_flags_(participant_flags)
608  { }
609 
// Both predicates are defined out of line.
610  bool should_nack() const;
611  bool sends_directed_hb() const;
612  };
614 #ifdef ACE_HAS_CPP11
615  typedef OPENDDS_UNORDERED_MAP(GUID_t, WriterInfo_rch) WriterInfoMap;
616 #else
617  typedef OPENDDS_MAP_CMP(GUID_t, WriterInfo_rch, GUID_tKeyLessThan) WriterInfoMap;
618 #endif
619  typedef OPENDDS_SET(WriterInfo_rch) WriterInfoSet;
620 
621  class RtpsReader : public virtual RcObject {
622  public:
623  RtpsReader(const RtpsUdpDataLink_rch& link, const GUID_t& id);
624  virtual ~RtpsReader();
625 
626  bool add_writer(const WriterInfo_rch& info);
627  bool has_writer(const GUID_t& id) const;
628  bool remove_writer(const GUID_t& id);
629  size_t writer_count() const;
630 
631  bool should_nack_fragments(const RcHandle<RtpsUdpDataLink>& link,
632  const WriterInfo_rch& info);
633 
634  void pre_stop_helper();
635 
636  void process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
637  const GUID_t& src,
638  bool directed,
639  MetaSubmessageVec& meta_submessages);
640  bool process_data_i(const RTPS::DataSubmessage& data, const GUID_t& src, MetaSubmessageVec& meta_submessages);
641  void process_gap_i(const RTPS::GapSubmessage& gap,
642  const GUID_t& src,
643  bool directed,
644  MetaSubmessageVec& meta_submessages);
645  void process_heartbeat_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
646  const GUID_t& src,
647  bool directed,
648  MetaSubmessageVec& meta_submessages);
649  void deliver_held_data(const GUID_t& src);
650 
/// Read-only access to this reader's `id_` (returned by const reference).
651  const GUID_t& id() const { return id_; }
652 
653  void log_remote_counts(const char* funcname);
654 
655  private:
656  void send_preassociation_acknacks(const MonotonicTimePoint& now);
657  void gather_preassociation_acknack_i(MetaSubmessageVec& meta_submessages,
658  const WriterInfo_rch& writer);
659 
660  void gather_ack_nacks_i(const WriterInfo_rch& writer,
661  const RtpsUdpDataLink_rch& link,
662  bool heartbeat_was_non_final,
663  MetaSubmessageVec& meta_submessages,
664  ACE_CDR::ULong& cumulative_bits_added);
665  void generate_nack_frags_i(MetaSubmessageVec& meta_submessages,
666  const WriterInfo_rch& wi,
667  EntityId_t reader_id,
668  EntityId_t writer_id,
669  ACE_CDR::ULong& cumulative_bits_added);
670 
673  const GUID_t id_;
674  WriterInfoMap remote_writers_;
675  WriterInfoSet preassociation_writers_;
676  bool stopping_;
680  };
682 
683  typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec;
684  typedef OPENDDS_MAP_CMP(GUID_t, MetaSubmessageIterVec, GUID_tKeyLessThan) DestMetaSubmessageMap;
685 #ifdef ACE_HAS_CPP11
686  typedef OPENDDS_UNORDERED_MAP(AddressCacheEntryProxy, DestMetaSubmessageMap) AddrDestMetaSubmessageMap;
687 #else
688  typedef OPENDDS_MAP(AddressCacheEntryProxy, DestMetaSubmessageMap) AddrDestMetaSubmessageMap;
689 #endif
690  typedef OPENDDS_VECTOR(MetaSubmessageIterVec) MetaSubmessageIterVecVec;
691  typedef OPENDDS_SET(CORBA::Long) CountSet;
692  typedef OPENDDS_MAP_CMP(EntityId_t, CountSet, EntityId_tKeyLessThan) IdCountSet;
/// Mapped type of CountMap (keyed by CORBA::Long).
/// NOTE(review): the member declarations (listing lines 695-697) are not
/// visible here; their names come from the initializer list.
693  struct CountMapPair {
/// Initial state: undirected_ = false, is_new_assigned_ = false, new_ = -1.
694  CountMapPair() : undirected_(false), is_new_assigned_(false), new_(-1) {}
698  };
699  typedef OPENDDS_MAP(CORBA::Long, CountMapPair) CountMap;
/// A CountMap together with two iterators into it.
/// NOTE(review): judging by their names, the iterators track the next
/// directed / undirected entry that has not been assigned yet — confirm
/// against the .cpp.
700  struct CountMapping {
701  CountMap map_;
702  CountMap::iterator next_directed_unassigned_;
703  CountMap::iterator next_undirected_unassigned_;
704  };
706 
/// Groups the heartbeat count mappings and nackfrag count sets; an instance
/// is passed by reference to bundle_mapped_meta_submessages().
707  struct CountKeeper {
708  IdCountMapping heartbeat_counts_;
709  IdCountSet nackfrag_counts_;
710  };
711 
712 public:
/// One outgoing bundle: the meta-submessages selected for a single
/// destination plus the running size of the bundle.
713  struct Bundle {
/// Binds the bundle to @a proxy, starts the size at 0 and pre-reserves room
/// for initial_bundle_size submessage iterators.
714  explicit Bundle(const AddressCacheEntryProxy& proxy) : proxy_(proxy), size_(0) { submessages_.reserve(initial_bundle_size); }
715  MetaSubmessageIterVec submessages_; // a vector of iterators pointing to meta_submessages
716  AddressCacheEntryProxy proxy_; // a bundle's destination address
717  size_t size_; // bundle message size
718  };
719 
720  typedef OPENDDS_VECTOR(Bundle) BundleVec;
721 
722 private:
723  void build_meta_submessage_map(MetaSubmessageVec& meta_submessages, AddrDestMetaSubmessageMap& addr_map);
724  void bundle_mapped_meta_submessages(
725  const Encoding& encoding,
726  AddrDestMetaSubmessageMap& addr_map,
727  BundleVec& bundles,
728  CountKeeper& counts);
729 
730  void queue_submessages(MetaSubmessageVec& meta_submessages);
731  void update_required_acknack_count(const GUID_t& local_id, const GUID_t& remote_id, CORBA::Long current);
732  void bundle_and_send_submessages(MetaSubmessageVec& meta_submessages);
733 
736  OPENDDS_VECTOR(MetaSubmessageVec) fsq_vec_;
738 
739  void harvest_send_queue(const MonotonicTimePoint& now);
741  void flush_send_queue(const MonotonicTimePoint& now);
742  void flush_send_queue_i();
744 
746 
747 #ifdef ACE_HAS_CPP11
748  typedef OPENDDS_UNORDERED_MAP(GUID_t, RtpsReader_rch) RtpsReaderMap;
749 #else
750  typedef OPENDDS_MAP_CMP(GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMap;
751 #endif
752  RtpsReaderMap readers_;
753 
754  typedef OPENDDS_MULTIMAP_CMP(GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMultiMap;
755  RtpsReaderMultiMap readers_of_writer_; // keys are remote data writer GUIDs
756 
757  WriterToSeqReadersMap writer_to_seq_best_effort_readers_;
758 
759  /// What was once a single lock for the whole datalink is now split between three (four including ch_lock_):
760  /// - readers_lock_ protects readers_, readers_of_writer_, pending_reliable_readers_, interesting_writers_, and
761  /// writer_to_seq_best_effort_readers_ along with anything else that fits the 'reader side activity' of the datalink
762  /// - writers_lock_ protects writers_, heartbeat_counts_, best_effort_heartbeat_count_, and interesting_readers_
763  /// along with anything else that fits the 'writers side activity' of the datalink
764  /// - locators_lock_ protects locators_ (and therefore calls to get_addresses_i())
765  /// for both remote writers and remote readers
766  /// - send_queues_lock protects thread_send_queues_
770 
771  /// Extend the FragmentNumberSet to cover the fragments that are
772  /// missing from our last known fragment to the extent
773  /// @param fnSet FragmentNumberSet for the message sequence number
774  /// in question
775  /// @param extent is the highest fragment sequence number for this
776  /// FragmentNumberSet
777  static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
778  CORBA::ULong extent,
779  ACE_CDR::ULong& cumulative_bits_added);
780 
781  void durability_resend(TransportQueueElement* element, size_t& cumulative_send_count);
782  void durability_resend(TransportQueueElement* element, const RTPS::FragmentNumberSet& fragmentSet, size_t& cumulative_send_count);
783 
784  static bool include_fragment(const TransportQueueElement& element,
785  const DisjointSequence& fragments,
786  SequenceNumber& lastFragment);
787 
/// Routes a received submessage to the local RtpsWriter it addresses.
///
/// The local writer GUID is built from local_prefix_ and submessage.writerId,
/// the remote source GUID from @a src_prefix and submessage.readerId. If no
/// such writer is in writers_ the function returns without calling anything.
///
/// @param submessage  received submessage; must expose writerId and readerId
/// @param src_prefix  GUID prefix of the sender
/// @param func        pointer to an RtpsWriter member function, invoked as
///                    (writer.*func)(submessage, src, meta_submessages)
788  template<typename T, typename FN>
789  void datawriter_dispatch(const T& submessage, const GuidPrefix_t& src_prefix,
790  const FN& func)
791  {
792  const GUID_t local = make_id(local_prefix_, submessage.writerId);
793  const GUID_t src = make_id(src_prefix, submessage.readerId);
794 
// Collect the writer handle inside the guarded scope; the member call
// itself happens after this inner scope has closed (presumably after
// writers_lock_ is released — confirm ACE_GUARD's scoping).
795  OPENDDS_VECTOR(RtpsWriter_rch) to_call;
796  {
797  ACE_GUARD(ACE_Thread_Mutex, g, writers_lock_);
798  const RtpsWriterMap::iterator rw = writers_.find(local);
799  if (rw == writers_.end()) {
// NOTE(review): listing line 800 is not shown here; it presumably opens the
// conditional that the brace after the log call closes — confirm against
// the real header.
801  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawriter_dispatch - %C -> %C unknown local writer\n", LogGuid(local).c_str(), LogGuid(src).c_str()));
802  }
803  return;
804  }
805  to_call.push_back(rw->second);
806  }
// Submessages produced by the handler are gathered here and queued once.
807  MetaSubmessageVec meta_submessages;
808  for (OPENDDS_VECTOR(RtpsWriter_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
809  RtpsWriter& writer = **it;
810  (writer.*func)(submessage, src, meta_submessages);
811  }
812  queue_submessages(meta_submessages);
813  }
814 
/// Routes a received submessage to the local RtpsReader(s) it addresses.
///
/// The local reader GUID is built from local_prefix_ and submessage.readerId,
/// the remote source GUID from @a src_prefix and submessage.writerId.
///  - If the local entity id is ENTITYID_UNKNOWN, every reader registered in
///    readers_of_writer_ for the source writer is selected.
///  - Otherwise the single matching entry of readers_ is selected.
/// If nothing is selected the function returns without calling anything.
///
/// @param submessage  received submessage; must expose readerId and writerId
/// @param src_prefix  GUID prefix of the sender
/// @param directed    forwarded unchanged to @a func
/// @param func        pointer to an RtpsReader member function, invoked as
///                    (reader.*func)(submessage, src, directed, meta_submessages)
815  template<typename T, typename FN>
816  void datareader_dispatch(const T& submessage,
817  const GuidPrefix_t& src_prefix,
818  bool directed,
819  const FN& func)
820  {
821  const GUID_t local = make_id(local_prefix_, submessage.readerId);
822  const GUID_t src = make_id(src_prefix, submessage.writerId);
823 
// Collect the reader handles inside the guarded scope; the member calls
// themselves happen after this inner scope has closed (presumably after
// readers_lock_ is released — confirm ACE_GUARD's scoping).
824  OPENDDS_VECTOR(RtpsReader_rch) to_call;
825  {
826  ACE_GUARD(ACE_Thread_Mutex, g, readers_lock_);
827  if (local.entityId == ENTITYID_UNKNOWN) {
// No specific reader addressed: fan out to all readers of this writer.
828  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
829  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
830  to_call.push_back(iters.first->second);
831  }
832  if (to_call.empty()) {
// NOTE(review): listing line 833 is not shown here; it presumably opens the
// conditional that the brace after the log call closes — confirm against
// the real header.
// The message below previously named "datawreader_dispatch", which does not
// exist; it now names this function so dropped-message logs can be traced.
834  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> X no local readers\n", LogGuid(src).c_str()));
835  }
836  return;
837  }
838  } else {
// A specific reader is addressed: look it up directly.
839  const RtpsReaderMap::iterator rr = readers_.find(local);
840  if (rr == readers_.end()) {
// NOTE(review): listing line 841 is not shown here either; same caveat as
// for line 833 above.
842  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> %C unknown local reader\n", LogGuid(src).c_str(), LogGuid(local).c_str()));
843  }
844  return;
845  }
846  to_call.push_back(rr->second);
847  }
848  }
// Submessages produced by the handlers are gathered here and queued once.
849  MetaSubmessageVec meta_submessages;
850  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
851  RtpsReader& reader = **it;
852  (reader.*func)(submessage, src, directed, meta_submessages);
853  }
854  queue_submessages(meta_submessages);
855  }
856 
857  void send_heartbeats(const MonotonicTimePoint& now);
858  void check_heartbeats(const MonotonicTimePoint& now);
859 
861 
864 
865  /// Data structure representing an "interesting" remote entity for static discovery.
867  /// id of local entity that is interested in this remote.
869  /// addresses of this entity
870  AddrSet addresses;
871  /// Callback to invoke.
873  /**
874  * Timestamp indicating the last HeartBeat or AckNack received from the
875  * remote entity.
876  */
878  /// Current status of the remote entity.
879  enum { DOES_NOT_EXIST, EXISTS } status;
880 
/// Builds an entry for an "interesting" remote entity.
/// @param w  id of the local entity that is interested; stored as localid
/// @param a  addresses of the remote entity; copied into addresses
/// @param l  callback to invoke; stored as listener (raw pointer, stored as
///           given)
/// The status starts as DOES_NOT_EXIST.
882  InterestingRemote(const GUID_t& w, const AddrSet& a, DiscoveryListener* l)
883  : localid(w)
884  , addresses(a)
885  , listener(l)
886  , status(DOES_NOT_EXIST)
887  { }
888  };
889  typedef OPENDDS_MULTIMAP_CMP(GUID_t, InterestingRemote, GUID_tKeyLessThan) InterestingRemoteMapType;
890  InterestingRemoteMapType interesting_readers_;
891  InterestingRemoteMapType interesting_writers_;
892 
893  typedef std::pair<GUID_t, InterestingRemote> CallbackType;
894 
895  TransportQueueElement* customize_queue_element_non_reliable_i(TransportQueueElement* element,
896  bool requires_inline_qos,
897  MetaSubmessageVec& meta_submessages,
898  bool& deliver_after_send,
900 
901  void send_heartbeats_manual_i(const TransportSendControlElement* tsce,
902  MetaSubmessageVec& meta_submessages);
903 
905  CountMapType heartbeat_counts_;
906 
907  const size_t max_bundle_size_;
910 
912  public:
914  : writer_id_(GUID_UNKNOWN)
915  {}
916 
/// Stores the reader handle and the writer GUID. The constructor does no
/// other work; the destructor is declared separately and defined out of line.
917  DeliverHeldData(RtpsReader_rch reader,
918  const GUID_t& writer_id)
919  : reader_(reader)
920  , writer_id_(writer_id)
921  {}
922 
923  ~DeliverHeldData();
924 
925  private:
926  RtpsReader_rch reader_;
928  };
929 
930 #ifdef OPENDDS_SECURITY
936 #endif
937 
938  void accumulate_addresses(const GUID_t& local, const GUID_t& remote, AddrSet& addresses, bool prefer_unicast = false) const;
939 
942 };
943 
944 } // namespace DCPS
945 } // namespace OpenDDS
946 
948 
949 #if defined ACE_HAS_CPP11
950 namespace std
951 {
952 
/// std::hash specialization for AddressCacheEntryProxy, so that the type can
/// be used as a key in unordered containers (e.g. AddrDestMetaSubmessageMap
/// when ACE_HAS_CPP11 is defined). It delegates to the proxy's own hash().
953  template<> struct OpenDDS_Rtps_Udp_Export hash<OpenDDS::DCPS::AddressCacheEntryProxy>
954  {
/// @return the result of val.hash(), converted to std::size_t
955  std::size_t operator()(const OpenDDS::DCPS::AddressCacheEntryProxy& val) const noexcept
956  {
957  return val.hash();
958  }
959  };
960 
961 } // namespace std
962 #endif
963 
964 #ifdef __ACE_INLINE__
965 # include "RtpsUdpDataLink.inl"
966 #endif /* __ACE_INLINE__ */
967 
968 #endif /* OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPDATALINK_H */
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:885
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define ACE_DEBUG(X)
ACE_CDR::Long Long
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
EventDispatcher_rch event_dispatcher()
Bundle(const AddressCacheEntryProxy &proxy)
FibonacciSequence< TimeDuration > fallback_
typedef OPENDDS_MAP(OPENDDS_STRING, ICE::AgentInfo) AgentInfoMap
SequenceNumber previous() const
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
RemoteInfo(const AddrSet &unicast_addrs, const AddrSet &multicast_addrs, bool iqos)
WeakRcHandle< TransportClient > client_
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< WriterInfo > WriterInfo_rch
RcHandle< ReaderInfo > ReaderInfo_rch
bool log_dropped_messages
Log received RTPS messages that were dropped.
DataBlockAllocator db_allocator_
GuidSet RepoIdSet
Definition: GuidUtils.h:113
ReaderInfo(const GUID_t &id, bool durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, const SequenceNumber &start_sn)
Security::HandleRegistry_rch handle_registry() const
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
AddrSet addresses
addresses of this entity
const bool is_ps_writer_
Participant Secure (Reliable SPDP) writer.
#define OPENDDS_MULTIMAP(K, T)
ACE_Thread_Mutex & transport_statistics_mutex_
SeqReaders(const GUID_t &id)
RtpsReaderMultiMap readers_of_writer_
RcHandle< SingleSendBuffer > send_buff_
#define OPENDDS_MULTISET(K)
AddressCache< BundlingCacheKey > BundlingCache
const GuidPrefix_t & local_prefix() const
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
InternalTransportStatistics & transport_statistics_
#define OPENDDS_MULTIMAP_CMP(K, T, C)
RcHandle< ReaderInfoSetHolder > ReaderInfoSetHolder_rch
LM_DEBUG
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
RcHandle< RtpsWriter > RtpsWriter_rch
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack and are expecting a heartbeat.
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
ACE_CDR::ULong ULong
ACE_HANDLE open(const char *filename, int mode, mode_t perms=ACE_DEFAULT_OPEN_PERMS, LPSECURITY_ATTRIBUTES sa=0)
static const void * body(MD5_CTX *ctx, const void *data, unsigned long size)
Definition: Hash.cpp:115
InterestingRemoteMapType interesting_readers_
SequenceNumberMultiset preassociation_reader_start_sns_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
InterestingRemote(const GUID_t &w, const AddrSet &a, DiscoveryListener *l)
Holds a data sample received by the transport.
RcHandle< TransportClient > TransportClient_rch
STL namespace.
#define OPENDDS_SET_CMP(K, C)
TransactionalRtpsSendQueue sq_
RcHandle< SporadicEvent > preassociation_task_
RcHandle< SingleSendBuffer > get_send_buff()
WriterInfo(const GUID_t &id, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags)
Security::SecurityConfig_rch security_config() const
#define OPENDDS_MAP_CMP(K, V, C)
size_t size_
void replay_durable_data(const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
Definition: DataLink.cpp:1239
Data structure representing an "interesting" remote entity for static discovery.
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
MultiSendBuffer(RtpsUdpDataLink *outer, size_t capacity)
RcHandle< RtpsReader > RtpsReader_rch
RcHandle< JobQueue > get_job_queue() const
const bool is_pvs_writer_
Participant Volatile Secure writer.
RcHandle< JobQueue > job_queue_
RcHandle< ConstSharedRepoIdSet > remote_reader_guids_
ACE_UINT32 ULong
ReplayDurableData(WeakRcHandle< RtpsUdpDataLink > link, const GUID_t &local_pub_id, const GUID_t &remote_sub_id)
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
DiscoveryListener * listener
Callback to invoke.
Security::SecurityConfig_rch security_config_
ACE_INT32 Long
Security::HandleRegistry_rch handle_registry_
ACE_SOCK_Dgram_Mcast multicast_socket_
MessageBlockAllocator mb_allocator_
RcHandle< ICE::Agent > ice_agent_
RcHandle< ConstSharedRepoIdSet > get_remote_reader_guids()
static bool force_inline_qos_
static member used by testing code to force inline qos
std::pair< GUID_t, InterestingRemote > CallbackType
GUID_t localid
id of local entity that is interested in this remote.
RcHandle< SporadicEvent > flush_send_queue_sporadic_
EventDispatcher_rch event_dispatcher_
DeliverHeldData(RtpsReader_rch reader, const GUID_t &writer_id)
Sequence number abstraction. Only allows positive 64 bit values.
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
WeakRcHandle< RtpsUdpDataLink > link_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
InterestingRemoteMapType interesting_writers_
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
#define OPENDDS_SET(K)
RcHandle< T > lock() const
Definition: RcObject.h:188
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
AddressCache< LocatorCacheKey > LocatorCache
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
RcHandle< PeriodicEvent > heartbeatchecker_
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
const size_t initial_bundle_size
unique_ptr< DataBlockLockPool > db_lock_pool_
RcHandle< PeriodicEvent > heartbeat_
Base wrapper class around a data/control sample to be sent.
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
void handle_event()
Called when the event is dispatched by an EventDispatcher.
typedef OPENDDS_VECTOR(ACE_INET_Addr) AddressListType
#define OpenDDS_Rtps_Udp_Export
void remove_preassociation_reader(const ReaderInfo_rch &reader)