OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes | Friends | List of all members
OpenDDS::DCPS::RtpsUdpDataLink Class Reference

#include <RtpsUdpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Collaboration graph
[legend]

Classes

struct  Bundle
 
struct  CountKeeper
 
struct  CountMapPair
 
struct  CountMapping
 
class  DeliverHeldData
 
struct  InterestingRemote
 Data structure representing an "interesting" remote entity for static discovery. More...
 
struct  MultiSendBuffer
 
struct  ReaderInfo
 
struct  ReaderInfoSetHolder
 
struct  RemoteInfo
 
class  ReplayDurableData
 
class  RtpsReader
 
class  RtpsWriter
 
struct  WriterInfo
 

Public Member Functions

 RtpsUdpDataLink (const RtpsUdpTransport_rch &transport, const GuidPrefix_t &local_prefix, const RtpsUdpInst_rch &config, const ReactorTask_rch &reactor_task, InternalTransportStatistics &transport_statistics, ACE_Thread_Mutex &transport_statistics_mutex)
 
 ~RtpsUdpDataLink ()
 
bool add_delayed_notification (TransportQueueElement *element)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
RtpsUdpInst_rch config () const
 
ACE_Reactorget_reactor ()
 
ReactorInterceptor_rch get_reactor_interceptor () const
 
bool reactor_is_shut_down ()
 
ACE_SOCK_Dgramunicast_socket ()
 
ACE_SOCK_Dgram_Mcastmulticast_socket ()
 
bool open (const ACE_SOCK_Dgram &unicast_socket)
 
void received (const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
void received (const RTPS::GapSubmessage &gap, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::HeartBeatSubmessage &heartbeat, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::HeartBeatFragSubmessage &hb_frag, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::AckNackSubmessage &acknack, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
void received (const RTPS::NackFragSubmessage &nackfrag, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
const GuidPrefix_tlocal_prefix () const
 
void remove_locator_and_bundling_cache (const GUID_t &remote_id)
 
NetworkAddress get_last_recv_address (const GUID_t &remote_id)
 
void update_locators (const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
 
AddrSet get_addresses (const GUID_t &local, const GUID_t &remote) const
 
AddrSet get_addresses (const GUID_t &local) const
 Given a 'local' id, return the set of address for all remote peers. More...
 
void filterBestEffortReaders (const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
 
int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
bool associated (const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
 
void disassociated (const GUID_t &local, const GUID_t &remote)
 
void register_for_reader (const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &readerid, const GUID_t &writerid)
 
void client_stop (const GUID_t &localId)
 
virtual void pre_stop_i ()
 
DCPS::RcHandle< ICE::Agentget_ice_agent () const
 
virtual DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &writer_id, const GUID_t &reader_id) const
 
Security::SecurityConfig_rch security_config () const
 
Security::HandleRegistry_rch handle_registry () const
 
DDS::Security::ParticipantCryptoHandle local_crypto_handle () const
 
void local_crypto_handle (DDS::Security::ParticipantCryptoHandle pch)
 
RtpsUdpTransport_rch transport ()
 
void enable_response_queue ()
 
void disable_response_queue (bool send_immediately)
 
bool requires_inline_qos (const GUIDSeq_var &peers)
 
EventDispatcher_rch event_dispatcher ()
 
RcHandle< JobQueueget_job_queue () const
 
typedef OPENDDS_VECTOR (Bundle) BundleVec
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDurationdatalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Prioritytransport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Blockcreate_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeqtarget_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 

Static Public Member Functions

static bool separate_message (EntityId_t entity)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 

Private Types

typedef CORBA::ULong FragmentNumberValue
 
typedef RcHandle< ReaderInfoReaderInfo_rch
 
typedef RcHandle< ReaderInfoSetHolderReaderInfoSetHolder_rch
 
typedef RcHandle< RtpsWriterRtpsWriter_rch
 
typedef RcHandle< WriterInfoWriterInfo_rch
 
typedef RcHandle< RtpsReaderRtpsReader_rch
 
typedef std::pair< GUID_t, InterestingRemoteCallbackType
 

Private Member Functions

void on_data_available (RcHandle< InternalDataReader< NetworkInterfaceAddress > > reader)
 
AddrSet get_addresses_i (const GUID_t &local, const GUID_t &remote) const
 
AddrSet get_addresses_i (const GUID_t &local) const
 
virtual void stop_i ()
 
virtual TransportQueueElementcustomize_queue_element (TransportQueueElement *element)
 
virtual void release_reservations_i (const GUID_t &remote_id, const GUID_t &local_id)
 
RtpsUdpSendStrategy_rch send_strategy ()
 
RtpsUdpReceiveStrategy_rch receive_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
 
void update_last_recv_addr (const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
 
ACE_Message_Blockalloc_msgblock (size_t size, ACE_Allocator *data_allocator)
 
ACE_Message_Blocksubmsgs_to_msgblock (const RTPS::SubmessageSeq &subm)
 
RcHandle< SingleSendBufferget_writer_send_buffer (const GUID_t &pub_id)
 
typedef OPENDDS_MAP (FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
 
typedef OPENDDS_MAP (SequenceNumber, RequestedFragMap) RequestedFragSeqMap
 
typedef OPENDDS_MAP_CMP (GUID_t, ReaderInfo_rch, GUID_tKeyLessThan) ReaderInfoMap
 
typedef OPENDDS_SET (ReaderInfo_rch) ReaderInfoSet
 
typedef OPENDDS_MAP (SequenceNumber, ReaderInfoSetHolder_rch) SNRIS
 
typedef OPENDDS_MAP_CMP (GUID_t, RtpsWriter_rch, GUID_tKeyLessThan) RtpsWriterMap
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterInfo_rch, GUID_tKeyLessThan) WriterInfoMap
 
typedef OPENDDS_SET (WriterInfo_rch) WriterInfoSet
 
typedef OPENDDS_VECTOR (MetaSubmessageVec::iterator) MetaSubmessageIterVec
 
typedef OPENDDS_MAP_CMP (GUID_t, MetaSubmessageIterVec, GUID_tKeyLessThan) DestMetaSubmessageMap
 
typedef OPENDDS_MAP (AddressCacheEntryProxy, DestMetaSubmessageMap) AddrDestMetaSubmessageMap
 
typedef OPENDDS_VECTOR (MetaSubmessageIterVec) MetaSubmessageIterVecVec
 
typedef OPENDDS_SET (CORBA::Long) CountSet
 
typedef OPENDDS_MAP_CMP (EntityId_t, CountSet, EntityId_tKeyLessThan) IdCountSet
 
typedef OPENDDS_MAP (CORBA::Long, CountMapPair) CountMap
 
typedef OPENDDS_MAP_CMP (EntityId_t, CountMapping, EntityId_tKeyLessThan) IdCountMapping
 
void build_meta_submessage_map (MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
 
void bundle_mapped_meta_submessages (const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)
 
void queue_submessages (MetaSubmessageVec &meta_submessages)
 
void update_required_acknack_count (const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
 
void bundle_and_send_submessages (MetaSubmessageVec &meta_submessages)
 
 OPENDDS_VECTOR (MetaSubmessageVec) fsq_vec_
 
void harvest_send_queue (const MonotonicTimePoint &now)
 
void flush_send_queue (const MonotonicTimePoint &now)
 
void flush_send_queue_i ()
 
typedef OPENDDS_MAP_CMP (GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMap
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMultiMap
 
void durability_resend (TransportQueueElement *element, size_t &cumulative_send_count)
 
void durability_resend (TransportQueueElement *element, const RTPS::FragmentNumberSet &fragmentSet, size_t &cumulative_send_count)
 
template<typename T , typename FN >
void datawriter_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
 
template<typename T , typename FN >
void datareader_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
 
void send_heartbeats (const MonotonicTimePoint &now)
 
void check_heartbeats (const MonotonicTimePoint &now)
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, InterestingRemote, GUID_tKeyLessThan) InterestingRemoteMapType
 
TransportQueueElementcustomize_queue_element_non_reliable_i (TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)
 
void send_heartbeats_manual_i (const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
 
typedef OPENDDS_MAP_CMP (EntityId_t, CORBA::Long, EntityId_tKeyLessThan) CountMapType
 
void accumulate_addresses (const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
 

Static Private Member Functions

static void extend_bitmap_range (RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
 
static bool include_fragment (const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
 

Private Attributes

ReactorTask_rch reactor_task_
 
RcHandle< JobQueuejob_queue_
 
EventDispatcher_rch event_dispatcher_
 
GuidPrefix_t local_prefix_
 
RemoteInfoMap locators_
 
LocatorCache locator_cache_
 
BundlingCache bundling_cache_
 
ACE_SOCK_Dgram unicast_socket_
 
ACE_SOCK_Dgram_Mcast multicast_socket_
 
MessageBlockAllocator mb_allocator_
 
DataBlockAllocator db_allocator_
 
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutexcustom_allocator_
 
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutexbundle_allocator_
 
unique_ptr< DataBlockLockPooldb_lock_pool_
 
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
 
RtpsWriterMap writers_
 
TransactionalRtpsSendQueue sq_
 
ACE_Thread_Mutex fsq_mutex_
 
size_t fsq_vec_size_
 
RcHandle< SporadicEventharvest_send_queue_sporadic_
 
RcHandle< SporadicEventflush_send_queue_sporadic_
 
RepoIdSet pending_reliable_readers_
 
RtpsReaderMap readers_
 
RtpsReaderMultiMap readers_of_writer_
 
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
 
ACE_Thread_Mutex readers_lock_
 
ACE_Thread_Mutex writers_lock_
 
ACE_Thread_Mutex locators_lock_
 
CORBA::Long best_effort_heartbeat_count_
 
RcHandle< PeriodicEventheartbeat_
 
RcHandle< PeriodicEventheartbeatchecker_
 
InterestingRemoteMapType interesting_readers_
 
InterestingRemoteMapType interesting_writers_
 
CountMapType heartbeat_counts_
 
const size_t max_bundle_size_
 
InternalTransportStatisticstransport_statistics_
 
ACE_Thread_Mutextransport_statistics_mutex_
 
ACE_Thread_Mutex security_mutex_
 
Security::SecurityConfig_rch security_config_
 
Security::HandleRegistry_rch handle_registry_
 
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
 
RcHandle< ICE::Agentice_agent_
 
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
 
MulticastManager multicast_manager_
 

Static Private Attributes

static bool force_inline_qos_ = false
 static member used by testing code to force inline qos More...
 

Friends

class ::DDS_TEST
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClientTransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_tOnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< NetworkInterfaceAddress > > InternalDataReader_rch
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockTypeGuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeqpeer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocatormb_allocator_
 
unique_ptr< DataBlockAllocatordb_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 90 of file RtpsUdpDataLink.h.

Member Typedef Documentation

◆ CallbackType

Definition at line 893 of file RtpsUdpDataLink.h.

◆ FragmentNumberValue

Definition at line 339 of file RtpsUdpDataLink.h.

◆ ReaderInfo_rch

Definition at line 387 of file RtpsUdpDataLink.h.

◆ ReaderInfoSetHolder_rch

Definition at line 397 of file RtpsUdpDataLink.h.

◆ RtpsReader_rch

Definition at line 681 of file RtpsUdpDataLink.h.

◆ RtpsWriter_rch

Definition at line 576 of file RtpsUdpDataLink.h.

◆ WriterInfo_rch

Definition at line 613 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsUdpDataLink()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink ( const RtpsUdpTransport_rch transport,
const GuidPrefix_t local_prefix,
const RtpsUdpInst_rch config,
const ReactorTask_rch reactor_task,
InternalTransportStatistics transport_statistics,
ACE_Thread_Mutex transport_statistics_mutex 
)

Definition at line 67 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::assign(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, handle_registry_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::job_queue(), job_queue_, local_prefix(), local_prefix_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::ref(), security_config_, OpenDDS::DCPS::DataLink::send_strategy_, and TheServiceParticipant.

73  : DataLink(transport, // 3 data link "attributes", below, are unused
74  0, // priority
75  false, // is_loopback
76  false) // is_active
77  , reactor_task_(reactor_task)
78  , job_queue_(make_rch<JobQueue>(reactor_task->get_reactor()))
79  , event_dispatcher_(transport->event_dispatcher())
80  , mb_allocator_(TheServiceParticipant->association_chunk_multiplier())
81  , db_allocator_(TheServiceParticipant->association_chunk_multiplier())
82  , custom_allocator_(TheServiceParticipant->association_chunk_multiplier() * config->anticipated_fragments_, RtpsSampleHeader::FRAG_SIZE)
83  , bundle_allocator_(TheServiceParticipant->association_chunk_multiplier(), config->max_message_size_)
84  , db_lock_pool_(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())))
85  , multi_buff_(this, config->nak_depth_)
86  , fsq_vec_size_(0)
87  , harvest_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::harvest_send_queue)))
88  , flush_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::flush_send_queue)))
90  , heartbeat_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::send_heartbeats)))
91  , heartbeatchecker_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::check_heartbeats)))
92  , max_bundle_size_(config->max_message_size_ - RTPS::RTPSHDR_SZ) // default maximum bundled message size is max udp message size (see TransportStrategy) minus RTPS header
93  , transport_statistics_(transport_statistics)
94  , transport_statistics_mutex_(transport_statistics_mutex)
95 #ifdef OPENDDS_SECURITY
99 #endif
100  , network_interface_address_reader_(make_rch<InternalDataReader<NetworkInterfaceAddress> >(true, rchandle_from(this)))
101 {
102 #ifdef OPENDDS_SECURITY
103  const GUID_t guid = make_id(local_prefix, ENTITYID_PARTICIPANT);
104  handle_registry_ = security_config_->get_handle_registry(guid);
105 #endif
106 
107  send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
108  receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix, ref(TheServiceParticipant->get_thread_status_manager()));
109  assign(local_prefix_, local_prefix);
110 
111  this->job_queue(job_queue_);
112 }
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
EventDispatcher_rch event_dispatcher_
RtpsUdpTransport_rch transport()
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
static SecurityRegistry * instance()
Return a singleton instance of this class.
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const InstanceHandle_t HANDLE_NIL
const GuidPrefix_t & local_prefix() const
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
RcHandle< ICE::Agent > ice_agent_
unique_ptr< DataBlockLockPool > db_lock_pool_
void check_heartbeats(const MonotonicTimePoint &now)
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
MessageBlockAllocator mb_allocator_
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
RcHandle< SporadicEvent > flush_send_queue_sporadic_
RcHandle< PeriodicEvent > heartbeatchecker_
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
void send_heartbeats(const MonotonicTimePoint &now)
InternalTransportStatistics & transport_statistics_
RcHandle< JobQueue > job_queue_
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
Security::HandleRegistry_rch handle_registry_
ACE_Thread_Mutex & transport_statistics_mutex_
void harvest_send_queue(const MonotonicTimePoint &now)
Security::SecurityConfig_rch security_config_
static DCPS::RcHandle< Agent > instance()
Definition: Ice.cpp:122
DataBlockAllocator db_allocator_
static const ACE_CDR::UShort FRAG_SIZE
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
#define TheServiceParticipant
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
RtpsUdpInst_rch config() const
RcHandle< PeriodicEvent > heartbeat_
void flush_send_queue(const MonotonicTimePoint &now)

◆ ~RtpsUdpDataLink()

OpenDDS::DCPS::RtpsUdpDataLink::~RtpsUdpDataLink ( )

Definition at line 114 of file RtpsUdpDataLink.cpp.

References flush_send_queue_sporadic_, and harvest_send_queue_sporadic_.

115 {
117  flush_send_queue_sporadic_->cancel();
118 }
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
RcHandle< SporadicEvent > flush_send_queue_sporadic_

Member Function Documentation

◆ accumulate_addresses()

void OpenDDS::DCPS::RtpsUdpDataLink::accumulate_addresses ( const GUID_t local,
const GUID_t remote,
AddrSet &  addresses,
bool  prefer_unicast = false 
) const
private

Definition at line 4677 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), get_ice_endpoint(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, ice_agent_, interesting_readers_, interesting_writers_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), locator_cache_, locators_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::max_value, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_ASSERT, readers_lock_, and writers_lock_.

Referenced by build_meta_submessage_map(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i(), and get_addresses_i().

4679 {
4680  OPENDDS_ASSERT(local != GUID_UNKNOWN);
4681  OPENDDS_ASSERT(remote != GUID_UNKNOWN);
4682 
4683  const LocatorCacheKey key(remote, local, prefer_unicast);
4684  LocatorCache::ScopedAccess entry(locator_cache_, key);
4685  if (!entry.is_new_) {
4686  addresses.insert(entry.value().addrs_.begin(), entry.value().addrs_.end());
4687  return;
4688  }
4689 
4690  RtpsUdpInst_rch cfg = config();
4691  if (!cfg) {
4692  return;
4693  }
4694 
4695  if (cfg->rtps_relay_only() && std::memcmp(&local.guidPrefix, &remote.guidPrefix, sizeof(GuidPrefix_t)) != 0) {
4696  if (NetworkAddress() != cfg->rtps_relay_address()) {
4697  addresses.insert(cfg->rtps_relay_address());
4698  entry.value().addrs_.insert(cfg->rtps_relay_address());
4699  }
4700  return;
4701  }
4702 
4703  AddrSet normal_addrs;
4704  MonotonicTimePoint normal_addrs_expires = MonotonicTimePoint::max_value;
4705  NetworkAddress ice_addr;
4706  bool valid_last_recv_addr = false;
4707  static const NetworkAddress NO_ADDR;
4708 
4709  const RemoteInfoMap::const_iterator pos = locators_.find(remote);
4710  if (pos != locators_.end()) {
4711  if (prefer_unicast && pos->second.insert_recv_addr(normal_addrs)) {
4712  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4713  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4714  } else if (prefer_unicast && !pos->second.unicast_addrs_.empty()) {
4715  normal_addrs = pos->second.unicast_addrs_;
4716  } else if (!pos->second.multicast_addrs_.empty()) {
4717 #ifdef ACE_HAS_IPV6
4718  if (pos->second.last_recv_addr_ != NO_ADDR) {
4719  const AddrSet& mc_addrs = pos->second.multicast_addrs_;
4720  for (AddrSet::const_iterator it = mc_addrs.begin(); it != mc_addrs.end(); ++it) {
4721  if (it->get_type() == pos->second.last_recv_addr_.get_type()) {
4722  normal_addrs.insert(*it);
4723  }
4724  }
4725  } else {
4726  normal_addrs = pos->second.multicast_addrs_;
4727  }
4728 #else
4729  normal_addrs = pos->second.multicast_addrs_;
4730 #endif
4731  } else if (pos->second.insert_recv_addr(normal_addrs)) {
4732  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4733  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4734  } else {
4735  normal_addrs = pos->second.unicast_addrs_;
4736  }
4737  } else {
4738  const GuidConverter conv(remote);
4739  if (conv.isReader()) {
4741  InterestingRemoteMapType::const_iterator ipos = interesting_readers_.find(remote);
4742  if (ipos != interesting_readers_.end()) {
4743  normal_addrs = ipos->second.addresses;
4744  }
4745  } else if (conv.isWriter()) {
4747  InterestingRemoteMapType::const_iterator ipos = interesting_writers_.find(remote);
4748  if (ipos != interesting_writers_.end()) {
4749  normal_addrs = ipos->second.addresses;
4750  }
4751  }
4752  }
4753 
4754 #ifdef OPENDDS_SECURITY
4755  WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
4756  if (endpoint) {
4757  ice_addr = ice_agent_->get_address(endpoint, local, remote);
4758  }
4759 #endif
4760 
4761  if (ice_addr == NO_ADDR) {
4762  addresses.insert(normal_addrs.begin(), normal_addrs.end());
4763  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4764  entry.value().expires_ = normal_addrs_expires;
4765  const NetworkAddress relay_addr = cfg->rtps_relay_address();
4766  if (!valid_last_recv_addr && relay_addr != NO_ADDR) {
4767  addresses.insert(relay_addr);
4768  entry.value().addrs_.insert(relay_addr);
4769  }
4770  return;
4771  }
4772 
4773  if (normal_addrs.count(ice_addr) == 0) {
4774  addresses.insert(ice_addr);
4775  entry.value().addrs_.insert(ice_addr);
4776  return;
4777  }
4778 
4779  addresses.insert(normal_addrs.begin(), normal_addrs.end());
4780  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4781  entry.value().expires_ = normal_addrs_expires;
4782 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
sequence< octet > key
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
RcHandle< ICE::Agent > ice_agent_
InterestingRemoteMapType interesting_writers_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
InterestingRemoteMapType interesting_readers_
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ add_delayed_notification()

bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification ( TransportQueueElement element)

Definition at line 127 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::TransportQueueElement::publication_id(), writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().

128 {
130  RtpsWriter_rch writer;
131  RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
132  if (iter != writers_.end()) {
133  writer = iter->second;
134  }
135 
136  g.release();
137 
138  if (writer) {
139  writer->add_elem_awaiting_ack(element);
140  return true;
141  }
142  return false;
143 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
RcHandle< RtpsWriter > RtpsWriter_rch

◆ alloc_msgblock()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::alloc_msgblock ( size_t  size,
ACE_Allocator data_allocator 
)
private

Definition at line 1035 of file RtpsUdpDataLink.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC_RETURN, db_allocator_, db_lock_pool_, DataBlockLockPool::get_lock(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, and ACE_Time_Value::zero.

Referenced by bundle_and_send_submessages(), and submsgs_to_msgblock().

1035  {
1036  ACE_Message_Block* result;
1037  ACE_NEW_MALLOC_RETURN(result,
1038  static_cast<ACE_Message_Block*>(
1040  ACE_Message_Block(size,
1042  0, // cont
1043  0, // data
1044  data_allocator,
1045  db_lock_pool_->get_lock(), // locking_strategy
1049  &db_allocator_,
1050  &mb_allocator_),
1051  0);
1052  return result;
1053 }
static const ACE_Time_Value max_time
DataBlockLock * get_lock()
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
unique_ptr< DataBlockLockPool > db_lock_pool_
MessageBlockAllocator mb_allocator_
static const ACE_Time_Value zero
DataBlockAllocator db_allocator_
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY

◆ associated()

bool OpenDDS::DCPS::RtpsUdpDataLink::associated ( const GUID_t local,
const GUID_t remote,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable,
const MonotonicTime_t participant_discovered_at,
ACE_CDR::ULong  participant_flags,
SequenceNumber  max_sn,
const TransportClient_rch client,
AddrSet &  unicast_addresses,
AddrSet &  multicast_addresses,
const NetworkAddress last_addr_hint,
bool  requires_inline_qos 
)

Definition at line 558 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::DataLink::add_on_start_callback(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::add_writer(), OpenDDS::DCPS::TransportSendBuffer::capacity(), OpenDDS::DCPS::GUID_t::entityId, heartbeat_counts_, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore(), OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, multi_buff_, pending_reliable_readers_, readers_, readers_lock_, readers_of_writer_, receive_strategy(), sq_, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::transport_debug, update_last_recv_addr(), update_locators(), writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::use_datalink().

569 {
570  sq_.ignore(local_id, remote_id);
571 
572  update_locators(remote_id, unicast_addresses, multicast_addresses, requires_inline_qos, true);
573  if (last_addr_hint) {
574  update_last_recv_addr(remote_id, last_addr_hint);
575  }
576 
577  const GuidConverter conv(local_id);
578 
579  if (!local_reliable) {
580  if (conv.isReader()) {
582  WriterToSeqReadersMap::iterator i = writer_to_seq_best_effort_readers_.find(remote_id);
583  if (i == writer_to_seq_best_effort_readers_.end()) {
584  writer_to_seq_best_effort_readers_.insert(WriterToSeqReadersMap::value_type(remote_id, SeqReaders(local_id)));
585  } else if (i->second.readers.find(local_id) == i->second.readers.end()) {
586  i->second.readers.insert(local_id);
587  }
588  }
589  return true;
590  }
591 
592  bool associated = true;
593 
594  if (conv.isWriter()) {
596  log_progress("RTPS writer/reader association", local_id, remote_id, participant_discovered_at);
597  }
598 
599  if (remote_reliable) {
601  // Insert count if not already there.
602  RtpsWriterMap::iterator rw = writers_.find(local_id);
603  if (rw == writers_.end()) {
604  RtpsUdpDataLink_rch link(this, inc_count());
605  CORBA::Long hb_start = 0;
606  CountMapType::iterator hbc_it = heartbeat_counts_.find(local_id.entityId);
607  if (hbc_it != heartbeat_counts_.end()) {
608  hb_start = hbc_it->second;
609  heartbeat_counts_.erase(hbc_it);
610  }
611  RtpsWriter_rch writer = make_rch<RtpsWriter>(client, link, local_id, local_durable,
612  max_sn, hb_start, multi_buff_.capacity());
613  rw = writers_.insert(RtpsWriterMap::value_type(local_id, writer)).first;
614  }
615  RtpsWriter_rch writer = rw->second;
616  g.release();
617  const SequenceNumber writer_max_sn = writer->update_max_sn(remote_id, max_sn);
618  writer->add_reader(make_rch<ReaderInfo>(remote_id, remote_durable, participant_discovered_at, participant_flags, writer_max_sn + 1));
619  }
620  invoke_on_start_callbacks(local_id, remote_id, true);
621  } else if (conv.isReader()) {
623  log_progress("RTPS reader/writer association", local_id, remote_id, participant_discovered_at);
624  }
625  {
626  GuardType guard(strategy_lock_);
627  if (receive_strategy()) {
628  receive_strategy()->clear_completed_fragments(remote_id);
629  }
630  }
631  if (remote_reliable) {
633  RtpsReaderMap::iterator rr = readers_.find(local_id);
634  if (rr == readers_.end()) {
635  pending_reliable_readers_.erase(local_id);
636  RtpsUdpDataLink_rch link(this, inc_count());
637  RtpsReader_rch reader = make_rch<RtpsReader>(link, local_id);
638  rr = readers_.insert(RtpsReaderMap::value_type(local_id, reader)).first;
639  }
640  RtpsReader_rch reader = rr->second;
641  readers_of_writer_.insert(RtpsReaderMultiMap::value_type(remote_id, rr->second));
642  g.release();
643  add_on_start_callback(client, remote_id);
644  reader->add_writer(make_rch<WriterInfo>(remote_id, participant_discovered_at, participant_flags));
645  associated = false;
646  } else {
647  invoke_on_start_callbacks(local_id, remote_id, true);
648  }
649  }
650 
651  return associated;
652 }
ACE_CDR::Long Long
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RtpsReaderMultiMap readers_of_writer_
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
RtpsUdpReceiveStrategy_rch receive_strategy()
RcHandle< RtpsReader > RtpsReader_rch
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:111
RcHandle< RtpsWriter > RtpsWriter_rch
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
TransactionalRtpsSendQueue sq_
void ignore(const GUID_t &local, const GUID_t &remote)
Mark all queued submessage with the given source and destination as ignored.
bool log_progress
Log progress for RTPS entity discovery and association.
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ build_meta_submessage_map()

void OpenDDS::DCPS::RtpsUdpDataLink::build_meta_submessage_map ( MetaSubmessageVec &  meta_submessages,
AddrDestMetaSubmessageMap &  addr_map 
)
private

Definition at line 2409 of file RtpsUdpDataLink.cpp.

References accumulate_addresses(), ACE_GUARD, bundling_cache_, get_addresses_i(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, DDS::HANDLE_NIL, LM_DEBUG, local_crypto_handle(), locators_lock_, OpenDDS::DCPS::make_unknown_guid(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), separate_message(), and VDBG.

Referenced by bundle_and_send_submessages().

2410 {
2411  size_t cache_hits = 0;
2412  size_t cache_misses = 0;
2413  size_t addrset_min_size = std::numeric_limits<size_t>::max();
2414  size_t addrset_max_size = 0;
2415 
2416  BundlingCache::ScopedAccess global_access(bundling_cache_);
2418 
2419  // Sort meta_submessages by address set and destination
2420  for (MetaSubmessageVec::iterator it = meta_submessages.begin(), limit = meta_submessages.end(); it != limit; ++it) {
2421  if (it->ignore_) {
2422  continue;
2423  }
2424 
2425  const BundlingCacheKey key(it->src_guid_, it->dst_guid_);
2426  BundlingCache::ScopedAccess entry(bundling_cache_, key, false, now);
2427  if (entry.is_new_) {
2428 
2429  AddrSet& addrs = entry.value().addrs_;
2431 
2432  const bool directed = it->dst_guid_ != GUID_UNKNOWN;
2433  if (directed) {
2434  accumulate_addresses(it->src_guid_, it->dst_guid_, addrs, true);
2435  } else {
2436  addrs = get_addresses_i(it->src_guid_);
2437  }
2438 #ifdef OPENDDS_SECURITY
2439  if (local_crypto_handle() != DDS::HANDLE_NIL && separate_message(it->src_guid_.entityId)) {
2440  addrs.insert(BUNDLING_PLACEHOLDER); // removed in bundle_mapped_meta_submessages
2441  }
2442 #endif
2443 #if defined ACE_HAS_CPP11
2444  entry.recalculate_hash();
2445 #endif
2446  ++cache_misses;
2447  } else {
2448  ++cache_hits;
2449  }
2450 
2451  const BundlingCache::ScopedAccess& const_entry = entry;
2452  const AddrSet& addrs = const_entry.value().addrs_;
2453  addrset_min_size = std::min(addrset_min_size, static_cast<size_t>(addrs.size()));
2454  addrset_max_size = std::max(addrset_max_size, static_cast<size_t>(addrs.size()));
2455  if (addrs.empty()) {
2456  continue;
2457 #ifdef OPENDDS_SECURITY
2458  } else if (addrs.size() == 1 && *addrs.begin() == BUNDLING_PLACEHOLDER) {
2459  continue;
2460 #endif
2461  }
2462 
2463  DestMetaSubmessageMap& dest_map = addr_map[AddressCacheEntryProxy(const_entry.rch_)];
2464  if (std::memcmp(&(it->dst_guid_.guidPrefix), &GUIDPREFIX_UNKNOWN, sizeof(GuidPrefix_t)) != 0) {
2465  MetaSubmessageIterVec& vec = dest_map[make_unknown_guid(it->dst_guid_.guidPrefix)];
2466  vec.reserve(meta_submessages.size());
2467  vec.push_back(it);
2468  } else {
2469  MetaSubmessageIterVec& vec = dest_map[GUID_UNKNOWN];
2470  vec.reserve(meta_submessages.size());
2471  vec.push_back(it);
2472  }
2473  }
2474 
2475  VDBG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::build_meta_submessage_map()"
2476  "- Bundling Cache Stats: hits = %B, misses = %B, min = %B, max = %B\n",
2477  cache_hits, cache_misses, addrset_min_size, addrset_max_size));
2478 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
sequence< octet > key
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static bool separate_message(EntityId_t entity)
#define VDBG(DBG_ARGS)
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:228
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const

◆ bundle_and_send_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::bundle_and_send_submessages ( MetaSubmessageVec &  meta_submessages)
private

Definition at line 2757 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, alloc_msgblock(), OpenDDS::RTPS::append_submessage(), OpenDDS::DCPS::assign(), OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, build_meta_submessage_map(), bundle_allocator_, bundle_mapped_meta_submessages(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::STUN::encoding(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::HEARTBEAT, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::is_new_assigned_, OpenDDS::DCPS::Encoding::KIND_XCDR1, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::make_unknown_guid(), OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::map_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::nackfrag_counts_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::new_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::next_directed_unassigned_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::next_undirected_unassigned_, OpenDDS::RTPS::SequenceNumberSet::numBits, OpenDDS::RTPS::FragmentNumberSet::numBits, OPENDDS_ASSERT, OpenDDS::RTPS::OPENDDS_FLAG_R, OpenDDS::DCPS::push_back(), OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, send_strategy(), size_, OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::MetaSubmessage::src_guid_, OpenDDS::RTPS::Message::submessages, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::undirected_, update_required_acknack_count(), OpenDDS::RTPS::Count_t::value, OpenDDS::RTPS::FragmentNumber_t::value, OpenDDS::RTPS::HeartBeatSubmessage::writerId, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by flush_send_queue_i().

2758 {
2759  using namespace RTPS;
2760 
2761  // Sort meta_submessages based on both locator IPs and INFO_DST GUID destination/s
2762  AddrDestMetaSubmessageMap addr_map;
2763  build_meta_submessage_map(meta_submessages, addr_map);
2764 
2765  const Encoding encoding(Encoding::KIND_XCDR1);
2766 
2767  // Build reasonably-sized submessage bundles based on our destination map
2768  BundleVec bundles;
2769  bundles.reserve(meta_submessages.size());
2770 
2771  CountKeeper counts;
2772  bundle_mapped_meta_submessages(encoding, addr_map, bundles, counts);
2773 
2774  // Reusable INFO_DST
2775  InfoDestinationSubmessage idst = {
2777  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2778  };
2779 
2780  for (IdCountMapping::iterator it = counts.heartbeat_counts_.begin(), limit = counts.heartbeat_counts_.end(); it != limit; ++it) {
2781  it->second.next_directed_unassigned_ = it->second.map_.begin();
2782  it->second.next_undirected_unassigned_ = it->second.map_.begin();
2783  for (CountMap::iterator it2 = it->second.map_.begin(), limit2 = it->second.map_.end(); it2 != limit2; ++it2) {
2784  if (it2->second.undirected_) {
2785  ++(it->second.next_directed_unassigned_);
2786  }
2787  }
2788  }
2789 
2790  // Allocate buffers, seralize, and send bundles
2791  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2792  for (size_t i = 0; i < bundles.size(); ++i) {
2793  RTPS::Message rtps_message;
2794  prev_dst = GUID_UNKNOWN;
2795  Message_Block_Ptr mb_bundle(alloc_msgblock(bundles[i].size_, &bundle_allocator_));
2796  Serializer ser(mb_bundle.get(), encoding);
2797  const MetaSubmessageIterVec& bundle_vec = bundles[i].submessages_;
2798  for (MetaSubmessageIterVec::const_iterator it = bundle_vec.begin(), limit = bundle_vec.end(); it != limit; ++it) {
2799  MetaSubmessage& res = **it;
2800  const GUID_t dst = make_unknown_guid(res.dst_guid_);
2801  if (dst != prev_dst) {
2802  assign(idst.guidPrefix, dst.guidPrefix);
2803  ser << idst;
2805  append_submessage(rtps_message, idst);
2806  }
2807  }
2808  switch (res.sm_._d()) {
2809  case HEARTBEAT: {
2810  CountMapping& mapping = counts.heartbeat_counts_[res.sm_.heartbeat_sm().writerId];
2811  CountMapPair& map_pair = mapping.map_[res.sm_.heartbeat_sm().count.value];
2812  if (!map_pair.is_new_assigned_) {
2813  if (map_pair.undirected_) {
2814  OPENDDS_ASSERT(mapping.next_undirected_unassigned_ != mapping.map_.end());
2815  map_pair.new_ = mapping.next_undirected_unassigned_->first;
2816  ++mapping.next_undirected_unassigned_;
2817  } else {
2818  OPENDDS_ASSERT(mapping.next_directed_unassigned_ != mapping.map_.end());
2819  map_pair.new_ = mapping.next_directed_unassigned_->first;
2820  ++mapping.next_directed_unassigned_;
2821  if (res.sm_.heartbeat_sm().smHeader.flags & RTPS::OPENDDS_FLAG_R) {
2822  if (res.sm_.heartbeat_sm().count.value != map_pair.new_) {
2823  update_required_acknack_count(res.src_guid_, res.dst_guid_, map_pair.new_);
2824  }
2825  res.sm_.heartbeat_sm().smHeader.flags &= ~RTPS::OPENDDS_FLAG_R;
2826  }
2827  }
2828  map_pair.is_new_assigned_ = true;
2829  }
2830  res.sm_.heartbeat_sm().count.value = map_pair.new_;
2831  const HeartBeatSubmessage& heartbeat = res.sm_.heartbeat_sm();
2833  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
2834  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
2835  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: HEARTBEAT: %C -> %C first %q last %q count %d\n",
2836  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
2837  }
2838  break;
2839  }
2840  case ACKNACK: {
2841  const AckNackSubmessage& acknack = res.sm_.acknack_sm();
2843  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
2844  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: ACKNACK: %C -> %C base %q bits %u count %d\n",
2845  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
2846  }
2847  break;
2848  }
2849  case NACK_FRAG: {
2850  CountSet& set = counts.nackfrag_counts_[res.sm_.nack_frag_sm().readerId];
2851  OPENDDS_ASSERT(!set.empty());
2852  res.sm_.nack_frag_sm().count.value = *set.begin();
2853  set.erase(set.begin());
2854  const NackFragSubmessage& nackfrag = res.sm_.nack_frag_sm();
2855  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
2857  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
2858  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: NACKFRAG: %C -> %C seq %q base %u bits %u\n",
2859  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
2860  }
2861  break;
2862  }
2863  default: {
2864  break;
2865  }
2866  }
2867  ser << res.sm_;
2869  push_back(rtps_message.submessages, res.sm_);
2870  }
2871  prev_dst = dst;
2872  }
2874  if (ss) {
2875  ss->send_rtps_control(rtps_message, *(mb_bundle.get()), bundles[i].proxy_.addrs());
2876  }
2877  }
2878 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const octet FLAG_E
Definition: RtpsCore.idl:518
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void update_required_acknack_count(const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
RcHandle< RtpsUdpSendStrategy > RtpsUdpSendStrategy_rch
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
SequenceNumberSet readerSNState
Definition: RtpsCore.idl:563
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
void bundle_mapped_meta_submessages(const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:228
RtpsUdpSendStrategy_rch send_strategy()
size_t size_
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
const octet FLAG_F
Definition: RtpsCore.idl:520
FragmentNumberSet fragmentNumberState
Definition: RtpsCore.idl:594
void build_meta_submessage_map(MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
bool log_messages
Log all RTPS messages sent or recieved.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
const octet OPENDDS_FLAG_R
Definition: RtpsCore.idl:526
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108

◆ bundle_mapped_meta_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::bundle_mapped_meta_submessages ( const Encoding encoding,
AddrDestMetaSubmessageMap &  addr_map,
BundleVec &  bundles,
CountKeeper counts 
)
private

Definition at line 2561 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, config(), OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, max_bundle_size_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::nackfrag_counts_, OPENDDS_ASSERT, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::undirected_, OpenDDS::RTPS::Count_t::value, and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

Referenced by bundle_and_send_submessages().

2565 {
2566  using namespace RTPS;
2567 
2568  // Reusable INFO_DST
2569  InfoDestinationSubmessage idst = {
2571  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2572  };
2573 
2574  RtpsUdpInst_rch cfg = config();
2575 
2576  const bool new_bundle_per_dest_guid = cfg && cfg->rtps_relay_only();
2577 
2578  BundleHelper helper(encoding, max_bundle_size_, bundles);
2579  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2580  for (AddrDestMetaSubmessageMap::iterator addr_it = addr_map.begin(), limit = addr_map.end(); addr_it != limit; ++addr_it) {
2581 
2582  // A new address set always starts a new bundle
2583  bundles.push_back(Bundle(addr_it->first));
2584 
2585  prev_dst = GUID_UNKNOWN;
2586 
2587  for (DestMetaSubmessageMap::iterator dest_it = addr_it->second.begin(), limit2 = addr_it->second.end(); dest_it != limit2; ++dest_it) {
2588 
2589  if (dest_it->second.empty()) {
2590  continue;
2591  }
2592 
2593  // Check to see if we're sending separate messages per destination guid
2594  if (new_bundle_per_dest_guid && bundles.back().submessages_.size()) {
2595  helper.end_bundle();
2596  bundles.push_back(Bundle(addr_it->first));
2597  prev_dst = GUID_UNKNOWN;
2598  }
2599 
2600  for (MetaSubmessageIterVec::iterator resp_it = dest_it->second.begin(), limit3 = dest_it->second.end(); resp_it != limit3; ++resp_it) {
2601 
2602  // Check before every meta_submessage to see if we need to prefix a INFO_DST
2603  if (dest_it->first != prev_dst) {
2604  // If adding an INFO_DST prefix bumped us over the limit, push the
2605  // size difference into the next bundle, reset prev_dst, and keep going
2606  if (!helper.add_to_bundle(idst)) {
2607  bundles.push_back(Bundle(addr_it->first));
2608  }
2609  }
2610 
2611  // Attempt to add the submessage meta_submessage to the bundle
2612  bool result = false, unique = false;
2613  ACE_UNUSED_ARG(unique);
2614  MetaSubmessage& res = **resp_it;
2615  switch (res.sm_._d()) {
2616  case HEARTBEAT: {
2617  const EntityId_t id = res.sm_.heartbeat_sm().writerId;
2618  result = helper.add_to_bundle(res.sm_.heartbeat_sm());
2619  CountMapPair& map_pair = counts.heartbeat_counts_[id].map_[res.sm_.heartbeat_sm().count.value];
2620  if (res.dst_guid_ == GUID_UNKNOWN) {
2621  map_pair.undirected_ = true;
2622  }
2623  break;
2624  }
2625  case ACKNACK: {
2626  result = helper.add_to_bundle(res.sm_.acknack_sm());
2627  break;
2628  }
2629  case GAP: {
2630  result = helper.add_to_bundle(res.sm_.gap_sm());
2631  break;
2632  }
2633  case NACK_FRAG: {
2634  const EntityId_t id = res.sm_.nack_frag_sm().readerId;
2635  unique = counts.nackfrag_counts_[id].insert(res.sm_.nack_frag_sm().count.value).second;
2636  OPENDDS_ASSERT(unique);
2637  result = helper.add_to_bundle(res.sm_.nack_frag_sm());
2638  break;
2639  }
2640  default: {
2641  break;
2642  }
2643  }
2644  prev_dst = dest_it->first;
2645 
2646  // If adding the submessage bumped us over the limit, push the size
2647  // difference into the next bundle, reset prev_dst, and keep going
2648  if (!result) {
2649  bundles.push_back(Bundle(addr_it->first));
2650  prev_dst = GUID_UNKNOWN;
2651  }
2652  bundles.back().submessages_.push_back(*resp_it);
2653  }
2654  }
2655  helper.end_bundle();
2656  }
2657 }
const octet FLAG_E
Definition: RtpsCore.idl:518
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
Definition: DataLink.inl:205
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108
RtpsUdpInst_rch config() const

◆ check_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 4326 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, OPENDDS_VECTOR(), readers_lock_, and OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist().

4327 {
4328  OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
4329 
4330  RtpsUdpInst_rch cfg = config();
4331 
4332  // Have any interesting writers timed out?
4333  const MonotonicTimePoint tv(now - 10 * (cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC)));
4334  {
4336 
4337  for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
4338  pos != limit;
4339  ++pos) {
4340  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
4341  CallbackType callback(pos->first, pos->second);
4342  writerDoesNotExistCallbacks.push_back(callback);
4343  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
4344  }
4345  }
4346  }
4347 
4348  OPENDDS_VECTOR(CallbackType)::iterator iter;
4349  for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
4350  const GUID_t& rid = iter->first;
4351  const InterestingRemote& remote = iter->second;
4352  remote.listener->writer_does_not_exist(rid, remote.localid);
4353  }
4354 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
std::pair< GUID_t, InterestingRemote > CallbackType
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
InterestingRemoteMapType interesting_writers_
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ client_stop()

void OpenDDS::DCPS::RtpsUdpDataLink::client_stop ( const GUID_t localId)

Definition at line 752 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore_local(), OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::pre_stop_helper(), readers_, readers_lock_, readers_of_writer_, OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs(), OpenDDS::DCPS::DataLink::send_strategy_, sq_, OpenDDS::DCPS::DataLink::strategy_lock_, writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::client_stop().

753 {
754  const GuidConverter conv(localId);
755 
756  if (conv.isReader()) {
758  RtpsReaderMap::iterator rr = readers_.find(localId);
759  if (rr != readers_.end()) {
760  for (RtpsReaderMultiMap::iterator iter = readers_of_writer_.begin();
761  iter != readers_of_writer_.end();) {
762  if (iter->second->id() == localId) {
763  readers_of_writer_.erase(iter++);
764  } else {
765  ++iter;
766  }
767  }
768 
769  RtpsReader_rch reader = rr->second;
770  readers_.erase(rr);
771  gr.release();
772 
773  reader->pre_stop_helper();
774 
775  } else {
776  for (WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.begin();
778  RepoIdSet::iterator r = w->second.readers.find(localId);
779  if (r != w->second.readers.end()) {
780  w->second.readers.erase(r);
781  if (w->second.readers.empty()) {
783  continue;
784  }
785  }
786  ++w;
787  }
788  }
789 
790  } else {
791  RtpsWriter_rch writer;
792  {
793  // Don't hold the writers lock when destroying a writer.
795  RtpsWriterMap::iterator pos = writers_.find(localId);
796  if (pos != writers_.end()) {
797  writer = pos->second;
798  writers_.erase(pos);
799  }
800  }
801 
802  if (writer) {
803  TqeVector to_drop;
804  writer->pre_stop_helper(to_drop, true);
805 
806  TqeVector::iterator drop_it = to_drop.begin();
807  while (drop_it != to_drop.end()) {
808  (*drop_it)->data_dropped(true);
809  ++drop_it;
810  }
811  writer->remove_all_msgs();
812  } else {
813  GuardType guard(strategy_lock_);
814  if (send_strategy_) {
816  }
817  }
818  }
819  sq_.ignore_local(localId);
820 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RtpsReaderMultiMap readers_of_writer_
void ignore_local(const GUID_t &id)
Mark all queued submessage with the given source (src_guid_) as ignored.
RcHandle< RtpsReader > RtpsReader_rch
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
RcHandle< RtpsWriter > RtpsWriter_rch
TransactionalRtpsSendQueue sq_

◆ config()

RtpsUdpInst_rch OpenDDS::DCPS::RtpsUdpDataLink::config ( ) const

◆ customize_queue_element()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element ( TransportQueueElement element)
privatevirtual

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 1305 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, customize_queue_element_non_reliable_i(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::DataLink::peer_ids(), OpenDDS::DCPS::TransportQueueElement::publication_id(), queue_submessages(), requires_inline_qos(), writers_, and writers_lock_.

1306 {
1307  const ACE_Message_Block* msg = element->msg();
1308  if (!msg) {
1309  return element;
1310  }
1311 
1312  const GUID_t pub_id = element->publication_id();
1313  GUIDSeq_var peers = peer_ids(pub_id);
1314 
1315  bool require_iq = requires_inline_qos(peers);
1316 
1318 
1319  const RtpsWriterMap::iterator rw = writers_.find(pub_id);
1320  MetaSubmessageVec meta_submessages;
1321  RtpsWriter_rch writer;
1322  TransportQueueElement* result;
1323  bool deliver_after_send = false;
1324  if (rw != writers_.end()) {
1325  writer = rw->second;
1326  guard.release();
1327  result = writer->customize_queue_element_helper(element, require_iq, meta_submessages, deliver_after_send);
1328  } else {
1329  guard.release();
1330  result = customize_queue_element_non_reliable_i(element, require_iq, meta_submessages, deliver_after_send, guard);
1331  }
1332 
1333  queue_submessages(meta_submessages);
1334 
1335  if (deliver_after_send) {
1336  element->data_delivered();
1337  }
1338 
1339  return result;
1340 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void queue_submessages(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsWriter > RtpsWriter_rch
TransportQueueElement * customize_queue_element_non_reliable_i(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ customize_queue_element_non_reliable_i()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element_non_reliable_i ( TransportQueueElement element,
bool  requires_inline_qos,
MetaSubmessageVec &  meta_submessages,
bool &  deliver_after_send,
ACE_Guard< ACE_Thread_Mutex > &  guard 
)
private

Definition at line 1227 of file RtpsUdpDataLink.cpp.

References ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual_i(), send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataLink::strategy_lock_, submsgs_to_msgblock(), and OpenDDS::DCPS::transport_debug.

Referenced by customize_queue_element().

1233 {
1234  RTPS::SubmessageSeq subm;
1235 
1236  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1237  TransportCustomizedElement* tce =
1238  dynamic_cast<TransportCustomizedElement*>(element);
1239  TransportSendControlElement* tsce =
1240  dynamic_cast<TransportSendControlElement*>(element);
1241 
1242  Message_Block_Ptr data;
1243 
1244  const ACE_Message_Block* msg = element->msg();
1245 
1246  // Based on the type of 'element', find and duplicate the data payload
1247  // continuation block.
1248  if (tsce) { // Control message
1249  if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
1250  data.reset(msg->cont()->duplicate());
1251  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1253  subm, *tsce, requires_inline_qos);
1254  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
1255  send_heartbeats_manual_i(tsce, meta_submessages);
1256  deliver_after_send = true;
1257  return 0;
1258  } else {
1259  guard.release();
1260  element->data_dropped(true /*dropped_by_transport*/);
1261  return 0;
1262  }
1263 
1264  } else if (tse) { // Basic data message
1265  // {DataSampleHeader} -> {Data Payload}
1266  data.reset(msg->cont()->duplicate());
1267  const DataSampleElement* dsle = tse->sample();
1268  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1270  subm, *dsle, requires_inline_qos);
1271 
1272  } else if (tce) { // Customized data message
1273  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1274  data.reset(msg->cont()->cont()->duplicate());
1275  const DataSampleElement* dsle = tce->original_send_element()->sample();
1276  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1278  subm, *dsle, requires_inline_qos);
1279 
1280  } else {
1281  return element;
1282  }
1283 
1284 #ifdef OPENDDS_SECURITY
1285  const GUID_t pub_id = element->publication_id();
1286 
1287  {
1288  GuardType guard(strategy_lock_);
1289  if (send_strategy_) {
1290  send_strategy()->encode_payload(pub_id, data, subm);
1291  }
1292  }
1293 #endif
1294 
1296  send_strategy()->append_submessages(subm);
1297  }
1298 
1300  hdr->cont(data.release());
1301  return new RtpsCustomizedElement(element, move(hdr));
1302 }
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:879
void reset(void)
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
int release(void)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
RtpsUdpSendStrategy_rch send_strategy()
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
static bool control_message_supported(char message_id)
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
bool log_messages
Log all RTPS messages sent or recieved.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
void send_heartbeats_manual_i(const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
ACE_Message_Block * submsgs_to_msgblock(const RTPS::SubmessageSeq &subm)
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ datareader_dispatch()

template<typename T , typename FN >
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
bool  directed,
const FN &  func 
)
inlineprivate

Definition at line 816 of file RtpsUdpDataLink.h.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, func(), LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::make_id(), OpenDDS::ICE::OPENDDS_VECTOR(), and OpenDDS::DCPS::transport_debug.

Referenced by received().

820  {
821  const GUID_t local = make_id(local_prefix_, submessage.readerId);
822  const GUID_t src = make_id(src_prefix, submessage.writerId);
823 
825  {
827  if (local.entityId == ENTITYID_UNKNOWN) {
828  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
829  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
830  to_call.push_back(iters.first->second);
831  }
832  if (to_call.empty()) {
834  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawreader_dispatch - %C -> X no local readers\n", LogGuid(src).c_str()));
835  }
836  return;
837  }
838  } else {
839  const RtpsReaderMap::iterator rr = readers_.find(local);
840  if (rr == readers_.end()) {
842  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> %C unknown local reader\n", LogGuid(src).c_str(), LogGuid(local).c_str()));
843  }
844  return;
845  }
846  to_call.push_back(rr->second);
847  }
848  }
849  MetaSubmessageVec meta_submessages;
850  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
851  RtpsReader& reader = **it;
852  (reader.*func)(submessage, src, directed, meta_submessages);
853  }
854  queue_submessages(meta_submessages);
855  }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
RtpsReaderMultiMap readers_of_writer_
bool log_dropped_messages
Log received RTPS messages that were dropped.
static std::string func(const std::string &, AST_Decl *, const std::string &, AST_Type *br_type, const std::string &, bool, Intro &, const std::string &)
void queue_submessages(MetaSubmessageVec &meta_submessages)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RcHandle< RtpsReader > RtpsReader_rch
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ datawriter_dispatch()

template<typename T , typename FN >
void OpenDDS::DCPS::RtpsUdpDataLink::datawriter_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
const FN &  func 
)
inlineprivate

Definition at line 789 of file RtpsUdpDataLink.h.

References ACE_DEBUG, ACE_GUARD, func(), LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::make_id(), OpenDDS::ICE::OPENDDS_VECTOR(), and OpenDDS::DCPS::transport_debug.

Referenced by received().

791  {
792  const GUID_t local = make_id(local_prefix_, submessage.writerId);
793  const GUID_t src = make_id(src_prefix, submessage.readerId);
794 
796  {
798  const RtpsWriterMap::iterator rw = writers_.find(local);
799  if (rw == writers_.end()) {
801  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawriter_dispatch - %C -> %C unknown local writer\n", LogGuid(local).c_str(), LogGuid(src).c_str()));
802  }
803  return;
804  }
805  to_call.push_back(rw->second);
806  }
807  MetaSubmessageVec meta_submessages;
808  for (OPENDDS_VECTOR(RtpsWriter_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
809  RtpsWriter& writer = **it;
810  (writer.*func)(submessage, src, meta_submessages);
811  }
812  queue_submessages(meta_submessages);
813  }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
bool log_dropped_messages
Log received RTPS messages that were dropped.
static std::string func(const std::string &, AST_Decl *, const std::string &, AST_Type *br_type, const std::string &, bool, Intro &, const std::string &)
void queue_submessages(MetaSubmessageVec &meta_submessages)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RcHandle< RtpsWriter > RtpsWriter_rch
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ disable_response_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::disable_response_queue ( bool  send_immediately)

Definition at line 2693 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::end_transaction(), flush_send_queue_i(), flush_send_queue_sporadic_, fsq_mutex_, fsq_vec_size_, sq_, and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::end_transport_header_processing(), and harvest_send_queue().

2694 {
2695  MetaSubmessageVec vec;
2697  sq_.end_transaction(vec);
2698  if (!vec.empty()) {
2699  if (fsq_vec_.size() == fsq_vec_size_) {
2700  fsq_vec_.resize(fsq_vec_.size() + 1);
2701  }
2702  fsq_vec_[fsq_vec_size_++].swap(vec);
2703  }
2704 
2705  if (fsq_vec_size_) {
2706  if (send_immediately) {
2708  } else {
2710  }
2711  }
2712 
2713 }
static const TimeDuration zero_value
Definition: TimeDuration.h:31
RcHandle< SporadicEvent > flush_send_queue_sporadic_
TransactionalRtpsSendQueue sq_

◆ disassociated()

void OpenDDS::DCPS::RtpsUdpDataLink::disassociated ( const GUID_t local,
const GUID_t remote 
)

Definition at line 655 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore_remote(), LM_DEBUG, locators_, locators_lock_, OPENDDS_ASSERT, release_reservations_i(), remove_locator_and_bundling_cache(), sq_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting().

657 {
658  release_reservations_i(remote_id, local_id);
660  sq_.ignore_remote(remote_id);
661 
663 
664  RemoteInfoMap::iterator pos = locators_.find(remote_id);
665  if (pos != locators_.end()) {
666  OPENDDS_ASSERT(pos->second.ref_count_ > 0);
667 
668  --pos->second.ref_count_;
669  if (pos->second.ref_count_ == 0) {
670  locators_.erase(pos);
671  }
672  } else if (Transport_debug_level > 3) {
673  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n", LogGuid(local_id).c_str()));
674  }
675 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
virtual void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
void ignore_remote(const GUID_t &id)
Mark all queued submessage with the given destination (dst_guid_) as ignored.
TransactionalRtpsSendQueue sq_

◆ durability_resend() [1/2]

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement element,
size_t &  cumulative_send_count 
)
private

Definition at line 4014 of file RtpsUdpDataLink.cpp.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i().

4016 {
4017  static CORBA::Long buffer[8];
4018  static const RTPS::FragmentNumberSet none = { {0}, 0, RTPS::LongSeq8(0, buffer) };
4019  durability_resend(element, none, cumulative_send_count);
4020 }
ACE_CDR::Long Long
void durability_resend(TransportQueueElement *element, size_t &cumulative_send_count)
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69

◆ durability_resend() [2/2]

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement element,
const RTPS::FragmentNumberSet fragmentSet,
size_t &  cumulative_send_count 
)
private

Definition at line 4022 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::empty(), get_addresses(), OpenDDS::DCPS::SequenceNumber::getValue(), include_fragment(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, LM_ERROR, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::TransportQueueElement::publication_id(), send_strategy(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::RTPS::FragmentNumber_t::value.

4025 {
4026  if (Transport_debug_level > 5) {
4027  ACE_DEBUG((LM_DEBUG, "(%P|%t) TRACK RtpsUdpDataLink::durability_resend %q\n", element->sequence().getValue()));
4028  }
4029  const AddrSet addrs = get_addresses(element->publication_id(), element->subscription_id());
4030  if (addrs.empty()) {
4031  const LogGuid conv(element->subscription_id());
4032  ACE_ERROR((LM_ERROR,
4033  "(%P|%t) ERROR: RtpsUdpDataLink::durability_resend() - "
4034  "no locator for remote %C\n", conv.c_str()));
4035  return;
4036  }
4037 
4038  TqeVector to_send;
4039  if (!send_strategy()->fragmentation_helper(element, to_send)) {
4040  return;
4041  }
4042 
4043  DisjointSequence fragments;
4044  fragments.insert(fragmentSet.bitmapBase.value, fragmentSet.numBits,
4045  fragmentSet.bitmap.get_buffer());
4046  SequenceNumber lastFragment = 0;
4047 
4048  const TqeVector::iterator end = to_send.end();
4049  for (TqeVector::iterator i = to_send.begin(); i != end; ++i) {
4050  if (fragments.empty() || include_fragment(**i, fragments, lastFragment)) {
4051  RTPS::Message message;
4052  send_strategy()->send_rtps_control(message, *const_cast<ACE_Message_Block*>((*i)->msg()), addrs);
4053  ++cumulative_send_count;
4054  }
4055 
4056  (*i)->data_delivered();
4057  }
4058 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
static bool include_fragment(const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
RtpsUdpSendStrategy_rch send_strategy()

◆ enable_response_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::enable_response_queue ( )

Definition at line 2687 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::begin_transaction(), and sq_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::begin_transport_header_processing().

2688 {
2690 }
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
TransactionalRtpsSendQueue sq_

◆ event_dispatcher()

EventDispatcher_rch OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher ( )
inline

Definition at line 248 of file RtpsUdpDataLink.h.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack().

248 { return event_dispatcher_; }
EventDispatcher_rch event_dispatcher_

◆ extend_bitmap_range()

void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range ( RTPS::FragmentNumberSet fnSet,
CORBA::ULong  extent,
ACE_CDR::ULong cumulative_bits_added 
)
staticprivate

Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent

Parameters
fnSetFragmentNumberSet for the message sequence number in question
extentis the highest fragment sequence number for this FragmentNumberSet

Definition at line 2967 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::RTPS::FragmentNumberSet::numBits, and OpenDDS::RTPS::FragmentNumber_t::value.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::generate_nack_frags_i().

2970 {
2971  if (extent < fnSet.bitmapBase.value) {
2972  return; // can't extend to some number under the base
2973  }
2974  // calculate the index to the extent to determine the new_num_bits
2975  const CORBA::ULong new_num_bits = std::min(CORBA::ULong(256),
2976  extent - fnSet.bitmapBase.value + 1),
2977  len = (new_num_bits + 31) / 32;
2978  if (new_num_bits < fnSet.numBits) {
2979  return; // bitmap already extends past "extent"
2980  }
2981  fnSet.bitmap.length(len);
2982  // We are missing from one past old bitmap end to the new end
2983  DisjointSequence::fill_bitmap_range(fnSet.numBits, new_num_bits,
2984  fnSet.bitmap.get_buffer(), len,
2985  fnSet.numBits, samples_requested);
2986 }
ACE_CDR::ULong ULong
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.

◆ filterBestEffortReaders()

void OpenDDS::DCPS::RtpsUdpDataLink::filterBestEffortReaders ( const ReceivedDataSample ds,
RepoIdSet selected,
RepoIdSet withheld 
)

Definition at line 524 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::publication_id_, readers_lock_, OpenDDS::DCPS::DataSampleHeader::sequence_, and writer_to_seq_best_effort_readers_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

525 {
527  const GUID_t& writer = ds.header_.publication_id_;
528  const SequenceNumber& seq = ds.header_.sequence_;
529  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(writer);
530  if (w != writer_to_seq_best_effort_readers_.end()) {
531  if (w->second.seq < seq) {
532  w->second.seq = seq;
533  selected.insert(w->second.readers.begin(), w->second.readers.end());
534  } else {
535  withheld.insert(w->second.readers.begin(), w->second.readers.end());
536  }
537  } // else the writer is not associated with best effort readers
538 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_

◆ flush_send_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue ( const MonotonicTimePoint now)
private

◆ flush_send_queue_i()

void OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue_i ( )
private

Definition at line 2676 of file RtpsUdpDataLink.cpp.

References bundle_and_send_submessages(), OpenDDS::DCPS::dedup(), and fsq_vec_size_.

Referenced by disable_response_queue(), and flush_send_queue().

2677 {
2678  for (size_t idx = 0; idx != fsq_vec_size_; ++idx) {
2679  dedup(fsq_vec_[idx]);
2680  bundle_and_send_submessages(fsq_vec_[idx]);
2681  fsq_vec_[idx].clear();
2682  }
2683  fsq_vec_size_ = 0;
2684 }
void bundle_and_send_submessages(MetaSubmessageVec &meta_submessages)
size_t dedup(MetaSubmessageVec &vec)

◆ get_addresses() [1/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses ( const GUID_t local,
const GUID_t remote 
) const

Given a 'local' id and a 'remote' id of a publication or subscription, return the set of addresses of the remote peers.

Definition at line 4593 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, get_addresses_i(), and locators_lock_.

Referenced by durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().

4594 {
4596  return get_addresses_i(local, remote);
4597 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const

◆ get_addresses() [2/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses ( const GUID_t local) const

Given a 'local' id, return the set of address for all remote peers.

Definition at line 4600 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, get_addresses_i(), and locators_lock_.

4601 {
4603  return get_addresses_i(local);
4604 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const

◆ get_addresses_i() [1/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses_i ( const GUID_t local,
const GUID_t remote 
) const
private

Definition at line 4607 of file RtpsUdpDataLink.cpp.

References accumulate_addresses().

Referenced by build_meta_submessage_map(), and get_addresses().

4608 {
4609  AddrSet retval;
4610 
4611  accumulate_addresses(local, remote, retval, true);
4612 
4613  return retval;
4614 }
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const

◆ get_addresses_i() [2/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses_i ( const GUID_t local) const
private

Definition at line 4617 of file RtpsUdpDataLink.cpp.

References accumulate_addresses(), OpenDDS::DCPS::GuidConverter::isWriter(), OpenDDS::DCPS::DataLink::peer_ids(), ACE_Guard< ACE_LOCK >::release(), writers_, and writers_lock_.

4618 {
4619  AddrSet retval;
4620  bool use_peers = true;
4621 
4622  // For reliable writers, use remote_reader_guids()
4623  const GuidConverter conv(local);
4624  if (conv.isWriter()) {
4625  RtpsWriter_rch writer;
4627  RtpsWriterMap::const_iterator pos = writers_.find(local);
4628  if (pos != writers_.end()) {
4629  writer = pos->second;
4630  }
4631  guard.release();
4632  if (writer) {
4633  RcHandle<ConstSharedRepoIdSet> addr_guids = writer->get_remote_reader_guids();
4634  if (addr_guids) {
4635  for (RepoIdSet::const_iterator it = addr_guids->guids_.begin(),
4636  limit = addr_guids->guids_.end(); it != limit; ++it) {
4637  accumulate_addresses(local, *it, retval);
4638 
4639  }
4640  use_peers = false;
4641  }
4642  }
4643  }
4644 
4645  if (use_peers) {
4646  const GUIDSeq_var peers = peer_ids(local);
4647  if (peers.ptr()) {
4648  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
4649  accumulate_addresses(local, peers[i], retval);
4650  }
4651  }
4652  }
4653 
4654  return retval;
4655 }
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
ACE_CDR::ULong ULong
RcHandle< RtpsWriter > RtpsWriter_rch
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490

◆ get_ice_agent()

RcHandle< ICE::Agent > OpenDDS::DCPS::RtpsUdpDataLink::get_ice_agent ( ) const

Definition at line 4786 of file RtpsUdpDataLink.cpp.

References ice_agent_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes().

4787 {
4788  return ice_agent_;
4789 }
RcHandle< ICE::Agent > ice_agent_

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::RtpsUdpDataLink::get_ice_endpoint ( ) const
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 4793 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportImpl::get_ice_endpoint(), and OpenDDS::DCPS::DataLink::impl().

Referenced by accumulate_addresses(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes().

4794 {
4795  TransportImpl_rch ti = impl();
4796  return ti ? ti->get_ice_endpoint() : WeakRcHandle<ICE::Endpoint>();
4797 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
TransportImpl_rch impl() const
Definition: DataLink.cpp:105

◆ get_job_queue()

RcHandle<JobQueue> OpenDDS::DCPS::RtpsUdpDataLink::get_job_queue ( ) const
inline

Definition at line 249 of file RtpsUdpDataLink.h.

249 { return job_queue_; }
RcHandle< JobQueue > job_queue_

◆ get_last_recv_address()

NetworkAddress OpenDDS::DCPS::RtpsUdpDataLink::get_last_recv_address ( const GUID_t remote_id)

Definition at line 466 of file RtpsUdpDataLink.cpp.

References config(), locators_, locators_lock_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().

Referenced by OpenDDS::DCPS::RtpsUdpTransport::get_last_recv_locator().

467 {
469  const RemoteInfoMap::const_iterator pos = locators_.find(remote_id);
470  if (pos != locators_.end()) {
471  RtpsUdpInst_rch cfg = config();
472  const TimeDuration threshold = cfg ? cfg->receive_address_duration_ : TimeDuration();
473  const bool valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= threshold;
474  return valid_last_recv_addr ? pos->second.last_recv_addr_ : NetworkAddress();
475  }
476  return NetworkAddress();
477 }
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ get_reactor()

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor ( void  )

Definition at line 16 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::get_reactor(), and reactor_task_.

Referenced by on_data_available().

17 {
18  if (!reactor_task_) return 0;
19  return reactor_task_->get_reactor();
20 }
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ get_reactor_interceptor()

ACE_INLINE ReactorInterceptor_rch OpenDDS::DCPS::RtpsUdpDataLink::get_reactor_interceptor ( ) const

Definition at line 23 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::interceptor(), and reactor_task_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

24 {
26  return reactor_task_->interceptor();
27 }
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
RcHandle< ReactorInterceptor > ReactorInterceptor_rch

◆ get_writer_send_buffer()

RcHandle< SingleSendBuffer > OpenDDS::DCPS::RtpsUdpDataLink::get_writer_send_buffer ( const GUID_t pub_id)
private

Definition at line 978 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, writers_, and writers_lock_.

979 {
980  RcHandle<SingleSendBuffer> result;
982 
983  const RtpsWriterMap::iterator wi = writers_.find(pub_id);
984  if (wi != writers_.end()) {
985  result = wi->second->get_send_buff();
986  }
987  return result;
988 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ handle_registry()

Security::HandleRegistry_rch OpenDDS::DCPS::RtpsUdpDataLink::handle_registry ( ) const
inline

◆ harvest_send_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::harvest_send_queue ( const MonotonicTimePoint now)
private

Definition at line 2660 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::begin_transaction(), disable_response_queue(), OpenDDS::DCPS::TransactionalRtpsSendQueue::ready_to_send(), and sq_.

2661 {
2663  sq_.ready_to_send();
2664 
2665  disable_response_queue(true);
2666 }
void ready_to_send()
Indicate that the queue is ready to send after all pending transactions are complete.
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
TransactionalRtpsSendQueue sq_
void disable_response_queue(bool send_immediately)

◆ include_fragment()

bool OpenDDS::DCPS::RtpsUdpDataLink::include_fragment ( const TransportQueueElement element,
const DisjointSequence fragments,
SequenceNumber lastFragment 
)
staticprivate

Definition at line 4060 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::contains_any(), OpenDDS::DCPS::TransportQueueElement::is_fragment(), and OpenDDS::DCPS::RtpsCustomizedElement::last_fragment().

Referenced by durability_resend().

4063 {
4064  if (!element.is_fragment()) {
4065  return true;
4066  }
4067 
4068  const RtpsCustomizedElement* const rce = dynamic_cast<const RtpsCustomizedElement*>(&element);
4069  if (!rce) {
4070  return true;
4071  }
4072 
4073  const SequenceRange thisElement(lastFragment + 1, rce->last_fragment());
4074  lastFragment = thisElement.second;
4075  return fragments.contains_any(thisElement);
4076 }
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ is_leading()

bool OpenDDS::DCPS::RtpsUdpDataLink::is_leading ( const GUID_t writer_id,
const GUID_t reader_id 
) const
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 4799 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, writers_, and writers_lock_.

4801 {
4802  RtpsWriterMap::mapped_type writer;
4803 
4804  {
4806  RtpsWriterMap::const_iterator pos = writers_.find(writer_id);
4807  if (pos == writers_.end()) {
4808  return false;
4809  }
4810  writer = pos->second;
4811  }
4812 
4813  return writer->is_leading(reader_id);
4814 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ local_crypto_handle() [1/2]

ACE_INLINE DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle ( ) const

◆ local_crypto_handle() [2/2]

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle ( DDS::Security::ParticipantCryptoHandle  pch)

◆ local_prefix()

const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix ( ) const
inline

◆ make_reservation()

int OpenDDS::DCPS::RtpsUdpDataLink::make_reservation ( const GUID_t remote_publication_id,
const GUID_t local_subscription_id,
const TransportReceiveListener_wrch receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return Codes: 0 means successful reservation made. -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 541 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::make_reservation(), pending_reliable_readers_, readers_, readers_lock_, and ACE_Guard< ACE_LOCK >::release().

545 {
547  if (reliable) {
548  RtpsReaderMap::iterator rr = readers_.find(lsi);
549  if (rr == readers_.end()) {
550  pending_reliable_readers_.insert(lsi);
551  }
552  }
553  guard.release();
554  return DataLink::make_reservation(rpi, lsi, trl, reliable);
555 }
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398

◆ multicast_socket()

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket ( )

◆ on_data_available()

void OpenDDS::DCPS::RtpsUdpDataLink::on_data_available ( RcHandle< InternalDataReader< NetworkInterfaceAddress > >  reader)
private

Definition at line 427 of file RtpsUdpDataLink.cpp.

References config(), get_reactor(), multicast_manager_, multicast_socket_, OpenDDS::DCPS::DataLink::network_change(), network_interface_address_reader_, OpenDDS::DCPS::MulticastManager::process(), and receive_strategy().

428 {
429  InternalDataReader<NetworkInterfaceAddress>::SampleSequence samples;
430  InternalSampleInfoSequence infos;
431 
432  network_interface_address_reader_->take(samples, infos);
433 
434  RtpsUdpInst_rch cfg = config();
435  if (!cfg || !cfg->use_multicast_) {
436  return;
437  }
438 
439  multicast_manager_.process(samples,
440  infos,
441  cfg->multicast_interface_,
442  get_reactor(),
443  receive_strategy().in(),
444  cfg->multicast_group_address(),
446 #ifdef ACE_HAS_IPV6
447  , cfg->ipv6_multicast_group_address(),
448  ipv6_multicast_socket_
449 #endif
450  );
451 
452  if (!samples.empty()) {
453  // FUTURE: This is propagating info to discovery. Write instead.
454  network_change();
455  }
456 }
ACE_SOCK_Dgram_Mcast multicast_socket_
void network_change() const
Definition: DataLink.cpp:1212
RtpsUdpReceiveStrategy_rch receive_strategy()
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ open()

bool OpenDDS::DCPS::RtpsUdpDataLink::open ( const ACE_SOCK_Dgram unicast_socket)

Definition at line 293 of file RtpsUdpDataLink.cpp.

References ACE_ERROR, ACE_TEXT(), config(), OpenDDS::DCPS::DCPS_debug_level, ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, ENOTSUP, LM_ERROR, multi_buff_, multicast_socket_, network_interface_address_reader_, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::DCPS::DataLink::receive_strategy_, send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, ACE_SOCK::set_option(), OpenDDS::DCPS::set_socket_multicast_ttl(), SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, OpenDDS::DCPS::DataLink::start(), stop_i(), TheServiceParticipant, unicast_socket(), and unicast_socket_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::make_datalink().

298 {
300 #ifdef ACE_HAS_IPV6
301  ipv6_unicast_socket_ = ipv6_unicast_socket;
302 #endif
303 
304  RtpsUdpInst_rch cfg = config();
305  if (!cfg) {
306  return false;
307  }
308 
309  if (cfg->use_multicast_) {
310 #ifdef ACE_HAS_MAC_OSX
313 #ifdef ACE_HAS_IPV6
314  ipv6_multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
316 #endif
317 #endif
318  }
319 
320  if (cfg->use_multicast_) {
321  if (!set_socket_multicast_ttl(unicast_socket_, cfg->ttl_)) {
322  if (DCPS_debug_level > 0) {
323  ACE_ERROR((LM_ERROR,
324  ACE_TEXT("(%P|%t) ERROR: ")
325  ACE_TEXT("RtpsUdpDataLink::open: ")
326  ACE_TEXT("failed to set TTL: %d\n"),
327  cfg->ttl_));
328  }
329  return false;
330  }
331 #ifdef ACE_HAS_IPV6
332  if (!set_socket_multicast_ttl(ipv6_unicast_socket_, cfg->ttl_)) {
333  if (DCPS_debug_level > 0) {
334  ACE_ERROR((LM_ERROR,
335  ACE_TEXT("(%P|%t) ERROR: ")
336  ACE_TEXT("RtpsUdpDataLink::open: ")
337  ACE_TEXT("failed to set TTL: %d\n"),
338  cfg->ttl_));
339  }
340  return false;
341  }
342 #endif
343  }
344 
345  if (cfg->send_buffer_size_ > 0) {
346  const int snd_size = cfg->send_buffer_size_;
347  if (unicast_socket_.set_option(SOL_SOCKET,
348  SO_SNDBUF,
349  (void *) &snd_size,
350  sizeof(snd_size)) < 0
351  && errno != ENOTSUP) {
352  if (DCPS_debug_level > 0) {
353  ACE_ERROR((LM_ERROR,
354  ACE_TEXT("(%P|%t) ERROR: ")
355  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
356  snd_size));
357  }
358  return false;
359  }
360 #ifdef ACE_HAS_IPV6
361  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
362  SO_SNDBUF,
363  (void *) &snd_size,
364  sizeof(snd_size)) < 0
365  && errno != ENOTSUP) {
366  if (DCPS_debug_level > 0) {
367  ACE_ERROR((LM_ERROR,
368  ACE_TEXT("(%P|%t) ERROR: ")
369  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
370  snd_size));
371  }
372  return false;
373  }
374 #endif
375  }
376 
377  if (cfg->rcv_buffer_size_ > 0) {
378  const int rcv_size = cfg->rcv_buffer_size_;
379  if (unicast_socket_.set_option(SOL_SOCKET,
380  SO_RCVBUF,
381  (void *) &rcv_size,
382  sizeof(int)) < 0
383  && errno != ENOTSUP) {
384  if (DCPS_debug_level > 0) {
385  ACE_ERROR((LM_ERROR,
386  ACE_TEXT("(%P|%t) ERROR: ")
387  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
388  rcv_size));
389  }
390  return false;
391  }
392 #ifdef ACE_HAS_IPV6
393  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
394  SO_RCVBUF,
395  (void *) &rcv_size,
396  sizeof(int)) < 0
397  && errno != ENOTSUP) {
398  if (DCPS_debug_level > 0) {
399  ACE_ERROR((LM_ERROR,
400  ACE_TEXT("(%P|%t) ERROR: ")
401  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
402  rcv_size));
403  }
404  return false;
405  }
406 #endif
407  }
408 
409  send_strategy()->send_buffer(&multi_buff_);
410 
411  if (start(send_strategy_,
412  receive_strategy_, false) != 0) {
413  stop_i();
414  if (DCPS_debug_level > 0) {
415  ACE_ERROR((LM_ERROR,
416  ACE_TEXT("(%P|%t) ERROR: ")
417  ACE_TEXT("UdpDataLink::open: start failed!\n")));
418  }
419  return false;
420  }
421 
422  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
423 
424  return true;
425 }
#define ACE_ERROR(X)
#define ENOTSUP
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
ACE_SOCK_Dgram_Mcast multicast_socket_
void opts(int opts)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
int set_option(int level, int option, void *optval, int optlen) const
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
RtpsUdpSendStrategy_rch send_strategy()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
#define TheServiceParticipant
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ OPENDDS_MAP() [1/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( FragmentNumberValue  ,
RTPS::FragmentNumberSet   
)
private

◆ OPENDDS_MAP() [2/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( SequenceNumber  ,
RequestedFragMap   
)
private

◆ OPENDDS_MAP() [3/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( SequenceNumber  ,
ReaderInfoSetHolder_rch   
)
private

◆ OPENDDS_MAP() [4/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( AddressCacheEntryProxy  ,
DestMetaSubmessageMap   
)
private

◆ OPENDDS_MAP() [5/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( CORBA::Long  ,
CountMapPair   
)
private

◆ OPENDDS_MAP_CMP() [1/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RemoteInfo  ,
GUID_tKeyLessThan   
)
private

Referenced by send_heartbeats().

◆ OPENDDS_MAP_CMP() [2/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
ReaderInfo_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [3/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RtpsWriter_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [4/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
WriterInfo_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [5/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
MetaSubmessageIterVec  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [6/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CountSet  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [7/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CountMapping  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [8/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RtpsReader_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [9/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CORBA::Long  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP() [1/2]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
RtpsReader_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP() [2/2]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
InterestingRemote  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_SET() [1/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( ReaderInfo_rch  )
private

◆ OPENDDS_SET() [2/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( WriterInfo_rch  )
private

◆ OPENDDS_SET() [3/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( CORBA::Long  )
private

◆ OPENDDS_VECTOR() [1/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageVec::iterator  )
private

◆ OPENDDS_VECTOR() [2/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageIterVec  )
private

◆ OPENDDS_VECTOR() [3/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( Bundle  )

◆ OPENDDS_VECTOR() [4/4]

OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageVec  )
private

◆ pre_stop_i()

void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i ( )
virtual

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 860 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, DBG_ENTRY_LVL, heartbeat_counts_, OpenDDS::DCPS::DataLink::pre_stop_i(), readers_, readers_lock_, writers_, and writers_lock_.

861 {
862  DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
864  TqeVector to_drop;
865 
866  RtpsWriterMap writers;
867  {
869  writers_.swap(writers);
870  for (RtpsWriterMap::const_iterator it = writers.begin(); it != writers.end(); ++it) {
871  heartbeat_counts_.erase(it->first.entityId);
872  }
873  }
874 
875  RtpsWriterMap::iterator w_iter = writers.begin();
876  while (w_iter != writers.end()) {
877  w_iter->second->pre_stop_helper(to_drop, true);
878  writers.erase(w_iter++);
879  }
880 
881  TqeVector::iterator drop_it = to_drop.begin();
882  while (drop_it != to_drop.end()) {
883  (*drop_it)->data_dropped(true);
884  ++drop_it;
885  }
886 
887  RtpsReaderMap readers;
888  {
890  readers = readers_;
891  }
892 
893  RtpsReaderMap::iterator r_iter = readers.begin();
894  while (r_iter != readers.end()) {
895  r_iter->second->pre_stop_helper();
896  ++r_iter;
897  }
898 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void pre_stop_i()
Definition: DataLink.cpp:993
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ queue_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages ( MetaSubmessageVec &  meta_submessages)
private

◆ reactor_is_shut_down()

ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down ( )

Definition at line 30 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::is_shut_down(), and reactor_task_.

31 {
32  if (!reactor_task_) return true;
33  return reactor_task_->is_shut_down();
34 }

◆ receive_strategy()

RtpsUdpReceiveStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy ( )
private

◆ received() [1/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::DataSubmessage data,
const GuidPrefix_t src_prefix,
const NetworkAddress remote_addr 
)

Definition at line 1567 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, local_prefix_, OpenDDS::DCPS::make_id(), OPENDDS_VECTOR(), pending_reliable_readers_, queue_submessages(), OpenDDS::RTPS::DataSubmessage::readerId, readers_, readers_lock_, readers_of_writer_, receive_strategy(), OpenDDS::DCPS::DataLink::strategy_lock_, update_last_recv_addr(), and OpenDDS::RTPS::DataSubmessage::writerId.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

1570 {
1571  const GUID_t local = make_id(local_prefix_, data.readerId);
1572  const GUID_t src = make_id(src_prefix, data.writerId);
1573 
1574  update_last_recv_addr(src, remote_addr);
1575 
1576  OPENDDS_VECTOR(RtpsReader_rch) to_call;
1577  {
1579  if (local.entityId == ENTITYID_UNKNOWN) {
1580  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
1581  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
1582  to_call.push_back(iters.first->second);
1583  }
1584  if (!pending_reliable_readers_.empty()) {
1585  GuardType guard(strategy_lock_);
1587  if (trs) {
1588  for (RepoIdSet::const_iterator it = pending_reliable_readers_.begin();
1589  it != pending_reliable_readers_.end(); ++it)
1590  {
1591  trs->withhold_data_from(*it);
1592  }
1593  }
1594  }
1595  } else {
1596  const RtpsReaderMap::iterator rr = readers_.find(local);
1597  if (rr != readers_.end()) {
1598  to_call.push_back(rr->second);
1599  } else if (pending_reliable_readers_.count(local)) {
1600  GuardType guard(strategy_lock_);
1602  if (trs) {
1603  trs->withhold_data_from(local);
1604  }
1605  }
1606  }
1607  }
1608  MetaSubmessageVec meta_submessages;
1609  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
1610  (*it)->process_data_i(data, src, meta_submessages);
1611  }
1612  queue_submessages(meta_submessages);
1613 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RtpsReaderMultiMap readers_of_writer_
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
RcHandle< RtpsUdpReceiveStrategy > RtpsUdpReceiveStrategy_rch
RtpsUdpReceiveStrategy_rch receive_strategy()
void queue_submessages(MetaSubmessageVec &meta_submessages)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RcHandle< RtpsReader > RtpsReader_rch
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [2/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::GapSubmessage gap,
const GuidPrefix_t src_prefix,
bool  directed,
const NetworkAddress remote_addr 
)

Definition at line 1780 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_gap_i(), update_last_recv_addr(), and OpenDDS::RTPS::GapSubmessage::writerId.

1784 {
1785  update_last_recv_addr(make_id(src_prefix, gap.writerId), remote_addr);
1786  datareader_dispatch(gap, src_prefix, directed, &RtpsReader::process_gap_i);
1787 }
void process_gap_i(const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [3/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatSubmessage heartbeat,
const GuidPrefix_t src_prefix,
bool  directed,
const NetworkAddress remote_addr 
)

Definition at line 1864 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_writers_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_i(), queue_submessages(), readers_lock_, update_last_recv_addr(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

1868 {
1869  const GUID_t src = make_id(src_prefix, heartbeat.writerId);
1871 
1872  update_last_recv_addr(src, remote_addr, now);
1873 
1874  MetaSubmessageVec meta_submessages;
1875  OPENDDS_VECTOR(InterestingRemote) callbacks;
1876  {
1878 
1879  // We received a heartbeat from a writer.
1880  // We should ACKNACK if the writer is interesting and there is no association.
1881 
1882  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
1883  limit = interesting_writers_.upper_bound(src);
1884  pos != limit;
1885  ++pos) {
1886  pos->second.last_activity = now;
1887  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
1888  callbacks.push_back(pos->second);
1889  pos->second.status = InterestingRemote::EXISTS;
1890  }
1891  }
1892  }
1893  queue_submessages(meta_submessages);
1894 
1895  for (size_t i = 0; i < callbacks.size(); ++i) {
1896  callbacks[i].listener->writer_exists(src, callbacks[i].localid);
1897  }
1898 
1899  datareader_dispatch(heartbeat, src_prefix, directed, &RtpsReader::process_heartbeat_i);
1900 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
InterestingRemoteMapType interesting_writers_
void queue_submessages(MetaSubmessageVec &meta_submessages)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void process_heartbeat_i(const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [4/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatFragSubmessage hb_frag,
const GuidPrefix_t src_prefix,
bool  directed,
const NetworkAddress remote_addr 
)

Definition at line 2989 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i(), update_last_recv_addr(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerId.

2993 {
2994  update_last_recv_addr(make_id(src_prefix, hb_frag.writerId), remote_addr);
2995  datareader_dispatch(hb_frag, src_prefix, directed, &RtpsReader::process_heartbeat_frag_i);
2996 }
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
void process_heartbeat_frag_i(const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [5/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::AckNackSubmessage acknack,
const GuidPrefix_t src_prefix,
const NetworkAddress remote_addr 
)

Definition at line 3061 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, datawriter_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_readers_, local_prefix_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack(), OpenDDS::RTPS::AckNackSubmessage::readerId, update_last_recv_addr(), OpenDDS::RTPS::AckNackSubmessage::writerId, and writers_lock_.

3064 {
3065  // local side is DW
3066  const GUID_t local = make_id(local_prefix_, acknack.writerId); // can't be ENTITYID_UNKNOWN
3067  const GUID_t remote = make_id(src_prefix, acknack.readerId);
3069 
3070  update_last_recv_addr(remote, remote_addr, now);
3071 
3072  OPENDDS_VECTOR(DiscoveryListener*) callbacks;
3073 
3074  {
3076  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
3077  limit = interesting_readers_.upper_bound(remote);
3078  pos != limit;
3079  ++pos) {
3080  pos->second.last_activity = now;
3081  // Ensure the acknack was for the writer.
3082  if (local == pos->second.localid) {
3083  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
3084  callbacks.push_back(pos->second.listener);
3085  pos->second.status = InterestingRemote::EXISTS;
3086  }
3087  }
3088  }
3089  }
3090 
3091  for (size_t i = 0; i < callbacks.size(); ++i) {
3092  callbacks[i]->reader_exists(remote, local);
3093  }
3094 
3095  datawriter_dispatch(acknack, src_prefix, &RtpsWriter::process_acknack);
3096 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void process_acknack(const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
InterestingRemoteMapType interesting_readers_
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [6/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::NackFragSubmessage nackfrag,
const GuidPrefix_t src_prefix,
const NetworkAddress remote_addr 
)

Definition at line 3403 of file RtpsUdpDataLink.cpp.

References datawriter_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_nackfrag(), OpenDDS::RTPS::NackFragSubmessage::readerId, and update_last_recv_addr().

3406 {
3407  update_last_recv_addr(make_id(src_prefix, nackfrag.readerId), remote_addr);
3408  datawriter_dispatch(nackfrag, src_prefix, &RtpsWriter::process_nackfrag);
3409 }
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
void process_nackfrag(const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ register_for_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader ( const GUID_t writerid,
const GUID_t readerid,
const AddrSet &  addresses,
DiscoveryListener listener 
)

Definition at line 678 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, OpenDDS::DCPS::GUID_t::entityId, heartbeat_, heartbeat_counts_, interesting_readers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::register_for_reader().

682 {
684  const bool enableheartbeat = interesting_readers_.empty();
685  interesting_readers_.insert(
686  InterestingRemoteMapType::value_type(
687  readerid,
688  InterestingRemote(writerid, addresses, listener)));
689  if (heartbeat_counts_.find(writerid.entityId) == heartbeat_counts_.end()) {
690  heartbeat_counts_[writerid.entityId] = 0;
691  }
692  g.release();
693  if (enableheartbeat) {
694  RtpsUdpInst_rch cfg = config();
695  heartbeat_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
696  }
697 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
InterestingRemoteMapType interesting_readers_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const
RcHandle< PeriodicEvent > heartbeat_

◆ register_for_writer()

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer ( const GUID_t readerid,
const GUID_t writerid,
const AddrSet &  addresses,
DiscoveryListener listener 
)

Definition at line 717 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, heartbeatchecker_, interesting_writers_, and readers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::register_for_writer().

721 {
723  bool enableheartbeatchecker = interesting_writers_.empty();
724  interesting_writers_.insert(
725  InterestingRemoteMapType::value_type(
726  writerid,
727  InterestingRemote(readerid, addresses, listener)));
728  g.release();
729  if (enableheartbeatchecker) {
730  RtpsUdpInst_rch cfg = config();
731  heartbeatchecker_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
732  }
733 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
InterestingRemoteMapType interesting_writers_
RcHandle< PeriodicEvent > heartbeatchecker_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ release_reservations_i()

void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i ( const GUID_t remote_id,
const GUID_t local_id 
)
privatevirtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 901 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), readers_, readers_lock_, readers_of_writer_, remove_locator_and_bundling_cache(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remove_writer(), writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by disassociated().

903 {
904  TqeVector to_drop;
905  using std::pair;
906  const GuidConverter conv(local_id);
907  if (conv.isWriter()) {
909  RtpsWriterMap::iterator rw = writers_.find(local_id);
910 
911  if (rw != writers_.end()) {
912  RtpsWriter_rch writer = rw->second;
913  g.release();
914  writer->remove_reader(remote_id);
915  if (writer->reader_count() == 0) {
916  writer->pre_stop_helper(to_drop, false);
917  }
918  writer->process_acked_by_all();
919  }
920 
921  } else if (conv.isReader()) {
923  RtpsReaderMap::iterator rr = readers_.find(local_id);
924 
925  if (rr != readers_.end()) {
926  for (pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> iters =
927  readers_of_writer_.equal_range(remote_id);
928  iters.first != iters.second;) {
929  if (iters.first->second->id() == local_id) {
930  readers_of_writer_.erase(iters.first++);
931  } else {
932  ++iters.first;
933  }
934  }
935 
936  RtpsReader_rch reader = rr->second;
937  g.release();
938 
939  reader->remove_writer(remote_id);
940 
941  } else {
942  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(remote_id);
943  if (w != writer_to_seq_best_effort_readers_.end()) {
944  RepoIdSet::iterator r = w->second.readers.find(local_id);
945  if (r != w->second.readers.end()) {
946  w->second.readers.erase(r);
947  if (w->second.readers.empty()) {
949  }
950  }
951  }
952  }
953  }
954 
956 
957  for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
958  (*drop_it)->data_dropped(true);
959  }
960 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
RtpsReaderMultiMap readers_of_writer_
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
RcHandle< RtpsReader > RtpsReader_rch
RcHandle< RtpsWriter > RtpsWriter_rch

◆ remove_all_msgs()

void OpenDDS::DCPS::RtpsUdpDataLink::remove_all_msgs ( const GUID_t pub_id)
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 165 of file RtpsUdpDataLink.cpp.

References ACE_Guard< ACE_LOCK >::release(), writers_, and writers_lock_.

166 {
168  RtpsWriter_rch writer;
169  RtpsWriterMap::iterator iter = writers_.find(pub_id);
170  if (iter != writers_.end()) {
171  writer = iter->second;
172  }
173 
174  g.release();
175 
176  if (writer) {
177  writer->remove_all_msgs();
178  }
179 }
RcHandle< RtpsWriter > RtpsWriter_rch

◆ remove_locator_and_bundling_cache()

void OpenDDS::DCPS::RtpsUdpDataLink::remove_locator_and_bundling_cache ( const GUID_t remote_id)

◆ remove_sample()

RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::remove_sample ( const DataSampleElement sample)
virtual

This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 146 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DataSampleElement::get_pub_id(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::REMOVE_NOT_FOUND, writers_, and writers_lock_.

147 {
148  GUID_t pub_id = sample->get_pub_id();
149 
151  RtpsWriter_rch writer;
152  RtpsWriterMap::iterator iter = writers_.find(pub_id);
153  if (iter != writers_.end()) {
154  writer = iter->second;
155  }
156 
157  g.release();
158 
159  if (writer) {
160  return writer->remove_sample(sample);
161  }
162  return REMOVE_NOT_FOUND;
163 }
RcHandle< RtpsWriter > RtpsWriter_rch

◆ requires_inline_qos()

bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos ( const GUIDSeq_var &  peers)

Definition at line 1426 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, force_inline_qos_, locators_, and locators_lock_.

Referenced by customize_queue_element(), OpenDDS::DCPS::RtpsUdpTransport::get_last_recv_locator(), and update_locators().

1427 {
1428  if (force_inline_qos_) {
1429  // Force true for testing purposes
1430  return true;
1431  } else {
1432  if (!peers.ptr()) {
1433  return false;
1434  }
1436  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
1437  const RemoteInfoMap::const_iterator iter = locators_.find(peers[i]);
1438  if (iter != locators_.end() && iter->second.requires_inline_qos_) {
1439  return true;
1440  }
1441  }
1442  return false;
1443  }
1444 }
static bool force_inline_qos_
static member used by testing code to force inline qos
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ security_config()

Security::SecurityConfig_rch OpenDDS::DCPS::RtpsUdpDataLink::security_config ( ) const
inline

◆ send_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 4079 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::insert(), interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, OPENDDS_MAP_CMP(), OPENDDS_VECTOR(), queue_submessages(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), writers_, writers_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().

4080 {
4081  OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
4082 
4083  RtpsUdpInst_rch cfg = config();
4084 
4085  MetaSubmessageVec meta_submessages;
4086 
4087  {
4089 
4090  const MonotonicTimePoint tv = now - 10 * cfg->heartbeat_period_;
4091  const MonotonicTimePoint tv3 = now - 3 * cfg->heartbeat_period_;
4092 
4093  typedef OPENDDS_MAP_CMP(GUID_t, RcHandle<ConstSharedRepoIdSet>, GUID_tKeyLessThan) WtaMap;
4094  WtaMap writers_to_advertise;
4095 
4096  for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
4097  limit = interesting_readers_.end();
4098  pos != limit;
4099  ++pos) {
4100  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
4101  (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
4102  RcHandle<ConstSharedRepoIdSet>& tg = writers_to_advertise[pos->second.localid];
4103  if (!tg) {
4104  tg = make_rch<ConstSharedRepoIdSet>();
4105  }
4106  const_cast<RepoIdSet&>(tg->guids_).insert(pos->first);
4107  }
4108  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
4109  CallbackType callback(pos->first, pos->second);
4110  readerDoesNotExistCallbacks.push_back(callback);
4111  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
4112  }
4113  }
4114 
4115  for (WtaMap::const_iterator pos = writers_to_advertise.begin(),
4116  limit = writers_to_advertise.end();
4117  pos != limit;
4118  ++pos) {
4119  RtpsWriterMap::const_iterator wpos = writers_.find(pos->first);
4120  if (wpos != writers_.end()) {
4121  wpos->second->gather_heartbeats(pos->second, meta_submessages);
4122  } else {
4123  using namespace OpenDDS::RTPS;
4124  const int count = ++heartbeat_counts_[pos->first.entityId];
4125  const HeartBeatSubmessage hb = {
4127  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4128  pos->first.entityId,
4129  to_rtps_seqnum(SequenceNumber(1)),
4131  {count}
4132  };
4133 
4134  for (RepoIdSet::const_iterator it = pos->second->guids_.begin(),
4135  limit = pos->second->guids_.end(); it != limit; ++it) {
4136 
4137  MetaSubmessage meta_submessage(pos->first, *it);
4138  meta_submessage.sm_.heartbeat_sm(hb);
4139  meta_submessages.push_back(meta_submessage);
4140  }
4141  }
4142  }
4143  }
4144 
4145  queue_submessages(meta_submessages);
4146 
4147  for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
4148  iter != readerDoesNotExistCallbacks.end(); ++iter) {
4149  const InterestingRemote& remote = iter->second;
4150  remote.listener->reader_does_not_exist(iter->first, remote.localid);
4151  }
4152 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
std::pair< GUID_t, InterestingRemote > CallbackType
GuidSet RepoIdSet
Definition: GuidUtils.h:113
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const octet FLAG_E
Definition: RtpsCore.idl:518
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
typedef OPENDDS_MAP_CMP(GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
static SequenceNumber ZERO()
void queue_submessages(MetaSubmessageVec &meta_submessages)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
InterestingRemoteMapType interesting_readers_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ send_heartbeats_manual_i()

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual_i ( const TransportSendControlElement tsce,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 4357 of file RtpsUdpDataLink.cpp.

References best_effort_heartbeat_count_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::TransportSendControlElement::publication_id(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and customize_queue_element_non_reliable_i().

4358 {
4359  using namespace OpenDDS::RTPS;
4360 
4361  const GUID_t pub_id = tsce->publication_id();
4362  const HeartBeatSubmessage hb = {
4363  {HEARTBEAT,
4365  HEARTBEAT_SZ},
4366  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4367  pub_id.entityId,
4368  // This liveliness heartbeat is from a best-effort Writer, the sequence numbers are not used
4369  to_rtps_seqnum(SequenceNumber(1)),
4370  to_rtps_seqnum(tsce->sequence()),
4372  };
4373 
4374  MetaSubmessage meta_submessage(pub_id, GUID_UNKNOWN);
4375  meta_submessage.sm_.heartbeat_sm(hb);
4376  meta_submessages.push_back(meta_submessage);
4377 }
const octet FLAG_E
Definition: RtpsCore.idl:518
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_L
Definition: RtpsCore.idl:524
const octet FLAG_F
Definition: RtpsCore.idl:520
ACE_CDR::Octet Octet
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ send_strategy()

RtpsUdpSendStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::send_strategy ( )
private

◆ separate_message()

bool OpenDDS::DCPS::RtpsUdpDataLink::separate_message ( EntityId_t  entity)
static

Definition at line 2481 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, and OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER.

Referenced by build_meta_submessage_map(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_encoded().

2482 {
2483  // submessages generated by these entities may not be combined
2484  // with other submessages when using full-message protection
2485  // DDS Security v1.1 8.4.2.4 Table 27 is_rtps_protected
2486  using namespace RTPS;
2491 }
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
Definition: MessageTypes.h:86
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
Definition: MessageTypes.h:83
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER
Definition: MessageTypes.h:84

◆ stop_i()

void OpenDDS::DCPS::RtpsUdpDataLink::stop_i ( )
privatevirtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 963 of file RtpsUdpDataLink.cpp.

References ACE_SOCK::close(), heartbeat_, heartbeatchecker_, multicast_socket_, network_interface_address_reader_, TheServiceParticipant, and unicast_socket_.

Referenced by open().

964 {
965  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
966 
967  heartbeat_->disable();
968  heartbeatchecker_->disable();
971 #ifdef ACE_HAS_IPV6
972  ipv6_unicast_socket_.close();
973  ipv6_multicast_socket_.close();
974 #endif
975 }
ACE_SOCK_Dgram_Mcast multicast_socket_
RcHandle< PeriodicEvent > heartbeatchecker_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
#define TheServiceParticipant
int close(void)
RcHandle< PeriodicEvent > heartbeat_

◆ submsgs_to_msgblock()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::submsgs_to_msgblock ( const RTPS::SubmessageSeq subm)
private

Definition at line 1056 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Encoding::align(), OpenDDS::DCPS::Serializer::align_w(), alloc_msgblock(), custom_allocator_, OpenDDS::STUN::encoding(), OpenDDS::DCPS::Encoding::KIND_XCDR1, OpenDDS::DCPS::serialized_size(), and OpenDDS::RTPS::SMHDR_SZ.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and customize_queue_element_non_reliable_i().

1057 {
1058  // byte swapping is handled in the operator<<() implementation
1059  const Encoding encoding(Encoding::KIND_XCDR1);
1060  size_t size = 0;
1061  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1062  encoding.align(size, RTPS::SMHDR_SZ);
1063  serialized_size(encoding, size, subm[i]);
1064  }
1065 
1067 
1068  Serializer ser(hdr, encoding);
1069  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1070  ser << subm[i];
1071  ser.align_w(RTPS::SMHDR_SZ);
1072  }
1073  return hdr;
1074 }
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
ACE_CDR::ULong ULong
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
void align(size_t &value, size_t by=(std::numeric_limits< size_t >::max)()) const
Align "value" to "by" and according to the stream&#39;s alignment.
Definition: Serializer.inl:118
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)

◆ transport()

RtpsUdpTransport_rch OpenDDS::DCPS::RtpsUdpDataLink::transport ( void  )

Definition at line 4575 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::dynamic_rchandle_cast(), and OpenDDS::DCPS::DataLink::impl().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().

4576 {
4577  return dynamic_rchandle_cast<RtpsUdpTransport>(impl());
4578 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214

◆ unicast_socket()

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket ( )

◆ unregister_for_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader ( const GUID_t writerid,
const GUID_t readerid 
)

Definition at line 700 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, interesting_readers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader().

702 {
704  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
705  limit = interesting_readers_.upper_bound(readerid);
706  pos != limit;
707  ) {
708  if (pos->second.localid == writerid) {
709  interesting_readers_.erase(pos++);
710  } else {
711  ++pos;
712  }
713  }
714 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
InterestingRemoteMapType interesting_readers_

◆ unregister_for_writer()

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer ( const GUID_t readerid,
const GUID_t writerid 
)

Definition at line 736 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, interesting_writers_, and readers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer().

738 {
740  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
741  limit = interesting_writers_.upper_bound(writerid);
742  pos != limit;
743  ) {
744  if (pos->second.localid == readerid) {
745  interesting_writers_.erase(pos++);
746  } else {
747  ++pos;
748  }
749  }
750 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
InterestingRemoteMapType interesting_writers_

◆ update_last_recv_addr()

void OpenDDS::DCPS::RtpsUdpDataLink::update_last_recv_addr ( const GUID_t src,
const NetworkAddress addr,
const MonotonicTimePoint now = MonotonicTimePoint::now() 
)
private

Definition at line 1531 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::is_more_local(), locators_, locators_lock_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and remove_locator_and_bundling_cache().

Referenced by associated(), and received().

1532 {
1533  RtpsUdpInst_rch cfg = config();
1534 
1535  if (!cfg) {
1536  return;
1537  }
1538 
1539  if (addr == cfg->rtps_relay_address()) {
1540  return;
1541  }
1542 
1543  bool remove_cache = false;
1544  {
1546  const RemoteInfoMap::iterator pos = locators_.find(src);
1547  if (pos != locators_.end()) {
1548  const bool expired = cfg->receive_address_duration_ < (MonotonicTimePoint::now() - pos->second.last_recv_time_);
1549  const bool allow_update = expired ||
1550  pos->second.last_recv_addr_ == addr ||
1551  is_more_local(pos->second.last_recv_addr_, addr);
1552  if (allow_update) {
1553  remove_cache = pos->second.last_recv_addr_ != addr;
1554  pos->second.last_recv_addr_ = addr;
1555  pos->second.last_recv_time_ = now;
1556  }
1557  }
1558  }
1559  if (remove_cache) {
1561  }
1562 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool is_more_local(const NetworkAddress &current, const NetworkAddress &incoming)
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ update_locators()

void OpenDDS::DCPS::RtpsUdpDataLink::update_locators ( const GUID_t remote_id,
AddrSet &  unicast_addresses,
AddrSet &  multicast_addresses,
bool  requires_inline_qos,
bool  add_ref 
)

Definition at line 480 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogAddr::c_str(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, LM_INFO, locators_, locators_lock_, OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::multicast_addrs_, OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::ref_count_, remove_locator_and_bundling_cache(), requires_inline_qos(), OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::requires_inline_qos_, and OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::unicast_addrs_.

Referenced by associated(), and OpenDDS::DCPS::RtpsUdpTransport::update_locators().

485 {
486  if (unicast_addresses.empty() && multicast_addresses.empty()) {
487  if (DCPS_debug_level > 0) {
488  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::update_locators: no addresses for %C\n"), LogGuid(remote_id).c_str()));
489  }
490  }
491 
493 
495 
496  RemoteInfo& info = locators_[remote_id];
497  const bool log_unicast_change = DCPS_debug_level > 3 && info.unicast_addrs_ != unicast_addresses;
498  const bool log_multicast_change = DCPS_debug_level > 3 && info.multicast_addrs_ != multicast_addresses;
499  info.unicast_addrs_.swap(unicast_addresses);
500  info.multicast_addrs_.swap(multicast_addresses);
501  info.requires_inline_qos_ = requires_inline_qos;
502  if (add_ref) {
503  ++info.ref_count_;
504  }
505 
506  g.release();
507 
508  if (log_unicast_change) {
509  for (AddrSet::const_iterator pos = unicast_addresses.begin(), limit = unicast_addresses.end();
510  pos != limit; ++pos) {
511  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
512  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
513  }
514  }
515  if (log_multicast_change) {
516  for (AddrSet::const_iterator pos = multicast_addresses.begin(), limit = multicast_addresses.end();
517  pos != limit; ++pos) {
518  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
519  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
520  }
521  }
522 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ update_required_acknack_count()

void OpenDDS::DCPS::RtpsUdpDataLink::update_required_acknack_count ( const GUID_t local_id,
const GUID_t remote_id,
CORBA::Long  current 
)
private

Definition at line 2741 of file RtpsUdpDataLink.cpp.

References writers_, and writers_lock_.

Referenced by bundle_and_send_submessages().

2742 {
2743  RtpsWriter_rch writer;
2744  {
2746  RtpsWriterMap::iterator rw = writers_.find(local_id);
2747  if (rw != writers_.end()) {
2748  writer = rw->second;
2749  }
2750  }
2751  if (writer) {
2752  writer->update_required_acknack_count(remote_id, current);
2753  }
2754 }
RcHandle< RtpsWriter > RtpsWriter_rch

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 266 of file RtpsUdpDataLink.h.

Member Data Documentation

◆ best_effort_heartbeat_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::best_effort_heartbeat_count_
private

Definition at line 860 of file RtpsUdpDataLink.h.

Referenced by send_heartbeats_manual_i().

◆ bundle_allocator_

Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::RtpsUdpDataLink::bundle_allocator_
private

Definition at line 314 of file RtpsUdpDataLink.h.

Referenced by bundle_and_send_submessages().

◆ bundling_cache_

BundlingCache OpenDDS::DCPS::RtpsUdpDataLink::bundling_cache_
mutableprivate

◆ custom_allocator_

Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::RtpsUdpDataLink::custom_allocator_
private

Definition at line 313 of file RtpsUdpDataLink.h.

Referenced by submsgs_to_msgblock().

◆ db_allocator_

DataBlockAllocator OpenDDS::DCPS::RtpsUdpDataLink::db_allocator_
private

Definition at line 312 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ db_lock_pool_

unique_ptr<DataBlockLockPool> OpenDDS::DCPS::RtpsUdpDataLink::db_lock_pool_
private

Definition at line 315 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ event_dispatcher_

EventDispatcher_rch OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher_
private

Definition at line 272 of file RtpsUdpDataLink.h.

◆ flush_send_queue_sporadic_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue_sporadic_
private

Definition at line 743 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and ~RtpsUdpDataLink().

◆ force_inline_qos_

bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false
staticprivate

static member used by testing code to force inline qos

Definition at line 268 of file RtpsUdpDataLink.h.

Referenced by requires_inline_qos().

◆ fsq_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::fsq_mutex_
mutableprivate

Definition at line 735 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and flush_send_queue().

◆ fsq_vec_size_

size_t OpenDDS::DCPS::RtpsUdpDataLink::fsq_vec_size_
private

Definition at line 737 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and flush_send_queue_i().

◆ handle_registry_

Security::HandleRegistry_rch OpenDDS::DCPS::RtpsUdpDataLink::handle_registry_
private

Definition at line 933 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ harvest_send_queue_sporadic_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::harvest_send_queue_sporadic_
private

Definition at line 740 of file RtpsUdpDataLink.h.

Referenced by queue_submessages(), and ~RtpsUdpDataLink().

◆ heartbeat_

RcHandle<PeriodicEvent> OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_
private

◆ heartbeat_counts_

CountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_
private

Definition at line 905 of file RtpsUdpDataLink.h.

Referenced by associated(), pre_stop_i(), register_for_reader(), and send_heartbeats().

◆ heartbeatchecker_

RcHandle<PeriodicEvent> OpenDDS::DCPS::RtpsUdpDataLink::heartbeatchecker_
private

Definition at line 863 of file RtpsUdpDataLink.h.

Referenced by register_for_writer(), and stop_i().

◆ ice_agent_

RcHandle<ICE::Agent> OpenDDS::DCPS::RtpsUdpDataLink::ice_agent_
private

Definition at line 935 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), and get_ice_agent().

◆ interesting_readers_

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_
private

◆ interesting_writers_

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_
private

◆ job_queue_

RcHandle<JobQueue> OpenDDS::DCPS::RtpsUdpDataLink::job_queue_
private

Definition at line 271 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ local_crypto_handle_

DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle_
private

Definition at line 934 of file RtpsUdpDataLink.h.

Referenced by local_crypto_handle().

◆ local_prefix_

GuidPrefix_t OpenDDS::DCPS::RtpsUdpDataLink::local_prefix_
private

Definition at line 277 of file RtpsUdpDataLink.h.

Referenced by received(), and RtpsUdpDataLink().

◆ locator_cache_

LocatorCache OpenDDS::DCPS::RtpsUdpDataLink::locator_cache_
mutableprivate

Definition at line 301 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), and remove_locator_and_bundling_cache().

◆ locators_

RemoteInfoMap OpenDDS::DCPS::RtpsUdpDataLink::locators_
private

◆ locators_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::locators_lock_
mutableprivate

◆ max_bundle_size_

const size_t OpenDDS::DCPS::RtpsUdpDataLink::max_bundle_size_
private

Definition at line 907 of file RtpsUdpDataLink.h.

Referenced by bundle_mapped_meta_submessages().

◆ mb_allocator_

MessageBlockAllocator OpenDDS::DCPS::RtpsUdpDataLink::mb_allocator_
private

Definition at line 311 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ multi_buff_

OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer OpenDDS::DCPS::RtpsUdpDataLink::multi_buff_
private

Referenced by associated(), and open().

◆ multicast_manager_

MulticastManager OpenDDS::DCPS::RtpsUdpDataLink::multicast_manager_
private

Definition at line 941 of file RtpsUdpDataLink.h.

Referenced by on_data_available().

◆ multicast_socket_

ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket_
private

Definition at line 305 of file RtpsUdpDataLink.h.

Referenced by multicast_socket(), on_data_available(), open(), and stop_i().

◆ network_interface_address_reader_

RcHandle<InternalDataReader<NetworkInterfaceAddress> > OpenDDS::DCPS::RtpsUdpDataLink::network_interface_address_reader_
private

Definition at line 940 of file RtpsUdpDataLink.h.

Referenced by on_data_available(), open(), and stop_i().

◆ pending_reliable_readers_

RepoIdSet OpenDDS::DCPS::RtpsUdpDataLink::pending_reliable_readers_
private

Definition at line 745 of file RtpsUdpDataLink.h.

Referenced by associated(), make_reservation(), and received().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::RtpsUdpDataLink::reactor_task_
private

Definition at line 270 of file RtpsUdpDataLink.h.

Referenced by get_reactor(), get_reactor_interceptor(), and reactor_is_shut_down().

◆ readers_

RtpsReaderMap OpenDDS::DCPS::RtpsUdpDataLink::readers_
private

◆ readers_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::readers_lock_
mutableprivate

What was once a single lock for the whole datalink is now split between three (four including ch_lock_):

  • readers_lock_ protects readers_, readers_of_writer_, pending_reliable_readers_, interesting_writers_, and writer_to_seq_best_effort_readers_ along with anything else that fits the 'reader side activity' of the datalink
  • writers_lock_ protects writers_, heartbeat_counts_ best_effort_heartbeat_count_, and interesting_readers_ along with anything else that fits the 'writers side activity' of the datalink
  • locators_lock_ protects locators_ (and therefore calls to get_addresses_i()) for both remote writers and remote readers
  • send_queues_lock protects thread_send_queues_

Definition at line 767 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), associated(), check_heartbeats(), client_stop(), filterBestEffortReaders(), make_reservation(), pre_stop_i(), received(), register_for_writer(), release_reservations_i(), and unregister_for_writer().

◆ readers_of_writer_

RtpsReaderMultiMap OpenDDS::DCPS::RtpsUdpDataLink::readers_of_writer_
private

Definition at line 755 of file RtpsUdpDataLink.h.

Referenced by associated(), client_stop(), received(), and release_reservations_i().

◆ security_config_

Security::SecurityConfig_rch OpenDDS::DCPS::RtpsUdpDataLink::security_config_
private

Definition at line 932 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ security_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::security_mutex_
mutableprivate

Definition at line 931 of file RtpsUdpDataLink.h.

Referenced by local_crypto_handle().

◆ sq_

TransactionalRtpsSendQueue OpenDDS::DCPS::RtpsUdpDataLink::sq_
private

◆ transport_statistics_

InternalTransportStatistics& OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_
private

◆ transport_statistics_mutex_

ACE_Thread_Mutex& OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_
private

◆ unicast_socket_

ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket_
private

Definition at line 304 of file RtpsUdpDataLink.h.

Referenced by open(), stop_i(), and unicast_socket().

◆ writer_to_seq_best_effort_readers_

WriterToSeqReadersMap OpenDDS::DCPS::RtpsUdpDataLink::writer_to_seq_best_effort_readers_
private

◆ writers_

RtpsWriterMap OpenDDS::DCPS::RtpsUdpDataLink::writers_
private

◆ writers_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::writers_lock_
mutableprivate

The documentation for this class was generated from the following files: