OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Static Private Member Functions | Private Attributes | Static Private Attributes | Friends | List of all members
OpenDDS::DCPS::RtpsUdpDataLink Class Reference

#include <RtpsUdpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Collaboration graph
[legend]

Classes

struct  Bundle
 
struct  CountKeeper
 
struct  CountMapPair
 
struct  CountMapping
 
class  DeliverHeldData
 
struct  InterestingRemote
 Data structure representing an "interesting" remote entity for static discovery. More...
 
struct  MultiSendBuffer
 
struct  ReaderInfo
 
struct  ReaderInfoSetHolder
 
struct  RemoteInfo
 
class  ReplayDurableData
 
class  RtpsReader
 
class  RtpsWriter
 
struct  WriterInfo
 

Public Member Functions

 RtpsUdpDataLink (const RtpsUdpTransport_rch &transport, const GuidPrefix_t &local_prefix, const RtpsUdpInst_rch &config, const ReactorTask_rch &reactor_task, InternalTransportStatistics &transport_statistics, ACE_Thread_Mutex &transport_statistics_mutex)
 
 ~RtpsUdpDataLink ()
 
bool add_delayed_notification (TransportQueueElement *element)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
RtpsUdpInst_rch config () const
 
ACE_Reactor * get_reactor ()
 
ReactorInterceptor_rch get_reactor_interceptor () const
 
bool reactor_is_shut_down ()
 
ACE_SOCK_Dgram & unicast_socket ()
 
ACE_SOCK_Dgram_Mcast & multicast_socket ()
 
bool open (const ACE_SOCK_Dgram &unicast_socket)
 
void received (const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
void received (const RTPS::GapSubmessage &gap, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::HeartBeatSubmessage &heartbeat, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::HeartBeatFragSubmessage &hb_frag, const GuidPrefix_t &src_prefix, bool directed, const NetworkAddress &remote_addr)
 
void received (const RTPS::AckNackSubmessage &acknack, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
void received (const RTPS::NackFragSubmessage &nackfrag, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
 
const GuidPrefix_t & local_prefix () const
 
void remove_locator_and_bundling_cache (const GUID_t &remote_id)
 
NetworkAddress get_last_recv_address (const GUID_t &remote_id)
 
void update_locators (const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
 
AddrSet get_addresses (const GUID_t &local, const GUID_t &remote) const
 
AddrSet get_addresses (const GUID_t &local) const
 Given a 'local' id, return the set of addresses for all remote peers. More...
 
void filterBestEffortReaders (const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
 
int make_reservation (const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
 
bool associated (const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
 
void disassociated (const GUID_t &local, const GUID_t &remote)
 
void register_for_reader (const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &readerid, const GUID_t &writerid)
 
void client_stop (const GUID_t &localId)
 
virtual void pre_stop_i ()
 
DCPS::RcHandle< ICE::Agent > get_ice_agent () const
 
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint () const
 
virtual bool is_leading (const GUID_t &writer_id, const GUID_t &reader_id) const
 
Security::SecurityConfig_rch security_config () const
 
Security::HandleRegistry_rch handle_registry () const
 
DDS::Security::ParticipantCryptoHandle local_crypto_handle () const
 
void local_crypto_handle (DDS::Security::ParticipantCryptoHandle pch)
 
RtpsUdpTransport_rch transport ()
 
void enable_response_queue ()
 
void disable_response_queue (bool send_immediately)
 
bool requires_inline_qos (const GUIDSeq_var &peers)
 
EventDispatcher_rch event_dispatcher ()
 
RcHandle< JobQueue > get_job_queue () const
 
typedef OPENDDS_VECTOR (Bundle) BundleVec
 
- Public Member Functions inherited from OpenDDS::DCPS::DataLink
 DataLink (const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
 Only called by our TransportImpl object. More...
 
virtual ~DataLink ()
 
int handle_exception (ACE_HANDLE)
 Reactor invokes this after being notified in schedule_stop or cancel_release. More...
 
void schedule_stop (const MonotonicTimePoint &schedule_to_stop_at)
 
void stop ()
 The stop method is used to stop the DataLink prior to shutdown. More...
 
void resume_send ()
 
virtual int make_reservation (const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
 
void release_reservations (GUID_t remote_id, GUID_t local_id, DataLinkSetMap &released_locals)
 
void schedule_delayed_release ()
 
const TimeDuration & datalink_release_delay () const
 
void remove_listener (const GUID_t &local_id)
 
void send_start ()
 
void send (TransportQueueElement *element)
 
void send_stop (GUID_t repoId)
 
int data_received (ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
 
void data_received_include (ReceivedDataSample &sample, const RepoIdSet &incl)
 
DataLinkIdType id () const
 Obtain a unique identifier for this DataLink object. More...
 
void transport_shutdown ()
 
void notify (ConnectionNotice notice)
 
void release_resources ()
 
void terminate_send ()
 
void terminate_send_if_suspended ()
 
bool is_target (const GUID_t &remote_id)
 
void clear_associations ()
 
int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 
int handle_close (ACE_HANDLE h, ACE_Reactor_Mask m)
 
void set_dscp_codepoint (int cp, ACE_SOCK &socket)
 
Priority & transport_priority ()
 
Priority transport_priority () const
 
bool & is_loopback ()
 
bool is_loopback () const
 
bool & is_active ()
 
bool is_active () const
 
bool cancel_release ()
 
ACE_Message_Block * create_control (char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr data)
 
GUIDSeq * target_intersection (const GUID_t &pub_id, const GUIDSeq &in, size_t &n_subs)
 
TransportImpl_rch impl () const
 
void default_listener (const TransportReceiveListener_wrch &trl)
 
TransportReceiveListener_wrch default_listener () const
 
bool add_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void remove_on_start_callback (const TransportClient_wrch &client, const GUID_t &remote)
 
void invoke_on_start_callbacks (bool success)
 
bool invoke_on_start_callbacks (const GUID_t &local, const GUID_t &remote, bool success)
 
void remove_startup_callbacks (const GUID_t &local, const GUID_t &remote)
 
void set_scheduling_release (bool scheduling_release)
 
virtual void send_final_acks (const GUID_t &readerid)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 

Static Public Member Functions

static bool separate_message (EntityId_t entity)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 

Private Types

typedef CORBA::ULong FragmentNumberValue
 
typedef RcHandle< ReaderInfo > ReaderInfo_rch
 
typedef RcHandle< ReaderInfoSetHolder > ReaderInfoSetHolder_rch
 
typedef RcHandle< RtpsWriter > RtpsWriter_rch
 
typedef RcHandle< WriterInfo > WriterInfo_rch
 
typedef RcHandle< RtpsReader > RtpsReader_rch
 
typedef std::pair< GUID_t, InterestingRemote > CallbackType
 

Private Member Functions

void on_data_available (RcHandle< InternalDataReader< NetworkInterfaceAddress > > reader)
 
AddrSet get_addresses_i (const GUID_t &local, const GUID_t &remote) const
 
AddrSet get_addresses_i (const GUID_t &local) const
 
virtual void stop_i ()
 
virtual TransportQueueElement * customize_queue_element (TransportQueueElement *element)
 
virtual void release_reservations_i (const GUID_t &remote_id, const GUID_t &local_id)
 
RtpsUdpSendStrategy_rch send_strategy ()
 
RtpsUdpReceiveStrategy_rch receive_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
 
void update_last_recv_addr (const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
 
ACE_Message_Block * alloc_msgblock (size_t size, ACE_Allocator *data_allocator)
 
ACE_Message_Block * submsgs_to_msgblock (const RTPS::SubmessageSeq &subm)
 
RcHandle< SingleSendBuffer > get_writer_send_buffer (const GUID_t &pub_id)
 
typedef OPENDDS_MAP (FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
 
typedef OPENDDS_MAP (SequenceNumber, RequestedFragMap) RequestedFragSeqMap
 
typedef OPENDDS_MAP_CMP (GUID_t, ReaderInfo_rch, GUID_tKeyLessThan) ReaderInfoMap
 
typedef OPENDDS_SET (ReaderInfo_rch) ReaderInfoSet
 
typedef OPENDDS_MAP (SequenceNumber, ReaderInfoSetHolder_rch) SNRIS
 
typedef OPENDDS_MAP_CMP (GUID_t, RtpsWriter_rch, GUID_tKeyLessThan) RtpsWriterMap
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterInfo_rch, GUID_tKeyLessThan) WriterInfoMap
 
typedef OPENDDS_SET (WriterInfo_rch) WriterInfoSet
 
typedef OPENDDS_VECTOR (MetaSubmessageVec::iterator) MetaSubmessageIterVec
 
typedef OPENDDS_MAP_CMP (GUID_t, MetaSubmessageIterVec, GUID_tKeyLessThan) DestMetaSubmessageMap
 
typedef OPENDDS_MAP (AddressCacheEntryProxy, DestMetaSubmessageMap) AddrDestMetaSubmessageMap
 
typedef OPENDDS_VECTOR (MetaSubmessageIterVec) MetaSubmessageIterVecVec
 
typedef OPENDDS_SET (CORBA::Long) CountSet
 
typedef OPENDDS_MAP_CMP (EntityId_t, CountSet, EntityId_tKeyLessThan) IdCountSet
 
typedef OPENDDS_MAP (CORBA::Long, CountMapPair) CountMap
 
typedef OPENDDS_MAP_CMP (EntityId_t, CountMapping, EntityId_tKeyLessThan) IdCountMapping
 
void build_meta_submessage_map (MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
 
void bundle_mapped_meta_submessages (const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)
 
void queue_submessages (MetaSubmessageVec &meta_submessages)
 
void update_required_acknack_count (const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
 
void bundle_and_send_submessages (MetaSubmessageVec &meta_submessages)
 
 OPENDDS_VECTOR (MetaSubmessageVec) fsq_vec_
 
void harvest_send_queue (const MonotonicTimePoint &now)
 
void flush_send_queue (const MonotonicTimePoint &now)
 
void flush_send_queue_i ()
 
typedef OPENDDS_MAP_CMP (GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMap
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, RtpsReader_rch, GUID_tKeyLessThan) RtpsReaderMultiMap
 
void durability_resend (TransportQueueElement *element, size_t &cumulative_send_count)
 
void durability_resend (TransportQueueElement *element, const RTPS::FragmentNumberSet &fragmentSet, size_t &cumulative_send_count)
 
template<typename T , typename FN >
void datawriter_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
 
template<typename T , typename FN >
void datareader_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
 
void send_heartbeats (const MonotonicTimePoint &now)
 
void check_heartbeats (const MonotonicTimePoint &now)
 
typedef OPENDDS_MULTIMAP_CMP (GUID_t, InterestingRemote, GUID_tKeyLessThan) InterestingRemoteMapType
 
TransportQueueElement * customize_queue_element_non_reliable_i (TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)
 
void send_heartbeats_manual_i (const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
 
typedef OPENDDS_MAP_CMP (EntityId_t, CORBA::Long, EntityId_tKeyLessThan) CountMapType
 
void accumulate_addresses (const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
 

Static Private Member Functions

static void extend_bitmap_range (RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
 
static bool include_fragment (const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
 

Private Attributes

ReactorTask_rch reactor_task_
 
RcHandle< JobQueue > job_queue_
 
EventDispatcher_rch event_dispatcher_
 
GuidPrefix_t local_prefix_
 
RemoteInfoMap locators_
 
LocatorCache locator_cache_
 
BundlingCache bundling_cache_
 
ACE_SOCK_Dgram unicast_socket_
 
ACE_SOCK_Dgram_Mcast multicast_socket_
 
MessageBlockAllocator mb_allocator_
 
DataBlockAllocator db_allocator_
 
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
 
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
 
unique_ptr< DataBlockLockPool > db_lock_pool_
 
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
 
RtpsWriterMap writers_
 
TransactionalRtpsSendQueue sq_
 
ACE_Thread_Mutex fsq_mutex_
 
size_t fsq_vec_size_
 
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
 
RcHandle< SporadicEvent > flush_send_queue_sporadic_
 
RepoIdSet pending_reliable_readers_
 
RtpsReaderMap readers_
 
RtpsReaderMultiMap readers_of_writer_
 
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
 
ACE_Thread_Mutex readers_lock_
 
ACE_Thread_Mutex writers_lock_
 
ACE_Thread_Mutex locators_lock_
 
CORBA::Long best_effort_heartbeat_count_
 
RcHandle< PeriodicEvent > heartbeat_
 
RcHandle< PeriodicEvent > heartbeatchecker_
 
InterestingRemoteMapType interesting_readers_
 
InterestingRemoteMapType interesting_writers_
 
CountMapType heartbeat_counts_
 
const size_t max_bundle_size_
 
InternalTransportStatistics & transport_statistics_
 
ACE_Thread_Mutex & transport_statistics_mutex_
 
ACE_Thread_Mutex security_mutex_
 
Security::SecurityConfig_rch security_config_
 
Security::HandleRegistry_rch handle_registry_
 
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
 
RcHandle< ICE::Agent > ice_agent_
 
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
 
MulticastManager multicast_manager_
 

Static Private Attributes

static bool force_inline_qos_ = false
 static member used by testing code to force inline qos More...
 

Friends

class ::DDS_TEST
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::DataLink
enum  ConnectionNotice { DISCONNECTED, RECONNECTED, LOST }
 
typedef WeakRcHandle< TransportClient > TransportClient_wrch
 
typedef std::pair< TransportClient_wrch, GUID_t > OnStartCallback
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< NetworkInterfaceAddress > > InternalDataReader_rch
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::DataLink
typedef ACE_Guard< LockType > GuardType
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataLink
int start (const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
 
void send_start_i ()
 
virtual void send_i (TransportQueueElement *element, bool relink=true)
 
void send_stop_i (GUID_t repoId)
 
GUIDSeq * peer_ids (const GUID_t &local_id) const
 
void network_change () const
 
void replay_durable_data (const GUID_t &local_pub_id, const GUID_t &remote_sub_id) const
 
TransportSendStrategy_rch get_send_strategy ()
 
typedef OPENDDS_MAP_CMP (GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataLink
static ACE_UINT64 get_next_datalink_id ()
 Used to provide unique Ids to all DataLink methods. More...
 
- Protected Attributes inherited from OpenDDS::DCPS::DataLink
TransportStrategy_rch receive_strategy_
 The transport receive strategy object for this DataLink. More...
 
TransportSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink. More...
 
LockType strategy_lock_
 
OnStartCallbackMap on_start_callbacks_
 
PendingOnStartsMap pending_on_starts_
 
TimeDuration datalink_release_delay_
 
unique_ptr< MessageBlockAllocator > mb_allocator_
 
unique_ptr< DataBlockAllocator > db_allocator_
 
bool is_loopback_
 Is remote attached to same transport ? More...
 
bool is_active_
 Is pub or sub ? More...
 
bool started_
 
SendResponseListener send_response_listener_
 Listener for TransportSendControlElements created in send_control. More...
 
Interceptor interceptor_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 90 of file RtpsUdpDataLink.h.

Member Typedef Documentation

◆ CallbackType

Definition at line 893 of file RtpsUdpDataLink.h.

◆ FragmentNumberValue

Definition at line 339 of file RtpsUdpDataLink.h.

◆ ReaderInfo_rch

Definition at line 387 of file RtpsUdpDataLink.h.

◆ ReaderInfoSetHolder_rch

Definition at line 397 of file RtpsUdpDataLink.h.

◆ RtpsReader_rch

Definition at line 681 of file RtpsUdpDataLink.h.

◆ RtpsWriter_rch

Definition at line 576 of file RtpsUdpDataLink.h.

◆ WriterInfo_rch

Definition at line 613 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsUdpDataLink()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink ( const RtpsUdpTransport_rch &  transport,
const GuidPrefix_t &  local_prefix,
const RtpsUdpInst_rch &  config,
const ReactorTask_rch &  reactor_task,
InternalTransportStatistics &  transport_statistics,
ACE_Thread_Mutex &  transport_statistics_mutex 
)

Definition at line 68 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::assign(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, handle_registry_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::job_queue(), job_queue_, local_prefix(), local_prefix_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::ref(), security_config_, OpenDDS::DCPS::DataLink::send_strategy_, and TheServiceParticipant.

74  : DataLink(transport, // 3 data link "attributes", below, are unused
75  0, // priority
76  false, // is_loopback
77  false) // is_active
78  , reactor_task_(reactor_task)
79  , job_queue_(make_rch<JobQueue>(reactor_task->get_reactor()))
80  , event_dispatcher_(transport->event_dispatcher())
81  , mb_allocator_(TheServiceParticipant->association_chunk_multiplier())
82  , db_allocator_(TheServiceParticipant->association_chunk_multiplier())
83  , custom_allocator_(TheServiceParticipant->association_chunk_multiplier() * config->anticipated_fragments_, RtpsSampleHeader::FRAG_SIZE)
84  , bundle_allocator_(TheServiceParticipant->association_chunk_multiplier(), config->max_message_size_)
85  , db_lock_pool_(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())))
86  , multi_buff_(this, config->nak_depth_)
87  , fsq_vec_size_(0)
88  , harvest_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::harvest_send_queue)))
89  , flush_send_queue_sporadic_(make_rch<SporadicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::flush_send_queue)))
91  , heartbeat_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::send_heartbeats)))
92  , heartbeatchecker_(make_rch<PeriodicEvent>(event_dispatcher_, make_rch<PmfNowEvent<RtpsUdpDataLink> >(rchandle_from(this), &RtpsUdpDataLink::check_heartbeats)))
93  , max_bundle_size_(config->max_message_size_ - RTPS::RTPSHDR_SZ) // default maximum bundled message size is max udp message size (see TransportStrategy) minus RTPS header
94  , transport_statistics_(transport_statistics)
95  , transport_statistics_mutex_(transport_statistics_mutex)
96 #ifdef OPENDDS_SECURITY
100 #endif
101  , network_interface_address_reader_(make_rch<InternalDataReader<NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), rchandle_from(this)))
102 {
103 #ifdef OPENDDS_SECURITY
104  const GUID_t guid = make_id(local_prefix, ENTITYID_PARTICIPANT);
105  handle_registry_ = security_config_->get_handle_registry(guid);
106 #endif
107 
108  send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
109  receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix, ref(TheServiceParticipant->get_thread_status_manager()));
110  assign(local_prefix_, local_prefix);
111 
112  this->job_queue(job_queue_);
113 }
static const ACE_CDR::UShort FRAG_SIZE
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
void send_heartbeats(const MonotonicTimePoint &now)
const InstanceHandle_t HANDLE_NIL
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
RtpsUdpTransport_rch transport()
DataBlockAllocator db_allocator_
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
static SecurityRegistry * instance()
Return a singleton instance of this class.
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
ACE_Thread_Mutex & transport_statistics_mutex_
RtpsUdpInst_rch config() const
const GuidPrefix_t & local_prefix() const
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
InternalTransportStatistics & transport_statistics_
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
RcHandle< JobQueue > job_queue_
Security::SecurityConfig_rch security_config_
Security::HandleRegistry_rch handle_registry_
void check_heartbeats(const MonotonicTimePoint &now)
MessageBlockAllocator mb_allocator_
RcHandle< ICE::Agent > ice_agent_
RcHandle< SporadicEvent > flush_send_queue_sporadic_
EventDispatcher_rch event_dispatcher_
static DCPS::RcHandle< Agent > instance()
Definition: Ice.cpp:122
void harvest_send_queue(const MonotonicTimePoint &now)
void flush_send_queue(const MonotonicTimePoint &now)
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
DataLink(const TransportImpl_rch &impl, Priority priority, bool is_loopback, bool is_active)
Only called by our TransportImpl object.
Definition: DataLink.cpp:42
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
#define TheServiceParticipant
RcHandle< PeriodicEvent > heartbeatchecker_
unique_ptr< DataBlockLockPool > db_lock_pool_
RcHandle< PeriodicEvent > heartbeat_

◆ ~RtpsUdpDataLink()

OpenDDS::DCPS::RtpsUdpDataLink::~RtpsUdpDataLink ( )

Definition at line 115 of file RtpsUdpDataLink.cpp.

References flush_send_queue_sporadic_, and harvest_send_queue_sporadic_.

116 {
117  harvest_send_queue_sporadic_->cancel();
118  flush_send_queue_sporadic_->cancel();
119 }
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
RcHandle< SporadicEvent > flush_send_queue_sporadic_

Member Function Documentation

◆ accumulate_addresses()

void OpenDDS::DCPS::RtpsUdpDataLink::accumulate_addresses ( const GUID_t &  local,
const GUID_t &  remote,
AddrSet &  addresses,
bool  prefer_unicast = false 
) const
private

Definition at line 4678 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), get_ice_endpoint(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, ice_agent_, interesting_readers_, interesting_writers_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), locator_cache_, locators_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::max_value, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_ASSERT, readers_lock_, and writers_lock_.

Referenced by build_meta_submessage_map(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i(), and get_addresses_i().

4680 {
4681  OPENDDS_ASSERT(local != GUID_UNKNOWN);
4682  OPENDDS_ASSERT(remote != GUID_UNKNOWN);
4683 
4684  const LocatorCacheKey key(remote, local, prefer_unicast);
4685  LocatorCache::ScopedAccess entry(locator_cache_, key);
4686  if (!entry.is_new_) {
4687  addresses.insert(entry.value().addrs_.begin(), entry.value().addrs_.end());
4688  return;
4689  }
4690 
4691  RtpsUdpInst_rch cfg = config();
4692  if (!cfg) {
4693  return;
4694  }
4695 
4696  if (cfg->rtps_relay_only() && std::memcmp(&local.guidPrefix, &remote.guidPrefix, sizeof(GuidPrefix_t)) != 0) {
4697  if (NetworkAddress() != cfg->rtps_relay_address()) {
4698  addresses.insert(cfg->rtps_relay_address());
4699  entry.value().addrs_.insert(cfg->rtps_relay_address());
4700  }
4701  return;
4702  }
4703 
4704  AddrSet normal_addrs;
4705  MonotonicTimePoint normal_addrs_expires = MonotonicTimePoint::max_value;
4706  NetworkAddress ice_addr;
4707  bool valid_last_recv_addr = false;
4708  static const NetworkAddress NO_ADDR;
4709 
4710  const RemoteInfoMap::const_iterator pos = locators_.find(remote);
4711  if (pos != locators_.end()) {
4712  if (prefer_unicast && pos->second.insert_recv_addr(normal_addrs)) {
4713  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4714  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4715  } else if (prefer_unicast && !pos->second.unicast_addrs_.empty()) {
4716  normal_addrs = pos->second.unicast_addrs_;
4717  } else if (!pos->second.multicast_addrs_.empty()) {
4718 #ifdef ACE_HAS_IPV6
4719  if (pos->second.last_recv_addr_ != NO_ADDR) {
4720  const AddrSet& mc_addrs = pos->second.multicast_addrs_;
4721  for (AddrSet::const_iterator it = mc_addrs.begin(); it != mc_addrs.end(); ++it) {
4722  if (it->get_type() == pos->second.last_recv_addr_.get_type()) {
4723  normal_addrs.insert(*it);
4724  }
4725  }
4726  } else {
4727  normal_addrs = pos->second.multicast_addrs_;
4728  }
4729 #else
4730  normal_addrs = pos->second.multicast_addrs_;
4731 #endif
4732  } else if (pos->second.insert_recv_addr(normal_addrs)) {
4733  normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4734  valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4735  } else {
4736  normal_addrs = pos->second.unicast_addrs_;
4737  }
4738  } else {
4739  const GuidConverter conv(remote);
4740  if (conv.isReader()) {
4742  InterestingRemoteMapType::const_iterator ipos = interesting_readers_.find(remote);
4743  if (ipos != interesting_readers_.end()) {
4744  normal_addrs = ipos->second.addresses;
4745  }
4746  } else if (conv.isWriter()) {
4748  InterestingRemoteMapType::const_iterator ipos = interesting_writers_.find(remote);
4749  if (ipos != interesting_writers_.end()) {
4750  normal_addrs = ipos->second.addresses;
4751  }
4752  }
4753  }
4754 
4755 #ifdef OPENDDS_SECURITY
4756  WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
4757  if (endpoint) {
4758  ice_addr = ice_agent_->get_address(endpoint, local, remote);
4759  }
4760 #endif
4761 
4762  if (ice_addr == NO_ADDR) {
4763  addresses.insert(normal_addrs.begin(), normal_addrs.end());
4764  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4765  entry.value().expires_ = normal_addrs_expires;
4766  const NetworkAddress relay_addr = cfg->rtps_relay_address();
4767  if (!valid_last_recv_addr && relay_addr != NO_ADDR) {
4768  addresses.insert(relay_addr);
4769  entry.value().addrs_.insert(relay_addr);
4770  }
4771  return;
4772  }
4773 
4774  if (normal_addrs.count(ice_addr) == 0) {
4775  addresses.insert(ice_addr);
4776  entry.value().addrs_.insert(ice_addr);
4777  return;
4778  }
4779 
4780  addresses.insert(normal_addrs.begin(), normal_addrs.end());
4781  entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4782  entry.value().expires_ = normal_addrs_expires;
4783 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
sequence< octet > key
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
RtpsUdpInst_rch config() const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
InterestingRemoteMapType interesting_readers_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
RcHandle< ICE::Agent > ice_agent_
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
InterestingRemoteMapType interesting_writers_

◆ add_delayed_notification()

bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification ( TransportQueueElement *  element)

Definition at line 128 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::TransportQueueElement::publication_id(), writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().

129 {
131  RtpsWriter_rch writer;
132  RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
133  if (iter != writers_.end()) {
134  writer = iter->second;
135  }
136 
137  g.release();
138 
139  if (writer) {
140  writer->add_elem_awaiting_ack(element);
141  return true;
142  }
143  return false;
144 }
RcHandle< RtpsWriter > RtpsWriter_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ alloc_msgblock()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::alloc_msgblock ( size_t  size,
ACE_Allocator *  data_allocator 
)
private

Definition at line 1036 of file RtpsUdpDataLink.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC_RETURN, db_allocator_, db_lock_pool_, DataBlockLockPool::get_lock(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, and ACE_Time_Value::zero.

Referenced by bundle_and_send_submessages(), and submsgs_to_msgblock().

1036  {
1037  ACE_Message_Block* result;
1038  ACE_NEW_MALLOC_RETURN(result,
1039  static_cast<ACE_Message_Block*>(
1041  ACE_Message_Block(size,
1043  0, // cont
1044  0, // data
1045  data_allocator,
1046  db_lock_pool_->get_lock(), // locking_strategy
1050  &db_allocator_,
1051  &mb_allocator_),
1052  0);
1053  return result;
1054 }
static const ACE_Time_Value max_time
DataBlockAllocator db_allocator_
DataBlockLock * get_lock()
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
MessageBlockAllocator mb_allocator_
static const ACE_Time_Value zero
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
unique_ptr< DataBlockLockPool > db_lock_pool_

◆ associated()

bool OpenDDS::DCPS::RtpsUdpDataLink::associated ( const GUID_t &  local,
const GUID_t &  remote,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable,
const MonotonicTime_t &  participant_discovered_at,
ACE_CDR::ULong  participant_flags,
SequenceNumber  max_sn,
const TransportClient_rch &  client,
AddrSet &  unicast_addresses,
AddrSet &  multicast_addresses,
const NetworkAddress &  last_addr_hint,
bool  requires_inline_qos 
)

Definition at line 559 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::DataLink::add_on_start_callback(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::add_writer(), OpenDDS::DCPS::TransportSendBuffer::capacity(), OpenDDS::DCPS::GUID_t::entityId, heartbeat_counts_, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore(), OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, multi_buff_, pending_reliable_readers_, readers_, readers_lock_, readers_of_writer_, receive_strategy(), sq_, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::transport_debug, update_last_recv_addr(), update_locators(), writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::use_datalink().

570 {
571  sq_.ignore(local_id, remote_id);
572 
573  update_locators(remote_id, unicast_addresses, multicast_addresses, requires_inline_qos, true);
574  if (last_addr_hint) {
575  update_last_recv_addr(remote_id, last_addr_hint);
576  }
577 
578  const GuidConverter conv(local_id);
579 
580  if (!local_reliable) {
581  if (conv.isReader()) {
583  WriterToSeqReadersMap::iterator i = writer_to_seq_best_effort_readers_.find(remote_id);
584  if (i == writer_to_seq_best_effort_readers_.end()) {
585  writer_to_seq_best_effort_readers_.insert(WriterToSeqReadersMap::value_type(remote_id, SeqReaders(local_id)));
586  } else if (i->second.readers.find(local_id) == i->second.readers.end()) {
587  i->second.readers.insert(local_id);
588  }
589  }
590  return true;
591  }
592 
593  bool associated = true;
594 
595  if (conv.isWriter()) {
597  log_progress("RTPS writer/reader association", local_id, remote_id, participant_discovered_at);
598  }
599 
600  if (remote_reliable) {
602  // Insert count if not already there.
603  RtpsWriterMap::iterator rw = writers_.find(local_id);
604  if (rw == writers_.end()) {
605  RtpsUdpDataLink_rch link(this, inc_count());
606  CORBA::Long hb_start = 0;
607  CountMapType::iterator hbc_it = heartbeat_counts_.find(local_id.entityId);
608  if (hbc_it != heartbeat_counts_.end()) {
609  hb_start = hbc_it->second;
610  heartbeat_counts_.erase(hbc_it);
611  }
612  RtpsWriter_rch writer = make_rch<RtpsWriter>(client, link, local_id, local_durable,
613  max_sn, hb_start, multi_buff_.capacity());
614  rw = writers_.insert(RtpsWriterMap::value_type(local_id, writer)).first;
615  }
616  RtpsWriter_rch writer = rw->second;
617  g.release();
618  const SequenceNumber writer_max_sn = writer->update_max_sn(remote_id, max_sn);
619  writer->add_reader(make_rch<ReaderInfo>(remote_id, remote_durable, participant_discovered_at, participant_flags, writer_max_sn + 1));
620  }
621  invoke_on_start_callbacks(local_id, remote_id, true);
622  } else if (conv.isReader()) {
624  log_progress("RTPS reader/writer association", local_id, remote_id, participant_discovered_at);
625  }
626  {
627  GuardType guard(strategy_lock_);
628  if (receive_strategy()) {
629  receive_strategy()->clear_completed_fragments(remote_id);
630  }
631  }
632  if (remote_reliable) {
634  RtpsReaderMap::iterator rr = readers_.find(local_id);
635  if (rr == readers_.end()) {
636  pending_reliable_readers_.erase(local_id);
637  RtpsUdpDataLink_rch link(this, inc_count());
638  RtpsReader_rch reader = make_rch<RtpsReader>(link, local_id);
639  rr = readers_.insert(RtpsReaderMap::value_type(local_id, reader)).first;
640  }
641  RtpsReader_rch reader = rr->second;
642  readers_of_writer_.insert(RtpsReaderMultiMap::value_type(remote_id, rr->second));
643  g.release();
644  add_on_start_callback(client, remote_id);
645  reader->add_writer(make_rch<WriterInfo>(remote_id, participant_discovered_at, participant_flags));
646  associated = false;
647  } else {
648  invoke_on_start_callbacks(local_id, remote_id, true);
649  }
650  }
651 
652  return associated;
653 }
ACE_CDR::Long Long
bool requires_inline_qos(const GUIDSeq_var &peers)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
RtpsUdpReceiveStrategy_rch receive_strategy()
RtpsReaderMultiMap readers_of_writer_
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
RcHandle< RtpsWriter > RtpsWriter_rch
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void ignore(const GUID_t &local, const GUID_t &remote)
Mark all queued submessages with the given source and destination as ignored.
TransactionalRtpsSendQueue sq_
RcHandle< RtpsReader > RtpsReader_rch
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
Definition: DataLink.cpp:111
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
bool log_progress
Log progress for RTPS entity discovery and association.
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194

◆ build_meta_submessage_map()

void OpenDDS::DCPS::RtpsUdpDataLink::build_meta_submessage_map ( MetaSubmessageVec &  meta_submessages,
AddrDestMetaSubmessageMap &  addr_map 
)
private

Definition at line 2410 of file RtpsUdpDataLink.cpp.

References accumulate_addresses(), ACE_GUARD, bundling_cache_, get_addresses_i(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, DDS::HANDLE_NIL, LM_DEBUG, local_crypto_handle(), locators_lock_, OpenDDS::DCPS::make_unknown_guid(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), separate_message(), and VDBG.

Referenced by bundle_and_send_submessages().

2411 {
2412  size_t cache_hits = 0;
2413  size_t cache_misses = 0;
2414  size_t addrset_min_size = std::numeric_limits<size_t>::max();
2415  size_t addrset_max_size = 0;
2416 
2417  BundlingCache::ScopedAccess global_access(bundling_cache_);
2419 
2420  // Sort meta_submessages by address set and destination
2421  for (MetaSubmessageVec::iterator it = meta_submessages.begin(), limit = meta_submessages.end(); it != limit; ++it) {
2422  if (it->ignore_) {
2423  continue;
2424  }
2425 
2426  const BundlingCacheKey key(it->src_guid_, it->dst_guid_);
2427  BundlingCache::ScopedAccess entry(bundling_cache_, key, false, now);
2428  if (entry.is_new_) {
2429 
2430  AddrSet& addrs = entry.value().addrs_;
2432 
2433  const bool directed = it->dst_guid_ != GUID_UNKNOWN;
2434  if (directed) {
2435  accumulate_addresses(it->src_guid_, it->dst_guid_, addrs, true);
2436  } else {
2437  addrs = get_addresses_i(it->src_guid_);
2438  }
2439 #ifdef OPENDDS_SECURITY
2440  if (local_crypto_handle() != DDS::HANDLE_NIL && separate_message(it->src_guid_.entityId)) {
2441  addrs.insert(BUNDLING_PLACEHOLDER); // removed in bundle_mapped_meta_submessages
2442  }
2443 #endif
2444 #if defined ACE_HAS_CPP11
2445  entry.recalculate_hash();
2446 #endif
2447  ++cache_misses;
2448  } else {
2449  ++cache_hits;
2450  }
2451 
2452  const BundlingCache::ScopedAccess& const_entry = entry;
2453  const AddrSet& addrs = const_entry.value().addrs_;
2454  addrset_min_size = std::min(addrset_min_size, static_cast<size_t>(addrs.size()));
2455  addrset_max_size = std::max(addrset_max_size, static_cast<size_t>(addrs.size()));
2456  if (addrs.empty()) {
2457  continue;
2458 #ifdef OPENDDS_SECURITY
2459  } else if (addrs.size() == 1 && *addrs.begin() == BUNDLING_PLACEHOLDER) {
2460  continue;
2461 #endif
2462  }
2463 
2464  DestMetaSubmessageMap& dest_map = addr_map[AddressCacheEntryProxy(const_entry.rch_)];
2465  if (std::memcmp(&(it->dst_guid_.guidPrefix), &GUIDPREFIX_UNKNOWN, sizeof(GuidPrefix_t)) != 0) {
2466  MetaSubmessageIterVec& vec = dest_map[make_unknown_guid(it->dst_guid_.guidPrefix)];
2467  vec.reserve(meta_submessages.size());
2468  vec.push_back(it);
2469  } else {
2470  MetaSubmessageIterVec& vec = dest_map[GUID_UNKNOWN];
2471  vec.reserve(meta_submessages.size());
2472  vec.push_back(it);
2473  }
2474  }
2475 
2476  VDBG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::build_meta_submessage_map()"
2477  "- Bundling Cache Stats: hits = %B, misses = %B, min = %B, max = %B\n",
2478  cache_hits, cache_misses, addrset_min_size, addrset_max_size));
2479 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const InstanceHandle_t HANDLE_NIL
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
sequence< octet > key
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:228
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define VDBG(DBG_ARGS)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
static bool separate_message(EntityId_t entity)

◆ bundle_and_send_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::bundle_and_send_submessages ( MetaSubmessageVec &  meta_submessages)
private

Definition at line 2758 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, alloc_msgblock(), OpenDDS::RTPS::append_submessage(), OpenDDS::DCPS::assign(), build_meta_submessage_map(), bundle_allocator_, bundle_mapped_meta_submessages(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::STUN::encoding(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::HEARTBEAT, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::is_new_assigned_, OpenDDS::DCPS::Encoding::KIND_XCDR1, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::make_unknown_guid(), OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::map_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::nackfrag_counts_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::new_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::next_directed_unassigned_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapping::next_undirected_unassigned_, OPENDDS_ASSERT, OpenDDS::RTPS::OPENDDS_FLAG_R, OpenDDS::DCPS::push_back(), OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, send_strategy(), size_, OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::AckNackSubmessage::smHeader, 
OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::MetaSubmessage::src_guid_, OpenDDS::RTPS::Message::submessages, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::undirected_, update_required_acknack_count(), OpenDDS::RTPS::Count_t::value, OpenDDS::RTPS::HeartBeatSubmessage::writerId, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by flush_send_queue_i().

2759 {
2760  using namespace RTPS;
2761 
2762  // Sort meta_submessages based on both locator IPs and INFO_DST GUID destination/s
2763  AddrDestMetaSubmessageMap addr_map;
2764  build_meta_submessage_map(meta_submessages, addr_map);
2765 
2766  const Encoding encoding(Encoding::KIND_XCDR1);
2767 
2768  // Build reasonably-sized submessage bundles based on our destination map
2769  BundleVec bundles;
2770  bundles.reserve(meta_submessages.size());
2771 
2772  CountKeeper counts;
2773  bundle_mapped_meta_submessages(encoding, addr_map, bundles, counts);
2774 
2775  // Reusable INFO_DST
2776  InfoDestinationSubmessage idst = {
2778  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2779  };
2780 
2781  for (IdCountMapping::iterator it = counts.heartbeat_counts_.begin(), limit = counts.heartbeat_counts_.end(); it != limit; ++it) {
2782  it->second.next_directed_unassigned_ = it->second.map_.begin();
2783  it->second.next_undirected_unassigned_ = it->second.map_.begin();
2784  for (CountMap::iterator it2 = it->second.map_.begin(), limit2 = it->second.map_.end(); it2 != limit2; ++it2) {
2785  if (it2->second.undirected_) {
2786  ++(it->second.next_directed_unassigned_);
2787  }
2788  }
2789  }
2790 
2791  // Allocate buffers, serialize, and send bundles
2792  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2793  for (size_t i = 0; i < bundles.size(); ++i) {
2794  RTPS::Message rtps_message;
2795  prev_dst = GUID_UNKNOWN;
2796  Message_Block_Ptr mb_bundle(alloc_msgblock(bundles[i].size_, &bundle_allocator_));
2797  Serializer ser(mb_bundle.get(), encoding);
2798  const MetaSubmessageIterVec& bundle_vec = bundles[i].submessages_;
2799  for (MetaSubmessageIterVec::const_iterator it = bundle_vec.begin(), limit = bundle_vec.end(); it != limit; ++it) {
2800  MetaSubmessage& res = **it;
2801  const GUID_t dst = make_unknown_guid(res.dst_guid_);
2802  if (dst != prev_dst) {
2803  assign(idst.guidPrefix, dst.guidPrefix);
2804  ser << idst;
2806  append_submessage(rtps_message, idst);
2807  }
2808  }
2809  switch (res.sm_._d()) {
2810  case HEARTBEAT: {
2811  CountMapping& mapping = counts.heartbeat_counts_[res.sm_.heartbeat_sm().writerId];
2812  CountMapPair& map_pair = mapping.map_[res.sm_.heartbeat_sm().count.value];
2813  if (!map_pair.is_new_assigned_) {
2814  if (map_pair.undirected_) {
2815  OPENDDS_ASSERT(mapping.next_undirected_unassigned_ != mapping.map_.end());
2816  map_pair.new_ = mapping.next_undirected_unassigned_->first;
2817  ++mapping.next_undirected_unassigned_;
2818  } else {
2819  OPENDDS_ASSERT(mapping.next_directed_unassigned_ != mapping.map_.end());
2820  map_pair.new_ = mapping.next_directed_unassigned_->first;
2821  ++mapping.next_directed_unassigned_;
2822  if (res.sm_.heartbeat_sm().smHeader.flags & RTPS::OPENDDS_FLAG_R) {
2823  if (res.sm_.heartbeat_sm().count.value != map_pair.new_) {
2824  update_required_acknack_count(res.src_guid_, res.dst_guid_, map_pair.new_);
2825  }
2826  res.sm_.heartbeat_sm().smHeader.flags &= ~RTPS::OPENDDS_FLAG_R;
2827  }
2828  }
2829  map_pair.is_new_assigned_ = true;
2830  }
2831  res.sm_.heartbeat_sm().count.value = map_pair.new_;
2832  const HeartBeatSubmessage& heartbeat = res.sm_.heartbeat_sm();
2834  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
2835  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
2836  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: HEARTBEAT: %C -> %C first %q last %q count %d\n",
2837  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
2838  }
2839  break;
2840  }
2841  case ACKNACK: {
2842  const AckNackSubmessage& acknack = res.sm_.acknack_sm();
2844  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
2845  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: ACKNACK: %C -> %C base %q bits %u count %d\n",
2846  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
2847  }
2848  break;
2849  }
2850  case NACK_FRAG: {
2851  CountSet& set = counts.nackfrag_counts_[res.sm_.nack_frag_sm().readerId];
2852  OPENDDS_ASSERT(!set.empty());
2853  res.sm_.nack_frag_sm().count.value = *set.begin();
2854  set.erase(set.begin());
2855  const NackFragSubmessage& nackfrag = res.sm_.nack_frag_sm();
2856  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
2858  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
2859  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: NACKFRAG: %C -> %C seq %q base %u bits %u\n",
2860  LogGuid(res.src_guid_).c_str(), LogGuid(res.dst_guid_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
2861  }
2862  break;
2863  }
2864  default: {
2865  break;
2866  }
2867  }
2868  ser << res.sm_;
2870  push_back(rtps_message.submessages, res.sm_);
2871  }
2872  prev_dst = dst;
2873  }
2875  if (ss) {
2876  ss->send_rtps_control(rtps_message, *(mb_bundle.get()), bundles[i].proxy_.addrs());
2877  }
2878  }
2879 }
SequenceNumberSet readerSNState
Definition: RtpsCore.idl:567
#define ACE_DEBUG(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:228
void update_required_acknack_count(const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
RcHandle< RtpsUdpSendStrategy > RtpsUdpSendStrategy_rch
const octet FLAG_F
Definition: RtpsCore.idl:523
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
FragmentNumberSet fragmentNumberState
Definition: RtpsCore.idl:598
size_t size_
void build_meta_submessage_map(MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
bool log_messages
Log all RTPS messages sent or received.
const octet OPENDDS_FLAG_R
Definition: RtpsCore.idl:529
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
RtpsUdpSendStrategy_rch send_strategy()
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
void bundle_mapped_meta_submessages(const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)

◆ bundle_mapped_meta_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::bundle_mapped_meta_submessages ( const Encoding &  encoding,
AddrDestMetaSubmessageMap &  addr_map,
BundleVec &  bundles,
CountKeeper &  counts 
)
private

Definition at line 2562 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, config(), OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, max_bundle_size_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OpenDDS::DCPS::RtpsUdpDataLink::CountKeeper::nackfrag_counts_, OPENDDS_ASSERT, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::RtpsUdpDataLink::CountMapPair::undirected_, OpenDDS::RTPS::Count_t::value, and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

Referenced by bundle_and_send_submessages().

2566 {
2567  using namespace RTPS;
2568 
2569  // Reusable INFO_DST
2570  InfoDestinationSubmessage idst = {
2572  {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2573  };
2574 
2575  RtpsUdpInst_rch cfg = config();
2576 
2577  const bool new_bundle_per_dest_guid = cfg && cfg->rtps_relay_only();
2578 
2579  BundleHelper helper(encoding, max_bundle_size_, bundles);
2580  GUID_t prev_dst; // used to determine when we need to write a new info_dst
2581  for (AddrDestMetaSubmessageMap::iterator addr_it = addr_map.begin(), limit = addr_map.end(); addr_it != limit; ++addr_it) {
2582 
2583  // A new address set always starts a new bundle
2584  bundles.push_back(Bundle(addr_it->first));
2585 
2586  prev_dst = GUID_UNKNOWN;
2587 
2588  for (DestMetaSubmessageMap::iterator dest_it = addr_it->second.begin(), limit2 = addr_it->second.end(); dest_it != limit2; ++dest_it) {
2589 
2590  if (dest_it->second.empty()) {
2591  continue;
2592  }
2593 
2594  // Check to see if we're sending separate messages per destination guid
2595  if (new_bundle_per_dest_guid && bundles.back().submessages_.size()) {
2596  helper.end_bundle();
2597  bundles.push_back(Bundle(addr_it->first));
2598  prev_dst = GUID_UNKNOWN;
2599  }
2600 
2601  for (MetaSubmessageIterVec::iterator resp_it = dest_it->second.begin(), limit3 = dest_it->second.end(); resp_it != limit3; ++resp_it) {
2602 
2603  // Check before every meta_submessage to see if we need to prefix an INFO_DST
2604  if (dest_it->first != prev_dst) {
2605  // If adding an INFO_DST prefix bumped us over the limit, push the
2606  // size difference into the next bundle, reset prev_dst, and keep going
2607  if (!helper.add_to_bundle(idst)) {
2608  bundles.push_back(Bundle(addr_it->first));
2609  }
2610  }
2611 
2612  // Attempt to add the submessage meta_submessage to the bundle
2613  bool result = false, unique = false;
2614  ACE_UNUSED_ARG(unique);
2615  MetaSubmessage& res = **resp_it;
2616  switch (res.sm_._d()) {
2617  case HEARTBEAT: {
2618  const EntityId_t id = res.sm_.heartbeat_sm().writerId;
2619  result = helper.add_to_bundle(res.sm_.heartbeat_sm());
2620  CountMapPair& map_pair = counts.heartbeat_counts_[id].map_[res.sm_.heartbeat_sm().count.value];
2621  if (res.dst_guid_ == GUID_UNKNOWN) {
2622  map_pair.undirected_ = true;
2623  }
2624  break;
2625  }
2626  case ACKNACK: {
2627  result = helper.add_to_bundle(res.sm_.acknack_sm());
2628  break;
2629  }
2630  case GAP: {
2631  result = helper.add_to_bundle(res.sm_.gap_sm());
2632  break;
2633  }
2634  case NACK_FRAG: {
2635  const EntityId_t id = res.sm_.nack_frag_sm().readerId;
2636  unique = counts.nackfrag_counts_[id].insert(res.sm_.nack_frag_sm().count.value).second;
2637  OPENDDS_ASSERT(unique);
2638  result = helper.add_to_bundle(res.sm_.nack_frag_sm());
2639  break;
2640  }
2641  default: {
2642  break;
2643  }
2644  }
2645  prev_dst = dest_it->first;
2646 
2647  // If adding the submessage bumped us over the limit, push the size
2648  // difference into the next bundle, reset prev_dst, and keep going
2649  if (!result) {
2650  bundles.push_back(Bundle(addr_it->first));
2651  prev_dst = GUID_UNKNOWN;
2652  }
2653  bundles.back().submessages_.push_back(*resp_it);
2654  }
2655  }
2656  helper.end_bundle();
2657  }
2658 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
const octet FLAG_E
Definition: RtpsCore.idl:521
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108
RtpsUdpInst_rch config() const
DataLinkIdType id() const
Obtain a unique identifier for this DataLink object.
Definition: DataLink.inl:205
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)

◆ check_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 4327 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, OPENDDS_VECTOR(), readers_lock_, and OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist().

4328 {
4329  OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
4330 
4331  RtpsUdpInst_rch cfg = config();
4332 
4333  // Have any interesting writers timed out?
4334  const MonotonicTimePoint tv(now - 10 * (cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC)));
4335  {
4337 
4338  for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
4339  pos != limit;
4340  ++pos) {
4341  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
4342  CallbackType callback(pos->first, pos->second);
4343  writerDoesNotExistCallbacks.push_back(callback);
4344  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
4345  }
4346  }
4347  }
4348 
4349  OPENDDS_VECTOR(CallbackType)::iterator iter;
4350  for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
4351  const GUID_t& rid = iter->first;
4352  const InterestingRemote& remote = iter->second;
4353  remote.listener->writer_does_not_exist(rid, remote.localid);
4354  }
4355 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RtpsUdpInst_rch config() const
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
std::pair< GUID_t, InterestingRemote > CallbackType
InterestingRemoteMapType interesting_writers_

◆ client_stop()

void OpenDDS::DCPS::RtpsUdpDataLink::client_stop ( const GUID_t localId)

Definition at line 753 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore_local(), OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::pre_stop_helper(), readers_, readers_lock_, readers_of_writer_, OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs(), OpenDDS::DCPS::DataLink::send_strategy_, sq_, OpenDDS::DCPS::DataLink::strategy_lock_, writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::client_stop().

754 {
755  const GuidConverter conv(localId);
756 
757  if (conv.isReader()) {
759  RtpsReaderMap::iterator rr = readers_.find(localId);
760  if (rr != readers_.end()) {
761  for (RtpsReaderMultiMap::iterator iter = readers_of_writer_.begin();
762  iter != readers_of_writer_.end();) {
763  if (iter->second->id() == localId) {
764  readers_of_writer_.erase(iter++);
765  } else {
766  ++iter;
767  }
768  }
769 
770  RtpsReader_rch reader = rr->second;
771  readers_.erase(rr);
772  gr.release();
773 
774  reader->pre_stop_helper();
775 
776  } else {
777  for (WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.begin();
779  RepoIdSet::iterator r = w->second.readers.find(localId);
780  if (r != w->second.readers.end()) {
781  w->second.readers.erase(r);
782  if (w->second.readers.empty()) {
784  continue;
785  }
786  }
787  ++w;
788  }
789  }
790 
791  } else {
792  RtpsWriter_rch writer;
793  {
794  // Don't hold the writers lock when destroying a writer.
796  RtpsWriterMap::iterator pos = writers_.find(localId);
797  if (pos != writers_.end()) {
798  writer = pos->second;
799  writers_.erase(pos);
800  }
801  }
802 
803  if (writer) {
804  TqeVector to_drop;
805  writer->pre_stop_helper(to_drop, true);
806 
807  TqeVector::iterator drop_it = to_drop.begin();
808  while (drop_it != to_drop.end()) {
809  (*drop_it)->data_dropped(true);
810  ++drop_it;
811  }
812  writer->remove_all_msgs();
813  } else {
814  GuardType guard(strategy_lock_);
815  if (send_strategy_) {
817  }
818  }
819  }
820  sq_.ignore_local(localId);
821 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RtpsReaderMultiMap readers_of_writer_
RcHandle< RtpsWriter > RtpsWriter_rch
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
TransactionalRtpsSendQueue sq_
RcHandle< RtpsReader > RtpsReader_rch
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
void ignore_local(const GUID_t &id)
Mark all queued submessages with the given source (src_guid_) as ignored.

◆ config()

RtpsUdpInst_rch OpenDDS::DCPS::RtpsUdpDataLink::config ( ) const

◆ customize_queue_element()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element ( TransportQueueElement * element)
privatevirtual

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 1306 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, customize_queue_element_non_reliable_i(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::DataLink::peer_ids(), OpenDDS::DCPS::TransportQueueElement::publication_id(), queue_submessages(), requires_inline_qos(), writers_, and writers_lock_.

1307 {
1308  const ACE_Message_Block* msg = element->msg();
1309  if (!msg) {
1310  return element;
1311  }
1312 
1313  const GUID_t pub_id = element->publication_id();
1314  GUIDSeq_var peers = peer_ids(pub_id);
1315 
1316  bool require_iq = requires_inline_qos(peers);
1317 
1319 
1320  const RtpsWriterMap::iterator rw = writers_.find(pub_id);
1321  MetaSubmessageVec meta_submessages;
1322  RtpsWriter_rch writer;
1323  TransportQueueElement* result;
1324  bool deliver_after_send = false;
1325  if (rw != writers_.end()) {
1326  writer = rw->second;
1327  guard.release();
1328  result = writer->customize_queue_element_helper(element, require_iq, meta_submessages, deliver_after_send);
1329  } else {
1330  guard.release();
1331  result = customize_queue_element_non_reliable_i(element, require_iq, meta_submessages, deliver_after_send, guard);
1332  }
1333 
1334  queue_submessages(meta_submessages);
1335 
1336  if (deliver_after_send) {
1337  element->data_delivered();
1338  }
1339 
1340  return result;
1341 }
bool requires_inline_qos(const GUIDSeq_var &peers)
void queue_submessages(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsWriter > RtpsWriter_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490
TransportQueueElement * customize_queue_element_non_reliable_i(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)

◆ customize_queue_element_non_reliable_i()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element_non_reliable_i ( TransportQueueElement * element,
bool  requires_inline_qos,
MetaSubmessageVec &  meta_submessages,
bool &  deliver_after_send,
ACE_Guard< ACE_Thread_Mutex > &  guard 
)
private

Definition at line 1228 of file RtpsUdpDataLink.cpp.

References ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual_i(), send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataLink::strategy_lock_, submsgs_to_msgblock(), and OpenDDS::DCPS::transport_debug.

Referenced by customize_queue_element().

1234 {
1235  RTPS::SubmessageSeq subm;
1236 
1237  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1238  TransportCustomizedElement* tce =
1239  dynamic_cast<TransportCustomizedElement*>(element);
1240  TransportSendControlElement* tsce =
1241  dynamic_cast<TransportSendControlElement*>(element);
1242 
1243  Message_Block_Ptr data;
1244 
1245  const ACE_Message_Block* msg = element->msg();
1246 
1247  // Based on the type of 'element', find and duplicate the data payload
1248  // continuation block.
1249  if (tsce) { // Control message
1250  if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
1251  data.reset(msg->cont()->duplicate());
1252  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1254  subm, *tsce, requires_inline_qos);
1255  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
1256  send_heartbeats_manual_i(tsce, meta_submessages);
1257  deliver_after_send = true;
1258  return 0;
1259  } else {
1260  guard.release();
1261  element->data_dropped(true /*dropped_by_transport*/);
1262  return 0;
1263  }
1264 
1265  } else if (tse) { // Basic data message
1266  // {DataSampleHeader} -> {Data Payload}
1267  data.reset(msg->cont()->duplicate());
1268  const DataSampleElement* dsle = tse->sample();
1269  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1271  subm, *dsle, requires_inline_qos);
1272 
1273  } else if (tce) { // Customized data message
1274  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1275  data.reset(msg->cont()->cont()->duplicate());
1276  const DataSampleElement* dsle = tce->original_send_element()->sample();
1277  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1279  subm, *dsle, requires_inline_qos);
1280 
1281  } else {
1282  return element;
1283  }
1284 
1285 #ifdef OPENDDS_SECURITY
1286  const GUID_t pub_id = element->publication_id();
1287 
1288  {
1289  GuardType guard(strategy_lock_);
1290  if (send_strategy_) {
1291  send_strategy()->encode_payload(pub_id, data, subm);
1292  }
1293  }
1294 #endif
1295 
1297  send_strategy()->append_submessages(subm);
1298  }
1299 
1301  hdr->cont(data.release());
1302  return new RtpsCustomizedElement(element, move(hdr));
1303 }
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:885
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
bool requires_inline_qos(const GUIDSeq_var &peers)
ACE_Message_Block * submsgs_to_msgblock(const RTPS::SubmessageSeq &subm)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
void reset(void)
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
static bool control_message_supported(char message_id)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
int release(void)
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
void send_heartbeats_manual_i(const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
bool log_messages
Log all RTPS messages sent or received.
RtpsUdpSendStrategy_rch send_strategy()

◆ datareader_dispatch()

template<typename T , typename FN >
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
bool  directed,
const FN &  func 
)
inlineprivate

Definition at line 816 of file RtpsUdpDataLink.h.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::make_id(), OpenDDS::ICE::OPENDDS_VECTOR(), and OpenDDS::DCPS::transport_debug.

Referenced by received().

820  {
821  const GUID_t local = make_id(local_prefix_, submessage.readerId);
822  const GUID_t src = make_id(src_prefix, submessage.writerId);
823 
825  {
827  if (local.entityId == ENTITYID_UNKNOWN) {
828  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
829  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
830  to_call.push_back(iters.first->second);
831  }
832  if (to_call.empty()) {
834  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawreader_dispatch - %C -> X no local readers\n", LogGuid(src).c_str()));
835  }
836  return;
837  }
838  } else {
839  const RtpsReaderMap::iterator rr = readers_.find(local);
840  if (rr == readers_.end()) {
842  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datareader_dispatch - %C -> %C unknown local reader\n", LogGuid(src).c_str(), LogGuid(local).c_str()));
843  }
844  return;
845  }
846  to_call.push_back(rr->second);
847  }
848  }
849  MetaSubmessageVec meta_submessages;
850  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
851  RtpsReader& reader = **it;
852  (reader.*func)(submessage, src, directed, meta_submessages);
853  }
854  queue_submessages(meta_submessages);
855  }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
void queue_submessages(MetaSubmessageVec &meta_submessages)
RtpsReaderMultiMap readers_of_writer_
RcHandle< RtpsReader > RtpsReader_rch
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ datawriter_dispatch()

template<typename T , typename FN >
void OpenDDS::DCPS::RtpsUdpDataLink::datawriter_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
const FN &  func 
)
inlineprivate

Definition at line 789 of file RtpsUdpDataLink.h.

References ACE_DEBUG, ACE_GUARD, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::make_id(), OpenDDS::ICE::OPENDDS_VECTOR(), and OpenDDS::DCPS::transport_debug.

Referenced by received().

791  {
792  const GUID_t local = make_id(local_prefix_, submessage.writerId);
793  const GUID_t src = make_id(src_prefix, submessage.readerId);
794 
796  {
798  const RtpsWriterMap::iterator rw = writers_.find(local);
799  if (rw == writers_.end()) {
801  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::datawriter_dispatch - %C -> %C unknown local writer\n", LogGuid(local).c_str(), LogGuid(src).c_str()));
802  }
803  return;
804  }
805  to_call.push_back(rw->second);
806  }
807  MetaSubmessageVec meta_submessages;
808  for (OPENDDS_VECTOR(RtpsWriter_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
809  RtpsWriter& writer = **it;
810  (writer.*func)(submessage, src, meta_submessages);
811  }
812  queue_submessages(meta_submessages);
813  }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
void queue_submessages(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsWriter > RtpsWriter_rch
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ disable_response_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::disable_response_queue ( bool  send_immediately)

Definition at line 2694 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::end_transaction(), flush_send_queue_i(), flush_send_queue_sporadic_, fsq_mutex_, fsq_vec_size_, sq_, and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::end_transport_header_processing(), and harvest_send_queue().

2695 {
2696  MetaSubmessageVec vec;
2698  sq_.end_transaction(vec);
2699  if (!vec.empty()) {
2700  if (fsq_vec_.size() == fsq_vec_size_) {
2701  fsq_vec_.resize(fsq_vec_.size() + 1);
2702  }
2703  fsq_vec_[fsq_vec_size_++].swap(vec);
2704  }
2705 
2706  if (fsq_vec_size_) {
2707  if (send_immediately) {
2709  } else {
2711  }
2712  }
2713 
2714 }
TransactionalRtpsSendQueue sq_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
RcHandle< SporadicEvent > flush_send_queue_sporadic_

◆ disassociated()

void OpenDDS::DCPS::RtpsUdpDataLink::disassociated ( const GUID_t local,
const GUID_t remote 
)

Definition at line 656 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::TransactionalRtpsSendQueue::ignore_remote(), LM_DEBUG, locators_, locators_lock_, OPENDDS_ASSERT, release_reservations_i(), remove_locator_and_bundling_cache(), sq_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting().

658 {
659  release_reservations_i(remote_id, local_id);
661  sq_.ignore_remote(remote_id);
662 
664 
665  RemoteInfoMap::iterator pos = locators_.find(remote_id);
666  if (pos != locators_.end()) {
667  OPENDDS_ASSERT(pos->second.ref_count_ > 0);
668 
669  --pos->second.ref_count_;
670  if (pos->second.ref_count_ == 0) {
671  locators_.erase(pos);
672  }
673  } else if (Transport_debug_level > 3) {
674  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n", LogGuid(local_id).c_str()));
675  }
676 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
TransactionalRtpsSendQueue sq_
void ignore_remote(const GUID_t &id)
Mark all queued submessages with the given destination (dst_guid_) as ignored.
virtual void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
void remove_locator_and_bundling_cache(const GUID_t &remote_id)

◆ durability_resend() [1/2]

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement * element,
size_t &  cumulative_send_count 
)
private

Definition at line 4015 of file RtpsUdpDataLink.cpp.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i().

4017 {
4018  static CORBA::Long buffer[8];
4019  static const RTPS::FragmentNumberSet none = { {0}, 0, RTPS::LongSeq8(0, buffer) };
4020  durability_resend(element, none, cumulative_send_count);
4021 }
ACE_CDR::Long Long
void durability_resend(TransportQueueElement *element, size_t &cumulative_send_count)
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69

◆ durability_resend() [2/2]

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement * element,
const RTPS::FragmentNumberSet &  fragmentSet,
size_t &  cumulative_send_count 
)
private

Definition at line 4023 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::DCPS::DisjointSequence::empty(), get_addresses(), OpenDDS::DCPS::SequenceNumber::getValue(), include_fragment(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportQueueElement::publication_id(), send_strategy(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), and OpenDDS::DCPS::Transport_debug_level.

4026 {
4027  if (Transport_debug_level > 5) {
4028  ACE_DEBUG((LM_DEBUG, "(%P|%t) TRACK RtpsUdpDataLink::durability_resend %q\n", element->sequence().getValue()));
4029  }
4030  const AddrSet addrs = get_addresses(element->publication_id(), element->subscription_id());
4031  if (addrs.empty()) {
4032  const LogGuid conv(element->subscription_id());
4033  ACE_ERROR((LM_ERROR,
4034  "(%P|%t) ERROR: RtpsUdpDataLink::durability_resend() - "
4035  "no locator for remote %C\n", conv.c_str()));
4036  return;
4037  }
4038 
4039  TqeVector to_send;
4040  if (!send_strategy()->fragmentation_helper(element, to_send)) {
4041  return;
4042  }
4043 
4044  DisjointSequence fragments;
4045  fragments.insert(fragmentSet.bitmapBase.value, fragmentSet.numBits,
4046  fragmentSet.bitmap.get_buffer());
4047  SequenceNumber lastFragment = 0;
4048 
4049  const TqeVector::iterator end = to_send.end();
4050  for (TqeVector::iterator i = to_send.begin(); i != end; ++i) {
4051  if (fragments.empty() || include_fragment(**i, fragments, lastFragment)) {
4052  RTPS::Message message;
4053  send_strategy()->send_rtps_control(message, *const_cast<ACE_Message_Block*>((*i)->msg()), addrs);
4054  ++cumulative_send_count;
4055  }
4056 
4057  (*i)->data_delivered();
4058  }
4059 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static bool include_fragment(const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
RtpsUdpSendStrategy_rch send_strategy()

◆ enable_response_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::enable_response_queue ( )

Definition at line 2688 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::begin_transaction(), and sq_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::begin_transport_header_processing().

2689 {
2691 }
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
TransactionalRtpsSendQueue sq_

◆ event_dispatcher()

EventDispatcher_rch OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher ( )
inline

Definition at line 248 of file RtpsUdpDataLink.h.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack().

248 { return event_dispatcher_; }
EventDispatcher_rch event_dispatcher_

◆ extend_bitmap_range()

void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range ( RTPS::FragmentNumberSet &  fnSet,
CORBA::ULong  extent,
ACE_CDR::ULong cumulative_bits_added 
)
staticprivate

Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent.

Parameters
fnSet: FragmentNumberSet for the message sequence number in question
extent: the highest fragment sequence number for this FragmentNumberSet

Definition at line 2968 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::generate_nack_frags_i().

2971 {
2972  if (extent < fnSet.bitmapBase.value) {
2973  return; // can't extend to some number under the base
2974  }
2975  // calculate the index to the extent to determine the new_num_bits
2976  const CORBA::ULong new_num_bits = std::min(CORBA::ULong(256),
2977  extent - fnSet.bitmapBase.value + 1),
2978  len = (new_num_bits + 31) / 32;
2979  if (new_num_bits < fnSet.numBits) {
2980  return; // bitmap already extends past "extent"
2981  }
2982  fnSet.bitmap.length(len);
2983  // We are missing from one past old bitmap end to the new end
2984  DisjointSequence::fill_bitmap_range(fnSet.numBits, new_num_bits,
2985  fnSet.bitmap.get_buffer(), len,
2986  fnSet.numBits, samples_requested);
2987 }
ACE_CDR::ULong ULong
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.

◆ filterBestEffortReaders()

void OpenDDS::DCPS::RtpsUdpDataLink::filterBestEffortReaders ( const ReceivedDataSample ds,
RepoIdSet selected,
RepoIdSet withheld 
)

Definition at line 525 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::publication_id_, readers_lock_, OpenDDS::DCPS::DataSampleHeader::sequence_, and writer_to_seq_best_effort_readers_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

526 {
528  const GUID_t& writer = ds.header_.publication_id_;
529  const SequenceNumber& seq = ds.header_.sequence_;
530  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(writer);
531  if (w != writer_to_seq_best_effort_readers_.end()) {
532  if (w->second.seq < seq) {
533  w->second.seq = seq;
534  selected.insert(w->second.readers.begin(), w->second.readers.end());
535  } else {
536  withheld.insert(w->second.readers.begin(), w->second.readers.end());
537  }
538  } // else the writer is not associated with best effort readers
539 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_

◆ flush_send_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue ( const MonotonicTimePoint now)
private

◆ flush_send_queue_i()

void OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue_i ( )
private

Definition at line 2677 of file RtpsUdpDataLink.cpp.

References bundle_and_send_submessages(), OpenDDS::DCPS::dedup(), and fsq_vec_size_.

Referenced by disable_response_queue(), and flush_send_queue().

2678 {
2679  for (size_t idx = 0; idx != fsq_vec_size_; ++idx) {
2680  dedup(fsq_vec_[idx]);
2681  bundle_and_send_submessages(fsq_vec_[idx]);
2682  fsq_vec_[idx].clear();
2683  }
2684  fsq_vec_size_ = 0;
2685 }
void bundle_and_send_submessages(MetaSubmessageVec &meta_submessages)
size_t dedup(MetaSubmessageVec &vec)

◆ get_addresses() [1/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses ( const GUID_t & local,
const GUID_t & remote 
) const

Given a 'local' id and a 'remote' id of a publication or subscription, return the set of addresses of the remote peers.

Definition at line 4594 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, get_addresses_i(), and locators_lock_.

Referenced by durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().

4595 {
4597  return get_addresses_i(local, remote);
4598 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const

◆ get_addresses() [2/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses ( const GUID_t local) const

Given a 'local' id, return the set of addresses for all remote peers.

Definition at line 4601 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, get_addresses_i(), and locators_lock_.

4602 {
4604  return get_addresses_i(local);
4605 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const

◆ get_addresses_i() [1/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses_i ( const GUID_t & local,
const GUID_t & remote 
) const
private

Definition at line 4608 of file RtpsUdpDataLink.cpp.

References accumulate_addresses().

Referenced by build_meta_submessage_map(), and get_addresses().

4609 {
4610  AddrSet retval;
4611 
4612  accumulate_addresses(local, remote, retval, true);
4613 
4614  return retval;
4615 }
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const

◆ get_addresses_i() [2/2]

AddrSet OpenDDS::DCPS::RtpsUdpDataLink::get_addresses_i ( const GUID_t local) const
private

Definition at line 4618 of file RtpsUdpDataLink.cpp.

References accumulate_addresses(), OpenDDS::DCPS::GuidConverter::isWriter(), OpenDDS::DCPS::DataLink::peer_ids(), ACE_Guard< ACE_LOCK >::release(), writers_, and writers_lock_.

4619 {
4620  AddrSet retval;
4621  bool use_peers = true;
4622 
4623  // For reliable writers, use remote_reader_guids()
4624  const GuidConverter conv(local);
4625  if (conv.isWriter()) {
4626  RtpsWriter_rch writer;
4628  RtpsWriterMap::const_iterator pos = writers_.find(local);
4629  if (pos != writers_.end()) {
4630  writer = pos->second;
4631  }
4632  guard.release();
4633  if (writer) {
4634  RcHandle<ConstSharedRepoIdSet> addr_guids = writer->get_remote_reader_guids();
4635  if (addr_guids) {
4636  for (RepoIdSet::const_iterator it = addr_guids->guids_.begin(),
4637  limit = addr_guids->guids_.end(); it != limit; ++it) {
4638  accumulate_addresses(local, *it, retval);
4639 
4640  }
4641  use_peers = false;
4642  }
4643  }
4644  }
4645 
4646  if (use_peers) {
4647  const GUIDSeq_var peers = peer_ids(local);
4648  if (peers.ptr()) {
4649  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
4650  accumulate_addresses(local, peers[i], retval);
4651  }
4652  }
4653  }
4654 
4655  return retval;
4656 }
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
RcHandle< RtpsWriter > RtpsWriter_rch
ACE_CDR::ULong ULong
GUIDSeq * peer_ids(const GUID_t &local_id) const
Definition: DataLink.cpp:490

◆ get_ice_agent()

RcHandle< ICE::Agent > OpenDDS::DCPS::RtpsUdpDataLink::get_ice_agent ( ) const

Definition at line 4787 of file RtpsUdpDataLink.cpp.

References ice_agent_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes().

4788 {
4789  return ice_agent_;
4790 }
RcHandle< ICE::Agent > ice_agent_

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::RtpsUdpDataLink::get_ice_endpoint ( ) const
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 4794 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportImpl::get_ice_endpoint(), and OpenDDS::DCPS::DataLink::impl().

Referenced by accumulate_addresses(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes().

4795 {
4796  TransportImpl_rch ti = impl();
4797  return ti ? ti->get_ice_endpoint() : WeakRcHandle<ICE::Endpoint>();
4798 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.

◆ get_job_queue()

RcHandle<JobQueue> OpenDDS::DCPS::RtpsUdpDataLink::get_job_queue ( ) const
inline

Definition at line 249 of file RtpsUdpDataLink.h.

249 { return job_queue_; }
RcHandle< JobQueue > job_queue_

◆ get_last_recv_address()

NetworkAddress OpenDDS::DCPS::RtpsUdpDataLink::get_last_recv_address ( const GUID_t remote_id)

Definition at line 467 of file RtpsUdpDataLink.cpp.

References config(), locators_, locators_lock_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().

Referenced by OpenDDS::DCPS::RtpsUdpTransport::get_last_recv_locator().

468 {
470  const RemoteInfoMap::const_iterator pos = locators_.find(remote_id);
471  if (pos != locators_.end()) {
472  RtpsUdpInst_rch cfg = config();
473  const TimeDuration threshold = cfg ? cfg->receive_address_duration_ : TimeDuration();
474  const bool valid_last_recv_addr = (MonotonicTimePoint::now() - pos->second.last_recv_time_) <= threshold;
475  return valid_last_recv_addr ? pos->second.last_recv_addr_ : NetworkAddress();
476  }
477  return NetworkAddress();
478 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41

◆ get_reactor()

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor ( void  )

Definition at line 16 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::get_reactor(), and reactor_task_.

Referenced by on_data_available().

17 {
18  if (!reactor_task_) return 0;
19  return reactor_task_->get_reactor();
20 }
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ get_reactor_interceptor()

ACE_INLINE ReactorInterceptor_rch OpenDDS::DCPS::RtpsUdpDataLink::get_reactor_interceptor ( ) const

Definition at line 23 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::interceptor(), and reactor_task_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

24 {
26  return reactor_task_->interceptor();
27 }
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65

◆ get_writer_send_buffer()

RcHandle< SingleSendBuffer > OpenDDS::DCPS::RtpsUdpDataLink::get_writer_send_buffer ( const GUID_t pub_id)
private

Definition at line 979 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, writers_, and writers_lock_.

980 {
981  RcHandle<SingleSendBuffer> result;
983 
984  const RtpsWriterMap::iterator wi = writers_.find(pub_id);
985  if (wi != writers_.end()) {
986  result = wi->second->get_send_buff();
987  }
988  return result;
989 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ handle_registry()

Security::HandleRegistry_rch OpenDDS::DCPS::RtpsUdpDataLink::handle_registry ( ) const
inline

◆ harvest_send_queue()

void OpenDDS::DCPS::RtpsUdpDataLink::harvest_send_queue ( const MonotonicTimePoint now)
private

Definition at line 2661 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransactionalRtpsSendQueue::begin_transaction(), disable_response_queue(), OpenDDS::DCPS::TransactionalRtpsSendQueue::ready_to_send(), and sq_.

2662 {
2664  sq_.ready_to_send();
2665 
2666  disable_response_queue(true);
2667 }
void ready_to_send()
Indicate that the queue is ready to send after all pending transactions are complete.
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
void disable_response_queue(bool send_immediately)
TransactionalRtpsSendQueue sq_

◆ include_fragment()

bool OpenDDS::DCPS::RtpsUdpDataLink::include_fragment ( const TransportQueueElement & element,
const DisjointSequence & fragments,
SequenceNumber & lastFragment 
)
staticprivate

Definition at line 4061 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::contains_any(), OpenDDS::DCPS::TransportQueueElement::is_fragment(), and OpenDDS::DCPS::RtpsCustomizedElement::last_fragment().

Referenced by durability_resend().

4064 {
4065  if (!element.is_fragment()) {
4066  return true;
4067  }
4068 
4069  const RtpsCustomizedElement* const rce = dynamic_cast<const RtpsCustomizedElement*>(&element);
4070  if (!rce) {
4071  return true;
4072  }
4073 
4074  const SequenceRange thisElement(lastFragment + 1, rce->last_fragment());
4075  lastFragment = thisElement.second;
4076  return fragments.contains_any(thisElement);
4077 }
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ is_leading()

bool OpenDDS::DCPS::RtpsUdpDataLink::is_leading ( const GUID_t writer_id,
const GUID_t reader_id 
) const
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 4800 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, writers_, and writers_lock_.

4802 {
4803  RtpsWriterMap::mapped_type writer;
4804 
4805  {
4807  RtpsWriterMap::const_iterator pos = writers_.find(writer_id);
4808  if (pos == writers_.end()) {
4809  return false;
4810  }
4811  writer = pos->second;
4812  }
4813 
4814  return writer->is_leading(reader_id);
4815 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ local_crypto_handle() [1/2]

ACE_INLINE DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle ( ) const

◆ local_crypto_handle() [2/2]

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle ( DDS::Security::ParticipantCryptoHandle  pch)

◆ local_prefix()

const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix ( ) const
inline

◆ make_reservation()

int OpenDDS::DCPS::RtpsUdpDataLink::make_reservation ( const GUID_t remote_publication_id,
const GUID_t local_subscription_id,
const TransportReceiveListener_wrch receive_listener,
bool  reliable 
)
virtual

Only called by our TransportImpl object.

Return codes: 0 means the reservation was made successfully; -1 means failure.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 542 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::make_reservation(), pending_reliable_readers_, readers_, readers_lock_, and ACE_Guard< ACE_LOCK >::release().

546 {
548  if (reliable) {
549  RtpsReaderMap::iterator rr = readers_.find(lsi);
550  if (rr == readers_.end()) {
551  pending_reliable_readers_.insert(lsi);
552  }
553  }
554  guard.release();
555  return DataLink::make_reservation(rpi, lsi, trl, reliable);
556 }
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398

◆ multicast_socket()

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket ( )

◆ on_data_available()

void OpenDDS::DCPS::RtpsUdpDataLink::on_data_available ( RcHandle< InternalDataReader< NetworkInterfaceAddress > >  reader)
private

Definition at line 428 of file RtpsUdpDataLink.cpp.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, config(), get_reactor(), DDS::LENGTH_UNLIMITED, multicast_manager_, multicast_socket_, OpenDDS::DCPS::DataLink::network_change(), network_interface_address_reader_, OpenDDS::DCPS::MulticastManager::process(), and receive_strategy().

429 {
430  InternalDataReader<NetworkInterfaceAddress>::SampleSequence samples;
431  InternalSampleInfoSequence infos;
432 
434 
435  RtpsUdpInst_rch cfg = config();
436  if (!cfg || !cfg->use_multicast_) {
437  return;
438  }
439 
440  multicast_manager_.process(samples,
441  infos,
442  cfg->multicast_interface_,
443  get_reactor(),
444  receive_strategy().in(),
445  cfg->multicast_group_address(),
447 #ifdef ACE_HAS_IPV6
448  , cfg->ipv6_multicast_group_address(),
449  ipv6_multicast_socket_
450 #endif
451  );
452 
453  if (!samples.empty()) {
454  // FUTURE: This is propagating info to discovery. Write instead.
455  network_change();
456  }
457 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpReceiveStrategy_rch receive_strategy()
const SampleStateMask ANY_SAMPLE_STATE
RtpsUdpInst_rch config() const
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
const InstanceStateMask ANY_INSTANCE_STATE
const ViewStateMask ANY_VIEW_STATE
ACE_SOCK_Dgram_Mcast multicast_socket_
void network_change() const
Definition: DataLink.cpp:1212
const long LENGTH_UNLIMITED
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.

◆ open()

bool OpenDDS::DCPS::RtpsUdpDataLink::open ( const ACE_SOCK_Dgram & unicast_socket)

Definition at line 294 of file RtpsUdpDataLink.cpp.

References ACE_ERROR, ACE_TEXT(), config(), OpenDDS::DCPS::DCPS_debug_level, ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, ENOTSUP, LM_ERROR, multi_buff_, multicast_socket_, network_interface_address_reader_, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::DCPS::DataLink::receive_strategy_, send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, ACE_SOCK::set_option(), OpenDDS::DCPS::set_socket_multicast_ttl(), SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, OpenDDS::DCPS::DataLink::start(), stop_i(), TheServiceParticipant, unicast_socket(), and unicast_socket_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::make_datalink().

299 {
301 #ifdef ACE_HAS_IPV6
302  ipv6_unicast_socket_ = ipv6_unicast_socket;
303 #endif
304 
305  RtpsUdpInst_rch cfg = config();
306  if (!cfg) {
307  return false;
308  }
309 
310  if (cfg->use_multicast_) {
311 #ifdef ACE_HAS_MAC_OSX
314 #ifdef ACE_HAS_IPV6
315  ipv6_multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
317 #endif
318 #endif
319  }
320 
321  if (cfg->use_multicast_) {
322  if (!set_socket_multicast_ttl(unicast_socket_, cfg->ttl_)) {
323  if (DCPS_debug_level > 0) {
324  ACE_ERROR((LM_ERROR,
325  ACE_TEXT("(%P|%t) ERROR: ")
326  ACE_TEXT("RtpsUdpDataLink::open: ")
327  ACE_TEXT("failed to set TTL: %d\n"),
328  cfg->ttl_));
329  }
330  return false;
331  }
332 #ifdef ACE_HAS_IPV6
333  if (!set_socket_multicast_ttl(ipv6_unicast_socket_, cfg->ttl_)) {
334  if (DCPS_debug_level > 0) {
335  ACE_ERROR((LM_ERROR,
336  ACE_TEXT("(%P|%t) ERROR: ")
337  ACE_TEXT("RtpsUdpDataLink::open: ")
338  ACE_TEXT("failed to set TTL: %d\n"),
339  cfg->ttl_));
340  }
341  return false;
342  }
343 #endif
344  }
345 
346  if (cfg->send_buffer_size_ > 0) {
347  const int snd_size = cfg->send_buffer_size_;
348  if (unicast_socket_.set_option(SOL_SOCKET,
349  SO_SNDBUF,
350  (void *) &snd_size,
351  sizeof(snd_size)) < 0
352  && errno != ENOTSUP) {
353  if (DCPS_debug_level > 0) {
354  ACE_ERROR((LM_ERROR,
355  ACE_TEXT("(%P|%t) ERROR: ")
356  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
357  snd_size));
358  }
359  return false;
360  }
361 #ifdef ACE_HAS_IPV6
362  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
363  SO_SNDBUF,
364  (void *) &snd_size,
365  sizeof(snd_size)) < 0
366  && errno != ENOTSUP) {
367  if (DCPS_debug_level > 0) {
368  ACE_ERROR((LM_ERROR,
369  ACE_TEXT("(%P|%t) ERROR: ")
370  ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
371  snd_size));
372  }
373  return false;
374  }
375 #endif
376  }
377 
378  if (cfg->rcv_buffer_size_ > 0) {
379  const int rcv_size = cfg->rcv_buffer_size_;
380  if (unicast_socket_.set_option(SOL_SOCKET,
381  SO_RCVBUF,
382  (void *) &rcv_size,
383  sizeof(int)) < 0
384  && errno != ENOTSUP) {
385  if (DCPS_debug_level > 0) {
386  ACE_ERROR((LM_ERROR,
387  ACE_TEXT("(%P|%t) ERROR: ")
388  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
389  rcv_size));
390  }
391  return false;
392  }
393 #ifdef ACE_HAS_IPV6
394  if (ipv6_unicast_socket_.set_option(SOL_SOCKET,
395  SO_RCVBUF,
396  (void *) &rcv_size,
397  sizeof(int)) < 0
398  && errno != ENOTSUP) {
399  if (DCPS_debug_level > 0) {
400  ACE_ERROR((LM_ERROR,
401  ACE_TEXT("(%P|%t) ERROR: ")
402  ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
403  rcv_size));
404  }
405  return false;
406  }
407 #endif
408  }
409 
410  send_strategy()->send_buffer(&multi_buff_);
411 
412  if (start(send_strategy_,
413  receive_strategy_, false) != 0) {
414  stop_i();
415  if (DCPS_debug_level > 0) {
416  ACE_ERROR((LM_ERROR,
417  ACE_TEXT("(%P|%t) ERROR: ")
418  ACE_TEXT("UdpDataLink::open: start failed!\n")));
419  }
420  return false;
421  }
422 
423  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
424 
425  return true;
426 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_ERROR(X)
#define ENOTSUP
void opts(int opts)
int set_option(int level, int option, void *optval, int optlen) const
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
RtpsUdpInst_rch config() const
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
ACE_TEXT("TCP_Factory")
ACE_SOCK_Dgram_Mcast multicast_socket_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RtpsUdpSendStrategy_rch send_strategy()
#define TheServiceParticipant

◆ OPENDDS_MAP() [1/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( FragmentNumberValue  ,
RTPS::FragmentNumberSet   
)
private

◆ OPENDDS_MAP() [2/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( SequenceNumber  ,
RequestedFragMap   
)
private

◆ OPENDDS_MAP() [3/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( SequenceNumber  ,
ReaderInfoSetHolder_rch   
)
private

◆ OPENDDS_MAP() [4/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( AddressCacheEntryProxy  ,
DestMetaSubmessageMap   
)
private

◆ OPENDDS_MAP() [5/5]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP ( CORBA::Long  ,
CountMapPair   
)
private

◆ OPENDDS_MAP_CMP() [1/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RemoteInfo  ,
GUID_tKeyLessThan   
)
private

Referenced by send_heartbeats().

◆ OPENDDS_MAP_CMP() [2/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
ReaderInfo_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [3/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RtpsWriter_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [4/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
WriterInfo_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [5/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
MetaSubmessageIterVec  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [6/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CountSet  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [7/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CountMapping  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [8/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( GUID_t  ,
RtpsReader_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [9/9]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( EntityId_t  ,
CORBA::Long  ,
EntityId_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP() [1/2]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
RtpsReader_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP_CMP() [2/2]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( GUID_t  ,
InterestingRemote  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_SET() [1/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( ReaderInfo_rch  )
private

◆ OPENDDS_SET() [2/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( WriterInfo_rch  )
private

◆ OPENDDS_SET() [3/3]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( CORBA::Long  )
private

◆ OPENDDS_VECTOR() [1/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageVec::iterator  )
private

◆ OPENDDS_VECTOR() [2/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageIterVec  )
private

◆ OPENDDS_VECTOR() [3/4]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( Bundle  )

◆ OPENDDS_VECTOR() [4/4]

OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( MetaSubmessageVec  )
private

◆ pre_stop_i()

void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i ( )
virtual

Called before releasing the datalink, or before shutdown, to let the concrete DataLink do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 861 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, DBG_ENTRY_LVL, heartbeat_counts_, OpenDDS::DCPS::DataLink::pre_stop_i(), readers_, readers_lock_, writers_, and writers_lock_.

862 {
863  DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
865  TqeVector to_drop;
866 
867  RtpsWriterMap writers;
868  {
870  writers_.swap(writers);
871  for (RtpsWriterMap::const_iterator it = writers.begin(); it != writers.end(); ++it) {
872  heartbeat_counts_.erase(it->first.entityId);
873  }
874  }
875 
876  RtpsWriterMap::iterator w_iter = writers.begin();
877  while (w_iter != writers.end()) {
878  w_iter->second->pre_stop_helper(to_drop, true);
879  writers.erase(w_iter++);
880  }
881 
882  TqeVector::iterator drop_it = to_drop.begin();
883  while (drop_it != to_drop.end()) {
884  (*drop_it)->data_dropped(true);
885  ++drop_it;
886  }
887 
888  RtpsReaderMap readers;
889  {
891  readers = readers_;
892  }
893 
894  RtpsReaderMap::iterator r_iter = readers.begin();
895  while (r_iter != readers.end()) {
896  r_iter->second->pre_stop_helper();
897  ++r_iter;
898  }
899 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual void pre_stop_i()
Definition: DataLink.cpp:993

◆ queue_submessages()

void OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages ( MetaSubmessageVec &  meta_submessages)
private

◆ reactor_is_shut_down()

ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down ( )

Definition at line 30 of file RtpsUdpDataLink.inl.

References ACE_INLINE, OpenDDS::DCPS::ReactorTask::is_shut_down(), and reactor_task_.

31 {
32  if (!reactor_task_) return true;
33  return reactor_task_->is_shut_down();
34 }

◆ receive_strategy()

RtpsUdpReceiveStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy ( )
private

◆ received() [1/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::DataSubmessage & data,
const GuidPrefix_t & src_prefix,
const NetworkAddress & remote_addr 
)

Definition at line 1568 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, local_prefix_, OpenDDS::DCPS::make_id(), OPENDDS_VECTOR(), pending_reliable_readers_, queue_submessages(), OpenDDS::RTPS::DataSubmessage::readerId, readers_, readers_lock_, readers_of_writer_, receive_strategy(), OpenDDS::DCPS::DataLink::strategy_lock_, update_last_recv_addr(), and OpenDDS::RTPS::DataSubmessage::writerId.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

1571 {
1572  const GUID_t local = make_id(local_prefix_, data.readerId);
1573  const GUID_t src = make_id(src_prefix, data.writerId);
1574 
1575  update_last_recv_addr(src, remote_addr);
1576 
1577  OPENDDS_VECTOR(RtpsReader_rch) to_call;
1578  {
1580  if (local.entityId == ENTITYID_UNKNOWN) {
1581  typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
1582  for (RRMM_IterRange iters = readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
1583  to_call.push_back(iters.first->second);
1584  }
1585  if (!pending_reliable_readers_.empty()) {
1586  GuardType guard(strategy_lock_);
1588  if (trs) {
1589  for (RepoIdSet::const_iterator it = pending_reliable_readers_.begin();
1590  it != pending_reliable_readers_.end(); ++it)
1591  {
1592  trs->withhold_data_from(*it);
1593  }
1594  }
1595  }
1596  } else {
1597  const RtpsReaderMap::iterator rr = readers_.find(local);
1598  if (rr != readers_.end()) {
1599  to_call.push_back(rr->second);
1600  } else if (pending_reliable_readers_.count(local)) {
1601  GuardType guard(strategy_lock_);
1603  if (trs) {
1604  trs->withhold_data_from(local);
1605  }
1606  }
1607  }
1608  }
1609  MetaSubmessageVec meta_submessages;
1610  for (OPENDDS_VECTOR(RtpsReader_rch)::const_iterator it = to_call.begin(); it < to_call.end(); ++it) {
1611  (*it)->process_data_i(data, src, meta_submessages);
1612  }
1613  queue_submessages(meta_submessages);
1614 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RtpsUdpReceiveStrategy_rch receive_strategy()
void queue_submessages(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsUdpReceiveStrategy > RtpsUdpReceiveStrategy_rch
RtpsReaderMultiMap readers_of_writer_
RcHandle< RtpsReader > RtpsReader_rch
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ received() [2/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::GapSubmessage & gap,
const GuidPrefix_t & src_prefix,
bool  directed,
const NetworkAddress & remote_addr 
)

Definition at line 1781 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_gap_i(), update_last_recv_addr(), and OpenDDS::RTPS::GapSubmessage::writerId.

1785 {
1786  update_last_recv_addr(make_id(src_prefix, gap.writerId), remote_addr);
1787  datareader_dispatch(gap, src_prefix, directed, &RtpsReader::process_gap_i);
1788 }
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
void process_gap_i(const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [3/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatSubmessage & heartbeat,
const GuidPrefix_t & src_prefix,
bool  directed,
const NetworkAddress & remote_addr 
)

Definition at line 1865 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_writers_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_i(), queue_submessages(), readers_lock_, update_last_recv_addr(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

1869 {
1870  const GUID_t src = make_id(src_prefix, heartbeat.writerId);
1872 
1873  update_last_recv_addr(src, remote_addr, now);
1874 
1875  MetaSubmessageVec meta_submessages;
1876  OPENDDS_VECTOR(InterestingRemote) callbacks;
1877  {
1879 
1880  // We received a heartbeat from a writer.
1881  // We should ACKNACK if the writer is interesting and there is no association.
1882 
1883  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
1884  limit = interesting_writers_.upper_bound(src);
1885  pos != limit;
1886  ++pos) {
1887  pos->second.last_activity = now;
1888  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
1889  callbacks.push_back(pos->second);
1890  pos->second.status = InterestingRemote::EXISTS;
1891  }
1892  }
1893  }
1894  queue_submessages(meta_submessages);
1895 
1896  for (size_t i = 0; i < callbacks.size(); ++i) {
1897  callbacks[i].listener->writer_exists(src, callbacks[i].localid);
1898  }
1899 
1900  datareader_dispatch(heartbeat, src_prefix, directed, &RtpsReader::process_heartbeat_i);
1901 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
void queue_submessages(MetaSubmessageVec &meta_submessages)
void process_heartbeat_i(const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
InterestingRemoteMapType interesting_writers_
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [4/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatFragSubmessage hb_frag,
const GuidPrefix_t src_prefix,
bool  directed,
const NetworkAddress remote_addr 
)

Definition at line 2990 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i(), update_last_recv_addr(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerId.

2994 {
2995  update_last_recv_addr(make_id(src_prefix, hb_frag.writerId), remote_addr);
2996  datareader_dispatch(hb_frag, src_prefix, directed, &RtpsReader::process_heartbeat_frag_i);
2997 }
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
void process_heartbeat_frag_i(const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [5/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::AckNackSubmessage acknack,
const GuidPrefix_t src_prefix,
const NetworkAddress remote_addr 
)

Definition at line 3062 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, datawriter_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, interesting_readers_, local_prefix_, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack(), OpenDDS::RTPS::AckNackSubmessage::readerId, update_last_recv_addr(), OpenDDS::RTPS::AckNackSubmessage::writerId, and writers_lock_.

3065 {
3066  // local side is DW
3067  const GUID_t local = make_id(local_prefix_, acknack.writerId); // can't be ENTITYID_UNKNOWN
3068  const GUID_t remote = make_id(src_prefix, acknack.readerId);
3070 
3071  update_last_recv_addr(remote, remote_addr, now);
3072 
3073  OPENDDS_VECTOR(DiscoveryListener*) callbacks;
3074 
3075  {
3077  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
3078  limit = interesting_readers_.upper_bound(remote);
3079  pos != limit;
3080  ++pos) {
3081  pos->second.last_activity = now;
3082  // Ensure the acknack was for the writer.
3083  if (local == pos->second.localid) {
3084  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
3085  callbacks.push_back(pos->second.listener);
3086  pos->second.status = InterestingRemote::EXISTS;
3087  }
3088  }
3089  }
3090  }
3091 
3092  for (size_t i = 0; i < callbacks.size(); ++i) {
3093  callbacks[i]->reader_exists(remote, local);
3094  }
3095 
3096  datawriter_dispatch(acknack, src_prefix, &RtpsWriter::process_acknack);
3097 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
void process_acknack(const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
InterestingRemoteMapType interesting_readers_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ received() [6/6]

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::NackFragSubmessage nackfrag,
const GuidPrefix_t src_prefix,
const NetworkAddress remote_addr 
)

Definition at line 3404 of file RtpsUdpDataLink.cpp.

References datawriter_dispatch(), OpenDDS::DCPS::make_id(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_nackfrag(), OpenDDS::RTPS::NackFragSubmessage::readerId, and update_last_recv_addr().

3407 {
3408  update_last_recv_addr(make_id(src_prefix, nackfrag.readerId), remote_addr);
3409  datawriter_dispatch(nackfrag, src_prefix, &RtpsWriter::process_nackfrag);
3410 }
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
void process_nackfrag(const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ register_for_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader ( const GUID_t writerid,
const GUID_t readerid,
const AddrSet &  addresses,
DiscoveryListener listener 
)

Definition at line 679 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, OpenDDS::DCPS::GUID_t::entityId, heartbeat_, heartbeat_counts_, interesting_readers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::register_for_reader().

683 {
685  const bool enableheartbeat = interesting_readers_.empty();
686  interesting_readers_.insert(
687  InterestingRemoteMapType::value_type(
688  readerid,
689  InterestingRemote(writerid, addresses, listener)));
690  if (heartbeat_counts_.find(writerid.entityId) == heartbeat_counts_.end()) {
691  heartbeat_counts_[writerid.entityId] = 0;
692  }
693  g.release();
694  if (enableheartbeat) {
695  RtpsUdpInst_rch cfg = config();
696  heartbeat_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
697  }
698 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RtpsUdpInst_rch config() const
InterestingRemoteMapType interesting_readers_
RcHandle< PeriodicEvent > heartbeat_

◆ register_for_writer()

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer ( const GUID_t readerid,
const GUID_t writerid,
const AddrSet &  addresses,
DiscoveryListener listener 
)

Definition at line 718 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, heartbeatchecker_, interesting_writers_, and readers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::register_for_writer().

722 {
724  bool enableheartbeatchecker = interesting_writers_.empty();
725  interesting_writers_.insert(
726  InterestingRemoteMapType::value_type(
727  writerid,
728  InterestingRemote(readerid, addresses, listener)));
729  g.release();
730  if (enableheartbeatchecker) {
731  RtpsUdpInst_rch cfg = config();
732  heartbeatchecker_->enable(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
733  }
734 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RtpsUdpInst_rch config() const
InterestingRemoteMapType interesting_writers_
RcHandle< PeriodicEvent > heartbeatchecker_

◆ release_reservations_i()

void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i ( const GUID_t remote_id,
const GUID_t local_id 
)
private virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 902 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), readers_, readers_lock_, readers_of_writer_, remove_locator_and_bundling_cache(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remove_writer(), writer_to_seq_best_effort_readers_, writers_, and writers_lock_.

Referenced by disassociated().

904 {
905  TqeVector to_drop;
906  using std::pair;
907  const GuidConverter conv(local_id);
908  if (conv.isWriter()) {
910  RtpsWriterMap::iterator rw = writers_.find(local_id);
911 
912  if (rw != writers_.end()) {
913  RtpsWriter_rch writer = rw->second;
914  g.release();
915  writer->remove_reader(remote_id);
916  if (writer->reader_count() == 0) {
917  writer->pre_stop_helper(to_drop, false);
918  }
919  writer->process_acked_by_all();
920  }
921 
922  } else if (conv.isReader()) {
924  RtpsReaderMap::iterator rr = readers_.find(local_id);
925 
926  if (rr != readers_.end()) {
927  for (pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> iters =
928  readers_of_writer_.equal_range(remote_id);
929  iters.first != iters.second;) {
930  if (iters.first->second->id() == local_id) {
931  readers_of_writer_.erase(iters.first++);
932  } else {
933  ++iters.first;
934  }
935  }
936 
937  RtpsReader_rch reader = rr->second;
938  g.release();
939 
940  reader->remove_writer(remote_id);
941 
942  } else {
943  WriterToSeqReadersMap::iterator w = writer_to_seq_best_effort_readers_.find(remote_id);
944  if (w != writer_to_seq_best_effort_readers_.end()) {
945  RepoIdSet::iterator r = w->second.readers.find(local_id);
946  if (r != w->second.readers.end()) {
947  w->second.readers.erase(r);
948  if (w->second.readers.empty()) {
950  }
951  }
952  }
953  }
954  }
955 
957 
958  for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
959  (*drop_it)->data_dropped(true);
960  }
961 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RtpsReaderMultiMap readers_of_writer_
RcHandle< RtpsWriter > RtpsWriter_rch
RcHandle< RtpsReader > RtpsReader_rch
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
void remove_locator_and_bundling_cache(const GUID_t &remote_id)

◆ remove_all_msgs()

void OpenDDS::DCPS::RtpsUdpDataLink::remove_all_msgs ( const GUID_t & pub_id)
virtual

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 166 of file RtpsUdpDataLink.cpp.

References ACE_Guard< ACE_LOCK >::release(), writers_, and writers_lock_.

167 {
169  RtpsWriter_rch writer;
170  RtpsWriterMap::iterator iter = writers_.find(pub_id);
171  if (iter != writers_.end()) {
172  writer = iter->second;
173  }
174 
175  g.release();
176 
177  if (writer) {
178  writer->remove_all_msgs();
179  }
180 }
RcHandle< RtpsWriter > RtpsWriter_rch

◆ remove_locator_and_bundling_cache()

void OpenDDS::DCPS::RtpsUdpDataLink::remove_locator_and_bundling_cache ( const GUID_t remote_id)

◆ remove_sample()

RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::remove_sample ( const DataSampleElement * sample)
virtual

This method is essentially an "undo_send()" method. Its goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 147 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DataSampleElement::get_pub_id(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::REMOVE_NOT_FOUND, writers_, and writers_lock_.

148 {
149  GUID_t pub_id = sample->get_pub_id();
150 
152  RtpsWriter_rch writer;
153  RtpsWriterMap::iterator iter = writers_.find(pub_id);
154  if (iter != writers_.end()) {
155  writer = iter->second;
156  }
157 
158  g.release();
159 
160  if (writer) {
161  return writer->remove_sample(sample);
162  }
163  return REMOVE_NOT_FOUND;
164 }
RcHandle< RtpsWriter > RtpsWriter_rch

◆ requires_inline_qos()

bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos ( const GUIDSeq_var &  peers)

Definition at line 1427 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, force_inline_qos_, locators_, and locators_lock_.

Referenced by customize_queue_element(), OpenDDS::DCPS::RtpsUdpTransport::get_last_recv_locator(), and update_locators().

1428 {
1429  if (force_inline_qos_) {
1430  // Force true for testing purposes
1431  return true;
1432  } else {
1433  if (!peers.ptr()) {
1434  return false;
1435  }
1437  for (CORBA::ULong i = 0; i < peers->length(); ++i) {
1438  const RemoteInfoMap::const_iterator iter = locators_.find(peers[i]);
1439  if (iter != locators_.end() && iter->second.requires_inline_qos_) {
1440  return true;
1441  }
1442  }
1443  return false;
1444  }
1445 }
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
static bool force_inline_qos_
static member used by testing code to force inline qos

◆ security_config()

Security::SecurityConfig_rch OpenDDS::DCPS::RtpsUdpDataLink::security_config ( ) const
inline

◆ send_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 4080 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::insert(), interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, OPENDDS_MAP_CMP(), OPENDDS_VECTOR(), queue_submessages(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), writers_, writers_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().

4081 {
4082  OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
4083 
4084  RtpsUdpInst_rch cfg = config();
4085 
4086  MetaSubmessageVec meta_submessages;
4087 
4088  {
4090 
4091  const MonotonicTimePoint tv = now - 10 * cfg->heartbeat_period_;
4092  const MonotonicTimePoint tv3 = now - 3 * cfg->heartbeat_period_;
4093 
4094  typedef OPENDDS_MAP_CMP(GUID_t, RcHandle<ConstSharedRepoIdSet>, GUID_tKeyLessThan) WtaMap;
4095  WtaMap writers_to_advertise;
4096 
4097  for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
4098  limit = interesting_readers_.end();
4099  pos != limit;
4100  ++pos) {
4101  if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
4102  (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
4103  RcHandle<ConstSharedRepoIdSet>& tg = writers_to_advertise[pos->second.localid];
4104  if (!tg) {
4105  tg = make_rch<ConstSharedRepoIdSet>();
4106  }
4107  const_cast<RepoIdSet&>(tg->guids_).insert(pos->first);
4108  }
4109  if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
4110  CallbackType callback(pos->first, pos->second);
4111  readerDoesNotExistCallbacks.push_back(callback);
4112  pos->second.status = InterestingRemote::DOES_NOT_EXIST;
4113  }
4114  }
4115 
4116  for (WtaMap::const_iterator pos = writers_to_advertise.begin(),
4117  limit = writers_to_advertise.end();
4118  pos != limit;
4119  ++pos) {
4120  RtpsWriterMap::const_iterator wpos = writers_.find(pos->first);
4121  if (wpos != writers_.end()) {
4122  wpos->second->gather_heartbeats(pos->second, meta_submessages);
4123  } else {
4124  using namespace OpenDDS::RTPS;
4125  const int count = ++heartbeat_counts_[pos->first.entityId];
4126  const HeartBeatSubmessage hb = {
4128  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4129  pos->first.entityId,
4130  to_rtps_seqnum(SequenceNumber(1)),
4132  {count}
4133  };
4134 
4135  for (RepoIdSet::const_iterator it = pos->second->guids_.begin(),
4136  limit = pos->second->guids_.end(); it != limit; ++it) {
4137 
4138  MetaSubmessage meta_submessage(pos->first, *it);
4139  meta_submessage.sm_.heartbeat_sm(hb);
4140  meta_submessages.push_back(meta_submessage);
4141  }
4142  }
4143  }
4144  }
4145 
4146  queue_submessages(meta_submessages);
4147 
4148  for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
4149  iter != readerDoesNotExistCallbacks.end(); ++iter) {
4150  const InterestingRemote& remote = iter->second;
4151  remote.listener->reader_does_not_exist(iter->first, remote.localid);
4152  }
4153 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_E
Definition: RtpsCore.idl:521
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
GuidSet RepoIdSet
Definition: GuidUtils.h:113
void queue_submessages(MetaSubmessageVec &meta_submessages)
RtpsUdpInst_rch config() const
typedef OPENDDS_MAP_CMP(GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
InterestingRemoteMapType interesting_readers_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
static SequenceNumber ZERO()
std::pair< GUID_t, InterestingRemote > CallbackType
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ send_heartbeats_manual_i()

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual_i ( const TransportSendControlElement tsce,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 4358 of file RtpsUdpDataLink.cpp.

References best_effort_heartbeat_count_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::TransportSendControlElement::publication_id(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and customize_queue_element_non_reliable_i().

4359 {
4360  using namespace OpenDDS::RTPS;
4361 
4362  const GUID_t pub_id = tsce->publication_id();
4363  const HeartBeatSubmessage hb = {
4364  {HEARTBEAT,
4366  HEARTBEAT_SZ},
4367  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4368  pub_id.entityId,
4369  // This liveliness heartbeat is from a best-effort Writer, the sequence numbers are not used
4370  to_rtps_seqnum(SequenceNumber(1)),
4371  to_rtps_seqnum(tsce->sequence()),
4373  };
4374 
4375  MetaSubmessage meta_submessage(pub_id, GUID_UNKNOWN);
4376  meta_submessage.sm_.heartbeat_sm(hb);
4377  meta_submessages.push_back(meta_submessage);
4378 }
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_E
Definition: RtpsCore.idl:521
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const octet FLAG_F
Definition: RtpsCore.idl:523
const octet FLAG_L
Definition: RtpsCore.idl:527
ACE_CDR::Octet Octet
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ send_strategy()

RtpsUdpSendStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::send_strategy ( )
private

◆ separate_message()

bool OpenDDS::DCPS::RtpsUdpDataLink::separate_message ( EntityId_t  entity)
static

Definition at line 2482 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER, and OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER.

Referenced by build_meta_submessage_map(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_encoded().

2483 {
2484  // submessages generated by these entities may not be combined
2485  // with other submessages when using full-message protection
2486  // DDS Security v1.1 8.4.2.4 Table 27 is_rtps_protected
2487  using namespace RTPS;
2492 }
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
Definition: MessageTypes.h:83
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
Definition: MessageTypes.h:86
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER
Definition: MessageTypes.h:84

◆ stop_i()

void OpenDDS::DCPS::RtpsUdpDataLink::stop_i ( )
private virtual

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 964 of file RtpsUdpDataLink.cpp.

References ACE_SOCK::close(), heartbeat_, heartbeatchecker_, multicast_socket_, network_interface_address_reader_, TheServiceParticipant, and unicast_socket_.

Referenced by open().

965 {
966  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
967 
968  heartbeat_->disable();
969  heartbeatchecker_->disable();
972 #ifdef ACE_HAS_IPV6
973  ipv6_unicast_socket_.close();
974  ipv6_multicast_socket_.close();
975 #endif
976 }
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
ACE_SOCK_Dgram_Mcast multicast_socket_
#define TheServiceParticipant
RcHandle< PeriodicEvent > heartbeatchecker_
int close(void)
RcHandle< PeriodicEvent > heartbeat_

◆ submsgs_to_msgblock()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::submsgs_to_msgblock ( const RTPS::SubmessageSeq subm)
private

Definition at line 1057 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Encoding::align(), OpenDDS::DCPS::Serializer::align_w(), alloc_msgblock(), custom_allocator_, OpenDDS::STUN::encoding(), OpenDDS::DCPS::Encoding::KIND_XCDR1, OpenDDS::DCPS::serialized_size(), and OpenDDS::RTPS::SMHDR_SZ.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and customize_queue_element_non_reliable_i().

1058 {
1059  // byte swapping is handled in the operator<<() implementation
1060  const Encoding encoding(Encoding::KIND_XCDR1);
1061  size_t size = 0;
1062  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1063  encoding.align(size, RTPS::SMHDR_SZ);
1064  serialized_size(encoding, size, subm[i]);
1065  }
1066 
1068 
1069  Serializer ser(hdr, encoding);
1070  for (CORBA::ULong i = 0; i < subm.length(); ++i) {
1071  ser << subm[i];
1072  ser.align_w(RTPS::SMHDR_SZ);
1073  }
1074  return hdr;
1075 }
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
ACE_CDR::ULong ULong
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
void align(size_t &value, size_t by=(std::numeric_limits< size_t >::max)()) const
Align "value" to "by" and according to the stream's alignment.
Definition: Serializer.inl:118

◆ transport()

RtpsUdpTransport_rch OpenDDS::DCPS::RtpsUdpDataLink::transport ( void  )

Definition at line 4576 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::dynamic_rchandle_cast(), and OpenDDS::DCPS::DataLink::impl().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().

4577 {
4578  return dynamic_rchandle_cast<RtpsUdpTransport>(impl());
4579 }
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214

◆ unicast_socket()

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket ( )

◆ unregister_for_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader ( const GUID_t writerid,
const GUID_t readerid 
)

Definition at line 701 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, interesting_readers_, and writers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader().

703 {
705  for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
706  limit = interesting_readers_.upper_bound(readerid);
707  pos != limit;
708  ) {
709  if (pos->second.localid == writerid) {
710  interesting_readers_.erase(pos++);
711  } else {
712  ++pos;
713  }
714  }
715 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
InterestingRemoteMapType interesting_readers_

◆ unregister_for_writer()

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer ( const GUID_t readerid,
const GUID_t writerid 
)

Definition at line 737 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, interesting_writers_, and readers_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer().

739 {
741  for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
742  limit = interesting_writers_.upper_bound(writerid);
743  pos != limit;
744  ) {
745  if (pos->second.localid == readerid) {
746  interesting_writers_.erase(pos++);
747  } else {
748  ++pos;
749  }
750  }
751 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
InterestingRemoteMapType interesting_writers_

◆ update_last_recv_addr()

void OpenDDS::DCPS::RtpsUdpDataLink::update_last_recv_addr ( const GUID_t src,
const NetworkAddress addr,
const MonotonicTimePoint now = MonotonicTimePoint::now() 
)
private

Definition at line 1532 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, config(), OpenDDS::DCPS::is_more_local(), locators_, locators_lock_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and remove_locator_and_bundling_cache().

Referenced by associated(), and received().

1533 {
1534  RtpsUdpInst_rch cfg = config();
1535 
1536  if (!cfg) {
1537  return;
1538  }
1539 
1540  if (addr == cfg->rtps_relay_address()) {
1541  return;
1542  }
1543 
1544  bool remove_cache = false;
1545  {
1547  const RemoteInfoMap::iterator pos = locators_.find(src);
1548  if (pos != locators_.end()) {
1549  const bool expired = cfg->receive_address_duration_ < (MonotonicTimePoint::now() - pos->second.last_recv_time_);
1550  const bool allow_update = expired ||
1551  pos->second.last_recv_addr_ == addr ||
1552  is_more_local(pos->second.last_recv_addr_, addr);
1553  if (allow_update) {
1554  remove_cache = pos->second.last_recv_addr_ != addr;
1555  pos->second.last_recv_addr_ = addr;
1556  pos->second.last_recv_time_ = now;
1557  }
1558  }
1559  }
1560  if (remove_cache) {
1562  }
1563 }
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool is_more_local(const NetworkAddress &current, const NetworkAddress &incoming)
RtpsUdpInst_rch config() const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
void remove_locator_and_bundling_cache(const GUID_t &remote_id)

◆ update_locators()

void OpenDDS::DCPS::RtpsUdpDataLink::update_locators ( const GUID_t remote_id,
AddrSet &  unicast_addresses,
AddrSet &  multicast_addresses,
bool  requires_inline_qos,
bool  add_ref 
)

Definition at line 481 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogAddr::c_str(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, LM_INFO, locators_, locators_lock_, OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::multicast_addrs_, OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::ref_count_, remove_locator_and_bundling_cache(), requires_inline_qos(), OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::requires_inline_qos_, and OpenDDS::DCPS::RtpsUdpDataLink::RemoteInfo::unicast_addrs_.

Referenced by associated(), and OpenDDS::DCPS::RtpsUdpTransport::update_locators().

486 {
487  if (unicast_addresses.empty() && multicast_addresses.empty()) {
488  if (DCPS_debug_level > 0) {
489  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpDataLink::update_locators: no addresses for %C\n"), LogGuid(remote_id).c_str()));
490  }
491  }
492 
494 
496 
497  RemoteInfo& info = locators_[remote_id];
498  const bool log_unicast_change = DCPS_debug_level > 3 && info.unicast_addrs_ != unicast_addresses;
499  const bool log_multicast_change = DCPS_debug_level > 3 && info.multicast_addrs_ != multicast_addresses;
500  info.unicast_addrs_.swap(unicast_addresses);
501  info.multicast_addrs_.swap(multicast_addresses);
502  info.requires_inline_qos_ = requires_inline_qos;
503  if (add_ref) {
504  ++info.ref_count_;
505  }
506 
507  g.release();
508 
509  if (log_unicast_change) {
510  for (AddrSet::const_iterator pos = unicast_addresses.begin(), limit = unicast_addresses.end();
511  pos != limit; ++pos) {
512  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
513  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
514  }
515  }
516  if (log_multicast_change) {
517  for (AddrSet::const_iterator pos = multicast_addresses.begin(), limit = multicast_addresses.end();
518  pos != limit; ++pos) {
519  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) RtpsUdpDataLink::update_locators %C is now at %C\n"),
520  LogGuid(remote_id).c_str(), LogAddr(*pos).c_str()));
521  }
522  }
523 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool requires_inline_qos(const GUIDSeq_var &peers)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void remove_locator_and_bundling_cache(const GUID_t &remote_id)

◆ update_required_acknack_count()

void OpenDDS::DCPS::RtpsUdpDataLink::update_required_acknack_count ( const GUID_t local_id,
const GUID_t remote_id,
CORBA::Long  current 
)
private

Definition at line 2742 of file RtpsUdpDataLink.cpp.

References writers_, and writers_lock_.

Referenced by bundle_and_send_submessages().

2743 {
2744  RtpsWriter_rch writer;
2745  {
2747  RtpsWriterMap::iterator rw = writers_.find(local_id);
2748  if (rw != writers_.end()) {
2749  writer = rw->second;
2750  }
2751  }
2752  if (writer) {
2753  writer->update_required_acknack_count(remote_id, current);
2754  }
2755 }
RcHandle< RtpsWriter > RtpsWriter_rch

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 266 of file RtpsUdpDataLink.h.

Member Data Documentation

◆ best_effort_heartbeat_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::best_effort_heartbeat_count_
private

Definition at line 860 of file RtpsUdpDataLink.h.

Referenced by send_heartbeats_manual_i().

◆ bundle_allocator_

Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::RtpsUdpDataLink::bundle_allocator_
private

Definition at line 314 of file RtpsUdpDataLink.h.

Referenced by bundle_and_send_submessages().

◆ bundling_cache_

BundlingCache OpenDDS::DCPS::RtpsUdpDataLink::bundling_cache_
mutable private

◆ custom_allocator_

Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> OpenDDS::DCPS::RtpsUdpDataLink::custom_allocator_
private

Definition at line 313 of file RtpsUdpDataLink.h.

Referenced by submsgs_to_msgblock().

◆ db_allocator_

DataBlockAllocator OpenDDS::DCPS::RtpsUdpDataLink::db_allocator_
private

Definition at line 312 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ db_lock_pool_

unique_ptr<DataBlockLockPool> OpenDDS::DCPS::RtpsUdpDataLink::db_lock_pool_
private

Definition at line 315 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ event_dispatcher_

EventDispatcher_rch OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher_
private

Definition at line 272 of file RtpsUdpDataLink.h.

◆ flush_send_queue_sporadic_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::flush_send_queue_sporadic_
private

Definition at line 743 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and ~RtpsUdpDataLink().

◆ force_inline_qos_

bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false
static private

static member used by testing code to force inline qos

Definition at line 268 of file RtpsUdpDataLink.h.

Referenced by requires_inline_qos().

◆ fsq_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::fsq_mutex_
mutable private

Definition at line 735 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and flush_send_queue().

◆ fsq_vec_size_

size_t OpenDDS::DCPS::RtpsUdpDataLink::fsq_vec_size_
private

Definition at line 737 of file RtpsUdpDataLink.h.

Referenced by disable_response_queue(), and flush_send_queue_i().

◆ handle_registry_

Security::HandleRegistry_rch OpenDDS::DCPS::RtpsUdpDataLink::handle_registry_
private

Definition at line 933 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ harvest_send_queue_sporadic_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::harvest_send_queue_sporadic_
private

Definition at line 740 of file RtpsUdpDataLink.h.

Referenced by queue_submessages(), and ~RtpsUdpDataLink().

◆ heartbeat_

RcHandle<PeriodicEvent> OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_
private

◆ heartbeat_counts_

CountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_
private

Definition at line 905 of file RtpsUdpDataLink.h.

Referenced by associated(), pre_stop_i(), register_for_reader(), and send_heartbeats().

◆ heartbeatchecker_

RcHandle<PeriodicEvent> OpenDDS::DCPS::RtpsUdpDataLink::heartbeatchecker_
private

Definition at line 863 of file RtpsUdpDataLink.h.

Referenced by register_for_writer(), and stop_i().

◆ ice_agent_

RcHandle<ICE::Agent> OpenDDS::DCPS::RtpsUdpDataLink::ice_agent_
private

Definition at line 935 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), and get_ice_agent().

◆ interesting_readers_

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_
private

◆ interesting_writers_

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_
private

◆ job_queue_

RcHandle<JobQueue> OpenDDS::DCPS::RtpsUdpDataLink::job_queue_
private

Definition at line 271 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ local_crypto_handle_

DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle_
private

Definition at line 934 of file RtpsUdpDataLink.h.

Referenced by local_crypto_handle().

◆ local_prefix_

GuidPrefix_t OpenDDS::DCPS::RtpsUdpDataLink::local_prefix_
private

Definition at line 277 of file RtpsUdpDataLink.h.

Referenced by received(), and RtpsUdpDataLink().

◆ locator_cache_

LocatorCache OpenDDS::DCPS::RtpsUdpDataLink::locator_cache_
mutable private

Definition at line 301 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), and remove_locator_and_bundling_cache().

◆ locators_

RemoteInfoMap OpenDDS::DCPS::RtpsUdpDataLink::locators_
private

◆ locators_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::locators_lock_
mutable private

◆ max_bundle_size_

const size_t OpenDDS::DCPS::RtpsUdpDataLink::max_bundle_size_
private

Definition at line 907 of file RtpsUdpDataLink.h.

Referenced by bundle_mapped_meta_submessages().

◆ mb_allocator_

MessageBlockAllocator OpenDDS::DCPS::RtpsUdpDataLink::mb_allocator_
private

Definition at line 311 of file RtpsUdpDataLink.h.

Referenced by alloc_msgblock().

◆ multi_buff_

OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer OpenDDS::DCPS::RtpsUdpDataLink::multi_buff_
private

Referenced by associated(), and open().

◆ multicast_manager_

MulticastManager OpenDDS::DCPS::RtpsUdpDataLink::multicast_manager_
private

Definition at line 941 of file RtpsUdpDataLink.h.

Referenced by on_data_available().

◆ multicast_socket_

ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket_
private

Definition at line 305 of file RtpsUdpDataLink.h.

Referenced by multicast_socket(), on_data_available(), open(), and stop_i().

◆ network_interface_address_reader_

RcHandle<InternalDataReader<NetworkInterfaceAddress> > OpenDDS::DCPS::RtpsUdpDataLink::network_interface_address_reader_
private

Definition at line 940 of file RtpsUdpDataLink.h.

Referenced by on_data_available(), open(), and stop_i().

◆ pending_reliable_readers_

RepoIdSet OpenDDS::DCPS::RtpsUdpDataLink::pending_reliable_readers_
private

Definition at line 745 of file RtpsUdpDataLink.h.

Referenced by associated(), make_reservation(), and received().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::RtpsUdpDataLink::reactor_task_
private

Definition at line 270 of file RtpsUdpDataLink.h.

Referenced by get_reactor(), get_reactor_interceptor(), and reactor_is_shut_down().

◆ readers_

RtpsReaderMap OpenDDS::DCPS::RtpsUdpDataLink::readers_
private

◆ readers_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::readers_lock_
mutable private

What was once a single lock for the whole datalink is now split between three (four including ch_lock_):

  • readers_lock_ protects readers_, readers_of_writer_, pending_reliable_readers_, interesting_writers_, and writer_to_seq_best_effort_readers_ along with anything else that fits the 'reader side activity' of the datalink
  • writers_lock_ protects writers_, heartbeat_counts_, best_effort_heartbeat_count_, and interesting_readers_ along with anything else that fits the 'writers side activity' of the datalink
  • locators_lock_ protects locators_ (and therefore calls to get_addresses_i()) for both remote writers and remote readers
  • send_queues_lock protects thread_send_queues_

Definition at line 767 of file RtpsUdpDataLink.h.

Referenced by accumulate_addresses(), associated(), check_heartbeats(), client_stop(), filterBestEffortReaders(), make_reservation(), pre_stop_i(), received(), register_for_writer(), release_reservations_i(), and unregister_for_writer().

◆ readers_of_writer_

RtpsReaderMultiMap OpenDDS::DCPS::RtpsUdpDataLink::readers_of_writer_
private

Definition at line 755 of file RtpsUdpDataLink.h.

Referenced by associated(), client_stop(), received(), and release_reservations_i().

◆ security_config_

Security::SecurityConfig_rch OpenDDS::DCPS::RtpsUdpDataLink::security_config_
private

Definition at line 932 of file RtpsUdpDataLink.h.

Referenced by RtpsUdpDataLink().

◆ security_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::security_mutex_
mutable private

Definition at line 931 of file RtpsUdpDataLink.h.

Referenced by local_crypto_handle().

◆ sq_

TransactionalRtpsSendQueue OpenDDS::DCPS::RtpsUdpDataLink::sq_
private

◆ transport_statistics_

InternalTransportStatistics& OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_
private

◆ transport_statistics_mutex_

ACE_Thread_Mutex& OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_
private

◆ unicast_socket_

ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket_
private

Definition at line 304 of file RtpsUdpDataLink.h.

Referenced by open(), stop_i(), and unicast_socket().

◆ writer_to_seq_best_effort_readers_

WriterToSeqReadersMap OpenDDS::DCPS::RtpsUdpDataLink::writer_to_seq_best_effort_readers_
private

◆ writers_

RtpsWriterMap OpenDDS::DCPS::RtpsUdpDataLink::writers_
private

◆ writers_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::writers_lock_
mutable private

The documentation for this class was generated from the following files: