Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "RtpsSampleHeader.h"
9 :
10 : #include "RtpsUdpSendStrategy.h"
11 :
12 : #include <dds/DCPS/SequenceNumber.h>
13 : #include <dds/DCPS/Serializer.h>
14 : #include <dds/DCPS/DataSampleElement.h>
15 : #include <dds/DCPS/Marked_Default_Qos.h>
16 : #include <dds/DCPS/Qos_Helper.h>
17 : #include <dds/DCPS/Service_Participant.h>
18 : #include <dds/DCPS/DisjointSequence.h>
19 : #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h>
20 : #include <dds/DCPS/RTPS/MessageTypes.h>
21 : #include <dds/DCPS/RTPS/MessageUtils.h>
22 : #include <dds/DCPS/transport/framework/ReceivedDataSample.h>
23 : #include <dds/DCPS/transport/framework/TransportSendListener.h>
24 :
25 : #include <cstring>
26 :
27 : #ifndef __ACE_INLINE__
28 : # include "RtpsSampleHeader.inl"
29 : #endif
30 :
31 : namespace {
32 : const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
33 : STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
34 : STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
35 : STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
36 : }
37 :
38 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
39 :
40 : namespace OpenDDS {
41 : namespace RTPS {
42 :
43 : #ifndef OPENDDS_SAFETY_PROFILE
44 : inline bool
45 0 : operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
46 : {
47 : return
48 0 : (lhs.value[3] == rhs.value[3]) &&
49 0 : (lhs.value[2] == rhs.value[2]) &&
50 0 : (lhs.value[1] == rhs.value[1]) &&
51 0 : (lhs.value[0] == rhs.value[0]);
52 : }
53 : #endif
54 :
55 : }
56 : namespace DCPS {
57 :
58 : void
59 8 : RtpsSampleHeader::init(ACE_Message_Block& mb)
60 : {
61 : using namespace OpenDDS::RTPS;
62 :
63 : // valid_ is false here, it will only be set to true if there is a Submessage
64 :
65 : // Manually grab the first two bytes for the SubmessageKind and the byte order
66 8 : if (mb.length() == 0) {
67 0 : return;
68 : }
69 :
70 8 : const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
71 :
72 8 : ACE_CDR::Octet flags = 0;
73 :
74 8 : if (mb.length() > 1) {
75 8 : flags = mb.rd_ptr()[1];
76 0 : } else if (mb.cont() && mb.cont()->length() > 0) {
77 0 : flags = mb.cont()->rd_ptr()[0];
78 : } else {
79 0 : return;
80 : }
81 :
82 8 : const size_t starting_length = mb.total_length();
83 : Serializer ser(&mb, Encoding::KIND_XCDR1,
84 8 : (flags & FLAG_E) ? ENDIAN_LITTLE : ENDIAN_BIG);
85 :
86 8 : ACE_CDR::UShort octetsToNextHeader = 0;
87 :
88 : #define CASE_SMKIND(kind, class, name) case kind: { \
89 : class submessage; \
90 : if (ser >> submessage) { \
91 : octetsToNextHeader = submessage.smHeader.submessageLength; \
92 : submessage_.name##_sm(submessage); \
93 : valid_ = true; \
94 : } \
95 : break; \
96 : }
97 :
98 8 : switch (kind) {
99 0 : CASE_SMKIND(PAD, PadSubmessage, pad)
100 0 : CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
101 0 : CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
102 0 : CASE_SMKIND(GAP, GapSubmessage, gap)
103 4 : CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
104 0 : CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
105 0 : CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
106 0 : CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
107 0 : CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
108 0 : CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
109 0 : CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
110 0 : CASE_SMKIND(DATA, DataSubmessage, data)
111 4 : CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
112 :
113 : #if defined(OPENDDS_SECURITY)
114 : // Each submessage type introduced by the Security spec is treated
115 : // as an opaque octet sequence at this layer.
116 0 : case SEC_BODY:
117 : case SEC_PREFIX:
118 : case SEC_POSTFIX:
119 : case SRTPS_PREFIX:
120 : case SRTPS_POSTFIX: {
121 0 : SecuritySubmessage submessage;
122 0 : if (ser >> submessage) {
123 0 : octetsToNextHeader = submessage.smHeader.submessageLength;
124 0 : submessage_.security_sm(submessage);
125 0 : submessage_._d(kind);
126 0 : valid_ = true;
127 : }
128 0 : break;
129 0 : }
130 : #endif
131 :
132 0 : default:
133 : {
134 : SubmessageHeader submessage;
135 0 : if (ser >> submessage) {
136 0 : octetsToNextHeader = submessage.submessageLength;
137 0 : submessage_.unknown_sm(submessage);
138 0 : valid_ = true;
139 : }
140 0 : break;
141 : }
142 : }
143 : #undef CASE_SMKIND
144 :
145 8 : if (valid_) {
146 :
147 8 : frag_ = (kind == DATA_FRAG);
148 8 : data_ = (kind == DATA);
149 :
150 : // serialized_size_ is # of bytes of submessage we have read from "mb"
151 8 : serialized_size_ = starting_length - mb.total_length();
152 :
153 8 : const ACE_CDR::UShort remaining = static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
154 :
155 8 : if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
156 : // see RTPS v2.1 section 9.4.5.1.3
157 : // In this case the current Submessage extends to the end of Message,
158 : // so we will use the message_length_ that was set in pdu_remaining().
159 4 : octetsToNextHeader = remaining;
160 :
161 4 : } else if (octetsToNextHeader > remaining) {
162 0 : valid_ = false;
163 0 : return;
164 : }
165 :
166 8 : if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
167 8 : || kind == DATA_FRAG) {
168 : // These Submessages have a payload which we haven't deserialized yet.
169 : // The TransportReceiveStrategy will know this via message_length().
170 : // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
171 4 : message_length_ = octetsToNextHeader + SMHDR_SZ - serialized_size_;
172 : } else {
173 : // These Submessages _could_ have extra data that we don't know about
174 : // (from a newer minor version of the RTPS spec). Either way, indicate
175 : // to the TransportReceiveStrategy that there is no data payload here.
176 4 : message_length_ = 0;
177 4 : ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(serialized_size_);
178 4 : if (octetsToNextHeader + SMHDR_SZ > marshaled) {
179 0 : valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
180 0 : serialized_size_ = octetsToNextHeader + SMHDR_SZ;
181 : }
182 : }
183 : }
184 8 : }
185 :
186 : void
187 0 : RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
188 : const RTPS::ParameterList& iqos)
189 : {
190 : using namespace OpenDDS::RTPS;
191 : #if defined(OPENDDS_TEST_INLINE_QOS)
192 : OPENDDS_STRING output("into_received_data_sample(): ");
193 : output += to_dds_string(iqos.length());
194 : output += " inline QoS parameters\n";
195 : for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
196 : output += " parameter type = ";
197 : output += to_dds_string(iqos[index]._d());
198 : output += "\n";
199 : }
200 : ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
201 : #endif
202 0 : for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
203 0 : if (iqos[i]._d() == PID_STATUS_INFO) {
204 0 : if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
205 0 : opendds.message_id_ = DISPOSE_INSTANCE;
206 0 : } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
207 0 : opendds.message_id_ = UNREGISTER_INSTANCE;
208 0 : } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
209 0 : opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
210 0 : } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
211 0 : opendds.message_id_ = INSTANCE_REGISTRATION;
212 : }
213 0 : } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
214 0 : opendds.historic_sample_ = true;
215 : #if defined(OPENDDS_TEST_INLINE_QOS)
216 : } else if (iqos[i]._d() == PID_TOPIC_NAME) {
217 : ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
218 : } else if (iqos[i]._d() == PID_PRESENTATION) {
219 : DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
220 : ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
221 : "coherent_access = %d, ordered_access = %d\n",
222 : pres_qos.access_scope, pres_qos.coherent_access,
223 : pres_qos.ordered_access));
224 : } else if (iqos[i]._d() == PID_PARTITION) {
225 : DDS::PartitionQosPolicy part_qos = iqos[i].partition();
226 : ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
227 : for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
228 : ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in()));
229 : }
230 : ACE_DEBUG((LM_DEBUG, "\n"));
231 : #endif
232 : }
233 : }
234 0 : }
235 :
236 : bool
237 0 : RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
238 : {
239 : using namespace OpenDDS::RTPS;
240 0 : DataSampleHeader& opendds = rds.header_;
241 :
242 0 : switch (submessage_._d()) {
243 0 : case DATA: {
244 0 : const DataSubmessage& rtps = submessage_.data_sm();
245 0 : opendds.cdr_encapsulation_ = true;
246 0 : opendds.message_length_ = message_length();
247 0 : opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
248 0 : opendds.publication_id_.entityId = rtps.writerId;
249 0 : opendds.message_id_ = SAMPLE_DATA;
250 :
251 0 : process_iqos(opendds, rtps.inlineQos);
252 :
253 0 : if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
254 0 : opendds.key_fields_only_ = true;
255 0 : } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
256 : // Interoperability note: the Key may be hiding in the "key hash" param
257 : // in the InlineQos. In order to make use of this Key, it mst be 16
258 : // bytes or less. We have observed other DDS implementations only send
259 : // the MD5 hash of a >16 byte key, so we must limit this to Built-in
260 : // endpoints which are assumed to use GUIDs as keys.
261 0 : if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
262 0 : && (rtps.smHeader.flags & FLAG_Q) && !rds.has_data()) {
263 0 : for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
264 0 : if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
265 : // CDR_BE encapsulation scheme (endianness is not used for key hash)
266 0 : rds.replace("\x00\x00\x00\x00", EncapsulationHeader::serialized_size);
267 0 : const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
268 0 : rds.append(reinterpret_cast<const char*>(data), sizeof(DDS::OctetArray16));
269 0 : opendds.message_length_ = EncapsulationHeader::serialized_size + sizeof(DDS::OctetArray16);
270 0 : opendds.key_fields_only_ = true;
271 0 : if (Transport_debug_level) {
272 0 : ACE_DEBUG((LM_DEBUG,
273 : "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
274 : " - used KeyHash data as the key-only payload\n"));
275 : }
276 0 : break;
277 : }
278 : }
279 : } else {
280 : // FUTURE: Handle the case of D = 0 and K = 0
281 : // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
282 0 : if (Transport_debug_level) {
283 0 : ACE_DEBUG((LM_WARNING,
284 : "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
285 : "Received a DATA Submessage with D = 0 and K = 0, "
286 : "dropping\n"));
287 : }
288 0 : return false;
289 : }
290 : }
291 :
292 0 : if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
293 : // TODO(iguessthislldo: Convert to use Encoding
294 : // Peek at the byte order from the encapsulation containing the payload.
295 0 : opendds.byte_order_ = payload_byte_order(rds);
296 : }
297 :
298 0 : break;
299 : }
300 0 : case DATA_FRAG: {
301 0 : const DataFragSubmessage& rtps = submessage_.data_frag_sm();
302 0 : opendds.cdr_encapsulation_ = true;
303 0 : opendds.message_length_ = message_length();
304 0 : opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
305 0 : opendds.publication_id_.entityId = rtps.writerId;
306 0 : opendds.message_id_ = SAMPLE_DATA;
307 0 : opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
308 : // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
309 :
310 0 : process_iqos(opendds, rtps.inlineQos);
311 :
312 0 : const CORBA::ULong lastFragInSubmsg =
313 0 : rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
314 0 : if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
315 0 : opendds.more_fragments_ = true;
316 : }
317 0 : break;
318 : }
319 0 : default:
320 0 : break;
321 : }
322 :
323 0 : return true;
324 : }
325 :
326 0 : bool RtpsSampleHeader::payload_byte_order(const ReceivedDataSample& rds)
327 : {
328 0 : return rds.peek(1) & RTPS::FLAG_E;
329 : }
330 :
331 : namespace {
332 0 : void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
333 : const DataSampleHeader& header)
334 : {
335 : using namespace OpenDDS::RTPS;
336 0 : const DDS::Time_t st = {header.source_timestamp_sec_,
337 0 : header.source_timestamp_nanosec_};
338 0 : const InfoTimestampSubmessage ts = {
339 : {INFO_TS, flags, INFO_TS_SZ},
340 0 : {static_cast<ACE_UINT32>(st.sec), DCPS::nanoseconds_to_uint32_fractional_seconds(st.nanosec)}
341 0 : };
342 0 : const CORBA::ULong i = DCPS::grow(subm) - 1;
343 0 : subm[i].info_ts_sm(ts);
344 0 : }
345 : }
346 :
347 : void
348 0 : RtpsSampleHeader::populate_data_sample_submessages(
349 : RTPS::SubmessageSeq& subm,
350 : const DataSampleElement& dsle,
351 : bool requires_inline_qos)
352 : {
353 : using namespace OpenDDS::RTPS;
354 :
355 0 : const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
356 0 : add_timestamp(subm, flags, dsle.get_header());
357 0 : CORBA::ULong i = subm.length();
358 :
359 0 : EntityId_t readerId = ENTITYID_UNKNOWN;
360 0 : if (dsle.get_num_subs() == 1) {
361 0 : readerId = dsle.get_sub_id(0).entityId;
362 : InfoDestinationSubmessage idest;
363 0 : idest.smHeader.submessageId = INFO_DST;
364 0 : idest.smHeader.flags = flags;
365 0 : idest.smHeader.submessageLength = INFO_DST_SZ;
366 0 : std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
367 : sizeof(GuidPrefix_t));
368 0 : i = DCPS::grow(subm);
369 0 : subm[i - 1].info_dst_sm(idest);
370 : } else {
371 : //Not durability resend, but could have inline gaps
372 0 : for (CORBA::ULong x = 0; x < i; ++x) {
373 0 : if (subm[x]._d() == INFO_DST) {
374 : //Need to add INFO_DST
375 : InfoDestinationSubmessage idest;
376 0 : idest.smHeader.submessageId = INFO_DST;
377 0 : idest.smHeader.flags = flags;
378 0 : idest.smHeader.submessageLength = INFO_DST_SZ;
379 0 : std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
380 : sizeof(GuidPrefix_t));
381 0 : i = DCPS::grow(subm);
382 0 : subm[i - 1].info_dst_sm(idest);
383 0 : break;
384 : }
385 : }
386 : }
387 :
388 0 : DataSubmessage data = {
389 : {DATA, flags, 0},
390 : 0,
391 : DATA_OCTETS_TO_IQOS,
392 : readerId,
393 0 : dsle.get_pub_id().entityId,
394 0 : RTPS::to_rtps_seqnum(dsle.get_header().sequence_),
395 : ParameterList()
396 0 : };
397 0 : const char message_id = dsle.get_header().message_id_;
398 0 : switch (message_id) {
399 0 : case SAMPLE_DATA:
400 : // Must be a data message
401 0 : data.smHeader.flags |= FLAG_D;
402 0 : break;
403 0 : default:
404 0 : ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
405 : "Non-sample messages seen, message_id = %d\n", int(message_id)));
406 0 : break;
407 : }
408 :
409 0 : if (requires_inline_qos) {
410 0 : TransportSendListener::InlineQosData qos_data;
411 0 : dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
412 :
413 0 : populate_inline_qos(qos_data, data.inlineQos);
414 0 : }
415 :
416 0 : if (data.inlineQos.length() > 0) {
417 0 : data.smHeader.flags |= FLAG_Q;
418 : }
419 :
420 0 : i = DCPS::grow(subm);
421 0 : subm[i - 1].data_sm(data);
422 0 : }
423 :
424 : namespace {
425 : // Interoperability note:
426 : // used for discovery (SEDP) only, this helps interoperability because they
427 : // spec is unclear about the use of PID_KEY_HASH vs. the key as the payload
428 0 : void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
429 : {
430 : RTPS::KeyHash_t kh;
431 : static const size_t offset = 8 /*skip encap (4) and plist hdr (4)*/;
432 0 : std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
433 0 : RTPS::Parameter p;
434 0 : p.key_hash(kh);
435 0 : DCPS::push_back(plist, p);
436 0 : }
437 : }
438 :
439 : void
440 0 : RtpsSampleHeader::populate_data_control_submessages(
441 : OpenDDS::RTPS::SubmessageSeq& subm,
442 : const TransportSendControlElement& tsce,
443 : bool requires_inline_qos)
444 : {
445 : using namespace OpenDDS::RTPS;
446 :
447 0 : const DataSampleHeader& header = tsce.header();
448 0 : const ACE_CDR::Octet flags = header.byte_order_;
449 0 : add_timestamp(subm, flags, header);
450 :
451 : static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
452 :
453 0 : DataSubmessage data = {
454 : {DATA, flags, 0},
455 : 0,
456 : DATA_OCTETS_TO_IQOS,
457 : ENTITYID_UNKNOWN,
458 : header.publication_id_.entityId,
459 0 : RTPS::to_rtps_seqnum(header.sequence_),
460 : ParameterList()
461 0 : };
462 0 : switch (header.message_id_) {
463 0 : case INSTANCE_REGISTRATION: {
464 : // NOTE: The RTPS spec is not entirely clear about instance registration.
465 : // We have decided to send a DATA Submessage containing the key and an
466 : // inlineQoS StatusInfo of zero.
467 0 : data.smHeader.flags |= FLAG_K_IN_DATA;
468 0 : const int qos_len = DCPS::grow(data.inlineQos) - 1;
469 0 : data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
470 0 : break;
471 : }
472 0 : case UNREGISTER_INSTANCE: {
473 0 : data.smHeader.flags |= FLAG_K_IN_DATA;
474 0 : const int qos_len = data.inlineQos.length();
475 0 : data.inlineQos.length(qos_len+1);
476 0 : data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
477 0 : if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
478 0 : add_key_hash(data.inlineQos, tsce.msg_payload());
479 : }
480 0 : break;
481 : }
482 0 : case DISPOSE_INSTANCE: {
483 0 : data.smHeader.flags |= FLAG_K_IN_DATA;
484 0 : const int qos_len = DCPS::grow(data.inlineQos) - 1;
485 0 : data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
486 0 : if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
487 0 : add_key_hash(data.inlineQos, tsce.msg_payload());
488 : }
489 0 : break;
490 : }
491 0 : case DISPOSE_UNREGISTER_INSTANCE: {
492 0 : data.smHeader.flags |= FLAG_K_IN_DATA;
493 0 : const int qos_len = DCPS::grow(data.inlineQos) - 1;
494 0 : data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
495 0 : if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
496 0 : add_key_hash(data.inlineQos, tsce.msg_payload());
497 : }
498 0 : break;
499 : }
500 : // update control_message_supported() when adding new cases here
501 0 : default:
502 0 : ACE_DEBUG((LM_INFO,
503 : "RtpsSampleHeader::populate_data_control_submessages(): "
504 : "Non-sample messages seen, message_id = %d\n",
505 : header.message_id_));
506 0 : break;
507 : }
508 :
509 0 : if (requires_inline_qos) {
510 0 : TransportSendListener::InlineQosData qos_data;
511 0 : tsce.listener()->retrieve_inline_qos_data(qos_data);
512 :
513 0 : populate_inline_qos(qos_data, data.inlineQos);
514 0 : }
515 :
516 0 : if (data.inlineQos.length() > 0) {
517 0 : data.smHeader.flags |= FLAG_Q;
518 : }
519 :
520 0 : CORBA::ULong idx = DCPS::grow(subm) - 1;
521 0 : subm[idx].data_sm(data);
522 0 : }
523 :
524 : #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
525 : if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) { \
526 : const int idx = DCPS::grow(plist) - 1; \
527 : plist[idx].QOS_NAME(WRITER_QOS.QOS_NAME); \
528 : }
529 :
530 : void
531 0 : RtpsSampleHeader::populate_inline_qos(
532 : const TransportSendListener::InlineQosData& qos_data,
533 : RTPS::ParameterList& plist)
534 : {
535 : using namespace OpenDDS::RTPS;
536 :
537 : // Always include topic name (per the spec)
538 : {
539 0 : const int idx = DCPS::grow(plist) - 1;
540 0 : plist[idx].string_data(qos_data.topic_name.c_str());
541 0 : plist[idx]._d(PID_TOPIC_NAME);
542 : }
543 :
544 : // Conditionally include other QoS inline when the differ from the
545 : // default value.
546 : DDS::PublisherQos default_pub_qos =
547 0 : TheServiceParticipant->initial_PublisherQos();
548 0 : PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
549 0 : PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
550 :
551 : DDS::DataWriterQos default_dw_qos =
552 0 : TheServiceParticipant->initial_DataWriterQos();
553 0 : PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
554 0 : PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
555 0 : PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
556 0 : PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
557 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
558 0 : PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
559 : #endif
560 0 : PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
561 0 : if (qos_data.dw_qos.reliability != default_dw_qos.reliability) {
562 0 : const int idx = DCPS::grow(plist) - 1;
563 :
564 : ReliabilityQosPolicyRtps reliability;
565 0 : reliability.max_blocking_time = qos_data.dw_qos.reliability.max_blocking_time;
566 :
567 0 : if (qos_data.dw_qos.reliability.kind == DDS::BEST_EFFORT_RELIABILITY_QOS) {
568 0 : reliability.kind.value = RTPS::BEST_EFFORT;
569 : } else { // default to RELIABLE for writers
570 0 : reliability.kind.value = RTPS::RELIABLE;
571 : }
572 :
573 0 : plist[idx].reliability(reliability);
574 : }
575 0 : PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
576 0 : PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
577 0 : PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
578 0 : }
579 :
580 : #undef PROCESS_INLINE_QOS
581 :
582 : // simple marshaling helpers for RtpsSampleHeader::split()
583 : namespace {
584 12 : void write(const Message_Block_Ptr& mb, ACE_CDR::UShort s, bool swap_bytes)
585 : {
586 12 : const char* ps = reinterpret_cast<const char*>(&s);
587 12 : if (swap_bytes) {
588 0 : ACE_CDR::swap_2(ps, mb->wr_ptr());
589 0 : mb->wr_ptr(2);
590 : } else {
591 12 : mb->copy(ps, 2);
592 : }
593 12 : }
594 :
595 8 : void write(const Message_Block_Ptr& mb, ACE_CDR::ULong i, bool swap_bytes)
596 : {
597 8 : const char* pi = reinterpret_cast<const char*>(&i);
598 8 : if (swap_bytes) {
599 0 : ACE_CDR::swap_4(pi, mb->wr_ptr());
600 0 : mb->wr_ptr(4);
601 : } else {
602 8 : mb->copy(pi, 4);
603 : }
604 8 : }
605 :
606 : // read without advancing rd_ptr()
607 2 : void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
608 : {
609 2 : if (swap_bytes) {
610 0 : ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
611 : } else {
612 2 : std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
613 : }
614 2 : }
615 :
616 2 : void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
617 : {
618 2 : if (swap_bytes) {
619 0 : ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
620 : } else {
621 2 : std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
622 : }
623 2 : }
624 :
625 : const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
626 : }
627 :
628 : SequenceRange
629 2 : RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
630 : Message_Block_Ptr& head, Message_Block_Ptr& tail)
631 : {
632 : using namespace RTPS;
633 2 : size_t data_offset = 0;
634 2 : const char* rd = orig.rd_ptr();
635 : ACE_CDR::ULong starting_frag, sample_size;
636 : ACE_CDR::Octet flags;
637 : bool swap_bytes;
638 :
639 : // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
640 : // The submessages from the start of the msg block to this point (data_offset)
641 : // will be copied to both the head and tail fragments.
642 : while (true) {
643 4 : flags = rd[data_offset + 1];
644 4 : swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
645 4 : bool found_data = false;
646 :
647 4 : switch (rd[data_offset]) {
648 1 : case DATA:
649 1 : if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
650 0 : if (Transport_debug_level) {
651 0 : ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
652 : "attempting to fragment a Data submessage with no payload.\n"));
653 : }
654 0 : return unknown_sequence_range;
655 : }
656 1 : found_data = true;
657 1 : starting_frag = 1;
658 1 : sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
659 1 : break;
660 1 : case DATA_FRAG:
661 1 : found_data = true;
662 1 : peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
663 1 : peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
664 1 : break;
665 : }
666 :
667 4 : if (found_data) {
668 2 : break;
669 : }
670 :
671 : // Scan for next submessage in orig
672 : ACE_CDR::UShort octetsToNextHeader;
673 2 : peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
674 :
675 2 : data_offset += octetsToNextHeader + SMHDR_SZ;
676 2 : if (data_offset >= orig.length()) {
677 0 : if (Transport_debug_level) {
678 0 : ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
679 : "invalid octetsToNextHeader encountered while fragmenting.\n"));
680 : }
681 0 : return unknown_sequence_range;
682 : }
683 2 : }
684 :
685 : // Create the "head" message block (of size "sz") containing DATA_FRAG
686 2 : size_t sz = orig.length();
687 2 : ACE_CDR::Octet new_flags = flags;
688 2 : size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
689 2 : if (rd[data_offset] == DATA) {
690 1 : sz += 12; // DATA_FRAG is 12 bytes larger than DATA
691 1 : iqos_offset -= 12;
692 1 : new_flags = flags & (FLAG_E | FLAG_Q);
693 1 : if (flags & FLAG_K_IN_DATA) {
694 0 : new_flags |= FLAG_K_IN_FRAG;
695 : }
696 1 : if (flags & FLAG_N_IN_DATA) {
697 0 : new_flags |= FLAG_N_IN_FRAG;
698 : }
699 :
700 : }
701 2 : head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
702 :
703 2 : head->copy(rd, data_offset);
704 :
705 2 : head->wr_ptr()[0] = DATA_FRAG;
706 2 : head->wr_ptr()[1] = new_flags;
707 2 : head->wr_ptr(2);
708 :
709 2 : std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
710 2 : head->wr_ptr(4);
711 :
712 2 : write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
713 :
714 2 : head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
715 :
716 2 : write(head, starting_frag, swap_bytes);
717 2 : const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
718 : const ACE_CDR::UShort frags =
719 2 : static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
720 2 : if (frags == 0) {
721 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsSampleHeader::split: "
722 : "Number of Fragments is Zero: min(%B, %B) / FRAG_SIZE\n",
723 : max_data, orig_payload));
724 0 : return unknown_sequence_range;
725 : }
726 2 : write(head, frags, swap_bytes);
727 2 : write(head, FRAG_SIZE, swap_bytes);
728 2 : write(head, sample_size, swap_bytes);
729 :
730 2 : if (flags & FLAG_Q) {
731 1 : head->copy(rd + iqos_offset, orig.length() - iqos_offset);
732 : }
733 :
734 : // Create the "tail" message block containing DATA_FRAG with Q=0
735 2 : tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
736 :
737 2 : tail->copy(rd, data_offset);
738 :
739 2 : tail->wr_ptr()[0] = DATA_FRAG;
740 2 : tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
741 2 : tail->wr_ptr(2);
742 :
743 2 : std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
744 2 : tail->wr_ptr(4);
745 :
746 2 : write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
747 2 : tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
748 :
749 2 : write(tail, starting_frag + frags, swap_bytes);
750 2 : const size_t tail_data = orig_payload - frags * FRAG_SIZE;
751 2 : const ACE_CDR::UShort tail_frags =
752 2 : static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
753 2 : write(tail, tail_frags, swap_bytes);
754 2 : write(tail, FRAG_SIZE, swap_bytes);
755 2 : write(tail, sample_size, swap_bytes);
756 :
757 2 : Message_Block_Ptr payload_head;
758 0 : Message_Block_Ptr payload_tail;
759 2 : DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
760 : payload_head, payload_tail);
761 2 : head->cont(payload_head.release());
762 2 : tail->cont(payload_tail.release());
763 :
764 0 : return SequenceRange(starting_frag + frags - 1,
765 2 : starting_frag + frags + tail_frags - 1);
766 : }
767 :
768 : }
769 : }
770 :
771 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|