OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpInst.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "RtpsSampleHeader.h"
9 #include "RtpsUdpInst.h"
10 #include "RtpsUdpLoader.h"
11 #include "RtpsUdpTransport.h"
12 #include "RtpsUdpSendStrategy.h"
13 
14 #include <dds/DCPS/LogAddr.h>
19 
20 #include <ace/Configuration.h>
21 
22 #include <cstring>
23 
25 
26 namespace OpenDDS {
27 namespace DCPS {
28 
// Member-initializer list of the RtpsUdpInst constructor: passes the transport
// type string "rtps_udp" and `name` to TransportInst and gives every setting
// its built-in default before any configuration is loaded.
// NOTE(review): the embedded numbering jumps 28 -> 30 -> 32, so the constructor
// signature and the opening preprocessor conditional that pairs with the
// `#else` / `#endif` below are not visible in this listing.
30  : TransportInst("rtps_udp", name)
// First preprocessor branch: both socket buffer sizes take
// ACE_DEFAULT_MAX_SOCKET_BUFSIZ; the #else branch sets them to 0.
32  , send_buffer_size_(ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
33  , rcv_buffer_size_(ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
34 #else
35  , send_buffer_size_(0)
36  , rcv_buffer_size_(0)
37 #endif
38  , use_multicast_(true)
39  , ttl_(1)
// Default fragment estimate: UDP_MAX_MESSAGE_SIZE divided by FRAG_SIZE.
40  , anticipated_fragments_(RtpsUdpSendStrategy::UDP_MAX_MESSAGE_SIZE / RtpsSampleHeader::FRAG_SIZE)
41  , max_message_size_(RtpsUdpSendStrategy::UDP_MAX_MESSAGE_SIZE)
42  , nak_depth_(0)
43  , nak_response_delay_(0, DEFAULT_NAK_RESPONSE_DELAY_USEC)
44  , heartbeat_period_(DEFAULT_HEARTBEAT_PERIOD_SEC)
45  , receive_address_duration_(5)
46  , responsive_mode_(false)
// NOTE(review): presumably (seconds, microseconds), i.e. 10 ms -- the
// USEC-suffixed constant used for nak_response_delay_ suggests the second
// argument is microseconds; confirm against TimeDuration.
47  , send_delay_(0, 10 * 1000)
48  , opendds_discovery_guid_(GUID_UNKNOWN)
// Default IPv4 multicast group 239.255.0.2, port 7401; the local address
// defaults to 0.0.0.0 with port 0.
49  , multicast_group_address_(7401, "239.255.0.2")
50  , local_address_(u_short(0), "0.0.0.0")
51 #ifdef ACE_HAS_IPV6
// IPv6 counterparts: group FF03::2 on port 7401, local address "::" port 0.
52  , ipv6_multicast_group_address_(7401, "FF03::2")
53  , ipv6_local_address_(u_short(0), "::")
54 #endif
55  , rtps_relay_only_(false)
56  , use_rtps_relay_(false)
57  , use_ice_(false)
58 {}
59 
// Body of new_impl(): creates the RtpsUdpTransport for this instance with
// make_rch, passing it an RcHandle obtained from `this` via rchandle_from.
// NOTE(review): the return type and signature (embedded lines 60-61) are not
// present in this listing.
62 {
63  return make_rch<RtpsUdpTransport>(rchandle_from(this));
64 }
65 
// load(): reads this transport instance's settings from section `sect` of the
// configuration `cf`. Keys that are absent or empty leave the corresponding
// default untouched for the address-valued settings handled below.
// Reaches `return 0` at the end; the ICE check returns -1 through
// ACE_ERROR_RETURN when UseIce is enabled without security.
// NOTE(review): the signature (embedded lines 67-68) is missing from this
// listing, as are lines 121, 125, 136 and 138 -- see the notes at each gap.
66 int
69 {
// NOTE(review): the int result of the parent load() is discarded here.
70  TransportInst::load(cf, sect); // delegate to parent
71 
// Each address-valued key is read as a string and, when non-empty, parsed
// into a NetworkAddress and handed to the matching setter.
72  ACE_TString local_address_s;
73  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("local_address"),
74  local_address_s);
75  if (!local_address_s.is_empty()) {
76  NetworkAddress addr(local_address_s.c_str());
77  local_address(addr);
78  }
79 
80  ACE_TString advertised_address_s;
81  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("advertised_address"),
82  advertised_address_s);
83  if (!advertised_address_s.is_empty()) {
84  NetworkAddress addr(advertised_address_s.c_str());
85  advertised_address(addr);
86  }
87 
// The IPv6 address keys are only read when ACE_HAS_IPV6 is defined.
88 #ifdef ACE_HAS_IPV6
89  ACE_TString ipv6_local_address_s;
90  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("ipv6_local_address"),
91  ipv6_local_address_s);
92  if (!ipv6_local_address_s.is_empty()) {
93  NetworkAddress addr(ipv6_local_address_s.c_str());
94  ipv6_local_address(addr);
95  }
96 
97  ACE_TString ipv6_advertised_address_s;
98  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("ipv6_advertised_address"),
99  ipv6_advertised_address_s);
100  if (!ipv6_advertised_address_s.is_empty()) {
101  NetworkAddress addr(ipv6_advertised_address_s.c_str());
102  ipv6_advertised_address(addr);
103  }
104 #endif
105 
106  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("send_buffer_size"), send_buffer_size_, ACE_UINT32);
107 
108  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("rcv_buffer_size"), rcv_buffer_size_, ACE_UINT32);
109 
110  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("use_multicast"), use_multicast_, bool);
111 
112  ACE_TString group_address_s;
113  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("multicast_group_address"),
114  group_address_s);
115  if (!group_address_s.is_empty()) {
// A value containing no ':' at all gets the default port ":7401" appended.
116  if (group_address_s.rfind(':') == group_address_s.npos) {
117  // Concatenate a port number if the user does not supply one.
118  group_address_s += ACE_TEXT(":7401");
119  }
120  NetworkAddress addr(group_address_s.c_str());
// NOTE(review): embedded line 121 is missing from this listing, so the
// statement that consumes `addr` is not visible here; presumably it applies
// the address the way the other address keys do -- confirm.
122  }
123 
// NOTE(review): embedded line 125 (the rest of this macro invocation) is
// missing from this listing.
124  GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("multicast_interface"),
126 
127  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("anticipated_fragments"), anticipated_fragments_, size_t);
128 
129  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("max_message_size"), max_message_size_, size_t);
130 
131  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("nak_depth"), nak_depth_, size_t);
132 
133  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ttl"), ttl_, unsigned char);
134 
// NOTE(review): embedded lines 136 and 138 (the target arguments closing the
// first two time-value macros) are missing from this listing.
135  GET_CONFIG_TIME_VALUE(cf, sect, ACE_TEXT("nak_response_delay"),
137  GET_CONFIG_TIME_VALUE(cf, sect, ACE_TEXT("heartbeat_period"),
139  GET_CONFIG_TIME_VALUE(cf, sect, ACE_TEXT("send_delay"),
140  send_delay_);
141 
142  ACE_TString rtps_relay_address_s;
143  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DataRtpsRelayAddress"),
144  rtps_relay_address_s);
145  if (!rtps_relay_address_s.is_empty()) {
146  NetworkAddress addr(rtps_relay_address_s.c_str());
147  rtps_relay_address(addr);
148  }
149 
150  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("RtpsRelayOnly"), rtps_relay_only_, bool);
151  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("UseRtpsRelay"), use_rtps_relay_, bool);
152 
153  ACE_TString stun_server_address_s;
154  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DataStunServerAddress"),
155  stun_server_address_s);
156  if (!stun_server_address_s.is_empty()) {
157  NetworkAddress addr(stun_server_address_s.c_str());
158  stun_server_address(addr);
159  }
160 
// UseIce is only read in OPENDDS_SECURITY builds, and enabling it while
// TheServiceParticipant->get_security() is false is rejected with an error.
161 #ifdef OPENDDS_SECURITY
162  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("UseIce"), use_ice_, bool);
163  if (use_ice_ && !TheServiceParticipant->get_security()) {
164  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Security must be enabled (-DCPSSecurity 1) when using ICE (UseIce)\n")), -1);
165  }
166 #endif
167 
168  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ResponsiveMode"), responsive_mode_, bool);
169 
170  return 0;
171 }
172 
// Body of dump_to_str(): appends one "name value" line per setting to `ret`
// (each name formatted by formatNameForDump, each line ended with '\n') and
// returns the accumulated string.
// NOTE(review): embedded lines 173-174 (return type and signature) and 176
// (the declaration and initial value of `ret`) are missing from this listing.
175 {
177  ret += formatNameForDump("local_address") + LogAddr(local_address()).str() + '\n';
178  ret += formatNameForDump("use_multicast") + (use_multicast_ ? "true" : "false") + '\n';
179  ret += formatNameForDump("multicast_group_address") + LogAddr(multicast_group_address_).str() + '\n';
180  ret += formatNameForDump("multicast_interface") + multicast_interface_ + '\n';
// The size_t-configured settings are narrowed to unsigned before formatting.
181  ret += formatNameForDump("nak_depth") + to_dds_string(unsigned(nak_depth_)) + '\n';
182  ret += formatNameForDump("anticipated_fragments") + to_dds_string(unsigned(anticipated_fragments_)) + '\n';
183  ret += formatNameForDump("max_message_size") + to_dds_string(unsigned(max_message_size_)) + '\n';
184  ret += formatNameForDump("nak_response_delay") + nak_response_delay_.str() + '\n';
185  ret += formatNameForDump("heartbeat_period") + heartbeat_period_.str() + '\n';
186  ret += formatNameForDump("send_buffer_size") + to_dds_string(send_buffer_size_) + '\n';
187  ret += formatNameForDump("rcv_buffer_size") + to_dds_string(rcv_buffer_size_) + '\n';
188  ret += formatNameForDump("ttl") + to_dds_string(ttl_) + '\n';
189  ret += formatNameForDump("responsive_mode") + (responsive_mode_ ? "true" : "false") + '\n';
190  return ret;
191 }
192 
// populate_locator(): collects the RTPS locators this instance advertises,
// selected by `flags`, writes them into `info` (transport_type "rtps_udp",
// data filled by locators_to_blob) and returns the number of locators.
// Multicast locators are added before unicast ones.
// NOTE(review): embedded line 194 (the signature) and lines 202 and 204 (the
// condition and fill statement of the first multicast entry) are missing from
// this listing.
193 size_t
195 {
196  using namespace OpenDDS::RTPS;
197 
198  LocatorSeq locators;
// `idx` is the index of the next slot to fill; each grow() call precedes the
// write to locators[idx].
199  CORBA::ULong idx = 0;
200 
201  // multicast first so it's preferred by remote peers
203  grow(locators);
205  }
206 #ifdef ACE_HAS_IPV6
// IPv6 multicast is advertised only when requested by `flags`, multicast is
// enabled, and the IPv6 group address is not the default-constructed address.
207  if ((flags & CONNINFO_MULTICAST) && use_multicast_ && ipv6_multicast_group_address_ != NetworkAddress()) {
208  grow(locators);
209  address_to_locator(locators[idx++], ipv6_multicast_group_address_.to_addr());
210  }
211 #endif
212 
213  if (flags & CONNINFO_UNICAST) {
// IPv4 unicast, in order of preference:
//  1. an explicit advertised address (port taken from local_address() if 0),
//  2. if the local address is the wildcard, every AF_INET interface address
//     paired with the local port,
//  3. otherwise the local address as configured.
214  if (local_address() != NetworkAddress()) {
215  if (advertised_address() != NetworkAddress()) {
216  grow(locators);
217  address_to_locator(locators[idx], advertised_address().to_addr());
218  if (locators[idx].port == 0) {
219  locators[idx].port = local_address().get_port_number();
220  }
221  ++idx;
222  } else if (local_address().is_any()) {
223  typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
224  AddrVector addrs;
225  get_interface_addrs(addrs);
226  for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
// Skip default-constructed entries and anything that is not IPv4.
227  if (*adr_it != ACE_INET_Addr() && adr_it->get_type() == AF_INET) {
228  grow(locators);
229  address_to_locator(locators[idx], *adr_it);
230  locators[idx].port = local_address().get_port_number();
231  ++idx;
232  }
233  }
234  } else {
235  grow(locators);
236  address_to_locator(locators[idx++], local_address().to_addr());
237  }
238  }
239 #ifdef ACE_HAS_IPV6
// IPv6 unicast follows the same three-way preference, using the ipv6_*
// accessors and filtering interface addresses by AF_INET6.
240  if (ipv6_local_address() != NetworkAddress()) {
241  if (ipv6_advertised_address() != NetworkAddress()) {
242  grow(locators);
243  address_to_locator(locators[idx], ipv6_advertised_address().to_addr());
244  if (locators[idx].port == 0) {
245  locators[idx].port = ipv6_local_address().get_port_number();
246  }
247  ++idx;
248  } else if (ipv6_local_address().is_any()) {
249  typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector;
250  AddrVector addrs;
251  get_interface_addrs(addrs);
252  for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) {
253  if (*adr_it != ACE_INET_Addr() && adr_it->get_type() == AF_INET6) {
254  grow(locators);
255  address_to_locator(locators[idx], *adr_it);
256  locators[idx].port = ipv6_local_address().get_port_number();
257  ++idx;
258  }
259  }
260  } else {
261  grow(locators);
262  address_to_locator(locators[idx++], ipv6_local_address().to_addr());
263  }
264  }
265 #endif
266  }
267 
268  info.transport_type = "rtps_udp";
269  RTPS::locators_to_blob(locators, info.data);
270 
271  return locators.length();
272 }
273 
// get_blob(): scans `trans_info` in order and returns a pointer to the `data`
// member of the first entry whose transport_type is exactly "rtps_udp".
// Returns 0 (null) when no entry matches, including for an empty sequence.
// The returned pointer refers into `trans_info`, not to a copy.
// NOTE(review): embedded line 275 (the signature) is missing from this listing.
274 const TransportBLOB*
276 {
277  for (CORBA::ULong idx = 0, limit = trans_info.length(); idx != limit; ++idx) {
278  if (std::strcmp(trans_info[idx].transport_type, "rtps_udp") == 0) {
279  return &trans_info[idx].data;
280  }
281  }
282 
283  return 0;
284 }
285 
// update_locators(): forwards the locator set `locators` for `remote_id` to
// rtps_impl->update_locators(); nothing happens when `imp` tests false.
// NOTE(review): embedded lines 287, 290 and 292 are missing from this listing,
// so the function name with its first parameter and the definitions of `imp`
// and `rtps_impl` are not visible. Presumably `imp` comes from get_impl() as
// in rtps_relay_address_change -- confirm.
286 void
288  const TransportLocatorSeq& locators)
289 {
291  if (imp) {
293  rtps_impl->update_locators(remote_id, locators);
294  }
295 }
296 
// get_last_recv_locator(): asks rtps_impl->get_last_recv_locator() to fill
// `locator` for `remote_id`; `locator` is left untouched when `imp` tests
// false.
// NOTE(review): embedded lines 298, 301 and 303 are missing from this listing,
// so the function name with its first parameter and the definitions of `imp`
// and `rtps_impl` are not visible. Presumably `imp` comes from get_impl() as
// in rtps_relay_address_change -- confirm.
297 void
299  TransportLocator& locator)
300 {
302  if (imp) {
304  rtps_impl->get_last_recv_locator(remote_id, locator);
305  }
306 }
307 
// rtps_relay_address_change(): fetches the existing transport implementation
// with get_impl() and, if there is one, calls its rtps_relay_address_change().
// NOTE(review): embedded lines 309 (the function name line) and 313 (the
// definition of `rtps_impl`) are missing from this listing.
308 void
310 {
311  TransportImpl_rch imp = get_impl();
312  if (imp) {
314  rtps_impl->rtps_relay_address_change();
315  }
316 }
317 
// append_transport_statistics(): passes `seq` to
// rtps_impl->append_transport_statistics(); `seq` is left untouched when `imp`
// tests false.
// NOTE(review): embedded lines 319, 321 and 323 are missing from this listing,
// so the signature and the definitions of `imp` and `rtps_impl` are not
// visible. Presumably `imp` comes from get_impl() as in
// rtps_relay_address_change -- confirm.
318 void
320 {
322  if (imp) {
324  rtps_impl->append_transport_statistics(seq);
325  }
326 }
327 
328 } // namespace DCPS
329 } // namespace OpenDDS
330 
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
TimeDuration heartbeat_period_
Definition: RtpsUdpInst.h:48
RtpsUdpInst(const OPENDDS_STRING &name)
Definition: RtpsUdpInst.cpp:29
TransportImpl_rch get_or_create_impl()
const char * c_str(void) const
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
ACE_INET_Addr to_addr() const
if(!(yy_init))
void update_locators(const GUID_t &remote_id, const TransportLocatorSeq &locators)
}
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
String to_dds_string(unsigned short to_convert)
#define GET_CONFIG_STRING_VALUE(CF, SECT, KEY, VALUE)
Definition: TransportDefs.h:76
sequence< Locator_t > LocatorSeq
sequence< TransportLocator > TransportLocatorSeq
NetworkAddress stun_server_address() const
Definition: RtpsUdpInst.h:243
TransportImpl_rch new_impl()
Definition: RtpsUdpInst.cpp:61
int get_type(void) const
size_type rfind(char c, size_type pos=npos) const
TimeDuration nak_response_delay_
Definition: RtpsUdpInst.h:47
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
static size_type const npos
#define OPENDDS_STRING
void append_transport_statistics(TransportStatisticsSequence &seq)
NetworkAddress local_address() const
Definition: RtpsUdpInst.h:75
ACE_CDR::ULong ULong
bool is_empty(void) const
void get_last_recv_locator(const GUID_t &, TransportLocator &)
ACE_UINT16 get_port_number() const
sequence< TransportStatistics > TransportStatisticsSequence
NetworkAddress rtps_relay_address() const
Definition: RtpsUdpInst.h:216
NetworkAddress advertised_address() const
Definition: RtpsUdpInst.h:85
#define GET_CONFIG_VALUE(CF, SECT, KEY, VALUE, TYPE)
Definition: TransportDefs.h:45
#define GET_CONFIG_TSTRING_VALUE(CF, SECT, KEY, VALUE)
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
NetworkAddress multicast_group_address() const
Definition: RtpsUdpInst.h:65
#define AF_INET
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
virtual OPENDDS_STRING dump_to_str() const
Diagnostic aid.
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
OPENDDS_STRING multicast_interface_
Definition: RtpsUdpInst.h:42
const char *const name
Definition: debug.cpp:60
#define GET_CONFIG_TIME_VALUE(CF, SECT, KEY, VALUE)
ACE_TEXT("TCP_Factory")
virtual int load(ACE_Configuration_Heap &cf, ACE_Configuration_Section_Key &sect)
TransportImpl_rch get_impl()
String str(unsigned decimal_places=3, bool just_sec=false) const
const String & str() const
Definition: LogAddr.h:31
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
virtual size_t populate_locator(OpenDDS::DCPS::TransportLocator &trans_info, ConnectionInfoFlags flags) const
Populate a transport locator sequence. Return the number of "locators".
Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual int load(ACE_Configuration_Heap &cf, ACE_Configuration_Section_Key &sect)
Definition: RtpsUdpInst.cpp:67
string transport_type
The transport type (e.g. tcp or udp)
static OPENDDS_STRING formatNameForDump(const char *name)
const TransportBLOB * get_blob(const OpenDDS::DCPS::TransportLocatorSeq &trans_info) const
#define ACE_ERROR_RETURN(X, Y)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
DDS::OctetSeq TransportBLOB
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
void locators_to_blob(const DCPS::LocatorSeq &locators, DCPS::TransportBLOB &blob)
NetworkAddress multicast_group_address_
Definition: RtpsUdpInst.h:163
static const ConnectionInfoFlags CONNINFO_MULTICAST
virtual OPENDDS_STRING dump_to_str() const
static const ConnectionInfoFlags CONNINFO_UNICAST
size_t ConnectionInfoFlags