14 #include <dds/OpenddsDcpsExtTypeSupportImpl.h> 23 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h> 36 #
if defined(OPENDDS_SECURITY)
39 #ifdef OPENDDS_SECURITY
42 , ice_agent_(ICE::Agent::instance())
44 , transport_statistics_(inst->
name())
58 #ifdef OPENDDS_SECURITY 69 #ifdef OPENDDS_SECURITY 82 #ifdef OPENDDS_SECURITY 91 if (!cfg || cfg->use_rtps_relay()) {
101 ACE_UNUSED_ARG(flag);
103 #ifdef OPENDDS_SECURITY 112 if (!cfg || !cfg->rtps_relay_only()) {
122 ACE_UNUSED_ARG(after);
124 #ifdef OPENDDS_SECURITY 126 const bool before = cfg && cfg->use_ice();
131 if (before && !after) {
133 }
else if (!before && after) {
151 #ifdef OPENDDS_SECURITY 160 #if defined(OPENDDS_SECURITY) 161 if (cfg->use_ice()) {
172 #if defined(OPENDDS_SECURITY) 178 , ipv6_unicast_socket_
182 const ACE_HANDLE v6handle = ipv6_unicast_socket_.get_handle();
184 const ACE_HANDLE v6handle = ACE_INVALID_HANDLE;
188 ACE_TEXT(
"RtpsUdpTransport::make_datalink: ")
189 ACE_TEXT(
"failed to open DataLink for sockets %d %d\n"),
198 ipv6_unicast_socket_.set_handle(ACE_INVALID_HANDLE);
209 bit_sub_ = client->get_builtin_subscriber_proxy();
235 VDBG_LVL((
LM_DEBUG,
"(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
244 bit_sub_ = client->get_builtin_subscriber_proxy();
269 VDBG_LVL((
LM_DEBUG,
"(%P|%t) RtpsUdpTransport::accept_datalink pending.\n"), 2);
278 bool association_failed)
280 if (disassociate || association_failed) {
292 typedef PendConnMap::iterator iter_t;
293 const std::pair<iter_t, iter_t> range =
295 for (iter_t iter = range.first; iter != range.second; ++iter) {
296 iter->second->remove_on_start_callback(client, remote_id);
309 bool local_reliable,
bool remote_reliable,
310 bool local_durable,
bool remote_durable,
314 AddrSet uc_addrs, mc_addrs;
315 bool requires_inline_qos;
316 unsigned int blob_bytes_read;
317 get_connection_addrs(remote_data, &uc_addrs, &mc_addrs, &requires_inline_qos, &blob_bytes_read);
320 if (discovery_locator.length()) {
321 AddrSet disco_uc_addrs, disco_mc_addrs;
322 bool disco_requires_inline_qos;
323 unsigned int disco_blob_bytes_read;
324 get_connection_addrs(discovery_locator, &disco_uc_addrs, &disco_mc_addrs, &disco_requires_inline_qos, &disco_blob_bytes_read);
326 for (AddrSet::const_iterator it = disco_uc_addrs.begin(), limit = disco_uc_addrs.end(); it != limit; ++it) {
327 for (AddrSet::const_iterator it2 = uc_addrs.begin(), limit2 = uc_addrs.end(); it2 != limit2; ++it2) {
329 disco_addr_hint = *it2;
336 return link_->
associated(local_id, remote_id, local_reliable, remote_reliable,
337 local_durable, remote_durable,
338 participant_discovered_at, participant_flags, max_sn, client,
339 uc_addrs, mc_addrs, disco_addr_hint, requires_inline_qos);
345 #if defined(OPENDDS_SECURITY) 365 bool* requires_inline_qos,
366 unsigned int* blob_bytes_read)
const 382 if (cfg && cfg->use_multicast_ && mc_addrs) {
385 }
else if (uc_addrs) {
397 cfg->populate_locator(info, flags);
512 AddrSet uc_addrs, mc_addrs;
513 bool requires_inline_qos;
514 unsigned int blob_bytes_read;
530 bool expects_inline_qos =
false;
537 GUIDSeq_var guids(
new GUIDSeq);
564 #ifdef OPENDDS_SECURITY 595 if (config->multicast_interface_.empty() &&
611 ACE_TEXT(
"RtpsUdpTransport::configure_i: open:")
623 BOOL recv_udp_connreset = FALSE;
631 ACE_TEXT(
"RtpsUdpTransport::configure_i: get_local_addr:")
638 #ifdef ACE_RECVPKTINFO 646 address = config->ipv6_local_address().to_addr();
648 if (ipv6_unicast_socket_.open(address, PF_INET6) != 0) {
651 ACE_TEXT(
"RtpsUdpTransport::configure_i: open:")
663 BOOL recv_udp_connreset = FALSE;
664 ipv6_unicast_socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
668 if (ipv6_unicast_socket_.get_local_addr(address) != 0) {
671 ACE_TEXT(
"RtpsUdpTransport::configure_i: get_local_addr:")
677 if (address.is_ipv4_mapped_ipv6() && temp.
is_any()) {
680 config->ipv6_local_address(temp);
682 #ifdef ACE_RECVPKTINFO6 683 if (ipv6_unicast_socket_.set_option(IPPROTO_IPV6, ACE_RECVPKTINFO6, &sockopt,
sizeof sockopt) == -1) {
694 #ifdef OPENDDS_SECURITY 695 if (config->use_ice()) {
702 if (config->opendds_discovery_default_listener_) {
708 #ifdef OPENDDS_SECURITY 732 #ifdef OPENDDS_SECURITY 735 if (cfg && cfg->use_ice()) {
759 #ifdef OPENDDS_SECURITY 765 if (fd == transport.ipv6_unicast_socket_.get_handle()) {
766 return transport.ipv6_unicast_socket_;
770 return transport.unicast_socket_;
779 char buffer[0x10000];
780 iov[0].iov_base = buffer;
781 iov[0].iov_len =
sizeof buffer;
786 #ifdef OPENDDS_SECURITY
787 transport.get_ice_agent(), transport.get_ice_endpoint(),
795 bool shouldWarn(
int code) {
796 return code ==
EPERM || code == EACCES || code == EINTR || code ==
ENOBUFS || code == ENOMEM
809 #ifdef OPENDDS_TESTING_FEATURES 811 if (config->should_drop(iov, n, total_length)) {
816 #ifdef ACE_LACKS_SENDMSG 817 char buffer[UDP_MAX_MESSAGE_SIZE];
819 for (
int i = 0; i < n; ++i) {
820 if (
size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
822 "message too large at index %d size %d\n", i, iov[i].iov_len));
825 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
826 iter += iov[i].iov_len;
828 const ssize_t result = socket.
send(buffer, iter - buffer, addr);
833 const int err = errno;
834 if (err !=
ENETUNREACH || !network_is_unreachable) {
837 ACE_ERROR((prio,
"(%P|%t) RtpsUdpTransport.cpp send_single_i() - " 841 network_is_unreachable =
true;
846 network_is_unreachable =
false;
854 ICE::AddressListType addresses;
865 ICE::AddressListType addrs;
867 for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
868 if (pos->get_type() ==
AF_INET) {
870 addresses.push_back(*pos);
874 addresses.push_back(addr);
879 addr = cfg->ipv6_local_address().to_addr();
882 ICE::AddressListType addrs;
884 for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
885 if (pos->get_type() == AF_INET6) {
887 addresses.push_back(*pos);
891 addresses.push_back(addr);
903 if (destination.
get_type() == AF_INET6) {
904 return transport.link_ ? transport.link_->ipv6_unicast_socket() : transport.ipv6_unicast_socket_;
908 ACE_UNUSED_ARG(destination);
909 return transport.link_ ? transport.link_->unicast_socket() : transport.unicast_socket_;
920 serializer << message;
924 const ssize_t result = send_single_i(transport.config(), socket, iov, num_blocks, destination, network_is_unreachable_);
928 if (cfg && cfg->count_messages()) {
930 for (
int i = 0; i < num_blocks; ++i) {
931 bytes += iov[i].iov_len;
936 transport.transport_statistics_.message_count[
key].send_fail(bytes);
938 if (!network_is_unreachable_) {
940 ACE_ERROR((prio,
"(%P|%t) RtpsUdpTransport::send() - " 941 "failed to send STUN message\n"));
943 }
else if (cfg && cfg->count_messages()) {
947 transport.transport_statistics_.message_count[
key].send(result);
954 return cfg ? cfg->stun_server_address().to_addr() :
ACE_INET_Addr();
1007 const ACE_INET_Addr relay_address = cfg->rtps_relay_address().to_addr();
1009 if ((cfg->use_rtps_relay() || cfg->rtps_relay_only()) &&
1025 #ifndef DDS_HAS_MINIMUM_BIT 1027 std::memset(connection_record.
guid, 0,
sizeof(connection_record.
guid));
1077 #ifndef DDS_HAS_MINIMUM_BIT 1081 std::memset(connection_record.
guid, 0,
sizeof(connection_record.
guid));
virtual int handle_input(ACE_HANDLE fd)
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
virtual void use_rtps_relay_now(bool flag)
void local_crypto_handle(DDS::Security::ParticipantCryptoHandle pch)
RcHandle< T > rchandle_from(T *pointer)
ACE_SOCK_Dgram & choose_send_socket(const ACE_INET_Addr &address) const
PendConnMap pending_connections_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool requires_inline_qos(const GUIDSeq_var &peers)
const InstanceHandle_t HANDLE_NIL
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
virtual void get_last_recv_locator(const GUID_t &, TransportLocator &)
BUILT_IN_TOPIC_KEY string protocol
void transport_shutdown()
bool configure_i(const RtpsUdpInst_rch &config)
ACE_INET_Addr to_addr() const
void unregister_for_reader(const GUID_t &writerid, const GUID_t &readerid)
void register_for_writer(const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
unsigned long ACE_Reactor_Mask
void server_reflexive_indication_count(size_t x)
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
LockType connections_lock_
ACE_SOCK_Dgram unicast_socket_
RtpsUdpTransport(const RtpsUdpInst_rch &inst)
RcHandle< BitSubscriber > bit_sub_
virtual void shutdown_i()
int locator_to_address(ACE_INET_Addr &dest, const DCPS::Locator_t &locator, bool map)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
ACE_Thread_Mutex transport_statistics_mutex_
virtual void send(const ACE_INET_Addr &address, const STUN::Message &message)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
bool is_more_local(const NetworkAddress ¤t, const NetworkAddress &incoming)
CommandPtr execute_or_enqueue(CommandPtr command)
#define OPENDDS_ASSERT(C)
sequence< Locator_t > LocatorSeq
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
bool use_datalink(const GUID_t &local_id, const GUID_t &remote_id, const TransportBLOB &remote_data, const TransportBLOB &discovery_locator, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, SequenceNumber max_sn, const TransportClient_rch &client)
RcHandle< Sporadic > relay_stun_task_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused as reported by Andy Elvey and Dan Kosecki *resynced with Christopher Diggins s branch as it exists in tree building code is back Christopher Diggins *resynced codebase with Chris s branch *removed tree building code
ICE::ServerReflexiveStateMachine relay_srsm_
key GuidPrefix_t guidPrefix
reference_wrapper< T > ref(T &r)
sequence< TransportLocator > TransportLocatorSeq
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
int set_option(int level, int option, void *optval, int optlen) const
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const MessageCountKind MCK_STUN
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
void register_for_reader(const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
virtual void rtps_relay_only_now(bool flag)
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
ACE_HANDLE socket(int protocol_family, int type, int proto)
BUILT_IN_TOPIC_KEY string address
void disable_relay_stun_task()
void disassociated(const GUID_t &local, const GUID_t &remote)
static ssize_t receive_bytes_helper(iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
MonotonicTime_t participant_discovered_at_
RtpsUdpDataLink_rch link_
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
ThreadLockType relay_stun_task_falloff_mutex_
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
NetworkAddress get_last_recv_address(const GUID_t &remote_id)
virtual void release_datalink(DataLink *link)
void client_stop(const GUID_t &localId)
const ACE_INET_Addr & stun_server_address() const
Class to serialize and deserialize data for DDS.
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
void client_stop(const GUID_t &localId)
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
sequence< TransportStatistics > TransportStatisticsSequence
long ParticipantCryptoHandle
ConnectionRecords deferred_connection_records_
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
bool is_shut_down() const
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
void default_listener(const TransportReceiveListener_wrch &trl)
void set_handle(ACE_HANDLE handle)
int control(int cmd, void *) const
DDS::Duration_t to_dds_duration() const
void message_block_to_sequence(const ACE_Message_Block &mb_locator, T &out)
ReactorTask_rch reactor_task_
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
RcHandle< ICE::Agent > ice_agent_
int get_local_addr(ACE_Addr &) const
void relay_stun_task(const MonotonicTimePoint &now)
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
ACE_HANDLE get_handle(void) const
void append(TransportStatisticsSequence &seq, const InternalTransportStatistics &istats)
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
const STUN::Message & message() const
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
static const TimeDuration zero_value
ACE_UINT16 length() const
void unregister_for_writer(const GUID_t &readerid, const GUID_t &writerid)
bool open(const ACE_SOCK_Dgram &unicast_socket)
ReactorTask_rch reactor_task()
sequence< GUID_t > GUIDSeq
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
u_short get_port_number(void) const
bool is_multicast(void) const
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
Sequence number abstraction. Only allows positive 64 bit values.
DCPS::TimeDuration latency() const
ReactorInterceptor_rch interceptor() const
static Configuration * instance()
const DCPS::Encoding & get_locators_encoding()
virtual ICE::AddressListType host_addresses() const
#define VDBG_LVL(DBG_ARGS, LEVEL)
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE fd) const
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void append_transport_statistics(TransportStatisticsSequence &seq)
InternalTransportStatistics transport_statistics_
const ACE_INET_Addr & unset_stun_server_address() const
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
string transport_type
The transport type (e.g. tcp or udp)
ACE_Reactor * get_reactor()
RcHandle< IceEndpoint > ice_endpoint_
const ReturnCode_t RETCODE_OK
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
void assign(EntityId_t &dest, const EntityId_t &src)
ACE_Reactor * reactor() const
#define ACE_ERROR_RETURN(X, Y)
const string RTPS_RELAY_STUN_PROTOCOL
DDS::ReturnCode_t blob_to_locators(const DCPS::TransportBLOB &blob, DCPS::LocatorSeq &locators, bool *requires_inline_qos, unsigned int *pBytesRead)
RcHandle< T > lock() const
static const String ip(const ACE_INET_Addr &addr)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
ThreadLockType links_lock_
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
TransportBLOB discovery_blob_
DDS::OctetSeq TransportBLOB
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual ACE_INET_Addr stun_server_address() const
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
RtpsUdpInst_rch config() const
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
OpenDDS_Dcps_Export void primitive_serialized_size_boolean(const Encoding &encoding, size_t &size, size_t count=1)
bool latency_available() const
void rtps_relay_address_change()
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
BUILT_IN_TOPIC_KEY DDS::OctetArray16 guid
GuidPrefix_t local_prefix_
virtual void use_ice_now(bool flag)
size_t ConnectionInfoFlags
TransportInst_rch config() const