OpenDDS  Snapshot(2023/04/28-20:55)
DataSampleHeader.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "DataSampleHeader.h"
11 
12 #include "Serializer.h"
13 #include "GuidConverter.h"
15 #include "SafetyProfileStreams.h"
16 #ifndef OPENDDS_SAFETY_PROFILE
18 #endif
19 
20 #include <dds/DdsDcpsGuidTypeSupportImpl.h>
21 
22 #include <cstdio>
23 
24 #if !defined (__ACE_INLINE__)
25 #include "DataSampleHeader.inl"
26 #endif /* __ACE_INLINE__ */
27 
28 namespace {
29 
31  const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
32 
33  bool mb_copy(char& dest, const ACE_Message_Block& mb, size_t offset, const Encoding&)
34  {
35  dest = mb.rd_ptr()[offset];
36  return true;
37  }
38 
39  template <typename T>
40  bool mb_copy(T& dest, const ACE_Message_Block& mb, size_t offset, const Encoding& encoding)
41  {
42  if (mb.length() >= sizeof(T)) {
43  // Avoid creating ACE_Message_Block from the heap if we just need one.
45  temp.rd_ptr(mb.rd_ptr()+offset);
46  temp.wr_ptr(mb.wr_ptr());
47  OpenDDS::DCPS::Serializer ser(&temp, encoding);
48  ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), ser.swap_bytes());
49  return ser.good_bit();
50  }
51 
53  if (!temp) { // couldn't allocate
54  return false;
55  }
56  temp->rd_ptr(offset);
57  if (temp->total_length() < sizeof(T)) {
58  return false;
59  }
60  OpenDDS::DCPS::Serializer ser(temp.get(), encoding);
61  ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), ser.swap_bytes());
62  return ser.good_bit();
63  }
64 
65  // Skip "offset" bytes from the mb and copy the subsequent data
66  // (sizeof(T) bytes) into dest. Return false if there is not enough data
67  // in the mb to complete the operation. Continuation pointers are followed.
68  template <typename T>
69  bool mb_peek(T& dest, const ACE_Message_Block& mb, size_t offset, const Encoding& encoding)
70  {
71  for (const ACE_Message_Block* iter = &mb; iter; iter = iter->cont()) {
72  const size_t len = iter->length();
73  if (len > offset) {
74  return mb_copy(dest, *iter, offset, encoding);
75  }
76  offset -= len;
77  }
78  return false;
79  }
80 }
81 
83 
84 namespace OpenDDS {
85 namespace DCPS {
86 
87 // Allocate a new message block using the allocators from an existing
88 // message block, "mb". Use of mb's data_allocator_ is optional.
91  size_t size, bool use_data_alloc)
92 {
93  enum { DATA, DB, MB, N_ALLOC };
94  ACE_Allocator* allocators[N_ALLOC];
95  // It's an ACE bug that access_allocators isn't const
96  ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
97  mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
98  if (allocators[MB]) {
99  ACE_Message_Block* result;
100  ACE_NEW_MALLOC_RETURN(result,
101  static_cast<ACE_Message_Block*>(
102  allocators[MB]->malloc(sizeof(ACE_Message_Block))),
103  ACE_Message_Block(size,
105  0, // cont
106  0, // data
107  use_data_alloc ? allocators[DATA] : 0,
108  mut_mb.locking_strategy(), // locking_strategy
112  allocators[DB],
113  allocators[MB]),
114  0);
115  return result;
116  } else {
117  return new ACE_Message_Block(size,
119  0, // cont
120  0, // data
121  use_data_alloc ? allocators[DATA] : 0,
122  mut_mb.locking_strategy(), // locking_strategy
126  allocators[DB]);
127  }
128 }
129 
131 {
132  static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
133  LIFESPAN_LENGTH = 8,
134  COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
135  COHERENT_LENGTH = 16,
136  CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
137  BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
138 
139  const size_t len = mb.total_length();
140 
141  if (len <= FLAGS_OFFSET) return true;
142 
143  Encoding encoding(encoding_kind);
144  unsigned char msg_id;
145  if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET, encoding)
146  || int(msg_id) >= MESSAGE_ID_MAX) {
147  // This check, and the similar one below for submessage id, are actually
148  // indicating an invalid header (and not a partial header) but we can
149  // treat it the same as partial for the sake of the TransportRecvStrategy.
150  return true;
151  }
152 
153  if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET, encoding)
154  || int(msg_id) >= SUBMESSAGE_ID_MAX) {
155  return true;
156  }
157 
158  char flags;
159  if (!mb_peek(flags, mb, FLAGS_OFFSET, encoding)) {
160  return true;
161  }
162 
163  size_t expected = get_max_serialized_size();
164  if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
165  if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
166 
167  if (flags & CONTENT_FILT_MASK) {
168  CORBA::ULong seqLen;
169  encoding.endianness(static_cast<Endianness>(flags & BYTE_ORDER_MASK));
170  if (!mb_peek(seqLen, mb, expected, encoding)) {
171  return true;
172  }
173  expected += int32_cdr_size + guid_cdr_size * seqLen;
174  }
175 
176  return len < expected;
177 }
178 
179 void
181 {
182  Encoding encoding(encoding_kind);
183  Serializer reader(buffer, encoding);
184  serialized_size_ = 0;
185 
186  // Only byte-sized reads until we get the byte_order_ flag.
187 
188  if (!(reader >> this->message_id_)) {
189  return;
190  }
192 
193  if (!(reader >> this->submessage_id_)) {
194  return;
195  }
197 
198  // Extract the flag values.
199  ACE_CDR::Octet byte;
200  if (!(reader >> ACE_InputCDR::to_octet(byte))) {
201  return;
202  }
204 
205  this->byte_order_ = byte & mask_flag(BYTE_ORDER_FLAG);
213 
214  // Set swap_bytes flag to the Serializer if data sample from
215  // the publisher is in different byte order.
216  reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
217 
218  if (!(reader >> ACE_InputCDR::to_octet(byte))) {
219  return;
220  }
221  serialized_size_ += sizeof(byte);
223  this->key_fields_only_ = byte & mask_flag(KEY_ONLY_FLAG);
224 
225  if (!(reader >> this->message_length_)) {
226  return;
227  }
229 
230  if (!(reader >> this->sequence_)) {
231  return;
232  }
234 
235  if (!(reader >> this->source_timestamp_sec_)) {
236  return;
237  }
239 
240  if (!(reader >> this->source_timestamp_nanosec_)) {
241  return;
242  }
244 
245  if (this->lifespan_duration_) {
246  if (!(reader >> this->lifespan_duration_sec_)) {
247  return;
248  }
250 
251  if (!(reader >> this->lifespan_duration_nanosec_)) {
252  return;
253  }
255  }
256 
257  if (!(reader >> this->publication_id_)) {
258  return;
259  }
261 
262 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
263  if (this->group_coherent_) {
264  if (!(reader >> this->publisher_id_)) {
265  return;
266  }
268  }
269 #endif
270 
271  if (this->content_filter_) {
272  if (!(reader >> this->content_filter_entries_)) {
273  return;
274  }
276  }
277 }
278 
279 bool
281 {
282  Serializer writer(&buffer, encoding_kind, value.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
283 
284  writer << value.message_id_;
285  writer << value.submessage_id_;
286 
287  // Write the flags as a single byte.
288  ACE_CDR::Octet flags = (value.byte_order_ << BYTE_ORDER_FLAG)
296  ;
297  writer << ACE_OutputCDR::from_octet(flags);
298 
299  flags = (value.cdr_encapsulation_ << CDR_ENCAP_FLAG)
300  | (value.key_fields_only_ << KEY_ONLY_FLAG)
301  ;
302  writer << ACE_OutputCDR::from_octet(flags);
303 
304  writer << value.message_length_;
305  writer << value.sequence_;
306  writer << value.source_timestamp_sec_;
307  writer << value.source_timestamp_nanosec_;
308 
309  if (value.lifespan_duration_) {
310  writer << value.lifespan_duration_sec_;
311  writer << value.lifespan_duration_nanosec_;
312  }
313 
314  writer << value.publication_id_;
315 
316 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
317  if (value.group_coherent_) {
318  writer << value.publisher_id_;
319  }
320 #endif
321 
322  // content_filter_entries_ is deliberately not marshaled here.
323  // It's variable sized, so it won't fit into our pre-allocated data block.
324  // It may be customized per-datalink so it will be handled later with a
325  // a chained (continuation) ACE_Message_Block.
326 
327  return writer.good_bit();
328 }
329 
330 void
332 {
333  Encoding encoding(encoding_kind,
335  size_t size = 0;
336  if (guids) {
337  serialized_size(encoding, size, *guids);
338  } else {
339  size = int32_cdr_size;
340  }
341  ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
342 
343  Serializer ser(optHdr, encoding);
344  if (guids) {
345  ser << *guids;
346  } else {
347  ser << CORBA::ULong(0);
348  }
349 
350  // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
351  optHdr->cont(mb->cont());
352  mb->cont(optHdr);
353 }
354 
355 void
357  Message_Block_Ptr& head,
358  Message_Block_Ptr& tail)
359 {
360  if (!head) {
361  head.reset(orig.duplicate());
362  }
363 
364  ACE_Message_Block* frag = head.get();
365  size_t frag_remain = size;
366  for (; frag_remain > frag->length(); frag = frag->cont()) {
367  frag_remain -= frag->length();
368  }
369 
370  if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
371  tail.reset(frag->cont());
372  } else {
373  tail.reset(frag->duplicate());
374  frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
376  tail->rd_ptr(frag_remain);
377  }
378  frag->cont(0);
379 }
380 
381 void
384 {
386  const Encoding encoding(encoding_kind);
387 
388  const size_t length = dup->total_length();
389  DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
390  const size_t hdr_len = length - dup->total_length();
391  const size_t this_max_serialized_size = get_max_serialized_size();
392 
393  ACE_Message_Block* payload = dup.get();
394  //skip zero length message blocks
395  ACE_Message_Block* prev = 0;
396  for (; payload->length() == 0; payload = payload->cont()) {
397  prev = payload;
398  }
399  prev->cont(0);
400  Message_Block_Ptr payload_head(payload);
401 
402  if (size < hdr_len) { // need to fragment the content_filter_entries_
403  head.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
404  hdr.more_fragments_ = true;
405  hdr.message_length_ = 0; // no room for payload data
406  *head << hdr;
407  const size_t avail = size - head->length() - 4 /* sequence length */;
408  const CORBA::ULong n_entries = static_cast<CORBA::ULong>(avail / guid_cdr_size);
409  GUIDSeq entries(n_entries);
410  entries.length(n_entries);
411  // remove from the end of hdr's entries (order doesn't matter)
412  for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
413  i < n_entries; ++i) {
414  entries[i] = hdr.content_filter_entries_[--x];
415  hdr.content_filter_entries_.length(x);
416  }
417  add_cfentries(&entries, head.get());
418 
419  tail.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
420  hdr.more_fragments_ = false;
421  hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
422  hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
423  *tail << hdr;
424  tail->cont(payload_head.release());
425  if (hdr.content_filter_) {
426  add_cfentries(&hdr.content_filter_entries_, tail.get());
427  }
428  return;
429  }
430 
431  Message_Block_Ptr payload_tail;
432  split_payload(*payload, size - hdr_len, payload_head, payload_tail);
433 
434  hdr.more_fragments_ = true;
435  hdr.message_length_ = static_cast<ACE_UINT32>(payload_head->total_length());
436 
437  head.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
438  *head << hdr;
439  head->cont(payload_head.release());
440  if (hdr.content_filter_) {
441  add_cfentries(&hdr.content_filter_entries_, head.get());
442  }
443 
444  hdr.more_fragments_ = false;
445  hdr.content_filter_ = false;
446  hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
447 
448  tail.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
449  *tail << hdr;
450  tail->cont(payload_tail.release());
451 }
452 
453 bool
455  const DataSampleHeader& second, DataSampleHeader& result)
456 {
457  if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
458  return false;
459  }
460  result = second;
461  result.message_length_ += first.message_length_;
462  if (first.content_filter_) {
463  result.content_filter_ = true;
464  const CORBA::ULong entries = first.content_filter_entries_.length();
465  CORBA::ULong x = result.content_filter_entries_.length();
466  result.content_filter_entries_.length(x + entries);
467  for (CORBA::ULong i(entries); i > 0;) {
468  result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
469  }
470  }
471  return true;
472 }
473 
475 {
476  switch (value) {
477  case SAMPLE_DATA:
478  return "SAMPLE_DATA";
480  return "DATAWRITER_LIVELINESS";
482  return "INSTANCE_REGISTRATION";
483  case UNREGISTER_INSTANCE:
484  return "UNREGISTER_INSTANCE";
485  case DISPOSE_INSTANCE:
486  return "DISPOSE_INSTANCE";
487  case GRACEFUL_DISCONNECT:
488  return "GRACEFUL_DISCONNECT";
489  case REQUEST_ACK:
490  return "REQUEST_ACK";
491  case SAMPLE_ACK:
492  return "SAMPLE_ACK";
494  return "END_COHERENT_CHANGES";
495  case TRANSPORT_CONTROL:
496  return "TRANSPORT_CONTROL";
498  return "DISPOSE_UNREGISTER_INSTANCE";
500  return "END_HISTORIC_SAMPLES";
501  default:
502  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: to_string(MessageId): ")
503  ACE_TEXT("%d is either invalid or not recognized.\n"),
504  value));
505  return "Invalid MessageId";
506  }
507 }
508 
510 {
511  switch (value) {
512  case SUBMESSAGE_NONE:
513  return "SUBMESSAGE_NONE";
514  case MULTICAST_SYN:
515  return "MULTICAST_SYN";
516  case MULTICAST_SYNACK:
517  return "MULTICAST_SYNACK";
518  case MULTICAST_NAK:
519  return "MULTICAST_NAK";
520  case MULTICAST_NAKACK:
521  return "MULTICAST_NAKACK";
522  default:
523  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: to_string(SubMessageId): ")
524  ACE_TEXT("%d is either invalid or not recognized.\n"),
525  value));
526  return "Invalid SubMessageId";
527  }
528 }
529 
531 {
532  OPENDDS_STRING ret;
533  if (value.submessage_id_ != SUBMESSAGE_NONE) {
534  ret += to_string(SubMessageId(value.submessage_id_));
535  ret += " 0x";
536  ret += to_dds_string(unsigned(value.submessage_id_), true);
537  ret += "), ";
538  } else {
539  ret += to_string(MessageId(value.message_id_));
540  ret += " (0x";
541  ret += to_dds_string(unsigned(value.message_id_), true);
542  ret += "), ";
543  }
544 
545  ret += "Length: ";
546  ret += to_dds_string(value.message_length_);
547  ret += ", ";
548 
549  ret += "Byte order: ";
550  ret += (value.byte_order_ == 1 ? "Little" : "Big");
551  ret += " Endian";
552 
553  if (value.message_id_ != TRANSPORT_CONTROL) {
554  ret += ", ";
555 
556  if (value.coherent_change_ == 1) ret += "Coherent, ";
557  if (value.historic_sample_ == 1) ret += "Historic, ";
558  if (value.lifespan_duration_ == 1) ret += "Lifespan, ";
559 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
560  if (value.group_coherent_ == 1) ret += "Group-Coherent, ";
561 #endif
562  if (value.content_filter_ == 1) ret += "Content-Filtered, ";
563  if (value.sequence_repair_ == 1) ret += "Sequence Repair, ";
564  if (value.more_fragments_ == 1) ret += "More Fragments, ";
565  if (value.cdr_encapsulation_ == 1) ret += "CDR Encapsulation, ";
566  if (value.key_fields_only_ == 1) ret += "Key Fields Only, ";
567 
568  ret += "Sequence: 0x";
569  ret += to_dds_string(unsigned(value.sequence_.getValue()), true);
570  ret += ", ";
571 
572  ret += "Timestamp: ";
573  ret += to_dds_string(value.source_timestamp_sec_);
574  ret += ".";
576  ret += ", ";
577 
578  if (value.lifespan_duration_) {
579  ret += "Lifespan: ";
581  ret += ".";
583  ret += ", ";
584  }
585 
586  ret += "Publication: " + LogGuid(value.publication_id_).conv_;
587 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
588  if (value.group_coherent_) {
589  ret += ", Publisher: " + LogGuid(value.publisher_id_).conv_;
590  }
591 #endif
592 
593  if (value.content_filter_) {
594  const CORBA::ULong len = value.content_filter_entries_.length();
595  ret += ", Content-Filter Entries (";
596  ret += to_dds_string(len);
597  ret += "): [";
598  for (CORBA::ULong i(0); i < len; ++i) {
599  ret += LogGuid(value.content_filter_entries_[i]).conv_ + ' ';
600  }
601  ret += ']';
602  }
603  }
604  return ret;
605 }
606 
607 #ifndef OPENDDS_SAFETY_PROFILE
608 /// Message Id enumeration insertion onto an ostream.
609 std::ostream& operator<<(std::ostream& os, const MessageId value)
610 {
611  os << to_string(value);
612  return os;
613 }
614 
615 /// Sub-Message Id enumeration insertion onto an ostream.
616 std::ostream& operator<<(std::ostream& os, const SubMessageId value)
617 {
618  os << to_string(value);
619  return os;
620 }
621 
622 /// Message header insertion onto an ostream.
623 extern OpenDDS_Dcps_Export
624 std::ostream& operator<<(std::ostream& str, const DataSampleHeader& value)
625 {
626  RestoreOutputStreamState stream_state(str);
627 
628  if (value.submessage_id_ != SUBMESSAGE_NONE) {
629  str << SubMessageId(value.submessage_id_)
630  << " (0x" << std::hex << std::setw(2) << std::setfill('0')
631  << unsigned(value.submessage_id_) << "), ";
632 
633  } else {
634  str << MessageId(value.message_id_)
635  << " (0x" << std::hex << std::setw(2) << std::setfill('0')
636  << unsigned(value.message_id_) << "), ";
637  }
638 
639  str << "Length: " << std::dec << value.message_length_ << ", ";
640 
641  str << "Byte order: " << (value.byte_order_ == 1 ? "Little" : "Big")
642  << " Endian";
643 
644  if (value.message_id_ != TRANSPORT_CONTROL) {
645  str << ", ";
646 
647  if (value.coherent_change_ == 1) str << "Coherent, ";
648  if (value.historic_sample_ == 1) str << "Historic, ";
649  if (value.lifespan_duration_ == 1) str << "Lifespan, ";
650 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
651  if (value.group_coherent_ == 1) str << "Group-Coherent, ";
652 #endif
653  if (value.content_filter_ == 1) str << "Content-Filtered, ";
654  if (value.sequence_repair_ == 1) str << "Sequence Repair, ";
655  if (value.more_fragments_ == 1) str << "More Fragments, ";
656  if (value.cdr_encapsulation_ == 1) str << "CDR Encapsulation, ";
657  if (value.key_fields_only_ == 1) str << "Key Fields Only, ";
658 
659  str << "Sequence: 0x" << std::hex << std::setw(4) << std::setfill('0')
660  << value.sequence_.getValue() << ", ";
661 
662  str << "Timestamp: " << std::dec << value.source_timestamp_sec_ << "."
663  << std::dec << value.source_timestamp_nanosec_ << ", ";
664 
665  if (value.lifespan_duration_) {
666  str << "Lifespan: " << std::dec << value.lifespan_duration_sec_ << "."
667  << std::dec << value.lifespan_duration_nanosec_ << ", ";
668  }
669 
670  str << "Publication: " << GuidConverter(value.publication_id_);
671 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
672  if (value.group_coherent_) {
673  str << ", Publisher: " << GuidConverter(value.publisher_id_);
674  }
675 #endif
676 
677  if (value.content_filter_) {
678  const CORBA::ULong len = value.content_filter_entries_.length();
679  str << ", Content-Filter Entries (" << len << "): [";
680  for (CORBA::ULong i(0); i < len; ++i) {
681  str << GuidConverter(value.content_filter_entries_[i]) << ' ';
682  }
683  str << ']';
684  }
685  }
686 
687  return str;
688 }
689 #endif //OPENDDS_SAFETY_PROFILE
690 
691 
692 bool
694 {
695  rds.header_ = *this;
696  return true;
697 }
698 
700 
701 } // namespace DCPS
702 } // namespace OpenDDS
703 
ACE_Byte Octet
DataSampleHeader header_
The demarshalled sample header.
#define ACE_ERROR(X)
const LogLevel::Value value
Definition: debug.cpp:61
static void add_cfentries(const GUIDSeq *guids, ACE_Message_Block *mb)
static const ACE_Time_Value max_time
char message_id_
The enum MessageId.
size_t length(void) const
ACE_Lock * locking_strategy(void)
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
String to_dds_string(unsigned short to_convert)
const size_t guid_cdr_size
Definition: GuidUtils.h:115
Endianness endianness() const
Definition: Serializer.inl:64
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
static ACE_Message_Block * alloc_msgblock(const ACE_Message_Block &mb, size_t size, bool use_data_alloc)
static void split(const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
#define ACE_CDR_BYTE_ORDER
void access_allocators(ACE_Allocator *&allocator_strategy, ACE_Allocator *&data_block_allocator, ACE_Allocator *&message_block_allocator)
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
char * rd_ptr(void) const
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
void * malloc(size_t)
MessageId
One byte message id (<256)
#define OPENDDS_STRING
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
ACE_CDR::ULong ULong
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
ACE_CDR::Boolean operator<<(Serializer &serializer, CoherentChangeControl &value)
Marshal/Insertion into a buffer.
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
Holds a data sample received by the transport.
void init(ACE_Message_Block *buffer)
Implement load from buffer.
virtual ACE_Message_Block * release(void)
ACE_Data_Block * data_block(void) const
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
size_t serialized_size_
Keep track of the amount of data read from a buffer.
size_t total_length(void) const
char * wr_ptr(void) const
bool more_fragments_
The current "Data Sample" needs reassembly before further processing.
static void split_payload(const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
OPENDDS_STRING conv_
const size_t int32_cdr_size
Definition: Serializer.h:95
static bool join(const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result)
char submessage_id_
Implementation-specific sub-message Ids.
ACE_TEXT("TCP_Factory")
void swap_bytes(bool do_swap)
Definition: Serializer.inl:403
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
bool into_received_data_sample(ReceivedDataSample &rds)
static const ACE_Time_Value zero
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const char * to_string(MessageId value)
static bool partial(const ACE_Message_Block &mb)
Does the data in this mb constitute a partial Sample Header?
ACE_HANDLE dup(ACE_HANDLE handle)
void buffer_read(char *dest, size_t size, bool swap)
Read from the chain into a destination buffer.
Definition: Serializer.inl:305
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
const size_t byte_cdr_size
Definition: Serializer.h:90
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
static ACE_UINT8 mask_flag(DataSampleHeaderFlag flag)