6 #ifndef OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPDATALINK_H 7 #define OPENDDS_DCPS_TRANSPORT_RTPS_UDP_RTPSUDPDATALINK_H 42 #ifdef OPENDDS_SECURITY 48 #ifdef OPENDDS_SECURITY 49 # include <dds/DdsSecurityCoreC.h> 57 # include <functional> 97 const RtpsUdpInst_rch& config,
107 void remove_all_msgs(
const GUID_t& pub_id);
109 RtpsUdpInst_rch config()
const;
113 bool reactor_is_shut_down();
157 void remove_locator_and_bundling_cache(
const GUID_t& remote_id);
161 void update_locators(
const GUID_t& remote_id,
162 AddrSet& unicast_addresses,
163 AddrSet& multicast_addresses,
164 bool requires_inline_qos,
169 AddrSet get_addresses(
const GUID_t& local,
const GUID_t& remote)
const;
171 AddrSet get_addresses(
const GUID_t& local)
const;
175 int make_reservation(
const GUID_t& remote_publication_id,
176 const GUID_t& local_subscription_id,
180 bool associated(
const GUID_t& local,
const GUID_t& remote,
181 bool local_reliable,
bool remote_reliable,
182 bool local_durable,
bool remote_durable,
186 const TransportClient_rch& client,
187 AddrSet& unicast_addresses,
188 AddrSet& multicast_addresses,
190 bool requires_inline_qos);
192 void disassociated(
const GUID_t& local,
const GUID_t& remote);
194 void register_for_reader(
const GUID_t& writerid,
196 const AddrSet& addresses,
199 void unregister_for_reader(
const GUID_t& writerid,
202 void register_for_writer(
const GUID_t& readerid,
204 const AddrSet& addresses,
207 void unregister_for_writer(
const GUID_t& readerid,
210 void client_stop(
const GUID_t& localId);
212 virtual void pre_stop_i();
214 #ifdef OPENDDS_SECURITY 219 virtual bool is_leading(
const GUID_t& writer_id,
220 const GUID_t& reader_id)
const;
222 #ifdef OPENDDS_SECURITY 226 return security_config_;
232 return handle_registry_;
238 static bool separate_message(
EntityId_t entity);
243 void enable_response_queue();
244 void disable_response_queue(
bool send_immediately);
246 bool requires_inline_qos(
const GUIDSeq_var& peers);
255 AddrSet get_addresses_i(
const GUID_t& local,
const GUID_t& remote)
const;
256 AddrSet get_addresses_i(
const GUID_t& local)
const;
258 virtual void stop_i();
263 virtual void release_reservations_i(
const GUID_t& remote_id,
266 friend class ::DDS_TEST;
// Default constructor: value-initializes both address sets, sets
// requires_inline_qos_ to false, and starts ref_count_ at zero.
280 RemoteInfo() : unicast_addrs_(), multicast_addrs_(), requires_inline_qos_(false), ref_count_(0) {}
// Initializes unicast_addrs_ and multicast_addrs_ from the supplied sets,
// stores iqos in requires_inline_qos_, and starts ref_count_ at zero.
281 RemoteInfo(
const AddrSet& unicast_addrs,
const AddrSet& multicast_addrs,
bool iqos)
282 : unicast_addrs_(unicast_addrs), multicast_addrs_(multicast_addrs), requires_inline_qos_(iqos), ref_count_(0) {}
289 bool insert_recv_addr(AddrSet& aset)
const;
340 typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap;
356 #ifdef OPENDDS_SECURITY 367 , participant_discovered_at_(participant_discovered_at)
368 , acknack_recvd_count_(0)
369 , nackfrag_recvd_count_(0)
372 , participant_flags_(participant_flags)
373 , required_acknack_count_(0)
374 , start_sn_(start_sn)
375 #ifdef OPENDDS_SECURITY
381 void expunge_durable_data();
382 bool expecting_durable_data()
const;
384 bool reflects_heartbeat_count()
const;
389 typedef OPENDDS_UNORDERED_MAP(
GUID_t, ReaderInfo_rch) ReaderInfoMap;
404 , local_pub_id_(local_pub_id)
405 , remote_sub_id_(remote_sub_id)
452 #ifdef OPENDDS_SECURITY 474 MetaSubmessageVec& meta_submessages);
477 MetaSubmessageVec& meta_submessages);
478 void send_heartbeats_manual_i(MetaSubmessageVec& meta_submessages);
479 void gather_gaps_i(
const ReaderInfo_rch& reader,
481 MetaSubmessageVec& meta_submessages);
482 void acked_by_all_helper_i(TqeSet& to_deliver);
483 SequenceNumber expected_max_sn(
const ReaderInfo_rch& reader)
const;
484 static void snris_insert(RtpsUdpDataLink::SNRIS& snris,
const ReaderInfo_rch& reader);
485 static void snris_erase(RtpsUdpDataLink::SNRIS& snris,
const SequenceNumber sn,
const ReaderInfo_rch& reader);
487 void make_lagger_leader(
const ReaderInfo_rch& reader,
const SequenceNumber previous_acked_sn);
488 bool is_lagging(
const ReaderInfo_rch& reader)
const;
489 bool is_leading(
const ReaderInfo_rch& reader)
const;
490 void check_leader_lagger()
const;
492 void update_remote_guids_cache_i(
bool add,
const GUID_t& guid);
494 #ifdef OPENDDS_SECURITY 497 bool is_pvs_writer()
const {
return false; }
502 if (!proxy.
empty()) {
513 if (preassociation_readers_.erase(reader)) {
514 SequenceNumberMultiset::iterator pos = preassociation_reader_start_sns_.find(reader->start_sn_);
516 preassociation_reader_start_sns_.erase(pos);
523 MetaSubmessageVec& meta_submessages,
525 const ReaderInfo_rch& reader);
527 void log_remote_counts(
const char* funcname);
531 const GUID_t&
id,
bool durable,
536 const ReaderInfo_rch&)
const;
541 void remove_all_msgs();
543 bool add_reader(
const ReaderInfo_rch& reader);
544 bool has_reader(
const GUID_t&
id)
const;
545 bool is_leading(
const GUID_t&
id)
const;
546 bool remove_reader(
const GUID_t&
id);
547 size_t reader_count()
const;
550 void pre_stop_helper(TqeVector& to_drop,
bool true_stop);
552 bool requires_inline_qos,
553 MetaSubmessageVec& meta_submessages,
554 bool& deliver_after_send);
558 MetaSubmessageVec& meta_submessages);
561 MetaSubmessageVec& meta_submessages);
562 void process_acked_by_all();
563 void gather_nack_replies_i(MetaSubmessageVec& meta_submessages);
564 void gather_heartbeats_i(MetaSubmessageVec& meta_submessages);
566 MetaSubmessageVec& meta_submessages);
573 return remote_reader_guids_;
579 typedef OPENDDS_UNORDERED_MAP(
GUID_t, RtpsWriter_rch) RtpsWriterMap;
603 , participant_discovered_at_(participant_discovered_at)
605 , heartbeat_recvd_count_(0)
606 , hb_frag_recvd_count_(0)
607 , participant_flags_(participant_flags)
610 bool should_nack()
const;
611 bool sends_directed_hb()
const;
615 typedef OPENDDS_UNORDERED_MAP(
GUID_t, WriterInfo_rch) WriterInfoMap;
626 bool add_writer(
const WriterInfo_rch& info);
627 bool has_writer(
const GUID_t&
id)
const;
628 bool remove_writer(
const GUID_t&
id);
629 size_t writer_count()
const;
632 const WriterInfo_rch& info);
634 void pre_stop_helper();
639 MetaSubmessageVec& meta_submessages);
644 MetaSubmessageVec& meta_submessages);
648 MetaSubmessageVec& meta_submessages);
649 void deliver_held_data(
const GUID_t& src);
653 void log_remote_counts(
const char* funcname);
657 void gather_preassociation_acknack_i(MetaSubmessageVec& meta_submessages,
658 const WriterInfo_rch& writer);
660 void gather_ack_nacks_i(
const WriterInfo_rch& writer,
662 bool heartbeat_was_non_final,
663 MetaSubmessageVec& meta_submessages,
665 void generate_nack_frags_i(MetaSubmessageVec& meta_submessages,
666 const WriterInfo_rch& wi,
683 typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec;
690 typedef OPENDDS_VECTOR(MetaSubmessageIterVec) MetaSubmessageIterVecVec;
// Starts with undirected_ and is_new_assigned_ both false and new_ set to -1.
// NOTE(review): -1 presumably means "no new count assigned yet" -- confirm against the users of new_.
694 CountMapPair() : undirected_(false), is_new_assigned_(false), new_(-1) {}
723 void build_meta_submessage_map(MetaSubmessageVec& meta_submessages, AddrDestMetaSubmessageMap& addr_map);
724 void bundle_mapped_meta_submessages(
726 AddrDestMetaSubmessageMap& addr_map,
730 void queue_submessages(MetaSubmessageVec& meta_submessages);
732 void bundle_and_send_submessages(MetaSubmessageVec& meta_submessages);
742 void flush_send_queue_i();
748 typedef OPENDDS_UNORDERED_MAP(
GUID_t, RtpsReader_rch) RtpsReaderMap;
777 static void extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
782 void durability_resend(
TransportQueueElement* element,
const RTPS::FragmentNumberSet& fragmentSet,
size_t& cumulative_send_count);
788 template<
typename T,
typename FN>
792 const GUID_t local =
make_id(local_prefix_, submessage.writerId);
798 const RtpsWriterMap::iterator rw = writers_.find(local);
799 if (rw == writers_.end()) {
801 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawriter_dispatch - %C -> %C unknown local writer\n",
LogGuid(local).c_str(),
LogGuid(src).c_str()));
805 to_call.push_back(rw->second);
807 MetaSubmessageVec meta_submessages;
808 for (
OPENDDS_VECTOR(RtpsWriter_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
810 (writer.*func)(submessage, src, meta_submessages);
812 queue_submessages(meta_submessages);
815 template<
typename T,
typename FN>
821 const GUID_t local =
make_id(local_prefix_, submessage.readerId);
828 typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
829 for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
830 to_call.push_back(iters.first->second);
832 if (to_call.empty()) {
// Reported when to_call is empty, i.e. readers_of_writer_ produced no
// reader for src. The message names this function (datareader_dispatch)
// so dropped-message logs can be searched by function name.
834 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> X no local readers\n",
LogGuid(src).c_str()));
839 const RtpsReaderMap::iterator rr = readers_.find(local);
840 if (rr == readers_.end()) {
842 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> %C unknown local reader\n",
LogGuid(src).c_str(),
LogGuid(local).c_str()));
846 to_call.push_back(rr->second);
849 MetaSubmessageVec meta_submessages;
850 for (
OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
852 (reader.*func)(submessage, src, directed, meta_submessages);
854 queue_submessages(meta_submessages);
879 enum { DOES_NOT_EXIST, EXISTS } status;
886 , status(DOES_NOT_EXIST)
896 bool requires_inline_qos,
897 MetaSubmessageVec& meta_submessages,
898 bool& deliver_after_send,
902 MetaSubmessageVec& meta_submessages);
920 , writer_id_(writer_id)
930 #ifdef OPENDDS_SECURITY 938 void accumulate_addresses(
const GUID_t& local,
const GUID_t& remote, AddrSet& addresses,
bool prefer_unicast =
false)
const;
949 #if defined ACE_HAS_CPP11 964 #ifdef __ACE_INLINE__
sequence< Submessage > SubmessageSeq
ACE_Thread_Mutex remote_reader_guids_mutex_
const ACE_CDR::ULong participant_flags_
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
TimeDuration heartbeat_period_
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
EventDispatcher_rch event_dispatcher()
AddressCacheEntryProxy proxy_
CountMapType heartbeat_counts_
Bundle(const AddressCacheEntryProxy &proxy)
FibonacciSequence< TimeDuration > fallback_
typedef OPENDDS_MAP(OPENDDS_STRING, ICE::AgentInfo) AgentInfoMap
SequenceNumber previous() const
OpenDDS_Dcps_Export TransportDebug transport_debug
ACE_Thread_Mutex fsq_mutex_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
BundlingCache bundling_cache_
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
const GUID_t remote_sub_id_
RemoteInfo(const AddrSet &unicast_addrs, const AddrSet &multicast_addrs, bool iqos)
ReaderInfoMap remote_readers_
WeakRcHandle< TransportClient > client_
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< WriterInfo > WriterInfo_rch
RcHandle< ReaderInfo > ReaderInfo_rch
const SequenceNumber start_sn_
bool log_dropped_messages
Log received RTPS messages that were dropped.
ACE_Thread_Mutex elems_not_acked_mutex_
MonotonicTimePoint last_activity
DataBlockAllocator db_allocator_
ReaderInfo(const GUID_t &id, bool durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, const SequenceNumber &start_sn)
RepoIdSet pending_reliable_readers_
DisjointSequence requests_
Security::HandleRegistry_rch handle_registry() const
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
#define OPENDDS_ASSERT(C)
WeakRcHandle< RtpsUdpDataLink > link_
AddrSet addresses
addresses of this entity
ACE_Thread_Mutex security_mutex_
const GUID_t & id() const
const bool is_ps_writer_
Participant Secure (Reliable SPDP) writer.
#define OPENDDS_MULTIMAP(K, T)
ACE_SOCK_Dgram unicast_socket_
RequestedFragSeqMap requested_frags_
ACE_Thread_Mutex & transport_statistics_mutex_
CountMap::iterator next_directed_unassigned_
SequenceNumber low() const
ACE_Thread_Mutex readers_lock_
SeqReaders(const GUID_t &id)
CORBA::ULong FragmentNumberValue
RtpsReaderMultiMap readers_of_writer_
RcHandle< SporadicEvent > nack_response_
RcHandle< SingleSendBuffer > send_buff_
bool requires_inline_qos_
#define OPENDDS_MULTISET(K)
AddressCache< BundlingCacheKey > BundlingCache
const GuidPrefix_t & local_prefix() const
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
InternalTransportStatistics & transport_statistics_
CORBA::Long best_effort_heartbeat_count_
GuidPrefix_t local_prefix_
#define OPENDDS_MULTIMAP_CMP(K, T, C)
RcHandle< ReaderInfoSetHolder > ReaderInfoSetHolder_rch
const GUID_t local_pub_id_
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
RcHandle< RtpsWriter > RtpsWriter_rch
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack and are expecting a heartbeat.
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
const MonotonicTime_t participant_discovered_at_
ACE_HANDLE open(const char *filename, int mode, mode_t perms=ACE_DEFAULT_OPEN_PERMS, LPSECURITY_ATTRIBUTES sa=0)
static const void * body(MD5_CTX *ctx, const void *data, unsigned long size)
CORBA::Long heartbeat_count_
InterestingRemoteMapType interesting_readers_
SequenceNumberMultiset preassociation_reader_start_sns_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
InterestingRemote(const GUID_t &w, const AddrSet &a, DiscoveryListener *l)
Holds a data sample received by the transport.
RcHandle< TransportClient > TransportClient_rch
long ParticipantCryptoHandle
const TimeDuration initial_fallback_
MonotonicTimePoint durable_timestamp_
#define OPENDDS_SET_CMP(K, C)
TransactionalRtpsSendQueue sq_
RcHandle< SporadicEvent > preassociation_task_
RcHandle< SingleSendBuffer > get_send_buff()
WriterInfo(const GUID_t &id, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags)
const ACE_CDR::ULong participant_flags_
Security::SecurityConfig_rch security_config() const
#define OPENDDS_MAP_CMP(K, V, C)
MonotonicTimePoint last_recv_time_
void replay_durable_data(const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
Data structure representing an "interesting" remote entity for static discovery.
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
MultiSendBuffer(RtpsUdpDataLink *outer, size_t capacity)
RcHandle< RtpsReader > RtpsReader_rch
RcHandle< JobQueue > get_job_queue() const
const bool is_pvs_writer_
Participant Volatile Secure writer.
WriterInfoMap remote_writers_
RcHandle< JobQueue > job_queue_
RcHandle< ConstSharedRepoIdSet > remote_reader_guids_
NetworkAddress last_recv_addr_
IdCountMapping heartbeat_counts_
SnToTqeMap elems_not_acked_
ReplayDurableData(WeakRcHandle< RtpsUdpDataLink > link, const GUID_t &local_pub_id, const GUID_t &remote_sub_id)
IdCountSet nackfrag_counts_
ACE_Thread_Mutex locators_lock_
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
MulticastManager multicast_manager_
DiscoveryListener * listener
Callback to invoke.
Security::SecurityConfig_rch security_config_
WriterInfoSet preassociation_writers_
Security::HandleRegistry_rch handle_registry_
ACE_SOCK_Dgram_Mcast multicast_socket_
SequenceNumber pre_low() const
MessageBlockAllocator mb_allocator_
RcHandle< ICE::Agent > ice_agent_
CountMap::iterator next_undirected_unassigned_
RcHandle< ConstSharedRepoIdSet > get_remote_reader_guids()
bool is_pvs_writer() const
static bool force_inline_qos_
static member used by testing code to force inline qos
std::pair< GUID_t, InterestingRemote > CallbackType
MetaSubmessageIterVec submessages_
GUID_t localid
id of local entity that is interested in this remote.
RcHandle< SporadicEvent > flush_send_queue_sporadic_
EventDispatcher_rch event_dispatcher_
DeliverHeldData(RtpsReader_rch reader, const GUID_t &writer_id)
Sequence number abstraction. Only allows positive 64 bit values.
const size_t max_bundle_size_
WeakRcHandle< RtpsUdpDataLink > link_
CORBA::Long nackfrag_recvd_count_
DisjointSequence pvs_outstanding_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
InterestingRemoteMapType interesting_writers_
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
ACE_Thread_Mutex writers_lock_
CORBA::Long nackfrag_count_
RcHandle< T > lock() const
int insert(Container &c, const ValueType &v)
LocatorCache locator_cache_
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
const EntityId_t ENTITYID_UNKNOWN
AddressCache< LocatorCacheKey > LocatorCache
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
SequenceNumber cur_cumulative_ack_
ReactorTask_rch reactor_task_
RcHandle< PeriodicEvent > heartbeatchecker_
SequenceNumber acked_sn() const
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
const size_t initial_bundle_size
unique_ptr< DataBlockLockPool > db_lock_pool_
RcHandle< PeriodicEvent > heartbeat_
Base wrapper class around a data/control sample to be sent.
const MonotonicTime_t participant_discovered_at_
void handle_event()
Called when the event is dispatched by an EventDispatcher.
ACE_CDR::Long required_acknack_count_
typedef OPENDDS_VECTOR(ACE_INET_Addr) AddressListType
#define OpenDDS_Rtps_Udp_Export
void remove_preassociation_reader(const ReaderInfo_rch &reader)
RcHandle< SporadicEvent > heartbeat_
SequenceNumber max_pvs_sn_
CORBA::Long heartbeat_recvd_count_