00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsSampleHeader.h"
00009 #include "RtpsUdpSendStrategy.h"
00010
00011 #include "dds/DCPS/Serializer.h"
00012 #include "dds/DCPS/DataSampleElement.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/Qos_Helper.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019 #include "dds/DCPS/RTPS/MessageTypes.h"
00020 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00021
00022 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00023 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00024
00025 #include <cstring>
00026
00027 #ifndef __ACE_INLINE__
00028 #include "RtpsSampleHeader.inl"
00029 #endif
00030
00031 namespace {
00032 const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
00033 STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
00034 STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
00035 STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
00036 }
00037
00038 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040 namespace OpenDDS {
00041 namespace RTPS {
00042
00043 #ifndef OPENDDS_SAFETY_PROFILE
00044 inline bool
00045 operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
00046 {
00047 return
00048 (lhs.value[3] == rhs.value[3]) &&
00049 (lhs.value[2] == rhs.value[2]) &&
00050 (lhs.value[1] == rhs.value[1]) &&
00051 (lhs.value[0] == rhs.value[0]);
00052 }
00053 #endif
00054
00055 }
00056 namespace DCPS {
00057
00058 void
00059 RtpsSampleHeader::init(ACE_Message_Block& mb)
00060 {
00061 using namespace OpenDDS::RTPS;
00062
00063
00064
00065
00066 if (mb.length() == 0) {
00067 return;
00068 }
00069
00070 const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
00071
00072 ACE_CDR::Octet flags = 0;
00073
00074 if (mb.length() > 1) {
00075 flags = mb.rd_ptr()[1];
00076 } else if (mb.cont() && mb.cont()->length() > 0) {
00077 flags = mb.cont()->rd_ptr()[0];
00078 } else {
00079 return;
00080 }
00081
00082 const bool little_endian = flags & FLAG_E;
00083 const size_t starting_length = mb.total_length();
00084 Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian,
00085 Serializer::ALIGN_CDR);
00086
00087 ACE_CDR::UShort octetsToNextHeader = 0;
00088
00089 #define CASE_SMKIND(kind, class, name) case kind: { \
00090 class submessage; \
00091 if (ser >> submessage) { \
00092 octetsToNextHeader = submessage.smHeader.submessageLength; \
00093 submessage_.name##_sm(submessage); \
00094 valid_ = true; \
00095 } \
00096 break; \
00097 }
00098
00099 switch (kind) {
00100 CASE_SMKIND(PAD, PadSubmessage, pad)
00101 CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
00102 CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
00103 CASE_SMKIND(GAP, GapSubmessage, gap)
00104 CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
00105 CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
00106 CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
00107 CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
00108 CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
00109 CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
00110 CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
00111 CASE_SMKIND(DATA, DataSubmessage, data)
00112 CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
00113
00114 #if defined(OPENDDS_SECURITY)
00115
00116
00117 case SEC_BODY:
00118 case SEC_PREFIX:
00119 case SEC_POSTFIX:
00120 case SRTPS_PREFIX:
00121 case SRTPS_POSTFIX: {
00122 SecuritySubmessage submessage;
00123 if (ser >> submessage) {
00124 octetsToNextHeader = submessage.smHeader.submessageLength;
00125 submessage_.security_sm(submessage);
00126 submessage_._d(kind);
00127 valid_ = true;
00128 }
00129 break;
00130 }
00131 #endif
00132
00133 default:
00134 {
00135 SubmessageHeader submessage;
00136 if (ser >> submessage) {
00137 octetsToNextHeader = submessage.submessageLength;
00138 submessage_.unknown_sm(submessage);
00139 valid_ = true;
00140 }
00141 break;
00142 }
00143 }
00144 #undef CASE_SMKIND
00145
00146 if (valid_) {
00147
00148 frag_ = (kind == DATA_FRAG);
00149
00150
00151 marshaled_size_ = starting_length - mb.total_length();
00152
00153 if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
00154
00155
00156
00157 octetsToNextHeader =
00158 static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
00159 }
00160
00161 if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
00162 || kind == DATA_FRAG) {
00163
00164
00165
00166 message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_;
00167 } else {
00168
00169
00170
00171 message_length_ = 0;
00172 ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_);
00173 if (octetsToNextHeader + SMHDR_SZ > marshaled) {
00174 valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
00175 marshaled_size_ = octetsToNextHeader + SMHDR_SZ;
00176 }
00177 }
00178 }
00179 }
00180
00181 void
00182 RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
00183 const RTPS::ParameterList& iqos)
00184 {
00185 using namespace OpenDDS::RTPS;
00186 #if defined(OPENDDS_TEST_INLINE_QOS)
00187 OPENDDS_STRING output("into_received_data_sample(): ");
00188 output += to_dds_string(iqos.length());
00189 output += " inline QoS parameters\n";
00190 for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
00191 output += " parameter type = ";
00192 output += to_dds_string(iqos[index]._d());
00193 output += "\n";
00194 }
00195 ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
00196 #endif
00197 for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
00198 if (iqos[i]._d() == PID_STATUS_INFO) {
00199 if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
00200 opendds.message_id_ = DISPOSE_INSTANCE;
00201 } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
00202 opendds.message_id_ = UNREGISTER_INSTANCE;
00203 } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
00204 opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
00205 } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
00206 opendds.message_id_ = INSTANCE_REGISTRATION;
00207 }
00208 } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
00209 opendds.historic_sample_ = true;
00210 #if defined(OPENDDS_TEST_INLINE_QOS)
00211 } else if (iqos[i]._d() == PID_TOPIC_NAME) {
00212 ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
00213 } else if (iqos[i]._d() == PID_PRESENTATION) {
00214 DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
00215 ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
00216 "coherent_access = %d, ordered_access = %d\n",
00217 pres_qos.access_scope, pres_qos.coherent_access,
00218 pres_qos.ordered_access));
00219 } else if (iqos[i]._d() == PID_PARTITION) {
00220 DDS::PartitionQosPolicy part_qos = iqos[i].partition();
00221 ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
00222 for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
00223 ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in()));
00224 }
00225 ACE_DEBUG((LM_DEBUG, "\n"));
00226 #endif
00227 }
00228 }
00229 }
00230
00231 bool
00232 RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00233 {
00234 using namespace OpenDDS::RTPS;
00235 DataSampleHeader& opendds = rds.header_;
00236
00237 switch (submessage_._d()) {
00238 case DATA: {
00239 const DataSubmessage& rtps = submessage_.data_sm();
00240 opendds.cdr_encapsulation_ = true;
00241 opendds.message_length_ = message_length();
00242 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00243 opendds.publication_id_.entityId = rtps.writerId;
00244 opendds.message_id_ = SAMPLE_DATA;
00245
00246 process_iqos(opendds, rtps.inlineQos);
00247
00248 if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
00249 opendds.key_fields_only_ = true;
00250 } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
00251
00252
00253
00254
00255
00256 if ((rtps.writerId.entityKind & 0xC0) == 0xC0
00257 && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) {
00258 for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
00259 if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
00260 rds.sample_.reset(new ACE_Message_Block(20));
00261
00262 rds.sample_->copy("\x00\x00\x00\x00", 4);
00263 const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
00264 rds.sample_->copy(reinterpret_cast<const char*>(data), 16);
00265 opendds.message_length_ =
00266 static_cast<ACE_UINT32>(rds.sample_->length());
00267 opendds.key_fields_only_ = true;
00268 if (Transport_debug_level) {
00269 ACE_DEBUG((LM_DEBUG,
00270 "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
00271 " - used KeyHash data as the key-only payload\n"));
00272 }
00273 break;
00274 }
00275 }
00276 } else {
00277
00278
00279 if (Transport_debug_level) {
00280 ACE_DEBUG((LM_WARNING,
00281 "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
00282 "Received a DATA Submessage with D = 0 and K = 0, "
00283 "dropping\n"));
00284 }
00285 return false;
00286 }
00287 }
00288
00289 if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
00290
00291 opendds.byte_order_ = payload_byte_order(rds);
00292 }
00293
00294 break;
00295 }
00296 case DATA_FRAG: {
00297 const DataFragSubmessage& rtps = submessage_.data_frag_sm();
00298 opendds.cdr_encapsulation_ = true;
00299 opendds.message_length_ = message_length();
00300 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00301 opendds.publication_id_.entityId = rtps.writerId;
00302 opendds.message_id_ = SAMPLE_DATA;
00303 opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
00304
00305
00306 process_iqos(opendds, rtps.inlineQos);
00307
00308 const CORBA::ULong lastFragInSubmsg =
00309 rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
00310 if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
00311 opendds.more_fragments_ = true;
00312 }
00313 break;
00314 }
00315 default:
00316 break;
00317 }
00318
00319 return true;
00320 }
00321
00322 bool RtpsSampleHeader::payload_byte_order(const ReceivedDataSample& rds)
00323 {
00324 return rds.sample_->rd_ptr()[1] & RTPS::FLAG_E;
00325 }
00326
00327 namespace {
00328 void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
00329 const DataSampleHeader& header)
00330 {
00331 using namespace OpenDDS::RTPS;
00332 const DDS::Time_t st = {header.source_timestamp_sec_,
00333 header.source_timestamp_nanosec_};
00334 const InfoTimestampSubmessage ts = {
00335 {INFO_TS, flags, INFO_TS_SZ},
00336 {st.sec, static_cast<ACE_UINT32>(st.nanosec * NANOS_TO_RTPS_FRACS + .5)}
00337 };
00338 const CORBA::ULong i = subm.length();
00339 subm.length(i + 1);
00340 subm[i].info_ts_sm(ts);
00341 }
00342 }
00343
00344 void
00345 RtpsSampleHeader::populate_data_sample_submessages(
00346 RTPS::SubmessageSeq& subm,
00347 const DataSampleElement& dsle,
00348 bool requires_inline_qos)
00349 {
00350 using namespace OpenDDS::RTPS;
00351
00352 const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
00353 add_timestamp(subm, flags, dsle.get_header());
00354 CORBA::ULong i = subm.length();
00355
00356 EntityId_t readerId = ENTITYID_UNKNOWN;
00357 if (dsle.get_num_subs() == 1) {
00358 readerId = dsle.get_sub_id(0).entityId;
00359 InfoDestinationSubmessage idest;
00360 idest.smHeader.submessageId = INFO_DST;
00361 idest.smHeader.flags = flags;
00362 idest.smHeader.submessageLength = INFO_DST_SZ;
00363 std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
00364 sizeof(GuidPrefix_t));
00365 subm.length(i + 1);
00366 subm[i++].info_dst_sm(idest);
00367 } else {
00368
00369 for (CORBA::ULong x = 0; x < i; ++x) {
00370 if (subm[x]._d() == INFO_DST) {
00371
00372 InfoDestinationSubmessage idest;
00373 idest.smHeader.submessageId = INFO_DST;
00374 idest.smHeader.flags = flags;
00375 idest.smHeader.submessageLength = INFO_DST_SZ;
00376 std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
00377 sizeof(GuidPrefix_t));
00378 subm.length(i + 1);
00379 subm[i++].info_dst_sm(idest);
00380 break;
00381 }
00382 }
00383 }
00384
00385 DataSubmessage data = {
00386 {DATA, flags, 0},
00387 0,
00388 DATA_OCTETS_TO_IQOS,
00389 readerId,
00390 dsle.get_pub_id().entityId,
00391 {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()},
00392 ParameterList()
00393 };
00394 const char message_id = dsle.get_header().message_id_;
00395 switch (message_id) {
00396 case SAMPLE_DATA:
00397
00398 data.smHeader.flags |= FLAG_D;
00399 break;
00400 default:
00401 ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
00402 "Non-sample messages seen, message_id = %d\n", int(message_id)));
00403 break;
00404 }
00405
00406 if (requires_inline_qos) {
00407 TransportSendListener::InlineQosData qos_data;
00408 dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
00409
00410 populate_inline_qos(qos_data, data.inlineQos);
00411 }
00412
00413 if (data.inlineQos.length() > 0) {
00414 data.smHeader.flags |= FLAG_Q;
00415 }
00416
00417 subm.length(i + 1);
00418 subm[i].data_sm(data);
00419 }
00420
00421 namespace {
00422
00423
00424
00425 void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
00426 {
00427 RTPS::KeyHash_t kh;
00428 static const size_t offset = 8 ;
00429 std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
00430 RTPS::Parameter p;
00431 p.key_hash(kh);
00432 const CORBA::ULong i = plist.length();
00433 plist.length(i + 1);
00434 plist[i] = p;
00435 }
00436 }
00437
00438 void
00439 RtpsSampleHeader::populate_data_control_submessages(
00440 OpenDDS::RTPS::SubmessageSeq& subm,
00441 const TransportSendControlElement& tsce,
00442 bool requires_inline_qos)
00443 {
00444 using namespace OpenDDS::RTPS;
00445
00446 const DataSampleHeader& header = tsce.header();
00447 const ACE_CDR::Octet flags = header.byte_order_;
00448 add_timestamp(subm, flags, header);
00449 CORBA::ULong i = subm.length();
00450
00451 static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
00452
00453 DataSubmessage data = {
00454 {DATA, flags, 0},
00455 0,
00456 DATA_OCTETS_TO_IQOS,
00457 ENTITYID_UNKNOWN,
00458 header.publication_id_.entityId,
00459 {header.sequence_.getHigh(), header.sequence_.getLow()},
00460 ParameterList()
00461 };
00462 switch (header.message_id_) {
00463 case INSTANCE_REGISTRATION: {
00464
00465
00466
00467 data.smHeader.flags |= FLAG_K_IN_DATA;
00468 const int qos_len = data.inlineQos.length();
00469 data.inlineQos.length(qos_len + 1);
00470 data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
00471 break;
00472 }
00473 case UNREGISTER_INSTANCE: {
00474 data.smHeader.flags |= FLAG_K_IN_DATA;
00475 const int qos_len = data.inlineQos.length();
00476 data.inlineQos.length(qos_len+1);
00477 data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
00478 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00479 add_key_hash(data.inlineQos, tsce.msg_payload());
00480 }
00481 break;
00482 }
00483 case DISPOSE_INSTANCE: {
00484 data.smHeader.flags |= FLAG_K_IN_DATA;
00485 const int qos_len = data.inlineQos.length();
00486 data.inlineQos.length(qos_len + 1);
00487 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
00488 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00489 add_key_hash(data.inlineQos, tsce.msg_payload());
00490 }
00491 break;
00492 }
00493 case DISPOSE_UNREGISTER_INSTANCE: {
00494 data.smHeader.flags |= FLAG_K_IN_DATA;
00495 const int qos_len = data.inlineQos.length();
00496 data.inlineQos.length(qos_len + 1);
00497 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
00498 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00499 add_key_hash(data.inlineQos, tsce.msg_payload());
00500 }
00501 break;
00502 }
00503
00504 default:
00505 ACE_DEBUG((LM_INFO,
00506 "RtpsSampleHeader::populate_data_control_submessages(): "
00507 "Non-sample messages seen, message_id = %d\n",
00508 header.message_id_));
00509 break;
00510 }
00511
00512 if (requires_inline_qos) {
00513 TransportSendListener::InlineQosData qos_data;
00514 tsce.listener()->retrieve_inline_qos_data(qos_data);
00515
00516 populate_inline_qos(qos_data, data.inlineQos);
00517 }
00518
00519 if (data.inlineQos.length() > 0) {
00520 data.smHeader.flags |= FLAG_Q;
00521 }
00522
00523 subm.length(i + 1);
00524 subm[i].data_sm(data);
00525 }
00526
00527 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
00528 if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) { \
00529 const int qos_len = plist.length(); \
00530 plist.length(qos_len + 1); \
00531 plist[qos_len].QOS_NAME(WRITER_QOS.QOS_NAME); \
00532 }
00533
00534 void
00535 RtpsSampleHeader::populate_inline_qos(
00536 const TransportSendListener::InlineQosData& qos_data,
00537 RTPS::ParameterList& plist)
00538 {
00539 using namespace OpenDDS::RTPS;
00540
00541
00542 {
00543 const int qos_len = plist.length();
00544 plist.length(qos_len + 1);
00545 plist[qos_len].string_data(qos_data.topic_name.c_str());
00546 plist[qos_len]._d(PID_TOPIC_NAME);
00547 }
00548
00549
00550
00551 DDS::PublisherQos default_pub_qos =
00552 TheServiceParticipant->initial_PublisherQos();
00553 PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
00554 PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
00555
00556 DDS::DataWriterQos default_dw_qos =
00557 TheServiceParticipant->initial_DataWriterQos();
00558 PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
00559 PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
00560 PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
00561 PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
00562 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00563 PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
00564 #endif
00565 PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
00566 PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos);
00567 PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
00568 PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
00569 PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
00570 }
00571
00572 #undef PROCESS_INLINE_QOS
00573
00574
00575 namespace {
00576 void write(const Message_Block_Ptr& mb, ACE_CDR::UShort s, bool swap_bytes)
00577 {
00578 const char* ps = reinterpret_cast<const char*>(&s);
00579 if (swap_bytes) {
00580 ACE_CDR::swap_2(ps, mb->wr_ptr());
00581 mb->wr_ptr(2);
00582 } else {
00583 mb->copy(ps, 2);
00584 }
00585 }
00586
00587 void write(const Message_Block_Ptr& mb, ACE_CDR::ULong i, bool swap_bytes)
00588 {
00589 const char* pi = reinterpret_cast<const char*>(&i);
00590 if (swap_bytes) {
00591 ACE_CDR::swap_4(pi, mb->wr_ptr());
00592 mb->wr_ptr(4);
00593 } else {
00594 mb->copy(pi, 4);
00595 }
00596 }
00597
00598
00599 void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
00600 {
00601 if (swap_bytes) {
00602 ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
00603 } else {
00604 std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
00605 }
00606 }
00607
00608 void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
00609 {
00610 if (swap_bytes) {
00611 ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
00612 } else {
00613 std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
00614 }
00615 }
00616
00617 const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
00618 }
00619
00620 SequenceRange
00621 RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00622 Message_Block_Ptr& head, Message_Block_Ptr& tail)
00623 {
00624 using namespace RTPS;
00625 static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(),
00626 SequenceNumber::SEQUENCENUMBER_UNKNOWN());
00627 size_t data_offset = 0;
00628 const char* rd = orig.rd_ptr();
00629 ACE_CDR::ULong starting_frag, sample_size;
00630 ACE_CDR::Octet flags;
00631 bool swap_bytes;
00632
00633
00634
00635
00636 while (true) {
00637 flags = rd[data_offset + 1];
00638 swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
00639 bool found_data = false;
00640
00641 switch (rd[data_offset]) {
00642 case DATA:
00643 if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
00644 if (Transport_debug_level) {
00645 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00646 "attempting to fragment a Data submessage with no payload.\n"));
00647 }
00648 return unknown_range;
00649 }
00650 found_data = true;
00651 starting_frag = 1;
00652 sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
00653 break;
00654 case DATA_FRAG:
00655 found_data = true;
00656 peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
00657 peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
00658 break;
00659 }
00660
00661 if (found_data) {
00662 break;
00663 }
00664
00665
00666 ACE_CDR::UShort octetsToNextHeader;
00667 peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
00668
00669 data_offset += octetsToNextHeader + SMHDR_SZ;
00670 if (data_offset >= orig.length()) {
00671 if (Transport_debug_level) {
00672 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00673 "invalid octetsToNextHeader encountered while fragmenting.\n"));
00674 }
00675 return unknown_range;
00676 }
00677 }
00678
00679
00680 size_t sz = orig.length();
00681 ACE_CDR::Octet new_flags = flags;
00682 size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
00683 if (rd[data_offset] == DATA) {
00684 sz += 12;
00685 iqos_offset -= 12;
00686 new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG);
00687 if (flags & FLAG_K_IN_DATA) {
00688 new_flags |= FLAG_K_IN_FRAG;
00689 }
00690 }
00691 head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
00692
00693 head->copy(rd, data_offset);
00694
00695 head->wr_ptr()[0] = DATA_FRAG;
00696 head->wr_ptr()[1] = new_flags;
00697 head->wr_ptr(2);
00698
00699 std::memset(head->wr_ptr(), 0, 4);
00700 head->wr_ptr(4);
00701
00702 write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00703
00704 head->copy(rd + data_offset + 8, 16);
00705
00706 write(head, starting_frag, swap_bytes);
00707 const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
00708 const ACE_CDR::UShort frags =
00709 static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
00710 write(head, frags, swap_bytes);
00711 write(head, FRAG_SIZE, swap_bytes);
00712 write(head, sample_size, swap_bytes);
00713
00714 if (flags & FLAG_Q) {
00715 head->copy(rd + iqos_offset, orig.length() - iqos_offset);
00716 }
00717
00718
00719 tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
00720
00721 tail->copy(rd, data_offset);
00722
00723 tail->wr_ptr()[0] = DATA_FRAG;
00724 tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
00725 tail->wr_ptr(2);
00726
00727 std::memset(tail->wr_ptr(), 0, 4);
00728 tail->wr_ptr(4);
00729
00730 write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00731 tail->copy(rd + data_offset + 8, 16);
00732
00733 write(tail, starting_frag + frags, swap_bytes);
00734 const size_t tail_data = orig_payload - frags * FRAG_SIZE;
00735 const ACE_CDR::UShort tail_frags =
00736 static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
00737 write(tail, tail_frags, swap_bytes);
00738 write(tail, FRAG_SIZE, swap_bytes);
00739 write(tail, sample_size, swap_bytes);
00740
00741 Message_Block_Ptr payload_head;
00742 Message_Block_Ptr payload_tail;
00743 DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
00744 payload_head, payload_tail);
00745 head->cont(payload_head.release());
00746 tail->cont(payload_tail.release());
00747
00748 return SequenceRange(starting_frag + frags - 1,
00749 starting_frag + frags + tail_frags - 1);
00750 }
00751
00752 }
00753 }
00754
00755 OPENDDS_END_VERSIONED_NAMESPACE_DECL