00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsSampleHeader.h"
00009 #include "RtpsUdpSendStrategy.h"
00010
00011 #include "dds/DCPS/Serializer.h"
00012 #include "dds/DCPS/DataSampleElement.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/Qos_Helper.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019 #include "dds/DCPS/RTPS/MessageTypes.h"
00020 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00021
00022 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00023 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00024
00025 #include <cstring>
00026
00027 #ifndef __ACE_INLINE__
00028 #include "RtpsSampleHeader.inl"
00029 #endif
00030
00031 namespace {
00032 enum { FLAG_E = 1, FLAG_Q = 2, FLAG_D = 4,
00033 FLAG_K_IN_DATA = 8, FLAG_K_IN_FRAG = 4 };
00034
00035 const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
00036 STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
00037 STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
00038 STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
00039 }
00040
00041 namespace OpenDDS {
00042 namespace RTPS {
00043
00044 #ifndef OPENDDS_SAFETY_PROFILE
00045 inline bool
00046 operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
00047 {
00048 return
00049 (lhs.value[3] == rhs.value[3]) &&
00050 (lhs.value[2] == rhs.value[2]) &&
00051 (lhs.value[1] == rhs.value[1]) &&
00052 (lhs.value[0] == rhs.value[0]);
00053 }
00054 #endif
00055
00056 }
00057 namespace DCPS {
00058
00059 void
00060 RtpsSampleHeader::init(ACE_Message_Block& mb)
00061 {
00062 using namespace OpenDDS::RTPS;
00063
00064
00065
00066
00067 if (mb.length() == 0) {
00068 return;
00069 }
00070
00071 const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
00072
00073 ACE_CDR::Octet flags = 0;
00074
00075 if (mb.length() > 1) {
00076 flags = mb.rd_ptr()[1];
00077 } else if (mb.cont() && mb.cont()->length() > 0) {
00078 flags = mb.cont()->rd_ptr()[0];
00079 } else {
00080 return;
00081 }
00082
00083 const bool little_endian = flags & FLAG_E;
00084 const size_t starting_length = mb.total_length();
00085 Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian,
00086 Serializer::ALIGN_CDR);
00087
00088 ACE_CDR::UShort octetsToNextHeader = 0;
00089
00090 #define CASE_SMKIND(kind, class, name) case kind: { \
00091 class submessage; \
00092 if (ser >> submessage) { \
00093 octetsToNextHeader = submessage.smHeader.submessageLength; \
00094 submessage_.name##_sm(submessage); \
00095 valid_ = true; \
00096 } \
00097 break; \
00098 }
00099
00100 switch (kind) {
00101 CASE_SMKIND(PAD, PadSubmessage, pad)
00102 CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
00103 CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
00104 CASE_SMKIND(GAP, GapSubmessage, gap)
00105 CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
00106 CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
00107 CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
00108 CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
00109 CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
00110 CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
00111 CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
00112 CASE_SMKIND(DATA, DataSubmessage, data)
00113 CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
00114 default:
00115 {
00116 SubmessageHeader submessage;
00117 if (ser >> submessage) {
00118 octetsToNextHeader = submessage.submessageLength;
00119 submessage_.unknown_sm(submessage);
00120 valid_ = true;
00121 }
00122 break;
00123 }
00124 }
00125 #undef CASE_SMKIND
00126
00127 if (valid_) {
00128
00129 frag_ = (kind == DATA_FRAG);
00130
00131
00132 marshaled_size_ = starting_length - mb.total_length();
00133
00134 if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
00135
00136
00137
00138 octetsToNextHeader =
00139 static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
00140 }
00141
00142 if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
00143 || kind == DATA_FRAG) {
00144
00145
00146
00147 message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_;
00148 } else {
00149
00150
00151
00152 message_length_ = 0;
00153 ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_);
00154 if (octetsToNextHeader + SMHDR_SZ > marshaled) {
00155 valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
00156 marshaled_size_ = octetsToNextHeader + SMHDR_SZ;
00157 }
00158 }
00159 }
00160 }
00161
00162 void
00163 RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
00164 const RTPS::ParameterList& iqos)
00165 {
00166 using namespace OpenDDS::RTPS;
00167 #if defined(OPENDDS_TEST_INLINE_QOS)
00168 OPENDDS_STRING output("into_received_data_sample(): ");
00169 output += to_dds_string(iqos.length());
00170 output += " inline QoS parameters\n";
00171 for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
00172 output += " parameter type = ";
00173 output += to_dds_string(iqos[index]._d());
00174 output += "\n";
00175 }
00176 ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
00177 #endif
00178 for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
00179 if (iqos[i]._d() == PID_STATUS_INFO) {
00180 if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
00181 opendds.message_id_ = DISPOSE_INSTANCE;
00182 } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
00183 opendds.message_id_ = UNREGISTER_INSTANCE;
00184 } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
00185 opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
00186 } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
00187 opendds.message_id_ = INSTANCE_REGISTRATION;
00188 }
00189 } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
00190 opendds.historic_sample_ = true;
00191 #if defined(OPENDDS_TEST_INLINE_QOS)
00192 } else if (iqos[i]._d() == PID_TOPIC_NAME) {
00193 ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
00194 } else if (iqos[i]._d() == PID_PRESENTATION) {
00195 DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
00196 ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
00197 "coherent_access = %d, ordered_access = %d\n",
00198 pres_qos.access_scope, pres_qos.coherent_access,
00199 pres_qos.ordered_access));
00200 } else if (iqos[i]._d() == PID_PARTITION) {
00201 DDS::PartitionQosPolicy part_qos = iqos[i].partition();
00202 ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
00203 for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
00204 ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in()));
00205 }
00206 ACE_DEBUG((LM_DEBUG, "\n"));
00207 #endif
00208 }
00209 }
00210 }
00211
00212 bool
00213 RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00214 {
00215 using namespace OpenDDS::RTPS;
00216 DataSampleHeader& opendds = rds.header_;
00217
00218 switch (submessage_._d()) {
00219 case DATA: {
00220 const DataSubmessage& rtps = submessage_.data_sm();
00221 opendds.cdr_encapsulation_ = true;
00222 opendds.message_length_ = message_length();
00223 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00224 opendds.publication_id_.entityId = rtps.writerId;
00225 opendds.message_id_ = SAMPLE_DATA;
00226
00227 process_iqos(opendds, rtps.inlineQos);
00228
00229 if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
00230 opendds.key_fields_only_ = true;
00231 } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
00232
00233
00234
00235
00236
00237 if ((rtps.writerId.entityKind & 0xC0) == 0xC0
00238 && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) {
00239 for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
00240 if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
00241 rds.sample_ = new ACE_Message_Block(20);
00242
00243 rds.sample_->copy("\x00\x00\x00\x00", 4);
00244 const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
00245 rds.sample_->copy(reinterpret_cast<const char*>(data), 16);
00246 opendds.message_length_ =
00247 static_cast<ACE_UINT32>(rds.sample_->length());
00248 opendds.key_fields_only_ = true;
00249 if (Transport_debug_level) {
00250 ACE_DEBUG((LM_DEBUG,
00251 "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
00252 " - used KeyHash data as the key-only payload\n"));
00253 }
00254 break;
00255 }
00256 }
00257 } else {
00258
00259
00260 if (Transport_debug_level) {
00261 ACE_DEBUG((LM_WARNING,
00262 "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
00263 "Received a DATA Submessage with D = 0 and K = 0, "
00264 "dropping\n"));
00265 }
00266 return false;
00267 }
00268 }
00269
00270 if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
00271
00272 opendds.byte_order_ = rds.sample_->rd_ptr()[1] & FLAG_E;
00273 }
00274
00275 break;
00276 }
00277 case DATA_FRAG: {
00278 const DataFragSubmessage& rtps = submessage_.data_frag_sm();
00279 opendds.cdr_encapsulation_ = true;
00280 opendds.message_length_ = message_length();
00281 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00282 opendds.publication_id_.entityId = rtps.writerId;
00283 opendds.message_id_ = SAMPLE_DATA;
00284 opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
00285
00286
00287 process_iqos(opendds, rtps.inlineQos);
00288
00289 const CORBA::ULong lastFragInSubmsg =
00290 rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
00291 if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
00292 opendds.more_fragments_ = true;
00293 }
00294 break;
00295 }
00296 default:
00297 break;
00298 }
00299
00300 return true;
00301 }
00302
00303 namespace {
00304 void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
00305 const DataSampleHeader& header)
00306 {
00307 using namespace OpenDDS::RTPS;
00308 const DDS::Time_t st = {header.source_timestamp_sec_,
00309 header.source_timestamp_nanosec_};
00310 const InfoTimestampSubmessage ts = {
00311 {INFO_TS, flags, INFO_TS_SZ},
00312 {st.sec, static_cast<ACE_UINT32>(st.nanosec * NANOS_TO_RTPS_FRACS + .5)}
00313 };
00314 const CORBA::ULong i = subm.length();
00315 subm.length(i + 1);
00316 subm[i].info_ts_sm(ts);
00317 }
00318 }
00319
00320 void
00321 RtpsSampleHeader::populate_data_sample_submessages(
00322 RTPS::SubmessageSeq& subm,
00323 const DataSampleElement& dsle,
00324 bool requires_inline_qos)
00325 {
00326 using namespace OpenDDS::RTPS;
00327
00328 const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
00329 add_timestamp(subm, flags, dsle.get_header());
00330 CORBA::ULong i = subm.length();
00331
00332 EntityId_t readerId = ENTITYID_UNKNOWN;
00333 if (dsle.get_num_subs() == 1) {
00334 readerId = dsle.get_sub_id(0).entityId;
00335 InfoDestinationSubmessage idest;
00336 idest.smHeader.submessageId = INFO_DST;
00337 idest.smHeader.flags = flags;
00338 idest.smHeader.submessageLength = INFO_DST_SZ;
00339 std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
00340 sizeof(GuidPrefix_t));
00341 subm.length(i + 1);
00342 subm[i++].info_dst_sm(idest);
00343 } else {
00344
00345 for (CORBA::ULong x = 0; x < i; ++x) {
00346 if (subm[x]._d() == INFO_DST) {
00347
00348 InfoDestinationSubmessage idest;
00349 idest.smHeader.submessageId = INFO_DST;
00350 idest.smHeader.flags = flags;
00351 idest.smHeader.submessageLength = INFO_DST_SZ;
00352 std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
00353 sizeof(GuidPrefix_t));
00354 subm.length(i + 1);
00355 subm[i++].info_dst_sm(idest);
00356 break;
00357 }
00358 }
00359 }
00360
00361 DataSubmessage data = {
00362 {DATA, flags, 0},
00363 0,
00364 DATA_OCTETS_TO_IQOS,
00365 readerId,
00366 dsle.get_pub_id().entityId,
00367 {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()},
00368 ParameterList()
00369 };
00370 const char message_id = dsle.get_header().message_id_;
00371 switch (message_id) {
00372 case SAMPLE_DATA:
00373
00374 data.smHeader.flags |= FLAG_D;
00375 break;
00376 default:
00377 ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
00378 "Non-sample messages seen, message_id = %d\n", int(message_id)));
00379 break;
00380 }
00381
00382 if (requires_inline_qos) {
00383 TransportSendListener::InlineQosData qos_data;
00384 dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
00385
00386 populate_inline_qos(qos_data, data.inlineQos);
00387 }
00388
00389 if (data.inlineQos.length() > 0) {
00390 data.smHeader.flags |= FLAG_Q;
00391 }
00392
00393 subm.length(i + 1);
00394 subm[i].data_sm(data);
00395 }
00396
00397 namespace {
00398
00399
00400
00401 void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
00402 {
00403 RTPS::KeyHash_t kh;
00404 static const size_t offset = 8 ;
00405 std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
00406 RTPS::Parameter p;
00407 p.key_hash(kh);
00408 const CORBA::ULong i = plist.length();
00409 plist.length(i + 1);
00410 plist[i] = p;
00411 }
00412 }
00413
00414 void
00415 RtpsSampleHeader::populate_data_control_submessages(
00416 OpenDDS::RTPS::SubmessageSeq& subm,
00417 const TransportSendControlElement& tsce,
00418 bool requires_inline_qos)
00419 {
00420 using namespace OpenDDS::RTPS;
00421
00422 const DataSampleHeader& header = tsce.header();
00423 const ACE_CDR::Octet flags = header.byte_order_;
00424 add_timestamp(subm, flags, header);
00425 CORBA::ULong i = subm.length();
00426
00427 static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
00428
00429 DataSubmessage data = {
00430 {DATA, flags, 0},
00431 0,
00432 DATA_OCTETS_TO_IQOS,
00433 ENTITYID_UNKNOWN,
00434 header.publication_id_.entityId,
00435 {header.sequence_.getHigh(), header.sequence_.getLow()},
00436 ParameterList()
00437 };
00438 switch (header.message_id_) {
00439 case INSTANCE_REGISTRATION: {
00440
00441
00442
00443 data.smHeader.flags |= FLAG_K_IN_DATA;
00444 const int qos_len = data.inlineQos.length();
00445 data.inlineQos.length(qos_len + 1);
00446 data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
00447 break;
00448 }
00449 case UNREGISTER_INSTANCE: {
00450 data.smHeader.flags |= FLAG_K_IN_DATA;
00451 const int qos_len = data.inlineQos.length();
00452 data.inlineQos.length(qos_len+1);
00453 data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
00454 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00455 add_key_hash(data.inlineQos, tsce.msg_payload());
00456 }
00457 break;
00458 }
00459 case DISPOSE_INSTANCE: {
00460 data.smHeader.flags |= FLAG_K_IN_DATA;
00461 const int qos_len = data.inlineQos.length();
00462 data.inlineQos.length(qos_len + 1);
00463 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
00464 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00465 add_key_hash(data.inlineQos, tsce.msg_payload());
00466 }
00467 break;
00468 }
00469 case DISPOSE_UNREGISTER_INSTANCE: {
00470 data.smHeader.flags |= FLAG_K_IN_DATA;
00471 const int qos_len = data.inlineQos.length();
00472 data.inlineQos.length(qos_len + 1);
00473 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
00474 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00475 add_key_hash(data.inlineQos, tsce.msg_payload());
00476 }
00477 break;
00478 }
00479
00480 default:
00481 ACE_DEBUG((LM_INFO,
00482 "RtpsSampleHeader::populate_data_control_submessages(): "
00483 "Non-sample messages seen, message_id = %d\n",
00484 header.message_id_));
00485 break;
00486 }
00487
00488 if (requires_inline_qos) {
00489 TransportSendListener::InlineQosData qos_data;
00490 tsce.listener()->retrieve_inline_qos_data(qos_data);
00491
00492 populate_inline_qos(qos_data, data.inlineQos);
00493 }
00494
00495 if (data.inlineQos.length() > 0) {
00496 data.smHeader.flags |= FLAG_Q;
00497 }
00498
00499 subm.length(i + 1);
00500 subm[i].data_sm(data);
00501 }
00502
00503 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
00504 if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) { \
00505 const int qos_len = plist.length(); \
00506 plist.length(qos_len + 1); \
00507 plist[qos_len].QOS_NAME(WRITER_QOS.QOS_NAME); \
00508 }
00509
00510 void
00511 RtpsSampleHeader::populate_inline_qos(
00512 const TransportSendListener::InlineQosData& qos_data,
00513 RTPS::ParameterList& plist)
00514 {
00515 using namespace OpenDDS::RTPS;
00516
00517
00518 {
00519 const int qos_len = plist.length();
00520 plist.length(qos_len + 1);
00521 plist[qos_len].string_data(qos_data.topic_name.c_str());
00522 plist[qos_len]._d(PID_TOPIC_NAME);
00523 }
00524
00525
00526
00527 DDS::PublisherQos default_pub_qos =
00528 TheServiceParticipant->initial_PublisherQos();
00529 PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
00530 PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
00531
00532 DDS::DataWriterQos default_dw_qos =
00533 TheServiceParticipant->initial_DataWriterQos();
00534 PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
00535 PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
00536 PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
00537 PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
00538 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00539 PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
00540 #endif
00541 PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
00542 PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos);
00543 PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
00544 PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
00545 PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
00546 }
00547
00548 #undef PROCESS_INLINE_QOS
00549
00550
00551 namespace {
00552 void write(ACE_Message_Block* mb, ACE_CDR::UShort s, bool swap_bytes)
00553 {
00554 const char* ps = reinterpret_cast<const char*>(&s);
00555 if (swap_bytes) {
00556 ACE_CDR::swap_2(ps, mb->wr_ptr());
00557 mb->wr_ptr(2);
00558 } else {
00559 mb->copy(ps, 2);
00560 }
00561 }
00562
00563 void write(ACE_Message_Block* mb, ACE_CDR::ULong i, bool swap_bytes)
00564 {
00565 const char* pi = reinterpret_cast<const char*>(&i);
00566 if (swap_bytes) {
00567 ACE_CDR::swap_4(pi, mb->wr_ptr());
00568 mb->wr_ptr(4);
00569 } else {
00570 mb->copy(pi, 4);
00571 }
00572 }
00573
00574
00575 void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
00576 {
00577 if (swap_bytes) {
00578 ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
00579 } else {
00580 std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
00581 }
00582 }
00583
00584 void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
00585 {
00586 if (swap_bytes) {
00587 ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
00588 } else {
00589 std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
00590 }
00591 }
00592
00593 const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
00594 }
00595
00596 SequenceRange
00597 RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00598 ACE_Message_Block*& head, ACE_Message_Block*& tail)
00599 {
00600 using namespace RTPS;
00601 static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(),
00602 SequenceNumber::SEQUENCENUMBER_UNKNOWN());
00603 size_t data_offset = 0;
00604 const char* rd = orig.rd_ptr();
00605 ACE_CDR::ULong starting_frag, sample_size;
00606 ACE_CDR::Octet flags;
00607 bool swap_bytes;
00608
00609
00610
00611
00612 while (true) {
00613 flags = rd[data_offset + 1];
00614 swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
00615 bool found_data = false;
00616
00617 switch (rd[data_offset]) {
00618 case DATA:
00619 if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
00620 if (Transport_debug_level) {
00621 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00622 "attempting to fragment a Data submessage with no payload.\n"));
00623 }
00624 return unknown_range;
00625 }
00626 found_data = true;
00627 starting_frag = 1;
00628 sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
00629 break;
00630 case DATA_FRAG:
00631 found_data = true;
00632 peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
00633 peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
00634 break;
00635 }
00636
00637 if (found_data) {
00638 break;
00639 }
00640
00641
00642 ACE_CDR::UShort octetsToNextHeader;
00643 peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
00644
00645 data_offset += octetsToNextHeader + SMHDR_SZ;
00646 if (data_offset >= orig.length()) {
00647 if (Transport_debug_level) {
00648 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00649 "invalid octetsToNextHeader encountered while fragmenting.\n"));
00650 }
00651 return unknown_range;
00652 }
00653 }
00654
00655
00656 size_t sz = orig.length();
00657 ACE_CDR::Octet new_flags = flags;
00658 size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
00659 if (rd[data_offset] == DATA) {
00660 sz += 12;
00661 iqos_offset -= 12;
00662 new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG);
00663 if (flags & FLAG_K_IN_DATA) {
00664 new_flags |= FLAG_K_IN_FRAG;
00665 }
00666 }
00667 head = DataSampleHeader::alloc_msgblock(orig, sz, false);
00668
00669 head->copy(rd, data_offset);
00670
00671 head->wr_ptr()[0] = DATA_FRAG;
00672 head->wr_ptr()[1] = new_flags;
00673 head->wr_ptr(2);
00674
00675 std::memset(head->wr_ptr(), 0, 4);
00676 head->wr_ptr(4);
00677
00678 write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00679
00680 head->copy(rd + data_offset + 8, 16);
00681
00682 write(head, starting_frag, swap_bytes);
00683 const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
00684 const ACE_CDR::UShort frags =
00685 static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
00686 write(head, frags, swap_bytes);
00687 write(head, FRAG_SIZE, swap_bytes);
00688 write(head, sample_size, swap_bytes);
00689
00690 if (flags & FLAG_Q) {
00691 head->copy(rd + iqos_offset, orig.length() - iqos_offset);
00692 }
00693
00694
00695 tail = DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false);
00696
00697 tail->copy(rd, data_offset);
00698
00699 tail->wr_ptr()[0] = DATA_FRAG;
00700 tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
00701 tail->wr_ptr(2);
00702
00703 std::memset(tail->wr_ptr(), 0, 4);
00704 tail->wr_ptr(4);
00705
00706 write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00707 tail->copy(rd + data_offset + 8, 16);
00708
00709 write(tail, starting_frag + frags, swap_bytes);
00710 const size_t tail_data = orig_payload - frags * FRAG_SIZE;
00711 const ACE_CDR::UShort tail_frags =
00712 static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
00713 write(tail, tail_frags, swap_bytes);
00714 write(tail, FRAG_SIZE, swap_bytes);
00715 write(tail, sample_size, swap_bytes);
00716
00717 ACE_Message_Block* payload_head = 0;
00718 ACE_Message_Block* payload_tail;
00719 DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
00720 payload_head, payload_tail);
00721 head->cont(payload_head);
00722 tail->cont(payload_tail);
00723
00724 return SequenceRange(starting_frag + frags - 1,
00725 starting_frag + frags + tail_frags - 1);
00726 }
00727
00728 }
00729 }