OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpReceiveStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 #include "RtpsUdpDataLink.h"
10 #include "RtpsUdpInst.h"
11 #include "RtpsUdpTransport.h"
12 
15 #include "dds/DCPS/GuidUtils.h"
16 #include <dds/DCPS/LogAddr.h>
17 #include "dds/DCPS/Util.h"
19 
20 #include "ace/Reactor.h"
21 
22 #include <algorithm>
23 #include <cstring>
24 
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
32  const GuidPrefix_t& local_prefix,
33  ThreadStatusManager& thread_status_manager)
34  : BaseReceiveStrategy(link->config(), BUFFER_COUNT)
35  , link_(link)
36  , last_received_()
37  , recvd_sample_(0)
38  , fragment_size_(0)
39  , total_frags_(0)
40  , reassembly_(link->config()->fragment_reassembly_timeout_)
41  , receiver_(local_prefix)
42  , thread_status_manager_(thread_status_manager)
43 #ifdef OPENDDS_SECURITY
44  , secure_sample_()
45  , encoded_rtps_(false)
46  , encoded_submsg_(false)
47 #endif
48 {
49  // Since BUFFER_COUNT is 1, the index will always be 0
50  const size_t INDEX = 0;
51 
52  if (receive_buffers_[INDEX] == 0) {
54  receive_buffers_[INDEX],
57  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
58  ACE_Message_Block::MB_DATA, // Default
59  0, // Start with no continuation
60  0, // Let the constructor allocate
61  &data_allocator_, // Our buffer cache
62  &receive_lock_, // Our locking strategy
64  ACE_Time_Value::zero, // Default
65  ACE_Time_Value::max_time, // Default
66  &db_allocator_, // Our data block cache
67  &mb_allocator_ // Our message block cache
68  ));
69  }
70 
71 #ifdef OPENDDS_SECURITY
73 #endif
74 }
75 
76 int
78 {
80 
81  // Since BUFFER_COUNT is 1, the index will always be 0
82  const size_t INDEX = 0;
83 
84  ACE_Message_Block* const cur_rb = receive_buffers_[INDEX];
85  cur_rb->reset();
86 
87  iovec iov;
88 #ifdef _MSC_VER
89 #pragma warning(push)
90 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
91 // since on other platforms iov_len is 64-bit
92 #pragma warning(disable : 4267)
93 #endif
94  iov.iov_len = cur_rb->space();
95 #ifdef _MSC_VER
96 #pragma warning(pop)
97 #endif
98  iov.iov_base = cur_rb->wr_ptr();
99 
100  ACE_INET_Addr remote_address;
101  bool stop = false;
102  ssize_t bytes_remaining = receive_bytes(&iov,
103  1,
104  remote_address,
105  fd,
106  stop);
107 
108  if (stop) {
109  return 0;
110  }
111 
112  if (bytes_remaining < 0) {
113  relink();
114  return -1;
115  }
116 
117  cur_rb->wr_ptr(bytes_remaining);
118 
119  if (bytes_remaining == 0) {
121  return -1;
122  } else {
123  relink();
124  return -1;
125  }
126  }
127 
128  if (!pdu_remaining_) {
129  receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes_remaining);
130  }
131 
132  receive_transport_header_ = *cur_rb;
134  cur_rb->reset();
135  if (DCPS_debug_level > 0) {
136  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: RtpsUdpReceiveStrategy::handle_input: TransportHeader invalid.\n")));
137  }
138  return 0;
139  }
140 
141  bytes_remaining = receive_transport_header_.length_;
143  return 0;
144  }
145 
146  {
147  const ScopedHeaderProcessing shp(*this);
148  while (bytes_remaining > 0) {
149  data_sample_header_.pdu_remaining(bytes_remaining);
150  data_sample_header_ = *cur_rb;
151  bytes_remaining -= data_sample_header_.get_serialized_size();
153  return 0;
154  }
157 
159  VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
160 
161  if (reassemble(rds)) {
162  VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
163  deliver_sample(rds, remote_address);
164  }
165  // If reassemble() returned false, it takes ownership of the data
166  // just like deliver_sample() does.
167 
168  } else {
169  deliver_sample(rds, remote_address);
170  }
171  }
173  bytes_remaining -= data_sample_header_.message_length();
174 
175  // For the reassembly algorithm, the 'last_fragment_' header bit only
176  // applies to the first DataSampleHeader in the TransportHeader
178  }
179  }
180 
181  // If newly selected buffer index still has a reference count, we'll need to allocate a new one for the read
182  if (receive_buffers_[INDEX]->data_block()->reference_count() > 1) {
183 
184  if (log_level >= LogLevel::Info) {
185  ACE_DEBUG((LM_INFO, "(%P|%t) INFO: RtpsUdpReceiveStrategy::handle_input: reallocating primary receive buffer based on reference count\n"));
186  }
187 
188  ACE_DES_FREE(
189  receive_buffers_[INDEX],
192 
194  receive_buffers_[INDEX],
197  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
198  ACE_Message_Block::MB_DATA, // Default
199  0, // Start with no continuation
200  0, // Let the constructor allocate
201  &data_allocator_, // Our buffer cache
202  &receive_lock_, // Our locking strategy
204  ACE_Time_Value::zero, // Default
205  ACE_Time_Value::max_time, // Default
206  &db_allocator_, // Our data block cache
207  &mb_allocator_ // Our message block cache
208  ),
209  -1);
210  }
211 
212  return 0;
213 }
214 
215 ssize_t
217  int n,
218  const ACE_SOCK_Dgram& socket,
219  ACE_INET_Addr& remote_address,
220 #ifdef OPENDDS_SECURITY
221  DCPS::RcHandle<ICE::Agent> ice_agent,
223 #endif
224  RtpsUdpTransport& tport,
225  bool& stop)
226 {
227  ACE_INET_Addr local_address;
228  const ssize_t ret = socket.recv(iov, n, remote_address, 0
229 #if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
230  , &local_address
231 #endif
232  );
233 
234  if (ret == -1) {
235  return ret;
236  }
237 
238  if (remote_address.get_size() > remote_address.get_addr_size()) {
239  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper - invalid address size\n"));
240  return 0;
241  }
242 
243  if (n > 0 && ret > 0 && iov[0].iov_len >= 4 && std::memcmp(iov[0].iov_base, "RTPS", 4) == 0) {
244  RtpsUdpInst_rch cfg = tport.config();
245  if (cfg && cfg->count_messages()) {
246  const NetworkAddress ra(remote_address);
247  const InternalMessageCountKey key(ra, MCK_RTPS, ra == cfg->rtps_relay_address());
248  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, tport.transport_statistics_mutex_, -1);
249  tport.transport_statistics_.message_count[key].recv(ret);
250  }
251  return ret;
252  }
253 
254 #ifdef OPENDDS_SECURITY
255  // Assume STUN
256 # ifndef ACE_RECVPKTINFO
257  ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper potential STUN message "
258  "received but this version of the ACE library doesn't support the local_address "
259  "extension in ACE_SOCK_Dgram::recv\n"));
260  ACE_UNUSED_ARG(stop);
261  ACE_NOTSUP_RETURN(-1);
262 # else
263 
264  stop = true;
265  size_t bytes = ret;
266  size_t block_size = std::min(bytes, static_cast<size_t>(iov[0].iov_len));
267  ACE_Message_Block* head = new ACE_Message_Block(static_cast<const char*>(iov[0].iov_base), block_size);
268  head->length(block_size);
269  bytes -= block_size;
270 
271  ACE_Message_Block* tail = head;
272  for (int i = 1; i < n && bytes != 0; ++i) {
273  block_size = std::min(bytes, static_cast<size_t>(iov[i].iov_len));
274  ACE_Message_Block* mb = new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base), block_size);
275  mb->length(block_size);
276  tail->cont(mb);
277  tail = mb;
278  bytes -= block_size;
279  }
280 
281  DCPS::Serializer serializer(head, STUN::encoding);
282  STUN::Message message;
283  message.block = head;
284  if (serializer >> message) {
285  RtpsUdpInst_rch cfg = tport.config();
286  if (cfg && cfg->count_messages()) {
287  const NetworkAddress ra(remote_address);
288  const InternalMessageCountKey key(ra, MCK_STUN, ra == cfg->rtps_relay_address());
289  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, tport.transport_statistics_mutex_, -1);
290  tport.transport_statistics_.message_count[key].recv(ret);
291  }
292 
293  if (tport.relay_srsm().is_response(message)) {
294  tport.process_relay_sra(tport.relay_srsm().receive(message));
295 #ifdef OPENDDS_SECURITY
296  } else if (endpoint) {
297  ice_agent->receive(endpoint, local_address, remote_address, message);
298 #endif
299  }
300  }
301  head->release();
302 # endif
303 #else
304  ACE_UNUSED_ARG(stop);
305 #endif
306 
307  return ret;
308 }
309 
310 #ifdef OPENDDS_SECURITY
311 namespace {
312  ssize_t recv_err(const char* msg, const ACE_INET_Addr& remote, const DCPS::GUID_t& peer, bool& stop)
313  {
315  ACE_ERROR((LM_WARNING, "(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::receive_bytes - "
316  "from %C %C secure RTPS processing failed: %C\n",
317  DCPS::LogAddr(remote).c_str(), LogGuid(peer).c_str(), msg));
318  }
319  stop = true;
320  return 0;
321  }
322 }
323 #endif
324 
325 const ACE_SOCK_Dgram&
327 {
328 #ifdef ACE_HAS_IPV6
329  if (fd == link_->ipv6_multicast_socket().get_handle()) {
330  return link_->ipv6_multicast_socket();
331  }
332  if (fd == link_->ipv6_unicast_socket().get_handle()) {
333  return link_->ipv6_unicast_socket();
334  }
335 #endif
336  if (fd == link_->multicast_socket().get_handle()) {
337  return link_->multicast_socket();
338  }
339  return link_->unicast_socket();
340 }
341 
342 ssize_t
344  int n,
345  ACE_INET_Addr& remote_address,
346  ACE_HANDLE fd,
347  bool& stop)
348 {
350 #ifdef ACE_LACKS_SENDMSG
351  ACE_UNUSED_ARG(stop);
352  char buffer[0x10000];
353  ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
354  char* iter = buffer;
355  for (int i = 0; scatter > 0 && i < n; ++i) {
356  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS
357  static_cast<size_t>(scatter));
358  std::memcpy(iov[i].iov_base, iter, chunk);
359  scatter -= chunk;
360  iter += chunk;
361  }
362  const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
363 #else
364  const ssize_t ret = receive_bytes_helper(iov, n, socket, remote_address,
365 #ifdef OPENDDS_SECURITY
367 #endif
368  *link_->transport(), stop);
369 #endif
370  remote_address_ = remote_address;
371 
372 #ifdef OPENDDS_SECURITY
373  if (stop) {
374  return ret;
375  }
376 
377  using namespace DDS::Security;
379  if (ret > 0 && receiver != DDS::HANDLE_NIL) {
380  encoded_rtps_ = false;
381 
382  GUID_t peer = GUID_UNKNOWN;
383 
384  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
385  if (!crypto) {
386  return recv_err("no crypto plugin", remote_address, peer, stop);
387  }
388 
389  if (ret < RTPS::RTPSHDR_SZ + RTPS::SMHDR_SZ) {
390  return recv_err("message too short", remote_address, peer, stop);
391  }
392 
393  const unsigned int encLen = static_cast<unsigned int>(ret);
394  DDS::OctetSeq encoded(encLen);
395  encoded.length(encLen);
396  unsigned char* const encBuf = encoded.get_buffer();
397  size_t copied = 0;
398  for (int i = 0; i < n && copied < encLen; ++i) {
399  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
400  static_cast<size_t>(encLen - copied));
401  std::memcpy(encBuf + copied, iov[i].iov_base, chunk);
402  copied += chunk;
403  }
404 
405  if (copied != encLen) {
406  return recv_err("received bytes didn't fit in iovec array", remote_address, peer, stop);
407  }
408 
409  if (encoded[RTPS::RTPSHDR_SZ] != RTPS::SRTPS_PREFIX) {
410  return ret;
411  }
412 
413  static const int GuidPrefixOffset = 8; // "RTPS", Version(2), Vendor(2)
414  std::memcpy(peer.guidPrefix, encBuf + GuidPrefixOffset, sizeof peer.guidPrefix);
418  link_->handle_registry()->get_remote_participant_crypto_handle(peer);
419  if (sender == DDS::HANDLE_NIL) {
421  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n", LogGuid(peer).c_str()));
422  }
424  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::receive_bytes: ")
425  ACE_TEXT("decode_rtps_message no remote participant crypto handle for %C, dropping\n"),
426  LogGuid(peer).c_str()));
427  }
428  stop = true;
429  return ret;
430  }
431 
432  DDS::OctetSeq plain;
433  SecurityException ex = {"", 0, 0};
434  if (!crypto->decode_rtps_message(plain, encoded, receiver, sender, ex)) {
436  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n", LogGuid(peer).c_str()));
437  }
439  ACE_ERROR((LM_WARNING, "(%P|%t) {encdec_warn} decode_rtps_message SecurityException [%d.%d]: %C\n",
440  ex.code, ex.minor_code, ex.message.in()));
441  }
444  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::receive_bytes: ")
445  ACE_TEXT("decode_rtps_message remote participant has crypto handle but no key, dropping\n")));
446  }
447  stop = true;
448  return ret;
449  }
450  return recv_err("decode_rtps_message failed", remote_address, peer, stop);
451  }
452 
453  copied = 0;
454  const size_t plainLen = plain.length();
455  const unsigned char* const plainBuf = plain.get_buffer();
456  for (int i = 0; i < n && copied < plainLen; ++i) {
457  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
458  plainLen - copied);
459  std::memcpy(iov[i].iov_base, plainBuf + copied, chunk);
460  copied += chunk;
461  }
462 
463  if (copied != plainLen) {
464  return recv_err("plaintext doesn't fit in iovec array", remote_address, peer, stop);
465  }
466 
467  encoded_rtps_ = true;
468  return plainLen;
469  }
470 #endif
471 
472  return ret;
473 }
474 
476 {
477 #ifdef OPENDDS_SECURITY
478  using namespace DDS::Security;
479  const GUID_t sendGuid = make_id(receiver_.source_guid_prefix_, sender);
480  const GuidConverter conv(sendGuid);
481 
485  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy::check_encoded "
486  "Full message from %C requires protection, dropping\n",
487  OPENDDS_STRING(conv).c_str()));
488  }
489  return false;
490  }
491 
493  conv.isReader() ?
494  link_->handle_registry()->get_remote_datareader_security_attributes(sendGuid) :
495  link_->handle_registry()->get_remote_datawriter_security_attributes(sendGuid));
496  static const EndpointSecurityAttributesMask MASK_PROTECT_SUBMSG =
498  if ((esa & MASK_PROTECT_SUBMSG) == MASK_PROTECT_SUBMSG && !encoded_submsg_) {
500  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy::check_encoded "
501  "Submessage from %C requires protection, dropping\n",
502  OPENDDS_STRING(conv).c_str()));
503  }
504  return false;
505  }
506 #else
507  ACE_UNUSED_ARG(sender);
508 #endif
509  return true;
510 }
511 
512 void
514  const ACE_INET_Addr& remote_address)
515 {
516  using namespace RTPS;
517 
518  if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
519  sizeof(GuidPrefix_t))) {
520  // Not our message, we may be on multicast listening to all the others.
522  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample - not destination\n"));
523  }
524  return;
525  }
526 
528 
531  }
532 
533 #ifdef OPENDDS_SECURITY
534  const SubmessageKind kind = rsh.submessage_._d();
535 
537  // secure envelope in progress, defer processing
538  secure_submessages_.push_back(rsh.submessage_);
539  if (kind == DATA) {
540  secure_sample_ = sample;
541  }
542  return;
543  }
544 
545  encoded_submsg_ = false;
546 #endif
547 
548  deliver_sample_i(sample, rsh.submessage_, NetworkAddress(remote_address));
549 }
550 
551 void
553  const RTPS::Submessage& submessage,
554  const NetworkAddress& remote_addr)
555 {
556  using namespace RTPS;
557  const SubmessageKind kind = submessage._d();
558 
559  switch (kind) {
560  case INFO_SRC:
561  case INFO_REPLY_IP4:
562  case INFO_DST:
563  case INFO_REPLY:
564  case INFO_TS:
565  // No-op: the INFO_* submessages only modify the state of the
566  // MessageReceiver (see check_header()), they are not passed up to DCPS.
567  break;
568 
569  case DATA: {
570  receiver_.fill_header(sample.header_);
571  const DataSubmessage& data = submessage.data_sm();
572  if (!check_encoded(data.writerId)) {
574  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
575  }
576  break;
577  }
578 
579 #ifdef OPENDDS_SECURITY
580  if (!decode_payload(sample, data)) {
582  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
583  }
584  break;
585  }
586 #endif
587 
588  RepoIdSet directedWriteReaders;
589  getDirectedWriteReaders(directedWriteReaders, data);
590 
591  recvd_sample_ = &sample;
592  readers_selected_.clear();
593  readers_withheld_.clear();
594  // If this sample should be withheld from some readers in order to maintain
595  // in-order delivery, link_->received() will add it to readers_withheld_ otherwise
596  // it will be added to readers_selected_
597  link_->received(data, receiver_.source_guid_prefix_, remote_addr);
598  recvd_sample_ = 0;
599 
601 
602  if (data.readerId != ENTITYID_UNKNOWN) {
603  GUID_t reader;
604  std::memcpy(reader.guidPrefix, link_->local_prefix(),
605  sizeof(GuidPrefix_t));
606  reader.entityId = data.readerId;
607  if (!readers_withheld_.count(reader) &&
608  (directedWriteReaders.empty() || directedWriteReaders.find(reader) != directedWriteReaders.end())) {
609  if (Transport_debug_level > 5) {
610  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
611  ACE_TEXT("calling DataLink::data_received for seq: %q to reader %C\n"),
612  this, sample.header_.sequence_.getValue(), LogGuid(reader).c_str()));
613  }
614  link_->data_received(sample, reader);
615  }
616  } else {
617  if (Transport_debug_level > 5) {
618  OPENDDS_STRING included_ids;
619  bool first = true;
620  RepoIdSet::iterator iter = readers_selected_.begin();
621  while (iter != readers_selected_.end()) {
622  included_ids += (first ? "" : "\n") + LogGuid(*iter).conv_;
623  first = false;
624  ++iter;
625  }
626  OPENDDS_STRING excluded_ids;
627  first = true;
628  RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
629  while (iter2 != readers_withheld_.end()) {
630  excluded_ids += (first ? "" : "\n") + LogGuid(*iter2).conv_;
631  first = false;
632  ++iter2;
633  }
634  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i:")
635  ACE_TEXT(" readers_selected ids: %C\n")
636  ACE_TEXT(" readers_withheld ids: %C\n"),
637  this, included_ids.c_str(), excluded_ids.c_str()));
638  }
639 
640  if (readers_withheld_.empty() && readers_selected_.empty()) {
641  if (directedWriteReaders.empty()) {
642  if (Transport_debug_level > 5) {
643  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
644  ACE_TEXT("calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n"),
645  this, sample.header_.sequence_.getValue()));
646  }
647  link_->data_received(sample);
648  } else {
649  if (Transport_debug_level > 5) {
650  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
651  ACE_TEXT("calling DataLink::data_received_include for seq: %q to directedWriteReaders\n"),
652  this, sample.header_.sequence_.getValue()));
653  }
654  link_->data_received_include(sample, directedWriteReaders);
655  }
656  } else {
657  if (directedWriteReaders.empty()) {
658  if (Transport_debug_level > 5) {
659  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
660  ACE_TEXT("calling DataLink::data_received_include for seq: %q to readers_selected_\n"),
661  this, sample.header_.sequence_.getValue()));
662  }
664  } else {
665  if (Transport_debug_level > 5) {
666  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
667  ACE_TEXT("calling DataLink::data_received_include for seq: %q to intersection of readers\n"),
668  this, sample.header_.sequence_.getValue()));
669  }
670  set_intersect(directedWriteReaders, readers_selected_, GUID_tKeyLessThan());
671  link_->data_received_include(sample, directedWriteReaders);
672  }
673  }
674  }
675  break;
676  }
677  case GAP:
678  if (!check_encoded(submessage.gap_sm().writerId)) {
680  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
681  }
682  break;
683  }
684  link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_, receiver_.directed_, remote_addr);
685  break;
686 
687  case HEARTBEAT:
688  if (!check_encoded(submessage.heartbeat_sm().writerId)) {
690  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
691  }
692  break;
693  }
695  if (submessage.heartbeat_sm().smHeader.flags & FLAG_L) {
696  // Liveliness has been asserted. Create a DATAWRITER_LIVELINESS message.
698  receiver_.fill_header(sample.header_);
699  sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId;
700  link_->data_received(sample);
701  }
702  break;
703 
704  case ACKNACK:
705  if (!check_encoded(submessage.acknack_sm().readerId)) {
707  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
708  }
709  break;
710  }
711  link_->received(submessage.acknack_sm(), receiver_.source_guid_prefix_, remote_addr);
712  break;
713 
714  case HEARTBEAT_FRAG:
715  if (!check_encoded(submessage.hb_frag_sm().writerId)) {
717  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
718  }
719  break;
720  }
722  break;
723 
724  case NACK_FRAG:
725  if (!check_encoded(submessage.nack_frag_sm().readerId)) {
727  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
728  }
729  break;
730  }
731  link_->received(submessage.nack_frag_sm(), receiver_.source_guid_prefix_, remote_addr);
732  break;
733 
734  /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble()
735  has successfully reassembled the fragments and we now have a DATA submsg
736  */
737 
738 #ifdef OPENDDS_SECURITY
739  case SEC_PREFIX:
740  secure_prefix_ = submessage.security_sm();
741  break;
742 
743  case SEC_POSTFIX:
744  deliver_from_secure(submessage, remote_addr);
745  break;
746 #endif
747 
748  default:
749  break;
750  }
751 }
752 
753 #ifdef OPENDDS_SECURITY
754 void
756  const NetworkAddress& remote_addr)
757 {
758  using namespace DDS::Security;
759 
760  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
761  if (!crypto) {
762  // security not enabled for this datalink -- this can be reached
763  // when a secure message is seen on the same multicast group
764  return;
765  }
766 
770  link_->handle_registry()->get_remote_participant_crypto_handle(peer);
771 
772  DDS::OctetSeq encoded_submsg, plain_submsg;
773  if (!sec_submsg_to_octets(encoded_submsg, submessage)) {
775  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy: ")
776  ACE_TEXT("deliver_from_secure failed to encode submessage %C RPCH %d\n"),
777  LogGuid(peer).c_str(), peer_pch));
778  }
779  return;
780  }
783 
787  SecurityException ex = {"", 0, 0};
788 
789  bool ok = crypto->preprocess_secure_submsg(dwch, drch, category, encoded_submsg,
790  link_->local_crypto_handle(), peer_pch, ex);
791 
792  if (ok) {
793  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy::deliver_from_secure ")
794  ACE_TEXT("dwch is %d and drch is %d\n"), dwch, drch), 4);
795  }
796 
797  if (ok && category == DATAWRITER_SUBMESSAGE) {
798  ok = crypto->decode_datawriter_submessage(plain_submsg, encoded_submsg,
799  drch, dwch, ex);
800 
801  } else if (ok && category == DATAREADER_SUBMESSAGE) {
802  ok = crypto->decode_datareader_submessage(plain_submsg, encoded_submsg,
803  dwch, drch, ex);
804 
805  } else if (ok && category == INFO_SUBMESSAGE) {
806  return;
807 
808  } else {
810  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::deliver_from_secure ")
811  ACE_TEXT("failed remote %C RPCH %d, [%d.%d]: %C\n"),
812  LogGuid(peer).c_str(), peer_pch, ex.code, ex.minor_code, ex.message.in()));
813  }
814  return;
815  }
816 
817  if (!ok) {
818  bool dw = category == DATAWRITER_SUBMESSAGE;
820  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::deliver_from_secure ")
821  ACE_TEXT("decode %C submessage failed [%d.%d]: \"%C\" ")
822  ACE_TEXT("(rpch: %u, local d%cch: %u, remote d%cch: %u)\n"),
823  dw ? "writer" : "reader",
824  ex.code, ex.minor_code, ex.message.in(),
825  peer_pch,
826  dw ? 'r' : 'w',
827  dw ? drch : dwch,
828  dw ? 'w' : 'r',
829  dw ? dwch : drch
830  ));
831  }
832  return;
833  }
834 
835  ACE_Message_Block mb(plain_submsg.length());
836  mb.copy(reinterpret_cast<const char*>(plain_submsg.get_buffer()), mb.size());
837 
838  if (Transport_debug_level > 5) {
839  ACE_HEX_DUMP((LM_DEBUG, mb.rd_ptr(), mb.length(),
840  category == DATAWRITER_SUBMESSAGE ?
841  ACE_TEXT("RtpsUdpReceiveStrategy: decoded writer submessage") :
842  ACE_TEXT("RtpsUdpReceiveStrategy: decoded reader submessage")));
843  }
844 
845  RtpsSampleHeader rsh(mb);
846  if (check_header(rsh)) {
847  ReceivedDataSample plain_sample(mb);
848  if (rsh.into_received_data_sample(plain_sample)) {
849  if (rsh.more_fragments()) {
850  VDBG((LM_DEBUG, "(%P|%t) DBG: Attempt reassembly of decoded fragments\n"));
851  if (reassemble_i(plain_sample, rsh)) {
852  VDBG((LM_DEBUG, "(%P|%t) DBG: Reassembled complete message from decoded\n"));
853  encoded_submsg_ = true;
855  // Pop the secure envelope.
856  message_.submessages.length(message_.submessages.length() - 3);
858  }
859  deliver_sample_i(plain_sample, rsh.submessage_, remote_addr);
860  return;
861  }
862  }
863  encoded_submsg_ = true;
865  // Pop the secure envelope.
866  message_.submessages.length(message_.submessages.length() - 3);
868  }
869  deliver_sample_i(plain_sample, rsh.submessage_, remote_addr);
870  }
871  }
872 }
873 
874 bool
876  const RTPS::Submessage& postfix)
877 {
879  size_t size = serialized_size(encoding, secure_prefix_);
880 
881  for (size_t i = 0; i < secure_submessages_.size(); ++i) {
882  serialized_size(encoding, size, secure_submessages_[i]);
883  const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
884  if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
885  size += secure_sample_.data_length();
886  }
887  align(size, RTPS::SMHDR_SZ);
888  }
889  serialized_size(encoding, size, postfix);
890 
891  ACE_Message_Block mb(size);
892  Serializer ser(&mb, encoding);
893  if (!(ser << secure_prefix_)) {
894  return false;
895  }
896 
897  if (!ser.align_r(RTPS::SMHDR_SZ)) {
898  return false;
899  }
900 
901  for (size_t i = 0; i < secure_submessages_.size(); ++i) {
902  if (!(ser << secure_submessages_[i])) {
903  return false;
904  }
905  const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
906  if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
907  if (!secure_sample_.write_data(ser)) {
908  return false;
909  }
910  }
911  if (!ser.align_r(RTPS::SMHDR_SZ)) {
912  return false;
913  }
914  }
915  if (!(ser << postfix)) {
916  return false;
917  }
918 
919  encoded.length(static_cast<unsigned int>(mb.length()));
920  std::memcpy(encoded.get_buffer(), mb.rd_ptr(), mb.length());
921  secure_submessages_.resize(0);
922 
923  return true;
924 }
925 
927  const RTPS::DataSubmessage& submsg)
928 {
929  using namespace DDS::Security;
930 
931  static const EndpointSecurityAttributesMask MASK_PROTECT_PAYLOAD =
933  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
934  DatawriterCryptoHandle writer_crypto_handle;
936 
938  writer_crypto_handle =
939  link_->handle_registry()->get_local_datawriter_crypto_handle(sample.header_.publication_id_);
940  esa =
941  RTPS::security_attributes_to_bitmask(link_->handle_registry()->get_local_datawriter_security_attributes(sample.header_.publication_id_));
942  } else {
943  writer_crypto_handle =
944  link_->handle_registry()->get_remote_datawriter_crypto_handle(sample.header_.publication_id_);
945  esa =
946  RTPS::security_attributes_to_bitmask(link_->handle_registry()->get_remote_datawriter_security_attributes(sample.header_.publication_id_));
947  }
948 
949  const bool payload_protected = (esa & MASK_PROTECT_PAYLOAD) == MASK_PROTECT_PAYLOAD;
950 
951  if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto || !payload_protected) {
952  return true;
953  }
954 
955  DDS::OctetSeq encoded = sample.copy_data(), plain, iQos;
956 
958  static_cast<Endianness>(submsg.smHeader.flags & 1));
959  size_t iQosSize = 0;
960  serialized_size(encoding, iQosSize, submsg.inlineQos);
961  iQos.length(static_cast<unsigned int>(iQosSize));
962  const char* iQos_raw = reinterpret_cast<const char*>(iQos.get_buffer());
963  ACE_Message_Block iQosMb(iQos_raw, iQos.length());
964  Serializer ser(&iQosMb, encoding);
965  ser << submsg.inlineQos;
966 
967  SecurityException ex = {"", 0, 0};
968  // DDS-Security: since origin authentication for payload is not yet supported
969  // the reader's crypto handle is NIL here (could be multiple readers in this
970  // participant)
971  const bool ok = crypto->decode_serialized_payload(plain, encoded, iQos,
973  writer_crypto_handle, ex);
974  if (ok) {
975  // The ReceivedDataSample's message block uses the transport's data block so it
976  // can't be modified in-place, instead replace it with a new block.
977  sample.clear();
978  sample.append(reinterpret_cast<const char*>(plain.get_buffer()), plain.length());
979 
980  if (plain.length() > 1) {
982  }
983 
984  } else if (security_debug.encdec_warn) {
985  ACE_ERROR((LM_WARNING, "(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy: "
986  "decode_serialized_payload failed [%d.%d]: %C\n",
987  ex.code, ex.minor_code, ex.message.in()));
988  }
989 
990  return ok;
991 }
992 #endif
993 
994 int
996 {
998  ri->execute_or_enqueue(make_rch<RegisterHandler>(link_->unicast_socket().get_handle(), this, static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
999 #ifdef ACE_HAS_IPV6
1000  ri->execute_or_enqueue(make_rch<RegisterHandler>(link_->ipv6_unicast_socket().get_handle(), this, static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1001 #endif
1002 
1003  return 0;
1004 }
1005 
1006 void
1008 {
1010  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->unicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1011 #ifdef ACE_HAS_IPV6
1012  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->ipv6_unicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1013 #endif
1014 
1015  RtpsUdpInst_rch cfg = link_->config();
1016  if (cfg && cfg->use_multicast_) {
1017  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->multicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1018 #ifdef ACE_HAS_IPV6
1019  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->ipv6_multicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1020 #endif
1021  }
1022 }
1023 
1024 bool
1026 {
1029  message_.submessages.length(0);
1030  message_.hdr = header.header_;
1031  }
1032 
1033 #ifdef OPENDDS_SECURITY
1035 #endif
1036 
1037  return header.valid();
1038 }
1039 
1040 bool
1042 {
1043 
1044 #ifdef OPENDDS_SECURITY
1046  return header.valid();
1047  }
1048 #endif
1049 
1050  receiver_.submsg(header.submessage_);
1051 
1052  // save fragmentation details for use in reassemble()
1053  if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
1054  const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
1055  frags_.first = rtps.fragmentStartingNum.value;
1056  frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
1058  total_frags_ = (rtps.sampleSize / rtps.fragmentSize) + (rtps.sampleSize % rtps.fragmentSize ? 1 : 0);
1059  }
1060 
1061  return header.valid();
1062 }
1063 
1064 void
1066 {
1068 }
1069 
1070 void
1072 {
1073  link_->disable_response_queue(false);
1074 }
1075 
1076 const ReceivedDataSample*
1078 {
1079  readers_withheld_.insert(sub_id);
1080  return recvd_sample_;
1081 }
1082 
1083 void
1085 {
1086  readers_selected_.insert(sub_id);
1087 }
1088 
1090 {
1091  directedWriteReaders.clear();
1092  for (CORBA::ULong i = 0; i < ds.inlineQos.length(); ++i) {
1093  if (ds.inlineQos[i]._d() == RTPS::PID_DIRECTED_WRITE
1094  && receiver_.source_version_.minor >= 4) {
1095  directedWriteReaders.insert(ds.inlineQos[i].guid());
1096  }
1097  }
1098  return !directedWriteReaders.empty();
1099 }
1100 
1102 {
1104  return reassemble_i(data, rsh);
1105 }
1106 
1108 {
1109  using namespace RTPS;
1110  receiver_.fill_header(data.header_); // set publication_id_.guidPrefix
1113 
1114  // Reassembly was successful, replace DataFrag with Data. This doesn't have
1115  // to be a fully-formed DataSubmessage, just enough for this class to use
1116  // in deliver_sample() which ends up calling RtpsUdpDataLink::received().
1117  // In particular we will need the SequenceNumber, but ignore the iQoS.
1118 
1119  // Peek at the byte order from the encapsulation containing the payload.
1120  data.header_.byte_order_ = data.peek(1) & FLAG_E;
1121 
1122  const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
1123 
1124  const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0)
1126  const DataSubmessage dsm = {
1127  {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
1128  dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
1129  rsh.submessage_.data_sm(dsm);
1130  return true;
1131  }
1132  return false;
1133 }
1134 
1135 bool
1137  CORBA::ULong num_bits,
1138  const SequenceNumber& base,
1139  const GUID_t& pub_id,
1140  ACE_CDR::ULong& cumulative_bits_added)
1141 {
1142  bool modified = false;
1143  for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
1144  if (bit == 32) bit = 0;
1145 
1146  if (bit == 0) {
1147  x = static_cast<CORBA::ULong>(bitmap[i / 32]);
1148  if (x == 0) {
1149  // skip an entire Long if it's all 0's (adds 32 due to ++i)
1150  i += 31;
1151  bit = 31;
1152  //FUTURE: this could be generalized with something like the x86 "bsr"
1153  // instruction using compiler intrinsics, VC++ _BitScanReverse()
1154  // and GCC __builtin_clz()
1155  continue;
1156  }
1157  }
1158 
1159  const CORBA::ULong mask = 1 << (31 - bit);
1160  if (x & mask) {
1161  const bool has_frags = reassembly_.has_frags(base + i, pub_id);
1162  if (has_frags) {
1163  x &= ~mask;
1164  bitmap[i / 32] = x;
1165  modified = true;
1166  --cumulative_bits_added;
1167  }
1168  }
1169  }
1170  return modified;
1171 }
1172 
1173 void
1175  const GUID_t& pub_id)
1176 {
1177  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1178  reassembly_.data_unavailable(sn, pub_id);
1179  }
1180 }
1181 
1182 void
1184 {
1185  reassembly_.clear_completed(pub_id);
1186 }
1187 
1188 bool
1190  const GUID_t& pub_id,
1191  FragmentInfo* frag_info)
1192 {
1193  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1194  ACE_UINT32 total_frags = 0;
1195  if (reassembly_.has_frags(sn, pub_id, total_frags)) {
1196  if (frag_info) {
1197  if (total_frags > 256) {
1198  static const CORBA::Long empty_buffer[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
1199  OPENDDS_VECTOR(CORBA::Long) buffer(total_frags + 31 / 32, 0);
1200  ACE_UINT32 numBits = 0;
1201  size_t idx = 0;
1202  const ACE_UINT32 base = reassembly_.get_gaps(sn, pub_id, &buffer[0], static_cast<CORBA::ULong>(buffer.size()), numBits);
1203  const CORBA::ULong end = base + numBits;
1204  for (CORBA::ULong i = base; i <= end; i += 256) {
1205  const CORBA::ULong remain = end - i;
1206  const CORBA::ULong len = std::min(remain, static_cast<CORBA::ULong>(256));
1207  const CORBA::ULong len32 = (len + 31) / 32;
1208  const CORBA::ULong len8 = len32 * 4;
1209  if (std::memcmp(&buffer[idx], &empty_buffer[0], len8) != 0) {
1210  std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1211  p.first = sn;
1212  p.second = RTPS::FragmentNumberSet();
1213  frag_info->push_back(p);
1214  RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1215  missing_frags.numBits = len;
1216  missing_frags.bitmapBase.value = i;
1217  missing_frags.bitmap.length(len32);
1218  std::memcpy(missing_frags.bitmap.get_buffer(), &buffer[idx], len8);
1219  }
1220  idx += 8;
1221  }
1222  } else {
1223  std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1224  p.first = sn;
1225  p.second = RTPS::FragmentNumberSet();
1226  frag_info->push_back(p);
1227  RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1228  missing_frags.numBits = 0; // make sure this is a valid number before passing to get_gaps
1229  missing_frags.bitmap.length(8); // start at max length
1230  missing_frags.bitmapBase.value =
1231  reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
1232  8, missing_frags.numBits);
1233  // reduce length in case get_gaps() didn't need all that room
1234  missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
1235  }
1236  } else {
1237  return true;
1238  }
1239  }
1240  }
1241  return frag_info ? !frag_info->empty() : false;
1242 }
1243 
1244 
1245 // MessageReceiver nested class
1246 
1248  : directed_(false)
1249  , have_timestamp_(false)
1250 {
1251  assign(local_, local);
1254  for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
1255  source_guid_prefix_[i] = 0;
1256  dest_guid_prefix_[i] = 0;
1257  }
1258  timestamp_.seconds = 0;
1259  timestamp_.fraction = 0;
1260 }
1261 
1262 void
1264  const RTPS::Header& hdr)
1265 {
1266  using namespace RTPS;
1267  // see RTPS spec v2.1 section 8.3.4 table 8.16 and section 8.3.6.4
1268  source_version_ = hdr.version;
1269  source_vendor_ = hdr.vendorId;
1270 
1273  directed_ = false;
1274 
1275  unicast_reply_locator_list_.length(1);
1278 
1282 
1283  have_timestamp_ = false;
1285 }
1286 
1287 void
1289 {
1290  using namespace RTPS;
1291 
1292  switch (s._d()) {
1293  case INFO_TS:
1294  submsg(s.info_ts_sm());
1295  break;
1296 
1297  case INFO_SRC:
1298  submsg(s.info_src_sm());
1299  break;
1300 
1301  case INFO_REPLY_IP4:
1303  break;
1304 
1305  case INFO_DST:
1306  submsg(s.info_dst_sm());
1307  break;
1308 
1309  case INFO_REPLY:
1310  submsg(s.info_reply_sm());
1311  break;
1312 
1313  default:
1314  break;
1315  }
1316 }
1317 
1318 void
1321 {
1322  // see RTPS spec v2.1 section 8.3.7.7.4
1323  for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
1324  if (id.guidPrefix[i]) { // if some byte is > 0, it's not UNKNOWN
1325  assign(dest_guid_prefix_, id.guidPrefix);
1326  directed_ = true;
1327  return;
1328  }
1329  }
1331  directed_ = false;
1332 }
1333 
1334 void
1336 {
1337  // see RTPS spec v2.1 section 8.3.7.8.4
1338  unicast_reply_locator_list_.length(ir.unicastLocatorList.length());
1339  for (CORBA::ULong i = 0; i < ir.unicastLocatorList.length(); ++i) {
1341  }
1342 
1343  if (ir.smHeader.flags & 2 /* MulticastFlag */) {
1345  for (CORBA::ULong i = 0; i < ir.multicastLocatorList.length(); ++i) {
1347  }
1348 
1349  } else {
1351  }
1352 }
1353 
1354 void
1356  const RTPS::InfoReplyIp4Submessage& iri4)
1357 {
1358  // see RTPS spec v2.1 sections 8.3.7.8.4 and 9.4.5.14
1359  unicast_reply_locator_list_.length(1);
1361  unicast_reply_locator_list_[0].port = iri4.unicastLocator.port;
1363 
1364  if (iri4.smHeader.flags & 2 /* MulticastFlag */) {
1367  multicast_reply_locator_list_[0].port = iri4.multicastLocator.port;
1369  } else {
1371  }
1372 }
1373 
1374 void
1377 {
1378  // see RTPS spec v2.1 section 8.3.7.9.10
1379  if (!(it.smHeader.flags & 2 /* InvalidateFlag */)) {
1380  have_timestamp_ = true;
1381  timestamp_ = it.timestamp;
1382  } else {
1383  have_timestamp_ = false;
1384  }
1385 }
1386 
1387 void
1389  const RTPS::InfoSourceSubmessage& is)
1390 {
1391  // see RTPS spec v2.1 section 8.3.7.9.4
1393  source_version_ = is.version;
1394  source_vendor_ = is.vendorId;
1395  unicast_reply_locator_list_.length(1);
1399  have_timestamp_ = false;
1400 }
1401 
1402 void
1404  DataSampleHeader& header) const
1405 {
1406  using namespace RTPS;
1407  if (have_timestamp_) {
1409  header.source_timestamp_nanosec_ =
1411  }
1413 }
1414 
1415 } // namespace DCPS
1416 } // namespace OpenDDS
1417 
DataSampleHeader header_
The demarshalled sample header.
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
#define ACE_DEBUG(X)
RtpsUdpReceiveStrategy(RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix, ThreadStatusManager &thread_status_manager)
ACE_CDR::Long Long
#define ACE_ERROR(X)
void deliver_sample_i(ReceivedDataSample &sample, const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
const octet FLAG_E
Definition: RtpsCore.idl:521
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
char message_id_
The enum MessageId.
AckNackSubmessage acknack_sm
Definition: RtpsCore.idl:839
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED
size_t length(void) const
unsigned long ACE_Reactor_Mask
SubmessageHeader smHeader
Definition: RtpsCore.idl:667
LM_INFO
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const long LOCATOR_KIND_UDPv4
Definition: RtpsCore.idl:111
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RtpsUdpTransport_rch transport()
bool isReader() const
Returns true if the GUID represents a reader entity.
void data_received_include(ReceivedDataSample &sample, const RepoIdSet &incl)
Definition: DataLink.cpp:698
void reset(void)
sequence< octet > key
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
InfoDestinationSubmessage info_dst_sm
Definition: RtpsCore.idl:857
virtual int start_i()
Let the subclass start.
bool log_dropped_messages
Log received RTPS messages that were dropped.
Adapt the TransportReceiveStrategy for RTPS&#39;s "transport" (message) Header.
GuidSet RepoIdSet
Definition: GuidUtils.h:113
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE fd) const
const char * c_str() const
Security::HandleRegistry_rch handle_registry() const
CommandPtr execute_or_enqueue(CommandPtr command)
void remove_fragments(const SequenceRange &range, const GUID_t &pub_id)
int get_size(void) const
bool has_fragments(const SequenceRange &range, const GUID_t &pub_id, FragmentInfo *frag_info=0)
const OpenDDS::DCPS::Locator_t LOCATOR_INVALID
Definition: MessageTypes.h:52
DDS::OctetSeq copy_data() const
copy the data payload into an OctetSeq
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const GUID_t &pub_id, ACE_CDR::ULong &samples_requested)
int ssize_t
const Time_t TIME_INVALID
Definition: MessageTypes.h:20
InfoSourceSubmessage info_src_sm
Definition: RtpsCore.idl:851
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
void disable_response_queue(bool send_immediately)
const MessageCountKind MCK_STUN
char * rd_ptr(void) const
ReactorInterceptor_rch get_reactor_interceptor() const
void deliver_from_secure(const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
ACE_SOCK_Dgram_Mcast & multicast_socket()
bool is_target(const GUID_t &remote_id)
Definition: DataLink.cpp:1013
virtual void end_transport_header_processing()
End Current Transport Header Processing.
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
RtpsUdpInst_rch config() const
SecuritySubmessage security_sm
Definition: RtpsCore.idl:879
ACE_HANDLE socket(int protocol_family, int type, int proto)
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
virtual void stop_i()
Let the subclass stop.
const GuidPrefix_t & local_prefix() const
#define OPENDDS_STRING
int copy(const char *buf, size_t n)
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void clear_completed_fragments(const GUID_t &pub_id)
ProtocolVersion_t version
Definition: RtpsCore.idl:656
static ssize_t receive_bytes_helper(iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:576
void reset(const ACE_INET_Addr &remote_address, const RTPS::Header &hdr)
ACE_INLINE OpenDDS_Dcps_Export ACE_UINT32 uint32_fractional_seconds_to_nanoseconds(ACE_UINT32 fraction)
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
LM_DEBUG
NativeCryptoHandle DatawriterCryptoHandle
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:658
ACE_CDR::ULong ULong
bool write_data(Serializer &ser) const
write the data payload to the Serializer
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
HeartBeatFragSubmessage hb_frag_sm
Definition: RtpsCore.idl:866
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
const long OPENDDS_EXCEPTION_CODE_NO_KEY
#define VDBG(DBG_ARGS)
void data_unavailable(const FragmentRange &transportSeqDropped)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
bool sec_submsg_to_octets(DDS::OctetSeq &encoded, const RTPS::Submessage &postfix)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool into_received_data_sample(ReceivedDataSample &rds)
NativeCryptoHandle DatareaderCryptoHandle
const octet FLAG_L
Definition: RtpsCore.idl:527
void free(void *ptr)
Return a chunk of memory back to free list cache.
virtual ACE_Message_Block * release(void)
ACE_Message_Block * block
Definition: Stun.h:241
bool align_r(size_t alignment)
Definition: Serializer.inl:813
unsigned long EndpointSecurityAttributesMask
Security::SecurityConfig_rch security_config() const
virtual void begin_transport_header_processing()
Begin Current Transport Header Processing.
ACE_Message_Block * cont(void) const
unsigned long fraction
Definition: RtpsCore.idl:93
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID
unsigned char peek(size_t offset) const
Retreive one byte of data from the payload.
#define ACE_HEX_DUMP(X)
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.
const ReceivedDataSample * withhold_data_from(const GUID_t &sub_id)
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
const unsigned long LOCATOR_PORT_INVALID
LM_WARNING
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
bool getDirectedWriteReaders(RepoIdSet &directedWriteReaders, const RTPS::DataSubmessage &ds) const
ACE_UINT32 ULong
OpenDDS_Dcps_Export void align(size_t &value, size_t by)
Align "value" by "by" if it&#39;s not already.
Definition: Serializer.inl:23
OPENDDS_STRING conv_
unsigned short fragmentsInSubmessage
Definition: RtpsCore.idl:695
sequence< Parameter > ParameterList
ACE_TEXT("TCP_Factory")
size_t space(void) const
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void clear_completed(const GUID_t &pub_id)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
HeartBeatSubmessage heartbeat_sm
Definition: RtpsCore.idl:842
virtual ACE_HANDLE get_handle(void) const
OpenDDS_Dcps_Export LogLevel log_level
virtual bool reassemble(ReceivedDataSample &data)
InfoReplyIp4Submessage info_reply_ipv4_sm
Definition: RtpsCore.idl:854
FragmentNumber_t fragmentStartingNum
Definition: RtpsCore.idl:694
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
const long OPENDDS_EXCEPTION_MINOR_CODE_NO_KEY
Sequence number abstraction. Only allows positive 64 bit values.
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
static const ACE_Time_Value zero
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
#define VDBG_LVL(DBG_ARGS, LEVEL)
int get_addr_size(void) const
Adapt the TransportReceiveStrategy for RTPS&#39;s "sample" (submessage) Header.
ACE_CDR::Octet Octet
bool log_messages
Log all RTPS messages sent or recieved.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool set_intersect(SetA &sA, const SortedB &sB, LessThan lessThan)
Definition: Util.h:200
const MessageCountKind MCK_RTPS
CORBA::ULong get_gaps(const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const
bool check_encoded(const EntityId_t &sender)
static bool payload_byte_order(const ReceivedDataSample &rds)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
NackFragSubmessage nack_frag_sm
Definition: RtpsCore.idl:863
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
InfoTimestampSubmessage info_ts_sm
Definition: RtpsCore.idl:848
void filterBestEffortReaders(const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
sequence<<%SCOPED%><%TYPE%><%SEQ%> local interface<%TYPE%> out string encoded
Definition: IDLTemplate.txt:4
ACE_UINT32 fragment_size_
Fragment size used by this sample.
static bool separate_message(EntityId_t entity)
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample&#39;s data payload to include the suffix&#39;s data payload after any existing...
InfoReplySubmessage info_reply_sm
Definition: RtpsCore.idl:860
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
virtual bool check_header(const RtpsTransportHeader &header)
Check the transport header for suitability.
bool decode_payload(ReceivedDataSample &sample, const RTPS::DataSubmessage &submessage)
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
DataFragSubmessage data_frag_sm
Definition: RtpsCore.idl:872
typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
DataSubmessage data_sm
Definition: RtpsCore.idl:869
LM_ERROR
void do_not_withhold_data_from(const GUID_t &sub_id)
virtual bool reassemble_i(ReceivedDataSample &data, RtpsSampleHeader &rsh)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:530
void received(const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
SubmessageSeq submessages
Definition: RtpsCore.idl:903
#define ACE_NOTSUP_RETURN(FAILVALUE)
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
unsigned long seconds
Definition: RtpsCore.idl:92
DDS::Security::ParticipantSecurityAttributesMask security_attributes_to_bitmask(const DDS::Security::ParticipantSecurityAttributes &sec_attr)
Definition: MessageUtils.h:177
const octet FLAG_D
Definition: RtpsCore.idl:526
const ParameterId_t PID_DIRECTED_WRITE
Definition: RtpsCore.idl:302
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.