OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::RtpsUdpTransport Class Reference

#include <RtpsUdpTransport.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpTransport:
Collaboration graph
[legend]

Classes

struct  IceEndpoint
 

Public Member Functions

 RtpsUdpTransport (const RtpsUdpInst_rch &inst)
 
RtpsUdpInst_rch config () const
 
DCPS::RcHandle< ICE::Agent > get_ice_agent () const
 
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
virtual void rtps_relay_only_now (bool flag)
 
virtual void use_rtps_relay_now (bool flag)
 
virtual void use_ice_now (bool flag)
 
ICE::ServerReflexiveStateMachine & relay_srsm ()
 
void process_relay_sra (ICE::ServerReflexiveStateMachine::StateChange)
 
void disable_relay_stun_task ()
 
virtual void update_locators (const GUID_t &, const TransportLocatorSeq &)
 
virtual void get_last_recv_locator (const GUID_t &, TransportLocator &)
 
void rtps_relay_address_change ()
 
void append_transport_statistics (TransportStatisticsSequence &seq)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportImpl
virtual ~TransportImpl ()
 
virtual void unbind_link (DataLink *link)
 Remove any pending_release mappings. More...
 
bool release_link_resources (DataLink *link)
 
TransportInst_rch config () const
 
ACE_Reactor_Timer_Interface * timer () const
 Interface to the transport's reactor for scheduling timers. More...
 
ACE_Reactor * reactor () const
 
ACE_thread_t reactor_owner () const
 
bool is_shut_down () const
 
void create_reactor_task (bool useAsyncSend=false, const OPENDDS_STRING &name="")
 
void dump ()
 Diagnostic aid. More...
 
OPENDDS_STRING dump_to_str ()
 
void report ()
 
ReactorTask_rch reactor_task ()
 
EventDispatcher_rch event_dispatcher ()
 
int acquire ()
 
int tryacquire ()
 
int release ()
 
int remove ()
 
bool connection_info (TransportLocator &local_info, ConnectionInfoFlags flags) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Types

typedef ACE_Thread_Mutex ThreadLockType
 
typedef ACE_Guard< ThreadLockType > GuardThreadType
 
typedef ACE_SYNCH_MUTEX LockType
 This protects the connections_ data member. More...
 
typedef ACE_Guard< LockType > GuardType
 
typedef PmfSporadicTask< RtpsUdpTransport > Sporadic
 

Private Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
 
virtual void stop_accepting_or_connecting (const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
 
bool configure_i (const RtpsUdpInst_rch &config)
 
void client_stop (const GUID_t &localId)
 
virtual void shutdown_i ()
 
virtual void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
virtual void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual bool connection_info_i (TransportLocator &info, ConnectionInfoFlags flags) const
 
void get_connection_addrs (const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
 
virtual void release_datalink (DataLink *link)
 
virtual OPENDDS_STRING transport_type () const
 
RtpsUdpDataLink_rch make_datalink (const GuidPrefix_t &local_prefix)
 
bool use_datalink (const GUID_t &local_id, const GUID_t &remote_id, const TransportBLOB &remote_data, const TransportBLOB &discovery_locator, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, SequenceNumber max_sn, const TransportClient_rch &client)
 
void local_crypto_handle (DDS::Security::ParticipantCryptoHandle pch)
 
void relay_stun_task (const MonotonicTimePoint &now)
 
void start_ice ()
 
void stop_ice ()
 

Private Attributes

ThreadLockType links_lock_
 
LockType connections_lock_
 
RcHandle< BitSubscriber > bit_sub_
 
GuidPrefix_t local_prefix_
 
RtpsUdpDataLink_rch link_
 
ACE_SOCK_Dgram unicast_socket_
 
TransportClient_wrch default_listener_
 
JobQueue_rch job_queue_
 
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
 
ConnectionRecords deferred_connection_records_
 
RcHandle< IceEndpoint > ice_endpoint_
 
RcHandle< Sporadic > relay_stun_task_
 
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
 
ThreadLockType relay_stun_task_falloff_mutex_
 
ICE::ServerReflexiveStateMachine relay_srsm_
 
RcHandle< ICE::Agent > ice_agent_
 
InternalTransportStatistics transport_statistics_
 
ACE_Thread_Mutex transport_statistics_mutex_
 

Friends

class RtpsUdpSendStrategy
 
class RtpsUdpReceiveStrategy
 

Additional Inherited Members

- Public Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType lock_
 Lock to protect the config_ and reactor_task_ data members. More...
 
WeakRcHandle< TransportInst > config_
 
ReactorTask_rch reactor_task_
 
EventDispatcher_rch event_dispatcher_
 smart ptr to the associated DL cleanup task More...
 
unique_ptr< Monitor > monitor_
 Monitor object for this entity. More...
 
- Protected Types inherited from OpenDDS::DCPS::TransportImpl
typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportImpl
 TransportImpl (TransportInst_rch config)
 
bool open ()
 
typedef OPENDDS_MULTIMAP (TransportClient_wrch, DataLink_rch) PendConnMap
 
void add_pending_connection (const TransportClient_rch &client, DataLink_rch link)
 
void shutdown ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportImpl
LockType pending_connections_lock_
 Lock to protect the pending_connections_ data member. More...
 
PendConnMap pending_connections_
 
AtomicBool is_shut_down_
 Id of the last link established. More...
 

Detailed Description

Definition at line 30 of file RtpsUdpTransport.h.

Member Typedef Documentation

◆ GuardThreadType

Definition at line 127 of file RtpsUdpTransport.h.

◆ GuardType

Definition at line 132 of file RtpsUdpTransport.h.

◆ LockType

This protects the connections_ data member.

Definition at line 131 of file RtpsUdpTransport.h.

◆ Sporadic

Definition at line 180 of file RtpsUdpTransport.h.

◆ ThreadLockType

Definition at line 126 of file RtpsUdpTransport.h.

Constructor & Destructor Documentation

◆ RtpsUdpTransport()

OpenDDS::DCPS::RtpsUdpTransport::RtpsUdpTransport ( const RtpsUdpInst_rch & inst)

Definition at line 34 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::assign(), configure_i(), OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, if(), local_prefix_, and OpenDDS::DCPS::TransportImpl::open().

35  : TransportImpl(inst)
36 #if defined(OPENDDS_SECURITY)
38 #endif
39 #ifdef OPENDDS_SECURITY
40  , ice_endpoint_(make_rch<IceEndpoint>(ref(*this)))
43 #endif
44  , transport_statistics_(inst->name())
45 {
47  if (!(configure_i(inst) && open())) {
48  throw Transport::UnableToCreate();
49  }
50 }
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const InstanceHandle_t HANDLE_NIL
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
RcHandle< ICE::Agent > ice_agent_
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
InternalTransportStatistics transport_statistics_
static DCPS::RcHandle< Agent > instance()
Definition: Ice.cpp:122
TransportImpl(TransportInst_rch config)
bool configure_i(const RtpsUdpInst_rch &config)
RcHandle< IceEndpoint > ice_endpoint_

Member Function Documentation

◆ accept_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::accept_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client
)
private virtual

accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 240 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::add_pending_connection(), bit_sub_, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, connections_lock_, OpenDDS::DCPS::TransportImpl::RemoteTransport::context_, OpenDDS::DCPS::TransportImpl::RemoteTransport::discovery_blob_, OpenDDS::DCPS::TransportImpl::RemoteTransport::durable_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::max_sn_, OpenDDS::DCPS::TransportImpl::RemoteTransport::participant_discovered_at_, OpenDDS::DCPS::TransportImpl::RemoteTransport::reliable_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, use_datalink(), and VDBG_LVL.

243 {
244  bit_sub_ = client->get_builtin_subscriber_proxy();
245 
246  GuardThreadType guard_links(links_lock_);
247 
248  if (is_shut_down()) {
249  return AcceptConnectResult();
250  }
251 
252  if (!link_) {
253  link_ = make_datalink(attribs.local_id_.guidPrefix);
254  if (!link_) {
255  return AcceptConnectResult();
256  }
257  }
258  RtpsUdpDataLink_rch link = link_;
259 
260  if (use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, remote.discovery_blob_, remote.participant_discovered_at_,
261  remote.context_,
262  attribs.local_reliable_, remote.reliable_,
263  attribs.local_durable_, remote.durable_, attribs.max_sn_, client)) {
264  return AcceptConnectResult(link);
265  }
266 
268  add_pending_connection(client, link);
269  VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::accept_datalink pending.\n"), 2);
270  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
271 }
bool use_datalink(const GUID_t &local_id, const GUID_t &remote_id, const TransportBLOB &remote_data, const TransportBLOB &discovery_locator, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, SequenceNumber max_sn, const TransportClient_rch &client)
ACE_Guard< LockType > GuardType
RcHandle< BitSubscriber > bit_sub_
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
#define VDBG_LVL(DBG_ARGS, LEVEL)
ACE_Guard< ThreadLockType > GuardThreadType
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ append_transport_statistics()

void OpenDDS::DCPS::RtpsUdpTransport::append_transport_statistics ( TransportStatisticsSequence & seq)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 576 of file RtpsUdpTransport.cpp.

References ACE_GUARD, OpenDDS::DCPS::append(), OpenDDS::DCPS::InternalTransportStatistics::clear(), transport_statistics_, and transport_statistics_mutex_.

577 {
581 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex transport_statistics_mutex_
void append(TransportStatisticsSequence &seq, const InternalTransportStatistics &istats)
InternalTransportStatistics transport_statistics_

◆ client_stop()

void OpenDDS::DCPS::RtpsUdpTransport::client_stop ( const GUID_t & localId)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 719 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::client_stop(), link_, links_lock_, and ACE_Guard< ACE_LOCK >::release().

720 {
721  GuardThreadType guard_links(links_lock_);
722  const RtpsUdpDataLink_rch link = link_;
723  guard_links.release();
724  if (link) {
725  link->client_stop(localId);
726  }
727 }
void client_stop(const GUID_t &localId)
ACE_Guard< ThreadLockType > GuardThreadType
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ config()

RtpsUdpInst_rch OpenDDS::DCPS::RtpsUdpTransport::config ( ) const

◆ configure_i()

bool OpenDDS::DCPS::RtpsUdpTransport::configure_i ( const RtpsUdpInst_rch & config)
private

Definition at line 584 of file RtpsUdpTransport.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), ACE_IPC_SAP::control(), OpenDDS::DCPS::TransportImpl::create_reactor_task(), OpenDDS::DCPS::DataLink::default_listener(), ACE_SOCK::get_local_addr(), OpenDDS::DCPS::ReactorTask::get_reactor(), OpenDDS::DCPS::ReactorTask::interceptor(), OpenDDS::DCPS::LogAddr::ip(), IPPROTO_IP, OpenDDS::DCPS::NetworkAddress::is_any(), job_queue_, link_, links_lock_, LM_ERROR, make_datalink(), ACE_SOCK_Dgram::open(), PF_INET, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::reactor(), OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::TransportImpl::reactor_task_, relay_stun_task(), relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, ACE_SOCK::set_option(), start_ice(), TheServiceParticipant, unicast_socket_, and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by RtpsUdpTransport().

585 {
586  if (!config) {
587  return false;
588  }
589 
590  // Override with DCPSDefaultAddress.
591  if (config->local_address() == NetworkAddress() &&
592  TheServiceParticipant->default_address() != NetworkAddress()) {
593  config->local_address(TheServiceParticipant->default_address());
594  }
595  if (config->multicast_interface_.empty() &&
596  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
597  config->multicast_interface_ = DCPS::LogAddr::ip(TheServiceParticipant->default_address().to_addr());
598  }
599 
600  // Open the socket here so that any addresses/ports left
601  // unspecified in the RtpsUdpInst are known by the time we get to
602  // connection_info_i(). Opening the sockets here also allows us to
603  // detect and report errors during DataReader/Writer setup instead
604  // of during association.
605 
606  ACE_INET_Addr address = config->local_address().to_addr();
607 
608  if (unicast_socket_.open(address, PF_INET) != 0) {
609  ACE_ERROR_RETURN((LM_ERROR,
610  ACE_TEXT("(%P|%t) ERROR: ")
611  ACE_TEXT("RtpsUdpTransport::configure_i: open:")
612  ACE_TEXT("%m\n")),
613  false);
614  }
615 
616 #ifdef ACE_WIN32
617  // By default Winsock will cause reads to fail with "connection reset"
618  // when UDP sends result in ICMP "port unreachable" messages.
619  // The transport framework is not set up for this since returning <= 0
620  // from our receive_bytes causes the framework to close down the datalink
621  // which in this case is used to receive from multiple peers.
622  {
623  BOOL recv_udp_connreset = FALSE;
624  unicast_socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
625  }
626 #endif
627 
628  if (unicast_socket_.get_local_addr(address) != 0) {
629  ACE_ERROR_RETURN((LM_ERROR,
630  ACE_TEXT("(%P|%t) ERROR: ")
631  ACE_TEXT("RtpsUdpTransport::configure_i: get_local_addr:")
632  ACE_TEXT("%m\n")),
633  false);
634  }
635 
636  config->local_address(NetworkAddress(address));
637 
638 #ifdef ACE_RECVPKTINFO
639  int sockopt = 1;
640  if (unicast_socket_.set_option(IPPROTO_IP, ACE_RECVPKTINFO, &sockopt, sizeof sockopt) == -1) {
641  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpTransport::configure_i: set_option: %m\n")), false);
642  }
643 #endif
644 
645 #ifdef ACE_HAS_IPV6
646  address = config->ipv6_local_address().to_addr();
647 
648  if (ipv6_unicast_socket_.open(address, PF_INET6) != 0) {
649  ACE_ERROR_RETURN((LM_ERROR,
650  ACE_TEXT("(%P|%t) ERROR: ")
651  ACE_TEXT("RtpsUdpTransport::configure_i: open:")
652  ACE_TEXT("%m\n")),
653  false);
654  }
655 
656 #ifdef ACE_WIN32
657  // By default Winsock will cause reads to fail with "connection reset"
658  // when UDP sends result in ICMP "port unreachable" messages.
659  // The transport framework is not set up for this since returning <= 0
660  // from our receive_bytes causes the framework to close down the datalink
661  // which in this case is used to receive from multiple peers.
662  {
663  BOOL recv_udp_connreset = FALSE;
664  ipv6_unicast_socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
665  }
666 #endif
667 
668  if (ipv6_unicast_socket_.get_local_addr(address) != 0) {
669  ACE_ERROR_RETURN((LM_ERROR,
670  ACE_TEXT("(%P|%t) ERROR: ")
671  ACE_TEXT("RtpsUdpTransport::configure_i: get_local_addr:")
672  ACE_TEXT("%m\n")),
673  false);
674  }
675 
676  NetworkAddress temp(address);
677  if (address.is_ipv4_mapped_ipv6() && temp.is_any()) {
678  temp = NetworkAddress(address.get_port_number(), "::");
679  }
680  config->ipv6_local_address(temp);
681 
682 #ifdef ACE_RECVPKTINFO6
683  if (ipv6_unicast_socket_.set_option(IPPROTO_IPV6, ACE_RECVPKTINFO6, &sockopt, sizeof sockopt) == -1) {
684  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpTransport::configure_i: set_option: %m\n")), false);
685  }
686 #endif
687 #endif
688 
689  create_reactor_task(false, "RtpsUdpTransport" + config->name());
690 
692  job_queue_ = DCPS::make_rch<DCPS::JobQueue>(reactor);
693 
694 #ifdef OPENDDS_SECURITY
695  if (config->use_ice()) {
696  start_ice();
697  }
698 
700 #endif
701 
702  if (config->opendds_discovery_default_listener_) {
703  GuardThreadType guard_links(links_lock_);
704  link_ = make_datalink(config->opendds_discovery_guid_.guidPrefix);
705  link_->default_listener(*config->opendds_discovery_default_listener_);
706  }
707 
708 #ifdef OPENDDS_SECURITY
709  {
711  relay_stun_task_falloff_.set(config->heartbeat_period_);
712  }
714 #endif
715 
716  return true;
717 }
static const String ip(const ACE_INET_Addr &addr)
Definition: LogAddr.cpp:15
ReactorTask_rch reactor_task()
ACE_Reactor * reactor() const
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
int set_option(int level, int option, void *optval, int optlen) const
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
void default_listener(const TransportReceiveListener_wrch &trl)
Definition: DataLink.inl:352
void relay_stun_task(const MonotonicTimePoint &now)
ThreadLockType relay_stun_task_falloff_mutex_
int control(int cmd, void *) const
int get_local_addr(ACE_Addr &) const
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
ACE_TEXT("TCP_Factory")
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_Guard< ThreadLockType > GuardThreadType
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
RtpsUdpInst_rch config() const
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14

◆ connect_datalink()

TransportImpl::AcceptConnectResult OpenDDS::DCPS::RtpsUdpTransport::connect_datalink ( const RemoteTransport & remote,
const ConnectionAttribs & attribs,
const TransportClient_rch & client
)
private virtual

connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 205 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::TransportImpl::AcceptConnectResult::ACR_SUCCESS, OpenDDS::DCPS::TransportImpl::add_pending_connection(), bit_sub_, OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, connections_lock_, OpenDDS::DCPS::TransportImpl::RemoteTransport::context_, OpenDDS::DCPS::TransportImpl::RemoteTransport::discovery_blob_, OpenDDS::DCPS::TransportImpl::RemoteTransport::durable_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, LM_DEBUG, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_durable_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_id_, OpenDDS::DCPS::TransportImpl::ConnectionAttribs::local_reliable_, make_datalink(), OpenDDS::DCPS::TransportImpl::ConnectionAttribs::max_sn_, OpenDDS::DCPS::TransportImpl::RemoteTransport::participant_discovered_at_, OpenDDS::DCPS::TransportImpl::RemoteTransport::reliable_, OpenDDS::DCPS::TransportImpl::RemoteTransport::repo_id_, use_datalink(), and VDBG_LVL.

208 {
209  bit_sub_ = client->get_builtin_subscriber_proxy();
210 
211  GuardThreadType guard_links(links_lock_);
212 
213  if (is_shut_down()) {
214  return AcceptConnectResult();
215  }
216 
217  if (!link_) {
218  link_ = make_datalink(attribs.local_id_.guidPrefix);
219  if (!link_) {
220  return AcceptConnectResult();
221  }
222  }
223 
224  RtpsUdpDataLink_rch link = link_;
225 
226  if (use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, remote.discovery_blob_, remote.participant_discovered_at_,
227  remote.context_,
228  attribs.local_reliable_, remote.reliable_,
229  attribs.local_durable_, remote.durable_, attribs.max_sn_, client)) {
230  return AcceptConnectResult(link);
231  }
232 
234  add_pending_connection(client, link);
235  VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
236  return AcceptConnectResult(AcceptConnectResult::ACR_SUCCESS);
237 }
bool use_datalink(const GUID_t &local_id, const GUID_t &remote_id, const TransportBLOB &remote_data, const TransportBLOB &discovery_locator, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, SequenceNumber max_sn, const TransportClient_rch &client)
ACE_Guard< LockType > GuardType
RcHandle< BitSubscriber > bit_sub_
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
#define VDBG_LVL(DBG_ARGS, LEVEL)
ACE_Guard< ThreadLockType > GuardThreadType
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ connection_info_i()

bool OpenDDS::DCPS::RtpsUdpTransport::connection_info_i ( TransportLocator & local_info,
ConnectionInfoFlags  flags 
) const
private virtual

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 393 of file RtpsUdpTransport.cpp.

References config().

394 {
395  RtpsUdpInst_rch cfg = config();
396  if (cfg) {
397  cfg->populate_locator(info, flags);
398  return true;
399  }
400  return false;
401 }
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ disable_relay_stun_task()

void OpenDDS::DCPS::RtpsUdpTransport::disable_relay_stun_task ( )

Definition at line 1075 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, bit_sub_, deferred_connection_records_, OpenDDS::DCPS::JobQueue::enqueue(), OpenDDS::DCPS::ConnectionRecord::guid, job_queue_, OpenDDS::DCPS::ConnectionRecord::latency, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by rtps_relay_only_now(), and use_rtps_relay_now().

1076 {
1077 #ifndef DDS_HAS_MINIMUM_BIT
1078  relay_stun_task_->cancel();
1079 
1080  DCPS::ConnectionRecord connection_record;
1081  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
1082  connection_record.protocol = RTPS_RELAY_STUN_PROTOCOL;
1083  connection_record.latency = TimeDuration::zero_value.to_dds_duration();
1084 
1086  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1087  deferred_connection_records_.push_back(std::make_pair(false, connection_record));
1088  }
1089 
1090  if (!bit_sub_) {
1091  return;
1092  }
1093 
1094  if (!deferred_connection_records_.empty()) {
1095  job_queue_->enqueue(DCPS::make_rch<WriteConnectionRecords>(bit_sub_, deferred_connection_records_));
1097  }
1098 
1099  relay_srsm_ = ICE::ServerReflexiveStateMachine();
1100 #endif
1101 }
void enqueue(JobPtr job)
Definition: JobQueue.h:61
ICE::ServerReflexiveStateMachine relay_srsm_
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
RcHandle< BitSubscriber > bit_sub_
DDS::Duration_t to_dds_duration() const
ConnectionRecords deferred_connection_records_
const string RTPS_RELAY_STUN_PROTOCOL

◆ get_connection_addrs()

void OpenDDS::DCPS::RtpsUdpTransport::get_connection_addrs ( const TransportBLOB & data,
AddrSet *  uc_addrs,
AddrSet *  mc_addrs = 0,
bool *  requires_inline_qos = 0,
unsigned int *  blob_bytes_read = 0 
) const
private

Definition at line 362 of file RtpsUdpTransport.cpp.

References OpenDDS::RTPS::blob_to_locators(), config(), ACE_INET_Addr::is_multicast(), OpenDDS::DCPS::locator_to_address(), and DDS::RETCODE_OK.

Referenced by register_for_reader(), register_for_writer(), update_locators(), and use_datalink().

367 {
368  using namespace OpenDDS::RTPS;
369  LocatorSeq locators;
370  DDS::ReturnCode_t result =
371  blob_to_locators(remote, locators, requires_inline_qos, blob_bytes_read);
372  if (result != DDS::RETCODE_OK) {
373  return;
374  }
375 
376  for (CORBA::ULong i = 0; i < locators.length(); ++i) {
377  ACE_INET_Addr addr;
378  // If conversion was successful
379  if (locator_to_address(addr, locators[i], false) == 0) {
380  if (addr.is_multicast()) {
381  RtpsUdpInst_rch cfg = config();
382  if (cfg && cfg->use_multicast_ && mc_addrs) {
383  mc_addrs->insert(NetworkAddress(addr));
384  }
385  } else if (uc_addrs) {
386  uc_addrs->insert(NetworkAddress(addr));
387  }
388  }
389  }
390 }
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t blob_to_locators(const DCPS::TransportBLOB &blob, DCPS::LocatorSeq &locators, bool *requires_inline_qos, unsigned int *pBytesRead)
ACE_CDR::ULong ULong
bool is_multicast(void) const
RtpsUdpInst_rch config() const
int locator_to_address(ACE_INET_Addr &dest, const DCPS::Locator_t &locator, bool map)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
sequence< Locator_t > LocatorSeq

◆ get_ice_agent()

DCPS::RcHandle< ICE::Agent > OpenDDS::DCPS::RtpsUdpTransport::get_ice_agent ( ) const

Definition at line 60 of file RtpsUdpTransport.cpp.

References ice_agent_.

61 {
62  return ice_agent_;
63 }
RcHandle< ICE::Agent > ice_agent_

◆ get_ice_endpoint()

DCPS::WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::RtpsUdpTransport::get_ice_endpoint ( )
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 67 of file RtpsUdpTransport.cpp.

References config(), and ice_endpoint_.

68 {
69 #ifdef OPENDDS_SECURITY
70  RtpsUdpInst_rch cfg = config();
71  return (cfg && cfg->use_ice()) ? static_rchandle_cast<ICE::Endpoint>(ice_endpoint_) : DCPS::WeakRcHandle<ICE::Endpoint>();
72 #else
73  return DCPS::WeakRcHandle<ICE::Endpoint>();
74 #endif
75 }
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RcHandle< IceEndpoint > ice_endpoint_

◆ get_last_recv_locator()

void OpenDDS::DCPS::RtpsUdpTransport::get_last_recv_locator ( const GUID_t & remote,
TransportLocator & tl
)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 521 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::address_to_locator(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::STUN::encoding(), OpenDDS::DCPS::RtpsUdpDataLink::get_last_recv_address(), OpenDDS::RTPS::get_locators_encoding(), OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, OpenDDS::RTPS::message_block_to_sequence(), OpenDDS::DCPS::primitive_serialized_size_boolean(), OpenDDS::DCPS::ref(), OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos(), OpenDDS::DCPS::serialized_size(), OpenDDS::DCPS::NetworkAddress::to_addr(), and OpenDDS::DCPS::TransportLocator::transport_type.

523 {
524  if (is_shut_down()) {
525  return;
526  }
527 
528  GuardThreadType guard_links(links_lock_);
529 
530  bool expects_inline_qos = false;
531  NetworkAddress addr;
532  if (link_) {
533  addr = link_->get_last_recv_address(remote);
534  if (addr == NetworkAddress()) {
535  return;
536  }
537  GUIDSeq_var guids(new GUIDSeq);
538  GUIDSeq& ref = static_cast<GUIDSeq&>(guids);
539  ref.length(1);
540  ref[0] = remote;
541  expects_inline_qos = link_->requires_inline_qos(guids);
542  }
543 
544  LocatorSeq locators;
545  locators.length(1);
546  address_to_locator(locators[0], addr.to_addr());
547 
548  const Encoding& encoding = RTPS::get_locators_encoding();
549  size_t size = serialized_size(encoding, locators);
550  primitive_serialized_size_boolean(encoding, size);
551 
552  ACE_Message_Block mb_locator(size);
553  Serializer ser_loc(&mb_locator, encoding);
554  ser_loc << locators;
555  ser_loc << ACE_OutputCDR::from_boolean(expects_inline_qos);
556 
557  tl.transport_type = "rtps_udp";
558  RTPS::message_block_to_sequence(mb_locator, tl.data);
559 }
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
NetworkAddress get_last_recv_address(const GUID_t &remote_id)
ACE_Guard< ThreadLockType > GuardThreadType
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
void message_block_to_sequence(const ACE_Message_Block &mb_locator, T &out)
Definition: MessageUtils.h:101
OpenDDS_Dcps_Export void primitive_serialized_size_boolean(const Encoding &encoding, size_t &size, size_t count=1)
const DCPS::Encoding & get_locators_encoding()
sequence< Locator_t > LocatorSeq
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ local_crypto_handle()

void OpenDDS::DCPS::RtpsUdpTransport::local_crypto_handle ( DDS::Security::ParticipantCryptoHandle  pch)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 347 of file RtpsUdpTransport.cpp.

References link_, links_lock_, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), and local_crypto_handle_.

348 {
349  RtpsUdpDataLink_rch link;
350  {
352  local_crypto_handle_ = pch;
353  link = link_;
354  }
355  if (link) {
356  link->local_crypto_handle(pch);
357  }
358 }
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ make_datalink()

RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::make_datalink ( const GuidPrefix_t & local_prefix )
private

Definition at line 140 of file RtpsUdpTransport.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::assign(), config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), ACE_IPC_SAP::get_handle(), OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, OpenDDS::DCPS::ReactorTask::interceptor(), LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), local_crypto_handle_, local_prefix_, OpenDDS::DCPS::RtpsUdpDataLink::open(), OPENDDS_ASSERT, OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::TransportImpl::reactor_task(), OpenDDS::DCPS::TransportImpl::reactor_task_, ACE_Event_Handler::READ_MASK, OpenDDS::DCPS::ref(), relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, ACE_IPC_SAP::set_handle(), transport_statistics_, transport_statistics_mutex_, unicast_socket_, and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by accept_datalink(), configure_i(), connect_datalink(), register_for_reader(), and register_for_writer().

141 {
143 
144  RtpsUdpInst_rch cfg = config();
145  if (!cfg) {
146  return RtpsUdpDataLink_rch();
147  }
148 
150  assign(local_prefix_, local_prefix);
151 #ifdef OPENDDS_SECURITY
152  {
154  relay_stun_task_falloff_.set(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
155  }
157 #endif
158  }
159 
160 #if defined(OPENDDS_SECURITY)
161  if (cfg->use_ice()) {
163  ri->execute_or_enqueue(make_rch<RemoveHandler>(unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
164 #ifdef ACE_HAS_IPV6
165  ri->execute_or_enqueue(make_rch<RemoveHandler>(ipv6_unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
166 #endif
167  }
168 #endif
169 
170  RtpsUdpDataLink_rch link = make_rch<RtpsUdpDataLink>(rchandle_from(this), local_prefix, config(), reactor_task(), ref(transport_statistics_), ref(transport_statistics_mutex_));
171 
172 #if defined(OPENDDS_SECURITY)
174 #endif
175 
176  if (!link->open(unicast_socket_
177 #ifdef ACE_HAS_IPV6
178  , ipv6_unicast_socket_
179 #endif
180  )) {
181 #ifdef ACE_HAS_IPV6
182  const ACE_HANDLE v6handle = ipv6_unicast_socket_.get_handle();
183 #else
184  const ACE_HANDLE v6handle = ACE_INVALID_HANDLE;
185 #endif
186  ACE_ERROR((LM_ERROR,
187  ACE_TEXT("(%P|%t) ERROR: ")
188  ACE_TEXT("RtpsUdpTransport::make_datalink: ")
189  ACE_TEXT("failed to open DataLink for sockets %d %d\n"),
190  unicast_socket_.get_handle(), v6handle
191  ));
192  return RtpsUdpDataLink_rch();
193  }
194 
195  // RtpsUdpDataLink now owns the socket
196  unicast_socket_.set_handle(ACE_INVALID_HANDLE);
197 #ifdef ACE_HAS_IPV6
198  ipv6_unicast_socket_.set_handle(ACE_INVALID_HANDLE);
199 #endif
200 
201  return link;
202 }
#define ACE_ERROR(X)
ACE_Thread_Mutex transport_statistics_mutex_
ReactorTask_rch reactor_task()
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
unsigned long ACE_Reactor_Mask
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
ThreadLockType relay_stun_task_falloff_mutex_
void set_handle(ACE_HANDLE handle)
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
ACE_HANDLE get_handle(void) const
ACE_TEXT("TCP_Factory")
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
InternalTransportStatistics transport_statistics_
RtpsUdpInst_rch config() const
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
CommandPtr execute_or_enqueue(CommandPtr command)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ process_relay_sra()

void OpenDDS::DCPS::RtpsUdpTransport::process_relay_sra ( ICE::ServerReflexiveStateMachine::StateChange  sc)

Definition at line 1023 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, bit_sub_, deferred_connection_records_, OpenDDS::DCPS::JobQueue::enqueue(), OpenDDS::DCPS::ConnectionRecord::guid, OpenDDS::ICE::Configuration::instance(), job_queue_, OpenDDS::DCPS::ConnectionRecord::latency, OpenDDS::ICE::ServerReflexiveStateMachine::latency(), OpenDDS::ICE::ServerReflexiveStateMachine::latency_available(), OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Change, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_None, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Set, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Unset, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), OpenDDS::ICE::ServerReflexiveStateMachine::unset_stun_server_address(), and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by relay_stun_task().

1024 {
1025 #ifndef DDS_HAS_MINIMUM_BIT
1026  DCPS::ConnectionRecord connection_record;
1027  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
1028  connection_record.protocol = RTPS_RELAY_STUN_PROTOCOL;
1029  connection_record.latency = TimeDuration::zero_value.to_dds_duration();
1030 
1031  switch (sc) {
1034  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1035  connection_record.latency = relay_srsm_.latency().to_dds_duration();
1037  deferred_connection_records_.push_back(std::make_pair(true, connection_record));
1038  }
1039  break;
1042  // Lengthen to normal period.
1043  {
1045  relay_stun_task_falloff_.set(ICE::Configuration::instance()->server_reflexive_address_period());
1046  }
1047  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1048  connection_record.latency = relay_srsm_.latency().to_dds_duration();
1050  deferred_connection_records_.push_back(std::make_pair(true, connection_record));
1051  break;
1053  {
1054  connection_record.address = DCPS::LogAddr(relay_srsm_.unset_stun_server_address()).c_str();
1055  deferred_connection_records_.push_back(std::make_pair(false, connection_record));
1056  break;
1057  }
1058  }
1059 
1060  if (!bit_sub_) {
1061  return;
1062  }
1063 
1064  if (!deferred_connection_records_.empty()) {
1065  job_queue_->enqueue(DCPS::make_rch<WriteConnectionRecords>(bit_sub_, deferred_connection_records_));
1067  }
1068 
1069 #else
1070  ACE_UNUSED_ARG(sc);
1071 #endif
1072 }
void enqueue(JobPtr job)
Definition: JobQueue.h:61
const ACE_INET_Addr & unset_stun_server_address() const
Definition: RTPS/ICE/Ice.h:238
DCPS::TimeDuration latency() const
Definition: RTPS/ICE/Ice.h:246
ICE::ServerReflexiveStateMachine relay_srsm_
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
static const TimeDuration zero_value
Definition: TimeDuration.h:31
RcHandle< BitSubscriber > bit_sub_
static Configuration * instance()
Definition: Ice.cpp:109
ThreadLockType relay_stun_task_falloff_mutex_
DDS::Duration_t to_dds_duration() const
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
ConnectionRecords deferred_connection_records_
const string RTPS_RELAY_STUN_PROTOCOL

◆ register_for_reader()

void OpenDDS::DCPS::RtpsUdpTransport::register_for_reader ( const GUID_t & participant,
const GUID_t & writerid,
const GUID_t & readerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 404 of file RtpsUdpTransport.cpp.

References config(), get_connection_addrs(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, make_datalink(), and OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader().

409 {
410  if (is_shut_down()) {
411  return;
412  }
413 
414  RtpsUdpInst_rch cfg = config();
415  if (!cfg) {
416  return;
417  }
418 
419  const TransportBLOB* blob = cfg->get_blob(locators);
420  if (!blob) {
421  return;
422  }
423 
424  GuardThreadType guard_links(links_lock_);
425 
426  if (!link_) {
427  link_ = make_datalink(participant.guidPrefix);
428  }
429 
430  AddrSet uc_addrs;
431  get_connection_addrs(*blob, &uc_addrs);
432  link_->register_for_reader(writerid, readerid, uc_addrs, listener);
433 }
void register_for_reader(const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
DDS::OctetSeq TransportBLOB
ACE_Guard< ThreadLockType > GuardThreadType
RtpsUdpInst_rch config() const
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ register_for_writer()

void OpenDDS::DCPS::RtpsUdpTransport::register_for_writer ( const GUID_t & participant,
const GUID_t & readerid,
const GUID_t & writerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 448 of file RtpsUdpTransport.cpp.

References config(), get_connection_addrs(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, make_datalink(), and OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer().

453 {
454  if (is_shut_down()) {
455  return;
456  }
457 
458  RtpsUdpInst_rch cfg = config();
459  if (!cfg) {
460  return;
461  }
462 
463  const TransportBLOB* blob = cfg->get_blob(locators);
464  if (!blob) {
465  return;
466  }
467 
468  GuardThreadType guard_links(links_lock_);
469 
470  if (!link_) {
471  link_ = make_datalink(participant.guidPrefix);
472  }
473 
474  AddrSet uc_addrs;
475  get_connection_addrs(*blob, &uc_addrs);
476  link_->register_for_writer(readerid, writerid, uc_addrs, listener);
477 }
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
DDS::OctetSeq TransportBLOB
ACE_Guard< ThreadLockType > GuardThreadType
RtpsUdpInst_rch config() const
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
void register_for_writer(const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)

◆ relay_srsm()

ICE::ServerReflexiveStateMachine& OpenDDS::DCPS::RtpsUdpTransport::relay_srsm ( )
inline

Definition at line 42 of file RtpsUdpTransport.h.

42 { return relay_srsm_; }
ICE::ServerReflexiveStateMachine relay_srsm_

◆ relay_stun_task()

void OpenDDS::DCPS::RtpsUdpTransport::relay_stun_task ( const MonotonicTimePoint & now )
private

Definition at line 998 of file RtpsUdpTransport.cpp.

References config(), OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, ice_endpoint_, OpenDDS::ICE::Configuration::instance(), links_lock_, local_prefix_, OpenDDS::ICE::ServerReflexiveStateMachine::message(), process_relay_sra(), relay_srsm_, relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, OpenDDS::ICE::ServerReflexiveStateMachine::send(), and OpenDDS::ICE::Configuration::server_reflexive_indication_count().

Referenced by configure_i().

999 {
1000  GuardThreadType guard_links(links_lock_);
1001 
1002  RtpsUdpInst_rch cfg = config();
1003  if (!cfg) {
1004  return;
1005  }
1006 
1007  const ACE_INET_Addr relay_address = cfg->rtps_relay_address().to_addr();
1008 
1009  if ((cfg->use_rtps_relay() || cfg->rtps_relay_only()) &&
1010  relay_address != ACE_INET_Addr() &&
1013  ice_endpoint_->send(relay_address, relay_srsm_.message());
1014  {
1016  relay_stun_task_falloff_.advance(ICE::Configuration::instance()->server_reflexive_address_period());
1017  relay_stun_task_->schedule(relay_stun_task_falloff_.get());
1018  }
1019  }
1020 }
ICE::ServerReflexiveStateMachine relay_srsm_
const STUN::Message & message() const
Definition: RTPS/ICE/Ice.h:237
RcHandle< Sporadic > relay_stun_task_
static Configuration * instance()
Definition: Ice.cpp:109
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
ThreadLockType relay_stun_task_falloff_mutex_
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
Definition: Ice.cpp:128
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
ACE_Guard< ThreadLockType > GuardThreadType
RtpsUdpInst_rch config() const
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RcHandle< IceEndpoint > ice_endpoint_

◆ release_datalink()

void OpenDDS::DCPS::RtpsUdpTransport::release_datalink ( DataLink * link )
private virtual

Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 752 of file RtpsUdpTransport.cpp.

753 {
754  // No-op for rtps_udp: keep the link_ around until the transport is shut down.
755 }

◆ rtps_relay_address_change()

void OpenDDS::DCPS::RtpsUdpTransport::rtps_relay_address_change ( )
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 562 of file RtpsUdpTransport.cpp.

References config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, and OpenDDS::DCPS::TimeDuration::zero_value.

563 {
564 #ifdef OPENDDS_SECURITY
565  relay_stun_task_->cancel();
566  RtpsUdpInst_rch cfg = config();
567  {
569  relay_stun_task_falloff_.set(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
570  }
572 #endif
573 }
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
ThreadLockType relay_stun_task_falloff_mutex_
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ rtps_relay_only_now()

void OpenDDS::DCPS::RtpsUdpTransport::rtps_relay_only_now ( bool  flag)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 78 of file RtpsUdpTransport.cpp.

References config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, disable_relay_stun_task(), relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, and OpenDDS::DCPS::TimeDuration::zero_value.

79 {
80  ACE_UNUSED_ARG(flag);
81 
82 #ifdef OPENDDS_SECURITY
83  RtpsUdpInst_rch cfg = config();
84  if (flag) {
85  {
87  relay_stun_task_falloff_.set(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
88  }
90  } else {
91  if (!cfg || cfg->use_rtps_relay()) {
93  }
94  }
95 #endif
96 }
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
ThreadLockType relay_stun_task_falloff_mutex_
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ shutdown_i()

void OpenDDS::DCPS::RtpsUdpTransport::shutdown_i ( )
private virtual

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 730 of file RtpsUdpTransport.cpp.

References config(), job_queue_, link_, links_lock_, relay_stun_task_, OpenDDS::DCPS::RcHandle< T >::reset(), stop_ice(), and OpenDDS::DCPS::DataLink::transport_shutdown().

731 {
732 #ifdef OPENDDS_SECURITY
733  RtpsUdpInst_rch cfg = config();
734 
735  if (cfg && cfg->use_ice()) {
736  stop_ice();
737  }
738 
739  relay_stun_task_->cancel();
740 #endif
741 
742  job_queue_.reset();
743 
744  GuardThreadType guard_links(links_lock_);
745  if (link_) {
747  }
748  link_.reset();
749 }
RcHandle< Sporadic > relay_stun_task_
ACE_Guard< ThreadLockType > GuardThreadType
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ start_ice()

void OpenDDS::DCPS::RtpsUdpTransport::start_ice ( )
private

Definition at line 958 of file RtpsUdpTransport.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), ACE_IPC_SAP::get_handle(), ice_agent_, ice_endpoint_, OpenDDS::DCPS::ReactorTask::interceptor(), link_, links_lock_, LM_INFO, OpenDDS::DCPS::TransportImpl::reactor_task_, ACE_Event_Handler::READ_MASK, and unicast_socket_.

Referenced by configure_i(), and use_ice_now().

959 {
960  if (DCPS::DCPS_debug_level > 3) {
961  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsUdpTransport::start_ice\n"));
962  }
963 
964  ice_agent_->add_endpoint(static_rchandle_cast<ICE::Endpoint>(ice_endpoint_));
965 
966  GuardThreadType guard_links(links_lock_);
967 
968  if (!link_) {
970  ri->execute_or_enqueue(make_rch<RegisterHandler>(unicast_socket_.get_handle(), ice_endpoint_.get(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
971 #ifdef ACE_HAS_IPV6
972  ri->execute_or_enqueue(make_rch<RegisterHandler>(ipv6_unicast_socket_.get_handle(), ice_endpoint_.get(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
973 #endif
974  }
975 }
#define ACE_DEBUG(X)
unsigned long ACE_Reactor_Mask
RcHandle< ICE::Agent > ice_agent_
ACE_HANDLE get_handle(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
ACE_Guard< ThreadLockType > GuardThreadType
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
CommandPtr execute_or_enqueue(CommandPtr command)
RcHandle< IceEndpoint > ice_endpoint_

◆ stop_accepting_or_connecting()

void OpenDDS::DCPS::RtpsUdpTransport::stop_accepting_or_connecting ( const TransportClient_wrch & client,
const GUID_t & remote_id,
bool  disassociate,
bool  association_failed 
)
private virtual

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 275 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::disassociated(), link_, links_lock_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), OpenDDS::DCPS::TransportImpl::pending_connections_, and OpenDDS::DCPS::TransportImpl::pending_connections_lock_.

279 {
280  if (disassociate || association_failed) {
281  GuardThreadType guard_links(links_lock_);
282  if (link_) {
283  TransportClient_rch c = client.lock();
284  if (c) {
285  link_->disassociated(c->get_guid(), remote_id);
286  }
287  }
288  }
289 
290  {
292  typedef PendConnMap::iterator iter_t;
293  const std::pair<iter_t, iter_t> range =
294  pending_connections_.equal_range(client);
295  for (iter_t iter = range.first; iter != range.second; ++iter) {
296  iter->second->remove_on_start_callback(client, remote_id);
297  }
298  pending_connections_.erase(range.first, range.second);
299  }
300 }
RcHandle< TransportClient > TransportClient_rch
void disassociated(const GUID_t &local, const GUID_t &remote)
ACE_Guard< LockType > GuardType
ACE_Guard< ThreadLockType > GuardThreadType
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.

◆ stop_ice()

void OpenDDS::DCPS::RtpsUdpTransport::stop_ice ( )
private

Definition at line 978 of file RtpsUdpTransport.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), ACE_IPC_SAP::get_handle(), ice_agent_, ice_endpoint_, OpenDDS::DCPS::ReactorTask::interceptor(), link_, links_lock_, LM_INFO, OpenDDS::DCPS::TransportImpl::reactor_task_, ACE_Event_Handler::READ_MASK, and unicast_socket_.

Referenced by shutdown_i(), and use_ice_now().

979 {
980  if (DCPS::DCPS_debug_level > 3) {
981  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsUdpTransport::stop_ice\n"));
982  }
983 
984  GuardThreadType guard_links(links_lock_);
985 
986  if (!link_) {
988  ri->execute_or_enqueue(make_rch<RemoveHandler>(unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
989 #ifdef ACE_HAS_IPV6
990  ri->execute_or_enqueue(make_rch<RemoveHandler>(ipv6_unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
991 #endif
992  }
993 
994  ice_agent_->remove_endpoint(static_rchandle_cast<ICE::Endpoint>(ice_endpoint_));
995 }
#define ACE_DEBUG(X)
unsigned long ACE_Reactor_Mask
RcHandle< ICE::Agent > ice_agent_
ACE_HANDLE get_handle(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
ACE_Guard< ThreadLockType > GuardThreadType
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
CommandPtr execute_or_enqueue(CommandPtr command)
RcHandle< IceEndpoint > ice_endpoint_

◆ transport_type()

virtual OPENDDS_STRING OpenDDS::DCPS::RtpsUdpTransport::transport_type ( ) const
inline private virtual

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 106 of file RtpsUdpTransport.h.

106 { return "rtps_udp"; }

◆ unregister_for_reader()

void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_reader ( const GUID_t & participant,
const GUID_t & writerid,
const GUID_t & readerid 
)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 436 of file RtpsUdpTransport.cpp.

References link_, links_lock_, and OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader().

439 {
440  GuardThreadType guard_links(links_lock_);
441 
442  if (link_) {
443  link_->unregister_for_reader(writerid, readerid);
444  }
445 }
void unregister_for_reader(const GUID_t &writerid, const GUID_t &readerid)
ACE_Guard< ThreadLockType > GuardThreadType

◆ unregister_for_writer()

void OpenDDS::DCPS::RtpsUdpTransport::unregister_for_writer ( const GUID_t & ,
const GUID_t & readerid,
const GUID_t & writerid 
)
private virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 480 of file RtpsUdpTransport.cpp.

References link_, links_lock_, and OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer().

483 {
484  GuardThreadType guard_links(links_lock_);
485 
486  if (link_) {
487  link_->unregister_for_writer(readerid, writerid);
488  }
489 }
void unregister_for_writer(const GUID_t &readerid, const GUID_t &writerid)
ACE_Guard< ThreadLockType > GuardThreadType

◆ update_locators()

void OpenDDS::DCPS::RtpsUdpTransport::update_locators ( const GUID_t & remote,
const TransportLocatorSeq & locators 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 492 of file RtpsUdpTransport.cpp.

References config(), get_connection_addrs(), OpenDDS::DCPS::TransportImpl::is_shut_down(), link_, links_lock_, and OpenDDS::DCPS::RtpsUdpDataLink::update_locators().

494 {
495  if (is_shut_down()) {
496  return;
497  }
498 
499  RtpsUdpInst_rch cfg = config();
500  if (!cfg) {
501  return;
502  }
503 
504  const TransportBLOB* blob = cfg->get_blob(locators);
505  if (!blob) {
506  return;
507  }
508 
509  GuardThreadType guard_links(links_lock_);
510 
511  if (link_) {
512  AddrSet uc_addrs, mc_addrs;
513  bool requires_inline_qos;
514  unsigned int blob_bytes_read;
515  get_connection_addrs(*blob, &uc_addrs, &mc_addrs, &requires_inline_qos, &blob_bytes_read);
516  link_->update_locators(remote, uc_addrs, mc_addrs, requires_inline_qos, false);
517  }
518 }
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
DDS::OctetSeq TransportBLOB
ACE_Guard< ThreadLockType > GuardThreadType
RtpsUdpInst_rch config() const
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ use_datalink()

bool OpenDDS::DCPS::RtpsUdpTransport::use_datalink ( const GUID_t & local_id,
const GUID_t & remote_id,
const TransportBLOB & remote_data,
const TransportBLOB & discovery_locator,
const MonotonicTime_t & participant_discovered_at,
ACE_CDR::ULong  participant_flags,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable,
SequenceNumber  max_sn,
const TransportClient_rch & client 
)
private

Definition at line 303 of file RtpsUdpTransport.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::associated(), get_connection_addrs(), OpenDDS::DCPS::is_more_local(), and link_.

Referenced by accept_datalink(), and connect_datalink().

313 {
314  AddrSet uc_addrs, mc_addrs;
315  bool requires_inline_qos;
316  unsigned int blob_bytes_read;
317  get_connection_addrs(remote_data, &uc_addrs, &mc_addrs, &requires_inline_qos, &blob_bytes_read);
318 
319  NetworkAddress disco_addr_hint;
320  if (discovery_locator.length()) {
321  AddrSet disco_uc_addrs, disco_mc_addrs;
322  bool disco_requires_inline_qos;
323  unsigned int disco_blob_bytes_read;
324  get_connection_addrs(discovery_locator, &disco_uc_addrs, &disco_mc_addrs, &disco_requires_inline_qos, &disco_blob_bytes_read);
325 
326  for (AddrSet::const_iterator it = disco_uc_addrs.begin(), limit = disco_uc_addrs.end(); it != limit; ++it) {
327  for (AddrSet::const_iterator it2 = uc_addrs.begin(), limit2 = uc_addrs.end(); it2 != limit2; ++it2) {
328  if (it->addr_bytes_equal(*it2) && DCPS::is_more_local(disco_addr_hint, *it2)) {
329  disco_addr_hint = *it2;
330  }
331  }
332  }
333  }
334 
335  if (link_) {
336  return link_->associated(local_id, remote_id, local_reliable, remote_reliable,
337  local_durable, remote_durable,
338  participant_discovered_at, participant_flags, max_sn, client,
339  uc_addrs, mc_addrs, disco_addr_hint, requires_inline_qos);
340  }
341 
342  return true;
343 }
bool is_more_local(const NetworkAddress &current, const NetworkAddress &incoming)
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const

◆ use_ice_now()

void OpenDDS::DCPS::RtpsUdpTransport::use_ice_now ( bool  flag)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 120 of file RtpsUdpTransport.cpp.

References config(), start_ice(), and stop_ice().

121 {
122  ACE_UNUSED_ARG(after);
123 
124 #ifdef OPENDDS_SECURITY
125  RtpsUdpInst_rch cfg = config();
126  const bool before = cfg && cfg->use_ice();
127  if (cfg) {
128  cfg->use_ice(after);
129  }
130 
131  if (before && !after) {
132  stop_ice();
133  } else if (!before && after) {
134  start_ice();
135  }
136 #endif
137 }
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ use_rtps_relay_now()

void OpenDDS::DCPS::RtpsUdpTransport::use_rtps_relay_now ( bool  flag)
virtual

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 99 of file RtpsUdpTransport.cpp.

References config(), OpenDDS::DCPS::RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC, disable_relay_stun_task(), relay_stun_task_, relay_stun_task_falloff_, relay_stun_task_falloff_mutex_, and OpenDDS::DCPS::TimeDuration::zero_value.

100 {
101  ACE_UNUSED_ARG(flag);
102 
103 #ifdef OPENDDS_SECURITY
104  RtpsUdpInst_rch cfg = config();
105  if (flag) {
106  {
108  relay_stun_task_falloff_.set(cfg ? cfg->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC));
109  }
111  } else {
112  if (!cfg || !cfg->rtps_relay_only()) {
114  }
115  }
116 #endif
117 }
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RcHandle< Sporadic > relay_stun_task_
static const TimeDuration zero_value
Definition: TimeDuration.h:31
ThreadLockType relay_stun_task_falloff_mutex_
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
RtpsUdpInst_rch config() const
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

Friends And Related Function Documentation

◆ RtpsUdpReceiveStrategy

friend class RtpsUdpReceiveStrategy
friend

Definition at line 197 of file RtpsUdpTransport.h.

◆ RtpsUdpSendStrategy

friend class RtpsUdpSendStrategy
friend

Definition at line 196 of file RtpsUdpTransport.h.

Member Data Documentation

◆ bit_sub_

RcHandle<BitSubscriber> OpenDDS::DCPS::RtpsUdpTransport::bit_sub_
private

◆ connections_lock_

LockType OpenDDS::DCPS::RtpsUdpTransport::connections_lock_
private

Definition at line 133 of file RtpsUdpTransport.h.

Referenced by accept_datalink(), and connect_datalink().

◆ default_listener_

TransportClient_wrch OpenDDS::DCPS::RtpsUdpTransport::default_listener_
private

Definition at line 149 of file RtpsUdpTransport.h.

◆ deferred_connection_records_

ConnectionRecords OpenDDS::DCPS::RtpsUdpTransport::deferred_connection_records_
private

Definition at line 158 of file RtpsUdpTransport.h.

Referenced by disable_relay_stun_task(), and process_relay_sra().

◆ ice_agent_

RcHandle<ICE::Agent> OpenDDS::DCPS::RtpsUdpTransport::ice_agent_
private

Definition at line 190 of file RtpsUdpTransport.h.

Referenced by get_ice_agent(), start_ice(), and stop_ice().

◆ ice_endpoint_

RcHandle<IceEndpoint> OpenDDS::DCPS::RtpsUdpTransport::ice_endpoint_
private

Definition at line 178 of file RtpsUdpTransport.h.

Referenced by get_ice_endpoint(), relay_stun_task(), start_ice(), and stop_ice().

◆ job_queue_

JobQueue_rch OpenDDS::DCPS::RtpsUdpTransport::job_queue_
private

◆ link_

RtpsUdpDataLink_rch OpenDDS::DCPS::RtpsUdpTransport::link_
private

RTPS uses only one link per transport. This link can be safely reused by any client that belongs to the same domain participant (same GUID prefix). It cannot be used by a second participant, because the network location returned by connection_info_i() can't be shared among participants.

Definition at line 143 of file RtpsUdpTransport.h.

Referenced by accept_datalink(), client_stop(), configure_i(), connect_datalink(), get_last_recv_locator(), local_crypto_handle(), register_for_reader(), register_for_writer(), shutdown_i(), start_ice(), stop_accepting_or_connecting(), stop_ice(), unregister_for_reader(), unregister_for_writer(), update_locators(), and use_datalink().

◆ links_lock_

ThreadLockType OpenDDS::DCPS::RtpsUdpTransport::links_lock_
private

◆ local_crypto_handle_

DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::RtpsUdpTransport::local_crypto_handle_
private

Definition at line 155 of file RtpsUdpTransport.h.

Referenced by local_crypto_handle(), and make_datalink().

◆ local_prefix_

GuidPrefix_t OpenDDS::DCPS::RtpsUdpTransport::local_prefix_
private

Definition at line 136 of file RtpsUdpTransport.h.

Referenced by make_datalink(), relay_stun_task(), and RtpsUdpTransport().

◆ relay_srsm_

ICE::ServerReflexiveStateMachine OpenDDS::DCPS::RtpsUdpTransport::relay_srsm_
private

Definition at line 185 of file RtpsUdpTransport.h.

Referenced by disable_relay_stun_task(), process_relay_sra(), and relay_stun_task().

◆ relay_stun_task_

RcHandle<Sporadic> OpenDDS::DCPS::RtpsUdpTransport::relay_stun_task_
private

◆ relay_stun_task_falloff_

FibonacciSequence<TimeDuration> OpenDDS::DCPS::RtpsUdpTransport::relay_stun_task_falloff_
private

◆ relay_stun_task_falloff_mutex_

ThreadLockType OpenDDS::DCPS::RtpsUdpTransport::relay_stun_task_falloff_mutex_
private

◆ transport_statistics_

InternalTransportStatistics OpenDDS::DCPS::RtpsUdpTransport::transport_statistics_
private

Definition at line 193 of file RtpsUdpTransport.h.

Referenced by append_transport_statistics(), and make_datalink().

◆ transport_statistics_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpTransport::transport_statistics_mutex_
private

Definition at line 194 of file RtpsUdpTransport.h.

Referenced by append_transport_statistics(), and make_datalink().

◆ unicast_socket_

ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpTransport::unicast_socket_
private

Definition at line 145 of file RtpsUdpTransport.h.

Referenced by configure_i(), make_datalink(), start_ice(), and stop_ice().


The documentation for this class was generated from the following files: