Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "DataSampleHeader.h"
11 :
12 : #include "Serializer.h"
13 : #include "GuidConverter.h"
14 : #include "transport/framework/ReceivedDataSample.h"
15 : #include "SafetyProfileStreams.h"
16 : #ifndef OPENDDS_SAFETY_PROFILE
17 : # include "RestoreOutputStreamState.h"
18 : #endif
19 :
20 : #include <dds/DdsDcpsGuidTypeSupportImpl.h>
21 :
22 : #include <cstdio>
23 :
24 : #if !defined (__ACE_INLINE__)
25 : #include "DataSampleHeader.inl"
26 : #endif /* __ACE_INLINE__ */
27 :
28 : namespace {
29 :
30 : using OpenDDS::DCPS::Encoding;
31 : const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
32 :
33 0 : bool mb_copy(char& dest, const ACE_Message_Block& mb, size_t offset, const Encoding&)
34 : {
35 0 : dest = mb.rd_ptr()[offset];
36 0 : return true;
37 : }
38 :
39 : template <typename T>
40 0 : bool mb_copy(T& dest, const ACE_Message_Block& mb, size_t offset, const Encoding& encoding)
41 : {
42 0 : if (mb.length() >= sizeof(T)) {
43 : // Avoid creating ACE_Message_Block from the heap if we just need one.
44 0 : ACE_Message_Block temp(mb.data_block (), ACE_Message_Block::DONT_DELETE);
45 0 : temp.rd_ptr(mb.rd_ptr()+offset);
46 0 : temp.wr_ptr(mb.wr_ptr());
47 0 : OpenDDS::DCPS::Serializer ser(&temp, encoding);
48 0 : ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), ser.swap_bytes());
49 0 : return ser.good_bit();
50 0 : }
51 :
52 0 : OpenDDS::DCPS::Message_Block_Ptr temp(mb.duplicate());
53 0 : if (!temp) { // couldn't allocate
54 0 : return false;
55 : }
56 0 : temp->rd_ptr(offset);
57 0 : if (temp->total_length() < sizeof(T)) {
58 0 : return false;
59 : }
60 0 : OpenDDS::DCPS::Serializer ser(temp.get(), encoding);
61 0 : ser.buffer_read(reinterpret_cast<char*>(&dest), sizeof(T), ser.swap_bytes());
62 0 : return ser.good_bit();
63 0 : }
64 :
65 : // Skip "offset" bytes from the mb and copy the subsequent data
66 : // (sizeof(T) bytes) into dest. Return false if there is not enough data
67 : // in the mb to complete the operation. Continuation pointers are followed.
68 : template <typename T>
69 0 : bool mb_peek(T& dest, const ACE_Message_Block& mb, size_t offset, const Encoding& encoding)
70 : {
71 0 : for (const ACE_Message_Block* iter = &mb; iter; iter = iter->cont()) {
72 0 : const size_t len = iter->length();
73 0 : if (len > offset) {
74 0 : return mb_copy(dest, *iter, offset, encoding);
75 : }
76 0 : offset -= len;
77 : }
78 0 : return false;
79 : }
80 : }
81 :
82 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
83 :
84 : namespace OpenDDS {
85 : namespace DCPS {
86 :
87 : // Allocate a new message block using the allocators from an existing
88 : // message block, "mb". Use of mb's data_allocator_ is optional.
89 : ACE_Message_Block*
90 33 : DataSampleHeader::alloc_msgblock(const ACE_Message_Block& mb,
91 : size_t size, bool use_data_alloc)
92 : {
93 : enum { DATA, DB, MB, N_ALLOC };
94 : ACE_Allocator* allocators[N_ALLOC];
95 : // It's an ACE bug that access_allocators isn't const
96 33 : ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
97 33 : mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
98 33 : if (allocators[MB]) {
99 : ACE_Message_Block* result;
100 0 : ACE_NEW_MALLOC_RETURN(result,
101 : static_cast<ACE_Message_Block*>(
102 : allocators[MB]->malloc(sizeof(ACE_Message_Block))),
103 : ACE_Message_Block(size,
104 : ACE_Message_Block::MB_DATA,
105 : 0, // cont
106 : 0, // data
107 : use_data_alloc ? allocators[DATA] : 0,
108 : mut_mb.locking_strategy(), // locking_strategy
109 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
110 : ACE_Time_Value::zero,
111 : ACE_Time_Value::max_time,
112 : allocators[DB],
113 : allocators[MB]),
114 : 0);
115 0 : return result;
116 : } else {
117 : return new ACE_Message_Block(size,
118 : ACE_Message_Block::MB_DATA,
119 : 0, // cont
120 : 0, // data
121 : use_data_alloc ? allocators[DATA] : 0,
122 33 : mut_mb.locking_strategy(), // locking_strategy
123 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
124 : ACE_Time_Value::zero,
125 : ACE_Time_Value::max_time,
126 33 : allocators[DB]);
127 : }
128 : }
129 :
130 0 : bool DataSampleHeader::partial(const ACE_Message_Block& mb)
131 : {
132 0 : static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
133 : LIFESPAN_LENGTH = 8,
134 0 : COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
135 : COHERENT_LENGTH = 16,
136 0 : CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
137 0 : BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
138 :
139 0 : const size_t len = mb.total_length();
140 :
141 0 : if (len <= FLAGS_OFFSET) return true;
142 :
143 0 : Encoding encoding(encoding_kind);
144 : unsigned char msg_id;
145 0 : if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET, encoding)
146 0 : || int(msg_id) >= MESSAGE_ID_MAX) {
147 : // This check, and the similar one below for submessage id, are actually
148 : // indicating an invalid header (and not a partial header) but we can
149 : // treat it the same as partial for the sake of the TransportRecvStrategy.
150 0 : return true;
151 : }
152 :
153 0 : if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET, encoding)
154 0 : || int(msg_id) >= SUBMESSAGE_ID_MAX) {
155 0 : return true;
156 : }
157 :
158 : char flags;
159 0 : if (!mb_peek(flags, mb, FLAGS_OFFSET, encoding)) {
160 0 : return true;
161 : }
162 :
163 0 : size_t expected = get_max_serialized_size();
164 0 : if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
165 0 : if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
166 :
167 0 : if (flags & CONTENT_FILT_MASK) {
168 : CORBA::ULong seqLen;
169 0 : encoding.endianness(static_cast<Endianness>(flags & BYTE_ORDER_MASK));
170 0 : if (!mb_peek(seqLen, mb, expected, encoding)) {
171 0 : return true;
172 : }
173 0 : expected += int32_cdr_size + guid_cdr_size * seqLen;
174 : }
175 :
176 0 : return len < expected;
177 : }
178 :
179 : void
180 33 : DataSampleHeader::init(ACE_Message_Block* buffer)
181 : {
182 33 : Encoding encoding(encoding_kind);
183 33 : Serializer reader(buffer, encoding);
184 33 : serialized_size_ = 0;
185 :
186 : // Only byte-sized reads until we get the byte_order_ flag.
187 :
188 33 : if (!(reader >> this->message_id_)) {
189 0 : return;
190 : }
191 33 : serialized_size_ += byte_cdr_size;
192 :
193 33 : if (!(reader >> this->submessage_id_)) {
194 0 : return;
195 : }
196 33 : serialized_size_ += byte_cdr_size;
197 :
198 : // Extract the flag values.
199 : ACE_CDR::Octet byte;
200 33 : if (!(reader >> ACE_InputCDR::to_octet(byte))) {
201 0 : return;
202 : }
203 33 : serialized_size_ += byte_cdr_size;
204 :
205 33 : this->byte_order_ = byte & mask_flag(BYTE_ORDER_FLAG);
206 33 : this->coherent_change_ = byte & mask_flag(COHERENT_CHANGE_FLAG);
207 33 : this->historic_sample_ = byte & mask_flag(HISTORIC_SAMPLE_FLAG);
208 33 : this->lifespan_duration_ = byte & mask_flag(LIFESPAN_DURATION_FLAG);
209 33 : this->group_coherent_ = byte & mask_flag(GROUP_COHERENT_FLAG);
210 33 : this->content_filter_ = byte & mask_flag(CONTENT_FILTER_FLAG);
211 33 : this->sequence_repair_ = byte & mask_flag(SEQUENCE_REPAIR_FLAG);
212 33 : this->more_fragments_ = byte & mask_flag(MORE_FRAGMENTS_FLAG);
213 :
214 : // Set swap_bytes flag to the Serializer if data sample from
215 : // the publisher is in different byte order.
216 33 : reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
217 :
218 33 : if (!(reader >> ACE_InputCDR::to_octet(byte))) {
219 0 : return;
220 : }
221 33 : serialized_size_ += sizeof(byte);
222 33 : this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG);
223 33 : this->key_fields_only_ = byte & mask_flag(KEY_ONLY_FLAG);
224 :
225 33 : if (!(reader >> this->message_length_)) {
226 0 : return;
227 : }
228 33 : serialized_size_ += sizeof(message_length_);
229 :
230 33 : if (!(reader >> this->sequence_)) {
231 0 : return;
232 : }
233 33 : serialized_size(encoding, serialized_size_, sequence_);
234 :
235 33 : if (!(reader >> this->source_timestamp_sec_)) {
236 0 : return;
237 : }
238 33 : serialized_size_ += sizeof(source_timestamp_sec_);
239 :
240 33 : if (!(reader >> this->source_timestamp_nanosec_)) {
241 0 : return;
242 : }
243 33 : serialized_size_ += sizeof(source_timestamp_nanosec_);
244 :
245 33 : if (this->lifespan_duration_) {
246 0 : if (!(reader >> this->lifespan_duration_sec_)) {
247 0 : return;
248 : }
249 0 : serialized_size_ += sizeof(lifespan_duration_sec_);
250 :
251 0 : if (!(reader >> this->lifespan_duration_nanosec_)) {
252 0 : return;
253 : }
254 0 : serialized_size_ += sizeof(lifespan_duration_nanosec_);
255 : }
256 :
257 33 : if (!(reader >> this->publication_id_)) {
258 0 : return;
259 : }
260 33 : serialized_size(encoding, serialized_size_, publication_id_);
261 :
262 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
263 33 : if (this->group_coherent_) {
264 0 : if (!(reader >> this->publisher_id_)) {
265 0 : return;
266 : }
267 0 : serialized_size(encoding, serialized_size_, publisher_id_);
268 : }
269 : #endif
270 :
271 33 : if (this->content_filter_) {
272 7 : if (!(reader >> this->content_filter_entries_)) {
273 0 : return;
274 : }
275 7 : serialized_size(encoding, serialized_size_, content_filter_entries_);
276 : }
277 33 : }
278 :
279 : bool
280 23 : operator<<(ACE_Message_Block& buffer, const DataSampleHeader& value)
281 : {
282 23 : Serializer writer(&buffer, encoding_kind, value.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
283 :
284 23 : writer << value.message_id_;
285 23 : writer << value.submessage_id_;
286 :
287 : // Write the flags as a single byte.
288 23 : ACE_CDR::Octet flags = (value.byte_order_ << BYTE_ORDER_FLAG)
289 23 : | (value.coherent_change_ << COHERENT_CHANGE_FLAG)
290 23 : | (value.historic_sample_ << HISTORIC_SAMPLE_FLAG)
291 23 : | (value.lifespan_duration_ << LIFESPAN_DURATION_FLAG)
292 23 : | (value.group_coherent_ << GROUP_COHERENT_FLAG)
293 23 : | (value.content_filter_ << CONTENT_FILTER_FLAG)
294 23 : | (value.sequence_repair_ << SEQUENCE_REPAIR_FLAG)
295 23 : | (value.more_fragments_ << MORE_FRAGMENTS_FLAG)
296 : ;
297 23 : writer << ACE_OutputCDR::from_octet(flags);
298 :
299 23 : flags = (value.cdr_encapsulation_ << CDR_ENCAP_FLAG)
300 23 : | (value.key_fields_only_ << KEY_ONLY_FLAG)
301 : ;
302 23 : writer << ACE_OutputCDR::from_octet(flags);
303 :
304 23 : writer << value.message_length_;
305 23 : writer << value.sequence_;
306 23 : writer << value.source_timestamp_sec_;
307 23 : writer << value.source_timestamp_nanosec_;
308 :
309 23 : if (value.lifespan_duration_) {
310 0 : writer << value.lifespan_duration_sec_;
311 0 : writer << value.lifespan_duration_nanosec_;
312 : }
313 :
314 23 : writer << value.publication_id_;
315 :
316 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
317 23 : if (value.group_coherent_) {
318 0 : writer << value.publisher_id_;
319 : }
320 : #endif
321 :
322 : // content_filter_entries_ is deliberately not marshaled here.
323 : // It's variable sized, so it won't fit into our pre-allocated data block.
324 : // It may be customized per-datalink so it will be handled later with a
325 : // a chained (continuation) ACE_Message_Block.
326 :
327 46 : return writer.good_bit();
328 23 : }
329 :
330 : void
331 7 : DataSampleHeader::add_cfentries(const GUIDSeq* guids, ACE_Message_Block* mb)
332 : {
333 : Encoding encoding(encoding_kind,
334 7 : ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb));
335 7 : size_t size = 0;
336 7 : if (guids) {
337 6 : serialized_size(encoding, size, *guids);
338 : } else {
339 1 : size = int32_cdr_size;
340 : }
341 7 : ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
342 :
343 7 : Serializer ser(optHdr, encoding);
344 7 : if (guids) {
345 6 : ser << *guids;
346 : } else {
347 1 : ser << CORBA::ULong(0);
348 : }
349 :
350 : // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
351 7 : optHdr->cont(mb->cont());
352 7 : mb->cont(optHdr);
353 7 : }
354 :
355 : void
356 12 : DataSampleHeader::split_payload(const ACE_Message_Block& orig, size_t size,
357 : Message_Block_Ptr& head,
358 : Message_Block_Ptr& tail)
359 : {
360 12 : if (!head) {
361 2 : head.reset(orig.duplicate());
362 : }
363 :
364 12 : ACE_Message_Block* frag = head.get();
365 12 : size_t frag_remain = size;
366 14 : for (; frag_remain > frag->length(); frag = frag->cont()) {
367 2 : frag_remain -= frag->length();
368 : }
369 :
370 12 : if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
371 1 : tail.reset(frag->cont());
372 : } else {
373 11 : tail.reset(frag->duplicate());
374 11 : frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
375 11 : ACE_Message_Block::release(frag->cont());
376 11 : tail->rd_ptr(frag_remain);
377 : }
378 12 : frag->cont(0);
379 12 : }
380 :
381 : void
382 11 : DataSampleHeader::split(const ACE_Message_Block& orig, size_t size,
383 : Message_Block_Ptr& head, Message_Block_Ptr& tail)
384 : {
385 11 : Message_Block_Ptr dup (orig.duplicate());
386 11 : const Encoding encoding(encoding_kind);
387 :
388 11 : const size_t length = dup->total_length();
389 11 : DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
390 11 : const size_t hdr_len = length - dup->total_length();
391 11 : const size_t this_max_serialized_size = get_max_serialized_size();
392 :
393 11 : ACE_Message_Block* payload = dup.get();
394 : //skip zero length message blocks
395 11 : ACE_Message_Block* prev = 0;
396 25 : for (; payload->length() == 0; payload = payload->cont()) {
397 14 : prev = payload;
398 : }
399 11 : prev->cont(0);
400 11 : Message_Block_Ptr payload_head(payload);
401 :
402 11 : if (size < hdr_len) { // need to fragment the content_filter_entries_
403 1 : head.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
404 1 : hdr.more_fragments_ = true;
405 1 : hdr.message_length_ = 0; // no room for payload data
406 1 : *head << hdr;
407 1 : const size_t avail = size - head->length() - 4 /* sequence length */;
408 1 : const CORBA::ULong n_entries = static_cast<CORBA::ULong>(avail / guid_cdr_size);
409 1 : GUIDSeq entries(n_entries);
410 1 : entries.length(n_entries);
411 : // remove from the end of hdr's entries (order doesn't matter)
412 2 : for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
413 2 : i < n_entries; ++i) {
414 1 : entries[i] = hdr.content_filter_entries_[--x];
415 1 : hdr.content_filter_entries_.length(x);
416 : }
417 1 : add_cfentries(&entries, head.get());
418 :
419 1 : tail.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
420 1 : hdr.more_fragments_ = false;
421 1 : hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
422 1 : hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
423 1 : *tail << hdr;
424 1 : tail->cont(payload_head.release());
425 1 : if (hdr.content_filter_) {
426 1 : add_cfentries(&hdr.content_filter_entries_, tail.get());
427 : }
428 1 : return;
429 1 : }
430 :
431 10 : Message_Block_Ptr payload_tail;
432 10 : split_payload(*payload, size - hdr_len, payload_head, payload_tail);
433 :
434 10 : hdr.more_fragments_ = true;
435 10 : hdr.message_length_ = static_cast<ACE_UINT32>(payload_head->total_length());
436 :
437 10 : head.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
438 10 : *head << hdr;
439 10 : head->cont(payload_head.release());
440 10 : if (hdr.content_filter_) {
441 2 : add_cfentries(&hdr.content_filter_entries_, head.get());
442 : }
443 :
444 10 : hdr.more_fragments_ = false;
445 10 : hdr.content_filter_ = false;
446 10 : hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
447 :
448 10 : tail.reset(alloc_msgblock(*dup, this_max_serialized_size, true));
449 10 : *tail << hdr;
450 10 : tail->cont(payload_tail.release());
451 13 : }
452 :
453 : bool
454 551 : DataSampleHeader::join(const DataSampleHeader& first,
455 : const DataSampleHeader& second, DataSampleHeader& result)
456 : {
457 551 : if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
458 0 : return false;
459 : }
460 551 : result = second;
461 551 : result.message_length_ += first.message_length_;
462 551 : if (first.content_filter_) {
463 3 : result.content_filter_ = true;
464 3 : const CORBA::ULong entries = first.content_filter_entries_.length();
465 3 : CORBA::ULong x = result.content_filter_entries_.length();
466 3 : result.content_filter_entries_.length(x + entries);
467 10 : for (CORBA::ULong i(entries); i > 0;) {
468 7 : result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
469 : }
470 : }
471 551 : return true;
472 : }
473 :
474 0 : const char* to_string(MessageId value)
475 : {
476 0 : switch (value) {
477 0 : case SAMPLE_DATA:
478 0 : return "SAMPLE_DATA";
479 0 : case DATAWRITER_LIVELINESS:
480 0 : return "DATAWRITER_LIVELINESS";
481 0 : case INSTANCE_REGISTRATION:
482 0 : return "INSTANCE_REGISTRATION";
483 0 : case UNREGISTER_INSTANCE:
484 0 : return "UNREGISTER_INSTANCE";
485 0 : case DISPOSE_INSTANCE:
486 0 : return "DISPOSE_INSTANCE";
487 0 : case GRACEFUL_DISCONNECT:
488 0 : return "GRACEFUL_DISCONNECT";
489 0 : case REQUEST_ACK:
490 0 : return "REQUEST_ACK";
491 0 : case SAMPLE_ACK:
492 0 : return "SAMPLE_ACK";
493 0 : case END_COHERENT_CHANGES:
494 0 : return "END_COHERENT_CHANGES";
495 0 : case TRANSPORT_CONTROL:
496 0 : return "TRANSPORT_CONTROL";
497 0 : case DISPOSE_UNREGISTER_INSTANCE:
498 0 : return "DISPOSE_UNREGISTER_INSTANCE";
499 0 : case END_HISTORIC_SAMPLES:
500 0 : return "END_HISTORIC_SAMPLES";
501 0 : default:
502 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: to_string(MessageId): ")
503 : ACE_TEXT("%d is either invalid or not recognized.\n"),
504 : value));
505 0 : return "Invalid MessageId";
506 : }
507 : }
508 :
509 0 : const char* to_string(SubMessageId value)
510 : {
511 0 : switch (value) {
512 0 : case SUBMESSAGE_NONE:
513 0 : return "SUBMESSAGE_NONE";
514 0 : case MULTICAST_SYN:
515 0 : return "MULTICAST_SYN";
516 0 : case MULTICAST_SYNACK:
517 0 : return "MULTICAST_SYNACK";
518 0 : case MULTICAST_NAK:
519 0 : return "MULTICAST_NAK";
520 0 : case MULTICAST_NAKACK:
521 0 : return "MULTICAST_NAKACK";
522 0 : default:
523 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: to_string(SubMessageId): ")
524 : ACE_TEXT("%d is either invalid or not recognized.\n"),
525 : value));
526 0 : return "Invalid SubMessageId";
527 : }
528 : }
529 :
530 0 : OPENDDS_STRING to_string(const DataSampleHeader& value)
531 : {
532 0 : OPENDDS_STRING ret;
533 0 : if (value.submessage_id_ != SUBMESSAGE_NONE) {
534 0 : ret += to_string(SubMessageId(value.submessage_id_));
535 0 : ret += " 0x";
536 0 : ret += to_dds_string(unsigned(value.submessage_id_), true);
537 0 : ret += "), ";
538 : } else {
539 0 : ret += to_string(MessageId(value.message_id_));
540 0 : ret += " (0x";
541 0 : ret += to_dds_string(unsigned(value.message_id_), true);
542 0 : ret += "), ";
543 : }
544 :
545 0 : ret += "Length: ";
546 0 : ret += to_dds_string(value.message_length_);
547 0 : ret += ", ";
548 :
549 0 : ret += "Byte order: ";
550 0 : ret += (value.byte_order_ == 1 ? "Little" : "Big");
551 0 : ret += " Endian";
552 :
553 0 : if (value.message_id_ != TRANSPORT_CONTROL) {
554 0 : ret += ", ";
555 :
556 0 : if (value.coherent_change_ == 1) ret += "Coherent, ";
557 0 : if (value.historic_sample_ == 1) ret += "Historic, ";
558 0 : if (value.lifespan_duration_ == 1) ret += "Lifespan, ";
559 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
560 0 : if (value.group_coherent_ == 1) ret += "Group-Coherent, ";
561 : #endif
562 0 : if (value.content_filter_ == 1) ret += "Content-Filtered, ";
563 0 : if (value.sequence_repair_ == 1) ret += "Sequence Repair, ";
564 0 : if (value.more_fragments_ == 1) ret += "More Fragments, ";
565 0 : if (value.cdr_encapsulation_ == 1) ret += "CDR Encapsulation, ";
566 0 : if (value.key_fields_only_ == 1) ret += "Key Fields Only, ";
567 :
568 0 : ret += "Sequence: 0x";
569 0 : ret += to_dds_string(unsigned(value.sequence_.getValue()), true);
570 0 : ret += ", ";
571 :
572 0 : ret += "Timestamp: ";
573 0 : ret += to_dds_string(value.source_timestamp_sec_);
574 0 : ret += ".";
575 0 : ret += to_dds_string(value.source_timestamp_nanosec_);
576 0 : ret += ", ";
577 :
578 0 : if (value.lifespan_duration_) {
579 0 : ret += "Lifespan: ";
580 0 : ret += to_dds_string(value.lifespan_duration_sec_);
581 0 : ret += ".";
582 0 : ret += to_dds_string(value.lifespan_duration_nanosec_);
583 0 : ret += ", ";
584 : }
585 :
586 0 : ret += "Publication: " + LogGuid(value.publication_id_).conv_;
587 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
588 0 : if (value.group_coherent_) {
589 0 : ret += ", Publisher: " + LogGuid(value.publisher_id_).conv_;
590 : }
591 : #endif
592 :
593 0 : if (value.content_filter_) {
594 0 : const CORBA::ULong len = value.content_filter_entries_.length();
595 0 : ret += ", Content-Filter Entries (";
596 0 : ret += to_dds_string(len);
597 0 : ret += "): [";
598 0 : for (CORBA::ULong i(0); i < len; ++i) {
599 0 : ret += LogGuid(value.content_filter_entries_[i]).conv_ + ' ';
600 : }
601 0 : ret += ']';
602 : }
603 : }
604 0 : return ret;
605 0 : }
606 :
607 : #ifndef OPENDDS_SAFETY_PROFILE
608 : /// Message Id enumeration insertion onto an ostream.
609 0 : std::ostream& operator<<(std::ostream& os, const MessageId value)
610 : {
611 0 : os << to_string(value);
612 0 : return os;
613 : }
614 :
615 : /// Sub-Message Id enumeration insertion onto an ostream.
616 0 : std::ostream& operator<<(std::ostream& os, const SubMessageId value)
617 : {
618 0 : os << to_string(value);
619 0 : return os;
620 : }
621 :
622 : /// Message header insertion onto an ostream.
623 : extern OpenDDS_Dcps_Export
624 0 : std::ostream& operator<<(std::ostream& str, const DataSampleHeader& value)
625 : {
626 0 : RestoreOutputStreamState stream_state(str);
627 :
628 0 : if (value.submessage_id_ != SUBMESSAGE_NONE) {
629 0 : str << SubMessageId(value.submessage_id_)
630 0 : << " (0x" << std::hex << std::setw(2) << std::setfill('0')
631 0 : << unsigned(value.submessage_id_) << "), ";
632 :
633 : } else {
634 0 : str << MessageId(value.message_id_)
635 0 : << " (0x" << std::hex << std::setw(2) << std::setfill('0')
636 0 : << unsigned(value.message_id_) << "), ";
637 : }
638 :
639 0 : str << "Length: " << std::dec << value.message_length_ << ", ";
640 :
641 0 : str << "Byte order: " << (value.byte_order_ == 1 ? "Little" : "Big")
642 0 : << " Endian";
643 :
644 0 : if (value.message_id_ != TRANSPORT_CONTROL) {
645 0 : str << ", ";
646 :
647 0 : if (value.coherent_change_ == 1) str << "Coherent, ";
648 0 : if (value.historic_sample_ == 1) str << "Historic, ";
649 0 : if (value.lifespan_duration_ == 1) str << "Lifespan, ";
650 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
651 0 : if (value.group_coherent_ == 1) str << "Group-Coherent, ";
652 : #endif
653 0 : if (value.content_filter_ == 1) str << "Content-Filtered, ";
654 0 : if (value.sequence_repair_ == 1) str << "Sequence Repair, ";
655 0 : if (value.more_fragments_ == 1) str << "More Fragments, ";
656 0 : if (value.cdr_encapsulation_ == 1) str << "CDR Encapsulation, ";
657 0 : if (value.key_fields_only_ == 1) str << "Key Fields Only, ";
658 :
659 0 : str << "Sequence: 0x" << std::hex << std::setw(4) << std::setfill('0')
660 0 : << value.sequence_.getValue() << ", ";
661 :
662 0 : str << "Timestamp: " << std::dec << value.source_timestamp_sec_ << "."
663 0 : << std::dec << value.source_timestamp_nanosec_ << ", ";
664 :
665 0 : if (value.lifespan_duration_) {
666 0 : str << "Lifespan: " << std::dec << value.lifespan_duration_sec_ << "."
667 0 : << std::dec << value.lifespan_duration_nanosec_ << ", ";
668 : }
669 :
670 0 : str << "Publication: " << GuidConverter(value.publication_id_);
671 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
672 0 : if (value.group_coherent_) {
673 0 : str << ", Publisher: " << GuidConverter(value.publisher_id_);
674 : }
675 : #endif
676 :
677 0 : if (value.content_filter_) {
678 0 : const CORBA::ULong len = value.content_filter_entries_.length();
679 0 : str << ", Content-Filter Entries (" << len << "): [";
680 0 : for (CORBA::ULong i(0); i < len; ++i) {
681 0 : str << GuidConverter(value.content_filter_entries_[i]) << ' ';
682 : }
683 0 : str << ']';
684 : }
685 : }
686 :
687 0 : return str;
688 0 : }
689 : #endif //OPENDDS_SAFETY_PROFILE
690 :
691 :
692 : bool
693 0 : DataSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
694 : {
695 0 : rds.header_ = *this;
696 0 : return true;
697 : }
698 :
699 : DataSampleHeader::MaybeGuard::NoOpLock DataSampleHeader::MaybeGuard::non_lock;
700 :
701 : } // namespace DCPS
702 : } // namespace OpenDDS
703 :
704 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|