OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Public Attributes | Private Member Functions | List of all members
OpenDDS::DCPS::ReceivedDataSample Class Reference

Holds a data sample received by the transport. More...

#include <ReceivedDataSample.h>

Collaboration diagram for OpenDDS::DCPS::ReceivedDataSample:
Collaboration graph
[legend]

Public Member Functions

 ReceivedDataSample ()
 
 ReceivedDataSample (const ACE_Message_Block &payload)
 
bool has_data () const
 true if at least one Data Block is stored (even if it has 0 useable bytes) More...
 
size_t data_length () const
 total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks More...
 
void clear ()
 
ACE_Message_Blockdata (ACE_Allocator *mb_alloc=0) const
 
bool write_data (Serializer &ser) const
 write the data payload to the Serializer More...
 
DDS::OctetSeq copy_data () const
 copy the data payload into an OctetSeq More...
 
unsigned char peek (size_t offset) const
 Retreive one byte of data from the payload. More...
 
void prepend (ReceivedDataSample &prefix)
 Update this ReceivedDataSample's data payload to include the prefix's data payload before any existing bytes. Headers are not modified. More...
 
void append (ReceivedDataSample &suffix)
 Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing bytes. Headers are not modified. More...
 
void append (const char *data, size_t size)
 Add passed-in data to payload bytes. More...
 
void replace (const char *data, size_t size)
 Replace all payload bytes with passed-in data Based on the ACE_Message_Block(const char*, size_t) constructor, doesn't copy data. More...
 
ReceivedDataSample get_fragment_range (FragmentNumber start_frag, FragmentNumber end_frag=INVALID_FRAGMENT)
 

Public Attributes

DataSampleHeader header_
 The demarshalled sample header. More...
 
ACE_UINT32 fragment_size_
 Fragment size used by this sample. More...
 

Private Member Functions

 OPENDDS_VECTOR (MessageBlock) blocks_
 

Detailed Description

Holds a data sample received by the transport.

This is the type of object that is delivered to the TransportReceiveListener objects by the transport. Note that the data sample header has already been demarshalled by the transport, and the ACE_Message_Block (chain) represents the "data" portion of the sample.

Internally, ReceivedDataSample uses an alternate representation of the ACE_Message_Block with contiguous storage (vector) instead of a linked list to implement the continuation chain.

Definition at line 36 of file ReceivedDataSample.h.

Constructor & Destructor Documentation

◆ ReceivedDataSample() [1/2]

OpenDDS::DCPS::ReceivedDataSample::ReceivedDataSample ( )

Definition at line 20 of file ReceivedDataSample.cpp.

21  : fragment_size_(0)
22 {
23 }
ACE_UINT32 fragment_size_
Fragment size used by this sample.

◆ ReceivedDataSample() [2/2]

OpenDDS::DCPS::ReceivedDataSample::ReceivedDataSample ( const ACE_Message_Block payload)
explicit

Definition at line 25 of file ReceivedDataSample.cpp.

References ACE_Message_Block::cont().

26  : fragment_size_(0)
27 {
28  const ACE_Message_Block* amb = &payload;
29  do {
30  blocks_.push_back(MessageBlock(*amb));
31  amb = amb->cont();
32  } while (amb);
33 }
ACE_UINT32 fragment_size_
Fragment size used by this sample.
ACE_Message_Block * cont(void) const

Member Function Documentation

◆ append() [1/2]

void OpenDDS::DCPS::ReceivedDataSample::append ( ReceivedDataSample suffix)

Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing bytes. Headers are not modified.

Parameters
suffixthe source ReceivedDataSample, its data will be removed and taken over by this ReceivedDataSample

Definition at line 134 of file ReceivedDataSample.cpp.

References OpenDDS::DCPS::back_inserter(), clear(), and OPENDDS_MOVE_OR_COPY.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::decode_payload(), OpenDDS::DCPS::TransportReassembly::FragInfo::insert(), and OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample().

135 {
136  OPENDDS_MOVE_OR_COPY(suffix.blocks_.begin(), suffix.blocks_.end(),
137  std::back_inserter(blocks_));
138  suffix.clear();
139 }
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
#define OPENDDS_MOVE_OR_COPY

◆ append() [2/2]

void OpenDDS::DCPS::ReceivedDataSample::append ( const char *  data,
size_t  size 
)

Add passed-in data to payload bytes.

Parameters
datastart of bytes to add to the payload (makes a copy)
sizenumber of bytes to add to the payload

Definition at line 141 of file ReceivedDataSample.cpp.

References data().

142 {
143  blocks_.push_back(MessageBlock(size));
144  std::memcpy(blocks_.back().base(), data, size);
145  blocks_.back().write(size);
146 }
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const

◆ clear()

void OpenDDS::DCPS::ReceivedDataSample::clear ( void  )
inline

◆ copy_data()

DDS::OctetSeq OpenDDS::DCPS::ReceivedDataSample::copy_data ( ) const

copy the data payload into an OctetSeq

Definition at line 92 of file ReceivedDataSample.cpp.

References data(), data_length(), OpenDDS::DCPS::MessageBlock::len(), and OpenDDS::DCPS::MessageBlock::rd_ptr().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::decode_payload().

93 {
94  DDS::OctetSeq dst;
95  dst.length(static_cast<unsigned int>(data_length()));
96  unsigned char* out_iter = dst.get_buffer();
97  for (size_t i = 0; i < blocks_.size(); ++i) {
98  const MessageBlock& element = blocks_[i];
99  const unsigned int len = static_cast<unsigned int>(element.len());
100  const char* const data = element.rd_ptr();
101  std::memcpy(out_iter, data, len);
102  out_iter += len;
103  }
104  return dst;
105 }
char * rd_ptr(void) const
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64

◆ data()

ACE_Message_Block * OpenDDS::DCPS::ReceivedDataSample::data ( ACE_Allocator mb_alloc = 0) const

Definition at line 59 of file ReceivedDataSample.cpp.

References ACE_Message_Block::cont(), ACE_Message_Block::data_block(), OpenDDS::DCPS::MessageBlock::duplicate_data(), ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::MessageBlock::rd_ptr(), ACE_Message_Block::wr_ptr(), and OpenDDS::DCPS::MessageBlock::wr_ptr().

Referenced by append(), copy_data(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::RTPS::Sedp::Reader::data_received(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::dds_demarshal(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::lookup_instance(), OpenDDS::DCPS::UdpTransport::passive_connection(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::TransportReceiveStrategy<>::to_msgblock(), and write_data().

60 {
61  ACE_Message_Block* first = 0;
62  ACE_Message_Block* last = 0;
63  for (size_t i = 0; i < blocks_.size(); ++i) {
64  const MessageBlock& element = blocks_[i];
65  ACE_Message_Block* const mb = make_mb(mb_alloc);
66  mb->data_block(element.duplicate_data());
67  mb->rd_ptr(element.rd_ptr());
68  mb->wr_ptr(element.wr_ptr());
69  if (first) {
70  last->cont(mb);
71  } else {
72  first = mb;
73  }
74  last = mb;
75  }
76  return first;
77 }
char * rd_ptr(void) const
ACE_Data_Block * data_block(void) const
ACE_Message_Block * cont(void) const
char * wr_ptr(void) const

◆ data_length()

size_t OpenDDS::DCPS::ReceivedDataSample::data_length ( void  ) const

total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks

Definition at line 35 of file ReceivedDataSample.cpp.

References ACE_NEW_MALLOC_RETURN, ACE_Allocator::instance(), and ACE_Allocator::malloc().

Referenced by copy_data(), OpenDDS::DCPS::UdpTransport::passive_connection(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::sec_submsg_to_octets().

36 {
37  size_t len = 0;
38  for (size_t i = 0; i < blocks_.size(); ++i) {
39  len += blocks_[i].len();
40  }
41  return len;
42 }

◆ get_fragment_range()

ReceivedDataSample OpenDDS::DCPS::ReceivedDataSample::get_fragment_range ( FragmentNumber  start_frag,
FragmentNumber  end_frag = INVALID_FRAGMENT 
)

Definition at line 155 of file ReceivedDataSample.cpp.

References fragment_size_, header_, OpenDDS::DCPS::INVALID_FRAGMENT, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OPENDDS_VECTOR().

Referenced by OpenDDS::DCPS::TransportReassembly::FragInfo::insert().

156 {
157  ReceivedDataSample result;
158  result.header_ = header_;
159 
160  const size_t fsize = static_cast<size_t>(fragment_size_);
161  const size_t start_offset = static_cast<size_t>(start_frag) * fsize;
162  const size_t end_offset = end_frag == INVALID_FRAGMENT ? std::numeric_limits<size_t>::max() : static_cast<size_t>(end_frag + 1) * fsize - 1;
163 
164  size_t current_offset = 0;
165 
166  bool default_push = false;
167 
168  for (OPENDDS_VECTOR(MessageBlock)::iterator it = blocks_.begin(); current_offset < end_offset && it != blocks_.end(); ++it) {
169  const size_t len = it->len();
170  if (default_push) {
171  result.blocks_.push_back(*it);
172  if (end_offset < current_offset + len) {
173  result.blocks_.back().write(end_offset - current_offset - len); // this should be negative
174  }
175  } else if (start_offset < current_offset + len) {
176  default_push = true;
177  result.blocks_.push_back(*it);
178  if (current_offset < start_offset) {
179  result.blocks_.back().read(start_offset - current_offset);
180  }
181  }
182  current_offset += len;
183  }
184 
185  return result;
186 }
ACE_UINT32 fragment_size_
Fragment size used by this sample.
OPENDDS_VECTOR(MessageBlock) blocks_
DataSampleHeader header_
The demarshalled sample header.
static const FragmentNumber INVALID_FRAGMENT

◆ has_data()

bool OpenDDS::DCPS::ReceivedDataSample::has_data ( ) const
inline

true if at least one Data Block is stored (even if it has 0 useable bytes)

Definition at line 49 of file ReceivedDataSample.h.

Referenced by OpenDDS::DCPS::ReceiveListenerSet::data_received(), OpenDDS::DCPS::TransportReassembly::FragInfo::insert(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), and OpenDDS::DCPS::TransportReassembly::reassemble_i().

49 { return !blocks_.empty(); }

◆ OPENDDS_VECTOR()

OpenDDS::DCPS::ReceivedDataSample::OPENDDS_VECTOR ( MessageBlock  )
private

Referenced by get_fragment_range().

◆ peek()

unsigned char OpenDDS::DCPS::ReceivedDataSample::peek ( size_t  offset) const

Retreive one byte of data from the payload.

Parameters
offsetmust be in the range [0, data_length())

Definition at line 107 of file ReceivedDataSample.cpp.

References OpenDDS::DCPS::MessageBlock::len(), and OpenDDS::DCPS::MessageBlock::rd_ptr().

Referenced by OpenDDS::DCPS::RtpsSampleHeader::payload_byte_order(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i().

108 {
109  size_t remain = offset;
110  for (size_t i = 0; i < blocks_.size(); ++i) {
111  const MessageBlock& element = blocks_[i];
112  const size_t len = element.len();
113  if (remain < len) {
114  return element.rd_ptr()[remain];
115  }
116  remain -= len;
117  }
118  return 0;
119 }

◆ prepend()

void OpenDDS::DCPS::ReceivedDataSample::prepend ( ReceivedDataSample prefix)

Update this ReceivedDataSample's data payload to include the prefix's data payload before any existing bytes. Headers are not modified.

Parameters
prefixthe source ReceivedDataSample, its data will be removed and taken over by this ReceivedDataSample

Definition at line 127 of file ReceivedDataSample.cpp.

References clear(), and OPENDDS_MOVE_OR_COPY.

Referenced by OpenDDS::DCPS::TransportReassembly::FragInfo::insert().

128 {
129  OPENDDS_MOVE_OR_COPY(prefix.blocks_.begin(), prefix.blocks_.end(),
130  std::inserter(blocks_, blocks_.begin()));
131  prefix.clear();
132 }
#define OPENDDS_MOVE_OR_COPY

◆ replace()

void OpenDDS::DCPS::ReceivedDataSample::replace ( const char *  data,
size_t  size 
)

Replace all payload bytes with passed-in data Based on the ACE_Message_Block(const char*, size_t) constructor, doesn't copy data.

Parameters
datastart of bytes to use as the payload
sizenumber of bytes to use as the payload

Definition at line 148 of file ReceivedDataSample.cpp.

References clear().

Referenced by OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample().

149 {
150  clear();
151  blocks_.push_back(MessageBlock(data, size));
152 }
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const

◆ write_data()

bool OpenDDS::DCPS::ReceivedDataSample::write_data ( Serializer ser) const

write the data payload to the Serializer

Definition at line 79 of file ReceivedDataSample.cpp.

References data(), OpenDDS::DCPS::MessageBlock::len(), OpenDDS::DCPS::MessageBlock::rd_ptr(), and OpenDDS::DCPS::Serializer::write_octet_array().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::sec_submsg_to_octets().

80 {
81  for (size_t i = 0; i < blocks_.size(); ++i) {
82  const MessageBlock& element = blocks_[i];
83  const unsigned int len = static_cast<unsigned int>(element.len());
84  const char* const data = element.rd_ptr();
85  if (!ser.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(data), len)) {
86  return false;
87  }
88  }
89  return true;
90 }
char * rd_ptr(void) const
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const

Member Data Documentation

◆ fragment_size_

ACE_UINT32 OpenDDS::DCPS::ReceivedDataSample::fragment_size_

Fragment size used by this sample.

Definition at line 46 of file ReceivedDataSample.h.

Referenced by get_fragment_range(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i().

◆ header_

DataSampleHeader OpenDDS::DCPS::ReceivedDataSample::header_

The demarshalled sample header.

Definition at line 43 of file ReceivedDataSample.h.

Referenced by OpenDDS::DCPS::TcpDataLink::ack_received(), OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::RTPS::Sedp::Reader::data_received(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::RTPS::Sedp::DiscoveryReader::data_received_i(), OpenDDS::RTPS::Sedp::LivelinessReader::data_received_i(), OpenDDS::RTPS::Sedp::SecurityReader::data_received_i(), OpenDDS::RTPS::Sedp::TypeLookupRequestReader::data_received_i(), OpenDDS::RTPS::Sedp::TypeLookupReplyReader::data_received_i(), OpenDDS::DCPS::TransportReassembly::data_unavailable(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::dds_demarshal(), OpenDDS::RTPS::decode_parameter_list(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::decode_payload(), OpenDDS::DCPS::DataReaderImpl::deliver_historic(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::dispose_unregister(), OpenDDS::DCPS::RtpsUdpDataLink::filterBestEffortReaders(), get_fragment_range(), OpenDDS::DCPS::TransportReassembly::FragInfo::insert(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::lookup_instance(), OpenDDS::RTPS::Sedp::TypeLookupReplyReader::process_get_dependencies_reply(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::RTPS::Sedp::TypeLookupReplyReader::process_type_lookup_reply(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::ShmemDataLink::request_ack_received(), and OpenDDS::DCPS::MulticastDataLink::sample_received().


The documentation for this class was generated from the following files: