LCOV - code coverage report
Current view: top level - DCPS/transport/rtps_udp - RtpsSampleHeader.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 117 371 31.5 %
Date: 2023-04-30 01:32:43 Functions: 6 15 40.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "RtpsSampleHeader.h"
       9             : 
      10             : #include "RtpsUdpSendStrategy.h"
      11             : 
      12             : #include <dds/DCPS/SequenceNumber.h>
      13             : #include <dds/DCPS/Serializer.h>
      14             : #include <dds/DCPS/DataSampleElement.h>
      15             : #include <dds/DCPS/Marked_Default_Qos.h>
      16             : #include <dds/DCPS/Qos_Helper.h>
      17             : #include <dds/DCPS/Service_Participant.h>
      18             : #include <dds/DCPS/DisjointSequence.h>
      19             : #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h>
      20             : #include <dds/DCPS/RTPS/MessageTypes.h>
      21             : #include <dds/DCPS/RTPS/MessageUtils.h>
      22             : #include <dds/DCPS/transport/framework/ReceivedDataSample.h>
      23             : #include <dds/DCPS/transport/framework/TransportSendListener.h>
      24             : 
      25             : #include <cstring>
      26             : 
      27             : #ifndef __ACE_INLINE__
      28             : #  include "RtpsSampleHeader.inl"
      29             : #endif
      30             : 
      31             : namespace {
      32             :   const OpenDDS::RTPS::StatusInfo_t STATUS_INFO_REGISTER = { { 0, 0, 0, 0 } },
      33             :     STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
      34             :     STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
      35             :     STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
      36             : }
      37             : 
      38             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      39             : 
      40             : namespace OpenDDS {
      41             : namespace RTPS {
      42             : 
      43             : #ifndef OPENDDS_SAFETY_PROFILE
      44             : inline bool
      45           0 : operator==(const StatusInfo_t& lhs, const StatusInfo_t& rhs)
      46             : {
      47             :   return
      48           0 :     (lhs.value[3] == rhs.value[3]) &&
      49           0 :     (lhs.value[2] == rhs.value[2]) &&
      50           0 :     (lhs.value[1] == rhs.value[1]) &&
      51           0 :     (lhs.value[0] == rhs.value[0]);
      52             : }
      53             : #endif
      54             : 
      55             : }
      56             : namespace DCPS {
      57             : 
      58             : void
      59           8 : RtpsSampleHeader::init(ACE_Message_Block& mb)
      60             : {
      61             :   using namespace OpenDDS::RTPS;
      62             : 
      63             :   // valid_ is false here, it will only be set to true if there is a Submessage
      64             : 
      65             :   // Manually grab the first two bytes for the SubmessageKind and the byte order
      66           8 :   if (mb.length() == 0) {
      67           0 :     return;
      68             :   }
      69             : 
      70           8 :   const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr());
      71             : 
      72           8 :   ACE_CDR::Octet flags = 0;
      73             : 
      74           8 :   if (mb.length() > 1) {
      75           8 :     flags = mb.rd_ptr()[1];
      76           0 :   } else if (mb.cont() && mb.cont()->length() > 0) {
      77           0 :     flags = mb.cont()->rd_ptr()[0];
      78             :   } else {
      79           0 :     return;
      80             :   }
      81             : 
      82           8 :   const size_t starting_length = mb.total_length();
      83             :   Serializer ser(&mb, Encoding::KIND_XCDR1,
      84           8 :     (flags & FLAG_E) ? ENDIAN_LITTLE : ENDIAN_BIG);
      85             : 
      86           8 :   ACE_CDR::UShort octetsToNextHeader = 0;
      87             : 
      88             : #define CASE_SMKIND(kind, class, name) case kind: {               \
      89             :     class submessage;                                             \
      90             :     if (ser >> submessage) {                                      \
      91             :       octetsToNextHeader = submessage.smHeader.submessageLength;  \
      92             :       submessage_.name##_sm(submessage);                          \
      93             :       valid_ = true;                                              \
      94             :     }                                                             \
      95             :     break;                                                        \
      96             :   }
      97             : 
      98           8 :   switch (kind) {
      99           0 :   CASE_SMKIND(PAD, PadSubmessage, pad)
     100           0 :   CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack)
     101           0 :   CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat)
     102           0 :   CASE_SMKIND(GAP, GapSubmessage, gap)
     103           4 :   CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts)
     104           0 :   CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src)
     105           0 :   CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4)
     106           0 :   CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst)
     107           0 :   CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply)
     108           0 :   CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag)
     109           0 :   CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag)
     110           0 :   CASE_SMKIND(DATA, DataSubmessage, data)
     111           4 :   CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag)
     112             : 
     113             : #if defined(OPENDDS_SECURITY)
     114             :     // Each submessage type introduced by the Security spec is treated
     115             :     // as an opaque octet sequence at this layer.
     116           0 :     case SEC_BODY:
     117             :     case SEC_PREFIX:
     118             :     case SEC_POSTFIX:
     119             :     case SRTPS_PREFIX:
     120             :     case SRTPS_POSTFIX: {
     121           0 :     SecuritySubmessage submessage;
     122           0 :     if (ser >> submessage) {
     123           0 :       octetsToNextHeader = submessage.smHeader.submessageLength;
     124           0 :       submessage_.security_sm(submessage);
     125           0 :       submessage_._d(kind);
     126           0 :       valid_ = true;
     127             :     }
     128           0 :     break;
     129           0 :     }
     130             : #endif
     131             : 
     132           0 :   default:
     133             :     {
     134             :       SubmessageHeader submessage;
     135           0 :       if (ser >> submessage) {
     136           0 :         octetsToNextHeader = submessage.submessageLength;
     137           0 :         submessage_.unknown_sm(submessage);
     138           0 :         valid_ = true;
     139             :       }
     140           0 :       break;
     141             :     }
     142             :   }
     143             : #undef CASE_SMKIND
     144             : 
     145           8 :   if (valid_) {
     146             : 
     147           8 :     frag_ = (kind == DATA_FRAG);
     148           8 :     data_ = (kind == DATA);
     149             : 
     150             :     // serialized_size_ is # of bytes of submessage we have read from "mb"
     151           8 :     serialized_size_ = starting_length - mb.total_length();
     152             : 
     153           8 :     const ACE_CDR::UShort remaining = static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ);
     154             : 
     155           8 :     if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) {
     156             :       // see RTPS v2.1 section 9.4.5.1.3
     157             :       // In this case the current Submessage extends to the end of Message,
     158             :       // so we will use the message_length_ that was set in pdu_remaining().
     159           4 :       octetsToNextHeader = remaining;
     160             : 
     161           4 :     } else if (octetsToNextHeader > remaining) {
     162           0 :       valid_ = false;
     163           0 :       return;
     164             :     }
     165             : 
     166           8 :     if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA)))
     167           8 :         || kind == DATA_FRAG) {
     168             :       // These Submessages have a payload which we haven't deserialized yet.
     169             :       // The TransportReceiveStrategy will know this via message_length().
     170             :       // octetsToNextHeader does not count the SubmessageHeader (4 bytes)
     171           4 :       message_length_ = octetsToNextHeader + SMHDR_SZ - serialized_size_;
     172             :     } else {
     173             :       // These Submessages _could_ have extra data that we don't know about
     174             :       // (from a newer minor version of the RTPS spec).  Either way, indicate
     175             :       // to the TransportReceiveStrategy that there is no data payload here.
     176           4 :       message_length_ = 0;
     177           4 :       ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(serialized_size_);
     178           4 :       if (octetsToNextHeader + SMHDR_SZ > marshaled) {
     179           0 :         valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled);
     180           0 :         serialized_size_ = octetsToNextHeader + SMHDR_SZ;
     181             :       }
     182             :     }
     183             :   }
     184           8 : }
     185             : 
     186             : void
     187           0 : RtpsSampleHeader::process_iqos(DataSampleHeader& opendds,
     188             :                                const RTPS::ParameterList& iqos)
     189             : {
     190             :   using namespace OpenDDS::RTPS;
     191             : #if defined(OPENDDS_TEST_INLINE_QOS)
     192             :   OPENDDS_STRING output("into_received_data_sample(): ");
     193             :   output += to_dds_string(iqos.length());
     194             :   output += " inline QoS parameters\n";
     195             :   for (CORBA::ULong index = 0; index < iqos.length(); ++index) {
     196             :     output += "  parameter type = ";
     197             :     output += to_dds_string(iqos[index]._d());
     198             :     output += "\n";
     199             :   }
     200             :   ACE_DEBUG((LM_DEBUG, "%C", output.c_str()));
     201             : #endif
     202           0 :   for (CORBA::ULong i = 0; i < iqos.length(); ++i) {
     203           0 :     if (iqos[i]._d() == PID_STATUS_INFO) {
     204           0 :       if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
     205           0 :         opendds.message_id_ = DISPOSE_INSTANCE;
     206           0 :       } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
     207           0 :         opendds.message_id_ = UNREGISTER_INSTANCE;
     208           0 :       } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
     209           0 :         opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE;
     210           0 :       } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
     211           0 :         opendds.message_id_ = INSTANCE_REGISTRATION;
     212             :       }
     213           0 :     } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) {
     214           0 :       opendds.historic_sample_ = true;
     215             : #if defined(OPENDDS_TEST_INLINE_QOS)
     216             :     } else if (iqos[i]._d() == PID_TOPIC_NAME) {
     217             :       ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data()));
     218             :     } else if (iqos[i]._d() == PID_PRESENTATION) {
     219             :       DDS::PresentationQosPolicy pres_qos = iqos[i].presentation();
     220             :       ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, "
     221             :                  "coherent_access = %d, ordered_access = %d\n",
     222             :                  pres_qos.access_scope, pres_qos.coherent_access,
     223             :                  pres_qos.ordered_access));
     224             :     } else if (iqos[i]._d() == PID_PARTITION) {
     225             :       DDS::PartitionQosPolicy part_qos = iqos[i].partition();
     226             :       ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length()));
     227             :       for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) {
     228             :         ACE_DEBUG((LM_DEBUG, "'%C'  ", part_qos.name[j].in()));
     229             :       }
     230             :       ACE_DEBUG((LM_DEBUG, "\n"));
     231             : #endif
     232             :     }
     233             :   }
     234           0 : }
     235             : 
     236             : bool
     237           0 : RtpsSampleHeader::into_received_data_sample(ReceivedDataSample& rds)
     238             : {
     239             :   using namespace OpenDDS::RTPS;
     240           0 :   DataSampleHeader& opendds = rds.header_;
     241             : 
     242           0 :   switch (submessage_._d()) {
     243           0 :   case DATA: {
     244           0 :     const DataSubmessage& rtps = submessage_.data_sm();
     245           0 :     opendds.cdr_encapsulation_ = true;
     246           0 :     opendds.message_length_ = message_length();
     247           0 :     opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
     248           0 :     opendds.publication_id_.entityId = rtps.writerId;
     249           0 :     opendds.message_id_ = SAMPLE_DATA;
     250             : 
     251           0 :     process_iqos(opendds, rtps.inlineQos);
     252             : 
     253           0 :     if (rtps.smHeader.flags & FLAG_K_IN_DATA) {
     254           0 :       opendds.key_fields_only_ = true;
     255           0 :     } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) {
     256             :       // Interoperability note: the Key may be hiding in the "key hash" param
     257             :       // in the InlineQos.  In order to make use of this Key, it mst be 16
     258             :       // bytes or less.  We have observed other DDS implementations only send
     259             :       // the MD5 hash of a >16 byte key, so we must limit this to Built-in
     260             :       // endpoints which are assumed to use GUIDs as keys.
     261           0 :       if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints
     262           0 :           && (rtps.smHeader.flags & FLAG_Q) && !rds.has_data()) {
     263           0 :         for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) {
     264           0 :           if (rtps.inlineQos[i]._d() == PID_KEY_HASH) {
     265             :             // CDR_BE encapsulation scheme (endianness is not used for key hash)
     266           0 :             rds.replace("\x00\x00\x00\x00", EncapsulationHeader::serialized_size);
     267           0 :             const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value;
     268           0 :             rds.append(reinterpret_cast<const char*>(data), sizeof(DDS::OctetArray16));
     269           0 :             opendds.message_length_ = EncapsulationHeader::serialized_size + sizeof(DDS::OctetArray16);
     270           0 :             opendds.key_fields_only_ = true;
     271           0 :             if (Transport_debug_level) {
     272           0 :               ACE_DEBUG((LM_DEBUG,
     273             :                          "(%P|%t) RtpsSampleHeader::into_received_data_sample()"
     274             :                          " - used KeyHash data as the key-only payload\n"));
     275             :             }
     276           0 :             break;
     277             :           }
     278             :         }
     279             :       } else {
     280             :         // FUTURE: Handle the case of D = 0 and K = 0
     281             :         // used for Coherent Sets in PRESENTATION QoS (see 8.7.5)
     282           0 :         if (Transport_debug_level) {
     283           0 :           ACE_DEBUG((LM_WARNING,
     284             :                      "(%P|%t) RtpsSampleHeader::into_received_data_sample() - "
     285             :                      "Received a DATA Submessage with D = 0 and K = 0, "
     286             :                      "dropping\n"));
     287             :         }
     288           0 :         return false;
     289             :       }
     290             :     }
     291             : 
     292           0 :     if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
     293             :       // TODO(iguessthislldo: Convert to use Encoding
     294             :       // Peek at the byte order from the encapsulation containing the payload.
     295           0 :       opendds.byte_order_ = payload_byte_order(rds);
     296             :     }
     297             : 
     298           0 :     break;
     299             :   }
     300           0 :   case DATA_FRAG: {
     301           0 :     const DataFragSubmessage& rtps = submessage_.data_frag_sm();
     302           0 :     opendds.cdr_encapsulation_ = true;
     303           0 :     opendds.message_length_ = message_length();
     304           0 :     opendds.sequence_ = to_opendds_seqnum(rtps.writerSN);
     305           0 :     opendds.publication_id_.entityId = rtps.writerId;
     306           0 :     opendds.message_id_ = SAMPLE_DATA;
     307           0 :     opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG);
     308             :     // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble().
     309             : 
     310           0 :     process_iqos(opendds, rtps.inlineQos);
     311             : 
     312           0 :     const CORBA::ULong lastFragInSubmsg =
     313           0 :       rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage;
     314           0 :     if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) {
     315           0 :       opendds.more_fragments_ = true;
     316             :     }
     317           0 :     break;
     318             :   }
     319           0 :   default:
     320           0 :     break;
     321             :   }
     322             : 
     323           0 :   return true;
     324             : }
     325             : 
     326           0 : bool RtpsSampleHeader::payload_byte_order(const ReceivedDataSample& rds)
     327             : {
     328           0 :   return rds.peek(1) & RTPS::FLAG_E;
     329             : }
     330             : 
     331             : namespace {
     332           0 :   void add_timestamp(RTPS::SubmessageSeq& subm, ACE_CDR::Octet flags,
     333             :     const DataSampleHeader& header)
     334             :   {
     335             :     using namespace OpenDDS::RTPS;
     336           0 :     const DDS::Time_t st = {header.source_timestamp_sec_,
     337           0 :                             header.source_timestamp_nanosec_};
     338           0 :     const InfoTimestampSubmessage ts = {
     339             :       {INFO_TS, flags, INFO_TS_SZ},
     340           0 :       {static_cast<ACE_UINT32>(st.sec), DCPS::nanoseconds_to_uint32_fractional_seconds(st.nanosec)}
     341           0 :     };
     342           0 :     const CORBA::ULong i = DCPS::grow(subm) - 1;
     343           0 :     subm[i].info_ts_sm(ts);
     344           0 :   }
     345             : }
     346             : 
     347             : void
     348           0 : RtpsSampleHeader::populate_data_sample_submessages(
     349             :   RTPS::SubmessageSeq& subm,
     350             :   const DataSampleElement& dsle,
     351             :   bool requires_inline_qos)
     352             : {
     353             :   using namespace OpenDDS::RTPS;
     354             : 
     355           0 :   const ACE_CDR::Octet flags = dsle.get_header().byte_order_;
     356           0 :   add_timestamp(subm, flags, dsle.get_header());
     357           0 :   CORBA::ULong i = subm.length();
     358             : 
     359           0 :   EntityId_t readerId = ENTITYID_UNKNOWN;
     360           0 :   if (dsle.get_num_subs() == 1) {
     361           0 :     readerId = dsle.get_sub_id(0).entityId;
     362             :     InfoDestinationSubmessage idest;
     363           0 :     idest.smHeader.submessageId = INFO_DST;
     364           0 :     idest.smHeader.flags = flags;
     365           0 :     idest.smHeader.submessageLength = INFO_DST_SZ;
     366           0 :     std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix,
     367             :                 sizeof(GuidPrefix_t));
     368           0 :     i = DCPS::grow(subm);
     369           0 :     subm[i - 1].info_dst_sm(idest);
     370             :   } else {
     371             :     //Not durability resend, but could have inline gaps
     372           0 :     for (CORBA::ULong x = 0; x < i; ++x) {
     373           0 :       if (subm[x]._d() == INFO_DST) {
     374             :         //Need to add INFO_DST
     375             :         InfoDestinationSubmessage idest;
     376           0 :         idest.smHeader.submessageId = INFO_DST;
     377           0 :         idest.smHeader.flags = flags;
     378           0 :         idest.smHeader.submessageLength = INFO_DST_SZ;
     379           0 :         std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN,
     380             :                     sizeof(GuidPrefix_t));
     381           0 :         i = DCPS::grow(subm);
     382           0 :         subm[i - 1].info_dst_sm(idest);
     383           0 :         break;
     384             :       }
     385             :     }
     386             :   }
     387             : 
     388           0 :   DataSubmessage data = {
     389             :     {DATA, flags, 0},
     390             :     0,
     391             :     DATA_OCTETS_TO_IQOS,
     392             :     readerId,
     393           0 :     dsle.get_pub_id().entityId,
     394           0 :     RTPS::to_rtps_seqnum(dsle.get_header().sequence_),
     395             :     ParameterList()
     396           0 :   };
     397           0 :   const char message_id = dsle.get_header().message_id_;
     398           0 :   switch (message_id) {
     399           0 :   case SAMPLE_DATA:
     400             :     // Must be a data message
     401           0 :     data.smHeader.flags |= FLAG_D;
     402           0 :     break;
     403           0 :   default:
     404           0 :     ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): "
     405             :                "Non-sample messages seen, message_id = %d\n", int(message_id)));
     406           0 :     break;
     407             :   }
     408             : 
     409           0 :   if (requires_inline_qos) {
     410           0 :     TransportSendListener::InlineQosData qos_data;
     411           0 :     dsle.get_send_listener()->retrieve_inline_qos_data(qos_data);
     412             : 
     413           0 :     populate_inline_qos(qos_data, data.inlineQos);
     414           0 :   }
     415             : 
     416           0 :   if (data.inlineQos.length() > 0) {
     417           0 :     data.smHeader.flags |= FLAG_Q;
     418             :   }
     419             : 
     420           0 :   i = DCPS::grow(subm);
     421           0 :   subm[i - 1].data_sm(data);
     422           0 : }
     423             : 
     424             : namespace {
     425             :   // Interoperability note:
     426             :   // used for discovery (SEDP) only, this helps interoperability because they
     427             :   // spec is unclear about the use of PID_KEY_HASH vs. the key as the payload
     428           0 :   void add_key_hash(RTPS::ParameterList& plist, const ACE_Message_Block* data)
     429             :   {
     430             :     RTPS::KeyHash_t kh;
     431             :     static const size_t offset = 8 /*skip encap (4) and plist hdr (4)*/;
     432           0 :     std::memcpy(kh.value, data->rd_ptr() + offset, sizeof(GUID_t));
     433           0 :     RTPS::Parameter p;
     434           0 :     p.key_hash(kh);
     435           0 :     DCPS::push_back(plist, p);
     436           0 :   }
     437             : }
     438             : 
     439             : void
     440           0 : RtpsSampleHeader::populate_data_control_submessages(
     441             :   OpenDDS::RTPS::SubmessageSeq& subm,
     442             :   const TransportSendControlElement& tsce,
     443             :   bool requires_inline_qos)
     444             : {
     445             :   using namespace OpenDDS::RTPS;
     446             : 
     447           0 :   const DataSampleHeader& header = tsce.header();
     448           0 :   const ACE_CDR::Octet flags = header.byte_order_;
     449           0 :   add_timestamp(subm, flags, header);
     450             : 
     451             :   static const CORBA::Octet BUILT_IN_WRITER = 0xC2;
     452             : 
     453           0 :   DataSubmessage data = {
     454             :     {DATA, flags, 0},
     455             :     0,
     456             :     DATA_OCTETS_TO_IQOS,
     457             :     ENTITYID_UNKNOWN,
     458             :     header.publication_id_.entityId,
     459           0 :     RTPS::to_rtps_seqnum(header.sequence_),
     460             :     ParameterList()
     461           0 :   };
     462           0 :   switch (header.message_id_) {
     463           0 :   case INSTANCE_REGISTRATION: {
     464             :     // NOTE: The RTPS spec is not entirely clear about instance registration.
     465             :     // We have decided to send a DATA Submessage containing the key and an
     466             :     // inlineQoS StatusInfo of zero.
     467           0 :     data.smHeader.flags |= FLAG_K_IN_DATA;
     468           0 :     const int qos_len = DCPS::grow(data.inlineQos) - 1;
     469           0 :     data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
     470           0 :     break;
     471             :   }
     472           0 :   case UNREGISTER_INSTANCE: {
     473           0 :     data.smHeader.flags |= FLAG_K_IN_DATA;
     474           0 :     const int qos_len = data.inlineQos.length();
     475           0 :     data.inlineQos.length(qos_len+1);
     476           0 :     data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
     477           0 :     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
     478           0 :       add_key_hash(data.inlineQos, tsce.msg_payload());
     479             :     }
     480           0 :     break;
     481             :   }
     482           0 :   case DISPOSE_INSTANCE: {
     483           0 :     data.smHeader.flags |= FLAG_K_IN_DATA;
     484           0 :     const int qos_len = DCPS::grow(data.inlineQos) - 1;
     485           0 :     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
     486           0 :     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
     487           0 :       add_key_hash(data.inlineQos, tsce.msg_payload());
     488             :     }
     489           0 :     break;
     490             :   }
     491           0 :   case DISPOSE_UNREGISTER_INSTANCE: {
     492           0 :     data.smHeader.flags |= FLAG_K_IN_DATA;
     493           0 :     const int qos_len = DCPS::grow(data.inlineQos) - 1;
     494           0 :     data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
     495           0 :     if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) {
     496           0 :       add_key_hash(data.inlineQos, tsce.msg_payload());
     497             :     }
     498           0 :     break;
     499             :   }
     500             :   // update control_message_supported() when adding new cases here
     501           0 :   default:
     502           0 :     ACE_DEBUG((LM_INFO,
     503             :                "RtpsSampleHeader::populate_data_control_submessages(): "
     504             :                "Non-sample messages seen, message_id = %d\n",
     505             :                header.message_id_));
     506           0 :     break;
     507             :   }
     508             : 
     509           0 :   if (requires_inline_qos) {
     510           0 :     TransportSendListener::InlineQosData qos_data;
     511           0 :     tsce.listener()->retrieve_inline_qos_data(qos_data);
     512             : 
     513           0 :     populate_inline_qos(qos_data, data.inlineQos);
     514           0 :   }
     515             : 
     516           0 :   if (data.inlineQos.length() > 0) {
     517           0 :     data.smHeader.flags |= FLAG_Q;
     518             :   }
     519             : 
     520           0 :   CORBA::ULong idx = DCPS::grow(subm) - 1;
     521           0 :   subm[idx].data_sm(data);
     522           0 : }
     523             : 
     524             : #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \
     525             :   if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) {          \
     526             :     const int idx = DCPS::grow(plist) - 1;                    \
     527             :     plist[idx].QOS_NAME(WRITER_QOS.QOS_NAME);                 \
     528             :   }
     529             : 
     530             : void
     531           0 : RtpsSampleHeader::populate_inline_qos(
     532             :   const TransportSendListener::InlineQosData& qos_data,
     533             :   RTPS::ParameterList& plist)
     534             : {
     535             :   using namespace OpenDDS::RTPS;
     536             : 
     537             :   // Always include topic name (per the spec)
     538             :   {
     539           0 :     const int idx = DCPS::grow(plist) - 1;
     540           0 :     plist[idx].string_data(qos_data.topic_name.c_str());
     541           0 :     plist[idx]._d(PID_TOPIC_NAME);
     542             :   }
     543             : 
     544             :   // Conditionally include other QoS inline when the differ from the
     545             :   // default value.
     546             :   DDS::PublisherQos default_pub_qos =
     547           0 :     TheServiceParticipant->initial_PublisherQos();
     548           0 :   PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos);
     549           0 :   PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos);
     550             : 
     551             :   DDS::DataWriterQos default_dw_qos =
     552           0 :     TheServiceParticipant->initial_DataWriterQos();
     553           0 :   PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos);
     554           0 :   PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos);
     555           0 :   PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos);
     556           0 :   PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos);
     557             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     558           0 :   PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos);
     559             : #endif
     560           0 :   PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos);
     561           0 :   if (qos_data.dw_qos.reliability != default_dw_qos.reliability) {
     562           0 :     const int idx = DCPS::grow(plist) - 1;
     563             : 
     564             :     ReliabilityQosPolicyRtps reliability;
     565           0 :     reliability.max_blocking_time = qos_data.dw_qos.reliability.max_blocking_time;
     566             : 
     567           0 :     if (qos_data.dw_qos.reliability.kind == DDS::BEST_EFFORT_RELIABILITY_QOS) {
     568           0 :       reliability.kind.value = RTPS::BEST_EFFORT;
     569             :     } else { // default to RELIABLE for writers
     570           0 :       reliability.kind.value = RTPS::RELIABLE;
     571             :     }
     572             : 
     573           0 :     plist[idx].reliability(reliability);
     574             :   }
     575           0 :   PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos);
     576           0 :   PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos);
     577           0 :   PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos);
     578           0 : }
     579             : 
     580             : #undef PROCESS_INLINE_QOS
     581             : 
     582             : // simple marshaling helpers for RtpsSampleHeader::split()
     583             : namespace {
     584          12 :   void write(const Message_Block_Ptr& mb, ACE_CDR::UShort s, bool swap_bytes)
     585             :   {
     586          12 :     const char* ps = reinterpret_cast<const char*>(&s);
     587          12 :     if (swap_bytes) {
     588           0 :       ACE_CDR::swap_2(ps, mb->wr_ptr());
     589           0 :       mb->wr_ptr(2);
     590             :     } else {
     591          12 :       mb->copy(ps, 2);
     592             :     }
     593          12 :   }
     594             : 
     595           8 :   void write(const Message_Block_Ptr& mb, ACE_CDR::ULong i, bool swap_bytes)
     596             :   {
     597           8 :     const char* pi = reinterpret_cast<const char*>(&i);
     598           8 :     if (swap_bytes) {
     599           0 :       ACE_CDR::swap_4(pi, mb->wr_ptr());
     600           0 :       mb->wr_ptr(4);
     601             :     } else {
     602           8 :       mb->copy(pi, 4);
     603             :     }
     604           8 :   }
     605             : 
     606             :   // read without advancing rd_ptr()
     607           2 :   void peek(ACE_CDR::UShort& target, const char* src, bool swap_bytes)
     608             :   {
     609           2 :     if (swap_bytes) {
     610           0 :       ACE_CDR::swap_2(src, reinterpret_cast<char*>(&target));
     611             :     } else {
     612           2 :       std::memcpy(&target, src, sizeof(ACE_CDR::UShort));
     613             :     }
     614           2 :   }
     615             : 
     616           2 :   void peek(ACE_CDR::ULong& target, const char* src, bool swap_bytes)
     617             :   {
     618           2 :     if (swap_bytes) {
     619           0 :       ACE_CDR::swap_4(src, reinterpret_cast<char*>(&target));
     620             :     } else {
     621           2 :       std::memcpy(&target, src, sizeof(ACE_CDR::ULong));
     622             :     }
     623           2 :   }
     624             : 
     625             :   const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
     626             : }
     627             : 
     628             : SequenceRange
     629           2 : RtpsSampleHeader::split(const ACE_Message_Block& orig, size_t size,
     630             :                         Message_Block_Ptr& head, Message_Block_Ptr& tail)
     631             : {
     632             :   using namespace RTPS;
     633           2 :   size_t data_offset = 0;
     634           2 :   const char* rd = orig.rd_ptr();
     635             :   ACE_CDR::ULong starting_frag, sample_size;
     636             :   ACE_CDR::Octet flags;
     637             :   bool swap_bytes;
     638             : 
     639             :   // Find the start of the DATA | DATA_FRAG submessage in the orig msg block.
     640             :   // The submessages from the start of the msg block to this point (data_offset)
     641             :   // will be copied to both the head and tail fragments.
     642             :   while (true) {
     643           4 :     flags = rd[data_offset + 1];
     644           4 :     swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E);
     645           4 :     bool found_data = false;
     646             : 
     647           4 :     switch (rd[data_offset]) {
     648           1 :     case DATA:
     649           1 :       if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) {
     650           0 :         if (Transport_debug_level) {
     651           0 :           ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
     652             :             "attempting to fragment a Data submessage with no payload.\n"));
     653             :         }
     654           0 :         return unknown_sequence_range;
     655             :       }
     656           1 :       found_data = true;
     657           1 :       starting_frag = 1;
     658           1 :       sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length());
     659           1 :       break;
     660           1 :     case DATA_FRAG:
     661           1 :       found_data = true;
     662           1 :       peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
     663           1 :       peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
     664           1 :       break;
     665             :     }
     666             : 
     667           4 :     if (found_data) {
     668           2 :       break;
     669             :     }
     670             : 
     671             :     // Scan for next submessage in orig
     672             :     ACE_CDR::UShort octetsToNextHeader;
     673           2 :     peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
     674             : 
     675           2 :     data_offset += octetsToNextHeader + SMHDR_SZ;
     676           2 :     if (data_offset >= orig.length()) {
     677           0 :       if (Transport_debug_level) {
     678           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - "
     679             :           "invalid octetsToNextHeader encountered while fragmenting.\n"));
     680             :       }
     681           0 :       return unknown_sequence_range;
     682             :     }
     683           2 :   }
     684             : 
     685             :   // Create the "head" message block (of size "sz") containing DATA_FRAG
     686           2 :   size_t sz = orig.length();
     687           2 :   ACE_CDR::Octet new_flags = flags;
     688           2 :   size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS;
     689           2 :   if (rd[data_offset] == DATA) {
     690           1 :     sz += 12; // DATA_FRAG is 12 bytes larger than DATA
     691           1 :     iqos_offset -= 12;
     692           1 :     new_flags = flags & (FLAG_E | FLAG_Q);
     693           1 :     if (flags & FLAG_K_IN_DATA) {
     694           0 :       new_flags |= FLAG_K_IN_FRAG;
     695             :     }
     696           1 :     if (flags & FLAG_N_IN_DATA) {
     697           0 :       new_flags |= FLAG_N_IN_FRAG;
     698             :     }
     699             : 
     700             :   }
     701           2 :   head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false));
     702             : 
     703           2 :   head->copy(rd, data_offset);
     704             : 
     705           2 :   head->wr_ptr()[0] = DATA_FRAG;
     706           2 :   head->wr_ptr()[1] = new_flags;
     707           2 :   head->wr_ptr(2);
     708             : 
     709           2 :   std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
     710           2 :   head->wr_ptr(4);
     711             : 
     712           2 :   write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
     713             : 
     714           2 :   head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
     715             : 
     716           2 :   write(head, starting_frag, swap_bytes);
     717           2 :   const size_t max_data = size - sz, orig_payload = orig.cont()->total_length();
     718             :   const ACE_CDR::UShort frags =
     719           2 :     static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
     720           2 :   if (frags == 0) {
     721           0 :     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsSampleHeader::split: "
     722             :       "Number of Fragments is Zero: min(%B, %B) / FRAG_SIZE\n",
     723             :       max_data, orig_payload));
     724           0 :     return unknown_sequence_range;
     725             :   }
     726           2 :   write(head, frags, swap_bytes);
     727           2 :   write(head, FRAG_SIZE, swap_bytes);
     728           2 :   write(head, sample_size, swap_bytes);
     729             : 
     730           2 :   if (flags & FLAG_Q) {
     731           1 :     head->copy(rd + iqos_offset, orig.length() - iqos_offset);
     732             :   }
     733             : 
     734             :   // Create the "tail" message block containing DATA_FRAG with Q=0
     735           2 :   tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false));
     736             : 
     737           2 :   tail->copy(rd, data_offset);
     738             : 
     739           2 :   tail->wr_ptr()[0] = DATA_FRAG;
     740           2 :   tail->wr_ptr()[1] = new_flags & ~FLAG_Q;
     741           2 :   tail->wr_ptr(2);
     742             : 
     743           2 :   std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags
     744           2 :   tail->wr_ptr(4);
     745             : 
     746           2 :   write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
     747           2 :   tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum
     748             : 
     749           2 :   write(tail, starting_frag + frags, swap_bytes);
     750           2 :   const size_t tail_data = orig_payload - frags * FRAG_SIZE;
     751           2 :   const ACE_CDR::UShort tail_frags =
     752           2 :     static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE);
     753           2 :   write(tail, tail_frags, swap_bytes);
     754           2 :   write(tail, FRAG_SIZE, swap_bytes);
     755           2 :   write(tail, sample_size, swap_bytes);
     756             : 
     757           2 :   Message_Block_Ptr payload_head;
     758           0 :   Message_Block_Ptr payload_tail;
     759           2 :   DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE,
     760             :                                   payload_head, payload_tail);
     761           2 :   head->cont(payload_head.release());
     762           2 :   tail->cont(payload_tail.release());
     763             : 
     764           0 :   return SequenceRange(starting_frag + frags - 1,
     765           2 :                        starting_frag + frags + tail_frags - 1);
     766             : }
     767             : 
     768             : }
     769             : }
     770             : 
     771             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16