OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Static Public Member Functions | Public Attributes | Static Public Attributes | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsSampleHeader Class Reference

Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header. More...

#include <RtpsSampleHeader.h>

Collaboration diagram for OpenDDS::DCPS::RtpsSampleHeader:
Collaboration graph
[legend]

Public Member Functions

 RtpsSampleHeader ()
 
 RtpsSampleHeader (ACE_Message_Block &mb)
 
RtpsSampleHeaderoperator= (ACE_Message_Block &mn)
 
void pdu_remaining (size_t size)
 
size_t get_serialized_size ()
 
ACE_UINT32 message_length ()
 
bool valid () const
 
bool into_received_data_sample (ReceivedDataSample &rds)
 
bool more_fragments () const
 

Static Public Member Functions

static size_t get_max_serialized_size ()
 
static bool partial (const ACE_Message_Block &)
 
static SequenceRange split (const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
 
static bool payload_byte_order (const ReceivedDataSample &rds)
 
static void populate_data_sample_submessages (RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
 
static void populate_data_control_submessages (RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
 
static void populate_inline_qos (const TransportSendListener::InlineQosData &qos_data, RTPS::ParameterList &plist)
 
static bool control_message_supported (char message_id)
 

Public Attributes

RTPS::Submessage submessage_
 

Static Public Attributes

static const ACE_CDR::UShort FRAG_SIZE = 1024
 

Private Member Functions

void init (ACE_Message_Block &mb)
 

Static Private Member Functions

static void process_iqos (DataSampleHeader &opendds, const OpenDDS::RTPS::ParameterList &iqos)
 

Private Attributes

bool valid_
 
bool frag_
 
bool data_
 
size_t serialized_size_
 
size_t message_length_
 

Detailed Description

Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.

Definition at line 32 of file RtpsSampleHeader.h.

Constructor & Destructor Documentation

◆ RtpsSampleHeader() [1/2]

ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader ( )

◆ RtpsSampleHeader() [2/2]

ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader ( ACE_Message_Block mb)
explicit

Definition at line 24 of file RtpsSampleHeader.inl.

References ACE_INLINE, and init().

Member Function Documentation

◆ control_message_supported()

ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::control_message_supported ( char  message_id)
static

◆ get_max_serialized_size()

static size_t OpenDDS::DCPS::RtpsSampleHeader::get_max_serialized_size ( )
inlinestatic

Definition at line 39 of file RtpsSampleHeader.h.

39 { return 4; }

◆ get_serialized_size()

ACE_INLINE size_t OpenDDS::DCPS::RtpsSampleHeader::get_serialized_size ( )

Definition at line 59 of file RtpsSampleHeader.inl.

References ACE_INLINE, and serialized_size_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input().

60 {
61  return serialized_size_;
62 }

◆ init()

void OpenDDS::DCPS::RtpsSampleHeader::init ( ACE_Message_Block mb)
private

Definition at line 59 of file RtpsSampleHeader.cpp.

References OpenDDS::RTPS::ACKNACK, CASE_SMKIND, ACE_Message_Block::cont(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::DCPS::ENDIAN_BIG, OpenDDS::DCPS::ENDIAN_LITTLE, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::GAP, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, OpenDDS::DCPS::Encoding::KIND_XCDR1, ACE_Message_Block::length(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::PAD, ACE_Message_Block::rd_ptr(), OpenDDS::RTPS::SEC_BODY, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, OpenDDS::RTPS::SubmessageHeader::submessageLength, and ACE_Message_Block::total_length().

Referenced by operator=(), and RtpsSampleHeader().

60 {
61  using namespace OpenDDS::RTPS;
62 
63  // valid_ is false here, it will only be set to true if there is a Submessage
64 
65  // Manually grab the first two bytes for the SubmessageKind and the byte order
66  if (mb.length() == 0) {
67  return;
68  }
69 
70  const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
71 
72  ACE_CDR::Octet flags = 0;
73 
74  if (mb.length() > 1) {
75  flags = mb.rd_ptr()[1];
76  } else if (mb.cont() && mb.cont()->length() > 0) {
77  flags = mb.cont()->rd_ptr()[0];
78  } else {
79  return;
80  }
81 
82  const size_t starting_length = mb.total_length();
83  Serializer ser(&mb, Encoding::KIND_XCDR1,
84  (flags & FLAG_E) ? ENDIAN_LITTLE : ENDIAN_BIG);
85 
86  ACE_CDR::UShort octetsToNextHeader = 0;
87 
88 #define CASE_SMKIND(kind, class, name) case kind: { \
89  class submessage; \
90  if (ser >> submessage) { \
91  octetsToNextHeader = submessage.smHeader.submessageLength; \
92  submessage_.name##_sm(submessage); \
93  valid_ = true; \
94  } \
95  break; \
96  }
97 
98  switch (kind) {
112 
113 #if defined(OPENDDS_SECURITY)
114  // Each submessage type introduced by the Security spec is treated
115  // as an opaque octet sequence at this layer.
116  case SEC_BODY:
117  case SEC_PREFIX:
118  case SEC_POSTFIX:
119  case SRTPS_PREFIX:
120  case SRTPS_POSTFIX: {
121  SecuritySubmessage submessage;
122  if (ser >> submessage) {
123  octetsToNextHeader = submessage.smHeader.submessageLength;
124  submessage_.security_sm(submessage);
125  submessage_._d(kind);
126  valid_ = true;
127  }
128  break;
129  }
130 #endif
131 
132  default:
133  {
134  SubmessageHeader submessage;
135  if (ser >> submessage) {
136  octetsToNextHeader = submessage.submessageLength;
137  submessage_.unknown_sm(submessage);
138  valid_ = true;
139  }
140  break;
141  }
142  }
143 #undef CASE_SMKIND
144 
145  if (valid_) {
146 
147  frag_ = (kind == DATA_FRAG);
148  data_ = (kind == DATA);
149 
150  // serialized_size_ is # of bytes of submessage we have read from "mb"
151  serialized_size_ = starting_length - mb.total_length();
152 
153  const ACE_CDR::UShort remaining = static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
154 
155  if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
156  // see RTPS v2.1 section 9.4.5.1.3
157  // In this case the current Submessage extends to the end of Message,
158  // so we will use the message_length_ that was set in pdu_remaining().
159  octetsToNextHeader = remaining;
160 
161  } else if (octetsToNextHeader > remaining) {
162  valid_ = false;
163  return;
164  }
165 
166  if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
167  || kind == DATA_FRAG) {
168  // These Submessages have a payload which we haven't deserialized yet.
169  // The TransportReceiveStrategy will know this via message_length().
170  // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
171  message_length_ = octetsToNextHeader + SMHDR_SZ - serialized_size_;
172  } else {
173  // These Submessages _could_ have extra data that we don't know about
174  // (from a newer minor version of the RTPS spec). Either way, indicate
175  // to the TransportReceiveStrategy that there is no data payload here.
176  message_length_ = 0;
177  ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(serialized_size_);
178  if (octetsToNextHeader + SMHDR_SZ > marshaled) {
179  valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
180  serialized_size_ = octetsToNextHeader + SMHDR_SZ;
181  }
182  }
183  }
184 }
ACE_Byte Octet
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define CASE_SMKIND(kind, class, name)
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
size_t length(void) const
SecuritySubmessage security_sm
Definition: RtpsCore.idl:873
const octet FLAG_E
Definition: RtpsCore.idl:518
char * rd_ptr(void) const
ACE_UINT16 UShort
ACE_Message_Block * cont(void) const
size_t total_length(void) const
const octet FLAG_D
Definition: RtpsCore.idl:523

◆ into_received_data_sample()

bool OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample ( ReceivedDataSample rds)

Definition at line 237 of file RtpsSampleHeader.cpp.

References ACE_DEBUG, OpenDDS::DCPS::ReceivedDataSample::append(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::DataFragSubmessage::fragmentsInSubmessage, OpenDDS::RTPS::DataFragSubmessage::fragmentSize, OpenDDS::RTPS::DataFragSubmessage::fragmentStartingNum, OpenDDS::DCPS::ReceivedDataSample::has_data(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::RTPS::DataFragSubmessage::inlineQos, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::RTPS::PID_KEY_HASH, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::replace(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::DataFragSubmessage::sampleSize, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::serialized_size(), OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::DataFragSubmessage::smHeader, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::FragmentNumber_t::value, OpenDDS::RTPS::DataSubmessage::writerId, OpenDDS::RTPS::DataFragSubmessage::writerId, OpenDDS::RTPS::DataSubmessage::writerSN, and OpenDDS::RTPS::DataFragSubmessage::writerSN.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_from_secure(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input().

238 {
239  using namespace OpenDDS::RTPS;
240  DataSampleHeader& opendds = rds.header_;
241 
242  switch (submessage_._d()) {
243  case DATA: {
244  const DataSubmessage& rtps = submessage_.data_sm();
245  opendds.cdr_encapsulation_ = true;
246  opendds.message_length_ = message_length();
247  opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
248  opendds.publication_id_.entityId = rtps.writerId;
249  opendds.message_id_ = SAMPLE_DATA;
250 
251  process_iqos(opendds, rtps.inlineQos);
252 
253  if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
254  opendds.key_fields_only_ = true;
255  } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
256  // Interoperability note: the Key may be hiding in the "key hash" param
257  // in the InlineQos. In order to make use of this Key, it mst be 16
258  // bytes or less. We have observed other DDS implementations only send
259  // the MD5 hash of a >16 byte key, so we must limit this to Built-in
260  // endpoints which are assumed to use GUIDs as keys.
261  if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
262  && (rtps.smHeader.flags & FLAG_Q) && !rds.has_data()) {
263  for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
264  if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
265  // CDR_BE encapsulation scheme (endianness is not used for key hash)
266  rds.replace("\x00\x00\x00\x00", EncapsulationHeader::serialized_size);
267  const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
268  rds.append(reinterpret_cast<const char*>(data), sizeof(DDS::OctetArray16));
269  opendds.message_length_ = EncapsulationHeader::serialized_size + sizeof(DDS::OctetArray16);
270  opendds.key_fields_only_ = true;
271  if (Transport_debug_level) {
272  ACE_DEBUG((LM_DEBUG,
273  "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
274  " - used KeyHash data as the key-only payload\n"));
275  }
276  break;
277  }
278  }
279  } else {
280  // FUTURE: Handle the case of D = 0 and K = 0
281  // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
282  if (Transport_debug_level) {
283  ACE_DEBUG((LM_WARNING,
284  "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
285  "Received a DATA Submessage with D = 0 and K = 0, "
286  "dropping\n"));
287  }
288  return false;
289  }
290  }
291 
292  if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
293  // TODO(iguessthislldo: Convert to use Encoding
294  // Peek at the byte order from the encapsulation containing the payload.
295  opendds.byte_order_ = payload_byte_order(rds);
296  }
297 
298  break;
299  }
300  case DATA_FRAG: {
302  opendds.cdr_encapsulation_ = true;
303  opendds.message_length_ = message_length();
304  opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
305  opendds.publication_id_.entityId = rtps.writerId;
306  opendds.message_id_ = SAMPLE_DATA;
307  opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
308  // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
309 
310  process_iqos(opendds, rtps.inlineQos);
311 
312  const CORBA::ULong lastFragInSubmsg =
314  if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
315  opendds.more_fragments_ = true;
316  }
317  break;
318  }
319  default:
320  break;
321  }
322 
323  return true;
324 }
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define ACE_DEBUG(X)
const octet FLAG_Q
Definition: RtpsCore.idl:519
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
SequenceNumber_t writerSN
Definition: RtpsCore.idl:671
DataSubmessage data_sm
Definition: RtpsCore.idl:863
SubmessageHeader smHeader
Definition: RtpsCore.idl:663
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
DataFragSubmessage data_frag_sm
Definition: RtpsCore.idl:866
const ParameterId_t PID_KEY_HASH
Definition: RtpsCore.idl:293
ACE_CDR::ULong ULong
static void process_iqos(DataSampleHeader &opendds, const OpenDDS::RTPS::ParameterList &iqos)
static bool payload_byte_order(const ReceivedDataSample &rds)
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:670
unsigned short fragmentsInSubmessage
Definition: RtpsCore.idl:691
ACE_CDR::Octet Octet
octet OctetArray16[16]
const octet FLAG_K_IN_FRAG
Definition: RtpsCore.idl:525
static const size_t serialized_size
Definition: Serializer.h:251
FragmentNumber_t fragmentStartingNum
Definition: RtpsCore.idl:690
const octet FLAG_D
Definition: RtpsCore.idl:523

◆ message_length()

ACE_INLINE ACE_UINT32 OpenDDS::DCPS::RtpsSampleHeader::message_length ( void  )

Definition at line 65 of file RtpsSampleHeader.inl.

References ACE_INLINE, and message_length_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input().

66 {
67  return static_cast<ACE_UINT32>(message_length_);
68 }

◆ more_fragments()

ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::more_fragments ( void  ) const

◆ operator=()

ACE_INLINE RtpsSampleHeader & OpenDDS::DCPS::RtpsSampleHeader::operator= ( ACE_Message_Block mn)

Definition at line 35 of file RtpsSampleHeader.inl.

References ACE_INLINE, data_, frag_, init(), and valid_.

36 {
37  valid_ = false;
38  frag_ = false;
39  data_ = false;
40  // message_length_ should not be reset here
41  // serialized_size_ doesn't need to be reset, init() will set it (if valid_)
42  init(mb);
43  return *this;
44 }
void init(ACE_Message_Block &mb)

◆ partial()

static bool OpenDDS::DCPS::RtpsSampleHeader::partial ( const ACE_Message_Block )
inlinestatic

Definition at line 44 of file RtpsSampleHeader.h.

44 { return false; }

◆ payload_byte_order()

bool OpenDDS::DCPS::RtpsSampleHeader::payload_byte_order ( const ReceivedDataSample rds)
static

◆ pdu_remaining()

ACE_INLINE void OpenDDS::DCPS::RtpsSampleHeader::pdu_remaining ( size_t  size)

Definition at line 53 of file RtpsSampleHeader.inl.

References ACE_INLINE, and message_length_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input().

54 {
55  message_length_ = size;
56 }

◆ populate_data_control_submessages()

void OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages ( RTPS::SubmessageSeq subm,
const TransportSendControlElement tsce,
bool  requires_inline_qos 
)
static

Definition at line 440 of file RtpsSampleHeader.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::grow(), header, OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::TransportSendControlElement::listener(), LM_INFO, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::TransportSendControlElement::msg_payload(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element_non_reliable_i().

444 {
445  using namespace OpenDDS::RTPS;
446 
447  const DataSampleHeader& header = tsce.header();
448  const ACE_CDR::Octet flags = header.byte_order_;
449  add_timestamp(subm, flags, header);
450 
451  static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
452 
453  DataSubmessage data = {
454  {DATA, flags, 0},
455  0,
458  header.publication_id_.entityId,
459  RTPS::to_rtps_seqnum(header.sequence_),
460  ParameterList()
461  };
462  switch (header.message_id_) {
463  case INSTANCE_REGISTRATION: {
464  // NOTE: The RTPS spec is not entirely clear about instance registration.
465  // We have decided to send a DATA Submessage containing the key and an
466  // inlineQoS StatusInfo of zero.
467  data.smHeader.flags |= FLAG_K_IN_DATA;
468  const int qos_len = DCPS::grow(data.inlineQos) - 1;
469  data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
470  break;
471  }
472  case UNREGISTER_INSTANCE: {
473  data.smHeader.flags |= FLAG_K_IN_DATA;
474  const int qos_len = data.inlineQos.length();
475  data.inlineQos.length(qos_len+1);
476  data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
477  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
478  add_key_hash(data.inlineQos, tsce.msg_payload());
479  }
480  break;
481  }
482  case DISPOSE_INSTANCE: {
483  data.smHeader.flags |= FLAG_K_IN_DATA;
484  const int qos_len = DCPS::grow(data.inlineQos) - 1;
485  data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
486  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
487  add_key_hash(data.inlineQos, tsce.msg_payload());
488  }
489  break;
490  }
492  data.smHeader.flags |= FLAG_K_IN_DATA;
493  const int qos_len = DCPS::grow(data.inlineQos) - 1;
494  data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
495  if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
496  add_key_hash(data.inlineQos, tsce.msg_payload());
497  }
498  break;
499  }
500  // update control_message_supported() when adding new cases here
501  default:
502  ACE_DEBUG((LM_INFO,
503  "RtpsSampleHeader::populate_data_control_submessages(): "
504  "Non-sample messages seen, message_id = %d\n",
505  header.message_id_));
506  break;
507  }
508 
509  if (requires_inline_qos) {
510  TransportSendListener::InlineQosData qos_data;
511  tsce.listener()->retrieve_inline_qos_data(qos_data);
512 
513  populate_inline_qos(qos_data, data.inlineQos);
514  }
515 
516  if (data.inlineQos.length() > 0) {
517  data.smHeader.flags |= FLAG_Q;
518  }
519 
520  CORBA::ULong idx = DCPS::grow(subm) - 1;
521  subm[idx].data_sm(data);
522 }
ACE_Byte Octet
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define ACE_DEBUG(X)
const octet FLAG_Q
Definition: RtpsCore.idl:519
SubmessageHeader smHeader
Definition: RtpsCore.idl:663
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
ACE_CDR::ULong ULong
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
static void populate_inline_qos(const TransportSendListener::InlineQosData &qos_data, RTPS::ParameterList &plist)
sequence< Parameter > ParameterList
ACE_CDR::Octet Octet
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ populate_data_sample_submessages()

void OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages ( RTPS::SubmessageSeq subm,
const DataSampleElement dsle,
bool  requires_inline_qos 
)
static

Definition at line 348 of file RtpsSampleHeader.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_id(), OpenDDS::DCPS::grow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::RTPS::Parameter::key_hash, LM_INFO, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::push_back(), ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::RTPS::DataSubmessage::smHeader, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::RTPS::KeyHash_t::value.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), and OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element_non_reliable_i().

352 {
353  using namespace OpenDDS::RTPS;
354 
355  const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
356  add_timestamp(subm, flags, dsle.get_header());
357  CORBA::ULong i = subm.length();
358 
359  EntityId_t readerId = ENTITYID_UNKNOWN;
360  if (dsle.get_num_subs() == 1) {
361  readerId = dsle.get_sub_id(0).entityId;
364  idest.smHeader.flags = flags;
366  std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
367  sizeof(GuidPrefix_t));
368  i = DCPS::grow(subm);
369  subm[i - 1].info_dst_sm(idest);
370  } else {
371  //Not durability resend, but could have inline gaps
372  for (CORBA::ULong x = 0; x < i; ++x) {
373  if (subm[x]._d() == INFO_DST) {
374  //Need to add INFO_DST
377  idest.smHeader.flags = flags;
379  std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
380  sizeof(GuidPrefix_t));
381  i = DCPS::grow(subm);
382  subm[i - 1].info_dst_sm(idest);
383  break;
384  }
385  }
386  }
387 
388  DataSubmessage data = {
389  {DATA, flags, 0},
390  0,
392  readerId,
393  dsle.get_pub_id().entityId,
394  RTPS::to_rtps_seqnum(dsle.get_header().sequence_),
395  ParameterList()
396  };
397  const char message_id = dsle.get_header().message_id_;
398  switch (message_id) {
399  case SAMPLE_DATA:
400  // Must be a data message
401  data.smHeader.flags |= FLAG_D;
402  break;
403  default:
404  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
405  "Non-sample messages seen, message_id = %d\n", int(message_id)));
406  break;
407  }
408 
409  if (requires_inline_qos) {
410  TransportSendListener::InlineQosData qos_data;
411  dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
412 
413  populate_inline_qos(qos_data, data.inlineQos);
414  }
415 
416  if (data.inlineQos.length() > 0) {
417  data.smHeader.flags |= FLAG_Q;
418  }
419 
420  i = DCPS::grow(subm);
421  subm[i - 1].data_sm(data);
422 }
ACE_Byte Octet
#define ACE_DEBUG(X)
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
const octet FLAG_Q
Definition: RtpsCore.idl:519
SubmessageHeader smHeader
Definition: RtpsCore.idl:663
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
ACE_CDR::ULong ULong
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
static void populate_inline_qos(const TransportSendListener::InlineQosData &qos_data, RTPS::ParameterList &plist)
sequence< Parameter > ParameterList
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
const octet FLAG_D
Definition: RtpsCore.idl:523
const ACE_CDR::UShort INFO_DST_SZ
Definition: MessageTypes.h:108

◆ populate_inline_qos()

void OpenDDS::DCPS::RtpsSampleHeader::populate_inline_qos ( const TransportSendListener::InlineQosData qos_data,
RTPS::ParameterList plist 
)
static

Definition at line 531 of file RtpsSampleHeader.cpp.

References OpenDDS::RTPS::BEST_EFFORT, DDS::BEST_EFFORT_RELIABILITY_QOS, ACE_Message_Block::copy(), OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::DCPS::grow(), OpenDDS::RTPS::ReliabilityQosPolicyRtps::kind, DDS::ReliabilityQosPolicy::kind, OpenDDS::RTPS::ReliabilityQosPolicyRtps::max_blocking_time, DDS::ReliabilityQosPolicy::max_blocking_time, OpenDDS::RTPS::PID_TOPIC_NAME, PROCESS_INLINE_QOS, OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, DDS::DataWriterQos::reliability, OpenDDS::RTPS::RELIABLE, ACE_CDR::swap_2(), ACE_CDR::swap_4(), TheServiceParticipant, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, OpenDDS::RTPS::ReliabilityKind_t::value, ACE_Message_Block::wr_ptr(), and write().

534 {
535  using namespace OpenDDS::RTPS;
536 
537  // Always include topic name (per the spec)
538  {
539  const int idx = DCPS::grow(plist) - 1;
540  plist[idx].string_data(qos_data.topic_name.c_str());
541  plist[idx]._d(PID_TOPIC_NAME);
542  }
543 
544  // Conditionally include other QoS inline when the differ from the
545  // default value.
546  DDS::PublisherQos default_pub_qos =
547  TheServiceParticipant->initial_PublisherQos();
548  PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
549  PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
550 
551  DDS::DataWriterQos default_dw_qos =
552  TheServiceParticipant->initial_DataWriterQos();
553  PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
554  PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
555  PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
556  PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
557 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
558  PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
559 #endif
560  PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
561  if (qos_data.dw_qos.reliability != default_dw_qos.reliability) {
562  const int idx = DCPS::grow(plist) - 1;
563 
564  ReliabilityQosPolicyRtps reliability;
565  reliability.max_blocking_time = qos_data.dw_qos.reliability.max_blocking_time;
566 
567  if (qos_data.dw_qos.reliability.kind == DDS::BEST_EFFORT_RELIABILITY_QOS) {
568  reliability.kind.value = RTPS::BEST_EFFORT;
569  } else { // default to RELIABLE for writers
570  reliability.kind.value = RTPS::RELIABLE;
571  }
572 
573  plist[idx].reliability(reliability);
574  }
575  PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
576  PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
577  PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
578 }
const short BEST_EFFORT
Definition: RtpsCore.idl:141
ReliabilityQosPolicy reliability
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
const ParameterId_t PID_TOPIC_NAME
Definition: RtpsCore.idl:254
const short RELIABLE
Definition: RtpsCore.idl:142
#define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS)
#define TheServiceParticipant

◆ process_iqos()

void OpenDDS::DCPS::RtpsSampleHeader::process_iqos ( DataSampleHeader opendds,
const OpenDDS::RTPS::ParameterList iqos 
)
staticprivate

Definition at line 187 of file RtpsSampleHeader.cpp.

References DDS::PresentationQosPolicy::access_scope, ACE_DEBUG, DDS::PresentationQosPolicy::coherent_access, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::INSTANCE_REGISTRATION, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, DDS::PartitionQosPolicy::name, OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::RTPS::PID_ORIGINAL_WRITER_INFO, OpenDDS::RTPS::PID_PARTITION, OpenDDS::RTPS::PID_PRESENTATION, OpenDDS::RTPS::PID_STATUS_INFO, OpenDDS::RTPS::PID_TOPIC_NAME, OpenDDS::DCPS::to_dds_string(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.

189 {
190  using namespace OpenDDS::RTPS;
191 #if defined(OPENDDS_TEST_INLINE_QOS)
192  OPENDDS_STRING output("into_received_data_sample(): ");
193  output += to_dds_string(iqos.length());
194  output += " inline QoS parameters\n";
195  for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
196  output += " parameter type = ";
197  output += to_dds_string(iqos[index]._d());
198  output += "\n";
199  }
200  ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
201 #endif
202  for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
203  if (iqos[i]._d() == PID_STATUS_INFO) {
204  if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
205  opendds.message_id_ = DISPOSE_INSTANCE;
206  } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
207  opendds.message_id_ = UNREGISTER_INSTANCE;
208  } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
209  opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
210  } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
211  opendds.message_id_ = INSTANCE_REGISTRATION;
212  }
213  } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
214  opendds.historic_sample_ = true;
215 #if defined(OPENDDS_TEST_INLINE_QOS)
216  } else if (iqos[i]._d() == PID_TOPIC_NAME) {
217  ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
218  } else if (iqos[i]._d() == PID_PRESENTATION) {
219  DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
220  ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
221  "coherent_access = %d, ordered_access = %d\n",
222  pres_qos.access_scope, pres_qos.coherent_access,
223  pres_qos.ordered_access));
224  } else if (iqos[i]._d() == PID_PARTITION) {
225  DDS::PartitionQosPolicy part_qos = iqos[i].partition();
226  ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
227  for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
228  ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in()));
229  }
230  ACE_DEBUG((LM_DEBUG, "\n"));
231 #endif
232  }
233  }
234 }
#define ACE_DEBUG(X)
const ParameterId_t PID_STATUS_INFO
Definition: RtpsCore.idl:294
#define OPENDDS_STRING
ACE_CDR::ULong ULong
const ParameterId_t PID_TOPIC_NAME
Definition: RtpsCore.idl:254
const ParameterId_t PID_PRESENTATION
Definition: RtpsCore.idl:270
const ParameterId_t PID_ORIGINAL_WRITER_INFO
Definition: RtpsCore.idl:301
String to_dds_string(unsigned short to_convert)
PresentationQosPolicyAccessScopeKind access_scope
const ParameterId_t PID_PARTITION
Definition: RtpsCore.idl:271

◆ split()

SequenceRange OpenDDS::DCPS::RtpsSampleHeader::split ( const ACE_Message_Block orig,
size_t  size,
Message_Block_Ptr head,
Message_Block_Ptr tail 
)
static

Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest. Returns a pair containing the largest fragment number in each new header.

Definition at line 629 of file RtpsSampleHeader.cpp.

References ACE_CDR_BYTE_ORDER, ACE_ERROR, ACE_Message_Block::cont(), ACE_Message_Block::copy(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::DATA_FRAG_OCTETS_TO_IQOS, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_N_IN_DATA, OpenDDS::RTPS::FLAG_N_IN_FRAG, OpenDDS::RTPS::FLAG_Q, ACE_Message_Block::length(), LM_ERROR, OPENDDS_END_VERSIONED_NAMESPACE_DECL, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::RTPS::SMHDR_SZ, ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::unknown_sequence_range, ACE_Message_Block::wr_ptr(), and write().

Referenced by OpenDDS::DCPS::RtpsCustomizedElement::fragment().

631 {
632  using namespace RTPS;
633  size_t data_offset = 0;
634  const char* rd = orig.rd_ptr();
635  ACE_CDR::ULong starting_frag, sample_size;
636  ACE_CDR::Octet flags;
637  bool swap_bytes;
638 
639  // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
640  // The submessages from the start of the msg block to this point (data_offset)
641  // will be copied to both the head and tail fragments.
642  while (true) {
643  flags = rd[data_offset + 1];
644  swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
645  bool found_data = false;
646 
647  switch (rd[data_offset]) {
648  case DATA:
649  if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
650  if (Transport_debug_level) {
651  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
652  "attempting to fragment a Data submessage with no payload.\n"));
653  }
654  return unknown_sequence_range;
655  }
656  found_data = true;
657  starting_frag = 1;
658  sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
659  break;
660  case DATA_FRAG:
661  found_data = true;
662  peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
663  peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
664  break;
665  }
666 
667  if (found_data) {
668  break;
669  }
670 
671  // Scan for next submessage in orig
672  ACE_CDR::UShort octetsToNextHeader;
673  peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
674 
675  data_offset += octetsToNextHeader + SMHDR_SZ;
676  if (data_offset >= orig.length()) {
677  if (Transport_debug_level) {
678  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
679  "invalid octetsToNextHeader encountered while fragmenting.\n"));
680  }
681  return unknown_sequence_range;
682  }
683  }
684 
685  // Create the "head" message block (of size "sz") containing DATA_FRAG
686  size_t sz = orig.length();
687  ACE_CDR::Octet new_flags = flags;
688  size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
689  if (rd[data_offset] == DATA) {
690  sz += 12; // DATA_FRAG is 12 bytes larger than DATA
691  iqos_offset -= 12;
692  new_flags = flags & (FLAG_E | FLAG_Q);
693  if (flags & FLAG_K_IN_DATA) {
694  new_flags |= FLAG_K_IN_FRAG;
695  }
696  if (flags & FLAG_N_IN_DATA) {
697  new_flags |= FLAG_N_IN_FRAG;
698  }
699 
700  }
701  head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
702 
703  head->copy(rd, data_offset);
704 
705  head->wr_ptr()[0] = DATA_FRAG;
706  head->wr_ptr()[1] = new_flags;
707  head->wr_ptr(2);
708 
709  std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
710  head->wr_ptr(4);
711 
712  write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
713 
714  head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
715 
716  write(head, starting_frag, swap_bytes);
717  const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
718  const ACE_CDR::UShort frags =
719  static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
720  if (frags == 0) {
721  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsSampleHeader::split: "
722  "Number of Fragments is Zero: min(%B, %B) / FRAG_SIZE\n",
723  max_data, orig_payload));
724  return unknown_sequence_range;
725  }
726  write(head, frags, swap_bytes);
727  write(head, FRAG_SIZE, swap_bytes);
728  write(head, sample_size, swap_bytes);
729 
730  if (flags & FLAG_Q) {
731  head->copy(rd + iqos_offset, orig.length() - iqos_offset);
732  }
733 
734  // Create the "tail" message block containing DATA_FRAG with Q=0
735  tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
736 
737  tail->copy(rd, data_offset);
738 
739  tail->wr_ptr()[0] = DATA_FRAG;
740  tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
741  tail->wr_ptr(2);
742 
743  std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
744  tail->wr_ptr(4);
745 
746  write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
747  tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
748 
749  write(tail, starting_frag + frags, swap_bytes);
750  const size_t tail_data = orig_payload - frags * FRAG_SIZE;
751  const ACE_CDR::UShort tail_frags =
752  static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
753  write(tail, tail_frags, swap_bytes);
754  write(tail, FRAG_SIZE, swap_bytes);
755  write(tail, sample_size, swap_bytes);
756 
757  Message_Block_Ptr payload_head;
758  Message_Block_Ptr payload_tail;
760  payload_head, payload_tail);
761  head->cont(payload_head.release());
762  tail->cont(payload_tail.release());
763 
764  return SequenceRange(starting_frag + frags - 1,
765  starting_frag + frags + tail_frags - 1);
766 }
ACE_Byte Octet
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
#define ACE_ERROR(X)
OpenDDS_Dcps_Export const SequenceRange unknown_sequence_range
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
const octet FLAG_Q
Definition: RtpsCore.idl:519
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
size_t length(void) const
static ACE_Message_Block * alloc_msgblock(const ACE_Message_Block &mb, size_t size, bool use_data_alloc)
const octet FLAG_E
Definition: RtpsCore.idl:518
#define ACE_CDR_BYTE_ORDER
char * rd_ptr(void) const
const ACE_CDR::UShort DATA_FRAG_OCTETS_TO_IQOS
Definition: MessageTypes.h:103
static void split_payload(const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
ACE_UINT16 UShort
ACE_Message_Block * cont(void) const
const octet FLAG_N_IN_DATA
Definition: RtpsCore.idl:529
size_t total_length(void) const
ACE_UINT32 ULong
const octet FLAG_N_IN_FRAG
Definition: RtpsCore.idl:528
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
const octet FLAG_K_IN_FRAG
Definition: RtpsCore.idl:525
static const ACE_CDR::UShort FRAG_SIZE
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
const octet FLAG_D
Definition: RtpsCore.idl:523
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ valid()

ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::valid ( void  ) const

Definition at line 47 of file RtpsSampleHeader.inl.

References ACE_INLINE, and valid_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header().

48 {
49  return valid_;
50 }

Member Data Documentation

◆ data_

bool OpenDDS::DCPS::RtpsSampleHeader::data_
private

Definition at line 74 of file RtpsSampleHeader.h.

Referenced by operator=().

◆ frag_

bool OpenDDS::DCPS::RtpsSampleHeader::frag_
private

Definition at line 74 of file RtpsSampleHeader.h.

Referenced by more_fragments(), and operator=().

◆ FRAG_SIZE

const ACE_CDR::UShort OpenDDS::DCPS::RtpsSampleHeader::FRAG_SIZE = 1024
static

Definition at line 98 of file RtpsSampleHeader.h.

◆ message_length_

size_t OpenDDS::DCPS::RtpsSampleHeader::message_length_
private

Definition at line 75 of file RtpsSampleHeader.h.

Referenced by message_length(), and pdu_remaining().

◆ serialized_size_

size_t OpenDDS::DCPS::RtpsSampleHeader::serialized_size_
private

Definition at line 75 of file RtpsSampleHeader.h.

Referenced by get_serialized_size().

◆ submessage_

RTPS::Submessage OpenDDS::DCPS::RtpsSampleHeader::submessage_

◆ valid_

bool OpenDDS::DCPS::RtpsSampleHeader::valid_
private

Definition at line 74 of file RtpsSampleHeader.h.

Referenced by operator=(), and valid().


The documentation for this class was generated from the following files: