OpenDDS  Snapshot(2023/04/28-20:55)
ReceivedDataSample.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "ReceivedDataSample.h"
11 
12 #include <algorithm>
13 
14 #include <cstring>
15 
17 namespace OpenDDS {
18 namespace DCPS {
19 
// NOTE(review): the signature line (listing line 20) is missing from this
// rendering. The member initializer list shows this is a constructor,
// presumably the default constructor of ReceivedDataSample - confirm
// against the header.
// Sets fragment_size_ to 0; the body does nothing else.
21  : fragment_size_(0)
22 {
23 }
24 
// NOTE(review): the signature line (listing line 25) is missing from this
// rendering. This is a constructor that receives an object named payload
// whose address is assignable to const ACE_Message_Block*, so presumably
// ReceivedDataSample(const ACE_Message_Block& payload) - confirm.
// Sets fragment_size_ to 0, then walks the cont() chain that starts at
// payload and appends one MessageBlock built from each link to blocks_.
26  : fragment_size_(0)
27 {
28  const ACE_Message_Block* amb = &payload;
// do/while: payload itself is always converted before cont() is consulted,
// so blocks_ receives at least one element.
29  do {
30  blocks_.push_back(MessageBlock(*amb));
31  amb = amb->cont();
32  } while (amb);
33 }
34 
// NOTE(review): the signature line (listing line 35) is missing from this
// rendering; presumably size_t ReceivedDataSample::data_length() const,
// as listed in the page index - confirm.
// Returns the sum of len() over every element of blocks_.
// Returns 0 when blocks_ is empty.
36 {
37  size_t len = 0;
38  for (size_t i = 0; i < blocks_.size(); ++i) {
39  len += blocks_[i].len();
40  }
41  return len;
42 }
43 
// File-local helper (anonymous namespace).
44 namespace {
// Creates one ACE_Message_Block using the given allocator and returns it.
// mb_alloc: allocator to use; when null, ACE_Allocator::instance() is
// used instead.
45  ACE_Message_Block* make_mb(ACE_Allocator* mb_alloc)
46  {
47  if (!mb_alloc) {
48  mb_alloc = ACE_Allocator::instance();
49  }
50  ACE_Message_Block* mb = 0;
// NOTE(review): listing line 51 is missing from this rendering. The three
// lines below look like the trailing arguments of
// ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL) with mb
// as POINTER - confirm against the real file. As visible here: storage of
// sizeof(ACE_Message_Block) bytes comes from mb_alloc->malloc, the object
// is constructed as ACE_Message_Block(mb_alloc), and 0 is the value
// passed in the RET_VAL position.
52  (ACE_Message_Block*) mb_alloc->malloc(sizeof(ACE_Message_Block)),
53  ACE_Message_Block(mb_alloc),
54  0);
55  return mb;
56  }
57 }
58 
// NOTE(review): the signature line (listing line 59) is missing from this
// rendering; presumably
// ACE_Message_Block* ReceivedDataSample::data(ACE_Allocator* mb_alloc) const,
// as listed in the page index - confirm.
// Builds a chain of ACE_Message_Block objects, one per element of blocks_,
// linked through cont() in the same order, and returns the head of the
// chain. Returns 0 when blocks_ is empty.
60 {
61  ACE_Message_Block* first = 0;
62  ACE_Message_Block* last = 0;
63  for (size_t i = 0; i < blocks_.size(); ++i) {
64  const MessageBlock& element = blocks_[i];
// NOTE(review): mb is dereferenced without a null check; if make_mb can
// return 0 (it passes 0 in the RET_VAL position of its allocation macro),
// the next line would dereference a null pointer - confirm.
65  ACE_Message_Block* const mb = make_mb(mb_alloc);
// The new block gets the result of element.duplicate_data() as its data
// block and the same read and write pointers as the element.
66  mb->data_block(element.duplicate_data());
67  mb->rd_ptr(element.rd_ptr());
68  mb->wr_ptr(element.wr_ptr());
// Link the new block after the previous one, or remember it as the head.
69  if (first) {
70  last->cont(mb);
71  } else {
72  first = mb;
73  }
74  last = mb;
75  }
76  return first;
77 }
78 
// NOTE(review): the signature line (listing line 79) is missing from this
// rendering; presumably
// bool ReceivedDataSample::write_data(Serializer& ser) const,
// as listed in the page index - confirm.
// Writes each element of blocks_, in order, to ser as an octet array of
// len() bytes starting at rd_ptr().
// Returns false as soon as one write_octet_array call fails (later blocks
// are then not written); returns true otherwise, including when blocks_
// is empty.
80 {
81  for (size_t i = 0; i < blocks_.size(); ++i) {
82  const MessageBlock& element = blocks_[i];
// The length is narrowed to unsigned int before it is handed to ser.
83  const unsigned int len = static_cast<unsigned int>(element.len());
84  const char* const data = element.rd_ptr();
85  if (!ser.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(data), len)) {
86  return false;
87  }
88  }
89  return true;
90 }
91 
// NOTE(review): the signature line (listing line 92) is missing from this
// rendering; presumably DDS::OctetSeq ReceivedDataSample::copy_data() const,
// as listed in the page index - confirm.
// Returns a DDS::OctetSeq whose length is data_length() and whose content
// is the bytes of every element of blocks_ (from rd_ptr(), len() bytes
// each) copied back to back in order.
93 {
94  DDS::OctetSeq dst;
95  dst.length(static_cast<unsigned int>(data_length()));
// out_iter is the write cursor into the destination buffer.
96  unsigned char* out_iter = dst.get_buffer();
97  for (size_t i = 0; i < blocks_.size(); ++i) {
98  const MessageBlock& element = blocks_[i];
99  const unsigned int len = static_cast<unsigned int>(element.len());
100  const char* const data = element.rd_ptr();
101  std::memcpy(out_iter, data, len);
102  out_iter += len;
103  }
104  return dst;
105 }
106 
// Returns the single byte located offset bytes into the payload, where the
// payload is the elements of blocks_ taken in order, each contributing
// len() bytes starting at its rd_ptr().
// offset: zero-based position counted across all blocks.
// Returns 0 when offset is at or beyond the combined length, so a result
// of 0 does not distinguish an out-of-range offset from a stored zero byte.
107 unsigned char ReceivedDataSample::peek(size_t offset) const
108 {
// remain is the offset still to be consumed relative to the current block.
109  size_t remain = offset;
110  for (size_t i = 0; i < blocks_.size(); ++i) {
111  const MessageBlock& element = blocks_[i];
112  const size_t len = element.len();
113  if (remain < len) {
114  return element.rd_ptr()[remain];
115  }
// The target lies past this block; skip its bytes.
116  remain -= len;
117  }
118  return 0;
119 }
120 
// Selects the range algorithm used below to transfer blocks between
// samples: std::move when ACE_HAS_CPP11 is defined, std::copy otherwise.
// Both are invoked in this file with (first, last, output iterator).
121 #ifdef ACE_HAS_CPP11
122 #define OPENDDS_MOVE_OR_COPY std::move
123 #else
124 #define OPENDDS_MOVE_OR_COPY std::copy
125 #endif
126 
// NOTE(review): the signature line (listing line 127) is missing from this
// rendering; presumably
// void ReceivedDataSample::prepend(ReceivedDataSample& prefix),
// as listed in the page index - confirm.
// Inserts every element of prefix.blocks_ at the front of blocks_ through
// an insert iterator positioned at blocks_.begin(), then calls
// prefix.clear().
128 {
129  OPENDDS_MOVE_OR_COPY(prefix.blocks_.begin(), prefix.blocks_.end(),
130  std::inserter(blocks_, blocks_.begin()));
131  prefix.clear();
132 }
133 
// NOTE(review): the signature line (listing line 134) is missing from this
// rendering; presumably
// void ReceivedDataSample::append(ReceivedDataSample& suffix),
// as listed in the page index - confirm.
// Appends every element of suffix.blocks_ to the end of blocks_ through a
// back insert iterator, then calls suffix.clear().
135 {
136  OPENDDS_MOVE_OR_COPY(suffix.blocks_.begin(), suffix.blocks_.end(),
137  std::back_inserter(blocks_));
138  suffix.clear();
139 }
140 
// Appends a new block holding a copy of size bytes read from data.
// data: source bytes; it is passed straight to std::memcpy.
// size: number of bytes to copy; also the size given to the new block.
141 void ReceivedDataSample::append(const char* data, size_t size)
142 {
// A MessageBlock constructed from size is pushed first, then filled in
// place through its base() pointer.
143  blocks_.push_back(MessageBlock(size));
144  std::memcpy(blocks_.back().base(), data, size);
// presumably write(size) advances the write pointer of the block past the
// copied bytes - confirm against MessageBlock.
145  blocks_.back().write(size);
146 }
147 
// Discards the current content through clear() and then stores a single
// block constructed as MessageBlock(data, size).
// NOTE(review): whether MessageBlock(data, size) copies the bytes or only
// refers to the caller buffer is not visible here - confirm before relying
// on the lifetime of data.
148 void ReceivedDataSample::replace(const char* data, size_t size)
149 {
150  clear();
151  blocks_.push_back(MessageBlock(data, size));
152 }
153 
// NOTE(review): the signature lines (listing lines 154-155) are missing
// from this rendering; presumably
// ReceivedDataSample ReceivedDataSample::get_fragment_range(
//   FragmentNumber start_frag, FragmentNumber end_frag),
// as listed in the page index - confirm.
// Returns a new ReceivedDataSample that carries a copy of header_ and the
// blocks of this sample that overlap the byte range derived from the
// fragment numbers and fragment_size_.
156 {
157  ReceivedDataSample result;
158  result.header_ = header_;
159 
// Convert fragment numbers to byte offsets. With INVALID_FRAGMENT as
// end_frag the end offset is the largest size_t value, so the loop below
// is bounded only by the end of blocks_.
160  const size_t fsize = static_cast<size_t>(fragment_size_);
161  const size_t start_offset = static_cast<size_t>(start_frag) * fsize;
162  const size_t end_offset = end_frag == INVALID_FRAGMENT ? std::numeric_limits<size_t>::max() : static_cast<size_t>(end_frag + 1) * fsize - 1;
163 
// current_offset is the byte offset at which the block under inspection
// begins.
164  size_t current_offset = 0;
165 
// default_push becomes true once the first overlapping block was copied.
166  bool default_push = false;
167 
168  for (OPENDDS_VECTOR(MessageBlock)::iterator it = blocks_.begin(); current_offset < end_offset && it != blocks_.end(); ++it) {
169  const size_t len = it->len();
170  if (default_push) {
171  result.blocks_.push_back(*it);
172  if (end_offset < current_offset + len) {
// The argument is computed in size_t, so the intended negative amount is
// produced by unsigned wrap-around; presumably write() converts it back
// to a signed adjustment - confirm against MessageBlock::write.
// NOTE(review): end_offset was computed with a trailing - 1; confirm that
// trimming the block to end at end_offset keeps the last byte of end_frag.
173  result.blocks_.back().write(end_offset - current_offset - len); // this should be negative
174  }
175  } else if (start_offset < current_offset + len) {
// First block that reaches past start_offset: copy it and drop the bytes
// that precede start_offset.
// NOTE(review): no tail trim is applied on this branch, so a range that
// starts and ends inside the same block is not shortened at its end -
// confirm this is intended.
176  default_push = true;
177  result.blocks_.push_back(*it);
178  if (current_offset < start_offset) {
179  result.blocks_.back().read(start_offset - current_offset);
180  }
181  }
182  current_offset += len;
183  }
184 
185  return result;
186 }
187 
188 }
189 }
DataSampleHeader header_
The demarshalled sample header.
SequenceBackInsertIterator< Sequence > back_inserter(Sequence &seq)
void replace(const char *data, size_t size)
Replace all payload bytes with passed-in data. Based on the ACE_Message_Block(const char*...
ReceivedDataSample get_fragment_range(FragmentNumber start_frag, FragmentNumber end_frag=INVALID_FRAGMENT)
#define OPENDDS_MOVE_OR_COPY
DDS::OctetSeq copy_data() const
copy the data payload into an OctetSeq
char * rd_ptr(void) const
ACE_Data_Block * duplicate_data() const
Definition: MessageBlock.h:41
bool write_octet_array(const ACE_CDR::Octet *x, ACE_CDR::ULong length)
Definition: Serializer.inl:697
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
bool write_data(Serializer &ser) const
write the data payload to the Serializer
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
Holds a data sample received by the transport.
OPENDDS_VECTOR(MessageBlock) blocks_
SequenceNumber::Value FragmentNumber
static ACE_Allocator * instance(void)
ACE_Data_Block * data_block(void) const
ACE_Message_Block * cont(void) const
unsigned char peek(size_t offset) const
Retrieve one byte of data from the payload.
char * wr_ptr(void) const
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
static const FragmentNumber INVALID_FRAGMENT
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
ACE_UINT32 fragment_size_
Fragment size used by this sample.
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing...
void prepend(ReceivedDataSample &prefix)
Update this ReceivedDataSample's data payload to include the prefix's data payload before any existing...
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void * malloc(size_type nbytes)=0