RtpsSampleHeader.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsSampleHeader.h"
00009 #include "RtpsUdpSendStrategy.h"
00010 
00011 #include "dds/DCPS/Serializer.h"
00012 #include "dds/DCPS/DataSampleElement.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/Qos_Helper.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017 
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019 #include "dds/DCPS/RTPS/MessageTypes.h"
00020 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00021 
00022 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00023 #include "dds/DCPS/transport/framework/TransportSendListener.h"
00024 
00025 #include <cstring>
00026 
00027 #ifndef __ACE_INLINE__
00028 #include "RtpsSampleHeader.inl"
00029 #endif
00030 
00031 namespace {
00032   const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
00033     STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
00034     STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
00035     STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
00036 }
00037 
00038 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00039 
00040 namespace OpenDDS {
00041 namespace RTPS {
00042 
00043 #ifndef OPENDDS_SAFETY_PROFILE
00044 inline bool
00045 operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
00046 {
00047   return
00048     (lhs.value[3] == rhs.value[3]) &&
00049     (lhs.value[2] == rhs.value[2]) &&
00050     (lhs.value[1] == rhs.value[1]) &&
00051     (lhs.value[0] == rhs.value[0]);
00052 }
00053 #endif
00054 
00055 }
00056 namespace DCPS {
00057 
00058 void
00059 RtpsSampleHeader::init(ACE_Message_Block& mb)
00060 {
00061   using namespace OpenDDS::RTPS;
00062 
00063   // valid_ is false here, it will only be set to true if there is a Submessage
00064 
00065   // Manually grab the first two bytes for the SubmessageKind and the byte order
00066   if (mb.length() == 0) {
00067     return;
00068   }
00069 
00070   const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
00071 
00072   ACE_CDR::Octet flags = 0;
00073 
00074   if (mb.length() > 1) {
00075     flags = mb.rd_ptr()[1];
00076   } else if (mb.cont() && mb.cont()->length() > 0) {
00077     flags = mb.cont()->rd_ptr()[0];
00078   } else {
00079     return;
00080   }
00081 
00082   const bool little_endian = flags & FLAG_E;
00083   const size_t starting_length = mb.total_length();
00084   Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian,
00085                  Serializer::ALIGN_CDR);
00086 
00087   ACE_CDR::UShort octetsToNextHeader = 0;
00088 
00089 #define CASE_SMKIND(kind, class, name) case kind: {               \
00090     class submessage;                                             \
00091     if (ser >> submessage) {                                      \
00092       octetsToNextHeader = submessage.smHeader.submessageLength;  \
00093       submessage_.name##_sm(submessage);                          \
00094       valid_ = true;                                              \
00095     }                                                             \
00096     break;                                                        \
00097   }
00098 
00099   switch (kind) {
00100   CASE_SMKIND(PAD, PadSubmessage, pad)
00101   CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
00102   CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
00103   CASE_SMKIND(GAP, GapSubmessage, gap)
00104   CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
00105   CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
00106   CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
00107   CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
00108   CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
00109   CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
00110   CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
00111   CASE_SMKIND(DATA, DataSubmessage, data)
00112   CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
00113 
00114 #if defined(OPENDDS_SECURITY)
00115     // Each submessage type introduced by the Security spec is treated
00116     // as an opaque octet sequence at this layer.
00117     case SEC_BODY:
00118     case SEC_PREFIX:
00119     case SEC_POSTFIX:
00120     case SRTPS_PREFIX:
00121     case SRTPS_POSTFIX: {
00122     SecuritySubmessage submessage;
00123     if (ser >> submessage) {
00124       octetsToNextHeader = submessage.smHeader.submessageLength;
00125       submessage_.security_sm(submessage);
00126       submessage_._d(kind);
00127       valid_ = true;
00128     }
00129     break;
00130     }
00131 #endif
00132 
00133   default:
00134     {
00135       SubmessageHeader submessage;
00136       if (ser >> submessage) {
00137         octetsToNextHeader = submessage.submessageLength;
00138         submessage_.unknown_sm(submessage);
00139         valid_ = true;
00140       }
00141       break;
00142     }
00143   }
00144 #undef CASE_SMKIND
00145 
00146   if (valid_) {
00147 
00148     frag_ = (kind == DATA_FRAG);
00149 
00150     // marshaled_size_ is # of bytes of submessage we have read from "mb"
00151     marshaled_size_ = starting_length - mb.total_length();
00152 
00153     if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
00154       // see RTPS v2.1 section 9.4.5.1.3
00155       // In this case the current Submessage extends to the end of Message,
00156       // so we will use the message_length_ that was set in pdu_remaining().
00157       octetsToNextHeader =
00158         static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
00159     }
00160 
00161     if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
00162         || kind == DATA_FRAG) {
00163       // These Submessages have a payload which we haven't deserialized yet.
00164       // The TransportReceiveStrategy will know this via message_length().
00165       // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
00166       message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_;
00167     } else {
00168       // These Submessages _could_ have extra data that we don't know about
00169       // (from a newer minor version of the RTPS spec).  Either way, indicate
00170       // to the TransportReceiveStrategy that there is no data payload here.
00171       message_length_ = 0;
00172       ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_);
00173       if (octetsToNextHeader + SMHDR_SZ > marshaled) {
00174         valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
00175         marshaled_size_ = octetsToNextHeader + SMHDR_SZ;
00176       }
00177     }
00178   }
00179 }
00180 
00181 void
00182 RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
00183                                const RTPS::ParameterList& iqos)
00184 {
00185   using namespace OpenDDS::RTPS;
00186 #if defined(OPENDDS_TEST_INLINE_QOS)
00187   OPENDDS_STRING output("into_received_data_sample(): ");
00188   output += to_dds_string(iqos.length());
00189   output += " inline QoS parameters\n";
00190   for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
00191     output += "  parameter type = ";
00192     output += to_dds_string(iqos[index]._d());
00193     output += "\n";
00194   }
00195   ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
00196 #endif
00197   for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
00198     if (iqos[i]._d() == PID_STATUS_INFO) {
00199       if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
00200         opendds.message_id_ = DISPOSE_INSTANCE;
00201       } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
00202         opendds.message_id_ = UNREGISTER_INSTANCE;
00203       } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
00204         opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
00205       } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
00206         opendds.message_id_ = INSTANCE_REGISTRATION;
00207       }
00208     } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
00209       opendds.historic_sample_ = true;
00210 #if defined(OPENDDS_TEST_INLINE_QOS)
00211     } else if (iqos[i]._d() == PID_TOPIC_NAME) {
00212       ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
00213     } else if (iqos[i]._d() == PID_PRESENTATION) {
00214       DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
00215       ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
00216                  "coherent_access = %d, ordered_access = %d\n",
00217                  pres_qos.access_scope, pres_qos.coherent_access,
00218                  pres_qos.ordered_access));
00219     } else if (iqos[i]._d() == PID_PARTITION) {
00220       DDS::PartitionQosPolicy part_qos = iqos[i].partition();
00221       ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
00222       for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
00223         ACE_DEBUG((LM_DEBUG, "'%C'  ", part_qos.name[j].in()));
00224       }
00225       ACE_DEBUG((LM_DEBUG, "\n"));
00226 #endif
00227     }
00228   }
00229 }
00230 
00231 bool
00232 RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
00233 {
00234   using namespace OpenDDS::RTPS;
00235   DataSampleHeader& opendds = rds.header_;
00236 
00237   switch (submessage_._d()) {
00238   case DATA: {
00239     const DataSubmessage& rtps = submessage_.data_sm();
00240     opendds.cdr_encapsulation_ = true;
00241     opendds.message_length_ = message_length();
00242     opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00243     opendds.publication_id_.entityId = rtps.writerId;
00244     opendds.message_id_ = SAMPLE_DATA;
00245 
00246     process_iqos(opendds, rtps.inlineQos);
00247 
00248     if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
00249       opendds.key_fields_only_ = true;
00250     } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
00251       // Interoperability note: the Key may be hiding in the "key hash" param
00252       // in the InlineQos.  In order to make use of this Key, it mst be 16
00253       // bytes or less.  We have observed other DDS implementations only send
00254       // the MD5 hash of a >16 byte key, so we must limit this to Built-in
00255       // endpoints which are assumed to use GUIDs as keys.
00256       if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
00257           && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) {
00258         for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
00259           if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
00260             rds.sample_.reset(new ACE_Message_Block(20));
00261             // CDR_BE encapsuation scheme (endianness is not used for key hash)
00262             rds.sample_->copy("\x00\x00\x00\x00", 4);
00263             const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
00264             rds.sample_->copy(reinterpret_cast<const char*>(data), 16);
00265             opendds.message_length_ =
00266               static_cast<ACE_UINT32>(rds.sample_->length());
00267             opendds.key_fields_only_ = true;
00268             if (Transport_debug_level) {
00269               ACE_DEBUG((LM_DEBUG,
00270                          "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
00271                          " - used KeyHash data as the key-only payload\n"));
00272             }
00273             break;
00274           }
00275         }
00276       } else {
00277       // FUTURE: Handle the case of D = 0 and K = 0
00278       // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
00279         if (Transport_debug_level) {
00280           ACE_DEBUG((LM_WARNING,
00281                      "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
00282                      "Received a DATA Submessage with D = 0 and K = 0, "
00283                      "dropping\n"));
00284         }
00285         return false;
00286       }
00287     }
00288 
00289     if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
00290       // Peek at the byte order from the encapsulation containing the payload.
00291       opendds.byte_order_ = payload_byte_order(rds);
00292     }
00293 
00294     break;
00295   }
00296   case DATA_FRAG: {
00297     const DataFragSubmessage& rtps = submessage_.data_frag_sm();
00298     opendds.cdr_encapsulation_ = true;
00299     opendds.message_length_ = message_length();
00300     opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low);
00301     opendds.publication_id_.entityId = rtps.writerId;
00302     opendds.message_id_ = SAMPLE_DATA;
00303     opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
00304     // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
00305 
00306     process_iqos(opendds, rtps.inlineQos);
00307 
00308     const CORBA::ULong lastFragInSubmsg =
00309       rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
00310     if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
00311       opendds.more_fragments_ = true;
00312     }
00313     break;
00314   }
00315   default:
00316     break;
00317   }
00318 
00319   return true;
00320 }
00321 
00322 bool RtpsSampleHeader::payload_byte_order(const ReceivedDataSample& rds)
00323 {
00324   return rds.sample_->rd_ptr()[1] & RTPS::FLAG_E;
00325 }
00326 
00327 namespace {
00328   void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
00329     const DataSampleHeader& header)
00330   {
00331     using namespace OpenDDS::RTPS;
00332     const DDS::Time_t st = {header.source_timestamp_sec_,
00333                             header.source_timestamp_nanosec_};
00334     const InfoTimestampSubmessage ts = {
00335       {INFO_TS, flags, INFO_TS_SZ},
00336       {st.sec, static_cast<ACE_UINT32>(st.nanosec * NANOS_TO_RTPS_FRACS + .5)}
00337     };
00338     const CORBA::ULong i = subm.length();
00339     subm.length(i + 1);
00340     subm[i].info_ts_sm(ts);
00341   }
00342 }
00343 
00344 void
00345 RtpsSampleHeader::populate_data_sample_submessages(
00346   RTPS::SubmessageSeq& subm,
00347   const DataSampleElement& dsle,
00348   bool requires_inline_qos)
00349 {
00350   using namespace OpenDDS::RTPS;
00351 
00352   const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
00353   add_timestamp(subm, flags, dsle.get_header());
00354   CORBA::ULong i = subm.length();
00355 
00356   EntityId_t readerId = ENTITYID_UNKNOWN;
00357   if (dsle.get_num_subs() == 1) {
00358     readerId = dsle.get_sub_id(0).entityId;
00359     InfoDestinationSubmessage idest;
00360     idest.smHeader.submessageId = INFO_DST;
00361     idest.smHeader.flags = flags;
00362     idest.smHeader.submessageLength = INFO_DST_SZ;
00363     std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
00364                 sizeof(GuidPrefix_t));
00365     subm.length(i + 1);
00366     subm[i++].info_dst_sm(idest);
00367   } else {
00368     //Not durability resend, but could have inline gaps
00369     for (CORBA::ULong x = 0; x < i; ++x) {
00370       if (subm[x]._d() == INFO_DST) {
00371         //Need to add INFO_DST
00372         InfoDestinationSubmessage idest;
00373         idest.smHeader.submessageId = INFO_DST;
00374         idest.smHeader.flags = flags;
00375         idest.smHeader.submessageLength = INFO_DST_SZ;
00376         std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
00377                     sizeof(GuidPrefix_t));
00378         subm.length(i + 1);
00379         subm[i++].info_dst_sm(idest);
00380         break;
00381       }
00382     }
00383   }
00384 
00385   DataSubmessage data = {
00386     {DATA, flags, 0},
00387     0,
00388     DATA_OCTETS_TO_IQOS,
00389     readerId,
00390     dsle.get_pub_id().entityId,
00391     {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()},
00392     ParameterList()
00393   };
00394   const char message_id = dsle.get_header().message_id_;
00395   switch (message_id) {
00396   case SAMPLE_DATA:
00397     // Must be a data message
00398     data.smHeader.flags |= FLAG_D;
00399     break;
00400   default:
00401     ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
00402                "Non-sample messages seen, message_id = %d\n", int(message_id)));
00403     break;
00404   }
00405 
00406   if (requires_inline_qos) {
00407     TransportSendListener::InlineQosData qos_data;
00408     dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
00409 
00410     populate_inline_qos(qos_data, data.inlineQos);
00411   }
00412 
00413   if (data.inlineQos.length() > 0) {
00414     data.smHeader.flags |= FLAG_Q;
00415   }
00416 
00417   subm.length(i + 1);
00418   subm[i].data_sm(data);
00419 }
00420 
00421 namespace {
00422   // Interoperability note:
00423   // used for discovery (SEDP) only, this helps interoperability because they
00424   // spec is unclear about the use of PID_KEY_HASH vs. the key as the payload
00425   void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
00426   {
00427     RTPS::KeyHash_t kh;
00428     static const size_t offset = 8 /*skip encap (4) and plist hdr (4)*/;
00429     std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
00430     RTPS::Parameter p;
00431     p.key_hash(kh);
00432     const CORBA::ULong i = plist.length();
00433     plist.length(i + 1);
00434     plist[i] = p;
00435   }
00436 }
00437 
00438 void
00439 RtpsSampleHeader::populate_data_control_submessages(
00440   OpenDDS::RTPS::SubmessageSeq& subm,
00441   const TransportSendControlElement& tsce,
00442   bool requires_inline_qos)
00443 {
00444   using namespace OpenDDS::RTPS;
00445 
00446   const DataSampleHeader& header = tsce.header();
00447   const ACE_CDR::Octet flags = header.byte_order_;
00448   add_timestamp(subm, flags, header);
00449   CORBA::ULong i = subm.length();
00450 
00451   static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
00452 
00453   DataSubmessage data = {
00454     {DATA, flags, 0},
00455     0,
00456     DATA_OCTETS_TO_IQOS,
00457     ENTITYID_UNKNOWN,
00458     header.publication_id_.entityId,
00459     {header.sequence_.getHigh(), header.sequence_.getLow()},
00460     ParameterList()
00461   };
00462   switch (header.message_id_) {
00463   case INSTANCE_REGISTRATION: {
00464     // NOTE: The RTPS spec is not entirely clear about instance registration.
00465     // We have decided to send a DATA Submessage containing the key and an
00466     // inlineQoS StatusInfo of zero.
00467     data.smHeader.flags |= FLAG_K_IN_DATA;
00468     const int qos_len = data.inlineQos.length();
00469     data.inlineQos.length(qos_len + 1);
00470     data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
00471     break;
00472   }
00473   case UNREGISTER_INSTANCE: {
00474     data.smHeader.flags |= FLAG_K_IN_DATA;
00475     const int qos_len = data.inlineQos.length();
00476     data.inlineQos.length(qos_len+1);
00477     data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
00478     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00479       add_key_hash(data.inlineQos, tsce.msg_payload());
00480     }
00481     break;
00482   }
00483   case DISPOSE_INSTANCE: {
00484     data.smHeader.flags |= FLAG_K_IN_DATA;
00485     const int qos_len = data.inlineQos.length();
00486     data.inlineQos.length(qos_len + 1);
00487     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
00488     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00489       add_key_hash(data.inlineQos, tsce.msg_payload());
00490     }
00491     break;
00492   }
00493   case DISPOSE_UNREGISTER_INSTANCE: {
00494     data.smHeader.flags |= FLAG_K_IN_DATA;
00495     const int qos_len = data.inlineQos.length();
00496     data.inlineQos.length(qos_len + 1);
00497     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
00498     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
00499       add_key_hash(data.inlineQos, tsce.msg_payload());
00500     }
00501     break;
00502   }
00503   // update control_message_supported() when adding new cases here
00504   default:
00505     ACE_DEBUG((LM_INFO,
00506                "RtpsSampleHeader::populate_data_control_submessages(): "
00507                "Non-sample messages seen, message_id = %d\n",
00508                header.message_id_));
00509     break;
00510   }
00511 
00512   if (requires_inline_qos) {
00513     TransportSendListener::InlineQosData qos_data;
00514     tsce.listener()->retrieve_inline_qos_data(qos_data);
00515 
00516     populate_inline_qos(qos_data, data.inlineQos);
00517   }
00518 
00519   if (data.inlineQos.length() > 0) {
00520     data.smHeader.flags |= FLAG_Q;
00521   }
00522 
00523   subm.length(i + 1);
00524   subm[i].data_sm(data);
00525 }
00526 
00527 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
00528   if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) {          \
00529     const int qos_len = plist.length();                       \
00530     plist.length(qos_len + 1);                                \
00531     plist[qos_len].QOS_NAME(WRITER_QOS.QOS_NAME);             \
00532   }
00533 
00534 void
00535 RtpsSampleHeader::populate_inline_qos(
00536   const TransportSendListener::InlineQosData& qos_data,
00537   RTPS::ParameterList& plist)
00538 {
00539   using namespace OpenDDS::RTPS;
00540 
00541   // Always include topic name (per the spec)
00542   {
00543     const int qos_len = plist.length();
00544     plist.length(qos_len + 1);
00545     plist[qos_len].string_data(qos_data.topic_name.c_str());
00546     plist[qos_len]._d(PID_TOPIC_NAME);
00547   }
00548 
00549   // Conditionally include other QoS inline when the differ from the
00550   // default value.
00551   DDS::PublisherQos default_pub_qos =
00552     TheServiceParticipant->initial_PublisherQos();
00553   PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
00554   PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
00555 
00556   DDS::DataWriterQos default_dw_qos =
00557     TheServiceParticipant->initial_DataWriterQos();
00558   PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
00559   PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
00560   PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
00561   PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
00562 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00563   PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
00564 #endif
00565   PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
00566   PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos);
00567   PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
00568   PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
00569   PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
00570 }
00571 
00572 #undef PROCESS_INLINE_QOS
00573 
00574 // simple marshaling helpers for RtpsSampleHeader::split()
00575 namespace {
00576   void write(const Message_Block_Ptr& mb, ACE_CDR::UShort s, bool swap_bytes)
00577   {
00578     const char* ps = reinterpret_cast<const char*>(&s);
00579     if (swap_bytes) {
00580       ACE_CDR::swap_2(ps, mb->wr_ptr());
00581       mb->wr_ptr(2);
00582     } else {
00583       mb->copy(ps, 2);
00584     }
00585   }
00586 
00587   void write(const Message_Block_Ptr& mb, ACE_CDR::ULong i, bool swap_bytes)
00588   {
00589     const char* pi = reinterpret_cast<const char*>(&i);
00590     if (swap_bytes) {
00591       ACE_CDR::swap_4(pi, mb->wr_ptr());
00592       mb->wr_ptr(4);
00593     } else {
00594       mb->copy(pi, 4);
00595     }
00596   }
00597 
00598   // read without advancing rd_ptr()
00599   void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
00600   {
00601     if (swap_bytes) {
00602       ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
00603     } else {
00604       std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
00605     }
00606   }
00607 
00608   void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
00609   {
00610     if (swap_bytes) {
00611       ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
00612     } else {
00613       std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
00614     }
00615   }
00616 
00617   const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
00618 }
00619 
00620 SequenceRange
00621 RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
00622                         Message_Block_Ptr& head, Message_Block_Ptr& tail)
00623 {
00624   using namespace RTPS;
00625   static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(),
00626                                            SequenceNumber::SEQUENCENUMBER_UNKNOWN());
00627   size_t data_offset = 0;
00628   const char* rd = orig.rd_ptr();
00629   ACE_CDR::ULong starting_frag, sample_size;
00630   ACE_CDR::Octet flags;
00631   bool swap_bytes;
00632 
00633   // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
00634   // The submessages from the start of the msg block to this point (data_offset)
00635   // will be copied to both the head and tail fragments.
00636   while (true) {
00637     flags = rd[data_offset + 1];
00638     swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
00639     bool found_data = false;
00640 
00641     switch (rd[data_offset]) {
00642     case DATA:
00643       if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
00644         if (Transport_debug_level) {
00645           ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00646             "attempting to fragment a Data submessage with no payload.\n"));
00647         }
00648         return unknown_range;
00649       }
00650       found_data = true;
00651       starting_frag = 1;
00652       sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
00653       break;
00654     case DATA_FRAG:
00655       found_data = true;
00656       peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
00657       peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
00658       break;
00659     }
00660 
00661     if (found_data) {
00662       break;
00663     }
00664 
00665     // Scan for next submessage in orig
00666     ACE_CDR::UShort octetsToNextHeader;
00667     peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
00668 
00669     data_offset += octetsToNextHeader + SMHDR_SZ;
00670     if (data_offset >= orig.length()) {
00671       if (Transport_debug_level) {
00672         ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
00673           "invalid octetsToNextHeader encountered while fragmenting.\n"));
00674       }
00675       return unknown_range;
00676     }
00677   }
00678 
00679   // Create the "head" message block (of size "sz") containing DATA_FRAG
00680   size_t sz = orig.length();
00681   ACE_CDR::Octet new_flags = flags;
00682   size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
00683   if (rd[data_offset] == DATA) {
00684     sz += 12; // DATA_FRAG is 12 bytes larger than DATA
00685     iqos_offset -= 12;
00686     new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG);
00687     if (flags & FLAG_K_IN_DATA) {
00688       new_flags |= FLAG_K_IN_FRAG;
00689     }
00690   }
00691   head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
00692 
00693   head->copy(rd, data_offset);
00694 
00695   head->wr_ptr()[0] = DATA_FRAG;
00696   head->wr_ptr()[1] = new_flags;
00697   head->wr_ptr(2);
00698 
00699   std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
00700   head->wr_ptr(4);
00701 
00702   write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00703 
00704   head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
00705 
00706   write(head, starting_frag, swap_bytes);
00707   const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
00708   const ACE_CDR::UShort frags =
00709     static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
00710   write(head, frags, swap_bytes);
00711   write(head, FRAG_SIZE, swap_bytes);
00712   write(head, sample_size, swap_bytes);
00713 
00714   if (flags & FLAG_Q) {
00715     head->copy(rd + iqos_offset, orig.length() - iqos_offset);
00716   }
00717 
00718   // Create the "tail" message block containing DATA_FRAG with Q=0
00719   tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
00720 
00721   tail->copy(rd, data_offset);
00722 
00723   tail->wr_ptr()[0] = DATA_FRAG;
00724   tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
00725   tail->wr_ptr(2);
00726 
00727   std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
00728   tail->wr_ptr(4);
00729 
00730   write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
00731   tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
00732 
00733   write(tail, starting_frag + frags, swap_bytes);
00734   const size_t tail_data = orig_payload - frags * FRAG_SIZE;
00735   const ACE_CDR::UShort tail_frags =
00736     static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
00737   write(tail, tail_frags, swap_bytes);
00738   write(tail, FRAG_SIZE, swap_bytes);
00739   write(tail, sample_size, swap_bytes);
00740 
00741   Message_Block_Ptr payload_head;
00742   Message_Block_Ptr payload_tail;
00743   DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
00744                                   payload_head, payload_tail);
00745   head->cont(payload_head.release());
00746   tail->cont(payload_tail.release());
00747 
00748   return SequenceRange(starting_frag + frags - 1,
00749                        starting_frag + frags + tail_frags - 1);
00750 }
00751 
00752 }
00753 }
00754 
00755 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1