RtpsSampleHeader.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsSampleHeader.h"
00009 #include "RtpsUdpSendStrategy.h"
00010 
00011 #include "dds/DCPS/Serializer.h"
00012 #include "dds/DCPS/DataSampleElement.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/Qos_Helper.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017 
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019 #include "dds/DCPS/RTPS/MessageTypes.h"
00020 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00021 
00022 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00023 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00024 
00025 #include <cstring>
00026 
00027 #ifndef __ACE_INLINE__
00028 #include "RtpsSampleHeader.inl"
00029 #endif
00030 
00031 namespace {
00032   enum { FLAG_E = 1, FLAG_Q = 2, FLAG_D = 4,
00033          FLAG_K_IN_DATA = 8, FLAG_K_IN_FRAG = 4 };
00034 
00035   const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
00036           STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
00037           STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
00038           STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
00039 }
00040 
00041 namespace OpenDDS {
00042 namespace RTPS {
00043 
00044 #ifndef OPENDDS_SAFETY_PROFILE
00045 inline bool
00046 operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
00047 {
00048   return
00049     (lhs.value[3] == rhs.value[3]) &&
00050     (lhs.value[2] == rhs.value[2]) &&
00051     (lhs.value[1] == rhs.value[1]) &&
00052     (lhs.value[0] == rhs.value[0]);
00053 }
00054 #endif
00055 
00056 }
00057 namespace DCPS {
00058 
00059 void
00060 RtpsSampleHeader::init(ACE_Message_Block& mb)
00061 {
00062   using namespace OpenDDS::RTPS;
00063 
00064   // valid_ is false here, it will only be set to true if there is a Submessage
00065 
00066   // Manually grab the first two bytes for the SubmessageKind and the byte order
00067   if (mb.length() == 0) {
00068     return;
00069   }
00070 
00071   const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
00072 
00073   ACE_CDR::Octet flags = 0;
00074 
00075   if (mb.length() > 1) {
00076     flags = mb.rd_ptr()[1];
00077   } else if (mb.cont() && mb.cont()->length() > 0) {
00078     flags = mb.cont()->rd_ptr()[0];
00079   } else {
00080     return;
00081   }
00082 
00083   const bool little_endian = flags & FLAG_E;
00084   const size_t starting_length = mb.total_length();
00085   Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian,
00086                  Serializer::ALIGN_CDR);
00087 
00088   ACE_CDR::UShort octetsToNextHeader = 0;
00089 
00090 #define CASE_SMKIND(kind, class, name) case kind: {               \
00091     class submessage;                                             \
00092     if (ser >> submessage) {                                      \
00093       octetsToNextHeader = submessage.smHeader.submessageLength;  \
00094       submessage_.name##_sm(submessage);                          \
00095       valid_ = true;                                              \
00096     }                                                             \
00097     break;                                                        \
00098   }
00099 
00100   switch (kind) {
00101   CASE_SMKIND(PAD, PadSubmessage, pad)
00102   CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
00103   CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
00104   CASE_SMKIND(GAP, GapSubmessage, gap)
00105   CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
00106   CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
00107   CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
00108   CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
00109   CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
00110   CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
00111   CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
00112   CASE_SMKIND(DATA, DataSubmessage, data)
00113   CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
00114   default:
00115     {
00116       SubmessageHeader submessage;
00117       if (ser >> submessage) {
00118         octetsToNextHeader = submessage.submessageLength;
00119         submessage_.unknown_sm(submessage);
00120         valid_ = true;
00121       }
00122       break;
00123     }
00124   }
00125 #undef CASE_SMKIND
00126 
00127   if (valid_) {
00128 
00129     frag_ = (kind == DATA_FRAG);
00130 
00131     // marshaled_size_ is # of bytes of submessage we have read from "mb"
00132     marshaled_size_ = starting_length - mb.total_length();
00133 
00134     if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
00135       // see RTPS v2.1 section 9.4.5.1.3
00136       // In this case the current Submessage extends to the end of Message,
00137       // so we will use the message_length_ that was set in pdu_remaining().
00138       octetsToNextHeader =
00139         static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
00140     }
00141 
00142     if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
00143         || kind == DATA_FRAG) {
00144       // These Submessages have a payload which we haven't deserialized yet.
00145       // The TransportReceiveStrategy will know this via message_length().
00146       // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
00147       message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_;
00148     } else {
00149       // These Submessages _could_ have extra data that we don't know about
00150       // (from a newer minor version of the RTPS spec).  Either way, indicate
00151       // to the TransportReceiveStrategy that there is no data payload here.
00152       message_length_ = 0;
00153       ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_);
00154       if (octetsToNextHeader + SMHDR_SZ > marshaled) {
00155         valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
00156         marshaled_size_ = octetsToNextHeader + SMHDR_SZ;
00157       }
00158     }
00159   }
00160 }
00161 
00162 void
00163 RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
00164                                const RTPS::ParameterList& iqos)
00165 {
00166   using namespace OpenDDS::RTPS;
00167 #if defined(OPENDDS_TEST_INLINE_QOS)
00168   OPENDDS_STRING output("into_received_data_sample(): ");
00169   output += to_dds_string(iqos.length());
00170   output += " inline QoS parameters\n";
00171   for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
00172     output += "  parameter type = ";
00173     output += to_dds_string(iqos[index]._d());
00174     output += "\n";
00175   }
00176   ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
00177 #endif
00178   for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
00179     if (iqos[i]._d() == PID_STATUS_INFO) {
00180       if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
00181         opendds.message_id_ = DISPOSE_INSTANCE;
00182       } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
00183         opendds.message_id_ = UNREGISTER_INSTANCE;
00184       } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
00185         opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
00186       } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
00187         opendds.message_id_ = INSTANCE_REGISTRATION;
00188       }
00189     } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
00190       opendds.historic_sample_ = true;
00191 #if defined(OPENDDS_TEST_INLINE_QOS)
00192     } else if (iqos[i]._d() == PID_TOPIC_NAME) {
00193       ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
00194     } else if (iqos[i]._d() == PID_PRESENTATION) {
00195       DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
00196       ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
00197                  "coherent_access = %d, ordered_access = %d\n",
00198                  pres_qos.access_scope, pres_qos.coherent_access,
00199                  pres_qos.ordered_access));
00200     } else if (iqos[i]._d() == PID_PARTITION) {
00201       DDS::PartitionQosPolicy part_qos = iqos[i].partition();
00202       ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
00203       for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
00204         ACE_DEBUG((LM_DEBUG, "'%C'  ", part_qos.name[j].in()));
00205       }
00206       ACE_DEBUG((LM_DEBUG, "\n"));
00207 #endif
00208     }
00209   }
00210 }
00211 
00212 bool
00213 RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00214 {
00215   using namespace OpenDDS::RTPS;
00216   DataSampleHeader& opendds = rds.header_;
00217 
00218   switch (submessage_._d()) {
00219   case DATA: {
00220     const DataSubmessage& rtps = submessage_.data_sm();
00221     opendds.cdr_encapsulation_ = true;
00222     opendds.message_length_ = message_length();
00223     opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00224     opendds.publication_id_.entityId = rtps.writerId;
00225     opendds.message_id_ = SAMPLE_DATA;
00226 
00227     process_iqos(opendds, rtps.inlineQos);
00228 
00229     if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
00230       opendds.key_fields_only_ = true;
00231     } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
00232       // Interoperability note: the Key may be hiding in the "key hash" param
00233       // in the InlineQos.  In order to make use of this Key, it mst be 16
00234       // bytes or less.  We have observed other DDS implementations only send
00235       // the MD5 hash of a >16 byte key, so we must limit this to Built-in
00236       // endpoints which are assumed to use GUIDs as keys.
00237       if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
00238           && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) {
00239         for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
00240           if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
00241             rds.sample_ = new ACE_Message_Block(20);
00242             // CDR_BE encapsuation scheme (endianness is not used for key hash)
00243             rds.sample_->copy("\x00\x00\x00\x00", 4);
00244             const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
00245             rds.sample_->copy(reinterpret_cast<const char*>(data), 16);
00246             opendds.message_length_ =
00247               static_cast<ACE_UINT32>(rds.sample_->length());
00248             opendds.key_fields_only_ = true;
00249             if (Transport_debug_level) {
00250               ACE_DEBUG((LM_DEBUG,
00251                          "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
00252                          " - used KeyHash data as the key-only payload\n"));
00253             }
00254             break;
00255           }
00256         }
00257       } else {
00258       // FUTURE: Handle the case of D = 0 and K = 0
00259       // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
00260         if (Transport_debug_level) {
00261           ACE_DEBUG((LM_WARNING,
00262                      "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
00263                      "Received a DATA Submessage with D = 0 and K = 0, "
00264                      "dropping\n"));
00265         }
00266         return false;
00267       }
00268     }
00269 
00270     if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
00271       // Peek at the byte order from the encapsulation containing the payload.
00272       opendds.byte_order_ = rds.sample_->rd_ptr()[1] & FLAG_E;
00273     }
00274 
00275     break;
00276   }
00277   case DATA_FRAG: {
00278     const DataFragSubmessage& rtps = submessage_.data_frag_sm();
00279     opendds.cdr_encapsulation_ = true;
00280     opendds.message_length_ = message_length();
00281     opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00282     opendds.publication_id_.entityId = rtps.writerId;
00283     opendds.message_id_ = SAMPLE_DATA;
00284     opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
00285     // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
00286 
00287     process_iqos(opendds, rtps.inlineQos);
00288 
00289     const CORBA::ULong lastFragInSubmsg =
00290       rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
00291     if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
00292       opendds.more_fragments_ = true;
00293     }
00294     break;
00295   }
00296   default:
00297     break;
00298   }
00299 
00300   return true;
00301 }
00302 
00303 namespace {
00304   void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
00305     const DataSampleHeader& header)
00306   {
00307     using namespace OpenDDS::RTPS;
00308     const DDS::Time_t st = {header.source_timestamp_sec_,
00309                             header.source_timestamp_nanosec_};
00310     const InfoTimestampSubmessage ts = {
00311       {INFO_TS, flags, INFO_TS_SZ},
00312       {st.sec, static_cast<ACE_UINT32>(st.nanosec * NANOS_TO_RTPS_FRACS + .5)}
00313     };
00314     const CORBA::ULong i = subm.length();
00315     subm.length(i + 1);
00316     subm[i].info_ts_sm(ts);
00317   }
00318 }
00319 
00320 void
00321 RtpsSampleHeader::populate_data_sample_submessages(
00322   RTPS::SubmessageSeq& subm,
00323   const DataSampleElement& dsle,
00324   bool requires_inline_qos)
00325 {
00326   using namespace OpenDDS::RTPS;
00327 
00328   const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
00329   add_timestamp(subm, flags, dsle.get_header());
00330   CORBA::ULong i = subm.length();
00331 
00332   EntityId_t readerId = ENTITYID_UNKNOWN;
00333   if (dsle.get_num_subs() == 1) {
00334     readerId = dsle.get_sub_id(0).entityId;
00335     InfoDestinationSubmessage idest;
00336     idest.smHeader.submessageId = INFO_DST;
00337     idest.smHeader.flags = flags;
00338     idest.smHeader.submessageLength = INFO_DST_SZ;
00339     std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
00340                 sizeof(GuidPrefix_t));
00341     subm.length(i + 1);
00342     subm[i++].info_dst_sm(idest);
00343   } else {
00344     //Not durability resend, but could have inline gaps
00345     for (CORBA::ULong x = 0; x < i; ++x) {
00346       if (subm[x]._d() == INFO_DST) {
00347         //Need to add INFO_DST
00348         InfoDestinationSubmessage idest;
00349         idest.smHeader.submessageId = INFO_DST;
00350         idest.smHeader.flags = flags;
00351         idest.smHeader.submessageLength = INFO_DST_SZ;
00352         std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
00353                     sizeof(GuidPrefix_t));
00354         subm.length(i + 1);
00355         subm[i++].info_dst_sm(idest);
00356         break;
00357       }
00358     }
00359   }
00360 
00361   DataSubmessage data = {
00362     {DATA, flags, 0},
00363     0,
00364     DATA_OCTETS_TO_IQOS,
00365     readerId,
00366     dsle.get_pub_id().entityId,
00367     {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()},
00368     ParameterList()
00369   };
00370   const char message_id = dsle.get_header().message_id_;
00371   switch (message_id) {
00372   case SAMPLE_DATA:
00373     // Must be a data message
00374     data.smHeader.flags |= FLAG_D;
00375     break;
00376   default:
00377     ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
00378                "Non-sample messages seen, message_id = %d\n", int(message_id)));
00379     break;
00380   }
00381 
00382   if (requires_inline_qos) {
00383     TransportSendListener::InlineQosData qos_data;
00384     dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
00385 
00386     populate_inline_qos(qos_data, data.inlineQos);
00387   }
00388 
00389   if (data.inlineQos.length() > 0) {
00390     data.smHeader.flags |= FLAG_Q;
00391   }
00392 
00393   subm.length(i + 1);
00394   subm[i].data_sm(data);
00395 }
00396 
00397 namespace {
00398   // Interoperability note:
00399   // used for discovery (SEDP) only, this helps interoperability because they
00400   // spec is unclear about the use of PID_KEY_HASH vs. the key as the payload
00401   void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
00402   {
00403     RTPS::KeyHash_t kh;
00404     static const size_t offset = 8 /*skip encap (4) and plist hdr (4)*/;
00405     std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
00406     RTPS::Parameter p;
00407     p.key_hash(kh);
00408     const CORBA::ULong i = plist.length();
00409     plist.length(i + 1);
00410     plist[i] = p;
00411   }
00412 }
00413 
00414 void
00415 RtpsSampleHeader::populate_data_control_submessages(
00416   OpenDDS::RTPS::SubmessageSeq& subm,
00417   const TransportSendControlElement& tsce,
00418   bool requires_inline_qos)
00419 {
00420   using namespace OpenDDS::RTPS;
00421 
00422   const DataSampleHeader& header = tsce.header();
00423   const ACE_CDR::Octet flags = header.byte_order_;
00424   add_timestamp(subm, flags, header);
00425   CORBA::ULong i = subm.length();
00426 
00427   static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
00428 
00429   DataSubmessage data = {
00430     {DATA, flags, 0},
00431     0,
00432     DATA_OCTETS_TO_IQOS,
00433     ENTITYID_UNKNOWN,
00434     header.publication_id_.entityId,
00435     {header.sequence_.getHigh(), header.sequence_.getLow()},
00436     ParameterList()
00437   };
00438   switch (header.message_id_) {
00439   case INSTANCE_REGISTRATION: {
00440     // NOTE: The RTPS spec is not entirely clear about instance registration.
00441     // We have decided to send a DATA Submessage containing the key and an
00442     // inlineQoS StatusInfo of zero.
00443     data.smHeader.flags |= FLAG_K_IN_DATA;
00444     const int qos_len = data.inlineQos.length();
00445     data.inlineQos.length(qos_len + 1);
00446     data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
00447     break;
00448   }
00449   case UNREGISTER_INSTANCE: {
00450     data.smHeader.flags |= FLAG_K_IN_DATA;
00451     const int qos_len = data.inlineQos.length();
00452     data.inlineQos.length(qos_len+1);
00453     data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
00454     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00455       add_key_hash(data.inlineQos, tsce.msg_payload());
00456     }
00457     break;
00458   }
00459   case DISPOSE_INSTANCE: {
00460     data.smHeader.flags |= FLAG_K_IN_DATA;
00461     const int qos_len = data.inlineQos.length();
00462     data.inlineQos.length(qos_len + 1);
00463     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
00464     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00465       add_key_hash(data.inlineQos, tsce.msg_payload());
00466     }
00467     break;
00468   }
00469   case DISPOSE_UNREGISTER_INSTANCE: {
00470     data.smHeader.flags |= FLAG_K_IN_DATA;
00471     const int qos_len = data.inlineQos.length();
00472     data.inlineQos.length(qos_len + 1);
00473     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
00474     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00475       add_key_hash(data.inlineQos, tsce.msg_payload());
00476     }
00477     break;
00478   }
00479   // update control_message_supported() when adding new cases here
00480   default:
00481     ACE_DEBUG((LM_INFO,
00482                "RtpsSampleHeader::populate_data_control_submessages(): "
00483                "Non-sample messages seen, message_id = %d\n",
00484                header.message_id_));
00485     break;
00486   }
00487 
00488   if (requires_inline_qos) {
00489     TransportSendListener::InlineQosData qos_data;
00490     tsce.listener()->retrieve_inline_qos_data(qos_data);
00491 
00492     populate_inline_qos(qos_data, data.inlineQos);
00493   }
00494 
00495   if (data.inlineQos.length() > 0) {
00496     data.smHeader.flags |= FLAG_Q;
00497   }
00498 
00499   subm.length(i + 1);
00500   subm[i].data_sm(data);
00501 }
00502 
00503 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
00504   if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) {          \
00505     const int qos_len = plist.length();                       \
00506     plist.length(qos_len + 1);                                \
00507     plist[qos_len].QOS_NAME(WRITER_QOS.QOS_NAME);             \
00508   }
00509 
00510 void
00511 RtpsSampleHeader::populate_inline_qos(
00512   const TransportSendListener::InlineQosData& qos_data,
00513   RTPS::ParameterList& plist)
00514 {
00515   using namespace OpenDDS::RTPS;
00516 
00517   // Always include topic name (per the spec)
00518   {
00519     const int qos_len = plist.length();
00520     plist.length(qos_len + 1);
00521     plist[qos_len].string_data(qos_data.topic_name.c_str());
00522     plist[qos_len]._d(PID_TOPIC_NAME);
00523   }
00524 
00525   // Conditionally include other QoS inline when the differ from the
00526   // default value.
00527   DDS::PublisherQos default_pub_qos =
00528     TheServiceParticipant->initial_PublisherQos();
00529   PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
00530   PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
00531 
00532   DDS::DataWriterQos default_dw_qos =
00533     TheServiceParticipant->initial_DataWriterQos();
00534   PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
00535   PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
00536   PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
00537   PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
00538 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00539   PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
00540 #endif
00541   PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
00542   PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos);
00543   PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
00544   PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
00545   PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
00546 }
00547 
00548 #undef PROCESS_INLINE_QOS
00549 
00550 // simple marshaling helpers for RtpsSampleHeader::split()
00551 namespace {
00552   void write(ACE_Message_Block* mb, ACE_CDR::UShort s, bool swap_bytes)
00553   {
00554     const char* ps = reinterpret_cast<const char*>(&s);
00555     if (swap_bytes) {
00556       ACE_CDR::swap_2(ps, mb->wr_ptr());
00557       mb->wr_ptr(2);
00558     } else {
00559       mb->copy(ps, 2);
00560     }
00561   }
00562 
00563   void write(ACE_Message_Block* mb, ACE_CDR::ULong i, bool swap_bytes)
00564   {
00565     const char* pi = reinterpret_cast<const char*>(&i);
00566     if (swap_bytes) {
00567       ACE_CDR::swap_4(pi, mb->wr_ptr());
00568       mb->wr_ptr(4);
00569     } else {
00570       mb->copy(pi, 4);
00571     }
00572   }
00573 
00574   // read without advancing rd_ptr()
00575   void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
00576   {
00577     if (swap_bytes) {
00578       ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
00579     } else {
00580       std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
00581     }
00582   }
00583 
00584   void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
00585   {
00586     if (swap_bytes) {
00587       ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
00588     } else {
00589       std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
00590     }
00591   }
00592 
00593   const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
00594 }
00595 
00596 SequenceRange
00597 RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00598                         ACE_Message_Block*& head, ACE_Message_Block*& tail)
00599 {
00600   using namespace RTPS;
00601   static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(),
00602                                            SequenceNumber::SEQUENCENUMBER_UNKNOWN());
00603   size_t data_offset = 0;
00604   const char* rd = orig.rd_ptr();
00605   ACE_CDR::ULong starting_frag, sample_size;
00606   ACE_CDR::Octet flags;
00607   bool swap_bytes;
00608 
00609   // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
00610   // The submessages from the start of the msg block to this point (data_offset)
00611   // will be copied to both the head and tail fragments.
00612   while (true) {
00613     flags = rd[data_offset + 1];
00614     swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
00615     bool found_data = false;
00616 
00617     switch (rd[data_offset]) {
00618     case DATA:
00619       if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
00620         if (Transport_debug_level) {
00621           ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00622             "attempting to fragment a Data submessage with no payload.\n"));
00623         }
00624         return unknown_range;
00625       }
00626       found_data = true;
00627       starting_frag = 1;
00628       sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
00629       break;
00630     case DATA_FRAG:
00631       found_data = true;
00632       peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
00633       peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
00634       break;
00635     }
00636 
00637     if (found_data) {
00638       break;
00639     }
00640 
00641     // Scan for next submessage in orig
00642     ACE_CDR::UShort octetsToNextHeader;
00643     peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
00644 
00645     data_offset += octetsToNextHeader + SMHDR_SZ;
00646     if (data_offset >= orig.length()) {
00647       if (Transport_debug_level) {
00648         ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00649           "invalid octetsToNextHeader encountered while fragmenting.\n"));
00650       }
00651       return unknown_range;
00652     }
00653   }
00654 
00655   // Create the "head" message block (of size "sz") containing DATA_FRAG
00656   size_t sz = orig.length();
00657   ACE_CDR::Octet new_flags = flags;
00658   size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
00659   if (rd[data_offset] == DATA) {
00660     sz += 12; // DATA_FRAG is 12 bytes larger than DATA
00661     iqos_offset -= 12;
00662     new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG);
00663     if (flags & FLAG_K_IN_DATA) {
00664       new_flags |= FLAG_K_IN_FRAG;
00665     }
00666   }
00667   head = DataSampleHeader::alloc_msgblock(orig, sz, false);
00668 
00669   head->copy(rd, data_offset);
00670 
00671   head->wr_ptr()[0] = DATA_FRAG;
00672   head->wr_ptr()[1] = new_flags;
00673   head->wr_ptr(2);
00674 
00675   std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
00676   head->wr_ptr(4);
00677 
00678   write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00679 
00680   head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
00681 
00682   write(head, starting_frag, swap_bytes);
00683   const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
00684   const ACE_CDR::UShort frags =
00685     static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
00686   write(head, frags, swap_bytes);
00687   write(head, FRAG_SIZE, swap_bytes);
00688   write(head, sample_size, swap_bytes);
00689 
00690   if (flags & FLAG_Q) {
00691     head->copy(rd + iqos_offset, orig.length() - iqos_offset);
00692   }
00693 
00694   // Create the "tail" message block containing DATA_FRAG with Q=0
00695   tail = DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false);
00696 
00697   tail->copy(rd, data_offset);
00698 
00699   tail->wr_ptr()[0] = DATA_FRAG;
00700   tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
00701   tail->wr_ptr(2);
00702 
00703   std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
00704   tail->wr_ptr(4);
00705 
00706   write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00707   tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
00708 
00709   write(tail, starting_frag + frags, swap_bytes);
00710   const size_t tail_data = orig_payload - frags * FRAG_SIZE;
00711   const ACE_CDR::UShort tail_frags =
00712     static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
00713   write(tail, tail_frags, swap_bytes);
00714   write(tail, FRAG_SIZE, swap_bytes);
00715   write(tail, sample_size, swap_bytes);
00716 
00717   ACE_Message_Block* payload_head = 0;
00718   ACE_Message_Block* payload_tail;
00719   DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
00720                                   payload_head, payload_tail);
00721   head->cont(payload_head);
00722   tail->cont(payload_tail);
00723 
00724   return SequenceRange(starting_frag + frags - 1,
00725                        starting_frag + frags + tail_frags - 1);
00726 }
00727 
00728 }
00729 }

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7