OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Types | Public Member Functions | Public Attributes | Static Public Attributes | List of all members
OpenDDS::RTPS::Spdp::SpdpTransport Struct Reference
Inheritance diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Spdp::SpdpTransport:
Collaboration graph
[legend]

Classes

class  RegisterHandlers
 

Public Types

typedef size_t WriteFlags
 
typedef DCPS::PmfPeriodicTask< SpdpTransport > SpdpPeriodic
 
typedef DCPS::PmfSporadicTask< SpdpTransport > SpdpSporadic
 
typedef DCPS::PmfMultiTask< SpdpTransport > SpdpMulti
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Public Types inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
typedef RcHandle< InternalDataReader< DCPS::NetworkInterfaceAddress > > InternalDataReader_rch
 

Public Member Functions

 SpdpTransport (DCPS::RcHandle< Spdp > outer)
 
 ~SpdpTransport ()
 
const ACE_SOCK_Dgram & choose_recv_socket (ACE_HANDLE h) const
 
virtual int handle_input (ACE_HANDLE h)
 
void open (const DCPS::ReactorTask_rch &reactor_task, const DCPS::JobQueue_rch &job_queue)
 
void register_unicast_socket (ACE_Reactor *reactor, ACE_SOCK_Dgram &socket, const char *what)
 
void register_handlers (const DCPS::ReactorTask_rch &reactor_task)
 
void enable_periodic_tasks ()
 
void shorten_local_sender_delay_i ()
 
void write (WriteFlags flags)
 
void write_i (WriteFlags flags)
 
void write_i (const DCPS::GUID_t &guid, const ACE_INET_Addr &local_address, WriteFlags flags)
 
void send (WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
 
const ACE_SOCK_Dgram & choose_send_socket (const ACE_INET_Addr &addr) const
 
ssize_t send (const ACE_INET_Addr &addr, bool relay)
 
void close (const DCPS::ReactorTask_rch &reactor_task)
 
void dispose_unregister ()
 
bool open_unicast_socket (u_short port_common, u_short participant_id)
 
void on_data_available (DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
 
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
ICE::AddressListType host_addresses () const
 
void send (const ACE_INET_Addr &address, const STUN::Message &message)
 
ACE_INET_Addr stun_server_address () const
 
void ice_connect (const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
 
void ice_disconnect (const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
 
 OPENDDS_SET (ACE_INET_Addr) send_addrs_
 
void send_local (const DCPS::MonotonicTimePoint &now)
 
void send_directed (const DCPS::MonotonicTimePoint &now)
 
 OPENDDS_LIST (DCPS::GUID_t) directed_guids_
 
void process_lease_expirations (const DCPS::MonotonicTimePoint &now)
 
void thread_status_task (const DCPS::MonotonicTimePoint &now)
 
void process_handshake_deadlines (const DCPS::MonotonicTimePoint &now)
 
void process_handshake_resends (const DCPS::MonotonicTimePoint &now)
 
void send_relay (const DCPS::MonotonicTimePoint &now)
 
void relay_stun_task (const DCPS::MonotonicTimePoint &now)
 
void process_relay_sra (ICE::ServerReflexiveStateMachine::StateChange)
 
void disable_relay_stun_task ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor (void) const
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >
 InternalDataReaderListener ()
 
 InternalDataReaderListener (JobQueue_rch job_queue)
 
void job_queue (JobQueue_rch job_queue)
 
virtual void on_data_available (InternalDataReader_rch reader)=0
 
void schedule (InternalDataReader_rch reader)
 
- Public Member Functions inherited from OpenDDS::ICE::Endpoint
virtual ~Endpoint ()
 

Public Attributes

DCPS::WeakRcHandle< Spdp > outer_
 
Header hdr_
 
DataSubmessage data_
 
DCPS::SequenceNumber seq_
 
u_short uni_port_
 
ACE_SOCK_Dgram unicast_socket_
 
OPENDDS_STRING multicast_interface_
 
ACE_INET_Addr multicast_address_
 
ACE_SOCK_Dgram_Mcast multicast_socket_
 
DCPS::MulticastManager multicast_manager_
 
ACE_Message_Block buff_
 
ACE_Message_Block wbuff_
 
DCPS::RcHandle< SpdpMulti > local_send_task_
 
DCPS::RcHandle< SpdpSporadic > directed_send_task_
 
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
 
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
 
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
 
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
 
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
 
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
 
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
 
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
 
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
 
ICE::ServerReflexiveStateMachine relay_srsm_
 
bool network_is_unreachable_
 
bool ice_endpoint_added_
 
DCPS::InternalTransportStatistics transport_statistics_
 
DCPS::MonotonicTimePoint last_harvest
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 

Static Public Attributes

static const WriteFlags SEND_MULTICAST = (1 << 0)
 
static const WriteFlags SEND_RELAY = (1 << 1)
 
static const WriteFlags SEND_DIRECT = (1 << 2)
 

Additional Inherited Members

- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 425 of file Spdp.h.

Member Typedef Documentation

◆ SpdpMulti

Definition at line 523 of file Spdp.h.

◆ SpdpPeriodic

Definition at line 521 of file Spdp.h.

◆ SpdpSporadic

Definition at line 522 of file Spdp.h.

◆ WriteFlags

Definition at line 432 of file Spdp.h.

Constructor & Destructor Documentation

◆ SpdpTransport()

OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport ( DCPS::RcHandle< Spdp > outer)
explicit

Definition at line 2308 of file Spdp.cpp.

References ACE_ERROR, OpenDDS::DCPS::assign(), OpenDDS::RTPS::DATA, data_, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::DataSubmessage::extraFlags, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, ACE_OS::getpid(), OpenDDS::RTPS::Header::guidPrefix, hdr_, OpenDDS::RTPS::SequenceNumber_t::high, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::RTPS::SequenceNumber_t::low, multicast_address_, multicast_interface_, multicast_socket_, OpenDDS::RTPS::DataSubmessage::octetsToInlineQos, open_unicast_socket(), ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::RTPS::DataSubmessage::readerId, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, uni_port_, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, OpenDDS::RTPS::Header::version, OpenDDS::DCPS::LogLevel::Warning, OpenDDS::RTPS::DataSubmessage::writerId, and OpenDDS::RTPS::DataSubmessage::writerSN.

2309  : outer_(outer)
2310  , buff_(64 * 1024)
2311  , wbuff_(64 * 1024)
2312 #ifdef OPENDDS_SECURITY
2313  , relay_spdp_task_falloff_(outer->config()->sedp_heartbeat_period())
2314  , relay_stun_task_falloff_(outer->config()->sedp_heartbeat_period())
2315 #endif
2316  , network_is_unreachable_(false)
2317  , ice_endpoint_added_(false)
2319  OPENDDS_STRING("_SPDPTransportInst_") +
2320  DCPS::GuidConverter(outer->guid_).uniqueParticipantId() +
2321  DCPS::to_dds_string(outer->domain_))
2322 {
2323  hdr_.prefix[0] = 'R';
2324  hdr_.prefix[1] = 'T';
2325  hdr_.prefix[2] = 'P';
2326  hdr_.prefix[3] = 'S';
2329  DCPS::assign(hdr_.guidPrefix, outer->guid_.guidPrefix);
2332  data_.smHeader.submessageLength = 0; // last submessage in the Message
2333  data_.extraFlags = 0;
2337  data_.writerSN.high = 0;
2338  data_.writerSN.low = 0;
2339 
2340 #ifdef ACE_HAS_MAC_OSX
2343 #ifdef ACE_HAS_IPV6
2344  multicast_ipv6_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
2346 #endif
2347 #endif
2348 
2349  multicast_interface_ = outer->disco_->multicast_interface();
2350 
2351  const u_short port_common = outer->config_->port_common(outer->domain_);
2352  multicast_address_ = outer->config_->multicast_address(port_common);
2353 
2354 #ifdef ACE_HAS_IPV6
2355  multicast_ipv6_address_ = outer->config_->ipv6_multicast_address(port_common);
2356 #endif
2357 
2358  send_addrs_.insert(multicast_address_);
2359 #ifdef ACE_HAS_IPV6
2360  send_addrs_.insert(multicast_ipv6_address_);
2361 #endif
2362 
2363  typedef RtpsDiscovery::AddrVec::const_iterator iter;
2364  const RtpsDiscovery::AddrVec addrs = outer->config_->spdp_send_addrs();
2365  for (iter it = addrs.begin(),
2366  end = addrs.end(); it != end; ++it) {
2367  send_addrs_.insert(ACE_INET_Addr(it->c_str()));
2368  }
2369 
2370  u_short participantId = 0;
2371 
2372 #ifdef OPENDDS_SAFETY_PROFILE
2373  const u_short startingParticipantId = participantId;
2374 #endif
2375 
2376  const u_short max_part_id = 119; // RTPS 2.5 9.6.2.3
2377  while (!open_unicast_socket(port_common, participantId)) {
2378  if (participantId == max_part_id && log_level >= LogLevel::Warning) {
2379  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: Spdp::SpdpTransport: "
2380  "participant id is going above max %u allowed by RTPS spec\n", max_part_id));
2381  // As long as it doesn't result in an invalid port, going past this
2382  // shouldn't cause a problem, but it could be a sign that OpenDDS has a
2383  // limited number of ports at its disposal. Also another implementation
2384  // could use this as a hard limit, but that's much less of a concern.
2385  }
2386  ++participantId;
2387  }
2388 #ifdef ACE_HAS_IPV6
2389  u_short port = uni_port_;
2390 
2391  while (!open_unicast_ipv6_socket(port)) {
2392  ++port;
2393  }
2394 #endif
2395 
2396 #ifdef OPENDDS_SAFETY_PROFILE
2397  if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
2398  // Since pids are not available, use the fact that we had to increment
2399  // participantId to modify the GUID's pid bytes. This avoids GUID conflicts
2400  // between processes on the same host which start at the same time
2401  // (resulting in the same seed value for the random number generator).
2402  hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
2403  hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
2404  outer->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
2405  outer->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
2406  }
2407 #endif
2408 }
#define ACE_ERROR(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
SubmessageHeader smHeader
Definition: RtpsCore.idl:667
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
String to_dds_string(unsigned short to_convert)
const VendorId_t VENDORID_OPENDDS
Definition: MessageTypes.h:26
void opts(int opts)
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
ACE_Message_Block buff_
Definition: Spdp.h:520
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
const ProtocolVersion_t PROTOCOLVERSION
Definition: MessageTypes.h:67
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
ACE_Message_Block wbuff_
Definition: Spdp.h:520
unsigned short octetsToInlineQos
Definition: RtpsCore.idl:672
#define OPENDDS_STRING
ProtocolVersion_t version
Definition: RtpsCore.idl:656
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:658
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
RtpsDiscoveryConfig::AddrVec AddrVec
Definition: RtpsDiscovery.h:57
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:674
bool open_unicast_socket(u_short port_common, u_short participant_id)
Definition: Spdp.cpp:3400
DCPS::EntityId_t readerId
Definition: RtpsCore.idl:673
OctetArray4 prefix
Definition: RtpsCore.idl:655
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510
OpenDDS_Dcps_Export LogLevel log_level
ACE_CDR::Octet Octet
static const char DEFAULT_INST_PREFIX[]
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
ACE_INET_Addr multicast_address_
Definition: Spdp.h:509
OPENDDS_STRING multicast_interface_
Definition: Spdp.h:508
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
pid_t getpid(void)
const octet FLAG_D
Definition: RtpsCore.idl:526

◆ ~SpdpTransport()

OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport ( )

Definition at line 2477 of file Spdp.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), ACE_SOCK::close(), OpenDDS::DCPS::DCPS_debug_level, dispose_unregister(), LM_INFO, multicast_socket_, outer_, and unicast_socket_.

2478 {
2479  if (DCPS::DCPS_debug_level > 3) {
2480  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
2481  }
2482 
2483  DCPS::RcHandle<Spdp> outer = outer_.lock();
2484 
2485  if (outer) {
2486  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2487  try {
2489  } catch (const CORBA::BAD_PARAM&) {}
2490  outer->eh_shutdown_ = true;
2491  outer->shutdown_cond_.notify_all();
2492  }
2493 
2496 #ifdef ACE_HAS_IPV6
2497  unicast_ipv6_socket_.close();
2498  multicast_ipv6_socket_.close();
2499 #endif
2500 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_TEXT("TCP_Factory")
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_thread_mutex_t lock_
int close(void)

Member Function Documentation

◆ choose_recv_socket()

const ACE_SOCK_Dgram & OpenDDS::RTPS::Spdp::SpdpTransport::choose_recv_socket ( ACE_HANDLE  h) const

Definition at line 2966 of file Spdp.cpp.

References ACE_IPC_SAP::get_handle(), multicast_socket_, and unicast_socket_.

Referenced by handle_input().

2967 {
2968 #ifdef ACE_HAS_IPV6
2969  if (h == unicast_ipv6_socket_.get_handle()) {
2970  return unicast_ipv6_socket_;
2971  }
2972  if (h == multicast_ipv6_socket_.get_handle()) {
2973  return multicast_ipv6_socket_;
2974  }
2975 #endif
2976  if (h == multicast_socket_.get_handle()) {
2977  return multicast_socket_;
2978  }
2979 
2980  return unicast_socket_;
2981 }
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
ACE_HANDLE get_handle(void) const
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510

◆ choose_send_socket()

const ACE_SOCK_Dgram & OpenDDS::RTPS::Spdp::SpdpTransport::choose_send_socket ( const ACE_INET_Addr & addr) const

Definition at line 2909 of file Spdp.cpp.

References ACE_Addr::get_type(), and unicast_socket_.

Referenced by send().

2910 {
2911 #ifdef ACE_HAS_IPV6
2912  if (addr.get_type() == AF_INET6) {
2913  return unicast_ipv6_socket_;
2914  }
2915 #endif
2916  ACE_UNUSED_ARG(addr);
2917  return unicast_socket_;
2918 }
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
int get_type(void) const

◆ close()

void OpenDDS::RTPS::Spdp::SpdpTransport::close ( const DCPS::ReactorTask_rch & reactor_task)

Definition at line 2603 of file Spdp.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, directed_send_task_, ACE_Event_Handler::DONT_CALL, ACE_IPC_SAP::get_handle(), get_ice_endpoint(), OpenDDS::DCPS::ReactorTask::get_reactor(), handshake_deadline_task_, handshake_resend_task_, ice_endpoint_added_, lease_expiration_task_, LM_INFO, local_send_task_, multicast_socket_, network_interface_address_reader_, outer_, ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, relay_spdp_task_, relay_stun_task_, ACE_Reactor::remove_handler(), TheServiceParticipant, thread_status_task_, and unicast_socket_.

2604 {
2605  if (DCPS::DCPS_debug_level > 3) {
2606  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
2607  }
2608 
2609  DCPS::RcHandle<Spdp> outer = outer_.lock();
2610  if (!outer) return;
2611 
2612  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
2613 
2614 #ifdef OPENDDS_SECURITY
2615  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
2616  if (endpoint) {
2617  outer->ice_agent_->remove_endpoint(endpoint);
2618  ice_endpoint_added_ = false;
2619  }
2620 
2622  handshake_deadline_task_->cancel();
2623  }
2624  if (handshake_resend_task_) {
2625  handshake_resend_task_->cancel();
2626  }
2627  if (relay_spdp_task_) {
2628  relay_spdp_task_->cancel();
2629  }
2630  if (relay_stun_task_) {
2631  relay_stun_task_->cancel();
2632  }
2633 #endif
2634  if (local_send_task_) {
2635  local_send_task_->disable();
2636  }
2637  if (directed_send_task_) {
2638  directed_send_task_->cancel();
2639  }
2640  if (lease_expiration_task_) {
2641  lease_expiration_task_->cancel();
2642  }
2643  if (thread_status_task_) {
2644  thread_status_task_->disable();
2645  }
2646 
2647  ACE_Reactor* reactor = reactor_task->get_reactor();
2648  const ACE_Reactor_Mask mask =
2650  reactor->remove_handler(multicast_socket_.get_handle(), mask);
2651  reactor->remove_handler(unicast_socket_.get_handle(), mask);
2652 #ifdef ACE_HAS_IPV6
2653  reactor->remove_handler(multicast_ipv6_socket_.get_handle(), mask);
2654  reactor->remove_handler(unicast_ipv6_socket_.get_handle(), mask);
2655 #endif
2656 }
#define ACE_DEBUG(X)
unsigned long ACE_Reactor_Mask
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
ACE_HANDLE get_handle(void) const
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
Definition: Spdp.h:536
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
virtual ACE_Reactor * reactor(void) const
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
ACE_TEXT("TCP_Factory")
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
Definition: Spdp.h:530
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
Definition: Spdp.h:538
#define TheServiceParticipant

◆ disable_relay_stun_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::disable_relay_stun_task ( )

Definition at line 4211 of file Spdp.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, OpenDDS::DCPS::ConnectionRecord::guid, OpenDDS::DCPS::ConnectionRecord::latency, outer_, OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), and OpenDDS::DCPS::TimeDuration::zero_value.

4212 {
4213 #ifndef DDS_HAS_MINIMUM_BIT
4214  DCPS::RcHandle<Spdp> outer = outer_.lock();
4215  if (!outer) return;
4216 
4217  relay_stun_task_->cancel();
4218 
4219  DCPS::ConnectionRecord connection_record;
4220  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4221  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4222  connection_record.latency = DCPS::TimeDuration::zero_value.to_dds_duration();
4223 
4225  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4226  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4227  }
4228 
4229  relay_srsm_ = ICE::ServerReflexiveStateMachine();
4230 #endif
4231 }
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
DDS::Duration_t to_dds_duration() const
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
const string RTPS_RELAY_STUN_PROTOCOL

◆ dispose_unregister()

void OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister ( )

Definition at line 2570 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Serializer::encoding(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, hdr_, OpenDDS::RTPS::DataSubmessage::inlineQos, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, ACE_Message_Block::reset(), send(), SEND_MULTICAST, SEND_RELAY, seq_, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by ~SpdpTransport().

2571 {
2572  DCPS::RcHandle<Spdp> outer = outer_.lock();
2573  if (!outer) return;
2574 
2575  // Send the dispose/unregister SPDP sample
2578  data_.inlineQos.length(1);
2579  static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
2580  data_.inlineQos[0].status_info(dispose_unregister);
2581 
2582  ParameterList plist(1);
2583  plist.length(1);
2584  plist[0].guid(outer->guid_);
2585  plist[0]._d(PID_PARTICIPANT_GUID);
2586 
2587  wbuff_.reset();
2588  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2589  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2590  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2591  if (DCPS::DCPS_debug_level > 0) {
2592  ACE_ERROR((LM_ERROR,
2593  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
2594  ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
2595  }
2596  return;
2597  }
2598 
2600 }
#define ACE_ERROR(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
SubmessageHeader smHeader
Definition: RtpsCore.idl:667
const ParameterId_t PID_PARTICIPANT_GUID
Definition: RtpsCore.idl:289
void reset(void)
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_Message_Block wbuff_
Definition: Spdp.h:520
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
const octet FLAG_Q
Definition: RtpsCore.idl:522
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:530
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ enable_periodic_tasks()

void OpenDDS::RTPS::Spdp::SpdpTransport::enable_periodic_tasks ( )

Definition at line 2544 of file Spdp.cpp.

References local_send_task_, outer_, relay_spdp_task_, relay_spdp_task_falloff_, relay_stun_task_, relay_stun_task_falloff_, TheServiceParticipant, OpenDDS::DCPS::ThreadStatusManager::thread_status_interval(), thread_status_task_, OpenDDS::DCPS::ThreadStatusManager::update_thread_status(), and OpenDDS::DCPS::TimeDuration::zero_value.

2545 {
2546  if (local_send_task_) {
2548  }
2549 
2550 #ifdef OPENDDS_SECURITY
2551  DCPS::RcHandle<Spdp> outer = outer_.lock();
2552  if (!outer) return;
2553 
2554  relay_spdp_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2556 
2557  relay_stun_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2559 #endif
2560 
2561 #ifndef DDS_HAS_MINIMUM_BIT
2562  const DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
2563  if (thread_status_manager.update_thread_status()) {
2564  thread_status_task_->enable(false, thread_status_manager.thread_status_interval());
2565  }
2566 #endif /* DDS_HAS_MINIMUM_BIT */
2567 }
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
#define TheServiceParticipant

◆ get_ice_endpoint()

DCPS::WeakRcHandle< ICE::Endpoint > OpenDDS::RTPS::Spdp::SpdpTransport::get_ice_endpoint ( )

Definition at line 3235 of file Spdp.cpp.

References outer_, OpenDDS::DCPS::rchandle_from(), and OpenDDS::DCPS::static_rchandle_cast().

Referenced by close(), handle_input(), open(), and write_i().

3236 {
3237 #ifdef OPENDDS_SECURITY
3238  DCPS::RcHandle<Spdp> outer = outer_.lock();
3239  return outer && outer->config_->use_ice() ? DCPS::static_rchandle_cast<ICE::Endpoint>(rchandle_from(this)) : DCPS::WeakRcHandle<ICE::Endpoint>();
3240 #else
3241  return DCPS::WeakRcHandle<ICE::Endpoint>();
3242 #endif
3243 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202

◆ handle_input()

int OpenDDS::RTPS::Spdp::SpdpTransport::handle_input ( ACE_HANDLE  h)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 2989 of file Spdp.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_NOTSUP_RETURN, ACE_TEXT(), OpenDDS::RTPS::append_submessage(), OpenDDS::STUN::Message::block, buff_, choose_recv_socket(), OpenDDS::RTPS::DATA, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::STUN::encoding(), OpenDDS::DCPS::Serializer::encoding(), OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::SubmessageHeader::flags, ACE_IPC_SAP::get_handle(), get_ice_endpoint(), OpenDDS::RTPS::Spdp::guid(), OpenDDS::RTPS::Header::guidPrefix, OpenDDS::RTPS::Message::hdr, header, OpenDDS::RTPS::INFO_DST, OpenDDS::ICE::ServerReflexiveStateMachine::is_response(), OpenDDS::DCPS::Encoding::kind(), OpenDDS::DCPS::Encoding::KIND_XCDR1, ACE_Message_Block::length(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::MCK_RTPS, OpenDDS::DCPS::MCK_STUN, ACE_OS::memcmp(), OpenDDS::DCPS::InternalTransportStatistics::message_count, OpenDDS::DCPS::MUTABLE, outer_, OpenDDS::RTPS::PID_PARTICIPANT_GUID, process_relay_sra(), ACE_Message_Block::rd_ptr(), read(), OpenDDS::ICE::ServerReflexiveStateMachine::receive(), relay_srsm_, ACE_Message_Block::reset(), ACE_Message_Block::size(), OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, socket(), ACE_Message_Block::space(), OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Serializer::swap_bytes(), TheServiceParticipant, OpenDDS::DCPS::EncapsulationHeader::to_encoding(), OpenDDS::DCPS::transport_debug, transport_statistics_, unicast_socket_, OpenDDS::RTPS::valid_size(), ACE_Message_Block::wr_ptr(), and OpenDDS::RTPS::DataSubmessage::writerId.

2990 {
2991  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2992 
2994  ACE_INET_Addr remote;
2995  buff_.reset();
2996 
2997 #ifdef ACE_LACKS_SENDMSG
2998  const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
2999 #else
3000  ACE_INET_Addr local;
3001 
3002  iovec iov[1];
3003  iov[0].iov_base = buff_.wr_ptr();
3004 #ifdef _MSC_VER
3005 #pragma warning(push)
3006  // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
3007  // since on other platforms iov_len is 64-bit
3008 #pragma warning(disable : 4267)
3009 #endif
3010  iov[0].iov_len = buff_.space();
3011 #ifdef _MSC_VER
3012 #pragma warning(pop)
3013 #endif
3014  const ssize_t bytes = socket.recv(iov, 1, remote, 0
3015 #if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
3016  , &local
3017 #endif
3018  );
3019 #endif
3020 
3021  if (!valid_size(remote)) {
3022  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - invalid address size\n")));
3023  return 0;
3024  }
3025 
3026  if (bytes > 0) {
3027  buff_.wr_ptr(bytes);
3028  } else if (bytes == 0) {
3029  return 0;
3030  } else {
3031  ACE_DEBUG((
3032  LM_WARNING,
3033  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::handle_input() - ")
3034  ACE_TEXT("error reading from %C socket %p\n")
3035  , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
3036  ACE_TEXT("ACE_SOCK_Dgram::recv")));
3037  return 0;
3038  }
3039 
3040  DCPS::RcHandle<Spdp> outer = outer_.lock();
3041 
3042  if (!outer) {
3043  return 0;
3044  }
3045 
3046  const bool relay_in_use = (outer->config_->rtps_relay_only() || outer->config_->use_rtps_relay());
3047  const bool remote_matches_relay_addr = (remote == outer->config_->spdp_rtps_relay_address());
3048  const bool from_relay = relay_in_use && remote_matches_relay_addr;
3049 
3050  // Ignore messages from the relay when not using it.
3051  if (!relay_in_use && remote_matches_relay_addr) {
3052  return 0;
3053  }
3054 
3055  if ((buff_.size() >= 4) && ACE_OS::memcmp(buff_.rd_ptr(), "RTPS", 4) == 0) {
3056  RTPS::Message message;
3057 
3058  DCPS::Serializer ser(&buff_, encoding_plain_native);
3059  Header header;
3060  if (!(ser >> header)) {
3061  if (DCPS::DCPS_debug_level > 0) {
3062  ACE_ERROR((LM_ERROR,
3063  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3064  ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
3065  }
3066  return 0;
3067  }
3068 
3069  if (outer->sedp_->transport_inst()->count_messages()) {
3070  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(remote), DCPS::MCK_RTPS, from_relay);
3071  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3073  }
3074 
3075  if (DCPS::transport_debug.log_messages) {
3076  message.hdr = header;
3077  }
3078 
3079  while (buff_.length() > 3) {
3080  const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
3081  ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
3082  const size_t start = buff_.length();
3083  CORBA::UShort submessageLength = 0;
3084  switch (subm) {
3085  case DATA: {
3086  DataSubmessage data;
3087  if (!(ser >> data)) {
3088  if (DCPS::DCPS_debug_level > 0) {
3089  ACE_ERROR((LM_ERROR,
3090  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3091  ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
3092  }
3093  return 0;
3094  }
3095  submessageLength = data.smHeader.submessageLength;
3096 
3097  if (DCPS::transport_debug.log_messages) {
3098  append_submessage(message, data);
3099  }
3100 
3101  if (data.writerId != ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER) {
3102  // Not our message: this could be the same multicast group used
3103  // for SEDP and other traffic.
3104  break;
3105  }
3106 
3107  ParameterList plist;
3108  if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
3109  DCPS::EncapsulationHeader encap;
3110  DCPS::Encoding enc;
3111  if (!(ser >> encap) || !encap.to_encoding(enc, DCPS::MUTABLE) || enc.kind() != Encoding::KIND_XCDR1) {
3112  if (DCPS::DCPS_debug_level > 0) {
3113  ACE_ERROR((LM_ERROR,
3114  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3115  ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
3116  }
3117  return 0;
3118  }
3119  ser.encoding(enc);
3120  if (!(ser >> plist)) {
3121 
3122  if (DCPS::DCPS_debug_level > 0) {
3123  ACE_ERROR((LM_ERROR,
3124  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3125  ACE_TEXT("failed to deserialize data payload for SPDP\n")));
3126  }
3127  return 0;
3128  }
3129  } else {
3130  plist.length(1);
3131  const GUID_t guid = make_id(header.guidPrefix, ENTITYID_PARTICIPANT);
3132  plist[0].guid(guid);
3133  plist[0]._d(PID_PARTICIPANT_GUID);
3134  }
3135 
3136  DCPS::RcHandle<Spdp> outer = outer_.lock();
3137  if (outer) {
3138  outer->data_received(data, plist, remote);
3139  }
3140  break;
3141  }
3142  case INFO_DST: {
3143  if (DCPS::transport_debug.log_messages) {
3144  InfoDestinationSubmessage sm;
3145  if (!(ser >> sm)) {
3146  if (DCPS::DCPS_debug_level > 0) {
3147  ACE_ERROR((LM_ERROR,
3148  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3149  ACE_TEXT("failed to deserialize INFO_DST header for SPDP\n")));
3150  }
3151  return 0;
3152  }
3153  submessageLength = sm.smHeader.submessageLength;
3154  append_submessage(message, sm);
3155  break;
3156  }
3157  }
3158  // fallthrough
3159  default:
3160  SubmessageHeader smHeader;
3161  if (!(ser >> smHeader)) {
3162  if (DCPS::DCPS_debug_level > 0) {
3163  ACE_ERROR((LM_ERROR,
3164  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3165  ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
3166  }
3167  return 0;
3168  }
3169  submessageLength = smHeader.submessageLength;
3170  break;
3171  }
3172  if (submessageLength && buff_.length()) {
3173  const size_t read = start - buff_.length();
3174  if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
3175  if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
3176  - read))) {
3177  if (DCPS::DCPS_debug_level > 0) {
3178  ACE_ERROR((LM_ERROR,
3179  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3180  ACE_TEXT("failed to skip sub message length\n")));
3181  }
3182  return 0;
3183  }
3184  }
3185  } else if (!submessageLength) {
3186  break; // submessageLength of 0 indicates the last submessage
3187  }
3188  }
3189 
3190  } else if ((buff_.size() >= 4) && (ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4) == 0)) {
3191  // Handle some RTI protocol multicast to the same address
3192  return 0; // Ignore
3193  }
3194 
3195 #ifdef OPENDDS_SECURITY
3196  // Assume STUN
3197  if (!outer->initialized() || outer->shutting_down()) {
3198  return 0;
3199  }
3200 
3201 #ifndef ACE_RECVPKTINFO
3202  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() ")
3203  ACE_TEXT("potential STUN message received but this version of the ACE ")
3204  ACE_TEXT("library doesn't support the local_address extension in ")
3205  ACE_TEXT("ACE_SOCK_Dgram::recv\n")));
3206  ACE_NOTSUP_RETURN(0);
3207 #else
3208 
3209  DCPS::Serializer serializer(&buff_, STUN::encoding);
3210  STUN::Message message;
3211  message.block = &buff_;
3212  if (serializer >> message) {
3213  if (outer->sedp_->transport_inst()->count_messages()) {
3214  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(remote), DCPS::MCK_STUN, from_relay);
3215  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3217  }
3218 
3219  if (relay_srsm_.is_response(message)) {
3221  } else {
3222  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
3223  if (endpoint) {
3224  outer->ice_agent_->receive(endpoint, local, remote, message);
3225  }
3226  }
3227  }
3228 #endif
3229 #endif
3230 
3231  return 0;
3232 }
#define ACE_DEBUG(X)
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
#define ACE_ERROR(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
size_t length(void) const
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const ParameterId_t PID_PARTICIPANT_GUID
Definition: RtpsCore.idl:289
void reset(void)
sequence< octet > key
bool is_response(const STUN::Message &message) const
Definition: RTPS/ICE/Ice.h:241
ACE_Message_Block buff_
Definition: Spdp.h:520
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
int ssize_t
const MessageCountKind MCK_STUN
char * rd_ptr(void) const
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE h) const
Definition: Spdp.cpp:2966
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
ACE_HANDLE socket(int protocol_family, int type, int proto)
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
size_t size(void) const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_CDR::UShort UShort
ssize_t read(ACE_HANDLE handle, void *buf, size_t len)
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
StateChange receive(const STUN::Message &message)
Definition: Ice.cpp:156
sequence< Parameter > ParameterList
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
ACE_TEXT("TCP_Factory")
size_t space(void) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
int memcmp(const void *t, const void *s, size_t len)
bool valid_size(const ACE_INET_Addr &a)
Definition: Spdp.cpp:2983
const DCPS::GUID_t & guid() const
Definition: Spdp.h:94
const MessageCountKind MCK_RTPS
ACE_thread_mutex_t lock_
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
Definition: Spdp.cpp:4172
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:530
#define ACE_NOTSUP_RETURN(FAILVALUE)
const octet FLAG_D
Definition: RtpsCore.idl:526

◆ host_addresses()

ICE::AddressListType OpenDDS::RTPS::Spdp::SpdpTransport::host_addresses ( ) const
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3247 of file Spdp.cpp.

References AF_INET, OpenDDS::DCPS::get_interface_addrs(), ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), ACE_INET_Addr::is_any(), and unicast_socket_.

3248 {
3249  ICE::AddressListType addresses;
3250  ACE_INET_Addr addr;
3251 
3253  if (addr != ACE_INET_Addr()) {
3254  if (addr.is_any()) {
3255  ICE::AddressListType addrs;
3257  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3258  if (pos->get_type() == AF_INET) {
3259  pos->set_port_number(addr.get_port_number());
3260  addresses.push_back(*pos);
3261  }
3262  }
3263  } else {
3264  addresses.push_back(addr);
3265  }
3266  }
3267 
3268 #ifdef ACE_HAS_IPV6
3269  unicast_ipv6_socket_.get_local_addr(addr);
3270  if (addr != ACE_INET_Addr()) {
3271  if (addr.is_any()) {
3272  ICE::AddressListType addrs;
3274  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3275  if (pos->get_type() == AF_INET6) {
3276  pos->set_port_number(addr.get_port_number());
3277  addresses.push_back(*pos);
3278  }
3279  }
3280  } else {
3281  addresses.push_back(addr);
3282  }
3283  }
3284 #endif
3285 
3286  return addresses;
3287 }
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
bool is_any(void) const
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
int get_local_addr(ACE_Addr &) const
#define AF_INET
u_short get_port_number(void) const

◆ ice_connect()

void OpenDDS::RTPS::Spdp::SpdpTransport::ice_connect ( const ICE::GuidSetType & guids,
const ACE_INET_Addr & addr 
)
virtual

Reimplemented from OpenDDS::ICE::Endpoint.

Definition at line 3361 of file Spdp.cpp.

References outer_.

3362 {
3363  DCPS::RcHandle<Spdp> outer = outer_.lock();
3364  if (!outer) return;
3365 
3366  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, true));
3367 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ ice_disconnect()

void OpenDDS::RTPS::Spdp::SpdpTransport::ice_disconnect ( const ICE::GuidSetType & guids,
const ACE_INET_Addr & addr 
)
virtual

Reimplemented from OpenDDS::ICE::Endpoint.

Definition at line 3383 of file Spdp.cpp.

References outer_.

3384 {
3385  DCPS::RcHandle<Spdp> outer = outer_.lock();
3386  if (!outer) return;
3387 
3388  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, false));
3389 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ on_data_available()

void OpenDDS::RTPS::Spdp::SpdpTransport::on_data_available ( DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > >  reader)

Definition at line 3603 of file Spdp.cpp.

References ACE_GUARD, DDS::ANY_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, DDS::LENGTH_UNLIMITED, multicast_address_, multicast_interface_, multicast_manager_, multicast_socket_, network_interface_address_reader_, outer_, OpenDDS::DCPS::MulticastManager::process(), ACE_Event_Handler::reactor(), and shorten_local_sender_delay_i().

3604 {
3605  DCPS::RcHandle<Spdp> outer = outer_.lock();
3606  if (!outer) return;
3607 
3608  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
3609  if (outer->shutting_down()) {
3610  return;
3611  }
3612 
3613  if (outer->shutdown_flag_) {
3614  return;
3615  }
3616 
3617  DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress>::SampleSequence samples;
3618  DCPS::InternalSampleInfoSequence infos;
3619 
3621 
3622  if (multicast_manager_.process(samples,
3623  infos,
3625  reactor(),
3626  this,
3627  DCPS::NetworkAddress(multicast_address_),
3629 #ifdef ACE_HAS_IPV6
3630  , DCPS::NetworkAddress(multicast_ipv6_address_),
3631  multicast_ipv6_socket_
3632 #endif
3633  )) {
3635  }
3636 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::MulticastManager multicast_manager_
Definition: Spdp.h:518
const SampleStateMask ANY_SAMPLE_STATE
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
const InstanceStateMask ANY_INSTANCE_STATE
const ViewStateMask ANY_VIEW_STATE
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
virtual ACE_Reactor * reactor(void) const
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510
ACE_INET_Addr multicast_address_
Definition: Spdp.h:509
const long LENGTH_UNLIMITED
ACE_thread_mutex_t lock_
OPENDDS_STRING multicast_interface_
Definition: Spdp.h:508
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.

◆ open()

void OpenDDS::RTPS::Spdp::SpdpTransport::open ( const DCPS::ReactorTask_rch & reactor_task,
const DCPS::JobQueue_rch & job_queue 
)

Definition at line 2411 of file Spdp.cpp.

References directed_send_task_, OpenDDS::DCPS::DataReaderQosBuilder::durability_transient_local(), OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), get_ice_endpoint(), OpenDDS::DCPS::ReactorTask::get_reactor(), handshake_deadline_task_, handshake_resend_task_, ice_endpoint_added_, OpenDDS::DCPS::ReactorTask::interceptor(), OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >::job_queue(), lease_expiration_task_, local_send_task_, network_interface_address_reader_, outer_, process_handshake_deadlines(), process_handshake_resends(), process_lease_expirations(), OpenDDS::DCPS::rchandle_from(), ACE_Event_Handler::reactor(), OpenDDS::DCPS::ref(), relay_spdp_task_, relay_stun_task(), relay_stun_task_, OpenDDS::DCPS::DataReaderQosBuilder::reliability_reliable(), send_directed(), send_local(), send_relay(), TheServiceParticipant, thread_status_task(), and thread_status_task_.

2413 {
2414  DCPS::RcHandle<Spdp> outer = outer_.lock();
2415  if (!outer) return;
2416 
2417 #ifdef OPENDDS_SECURITY
2418  // Add the endpoint before any sending and receiving occurs.
2419  DCPS::WeakRcHandle<ICE::Endpoint> endpoint = get_ice_endpoint();
2420  if (endpoint) {
2421  outer->ice_agent_->add_endpoint(endpoint);
2422  ice_endpoint_added_ = true;
2423  outer->ice_agent_->add_local_agent_info_listener(endpoint, outer->guid_, DCPS::static_rchandle_cast<ICE::AgentInfoListener>(outer));
2424  }
2425 #endif
2426 
2427  reactor(reactor_task->get_reactor());
2428  reactor_task->interceptor()->execute_or_enqueue(DCPS::make_rch<RegisterHandlers>(rchandle_from(this), reactor_task));
2429 
2430 #ifdef OPENDDS_SECURITY
2431  // Now that the endpoint is added, SEDP can write the SPDP info.
2432  if (outer->is_security_enabled()) {
2433  outer->write_secure_updates();
2434  }
2435 #endif
2436 
2437  local_send_task_ = DCPS::make_rch<SpdpMulti>(reactor_task->interceptor(), outer->config_->resend_period(), rchandle_from(this), &SpdpTransport::send_local);
2438 
2439  if (outer->config_->periodic_directed_spdp()) {
2441  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2443  }
2444 
2446  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2448 
2449 #ifdef OPENDDS_SECURITY
2451  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2454  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2456 
2458  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2461  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2463 #endif
2464 
2465 #ifndef DDS_HAS_MINIMUM_BIT
2466  // internal thread bit reporting
2467  if (TheServiceParticipant->get_thread_status_manager().update_thread_status()) {
2468  thread_status_task_ = DCPS::make_rch<SpdpPeriodic>(reactor_task->interceptor(), ref(*this), &SpdpTransport::thread_status_task);
2469  }
2470 #endif /* DDS_HAS_MINIMUM_BIT */
2471 
2472  this->job_queue(job_queue);
2473  network_interface_address_reader_ = DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), rchandle_from(this));
2474  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
2475 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
void relay_stun_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4156
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
void process_handshake_deadlines(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4321
void send_directed(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4254
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
void thread_status_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4286
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
void send_local(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4249
void process_handshake_resends(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4329
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
Definition: Spdp.h:536
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
virtual ACE_Reactor * reactor(void) const
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
void process_lease_expirations(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4278
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
Definition: Spdp.h:530
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
Definition: Spdp.h:538
#define TheServiceParticipant
void send_relay(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4233

◆ open_unicast_socket()

bool OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket ( u_short  port_common,
u_short  participant_id 
)

Definition at line 3400 of file Spdp.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ENOTSUP, OpenDDS::DCPS::LogLevel::Error, ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), IPPROTO_IP, LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::log_level, ACE_SOCK_Dgram::open(), outer_, PF_INET, ACE_SOCK::set_option(), ACE_INET_Addr::set_port_number(), OpenDDS::DCPS::set_socket_multicast_ttl(), SO_RCVBUF, SO_SNDBUF, SOL_SOCKET, uni_port_, and unicast_socket_.

Referenced by SpdpTransport().

3402 {
3403  DCPS::RcHandle<Spdp> outer = outer_.lock();
3404  if (!outer) {
3405  throw std::runtime_error("couldn't get Spdp");
3406  }
3407 
3408  ACE_INET_Addr local_addr = outer->config_->spdp_local_address();
3409  const bool fixed_port = local_addr.get_port_number();
3410 
3411  if (fixed_port) {
3412  uni_port_ = local_addr.get_port_number();
3413  } else if (!outer->config_->spdp_request_random_port()) {
3414  const ACE_UINT32 port = static_cast<ACE_UINT32>(port_common) + outer->config_->d1() +
3415  outer->config_->pg() * participant_id;
3416  if (port > 65535) {
3417  if (log_level >= LogLevel::Error) {
3418  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: "
3419  "port %u is too high\n", port));
3420  }
3421  throw std::runtime_error("failed to open unicast port for SPDP (port too high)");
3422  }
3423  uni_port_ = static_cast<unsigned short>(port);
3424  local_addr.set_port_number(uni_port_);
3425  }
3426 
3427  if (unicast_socket_.open(local_addr, PF_INET) != 0) {
3428  if (fixed_port) {
3429  if (log_level >= LogLevel::Error) {
3430  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: "
3431  "failed to open %C %p.\n",
3432  LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3433  }
3434  throw std::runtime_error("failed to open unicast port for SPDP");
3435  }
3436  if (DCPS::DCPS_debug_level > 3) {
3437  ACE_DEBUG((LM_DEBUG,
3438  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3439  ACE_TEXT("failed to open %C %p. ")
3440  ACE_TEXT("Trying next participantId...\n"),
3441  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3442  }
3443  return false;
3444  }
3445 
3446  if (!fixed_port && outer->config_->spdp_request_random_port()) {
3447  ACE_INET_Addr addr;
3448  if (unicast_socket_.get_local_addr(addr) == 0) {
3449  uni_port_ = addr.get_port_number();
3450  }
3451  }
3452 
3453  if (DCPS::DCPS_debug_level > 3) {
3454  ACE_DEBUG((LM_INFO,
3455  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3456  ACE_TEXT("opened unicast socket on port %d\n"),
3457  uni_port_));
3458  }
3459 
3460  if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer->config_->ttl())) {
3461  if (DCPS::DCPS_debug_level > 0) {
3462  ACE_ERROR((LM_ERROR,
3463  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
3464  ACE_TEXT("failed to set TTL value to %d ")
3465  ACE_TEXT("for port:%hu %p\n"),
3466  outer->config_->ttl(), uni_port_, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
3467  }
3468  throw std::runtime_error("failed to set TTL");
3469  }
3470 
3471  const int send_buffer_size = outer->config()->send_buffer_size();
3472  if (send_buffer_size > 0) {
3473  if (unicast_socket_.set_option(SOL_SOCKET,
3474  SO_SNDBUF,
3475  (void *) &send_buffer_size,
3476  sizeof(send_buffer_size)) < 0
3477  && errno != ENOTSUP) {
3478  if (DCPS::DCPS_debug_level > 0) {
3479  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the send buffer size to %d errno %m\n"), send_buffer_size));
3480  }
3481  throw std::runtime_error("failed to set send buffer size");
3482  }
3483  }
3484 
3485  const int recv_buffer_size = outer->config()->recv_buffer_size();
3486  if (recv_buffer_size > 0) {
3487  if (unicast_socket_.set_option(SOL_SOCKET,
3488  SO_RCVBUF,
3489  (void *) &recv_buffer_size,
3490  sizeof(recv_buffer_size)) < 0
3491  && errno != ENOTSUP) {
3492  if (DCPS::DCPS_debug_level > 0) {
3493  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the recv buffer size to %d errno %m\n"), recv_buffer_size));
3494  }
3495  throw std::runtime_error("failed to set recv buffer size");
3496  }
3497  }
3498 
3499 #ifdef ACE_RECVPKTINFO
3500  int sockopt = 1;
3501  if (unicast_socket_.set_option(IPPROTO_IP, ACE_RECVPKTINFO, &sockopt, sizeof sockopt) == -1) {
3502  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: set_option: %m\n")), false);
3503  }
3504 #endif
3505 
3506  return true;
3507 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ENOTSUP
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
int set_option(int level, int option, void *optval, int optlen) const
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
int get_local_addr(ACE_Addr &) const
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
u_short get_port_number(void) const
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
void set_port_number(u_short, int encode=1)
#define ACE_ERROR_RETURN(X, Y)

◆ OPENDDS_LIST()

OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_LIST ( DCPS::GUID_t  )

◆ OPENDDS_SET()

OpenDDS::RTPS::Spdp::SpdpTransport::OPENDDS_SET ( ACE_INET_Addr  )

Referenced by send().

◆ process_handshake_deadlines()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_handshake_deadlines ( const DCPS::MonotonicTimePoint & now)

Definition at line 4321 of file Spdp.cpp.

References outer_.

Referenced by open().

4322 {
4323  DCPS::RcHandle<Spdp> outer = outer_.lock();
4324  if (!outer) return;
4325 
4326  outer->process_handshake_deadlines(now);
4327 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_handshake_resends()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_handshake_resends ( const DCPS::MonotonicTimePoint & now)

Definition at line 4329 of file Spdp.cpp.

References outer_.

Referenced by open().

4330 {
4331  DCPS::RcHandle<Spdp> outer = outer_.lock();
4332  if (!outer) return;
4333 
4334  outer->process_handshake_resends(now);
4335 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_lease_expirations()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_lease_expirations ( const DCPS::MonotonicTimePoint & now)

Definition at line 4278 of file Spdp.cpp.

References outer_.

Referenced by open().

4279 {
4280  DCPS::RcHandle<Spdp> outer = outer_.lock();
4281  if (!outer) return;
4282 
4283  outer->process_lease_expirations(now);
4284 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ process_relay_sra()

void OpenDDS::RTPS::Spdp::SpdpTransport::process_relay_sra ( ICE::ServerReflexiveStateMachine::StateChange  sc)

Definition at line 4172 of file Spdp.cpp.

References OpenDDS::DCPS::ConnectionRecord::address, OpenDDS::DCPS::ConnectionRecord::guid, OpenDDS::ICE::Configuration::instance(), OpenDDS::DCPS::ConnectionRecord::latency, OpenDDS::ICE::ServerReflexiveStateMachine::latency(), OpenDDS::ICE::ServerReflexiveStateMachine::latency_available(), outer_, OpenDDS::DCPS::ConnectionRecord::protocol, relay_srsm_, relay_stun_task_falloff_, OpenDDS::DCPS::RTPS_RELAY_STUN_PROTOCOL, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Change, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_None, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Set, OpenDDS::ICE::ServerReflexiveStateMachine::SRSM_Unset, OpenDDS::ICE::ServerReflexiveStateMachine::stun_server_address(), OpenDDS::DCPS::TimeDuration::to_dds_duration(), OpenDDS::ICE::ServerReflexiveStateMachine::unset_stun_server_address(), and OpenDDS::DCPS::TimeDuration::zero_value.

Referenced by handle_input(), and relay_stun_task().

4173 {
4174 #ifndef DDS_HAS_MINIMUM_BIT
4175  DCPS::RcHandle<Spdp> outer = outer_.lock();
4176  if (!outer) return;
4177 
4178  DCPS::ConnectionRecord connection_record;
4179  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4180  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4181  connection_record.latency = DCPS::TimeDuration::zero_value.to_dds_duration();
4182 
4183  switch (sc) {
4186  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4187  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4189  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4190  }
4191  break;
4194  // Lengthen to normal period.
4195  relay_stun_task_falloff_.set(ICE::Configuration::instance()->server_reflexive_address_period());
4196  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4197  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4199  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4200  break;
4202  connection_record.address = DCPS::LogAddr(relay_srsm_.unset_stun_server_address()).c_str();
4203  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4204  break;
4205  }
4206 #else
4207  ACE_UNUSED_ARG(sc);
4208 #endif
4209 }
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
DDS::Duration_t to_dds_duration() const
static const TimeDuration zero_value
Definition: TimeDuration.h:31
DCPS::TimeDuration latency() const
Definition: RTPS/ICE/Ice.h:246
static Configuration * instance()
Definition: Ice.cpp:109
const ACE_INET_Addr & unset_stun_server_address() const
Definition: RTPS/ICE/Ice.h:238
const string RTPS_RELAY_STUN_PROTOCOL

◆ register_handlers()

void OpenDDS::RTPS::Spdp::SpdpTransport::register_handlers ( const DCPS::ReactorTask_rch & reactor_task)

Definition at line 2524 of file Spdp.cpp.

References ACE_GUARD, OpenDDS::DCPS::ReactorTask::get_reactor(), outer_, ACE_Event_Handler::reactor(), register_unicast_socket(), and unicast_socket_.

2525 {
2526  DCPS::RcHandle<Spdp> outer = outer_.lock();
2527  if (!outer) {
2528  return;
2529  }
2530  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2531 
2532  if (outer->shutdown_flag_) {
2533  return;
2534  }
2535 
2536  ACE_Reactor* const reactor = reactor_task->get_reactor();
2537  register_unicast_socket(reactor, unicast_socket_, "IPV4");
2538 #ifdef ACE_HAS_IPV6
2539  register_unicast_socket(reactor, unicast_ipv6_socket_, "IPV6");
2540 #endif
2541 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
void register_unicast_socket(ACE_Reactor *reactor, ACE_SOCK_Dgram &socket, const char *what)
Definition: Spdp.cpp:2502
virtual ACE_Reactor * reactor(void) const
ACE_thread_mutex_t lock_

◆ register_unicast_socket()

void OpenDDS::RTPS::Spdp::SpdpTransport::register_unicast_socket ( ACE_Reactor * reactor,
ACE_SOCK_Dgram & socket,
const char *  what 
)

Definition at line 2502 of file Spdp.cpp.

References ACE_IPC_SAP::control(), ACE_IPC_SAP::get_handle(), ACE_Event_Handler::READ_MASK, and ACE_Reactor::register_handler().

Referenced by register_handlers().

2504 {
2505 #ifdef ACE_WIN32
2506  // By default Winsock will cause reads to fail with "connection reset"
2507  // when UDP sends result in ICMP "port unreachable" messages.
2508  // The transport framework is not set up for this since returning <= 0
2509  // from our receive_bytes causes the framework to close down the datalink
2510  // which in this case is used to receive from multiple peers.
2511  {
2512  BOOL recv_udp_connreset = FALSE;
2513  socket.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
2514  }
2515 #endif
2516 
2517  if (reactor->register_handler(socket.get_handle(),
2518  this, ACE_Event_Handler::READ_MASK) != 0) {
2519  throw std::runtime_error(
2520  (DCPS::String("failed to register ") + what + " unicast input handler").c_str());
2521  }
2522 }
std::string String
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int control(int cmd, void *) const
ACE_HANDLE get_handle(void) const

◆ relay_stun_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task ( const DCPS::MonotonicTimePoint & now)

Definition at line 4156 of file Spdp.cpp.

References OpenDDS::ICE::Configuration::instance(), OpenDDS::ICE::ServerReflexiveStateMachine::message(), outer_, process_relay_sra(), relay_srsm_, relay_stun_task_, relay_stun_task_falloff_, OpenDDS::ICE::ServerReflexiveStateMachine::send(), send(), and OpenDDS::ICE::Configuration::server_reflexive_indication_count().

Referenced by open().

4157 {
4158  DCPS::RcHandle<Spdp> outer = outer_.lock();
4159  if (!outer) return;
4160 
4161  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4162  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4163  if (relay_address != ACE_INET_Addr()) {
4165  send(relay_address, relay_srsm_.message());
4166  relay_stun_task_falloff_.advance(ICE::Configuration::instance()->server_reflexive_address_period());
4167  relay_stun_task_->schedule(relay_stun_task_falloff_.get());
4168  }
4169  }
4170 }
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
Definition: Ice.cpp:128
const STUN::Message & message() const
Definition: RTPS/ICE/Ice.h:237
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
static Configuration * instance()
Definition: Ice.cpp:109
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
Definition: Spdp.cpp:4172

◆ send() [1/3]

void OpenDDS::RTPS::Spdp::SpdpTransport::send ( WriteFlags  flags,
const ACE_INET_Addr & local_address = ACE_INET_Addr() 
)

Definition at line 2883 of file Spdp.cpp.

References OPENDDS_SET(), outer_, SEND_DIRECT, SEND_MULTICAST, and SEND_RELAY.

Referenced by dispose_unregister(), relay_stun_task(), and write_i().

2884 {
2885  DCPS::RcHandle<Spdp> outer = outer_.lock();
2886  if (!outer) return;
2887 
2888  if ((flags & SEND_MULTICAST) && !outer->config_->rtps_relay_only()) {
2889  typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
2890  for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
2891  send(*iter, false);
2892  }
2893  }
2894 
2895  if (((flags & SEND_DIRECT) && !outer->config_->rtps_relay_only()) &&
2896  local_address != ACE_INET_Addr()) {
2897  send(local_address, false);
2898  }
2899 
2900  if ((flags & SEND_RELAY) || outer->config_->rtps_relay_only()) {
2901  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
2902  if (relay_address != ACE_INET_Addr()) {
2903  send(relay_address, true);
2904  }
2905  }
2906 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
static const WriteFlags SEND_DIRECT
Definition: Spdp.h:435
OPENDDS_SET(ACE_INET_Addr) send_addrs_
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ send() [2/3]

ssize_t OpenDDS::RTPS::Spdp::SpdpTransport::send ( const ACE_INET_Addr & addr,
bool  relay 
)

Definition at line 2921 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), choose_send_socket(), OpenDDS::DCPS::DCPS_debug_level, ENETUNREACH, OpenDDS::DCPS::ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER, ACE_Message_Block::length(), LM_WARNING, OpenDDS::DCPS::make_id(), OpenDDS::DCPS::MCK_RTPS, OpenDDS::DCPS::InternalTransportStatistics::message_count, network_is_unreachable_, outer_, ACE_Message_Block::rd_ptr(), ACE_SOCK_Dgram::send(), socket(), transport_statistics_, wbuff_, and OpenDDS::DCPS::InternalTransportStatistics::writer_resend_count.

2922 {
2923  DCPS::RcHandle<Spdp> outer = outer_.lock();
2924  if (!outer) return -1;
2925 
2926 #ifdef OPENDDS_TESTING_FEATURES
2927  if (outer->sedp_->transport_inst()->should_drop(wbuff_.length())) {
2928  return wbuff_.length();
2929  }
2930 #endif
2931 
2932  const ACE_SOCK_Dgram& socket = choose_send_socket(addr);
2933  const ssize_t res = socket.send(wbuff_.rd_ptr(), wbuff_.length(), addr);
2934  if (outer->sedp_->transport_inst()->count_messages()) {
2936  }
2937  if (res < 0) {
2938  if (outer->sedp_->transport_inst()->count_messages()) {
2939  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(addr), DCPS::MCK_RTPS, relay);
2941  }
2942  const int err = errno;
2943  if (err != ENETUNREACH || !network_is_unreachable_) {
2944  errno = err;
2945  if (DCPS::DCPS_debug_level > 0) {
2946  ACE_ERROR((LM_WARNING,
2947  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::send() - ")
2948  ACE_TEXT("destination %C failed send: %m\n"), DCPS::LogAddr(addr).c_str()));
2949  }
2950  }
2951  if (err == ENETUNREACH) {
2952  network_is_unreachable_ = true;
2953  }
2954  } else {
2955  if (outer->sedp_->transport_inst()->count_messages()) {
2956  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(addr), DCPS::MCK_RTPS, relay);
2958  }
2959  network_is_unreachable_ = false;
2960  }
2961 
2962  return res;
2963 }
#define ACE_ERROR(X)
size_t length(void) const
sequence< octet > key
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
int ssize_t
ACE_Message_Block wbuff_
Definition: Spdp.h:520
char * rd_ptr(void) const
ACE_HANDLE socket(int protocol_family, int type, int proto)
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
const ACE_SOCK_Dgram & choose_send_socket(const ACE_INET_Addr &addr) const
Definition: Spdp.cpp:2909
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const MessageCountKind MCK_RTPS
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const

◆ send() [3/3]

void OpenDDS::RTPS::Spdp::SpdpTransport::send ( const ACE_INET_Addr & address,
const STUN::Message & message 
)
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3290 of file Spdp.cpp.

References OpenDDS::DCPS::InternalDataReaderListener< DCPS::NetworkInterfaceAddress >::job_queue(), outer_, and OpenDDS::DCPS::rchandle_from().

3291 {
3292  DCPS::RcHandle<Spdp> outer = outer_.lock();
3293  if (!outer) return;
3294 
3295  DCPS::RcHandle<DCPS::JobQueue> job_queue = outer->sedp_->job_queue();
3296  if (job_queue) {
3297  job_queue->enqueue(DCPS::make_rch<SendStun>(rchandle_from(this), address, message));
3298  }
3299 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ send_directed()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_directed ( const DCPS::MonotonicTimePoint & now)

Definition at line 4254 of file Spdp.cpp.

References ACE_GUARD, directed_send_task_, outer_, SEND_DIRECT, SEND_RELAY, and write_i().

Referenced by open().

4255 {
4256  DCPS::RcHandle<Spdp> outer = outer_.lock();
4257  if (!outer) return;
4258 
4259  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4260 
4261  while (!directed_guids_.empty()) {
4262  const DCPS::GUID_t id = directed_guids_.front();
4263  directed_guids_.pop_front();
4264 
4265  DiscoveredParticipantConstIter pos = outer->participants_.find(id);
4266  if (pos == outer->participants_.end()) {
4267  continue;
4268  }
4269 
4270  write_i(id, pos->second.last_recv_address_, SEND_DIRECT | SEND_RELAY);
4271  directed_guids_.push_back(id);
4272  directed_send_task_->schedule(outer->config_->resend_period() * (1.0 / directed_guids_.size()));
4273  break;
4274  }
4275 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DiscoveredParticipantMap::const_iterator DiscoveredParticipantConstIter
Definition: Spdp.h:71
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
void write_i(WriteFlags flags)
Definition: Spdp.cpp:2681
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
static const WriteFlags SEND_DIRECT
Definition: Spdp.h:435
ACE_thread_mutex_t lock_

◆ send_local()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_local ( const DCPS::MonotonicTimePoint & now)

Definition at line 4249 of file Spdp.cpp.

References SEND_MULTICAST, and write().

Referenced by open().

4250 {
4251  write(SEND_MULTICAST);
4252 }
void write(WriteFlags flags)
Definition: Spdp.cpp:2671
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433

◆ send_relay()

void OpenDDS::RTPS::Spdp::SpdpTransport::send_relay ( const DCPS::MonotonicTimePoint & now)

Definition at line 4233 of file Spdp.cpp.

References outer_, relay_spdp_task_, relay_spdp_task_falloff_, SEND_RELAY, and write().

Referenced by open().

4234 {
4235  DCPS::RcHandle<Spdp> outer = outer_.lock();
4236  if (!outer) return;
4237 
4238  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4239  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4240  if (relay_address != ACE_INET_Addr()) {
4241  write(SEND_RELAY);
4242  relay_spdp_task_falloff_.advance(outer->config_->spdp_rtps_relay_send_period());
4243  relay_spdp_task_->schedule(relay_spdp_task_falloff_.get());
4244  }
4245  }
4246 }
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
void write(WriteFlags flags)
Definition: Spdp.cpp:2671
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434

◆ shorten_local_sender_delay_i()

void OpenDDS::RTPS::Spdp::SpdpTransport::shorten_local_sender_delay_i ( )

Definition at line 2659 of file Spdp.cpp.

References local_send_task_, and outer_.

Referenced by on_data_available().

2660 {
2661  DCPS::RcHandle<Spdp> outer = outer_.lock();
2662  if (!outer) return;
2663 
2664  if (local_send_task_) {
2665  const TimeDuration quick_resend = outer->config_->resend_period() * outer->quick_resend_ratio_;
2666  local_send_task_->enable(std::max(quick_resend, outer->min_resend_delay_));
2667  }
2668 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525

◆ stun_server_address()

ACE_INET_Addr OpenDDS::RTPS::Spdp::SpdpTransport::stun_server_address ( ) const
virtual

Implements OpenDDS::ICE::Endpoint.

Definition at line 3353 of file Spdp.cpp.

References outer_.

3354 {
3355  DCPS::RcHandle<Spdp> outer = outer_.lock();
3356  return outer ? outer->config_->spdp_stun_server_address() : ACE_INET_Addr();
3357 }
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502

◆ thread_status_task()

void OpenDDS::RTPS::Spdp::SpdpTransport::thread_status_task ( const DCPS::MonotonicTimePoint & now)

Definition at line 4286 of file Spdp.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::DCPS_debug_level, last_harvest, LM_DEBUG, DDS::NEW_VIEW_STATE, outer_, TheServiceParticipant, OpenDDS::DCPS::InternalThreadBuiltinTopicData::thread_id, and OpenDDS::DCPS::InternalThreadBuiltinTopicData::utilization.

Referenced by open().

4287 {
4288  ACE_UNUSED_ARG(now);
4289 #ifndef DDS_HAS_MINIMUM_BIT
4290  DCPS::RcHandle<Spdp> outer = outer_.lock();
4291  if (!outer) return;
4292 
4293  if (DCPS::DCPS_debug_level > 4) {
4294  ACE_DEBUG((LM_DEBUG,
4295  "(%P|%t) Spdp::SpdpTransport::thread_status_task(): Updating internal thread status BIT.\n"));
4296  }
4297 
4298  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4299 
4300  typedef DCPS::ThreadStatusManager::List List;
4301  List running;
4302  List removed;
4303  TheServiceParticipant->get_thread_status_manager().harvest(last_harvest, running, removed);
4304  last_harvest = now;
4305  for (List::const_iterator i = removed.begin(); i != removed.end(); ++i) {
4306  DCPS::InternalThreadBuiltinTopicData data;
4307  data.thread_id = i->bit_key().c_str();
4308  outer->bit_subscriber_->remove_thread_status(data);
4309  }
4310  for (List::const_iterator i = running.begin(); i != running.end(); ++i) {
4311  DCPS::InternalThreadBuiltinTopicData data;
4312  data.thread_id = i->bit_key().c_str();
4313  data.utilization = i->utilization(now);
4314  outer->bit_subscriber_->add_thread_status(data, DDS::NEW_VIEW_STATE, i->timestamp());
4315  }
4316 
4317 #endif /* DDS_HAS_MINIMUM_BIT */
4318 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
const ViewStateKind NEW_VIEW_STATE
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_thread_mutex_t lock_
DCPS::MonotonicTimePoint last_harvest
Definition: Spdp.h:553
#define TheServiceParticipant

◆ write()

void OpenDDS::RTPS::Spdp::SpdpTransport::write ( WriteFlags  flags)

Definition at line 2671 of file Spdp.cpp.

References ACE_GUARD, outer_, and write_i().

Referenced by send_local(), and send_relay().

2672 {
2673  DCPS::RcHandle<Spdp> outer = outer_.lock();
2674  if (!outer) return;
2675 
2676  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2677  write_i(flags);
2678 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
void write_i(WriteFlags flags)
Definition: Spdp.cpp:2681
ACE_thread_mutex_t lock_

◆ write_i() [1/2]

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i ( WriteFlags  flags)

Definition at line 2681 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::Serializer::encoding(), get_ice_endpoint(), hdr_, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, ACE_Message_Block::reset(), OpenDDS::RTPS::SEDP_AGENT_INFO_KEY, send(), seq_, OpenDDS::RTPS::SPDP_AGENT_INFO_KEY, OpenDDS::RTPS::ParameterListConverter::to_param_list(), OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by send_directed(), and write().

2682 {
2683  DCPS::RcHandle<Spdp> outer = outer_.lock();
2684  if (!outer) return;
2685 
2686  if (!outer->config_->undirected_spdp()) {
2687  return;
2688  }
2689 
2690  const ParticipantData_t pdata = outer->build_local_pdata(
2691 #ifdef OPENDDS_SECURITY
2692  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2693 #endif
2694  );
2695 
2696  data_.writerSN = to_rtps_seqnum(seq_);
2697  ++seq_;
2698 
2699  ParameterList plist;
2700  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2701  if (DCPS::DCPS_debug_level > 0) {
2702  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2703  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2704  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2705  ACE_TEXT("to ParameterList\n")));
2706  }
2707  return;
2708  }
2709 
2710 #ifdef OPENDDS_SECURITY
2711  if (!outer->is_security_enabled()) {
2712  ICE::AgentInfoMap ai_map;
2713  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2714  if (sedp_endpoint) {
2715  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2716  }
2717  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = get_ice_endpoint();
2718  if (spdp_endpoint) {
2719  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2720  }
2721 
2722  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2723  if (DCPS::DCPS_debug_level > 0) {
2724  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2725  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2726  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2727  ACE_TEXT("to ParameterList\n")));
2728  }
2729  return;
2730  }
2731  }
2732 #endif
2733 
2734  wbuff_.reset();
2735  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2736  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2737  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2738  if (DCPS::DCPS_debug_level > 0) {
2739  ACE_ERROR((LM_ERROR,
2740  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
2741  ACE_TEXT("failed to serialize headers for SPDP\n")));
2742  }
2743  return;
2744  }
2745 
2746  send(flags);
2747 }
#define ACE_ERROR(X)
Security::SPDPdiscoveredParticipantData ParticipantData_t
void reset(void)
const char SEDP_AGENT_INFO_KEY[]
Definition: Spdp.h:57
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_Message_Block wbuff_
Definition: Spdp.h:520
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
const char SPDP_AGENT_INFO_KEY[]
Definition: Spdp.h:56
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool to_param_list(const DDS::ParticipantBuiltinTopicData &pbtd, ParameterList &param_list)
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139

◆ write_i() [2/2]

void OpenDDS::RTPS::Spdp::SpdpTransport::write_i ( const DCPS::GUID_t & guid,
const ACE_INET_Addr & local_address,
WriteFlags  flags 
)

Definition at line 2813 of file Spdp.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::assign(), data_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_ORIGINAL, OpenDDS::DCPS::Serializer::encoding(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::SubmessageHeader::flags, get_ice_endpoint(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, hdr_, OpenDDS::RTPS::INFO_DST, LM_ERROR, OpenDDS::DCPS::MUTABLE, outer_, ACE_Message_Block::reset(), OpenDDS::RTPS::SEDP_AGENT_INFO_KEY, send(), seq_, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, OpenDDS::RTPS::SPDP_AGENT_INFO_KEY, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::ParameterListConverter::to_param_list(), OpenDDS::RTPS::to_rtps_seqnum(), wbuff_, and OpenDDS::RTPS::DataSubmessage::writerSN.

2814 {
2815  DCPS::RcHandle<Spdp> outer = outer_.lock();
2816  if (!outer) return;
2817 
2818  const ParticipantData_t pdata = outer->build_local_pdata(
2819 #ifdef OPENDDS_SECURITY
2820  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2821 #endif
2822  );
2823 
2824  data_.writerSN = to_rtps_seqnum(seq_);
2825  ++seq_;
2826 
2827  ParameterList plist;
2828  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2829  if (DCPS::DCPS_debug_level > 0) {
2830  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2831  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2832  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2833  ACE_TEXT("to ParameterList\n")));
2834  }
2835  return;
2836  }
2837 
2838 #ifdef OPENDDS_SECURITY
2839  if (!outer->is_security_enabled()) {
2840  ICE::AgentInfoMap ai_map;
2841  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2842  if (sedp_endpoint) {
2843  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2844  }
2845  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = get_ice_endpoint();
2846  if (spdp_endpoint) {
2847  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2848  }
2849 
2850  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2851  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2852  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2853  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2854  ACE_TEXT("to ParameterList\n")));
2855  return;
2856  }
2857  }
2858 #endif
2859 
2860  InfoDestinationSubmessage info_dst;
2861  info_dst.smHeader.submessageId = INFO_DST;
2862  info_dst.smHeader.flags = FLAG_E;
2863  info_dst.smHeader.submessageLength = sizeof(guid.guidPrefix);
2864  DCPS::assign(info_dst.guidPrefix, guid.guidPrefix);
2865 
2866  wbuff_.reset();
2867  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2868  DCPS::EncapsulationHeader encap(ser.encoding(), DCPS::MUTABLE);
2869  if (!(ser << hdr_) || !(ser << info_dst) || !(ser << data_) || !(ser << encap)
2870  || !(ser << plist)) {
2871  if (DCPS::DCPS_debug_level > 0) {
2872  ACE_ERROR((LM_ERROR,
2873  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i() - ")
2874  ACE_TEXT("failed to serialize headers for SPDP\n")));
2875  }
2876  return;
2877  }
2878 
2879  send(flags, local_address);
2880 }
#define ACE_ERROR(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
Security::SPDPdiscoveredParticipantData ParticipantData_t
void reset(void)
const char SEDP_AGENT_INFO_KEY[]
Definition: Spdp.h:57
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
ACE_Message_Block wbuff_
Definition: Spdp.h:520
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
const char SPDP_AGENT_INFO_KEY[]
Definition: Spdp.h:56
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const DCPS::GUID_t & guid() const
Definition: Spdp.h:94
bool to_param_list(const DDS::ParticipantBuiltinTopicData &pbtd, ParameterList &param_list)
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139

Member Data Documentation

◆ buff_

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::buff_

Definition at line 520 of file Spdp.h.

Referenced by handle_input().

◆ data_

DataSubmessage OpenDDS::RTPS::Spdp::SpdpTransport::data_

Definition at line 504 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

◆ directed_send_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::directed_send_task_

Definition at line 527 of file Spdp.h.

Referenced by close(), open(), and send_directed().

◆ handshake_deadline_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::handshake_deadline_task_

Definition at line 536 of file Spdp.h.

Referenced by close(), and open().

◆ handshake_resend_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::handshake_resend_task_

Definition at line 538 of file Spdp.h.

Referenced by close(), and open().

◆ hdr_

Header OpenDDS::RTPS::Spdp::SpdpTransport::hdr_

Definition at line 503 of file Spdp.h.

Referenced by dispose_unregister(), SpdpTransport(), and write_i().

◆ ice_endpoint_added_

bool OpenDDS::RTPS::Spdp::SpdpTransport::ice_endpoint_added_

Definition at line 550 of file Spdp.h.

Referenced by close(), and open().

◆ last_harvest

DCPS::MonotonicTimePoint OpenDDS::RTPS::Spdp::SpdpTransport::last_harvest

Definition at line 553 of file Spdp.h.

Referenced by thread_status_task().

◆ lease_expiration_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::lease_expiration_task_

Definition at line 530 of file Spdp.h.

Referenced by close(), and open().

◆ local_send_task_

DCPS::RcHandle<SpdpMulti> OpenDDS::RTPS::Spdp::SpdpTransport::local_send_task_

Definition at line 525 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), open(), and shorten_local_sender_delay_i().

◆ multicast_address_

ACE_INET_Addr OpenDDS::RTPS::Spdp::SpdpTransport::multicast_address_

Definition at line 509 of file Spdp.h.

Referenced by on_data_available(), and SpdpTransport().

◆ multicast_interface_

OPENDDS_STRING OpenDDS::RTPS::Spdp::SpdpTransport::multicast_interface_

Definition at line 508 of file Spdp.h.

Referenced by on_data_available(), and SpdpTransport().

◆ multicast_manager_

DCPS::MulticastManager OpenDDS::RTPS::Spdp::SpdpTransport::multicast_manager_

Definition at line 518 of file Spdp.h.

Referenced by on_data_available().

◆ multicast_socket_

ACE_SOCK_Dgram_Mcast OpenDDS::RTPS::Spdp::SpdpTransport::multicast_socket_

Definition at line 510 of file Spdp.h.

Referenced by choose_recv_socket(), close(), on_data_available(), SpdpTransport(), and ~SpdpTransport().

◆ network_interface_address_reader_

DCPS::RcHandle<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> > OpenDDS::RTPS::Spdp::SpdpTransport::network_interface_address_reader_

Definition at line 533 of file Spdp.h.

Referenced by close(), on_data_available(), and open().

◆ network_is_unreachable_

bool OpenDDS::RTPS::Spdp::SpdpTransport::network_is_unreachable_

Definition at line 549 of file Spdp.h.

Referenced by send().

◆ outer_

DCPS::WeakRcHandle<Spdp> OpenDDS::RTPS::Spdp::SpdpTransport::outer_

◆ relay_spdp_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::relay_spdp_task_

Definition at line 540 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), open(), and send_relay().

◆ relay_spdp_task_falloff_

DCPS::FibonacciSequence<TimeDuration> OpenDDS::RTPS::Spdp::SpdpTransport::relay_spdp_task_falloff_

Definition at line 541 of file Spdp.h.

Referenced by enable_periodic_tasks(), and send_relay().

◆ relay_srsm_

ICE::ServerReflexiveStateMachine OpenDDS::RTPS::Spdp::SpdpTransport::relay_srsm_

Definition at line 545 of file Spdp.h.

Referenced by disable_relay_stun_task(), handle_input(), process_relay_sra(), and relay_stun_task().

◆ relay_stun_task_

DCPS::RcHandle<SpdpSporadic> OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task_

Definition at line 543 of file Spdp.h.

Referenced by close(), disable_relay_stun_task(), enable_periodic_tasks(), open(), and relay_stun_task().

◆ relay_stun_task_falloff_

DCPS::FibonacciSequence<TimeDuration> OpenDDS::RTPS::Spdp::SpdpTransport::relay_stun_task_falloff_

Definition at line 544 of file Spdp.h.

Referenced by enable_periodic_tasks(), process_relay_sra(), and relay_stun_task().

◆ SEND_DIRECT

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_DIRECT = (1 << 2)
static

◆ SEND_MULTICAST

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_MULTICAST = (1 << 0)
static

◆ SEND_RELAY

const Spdp::SpdpTransport::WriteFlags OpenDDS::RTPS::Spdp::SpdpTransport::SEND_RELAY = (1 << 1)
static

◆ seq_

DCPS::SequenceNumber OpenDDS::RTPS::Spdp::SpdpTransport::seq_

Definition at line 505 of file Spdp.h.

Referenced by dispose_unregister(), and write_i().

◆ thread_status_task_

DCPS::RcHandle<SpdpPeriodic> OpenDDS::RTPS::Spdp::SpdpTransport::thread_status_task_

Definition at line 532 of file Spdp.h.

Referenced by close(), enable_periodic_tasks(), and open().

◆ transport_statistics_

DCPS::InternalTransportStatistics OpenDDS::RTPS::Spdp::SpdpTransport::transport_statistics_

Definition at line 552 of file Spdp.h.

Referenced by handle_input(), and send().

◆ uni_port_

u_short OpenDDS::RTPS::Spdp::SpdpTransport::uni_port_

Definition at line 506 of file Spdp.h.

Referenced by open_unicast_socket(), and SpdpTransport().

◆ unicast_socket_

ACE_SOCK_Dgram OpenDDS::RTPS::Spdp::SpdpTransport::unicast_socket_

◆ wbuff_

ACE_Message_Block OpenDDS::RTPS::Spdp::SpdpTransport::wbuff_

Definition at line 520 of file Spdp.h.

Referenced by dispose_unregister(), send(), and write_i().


The documentation for this struct was generated from the following files: