OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Public Attributes | Static Public Attributes | List of all members
OpenDDS::RTPS::Spdp::SpdpTransport Struct Reference
Inheritance diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Collaboration graph
[legend]

Classes

class  RegisterHandlers
 

Public Types

typedef size_t WriteFlags
 
typedef DCPS::PmfPeriodicTask< SpdpTransportSpdpPeriodic
 
typedef DCPS::PmfSporadicTask< SpdpTransportSpdpSporadic
 
typedef DCPS::PmfMultiTask< SpdpTransportSpdpMulti
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< DCPS::NetworkInterfaceAddress > > InternalDataReader_rch
 

Public Member Functions

 SpdpTransport (DCPS::RcHandle< Spdp > outer)
 
 ~SpdpTransport ()
 
const ACE_SOCK_Dgramchoose_recv_socket (ACE_HANDLE h) const
 
virtual int handle_input (ACE_HANDLE h)
 
void open (const DCPS::ReactorTask_rch &reactor_task, const DCPS::JobQueue_rch &job_queue)
 
void register_unicast_socket (ACE_Reactor *reactor, ACE_SOCK_Dgram &socket, const char *what)
 
void register_handlers (const DCPS::ReactorTask_rch &reactor_task)
 
void enable_periodic_tasks ()
 
void shorten_local_sender_delay_i ()
 
void write (WriteFlags flags)
 
void write_i (WriteFlags flags)
 
void write_i (const DCPS::GUID_t &guid, const ACE_INET_Addr &local_address, WriteFlags flags)
 
void send (WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
 
const ACE_SOCK_Dgramchoose_send_socket (const ACE_INET_Addr &addr) const
 
ssize_t send (const ACE_INET_Addr &addr, bool relay)
 
void close (const DCPS::ReactorTask_rch &reactor_task)
 
void dispose_unregister ()
 
bool open_unicast_socket (u_short port_common, u_short participant_id)
 
void on_data_available (DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
 
DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
ICE::AddressListType host_addresses () const
 
void send (const ACE_INET_Addr &address, const STUN::Message &message)
 
ACE_INET_Addr stun_server_address () const
 
void ice_connect (const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
 
void ice_disconnect (const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
 
 OPENDDS_SET (ACE_INET_Addr) send_addrs_
 
void send_local (const DCPS::MonotonicTimePoint &now)
 
void send_directed (const DCPS::MonotonicTimePoint &now)
 
 OPENDDS_LIST (DCPS::GUID_t) directed_guids_
 
void process_lease_expirations (const DCPS::MonotonicTimePoint &now)
 
void thread_status_task (const DCPS::MonotonicTimePoint &now)
 
void process_handshake_deadlines (const DCPS::MonotonicTimePoint &now)
 
void process_handshake_resends (const DCPS::MonotonicTimePoint &now)
 
void send_relay (const DCPS::MonotonicTimePoint &now)
 
void relay_stun_task (const DCPS::MonotonicTimePoint &now)
 
void process_relay_sra (ICE::ServerReflexiveStateMachine::StateChange)
 
void disable_relay_stun_task ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 
- Public Member Functions inherited from OpenDDS::ICE::Endpoint
virtual ~Endpoint ()
 

Public Attributes

DCPS::WeakRcHandle< Spdpouter_
 
Header hdr_
 
DataSubmessage data_
 
DCPS::SequenceNumber seq_
 
u_short uni_port_
 
ACE_SOCK_Dgram unicast_socket_
 
OPENDDS_STRING multicast_interface_
 
ACE_INET_Addr multicast_address_
 
ACE_SOCK_Dgram_Mcast multicast_socket_
 
DCPS::MulticastManager multicast_manager_
 
ACE_Message_Block buff_
 
ACE_Message_Block wbuff_
 
DCPS::RcHandle< SpdpMultilocal_send_task_
 
DCPS::RcHandle< SpdpSporadicdirected_send_task_
 
DCPS::RcHandle< SpdpSporadiclease_expiration_task_
 
DCPS::RcHandle< SpdpPeriodicthread_status_task_
 
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
 
DCPS::RcHandle< SpdpSporadichandshake_deadline_task_
 
DCPS::RcHandle< SpdpSporadichandshake_resend_task_
 
DCPS::RcHandle< SpdpSporadicrelay_spdp_task_
 
DCPS::FibonacciSequence< TimeDurationrelay_spdp_task_falloff_
 
DCPS::RcHandle< SpdpSporadicrelay_stun_task_
 
DCPS::FibonacciSequence< TimeDurationrelay_stun_task_falloff_
 
ICE::ServerReflexiveStateMachine relay_srsm_
 
bool network_is_unreachable_
 
bool ice_endpoint_added_
 
DCPS::InternalTransportStatistics transport_statistics_
 
DCPS::MonotonicTimePoint last_harvest
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 

Static Public Attributes

static const WriteFlags SEND_MULTICAST = (1 << 0)
 
static const WriteFlags SEND_RELAY = (1 << 1)
 
static const WriteFlags SEND_DIRECT = (1 << 2)
 

Additional Inherited Members

- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 425 of file Spdp.h.

Member Typedef Documentation

◆ SpdpMulti

Definition at line 523 of file Spdp.h.

◆ SpdpPeriodic

Definition at line 521 of file Spdp.h.

◆ SpdpSporadic

Definition at line 522 of file Spdp.h.

◆ WriteFlags

Definition at line 432 of file Spdp.h.

Constructor & Destructor Documentation

◆ SpdpTransport()

OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport ( DCPS::RcHandle< Spdp outer)
explicit

Definition at line 2305 of file Spdp.cpp.

References OpenDDS::DCPS::assign(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, ACE_OS::getpid(), OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, OpenDDS::RTPS::SequenceNumber_t::low, multicast_address_, multicast_interface_, multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, uni_port_, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.

2306  : outer_(outer)
2307  , buff_(64 * 1024)
2308  , wbuff_(64 * 1024)
2309 #ifdef OPENDDS_SECURITY
2310  , relay_spdp_task_falloff_(outer->config()->sedp_heartbeat_period())
2311  , relay_stun_task_falloff_(outer->config()->sedp_heartbeat_period())
2312 #endif
2313  , network_is_unreachable_(false)
2314  , ice_endpoint_added_(false)
2316  OPENDDS_STRING("_SPDPTransportInst_") +
2317  DCPS::GuidConverter(outer->guid_).uniqueParticipantId() +
2318  DCPS::to_dds_string(outer->domain_))
2319 {
2320  hdr_.prefix[0] = 'R';
2321  hdr_.prefix[1] = 'T';
2322  hdr_.prefix[2] = 'P';
2323  hdr_.prefix[3] = 'S';
2326  DCPS::assign(hdr_.guidPrefix, outer->guid_.guidPrefix);
2329  data_.smHeader.submessageLength = 0; // last submessage in the Message
2330  data_.extraFlags = 0;
2334  data_.writerSN.high = 0;
2335  data_.writerSN.low = 0;
2336 
2337 #ifdef ACE_HAS_MAC_OSX
2340 #ifdef ACE_HAS_IPV6
2341  multicast_ipv6_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
2343 #endif
2344 #endif
2345 
2346  multicast_interface_ = outer->disco_->multicast_interface();
2347 
2348  const u_short port_common = outer->config_->port_common(outer->domain_);
2349  multicast_address_ = outer->config_->multicast_address(port_common);
2350 
2351 #ifdef ACE_HAS_IPV6
2352  multicast_ipv6_address_ = outer->config_->ipv6_multicast_address(port_common);
2353 #endif
2354 
2355  send_addrs_.insert(multicast_address_);
2356 #ifdef ACE_HAS_IPV6
2357  send_addrs_.insert(multicast_ipv6_address_);
2358 #endif
2359 
2360  typedef RtpsDiscovery::AddrVec::const_iterator iter;
2361  const RtpsDiscovery::AddrVec addrs = outer->config_->spdp_send_addrs();
2362  for (iter it = addrs.begin(),
2363  end = addrs.end(); it != end; ++it) {
2364  send_addrs_.insert(ACE_INET_Addr(it->c_str()));
2365  }
2366 
2367  u_short participantId = 0;
2368 
2369 #ifdef OPENDDS_SAFETY_PROFILE
2370  const u_short startingParticipantId = participantId;
2371 #endif
2372 
2373  while (!open_unicast_socket(port_common, participantId)) {
2374  ++participantId;
2375  }
2376 #ifdef ACE_HAS_IPV6
2377  u_short port = uni_port_;
2378 
2379  while (!open_unicast_ipv6_socket(port)) {
2380  ++port;
2381  }
2382 #endif
2383 
2384 #ifdef OPENDDS_SAFETY_PROFILE
2385  if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
2386  // Since pids are not available, use the fact that we had to increment
2387  // participantId to modify the GUID's pid bytes. This avoids GUID conflicts
2388  // between processes on the same host which start at the same time
2389  // (resulting in the same seed value for the random number generator).
2390  hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
2391  hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
2392  outer->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
2393  outer->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
2394  }
2395 #endif
2396 }
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
const ProtocolVersion_t PROTOCOLVERSION
Definition: MessageTypes.h:67
ProtocolVersion_t version
Definition: RtpsCore.idl:652
SequenceNumber_t writerSN
Definition: RtpsCore.idl:671
const octet FLAG_E
Definition: RtpsCore.idl:518
ACE_INET_Addr multicast_address_
Definition: Spdp.h:509
SubmessageHeader smHeader
Definition: RtpsCore.idl:663
void opts(int opts)
OctetArray4 prefix
Definition: RtpsCore.idl:651
static const char DEFAULT_INST_PREFIX[]
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
bool open_unicast_socket(u_short port_common, u_short participant_id)
Definition: Spdp.cpp:3388
ACE_Message_Block wbuff_
Definition: Spdp.h:520
#define OPENDDS_STRING
unsigned short octetsToInlineQos
Definition: RtpsCore.idl:668
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
DCPS::EntityId_t readerId
Definition: RtpsCore.idl:669
OPENDDS_STRING multicast_interface_
Definition: Spdp.h:508
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:654
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:670
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
ACE_Message_Block buff_
Definition: Spdp.h:520
RtpsDiscoveryConfig::AddrVec AddrVec
Definition: RtpsDiscovery.h:57
ACE_CDR::Octet Octet
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
String to_dds_string(unsigned short to_convert)
const VendorId_t VENDORID_OPENDDS
Definition: MessageTypes.h:26
const octet FLAG_D
Definition: RtpsCore.idl:523
pid_t getpid(void)
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

◆ ~SpdpTransport()

OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport ( )

Definition at line 2465 of file Spdp.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), ACE_SOCK::close(), OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), LM_INFO, multicast_socket_, outer_, and unicast_socket_.

2466 {
2467  if (DCPS::DCPS_debug_level > 3) {
2468  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
2469  }
2470 
2471  DCPS::RcHandle<Spdp> outer = outer_.lock();
2472 
2473  if (outer) {
2474  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2475  try {
2477  } catch (const CORBA::BAD_PARAM&) {}
2478  outer->eh_shutdown_ = true;
2479  outer->shutdown_cond_.notify_all();
2480  }
2481 
2484 #ifdef ACE_HAS_IPV6
2485  unicast_ipv6_socket_.close();
2486  multicast_ipv6_socket_.close();
2487 #endif
2488 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_thread_mutex_t lock_
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
int close(void)
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

Member Function Documentation

◆ choose_recv_socket()

const ACE_SOCK_Dgram & OpenDDS::RTPS::Spdp::SpdpTransport::choose_recv_socket ( ACE_HANDLE  h) const

Definition at line 2954 of file Spdp.cpp.

References ACE_IPC_SAP::get_handle(), multicast_socket_, and unicast_socket_.

Referenced by handle_input().

2955 {
2956 #ifdef ACE_HAS_IPV6
2957  if (h == unicast_ipv6_socket_.get_handle()) {
2958  return unicast_ipv6_socket_;
2959  }
2960  if (h == multicast_ipv6_socket_.get_handle()) {
2961  return multicast_ipv6_socket_;
2962  }
2963 #endif
2964  if (h == multicast_socket_.get_handle()) {
2965  return multicast_socket_;
2966  }
2967 
2968  return unicast_socket_;
2969 }
ACE_HANDLE get_handle(void) const
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

◆ choose_send_socket()

const ACE_SOCK_Dgram & OpenDDS::RTPS::Spdp::SpdpTransport::choose_send_socket ( const ACE_INET_Addr addr) const

Definition at line 2897 of file Spdp.cpp.

References ACE_Addr::get_type(), and unicast_socket_.

Referenced by send().

2898 {
2899 #ifdef ACE_HAS_IPV6
2900  if (addr.get_type() == AF_INET6) {
2901  return unicast_ipv6_socket_;
2902  }
2903 #endif
2904  ACE_UNUSED_ARG(addr);
2905  return unicast_socket_;
2906 }
int get_type(void) const
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507

◆ close()

void OpenDDS::RTPS::Spdp::SpdpTransport::close ( const DCPS::ReactorTask_rch reactor_task)

Definition at line 2591 of file Spdp.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, directed_send_task_, ACE_Event_Handler::DONT_CALL, ACE_IPC_SAP::get_handle(), get_ice_endpoint(), OpenDDS::DCPS::ReactorTask::get_reactor(), handshake_deadline_task_, handshake_resend_task_, ice_endpoint_added_, lease_expiration_task_, LM_INFO, local_send_task_, multicast_socket_, network_interface_address_reader_, outer_, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, relay_spdp_task_, relay_stun_task_, ACE_Reactor::remove_handler(), TheServiceParticipant, thread_status_task_, and unicast_socket_.

2592 {
2593  if (DCPS::DCPS_debug_level > 3) {
2594  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
2595  }
2596 
2597  DCPS::RcHandle<Spdp> outer = outer_.lock();
2598  if (!outer) return;
2599 
2600  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
2601 
2602 #ifdef OPENDDS_SECURITY
2603  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
2604  if (endpoint) {
2605  outer->ice_agent_->remove_endpoint(endpoint);
2606  ice_endpoint_added_ = false;
2607  }
2608 
2610  handshake_deadline_task_->cancel();
2611  }
2612  if (handshake_resend_task_) {
2613  handshake_resend_task_->cancel();
2614  }
2615  if (relay_spdp_task_) {
2616  relay_spdp_task_->cancel();
2617  }
2618  if (relay_stun_task_) {
2619  relay_stun_task_->cancel();
2620  }
2621 #endif
2622  if (local_send_task_) {
2623  local_send_task_->disable();
2624  }
2625  if (directed_send_task_) {
2626  directed_send_task_->cancel();
2627  }
2628  if (lease_expiration_task_) {
2629  lease_expiration_task_->cancel();
2630  }
2631  if (thread_status_task_) {
2632  thread_status_task_->disable();
2633  }
2634 
2635  ACE_Reactor* reactor = reactor_task->get_reactor();
2636  const ACE_Reactor_Mask mask =
2638  reactor->remove_handler(multicast_socket_.get_handle(), mask);
2639  reactor->remove_handler(unicast_socket_.get_handle(), mask);
2640 #ifdef ACE_HAS_IPV6
2641  reactor->remove_handler(multicast_ipv6_socket_.get_handle(), mask);
2642  reactor->remove_handler(unicast_ipv6_socket_.get_handle(), mask);
2643 #endif
2644 }
#define ACE_DEBUG(X)
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
Definition: Spdp.h:530
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
unsigned long ACE_Reactor_Mask
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
Definition: Spdp.h:538
ACE_HANDLE get_handle(void) const
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
#define TheServiceParticipant
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
Definition: Spdp.h:536
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3223
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

◆ disable_relay_stun_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::disable_relay_stun_task ( )

Definition at line 4189 of file Spdp.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, OpenDDS::DCPS::ConnectionRecord::guid, OpenDDS::DCPS::ConnectionRecord::latency, outer_, OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), and OpenDDS::DCPS::TimeDuration::zero_value.

4190 {
4191 #ifndef DDS_HAS_MINIMUM_BIT
4192  DCPS::RcHandle<Spdp> outer = outer_.lock();
4193  if (!outer) return;
4194 
4195  relay_stun_task_->cancel();
4196 
4197  DCPS::ConnectionRecord connection_record;
4198  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4199  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4200  connection_record.latency = DCPS::TimeDuration::zero_value.to_dds_duration();
4201 
4203  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4204  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4205  }
4206 
4207  relay_srsm_ = ICE::ServerReflexiveStateMachine();
4208 #endif
4209 }
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DDS::Duration_t to_dds_duration() const
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
const string RTPS_RELAY_STUN_PROTOCOL

◆ dispose_unregister()

void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister ( )

Definition at line 2558 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Serializer::encoding(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, hdr_, OpenDDS::RTPS::DataSubmessage::inlineQos, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::reset(), send(), SEND_MULTICAST, SEND_RELAY, seq_, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by ~SpdpTransport().

2559 {
2560  DCPS::RcHandle<Spdp> outer = outer_.lock();
2561  if (!outer) return;
2562 
2563  // Send the dispose/unregister SPDP sample
2566  data_.inlineQos.length(1);
2567  static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
2568  data_.inlineQos[0].status_info(dispose_unregister);
2569 
2570  ParameterList plist(1);
2571  plist.length(1);
2572  plist[0].guid(outer->guid_);
2573  plist[0]._d(PID_PARTICIPANT_GUID);
2574 
2575  wbuff_.reset();
2576  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2577  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2578  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2579  if (DCPS::DCPS_debug_level > 0) {
2580  ACE_ERROR((LM_ERROR,
2581  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
2582  ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
2583  }
2584  return;
2585  }
2586 
2588 }
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define ACE_ERROR(X)
const ParameterId_t PID_PARTICIPANT_GUID
Definition: RtpsCore.idl:287
const octet FLAG_Q
Definition: RtpsCore.idl:519
SequenceNumber_t writerSN
Definition: RtpsCore.idl:671
const octet FLAG_E
Definition: RtpsCore.idl:518
void reset(void)
SubmessageHeader smHeader
Definition: RtpsCore.idl:663
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
ACE_Message_Block wbuff_
Definition: Spdp.h:520
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2871
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ enable_periodic_tasks()

void OpenDDS::RTPS::Spdp::SpdpTransport::enable_periodic_tasks ( )

Definition at line 2532 of file Spdp.cpp.

References local_send_task_, outer_, relay_spdp_task_, relay_spdp_task_falloff_, relay_stun_task_, relay_stun_task_falloff_, TheServiceParticipant, OpenDDS::DCPS::ThreadStatusManager::thread_status_interval(), thread_status_task_, OpenDDS::DCPS::ThreadStatusManager::update_thread_status(), and OpenDDS::DCPS::TimeDuration::zero_value.

2533 {
2534  if (local_send_task_) {
2536  }
2537 
2538 #ifdef OPENDDS_SECURITY
2539  DCPS::RcHandle<Spdp> outer = outer_.lock();
2540  if (!outer) return;
2541 
2542  relay_spdp_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2544 
2545  relay_stun_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2547 #endif
2548 
2549 #ifndef DDS_HAS_MINIMUM_BIT
2550  const DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
2551  if (thread_status_manager.update_thread_status()) {
2552  thread_status_task_->enable(false, thread_status_manager.thread_status_interval());
2553  }
2554 #endif /* DDS_HAS_MINIMUM_BIT */
2555 }
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
#define TheServiceParticipant

◆ get_ice_endpoint()

DCPS::WeakRcHandle< ICE::Endpoint > OpenDDS::RTPS::Spdp::SpdpTransport::get_ice_endpoint ( )

Definition at line 3223 of file Spdp.cpp.

References outer_, OpenDDS::DCPS::rchandle_from(), and OpenDDS::DCPS::static_rchandle_cast().

Referenced by close(), handle_input(), open(), and write_i().

3224 {
3225 #ifdef OPENDDS_SECURITY
3226  DCPS::RcHandle<Spdp> outer = outer_.lock();
3227  return outer && outer->config_->use_ice() ? DCPS::static_rchandle_cast<ICE::Endpoint>(rchandle_from(this)) : DCPS::WeakRcHandle<ICE::Endpoint>();
3228 #else
3229  return DCPS::WeakRcHandle<ICE::Endpoint>();
3230 #endif
3231 }
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ handle_input()

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input ( ACE_HANDLE  h)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 2977 of file Spdp.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_NOTSUP_RETURN, ACE_TEXT(), OpenDDS::RTPS::append_submessage(), OpenDDS::STUN::Message::block, buff_, choose_recv_socket(), OpenDDS::RTPS::DATA, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::STUN::encoding(), OpenDDS::DCPS::Serializer::encoding(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::SubmessageHeader::flags, ACE_IPC_SAP::get_handle(), get_ice_endpoint(), OpenDDS::RTPS::Spdp::guid(), OpenDDS::RTPS::Header::guidPrefix, OpenDDS::RTPS::Message::hdr, header, OpenDDS::RTPS::INFO_DST, OpenDDS::ICE::ServerReflexiveStateMachine::is_response(), OpenDDS::DCPS::Encoding::kind(), OpenDDS::DCPS::Encoding::KIND_XCDR1, ACE_Message_Block::length(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::MCK_RTPS, OpenDDS::DCPS::MCK_STUN, ACE_OS::memcmp(), OpenDDS::DCPS::InternalTransportStatistics::message_count, OpenDDS::DCPS::MUTABLE, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, process_relay_sra(), ACE_Message_Block::rd_ptr(), read(), OpenDDS::ICE::ServerReflexiveStateMachine::receive(), relay_srsm_, ACE_Message_Block::reset(), ACE_Message_Block::size(), OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, socket(), ACE_Message_Block::space(), OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), TheServiceParticipant, OpenDDS::DCPS::EncapsulationHeader::to_encoding(), OpenDDS::DCPS::transport_debug, transport_statistics_, unicast_socket_, OpenDDS::RTPS::valid_size(), ACE_Message_Block::wr_ptr(), and OpenDDS::RTPS::DataSubmessage::writerId.

2978 {
2979  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2980 
2982  ACE_INET_Addr remote;
2983  buff_.reset();
2984 
2985 #ifdef ACE_LACKS_SENDMSG
2986  const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
2987 #else
2988  ACE_INET_Addr local;
2989 
2990  iovec iov[1];
2991  iov[0].iov_base = buff_.wr_ptr();
2992 #ifdef _MSC_VER
2993 #pragma warning(push)
2994  // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
2995  // since on other platforms iov_len is 64-bit
2996 #pragma warning(disable : 4267)
2997 #endif
2998  iov[0].iov_len = buff_.space();
2999 #ifdef _MSC_VER
3000 #pragma warning(pop)
3001 #endif
3002  const ssize_t bytes = socket.recv(iov, 1, remote, 0
3003 #if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
3004  , &local
3005 #endif
3006  );
3007 #endif
3008 
3009  if (!valid_size(remote)) {
3010  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - invalid address size\n")));
3011  return 0;
3012  }
3013 
3014  if (bytes > 0) {
3015  buff_.wr_ptr(bytes);
3016  } else if (bytes == 0) {
3017  return 0;
3018  } else {
3019  ACE_DEBUG((
3020  LM_WARNING,
3021  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::handle_input() - ")
3022  ACE_TEXT("error reading from %C socket %p\n")
3023  , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
3024  ACE_TEXT("ACE_SOCK_Dgram::recv")));
3025  return 0;
3026  }
3027 
3028  DCPS::RcHandle<Spdp> outer = outer_.lock();
3029 
3030  if (!outer) {
3031  return 0;
3032  }
3033 
3034  const bool relay_in_use = (outer->config_->rtps_relay_only() || outer->config_->use_rtps_relay());
3035  const bool remote_matches_relay_addr = (remote == outer->config_->spdp_rtps_relay_address());
3036  const bool from_relay = relay_in_use && remote_matches_relay_addr;
3037 
3038  // Ignore messages from the relay when not using it.
3039  if (!relay_in_use && remote_matches_relay_addr) {
3040  return 0;
3041  }
3042 
3043  if ((buff_.size() >= 4) && ACE_OS::memcmp(buff_.rd_ptr(), "RTPS", 4) == 0) {
3044  RTPS::Message message;
3045 
3046  DCPS::Serializer ser(&buff_, encoding_plain_native);
3047  Header header;
3048  if (!(ser >> header)) {
3049  if (DCPS::DCPS_debug_level > 0) {
3050  ACE_ERROR((LM_ERROR,
3051  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3052  ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
3053  }
3054  return 0;
3055  }
3056 
3057  if (outer->sedp_->transport_inst()->count_messages()) {
3058  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(remote), DCPS::MCK_RTPS, from_relay);
3059  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3061  }
3062 
3063  if (DCPS::transport_debug.log_messages) {
3064  message.hdr = header;
3065  }
3066 
3067  while (buff_.length() > 3) {
3068  const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
3069  ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
3070  const size_t start = buff_.length();
3071  CORBA::UShort submessageLength = 0;
3072  switch (subm) {
3073  case DATA: {
3074  DataSubmessage data;
3075  if (!(ser >> data)) {
3076  if (DCPS::DCPS_debug_level > 0) {
3077  ACE_ERROR((LM_ERROR,
3078  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3079  ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
3080  }
3081  return 0;
3082  }
3083  submessageLength = data.smHeader.submessageLength;
3084 
3085  if (DCPS::transport_debug.log_messages) {
3086  append_submessage(message, data);
3087  }
3088 
3089  if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
3090  // Not our message: this could be the same multicast group used
3091  // for SEDP and other traffic.
3092  break;
3093  }
3094 
3095  ParameterList plist;
3096  if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
3097  DCPS::EncapsulationHeader encap;
3098  DCPS::Encoding enc;
3099  if (!(ser >> encap) || !encap.to_encoding(enc, DCPS::MUTABLE) || enc.kind() != Encoding::KIND_XCDR1) {
3100  if (DCPS::DCPS_debug_level > 0) {
3101  ACE_ERROR((LM_ERROR,
3102  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3103  ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
3104  }
3105  return 0;
3106  }
3107  ser.encoding(enc);
3108  if (!(ser >> plist)) {
3109 
3110  if (DCPS::DCPS_debug_level > 0) {
3111  ACE_ERROR((LM_ERROR,
3112  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3113  ACE_TEXT("failed to deserialize data payload for SPDP\n")));
3114  }
3115  return 0;
3116  }
3117  } else {
3118  plist.length(1);
3119  const GUID_t guid = make_id(header.guidPrefix, ENTITYID_PARTICIPANT);
3120  plist[0].guid(guid);
3121  plist[0]._d(PID_PARTICIPANT_GUID);
3122  }
3123 
3124  DCPS::RcHandle<Spdp> outer = outer_.lock();
3125  if (outer) {
3126  outer->data_received(data, plist, remote);
3127  }
3128  break;
3129  }
3130  case INFO_DST: {
3131  if (DCPS::transport_debug.log_messages) {
3132  InfoDestinationSubmessage sm;
3133  if (!(ser >> sm)) {
3134  if (DCPS::DCPS_debug_level > 0) {
3135  ACE_ERROR((LM_ERROR,
3136  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3137  ACE_TEXT("failed to deserialize INFO_DST header for SPDP\n")));
3138  }
3139  return 0;
3140  }
3141  submessageLength = sm.smHeader.submessageLength;
3142  append_submessage(message, sm);
3143  break;
3144  }
3145  }
3146  // fallthrough
3147  default:
3148  SubmessageHeader smHeader;
3149  if (!(ser >> smHeader)) {
3150  if (DCPS::DCPS_debug_level > 0) {
3151  ACE_ERROR((LM_ERROR,
3152  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3153  ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
3154  }
3155  return 0;
3156  }
3157  submessageLength = smHeader.submessageLength;
3158  break;
3159  }
3160  if (submessageLength && buff_.length()) {
3161  const size_t read = start - buff_.length();
3162  if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
3163  if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
3164  - read))) {
3165  if (DCPS::DCPS_debug_level > 0) {
3166  ACE_ERROR((LM_ERROR,
3167  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3168  ACE_TEXT("failed to skip sub message length\n")));
3169  }
3170  return 0;
3171  }
3172  }
3173  } else if (!submessageLength) {
3174  break; // submessageLength of 0 indicates the last submessage
3175  }
3176  }
3177 
3178  } else if ((buff_.size() >= 4) && (ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4) == 0)) {
3179  // Handle some RTI protocol multicast to the same address
3180  return 0; // Ignore
3181  }
3182 
3183 #ifdef OPENDDS_SECURITY
3184  // Assume STUN
3185  if (!outer->initialized() || outer->shutting_down()) {
3186  return 0;
3187  }
3188 
3189 #ifndef ACE_RECVPKTINFO
3190  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() ")
3191  ACE_TEXT("potential STUN message received but this version of the ACE ")
3192  ACE_TEXT("library doesn't support the local_address extension in ")
3193  ACE_TEXT("ACE_SOCK_Dgram::recv\n")));
3194  ACE_NOTSUP_RETURN(0);
3195 #else
3196 
3197  DCPS::Serializer serializer(&buff_, STUN::encoding);
3198  STUN::Message message;
3199  message.block = &buff_;
3200  if (serializer >> message) {
3201  if (outer->sedp_->transport_inst()->count_messages()) {
3202  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(remote), DCPS::MCK_STUN, from_relay);
3203  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3205  }
3206 
3207  if (relay_srsm_.is_response(message)) {
3209  } else {
3210  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
3211  if (endpoint) {
3212  outer->ice_agent_->receive(endpoint, local, remote, message);
3213  }
3214  }
3215  }
3216 #endif
3217 #endif
3218 
3219  return 0;
3220 }
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define ACE_DEBUG(X)
StateChange receive(const STUN::Message &message)
Definition: Ice.cpp:156
const MessageCountKind MCK_STUN
#define ACE_ERROR(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const ParameterId_t PID_PARTICIPANT_GUID
Definition: RtpsCore.idl:287
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
size_t length(void) const
const octet FLAG_E
Definition: RtpsCore.idl:518
void reset(void)
sequence< octet > key
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
int ssize_t
char * rd_ptr(void) const
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
ACE_HANDLE socket(int protocol_family, int type, int proto)
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
const DCPS::GUID_t & guid() const
Definition: Spdp.h:94
bool valid_size(const ACE_INET_Addr &a)
Definition: Spdp.cpp:2971
size_t size(void) const
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool is_response(const STUN::Message &message) const
Definition: RTPS/ICE/Ice.h:241
ACE_CDR::UShort UShort
ssize_t read(ACE_HANDLE handle, void *buf, size_t len)
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
Definition: Spdp.cpp:4150
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
size_t space(void) const
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE h) const
Definition: Spdp.cpp:2954
const MessageCountKind MCK_RTPS
ACE_Message_Block buff_
Definition: Spdp.h:520
int memcmp(const void *t, const void *s, size_t len)
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
ACE_thread_mutex_t lock_
#define TheServiceParticipant
const octet FLAG_D
Definition: RtpsCore.idl:523
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
#define ACE_NOTSUP_RETURN(FAILVALUE)
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3223
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ host_addresses()

ICE::AddressListType OpenDDS::RTPS::Spdp::SpdpTransport::host_addresses ( ) const
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3235 of file Spdp.cpp.

References AF_INET, OpenDDS::DCPS::get_interface_addrs(), ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), ACE_INET_Addr::is_any(), and unicast_socket_.

3236 {
3237  ICE::AddressListType addresses;
3238  ACE_INET_Addr addr;
3239 
3241  if (addr != ACE_INET_Addr()) {
3242  if (addr.is_any()) {
3243  ICE::AddressListType addrs;
3245  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3246  if (pos->get_type() == AF_INET) {
3247  pos->set_port_number(addr.get_port_number());
3248  addresses.push_back(*pos);
3249  }
3250  }
3251  } else {
3252  addresses.push_back(addr);
3253  }
3254  }
3255 
3256 #ifdef ACE_HAS_IPV6
3257  unicast_ipv6_socket_.get_local_addr(addr);
3258  if (addr != ACE_INET_Addr()) {
3259  if (addr.is_any()) {
3260  ICE::AddressListType addrs;
3262  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3263  if (pos->get_type() == AF_INET6) {
3264  pos->set_port_number(addr.get_port_number());
3265  addresses.push_back(*pos);
3266  }
3267  }
3268  } else {
3269  addresses.push_back(addr);
3270  }
3271  }
3272 #endif
3273 
3274  return addresses;
3275 }
bool is_any(void) const
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
int get_local_addr(ACE_Addr &) const
#define AF_INET
u_short get_port_number(void) const
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507

◆ ice_connect()

void OpenDDS::RTPS::Spdp::SpdpTransport::ice_connect ( const ICE::GuidSetType guids,
const ACE_INET_Addr addr 
)
virtual

Reimplemented from OpenDDS::ICE::Endpoint.

Definition at line 3349 of file Spdp.cpp.

References outer_.

3350 {
3351  DCPS::RcHandle<Spdp> outer = outer_.lock();
3352  if (!outer) return;
3353 
3354  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, true));
3355 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ ice_disconnect()

void OpenDDS::RTPS::Spdp::SpdpTransport::ice_disconnect ( const ICE::GuidSetType guids,
const ACE_INET_Addr addr 
)
virtual

Reimplemented from OpenDDS::ICE::Endpoint.

Definition at line 3371 of file Spdp.cpp.

References outer_.

3372 {
3373  DCPS::RcHandle<Spdp> outer = outer_.lock();
3374  if (!outer) return;
3375 
3376  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, false));
3377 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ on_data_available()

void OpenDDS::RTPS::Spdp::SpdpTransport::on_data_available ( DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > >  reader)

Definition at line 3581 of file Spdp.cpp.

References ACE_GUARD, multicast_address_, multicast_interface_, multicast_manager_, multicast_socket_, network_interface_address_reader_, outer_, OpenDDS::DCPS::MulticastManager::process(), ACE_Event_Handler::reactor(), and shorten_local_sender_delay_i().

3582 {
3583  DCPS::RcHandle<Spdp> outer = outer_.lock();
3584  if (!outer) return;
3585 
3586  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
3587  if (outer->shutting_down()) {
3588  return;
3589  }
3590 
3591  if (outer->shutdown_flag_) {
3592  return;
3593  }
3594 
3595  DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress>::SampleSequence samples;
3596  DCPS::InternalSampleInfoSequence infos;
3597 
3598  network_interface_address_reader_->take(samples, infos);
3599 
3600  if (multicast_manager_.process(samples,
3601  infos,
3603  reactor(),
3604  this,
3605  DCPS::NetworkAddress(multicast_address_),
3607 #ifdef ACE_HAS_IPV6
3608  , DCPS::NetworkAddress(multicast_ipv6_address_),
3609  multicast_ipv6_socket_
3610 #endif
3611  )) {
3613  }
3614 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_INET_Addr multicast_address_
Definition: Spdp.h:509
DCPS::MulticastManager multicast_manager_
Definition: Spdp.h:518
OPENDDS_STRING multicast_interface_
Definition: Spdp.h:508
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
virtual ACE_Reactor * reactor(void) const
ACE_thread_mutex_t lock_
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

◆ open()

void OpenDDS::RTPS::Spdp::SpdpTransport::open ( const DCPS::ReactorTask_rch reactor_task,
const DCPS::JobQueue_rch job_queue 
)

Definition at line 2399 of file Spdp.cpp.

References directed_send_task_, OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), get_ice_endpoint(), OpenDDS::DCPS::ReactorTask::get_reactor(), handshake_deadline_task_, handshake_resend_task_, ice_endpoint_added_, OpenDDS::DCPS::ReactorTask::interceptor(), OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >::job_queue(), lease_expiration_task_, local_send_task_, network_interface_address_reader_, outer_, process_handshake_deadlines(), process_handshake_resends(), process_lease_expirations(), OpenDDS::DCPS::rchandle_from(), ACE_Event_Handler::reactor(), OpenDDS::DCPS::ref(), relay_spdp_task_, relay_stun_task(), relay_stun_task_, send_directed(), send_local(), send_relay(), TheServiceParticipant, thread_status_task(), and thread_status_task_.

2401 {
2402  DCPS::RcHandle<Spdp> outer = outer_.lock();
2403  if (!outer) return;
2404 
2405 #ifdef OPENDDS_SECURITY
2406  // Add the endpoint before any sending and receiving occurs.
2407  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
2408  if (endpoint) {
2409  outer->ice_agent_->add_endpoint(endpoint);
2410  ice_endpoint_added_ = true;
2411  outer->ice_agent_->add_local_agent_info_listener(endpoint, outer->guid_, DCPS::static_rchandle_cast<ICE::AgentInfoListener>(outer));
2412  }
2413 #endif
2414 
2415  reactor(reactor_task->get_reactor());
2416  reactor_task->interceptor()->execute_or_enqueue(DCPS::make_rch<RegisterHandlers>(rchandle_from(this), reactor_task));
2417 
2418 #ifdef OPENDDS_SECURITY
2419  // Now that the endpoint is added, SEDP can write the SPDP info.
2420  if (outer->is_security_enabled()) {
2421  outer->write_secure_updates();
2422  }
2423 #endif
2424 
2425  local_send_task_ = DCPS::make_rch<SpdpMulti>(reactor_task->interceptor(), outer->config_->resend_period(), rchandle_from(this), &SpdpTransport::send_local);
2426 
2427  if (outer->config_->periodic_directed_spdp()) {
2429  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2431  }
2432 
2434  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2436 
2437 #ifdef OPENDDS_SECURITY
2439  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2442  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2444 
2446  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2449  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2451 #endif
2452 
2453 #ifndef DDS_HAS_MINIMUM_BIT
2454  // internal thread bit reporting
2455  if (TheServiceParticipant->get_thread_status_manager().update_thread_status()) {
2456  thread_status_task_ = DCPS::make_rch<SpdpPeriodic>(reactor_task->interceptor(), ref(*this), &SpdpTransport::thread_status_task);
2457  }
2458 #endif /* DDS_HAS_MINIMUM_BIT */
2459 
2460  this->job_queue(job_queue);
2461  network_interface_address_reader_ = DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(true, rchandle_from(this));
2462  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
2463 }
void process_lease_expirations(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4256
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
Definition: Spdp.h:530
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
void thread_status_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4264
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
Definition: Spdp.h:538
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
virtual ACE_Reactor * reactor(void) const
void send_directed(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4232
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
void process_handshake_resends(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4307
void relay_stun_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4134
void process_handshake_deadlines(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4299
#define TheServiceParticipant
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
Definition: Spdp.h:536
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
void send_local(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4227
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3223
void send_relay(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4211

◆ open_unicast_socket()

bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket ( u_short  port_common,
u_short  participant_id 
)

Definition at line 3388 of file Spdp.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ENOTSUP, ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), IPPROTO_IP, LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, ACE_SOCK_Dgram::open(), outer_, PF_INET, ACE_SOCK::set_option(), ACE_INET_Addr::set_port_number(), OpenDDS::DCPS::set_socket_multicast_ttl(), SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, uni_port_, and unicast_socket_.

Referenced by SpdpTransport().

3390 {
3391  DCPS::RcHandle<Spdp> outer = outer_.lock();
3392  if (!outer) return false;
3393 
3394  ACE_INET_Addr local_addr = outer->config_->spdp_local_address();
3395  const bool fixed_port = local_addr.get_port_number();
3396 
3397  if (fixed_port) {
3398  uni_port_ = local_addr.get_port_number();
3399  } else if (!outer->config_->spdp_request_random_port()) {
3400  uni_port_ = port_common + outer->config_->d1() + (outer->config_->pg() * participant_id);
3401  local_addr.set_port_number(uni_port_);
3402  }
3403 
3404  if (unicast_socket_.open(local_addr, PF_INET) != 0) {
3405  if (fixed_port) {
3406  if (DCPS::DCPS_debug_level > 0) {
3407  ACE_ERROR((LM_ERROR,
3408  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3409  ACE_TEXT("failed to open %C %p.\n"),
3410  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3411  }
3412  throw std::runtime_error("failed to open unicast port for SPDP");
3413  }
3414  if (DCPS::DCPS_debug_level > 3) {
3415  ACE_DEBUG((LM_DEBUG,
3416  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3417  ACE_TEXT("failed to open %C %p. ")
3418  ACE_TEXT("Trying next participantId...\n"),
3419  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3420  }
3421  return false;
3422  }
3423 
3424  if (!fixed_port && outer->config_->spdp_request_random_port()) {
3425  ACE_INET_Addr addr;
3426  if (unicast_socket_.get_local_addr(addr) == 0) {
3427  uni_port_ = addr.get_port_number();
3428  }
3429  }
3430 
3431  if (DCPS::DCPS_debug_level > 3) {
3432  ACE_DEBUG((LM_INFO,
3433  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3434  ACE_TEXT("opened unicast socket on port %d\n"),
3435  uni_port_));
3436  }
3437 
3438  if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer->config_->ttl())) {
3439  if (DCPS::DCPS_debug_level > 0) {
3440  ACE_ERROR((LM_ERROR,
3441  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
3442  ACE_TEXT("failed to set TTL value to %d ")
3443  ACE_TEXT("for port:%hu %p\n"),
3444  outer->config_->ttl(), uni_port_, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
3445  }
3446  throw std::runtime_error("failed to set TTL");
3447  }
3448 
3449  const int send_buffer_size = outer->config()->send_buffer_size();
3450  if (send_buffer_size > 0) {
3451  if (unicast_socket_.set_option(SOL_SOCKET,
3452  SO_SNDBUF,
3453  (void *) &send_buffer_size,
3454  sizeof(send_buffer_size)) < 0
3455  && errno != ENOTSUP) {
3456  if (DCPS::DCPS_debug_level > 0) {
3457  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the send buffer size to %d errno %m\n"), send_buffer_size));
3458  }
3459  throw std::runtime_error("failed to set send buffer size");
3460  }
3461  }
3462 
3463  const int recv_buffer_size = outer->config()->recv_buffer_size();
3464  if (recv_buffer_size > 0) {
3465  if (unicast_socket_.set_option(SOL_SOCKET,
3466  SO_RCVBUF,
3467  (void *) &recv_buffer_size,
3468  sizeof(recv_buffer_size)) < 0
3469  && errno != ENOTSUP) {
3470  if (DCPS::DCPS_debug_level > 0) {
3471  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the recv buffer size to %d errno %m\n"), recv_buffer_size));
3472  }
3473  throw std::runtime_error("failed to set recv buffer size");
3474  }
3475  }
3476 
3477 #ifdef ACE_RECVPKTINFO
3478  int sockopt = 1;
3479  if (unicast_socket_.set_option(IPPROTO_IP, ACE_RECVPKTINFO, &sockopt, sizeof sockopt) == -1) {
3480  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: set_option: %m\n")), false);
3481  }
3482 #endif
3483 
3484  return true;
3485 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ENOTSUP
int set_option(int level, int option, void *optval, int optlen) const
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
int get_local_addr(ACE_Addr &) const
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
u_short get_port_number(void) const
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
void set_port_number(u_short, int encode=1)
#define ACE_ERROR_RETURN(X, Y)
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507

◆ OPENDDS_LIST()

OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_LIST ( DCPS::GUID_t  )

◆ OPENDDS_SET()

OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_SET ( ACE_INET_Addr  )

Referenced by send().

◆ process_handshake_deadlines()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_handshake_deadlines ( const DCPS::MonotonicTimePoint now)

Definition at line 4299 of file Spdp.cpp.

References outer_.

Referenced by open().

4300 {
4301  DCPS::RcHandle<Spdp> outer = outer_.lock();
4302  if (!outer) return;
4303 
4304  outer->process_handshake_deadlines(now);
4305 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_handshake_resends()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_handshake_resends ( const DCPS::MonotonicTimePoint now)

Definition at line 4307 of file Spdp.cpp.

References outer_.

Referenced by open().

4308 {
4309  DCPS::RcHandle<Spdp> outer = outer_.lock();
4310  if (!outer) return;
4311 
4312  outer->process_handshake_resends(now);
4313 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_lease_expirations()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_lease_expirations ( const DCPS::MonotonicTimePoint now)

Definition at line 4256 of file Spdp.cpp.

References outer_.

Referenced by open().

4257 {
4258  DCPS::RcHandle<Spdp> outer = outer_.lock();
4259  if (!outer) return;
4260 
4261  outer->process_lease_expirations(now);
4262 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_relay_sra()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_relay_sra ( ICE::ServerReflexiveStateMachine::StateChange  sc)

Definition at line 4150 of file Spdp.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, OpenDDS::DCPS::ConnectionRecord::guid, OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::ConnectionRecord::latency, OpenDDS::ICE::ServerReflexiveStateMachine::latency(), OpenDDS::ICE::ServerReflexiveStateMachine::latency_available(), outer_, OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_falloff_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Change, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_None, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Set, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Unset, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), OpenDDS::ICE::ServerReflexiveStateMachine::unset_stun_server_address(), and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by handle_input(), and relay_stun_task().

4151 {
4152 #ifndef DDS_HAS_MINIMUM_BIT
4153  DCPS::RcHandle<Spdp> outer = outer_.lock();
4154  if (!outer) return;
4155 
4156  DCPS::ConnectionRecord connection_record;
4157  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4158  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4159  connection_record.latency = DCPS::TimeDuration::zero_value.to_dds_duration();
4160 
4161  switch (sc) {
4164  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4165  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4167  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4168  }
4169  break;
4172  // Lengthen to normal period.
4173  relay_stun_task_falloff_.set(ICE::Configuration::instance()->server_reflexive_address_period());
4174  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4175  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4177  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4178  break;
4180  connection_record.address = DCPS::LogAddr(relay_srsm_.unset_stun_server_address()).c_str();
4181  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4182  break;
4183  }
4184 #else
4185  ACE_UNUSED_ARG(sc);
4186 #endif
4187 }
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
const ACE_INET_Addr & unset_stun_server_address() const
Definition: RTPS/ICE/Ice.h:238
DCPS::TimeDuration latency() const
Definition: RTPS/ICE/Ice.h:246
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
static const TimeDuration zero_value
Definition: TimeDuration.h:31
static Configuration * instance()
Definition: Ice.cpp:109
DDS::Duration_t to_dds_duration() const
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
const string RTPS_RELAY_STUN_PROTOCOL

◆ register_handlers()

void OpenDDS::RTPS::Spdp::SpdpTransport::register_handlers ( const DCPS::ReactorTask_rch reactor_task)

Definition at line 2512 of file Spdp.cpp.

References ACE_GUARD, OpenDDS::DCPS::ReactorTask::get_reactor(), outer_, ACE_Event_Handler::reactor(), register_unicast_socket(), and unicast_socket_.

2513 {
2514  DCPS::RcHandle<Spdp> outer = outer_.lock();
2515  if (!outer) {
2516  return;
2517  }
2518  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2519 
2520  if (outer->shutdown_flag_) {
2521  return;
2522  }
2523 
2524  ACE_Reactor* const reactor = reactor_task->get_reactor();
2525  register_unicast_socket(reactor, unicast_socket_, "IPV4");
2526 #ifdef ACE_HAS_IPV6
2527  register_unicast_socket(reactor, unicast_ipv6_socket_, "IPV6");
2528 #endif
2529 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void register_unicast_socket(ACE_Reactor *reactor, ACE_SOCK_Dgram &socket, const char *what)
Definition: Spdp.cpp:2490
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
virtual ACE_Reactor * reactor(void) const
ACE_thread_mutex_t lock_
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507

◆ register_unicast_socket()

void OpenDDS::RTPS::Spdp::SpdpTransport::register_unicast_socket ( ACE_Reactor reactor,
ACE_SOCK_Dgram socket,
const char *  what 
)

Definition at line 2490 of file Spdp.cpp.

References ACE_IPC_SAP::control(), ACE_IPC_SAP::get_handle(), ACE_Event_Handler::READ_MASK, and ACE_Reactor::register_handler().

Referenced by register_handlers().

2492 {
2493 #ifdef ACE_WIN32
2494  // By default Winsock will cause reads to fail with "connection reset"
2495  // when UDP sends result in ICMP "port unreachable" messages.
2496  // The transport framework is not set up for this since returning <= 0
2497  // from our receive_bytes causes the framework to close down the datalink
2498  // which in this case is used to receive from multiple peers.
2499  {
2500  BOOL recv_udp_connreset = FALSE;
2501  socket.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
2502  }
2503 #endif
2504 
2505  if (reactor->register_handler(socket.get_handle(),
2506  this, ACE_Event_Handler::READ_MASK) != 0) {
2507  throw std::runtime_error(
2508  (DCPS::String("failed to register ") + what + " unicast input handler").c_str());
2509  }
2510 }
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int control(int cmd, void *) const
ACE_HANDLE get_handle(void) const
std::string String

◆ relay_stun_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task ( const DCPS::MonotonicTimePoint now)

Definition at line 4134 of file Spdp.cpp.

References OpenDDS::ICE::Configuration::instance(), OpenDDS::ICE::ServerReflexiveStateMachine::message(), outer_, process_relay_sra(), relay_srsm_, relay_stun_task_, relay_stun_task_falloff_, OpenDDS::ICE::ServerReflexiveStateMachine::send(), send(), and OpenDDS::ICE::Configuration::server_reflexive_indication_count().

Referenced by open().

4135 {
4136  DCPS::RcHandle<Spdp> outer = outer_.lock();
4137  if (!outer) return;
4138 
4139  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4140  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4141  if (relay_address != ACE_INET_Addr()) {
4143  send(relay_address, relay_srsm_.message());
4144  relay_stun_task_falloff_.advance(ICE::Configuration::instance()->server_reflexive_address_period());
4145  relay_stun_task_->schedule(relay_stun_task_falloff_.get());
4146  }
4147  }
4148 }
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
const STUN::Message & message() const
Definition: RTPS/ICE/Ice.h:237
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
static Configuration * instance()
Definition: Ice.cpp:109
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
Definition: Ice.cpp:128
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
Definition: Spdp.cpp:4150
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2871
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127

◆ send() [1/3]

void OpenDDS::RTPS::Spdp::SpdpTransport::send ( WriteFlags  flags,
const ACE_INET_Addr local_address = ACE_INET_Addr() 
)

Definition at line 2871 of file Spdp.cpp.

References OPENDDS_SET(), outer_, SEND_DIRECT, SEND_MULTICAST, and SEND_RELAY.

Referenced by dispose_unregister(), relay_stun_task(), and write_i().

2872 {
2873  DCPS::RcHandle<Spdp> outer = outer_.lock();
2874  if (!outer) return;
2875 
2876  if ((flags & SEND_MULTICAST) && !outer->config_->rtps_relay_only()) {
2877  typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
2878  for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
2879  send(*iter, false);
2880  }
2881  }
2882 
2883  if (((flags & SEND_DIRECT) && !outer->config_->rtps_relay_only()) &&
2884  local_address != ACE_INET_Addr()) {
2885  send(local_address, false);
2886  }
2887 
2888  if ((flags & SEND_RELAY) || outer->config_->rtps_relay_only()) {
2889  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
2890  if (relay_address != ACE_INET_Addr()) {
2891  send(relay_address, true);
2892  }
2893  }
2894 }
static const WriteFlags SEND_DIRECT
Definition: Spdp.h:435
OPENDDS_SET(ACE_INET_Addr) send_addrs_
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2871
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ send() [2/3]

ssize_t OpenDDS::RTPS::Spdp::SpdpTransport::send ( const ACE_INET_Addr addr,
bool  relay 
)

Definition at line 2909 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), choose_send_socket(), OpenDDS::DCPS::DCPS_debug_level, ENETUNREACH, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, ACE_Message_Block::length(), LM_WARNING, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::MCK_RTPS, OpenDDS::DCPS::InternalTransportStatistics::message_count, network_is_unreachable_, outer_, ACE_Message_Block::rd_ptr(), ACE_SOCK_Dgram::send(), socket(), transport_statistics_, wbuff_, and OpenDDS::DCPS::InternalTransportStatistics::writer_resend_count.

2910 {
2911  DCPS::RcHandle<Spdp> outer = outer_.lock();
2912  if (!outer) return -1;
2913 
2914 #ifdef OPENDDS_TESTING_FEATURES
2915  if (outer->sedp_->transport_inst()->should_drop(wbuff_.length())) {
2916  return wbuff_.length();
2917  }
2918 #endif
2919 
2920  const ACE_SOCK_Dgram& socket = choose_send_socket(addr);
2921  const ssize_t res = socket.send(wbuff_.rd_ptr(), wbuff_.length(), addr);
2922  if (outer->sedp_->transport_inst()->count_messages()) {
2924  }
2925  if (res < 0) {
2926  if (outer->sedp_->transport_inst()->count_messages()) {
2927  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(addr), DCPS::MCK_RTPS, relay);
2929  }
2930  const int err = errno;
2931  if (err != ENETUNREACH || !network_is_unreachable_) {
2932  errno = err;
2933  if (DCPS::DCPS_debug_level > 0) {
2934  ACE_ERROR((LM_WARNING,
2935  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::send() - ")
2936  ACE_TEXT("destination %C failed send: %m\n"), DCPS::LogAddr(addr).c_str()));
2937  }
2938  }
2939  if (err == ENETUNREACH) {
2940  network_is_unreachable_ = true;
2941  }
2942  } else {
2943  if (outer->sedp_->transport_inst()->count_messages()) {
2944  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(addr), DCPS::MCK_RTPS, relay);
2946  }
2947  network_is_unreachable_ = false;
2948  }
2949 
2950  return res;
2951 }
#define ACE_ERROR(X)
size_t length(void) const
const ACE_SOCK_Dgram & choose_send_socket(const ACE_INET_Addr &addr) const
Definition: Spdp.cpp:2897
sequence< octet > key
int ssize_t
char * rd_ptr(void) const
ACE_HANDLE socket(int protocol_family, int type, int proto)
ACE_Message_Block wbuff_
Definition: Spdp.h:520
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
ACE_TEXT("TCP_Factory")
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
const MessageCountKind MCK_RTPS
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ send() [3/3]

void OpenDDS::RTPS::Spdp::SpdpTransport::send ( const ACE_INET_Addr address,
const STUN::Message message 
)
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3278 of file Spdp.cpp.

References OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >::job_queue(), outer_, and OpenDDS::DCPS::rchandle_from().

3279 {
3280  DCPS::RcHandle<Spdp> outer = outer_.lock();
3281  if (!outer) return;
3282 
3283  DCPS::RcHandle<DCPS::JobQueue> job_queue = outer->sedp_->job_queue();
3284  if (job_queue) {
3285  job_queue->enqueue(DCPS::make_rch<SendStun>(rchandle_from(this), address, message));
3286  }
3287 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ send_directed()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_directed ( const DCPS::MonotonicTimePoint now)

Definition at line 4232 of file Spdp.cpp.

References ACE_GUARD, directed_send_task_, outer_, SEND_DIRECT, SEND_RELAY, and write_i().

Referenced by open().

4233 {
4234  DCPS::RcHandle<Spdp> outer = outer_.lock();
4235  if (!outer) return;
4236 
4237  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4238 
4239  while (!directed_guids_.empty()) {
4240  const DCPS::GUID_t id = directed_guids_.front();
4241  directed_guids_.pop_front();
4242 
4243  DiscoveredParticipantConstIter pos = outer->participants_.find(id);
4244  if (pos == outer->participants_.end()) {
4245  continue;
4246  }
4247 
4248  write_i(id, pos->second.last_recv_address_, SEND_DIRECT | SEND_RELAY);
4249  directed_guids_.push_back(id);
4250  directed_send_task_->schedule(outer->config_->resend_period() * (1.0 / directed_guids_.size()));
4251  break;
4252  }
4253 }
static const WriteFlags SEND_DIRECT
Definition: Spdp.h:435
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
DiscoveredParticipantMap::const_iterator DiscoveredParticipantConstIter
Definition: Spdp.h:71
void write_i(WriteFlags flags)
Definition: Spdp.cpp:2669
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_thread_mutex_t lock_

◆ send_local()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_local ( const DCPS::MonotonicTimePoint now)

Definition at line 4227 of file Spdp.cpp.

References SEND_MULTICAST, and write().

Referenced by open().

4228 {
4230 }
void write(WriteFlags flags)
Definition: Spdp.cpp:2659
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ send_relay()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_relay ( const DCPS::MonotonicTimePoint now)

Definition at line 4211 of file Spdp.cpp.

References outer_, relay_spdp_task_, relay_spdp_task_falloff_, SEND_RELAY, and write().

Referenced by open().

4212 {
4213  DCPS::RcHandle<Spdp> outer = outer_.lock();
4214  if (!outer) return;
4215 
4216  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4217  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4218  if (relay_address != ACE_INET_Addr()) {
4219  write(SEND_RELAY);
4220  relay_spdp_task_falloff_.advance(outer->config_->spdp_rtps_relay_send_period());
4221  relay_spdp_task_->schedule(relay_spdp_task_falloff_.get());
4222  }
4223  }
4224 }
void write(WriteFlags flags)
Definition: Spdp.cpp:2659
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ shorten_local_sender_delay_i()

void OpenDDS::RTPS::Spdp::SpdpTransport::shorten_local_sender_delay_i ( )

Definition at line 2647 of file Spdp.cpp.

References local_send_task_, and outer_.

Referenced by on_data_available().

2648 {
2649  DCPS::RcHandle<Spdp> outer = outer_.lock();
2650  if (!outer) return;
2651 
2652  if (local_send_task_) {
2653  const TimeDuration quick_resend = outer->config_->resend_period() * outer->quick_resend_ratio_;
2654  local_send_task_->enable(std::max(quick_resend, outer->min_resend_delay_));
2655  }
2656 }
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ stun_server_address()

ACE_INET_Addr OpenDDS::RTPS::Spdp::SpdpTransport::stun_server_address ( ) const
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3341 of file Spdp.cpp.

References outer_.

3342 {
3343  DCPS::RcHandle<Spdp> outer = outer_.lock();
3344  return outer ? outer->config_->spdp_stun_server_address() : ACE_INET_Addr();
3345 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ thread_status_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::thread_status_task ( const DCPS::MonotonicTimePoint now)

Definition at line 4264 of file Spdp.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::DCPS_debug_level, last_harvest, LM_DEBUG, DDS::NEW_VIEW_STATE, outer_, TheServiceParticipant, OpenDDS::DCPS::InternalThreadBuiltinTopicData::thread_id, and OpenDDS::DCPS::InternalThreadBuiltinTopicData::utilization.

Referenced by open().

4265 {
4266  ACE_UNUSED_ARG(now);
4267 #ifndef DDS_HAS_MINIMUM_BIT
4268  DCPS::RcHandle<Spdp> outer = outer_.lock();
4269  if (!outer) return;
4270 
4271  if (DCPS::DCPS_debug_level > 4) {
4272  ACE_DEBUG((LM_DEBUG,
4273  "(%P|%t) Spdp::SpdpTransport::thread_status_task(): Updating internal thread status BIT.\n"));
4274  }
4275 
4276  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4277 
4278  typedef DCPS::ThreadStatusManager::List List;
4279  List running;
4280  List removed;
4281  TheServiceParticipant->get_thread_status_manager().harvest(last_harvest, running, removed);
4282  last_harvest = now;
4283  for (List::const_iterator i = removed.begin(); i != removed.end(); ++i) {
4284  DCPS::InternalThreadBuiltinTopicData data;
4285  data.thread_id = i->bit_key().c_str();
4286  outer->bit_subscriber_->remove_thread_status(data);
4287  }
4288  for (List::const_iterator i = running.begin(); i != running.end(); ++i) {
4289  DCPS::InternalThreadBuiltinTopicData data;
4290  data.thread_id = i->bit_key().c_str();
4291  data.utilization = i->utilization(now);
4292  outer->bit_subscriber_->add_thread_status(data, DDS::NEW_VIEW_STATE, i->timestamp());
4293  }
4294 
4295 #endif /* DDS_HAS_MINIMUM_BIT */
4296 }
DCPS::MonotonicTimePoint last_harvest
Definition: Spdp.h:553
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ViewStateKind NEW_VIEW_STATE
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_thread_mutex_t lock_
#define TheServiceParticipant

◆ write()

void OpenDDS::RTPS::Spdp::SpdpTransport::write ( WriteFlags  flags)

Definition at line 2659 of file Spdp.cpp.

References ACE_GUARD, outer_, and write_i().

Referenced by send_local(), and send_relay().

2660 {
2661  DCPS::RcHandle<Spdp> outer = outer_.lock();
2662  if (!outer) return;
2663 
2664  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2665  write_i(flags);
2666 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void write_i(WriteFlags flags)
Definition: Spdp.cpp:2669
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_thread_mutex_t lock_

◆ write_i() [1/2]

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i ( WriteFlags  flags)

Definition at line 2669 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::Serializer::encoding(), get_ice_endpoint(), hdr_, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, ACE_Message_Block::reset(), OpenDDS::RTPS::SEDP_AGENT_INFO_KEY, send(), seq_, OpenDDS::RTPS::SPDP_AGENT_INFO_KEY, OpenDDS::RTPS::ParameterListConverter::to_param_list(), OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by send_directed(), and write().

2670 {
2671  DCPS::RcHandle<Spdp> outer = outer_.lock();
2672  if (!outer) return;
2673 
2674  if (!outer->config_->undirected_spdp()) {
2675  return;
2676  }
2677 
2678  const ParticipantData_t pdata = outer->build_local_pdata(
2679 #ifdef OPENDDS_SECURITY
2680  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2681 #endif
2682  );
2683 
2685  ++seq_;
2686 
2687  ParameterList plist;
2688  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2689  if (DCPS::DCPS_debug_level > 0) {
2690  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2691  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2692  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2693  ACE_TEXT("to ParameterList\n")));
2694  }
2695  return;
2696  }
2697 
2698 #ifdef OPENDDS_SECURITY
2699  if (!outer->is_security_enabled()) {
2700  ICE::AgentInfoMap ai_map;
2701  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2702  if (sedp_endpoint) {
2703  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2704  }
2705  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = get_ice_endpoint();
2706  if (spdp_endpoint) {
2707  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2708  }
2709 
2710  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2711  if (DCPS::DCPS_debug_level > 0) {
2712  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2713  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2714  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2715  ACE_TEXT("to ParameterList\n")));
2716  }
2717  return;
2718  }
2719  }
2720 #endif
2721 
2722  wbuff_.reset();
2723  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2724  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2725  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2726  if (DCPS::DCPS_debug_level > 0) {
2727  ACE_ERROR((LM_ERROR,
2728  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
2729  ACE_TEXT("failed to serialize headers for SPDP\n")));
2730  }
2731  return;
2732  }
2733 
2734  send(flags);
2735 }
Security::SPDPdiscoveredParticipantData ParticipantData_t
#define ACE_ERROR(X)
SequenceNumber_t writerSN
Definition: RtpsCore.idl:671
void reset(void)
const char SEDP_AGENT_INFO_KEY[]
Definition: Spdp.h:57
bool to_param_list(const DDS::ParticipantBuiltinTopicData &pbtd, ParameterList &param_list)
ACE_Message_Block wbuff_
Definition: Spdp.h:520
const char SPDP_AGENT_INFO_KEY[]
Definition: Spdp.h:56
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2871
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3223

◆ write_i() [2/2]

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i ( const DCPS::GUID_t guid,
const ACE_INET_Addr local_address,
WriteFlags  flags 
)

Definition at line 2801 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::assign(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::Serializer::encoding(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, get_ice_endpoint(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, hdr_, OpenDDS::RTPS::INFO_DST, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, ACE_Message_Block::reset(), OpenDDS::RTPS::SEDP_AGENT_INFO_KEY, send(), seq_, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, OpenDDS::RTPS::SPDP_AGENT_INFO_KEY, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::ParameterListConverter::to_param_list(), OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

2802 {
2803  DCPS::RcHandle<Spdp> outer = outer_.lock();
2804  if (!outer) return;
2805 
2806  const ParticipantData_t pdata = outer->build_local_pdata(
2807 #ifdef OPENDDS_SECURITY
2808  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2809 #endif
2810  );
2811 
2813  ++seq_;
2814 
2815  ParameterList plist;
2816  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2817  if (DCPS::DCPS_debug_level > 0) {
2818  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2819  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2820  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2821  ACE_TEXT("to ParameterList\n")));
2822  }
2823  return;
2824  }
2825 
2826 #ifdef OPENDDS_SECURITY
2827  if (!outer->is_security_enabled()) {
2828  ICE::AgentInfoMap ai_map;
2829  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2830  if (sedp_endpoint) {
2831  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2832  }
2833  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = get_ice_endpoint();
2834  if (spdp_endpoint) {
2835  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2836  }
2837 
2838  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2839  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2840  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2841  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2842  ACE_TEXT("to ParameterList\n")));
2843  return;
2844  }
2845  }
2846 #endif
2847 
2848  InfoDestinationSubmessage info_dst;
2849  info_dst.smHeader.submessageId = INFO_DST;
2850  info_dst.smHeader.flags = FLAG_E;
2851  info_dst.smHeader.submessageLength = sizeof(guid.guidPrefix);
2852  DCPS::assign(info_dst.guidPrefix, guid.guidPrefix);
2853 
2854  wbuff_.reset();
2855  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2856  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2857  if (!(ser << hdr_) || !(ser << info_dst) || !(ser << data_) || !(ser << encap)
2858  || !(ser << plist)) {
2859  if (DCPS::DCPS_debug_level > 0) {
2860  ACE_ERROR((LM_ERROR,
2861  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i() - ")
2862  ACE_TEXT("failed to serialize headers for SPDP\n")));
2863  }
2864  return;
2865  }
2866 
2867  send(flags, local_address);
2868 }
Security::SPDPdiscoveredParticipantData ParticipantData_t
#define ACE_ERROR(X)
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
SequenceNumber_t writerSN
Definition: RtpsCore.idl:671
const octet FLAG_E
Definition: RtpsCore.idl:518
void reset(void)
const char SEDP_AGENT_INFO_KEY[]
Definition: Spdp.h:57
bool to_param_list(const DDS::ParticipantBuiltinTopicData &pbtd, ParameterList &param_list)
ACE_Message_Block wbuff_
Definition: Spdp.h:520
const DCPS::GUID_t & guid() const
Definition: Spdp.h:94
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
const char SPDP_AGENT_INFO_KEY[]
Definition: Spdp.h:56
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2871
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3223

Member Data Documentation

◆ buff_

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::buff_

Definition at line 520 of file Spdp.h.

Referenced by handle_input().

◆ data_

DataSubmessage OpenDDS::RTPS::Spdp::SpdpTransport::data_

Definition at line 504 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

◆ directed_send_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::directed_send_task_

Definition at line 527 of file Spdp.h.

Referenced by close(), open(), and send_directed().

◆ handshake_deadline_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::handshake_deadline_task_

Definition at line 536 of file Spdp.h.

Referenced by close(), and open().

◆ handshake_resend_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::handshake_resend_task_

Definition at line 538 of file Spdp.h.

Referenced by close(), and open().

◆ hdr_

Header OpenDDS::RTPS::Spdp::SpdpTransport::hdr_

Definition at line 503 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

◆ ice_endpoint_added_

bool OpenDDS::RTPS::Spdp::SpdpTransport::ice_endpoint_added_

Definition at line 550 of file Spdp.h.

Referenced by close(), and open().

◆ last_harvest

DCPS::MonotonicTimePoint OpenDDS::RTPS::Spdp::SpdpTransport::last_harvest

Definition at line 553 of file Spdp.h.

Referenced by thread_status_task().

◆ lease_expiration_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::lease_expiration_task_

Definition at line 530 of file Spdp.h.

Referenced by close(), and open().

◆ local_send_task_

DCPS::RcHandle<SpdpMulti> OpenDDS::RTPS::Spdp::SpdpTransport::local_send_task_

Definition at line 525 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), open(), and shorten_local_sender_delay_i().

◆ multicast_address_

ACE_INET_Addr OpenDDS::RTPS::Spdp::SpdpTransport::multicast_address_

Definition at line 509 of file Spdp.h.

Referenced by on_data_available(), and SpdpTransport().

◆ multicast_interface_

OPENDDS_STRING OpenDDS::RTPS::Spdp::SpdpTransport::multicast_interface_

Definition at line 508 of file Spdp.h.

Referenced by on_data_available(), and SpdpTransport().

◆ multicast_manager_

DCPS::MulticastManager OpenDDS::RTPS::Spdp::SpdpTransport::multicast_manager_

Definition at line 518 of file Spdp.h.

Referenced by on_data_available().

◆ multicast_socket_

ACE_SOCK_Dgram_Mcast OpenDDS::RTPS::Spdp::SpdpTransport::multicast_socket_

Definition at line 510 of file Spdp.h.

Referenced by choose_recv_socket(), close(), on_data_available(), SpdpTransport(), and ~SpdpTransport().

◆ network_interface_address_reader_

DCPS::RcHandle<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> > OpenDDS::RTPS::Spdp::SpdpTransport::network_interface_address_reader_

Definition at line 533 of file Spdp.h.

Referenced by close(), on_data_available(), and open().

◆ network_is_unreachable_

bool OpenDDS::RTPS::Spdp::SpdpTransport::network_is_unreachable_

Definition at line 549 of file Spdp.h.

Referenced by send().

◆ outer_

DCPS::WeakRcHandle<Spdp> OpenDDS::RTPS::Spdp::SpdpTransport::outer_

◆ relay_spdp_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::relay_spdp_task_

Definition at line 540 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), open(), and send_relay().

◆ relay_spdp_task_falloff_

DCPS::FibonacciSequence<TimeDuration> OpenDDS::RTPS::Spdp::SpdpTransport::relay_spdp_task_falloff_

Definition at line 541 of file Spdp.h.

Referenced by enable_periodic_tasks(), and send_relay().

◆ relay_srsm_

ICE::ServerReflexiveStateMachine OpenDDS::RTPS::Spdp::SpdpTransport::relay_srsm_

Definition at line 545 of file Spdp.h.

Referenced by disable_relay_stun_task(), handle_input(), process_relay_sra(), and relay_stun_task().

◆ relay_stun_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task_

Definition at line 543 of file Spdp.h.

Referenced by close(), disable_relay_stun_task(), enable_periodic_tasks(), open(), and relay_stun_task().

◆ relay_stun_task_falloff_

DCPS::FibonacciSequence<TimeDuration> OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task_falloff_

Definition at line 544 of file Spdp.h.

Referenced by enable_periodic_tasks(), process_relay_sra(), and relay_stun_task().

◆ SEND_DIRECT

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_DIRECT = (1 << 2)
static

◆ SEND_MULTICAST

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_MULTICAST = (1 << 0)
static

◆ SEND_RELAY

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_RELAY = (1 << 1)
static

◆ seq_

DCPS::SequenceNumber OpenDDS::RTPS::Spdp::SpdpTransport::seq_

Definition at line 505 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().

◆ thread_status_task_

DCPS::RcHandle<SpdpPeriodic> OpenDDS::RTPS::Spdp::SpdpTransport::thread_status_task_

Definition at line 532 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), and open().

◆ transport_statistics_

DCPS::InternalTransportStatistics OpenDDS::RTPS::Spdp::SpdpTransport::transport_statistics_

Definition at line 552 of file Spdp.h.

Referenced by handle_input(), and send().

◆ uni_port_

u_short OpenDDS::RTPS::Spdp::SpdpTransport::uni_port_

Definition at line 506 of file Spdp.h.

Referenced by open_unicast_socket(), and SpdpTransport().

◆ unicast_socket_

ACE_SOCK_Dgram OpenDDS::RTPS::Spdp::SpdpTransport::unicast_socket_

◆ wbuff_

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::wbuff_

Definition at line 520 of file Spdp.h.

Referenced by dispose_unregister(), send(), and write_i().


The documentation for this struct was generated from the following files: