OpenDDS  Snapshot(2023/04/28-20:55)
RtpsSampleHeader.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "RtpsSampleHeader.h"
9 
10 #include "RtpsUdpSendStrategy.h"
11 
13 #include <dds/DCPS/Serializer.h>
16 #include <dds/DCPS/Qos_Helper.h>
19 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h>
24 
25 #include <cstring>
26 
27 #ifndef __ACE_INLINE__
28 # include "RtpsSampleHeader.inl"
29 #endif
30 
31 namespace {
32  const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
33  STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
34  STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
35  STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
36 }
37 
39 
40 namespace OpenDDS {
41 namespace RTPS {
42 
43 #ifndef OPENDDS_SAFETY_PROFILE
44 inline bool
45 operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
46 {
47  return
48  (lhs.value[3] == rhs.value[3]) &&
49  (lhs.value[2] == rhs.value[2]) &&
50  (lhs.value[1] == rhs.value[1]) &&
51  (lhs.value[0] == rhs.value[0]);
52 }
53 #endif
54 
55 }
56 namespace DCPS {
57 
58 void
59 RtpsSampleHeader::init(ACE_Message_Block& mb)
60 {
61  using namespace OpenDDS::RTPS;
62 
63  // valid_ is false here, it will only be set to true if there is a Submessage
64 
65  // Manually grab the first two bytes for the SubmessageKind and the byte order
66  if (mb.length() == 0) {
67  return;
68  }
69 
70  const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
71 
72  ACE_CDR::Octet flags = 0;
73 
74  if (mb.length() > 1) {
75  flags = mb.rd_ptr()[1];
76  } else if (mb.cont() && mb.cont()->length() > 0) {
77  flags = mb.cont()->rd_ptr()[0];
78  } else {
79  return;
80  }
81 
82  const size_t starting_length = mb.total_length();
84  (flags & FLAG_E) ? ENDIAN_LITTLE : ENDIAN_BIG);
85 
86  ACE_CDR::UShort octetsToNextHeader = 0;
87 
88 #define CASE_SMKIND(kind, class, name) case kind: { \
89  class submessage; \
90  if (ser >> submessage) { \
91  octetsToNextHeader = submessage.smHeader.submessageLength; \
92  submessage_.name##_sm(submessage); \
93  valid_ = true; \
94  } \
95  break; \
96  }
97 
98  switch (kind) {
112 
113 #if defined(OPENDDS_SECURITY)
114  // Each submessage type introduced by the Security spec is treated
115  // as an opaque octet sequence at this layer.
116  case SEC_BODY:
117  case SEC_PREFIX:
118  case SEC_POSTFIX:
119  case SRTPS_PREFIX:
120  case SRTPS_POSTFIX: {
121  SecuritySubmessage submessage;
122  if (ser >> submessage) {
123  octetsToNextHeader = submessage.smHeader.submessageLength;
124  submessage_.security_sm(submessage);
125  submessage_._d(kind);
126  valid_ = true;
127  }
128  break;
129  }
130 #endif
131 
132  default:
133  {
134  SubmessageHeader submessage;
135  if (ser >> submessage) {
136  octetsToNextHeader = submessage.submessageLength;
137  submessage_.unknown_sm(submessage);
138  valid_ = true;
139  }
140  break;
141  }
142  }
143 #undef CASE_SMKIND
144 
145  if (valid_) {
146 
147  frag_ = (kind == DATA_FRAG);
148  data_ = (kind == DATA);
149 
150  // serialized_size_ is # of bytes of submessage we have read from "mb"
151  serialized_size_ = starting_length - mb.total_length();
152 
153  const ACE_CDR::UShort remaining = static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
154 
155  if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
156  // see RTPS v2.1 section 9.4.5.1.3
157  // In this case the current Submessage extends to the end of Message,
158  // so we will use the message_length_ that was set in pdu_remaining().
159  octetsToNextHeader = remaining;
160 
161  } else if (octetsToNextHeader > remaining) {
162  valid_ = false;
163  return;
164  }
165 
166  if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
167  || kind == DATA_FRAG) {
168  // These Submessages have a payload which we haven't deserialized yet.
169  // The TransportReceiveStrategy will know this via message_length().
170  // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
171  message_length_ = octetsToNextHeader + SMHDR_SZ - serialized_size_;
172  } else {
173  // These Submessages _could_ have extra data that we don't know about
174  // (from a newer minor version of the RTPS spec). Either way, indicate
175  // to the TransportReceiveStrategy that there is no data payload here.
176  message_length_ = 0;
177  ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(serialized_size_);
178  if (octetsToNextHeader + SMHDR_SZ > marshaled) {
179  valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
180  serialized_size_ = octetsToNextHeader + SMHDR_SZ;
181  }
182  }
183  }
184 }
185 
186 void
187 RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
188  const RTPS::ParameterList& iqos)
189 {
190  using namespace OpenDDS::RTPS;
191 #if defined(OPENDDS_TEST_INLINE_QOS)
192  OPENDDS_STRING output("into_received_data_sample(): ");
193  output += to_dds_string(iqos.length());
194  output += " inline QoS parameters\n";
195  for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
196  output += " parameter type = ";
197  output += to_dds_string(iqos[index]._d());
198  output += "\n";
199  }
200  ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
201 #endif
202  for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
203  if (iqos[i]._d() == PID_STATUS_INFO) {
204  if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
205  opendds.message_id_ = DISPOSE_INSTANCE;
206  } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
208  } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
210  } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
212  }
213  } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
214  opendds.historic_sample_ = true;
215 #if defined(OPENDDS_TEST_INLINE_QOS)
216  } else if (iqos[i]._d() == PID_TOPIC_NAME) {
217  ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
218  } else if (iqos[i]._d() == PID_PRESENTATION) {
219  DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
220  ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
221  "coherent_access = %d, ordered_access = %d\n",
222  pres_qos.access_scope, pres_qos.coherent_access,
223  pres_qos.ordered_access));
224  } else if (iqos[i]._d() == PID_PARTITION) {
225  DDS::PartitionQosPolicy part_qos = iqos[i].partition();
226  ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
227  for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
228  ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in()));
229  }
230  ACE_DEBUG((LM_DEBUG, "\n"));
231 #endif
232  }
233  }
234 }
235 
236 bool
237 RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
238 {
239  using namespace OpenDDS::RTPS;
240  DataSampleHeader& opendds = rds.header_;
241 
242  switch (submessage_._d()) {
243  case DATA: {
244  const DataSubmessage& rtps = submessage_.data_sm();
245  opendds.cdr_encapsulation_ = true;
246  opendds.message_length_ = message_length();
247  opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
248  opendds.publication_id_.entityId = rtps.writerId;
249  opendds.message_id_ = SAMPLE_DATA;
250 
251  process_iqos(opendds, rtps.inlineQos);
252 
253  if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
254  opendds.key_fields_only_ = true;
255  } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
256  // Interoperability note: the Key may be hiding in the "key hash" param
257  // in the InlineQos. In order to make use of this Key, it mst be 16
258  // bytes or less. We have observed other DDS implementations only send
259  // the MD5 hash of a >16 byte key, so we must limit this to Built-in
260  // endpoints which are assumed to use GUIDs as keys.
261  if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
262  && (rtps.smHeader.flags & FLAG_Q) && !rds.has_data()) {
263  for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
264  if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
265  // CDR_BE encapsulation scheme (endianness is not used for key hash)
266  rds.replace("\x00\x00\x00\x00", EncapsulationHeader::serialized_size);
267  const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
268  rds.append(reinterpret_cast<const char*>(data), sizeof(DDS::OctetArray16));
270  opendds.key_fields_only_ = true;
271  if (Transport_debug_level) {
273  "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
274  " - used KeyHash data as the key-only payload\n"));
275  }
276  break;
277  }
278  }
279  } else {
280  // FUTURE: Handle the case of D = 0 and K = 0
281  // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
282  if (Transport_debug_level) {
284  "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
285  "Received a DATA Submessage with D = 0 and K = 0, "
286  "dropping\n"));
287  }
288  return false;
289  }
290  }
291 
292  if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
293  // TODO(iguessthislldo: Convert to use Encoding
294  // Peek at the byte order from the encapsulation containing the payload.
295  opendds.byte_order_ = payload_byte_order(rds);
296  }
297 
298  break;
299  }
300  case DATA_FRAG: {
301  const DataFragSubmessage& rtps = submessage_.data_frag_sm();
302  opendds.cdr_encapsulation_ = true;
303  opendds.message_length_ = message_length();
304  opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
305  opendds.publication_id_.entityId = rtps.writerId;
306  opendds.message_id_ = SAMPLE_DATA;
307  opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
308  // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
309 
310  process_iqos(opendds, rtps.inlineQos);
311 
312  const CORBA::ULong lastFragInSubmsg =
314  if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
315  opendds.more_fragments_ = true;
316  }
317  break;
318  }
319  default:
320  break;
321  }
322 
323  return true;
324 }
325 
326 bool RtpsSampleHeader::payload_byte_order(const ReceivedDataSample& rds)
327 {
328  return rds.peek(1) & RTPS::FLAG_E;
329 }
330 
331 namespace {
332  void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
333  const DataSampleHeader& header)
334  {
335  using namespace OpenDDS::RTPS;
336  const DDS::Time_t st = {header.source_timestamp_sec_,
338  const InfoTimestampSubmessage ts = {
339  {INFO_TS, flags, INFO_TS_SZ},
340  {static_cast<ACE_UINT32>(st.sec), DCPS::nanoseconds_to_uint32_fractional_seconds(st.nanosec)}
341  };
342  const CORBA::ULong i = DCPS::grow(subm) - 1;
343  subm[i].info_ts_sm(ts);
344  }
345 }
346 
347 void
348 RtpsSampleHeader::populate_data_sample_submessages(
349  RTPS::SubmessageSeq& subm,
350  const DataSampleElement& dsle,
351  bool requires_inline_qos)
352 {
353  using namespace OpenDDS::RTPS;
354 
355  const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
356  add_timestamp(subm, flags, dsle.get_header());
357  CORBA::ULong i = subm.length();
358 
359  EntityId_t readerId = ENTITYID_UNKNOWN;
360  if (dsle.get_num_subs() == 1) {
361  readerId = dsle.get_sub_id(0).entityId;
364  idest.smHeader.flags = flags;
366  std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
367  sizeof(GuidPrefix_t));
368  i = DCPS::grow(subm);
369  subm[i - 1].info_dst_sm(idest);
370  } else {
371  //Not durability resend, but could have inline gaps
372  for (CORBA::ULong x = 0; x < i; ++x) {
373  if (subm[x]._d() == INFO_DST) {
374  //Need to add INFO_DST
377  idest.smHeader.flags = flags;
379  std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
380  sizeof(GuidPrefix_t));
381  i = DCPS::grow(subm);
382  subm[i - 1].info_dst_sm(idest);
383  break;
384  }
385  }
386  }
387 
388  DataSubmessage data = {
389  {DATA, flags, 0},
390  0,
392  readerId,
393  dsle.get_pub_id().entityId,
395  ParameterList()
396  };
397  const char message_id = dsle.get_header().message_id_;
398  switch (message_id) {
399  case SAMPLE_DATA:
400  // Must be a data message
401  data.smHeader.flags |= FLAG_D;
402  break;
403  default:
404  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
405  "Non-sample messages seen, message_id = %d\n", int(message_id)));
406  break;
407  }
408 
409  if (requires_inline_qos) {
412 
413  populate_inline_qos(qos_data, data.inlineQos);
414  }
415 
416  if (data.inlineQos.length() > 0) {
417  data.smHeader.flags |= FLAG_Q;
418  }
419 
420  i = DCPS::grow(subm);
421  subm[i - 1].data_sm(data);
422 }
423 
424 namespace {
425  // Interoperability note:
426  // used for discovery (SEDP) only, this helps interoperability because they
427  // spec is unclear about the use of PID_KEY_HASH vs. the key as the payload
428  void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
429  {
430  RTPS::KeyHash_t kh;
431  static const size_t offset = 8 /*skip encap (4) and plist hdr (4)*/;
432  std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
433  RTPS::Parameter p;
434  p.key_hash(kh);
435  DCPS::push_back(plist, p);
436  }
437 }
438 
439 void
440 RtpsSampleHeader::populate_data_control_submessages(
442  const TransportSendControlElement& tsce,
443  bool requires_inline_qos)
444 {
445  using namespace OpenDDS::RTPS;
446 
447  const DataSampleHeader& header = tsce.header();
448  const ACE_CDR::Octet flags = header.byte_order_;
449  add_timestamp(subm, flags, header);
450 
451  static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
452 
453  DataSubmessage data = {
454  {DATA, flags, 0},
455  0,
458  header.publication_id_.entityId,
460  ParameterList()
461  };
462  switch (header.message_id_) {
463  case INSTANCE_REGISTRATION: {
464  // NOTE: The RTPS spec is not entirely clear about instance registration.
465  // We have decided to send a DATA Submessage containing the key and an
466  // inlineQoS StatusInfo of zero.
467  data.smHeader.flags |= FLAG_K_IN_DATA;
468  const int qos_len = DCPS::grow(data.inlineQos) - 1;
469  data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
470  break;
471  }
472  case UNREGISTER_INSTANCE: {
473  data.smHeader.flags |= FLAG_K_IN_DATA;
474  const int qos_len = data.inlineQos.length();
475  data.inlineQos.length(qos_len+1);
476  data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
477  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
478  add_key_hash(data.inlineQos, tsce.msg_payload());
479  }
480  break;
481  }
482  case DISPOSE_INSTANCE: {
483  data.smHeader.flags |= FLAG_K_IN_DATA;
484  const int qos_len = DCPS::grow(data.inlineQos) - 1;
485  data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
486  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
487  add_key_hash(data.inlineQos, tsce.msg_payload());
488  }
489  break;
490  }
492  data.smHeader.flags |= FLAG_K_IN_DATA;
493  const int qos_len = DCPS::grow(data.inlineQos) - 1;
494  data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
495  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
496  add_key_hash(data.inlineQos, tsce.msg_payload());
497  }
498  break;
499  }
500  // update control_message_supported() when adding new cases here
501  default:
503  "RtpsSampleHeader::populate_data_control_submessages(): "
504  "Non-sample messages seen, message_id = %d\n",
505  header.message_id_));
506  break;
507  }
508 
509  if (requires_inline_qos) {
511  tsce.listener()->retrieve_inline_qos_data(qos_data);
512 
513  populate_inline_qos(qos_data, data.inlineQos);
514  }
515 
516  if (data.inlineQos.length() > 0) {
517  data.smHeader.flags |= FLAG_Q;
518  }
519 
520  CORBA::ULong idx = DCPS::grow(subm) - 1;
521  subm[idx].data_sm(data);
522 }
523 
524 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
525  if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) { \
526  const int idx = DCPS::grow(plist) - 1; \
527  plist[idx].QOS_NAME(WRITER_QOS.QOS_NAME); \
528  }
529 
530 void
531 RtpsSampleHeader::populate_inline_qos(
532  const TransportSendListener::InlineQosData& qos_data,
533  RTPS::ParameterList& plist)
534 {
535  using namespace OpenDDS::RTPS;
536 
537  // Always include topic name (per the spec)
538  {
539  const int idx = DCPS::grow(plist) - 1;
540  plist[idx].string_data(qos_data.topic_name.c_str());
541  plist[idx]._d(PID_TOPIC_NAME);
542  }
543 
544  // Conditionally include other QoS inline when the differ from the
545  // default value.
546  DDS::PublisherQos default_pub_qos =
547  TheServiceParticipant->initial_PublisherQos();
548  PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
549  PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
550 
551  DDS::DataWriterQos default_dw_qos =
552  TheServiceParticipant->initial_DataWriterQos();
553  PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
554  PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
555  PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
556  PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
557 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
558  PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
559 #endif
560  PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
561  if (qos_data.dw_qos.reliability != default_dw_qos.reliability) {
562  const int idx = DCPS::grow(plist) - 1;
563 
564  ReliabilityQosPolicyRtps reliability;
565  reliability.max_blocking_time = qos_data.dw_qos.reliability.max_blocking_time;
566 
568  reliability.kind.value = RTPS::BEST_EFFORT;
569  } else { // default to RELIABLE for writers
570  reliability.kind.value = RTPS::RELIABLE;
571  }
572 
573  plist[idx].reliability(reliability);
574  }
575  PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
576  PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
577  PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
578 }
579 
580 #undef PROCESS_INLINE_QOS
581 
582 // simple marshaling helpers for RtpsSampleHeader::split()
583 namespace {
584  void write(const Message_Block_Ptr& mb, ACE_CDR::UShort s, bool swap_bytes)
585  {
586  const char* ps = reinterpret_cast<const char*>(&s);
587  if (swap_bytes) {
588  ACE_CDR::swap_2(ps, mb->wr_ptr());
589  mb->wr_ptr(2);
590  } else {
591  mb->copy(ps, 2);
592  }
593  }
594 
595  void write(const Message_Block_Ptr& mb, ACE_CDR::ULong i, bool swap_bytes)
596  {
597  const char* pi = reinterpret_cast<const char*>(&i);
598  if (swap_bytes) {
599  ACE_CDR::swap_4(pi, mb->wr_ptr());
600  mb->wr_ptr(4);
601  } else {
602  mb->copy(pi, 4);
603  }
604  }
605 
606  // read without advancing rd_ptr()
607  void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
608  {
609  if (swap_bytes) {
610  ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
611  } else {
612  std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
613  }
614  }
615 
616  void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
617  {
618  if (swap_bytes) {
619  ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
620  } else {
621  std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
622  }
623  }
624 
625  const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
626 }
627 
629 RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
631 {
632  using namespace RTPS;
633  size_t data_offset = 0;
634  const char* rd = orig.rd_ptr();
635  ACE_CDR::ULong starting_frag, sample_size;
636  ACE_CDR::Octet flags;
637  bool swap_bytes;
638 
639  // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
640  // The submessages from the start of the msg block to this point (data_offset)
641  // will be copied to both the head and tail fragments.
642  while (true) {
643  flags = rd[data_offset + 1];
644  swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
645  bool found_data = false;
646 
647  switch (rd[data_offset]) {
648  case DATA:
649  if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
650  if (Transport_debug_level) {
651  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
652  "attempting to fragment a Data submessage with no payload.\n"));
653  }
654  return unknown_sequence_range;
655  }
656  found_data = true;
657  starting_frag = 1;
658  sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
659  break;
660  case DATA_FRAG:
661  found_data = true;
662  peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
663  peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
664  break;
665  }
666 
667  if (found_data) {
668  break;
669  }
670 
671  // Scan for next submessage in orig
672  ACE_CDR::UShort octetsToNextHeader;
673  peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
674 
675  data_offset += octetsToNextHeader + SMHDR_SZ;
676  if (data_offset >= orig.length()) {
677  if (Transport_debug_level) {
678  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
679  "invalid octetsToNextHeader encountered while fragmenting.\n"));
680  }
681  return unknown_sequence_range;
682  }
683  }
684 
685  // Create the "head" message block (of size "sz") containing DATA_FRAG
686  size_t sz = orig.length();
687  ACE_CDR::Octet new_flags = flags;
688  size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
689  if (rd[data_offset] == DATA) {
690  sz += 12; // DATA_FRAG is 12 bytes larger than DATA
691  iqos_offset -= 12;
692  new_flags = flags & (FLAG_E | FLAG_Q);
693  if (flags & FLAG_K_IN_DATA) {
694  new_flags |= FLAG_K_IN_FRAG;
695  }
696  if (flags & FLAG_N_IN_DATA) {
697  new_flags |= FLAG_N_IN_FRAG;
698  }
699 
700  }
701  head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
702 
703  head->copy(rd, data_offset);
704 
705  head->wr_ptr()[0] = DATA_FRAG;
706  head->wr_ptr()[1] = new_flags;
707  head->wr_ptr(2);
708 
709  std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
710  head->wr_ptr(4);
711 
712  write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
713 
714  head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
715 
716  write(head, starting_frag, swap_bytes);
717  const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
718  const ACE_CDR::UShort frags =
719  static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
720  if (frags == 0) {
721  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsSampleHeader::split: "
722  "Number of Fragments is Zero: min(%B, %B) / FRAG_SIZE\n",
723  max_data, orig_payload));
724  return unknown_sequence_range;
725  }
726  write(head, frags, swap_bytes);
727  write(head, FRAG_SIZE, swap_bytes);
728  write(head, sample_size, swap_bytes);
729 
730  if (flags & FLAG_Q) {
731  head->copy(rd + iqos_offset, orig.length() - iqos_offset);
732  }
733 
734  // Create the "tail" message block containing DATA_FRAG with Q=0
735  tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
736 
737  tail->copy(rd, data_offset);
738 
739  tail->wr_ptr()[0] = DATA_FRAG;
740  tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
741  tail->wr_ptr(2);
742 
743  std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
744  tail->wr_ptr(4);
745 
746  write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
747  tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
748 
749  write(tail, starting_frag + frags, swap_bytes);
750  const size_t tail_data = orig_payload - frags * FRAG_SIZE;
751  const ACE_CDR::UShort tail_frags =
752  static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
753  write(tail, tail_frags, swap_bytes);
754  write(tail, FRAG_SIZE, swap_bytes);
755  write(tail, sample_size, swap_bytes);
756 
757  Message_Block_Ptr payload_head;
758  Message_Block_Ptr payload_tail;
759  DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
760  payload_head, payload_tail);
761  head->cont(payload_head.release());
762  tail->cont(payload_tail.release());
763 
764  return SequenceRange(starting_frag + frags - 1,
765  starting_frag + frags + tail_frags - 1);
766 }
767 
768 }
769 }
770 
ACE_Byte Octet
DataSampleHeader header_
The demarshalled sample header.
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:885
const octet FLAG_N_IN_FRAG
Definition: RtpsCore.idl:531
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const ACE_CDR::UShort DATA_FRAG_OCTETS_TO_IQOS
Definition: MessageTypes.h:103
#define CASE_SMKIND(kind, class, name)
const DataSampleHeader & get_header() const
const octet FLAG_E
Definition: RtpsCore.idl:521
char message_id_
The enum MessageId.
TransportSendListener * get_send_listener() const
ReliabilityQosPolicy reliability
size_t length(void) const
SubmessageHeader smHeader
Definition: RtpsCore.idl:667
bool has_data() const
true if at least one Data Block is stored (even if it has 0 useable bytes)
LM_INFO
const octet FLAG_K_IN_FRAG
Definition: RtpsCore.idl:528
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
void replace(const char *data, size_t size)
Replace all payload bytes with passed-in data Based on the ACE_Message_Block(const char*...
bool skip(size_t n, int size=1)
Definition: Serializer.inl:443
const ParameterId_t PID_KEY_HASH
Definition: RtpsCore.idl:295
const short BEST_EFFORT
Definition: RtpsCore.idl:142
String to_dds_string(unsigned short to_convert)
const ParameterId_t PID_PARTITION
Definition: RtpsCore.idl:273
PresentationQosPolicyAccessScopeKind access_scope
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
#define ACE_CDR_BYTE_ORDER
const TransportSendListener * listener() const
const octet FLAG_N_IN_DATA
Definition: RtpsCore.idl:532
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
char * rd_ptr(void) const
OpenDDS::internal::special_serialization typedef sequence< Parameter > ParameterList
Definition: RtpsCore.idl:236
const ACE_CDR::UShort INFO_TS_SZ
Definition: MessageTypes.h:109
OpenDDS_Dcps_Export const SequenceRange unknown_sequence_range
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
const octet FLAG_Q
Definition: RtpsCore.idl:522
#define OPENDDS_STRING
int copy(const char *buf, size_t n)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LM_DEBUG
const ParameterId_t PID_STATUS_INFO
Definition: RtpsCore.idl:296
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
OpenDDS::DCPS::GUID_t get_sub_id(CORBA::ULong index) const
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
Holds a data sample received by the transport.
bool operator==(const Duration_t &x, const Duration_t &y)
Definition: MessageUtils.h:109
ACE_INLINE OpenDDS_Dcps_Export ACE_UINT32 nanoseconds_to_uint32_fractional_seconds(ACE_UINT32 fraction)
ACE_UINT16 UShort
virtual const ACE_Message_Block * msg_payload() const
The marshalled payload only (sample data)
static void swap_2(char const *orig, char *target)
ACE_Message_Block * cont(void) const
const short RELIABLE
Definition: RtpsCore.idl:143
unsigned char peek(size_t offset) const
Retreive one byte of data from the payload.
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:674
size_t total_length(void) const
char * wr_ptr(void) const
LM_WARNING
bool more_fragments_
The current "Data Sample" needs reassembly before further processing.
ACE_UINT32 ULong
unsigned short fragmentsInSubmessage
Definition: RtpsCore.idl:695
const ParameterId_t PID_PRESENTATION
Definition: RtpsCore.idl:272
const ParameterId_t PID_TOPIC_NAME
Definition: RtpsCore.idl:256
std::pair< SequenceNumber, SequenceNumber > SequenceRange
unsigned long nanosec
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
virtual void retrieve_inline_qos_data(InlineQosData &qos_data) const
FragmentNumber_t fragmentStartingNum
Definition: RtpsCore.idl:694
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
ACE_CDR::Octet Octet
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS)
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
static void swap_4(char const *orig, char *target)
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample&#39;s data payload to include the suffix&#39;s data payload after any existing...
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:530
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
octet OctetArray16[16]
const ParameterId_t PID_ORIGINAL_WRITER_INFO
Definition: RtpsCore.idl:303
const octet FLAG_D
Definition: RtpsCore.idl:526
DDS::OctetArray16 value
Definition: RtpsCore.idl:151