Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header. More...
#include <RtpsSampleHeader.h>
Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.
Definition at line 32 of file RtpsSampleHeader.h.
ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader | ( | ) |
Definition at line 14 of file RtpsSampleHeader.inl.
00015 : valid_(false) 00016 , frag_(false) 00017 , marshaled_size_(0) 00018 , message_length_(0) 00019 { 00020 }
ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader | ( | ACE_Message_Block & | mb | ) | [explicit] |
Definition at line 23 of file RtpsSampleHeader.inl.
References init().
00024 : valid_(false) 00025 , frag_(false) 00026 , marshaled_size_(0) 00027 , message_length_(0) 00028 { 00029 init(mb); 00030 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::control_message_supported | ( | char | message_id | ) | [static] |
Definition at line 74 of file RtpsSampleHeader.inl.
References OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::INSTANCE_REGISTRATION, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00075 { 00076 switch (message_id) { 00077 case INSTANCE_REGISTRATION: 00078 case UNREGISTER_INSTANCE: 00079 case DISPOSE_INSTANCE: 00080 case DISPOSE_UNREGISTER_INSTANCE: 00081 return true; 00082 default: 00083 return false; 00084 } 00085 }
void OpenDDS::DCPS::RtpsSampleHeader::init | ( | ACE_Message_Block & | mb | ) | [private] |
Definition at line 59 of file RtpsSampleHeader.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, CASE_SMKIND, ACE_Message_Block::cont(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, frag_, OpenDDS::RTPS::GAP, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, ACE_Message_Block::length(), marshaled_size_, message_length_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::PAD, ACE_Message_Block::rd_ptr(), OpenDDS::RTPS::SEC_BODY, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::RTPS::Submessage::security_sm, OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, submessage_, ACE_Message_Block::total_length(), and valid_.
Referenced by operator=(), and RtpsSampleHeader().
00060 { 00061 using namespace OpenDDS::RTPS; 00062 00063 // valid_ is false here, it will only be set to true if there is a Submessage 00064 00065 // Manually grab the first two bytes for the SubmessageKind and the byte order 00066 if (mb.length() == 0) { 00067 return; 00068 } 00069 00070 const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr()); 00071 00072 ACE_CDR::Octet flags = 0; 00073 00074 if (mb.length() > 1) { 00075 flags = mb.rd_ptr()[1]; 00076 } else if (mb.cont() && mb.cont()->length() > 0) { 00077 flags = mb.cont()->rd_ptr()[0]; 00078 } else { 00079 return; 00080 } 00081 00082 const bool little_endian = flags & FLAG_E; 00083 const size_t starting_length = mb.total_length(); 00084 Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian, 00085 Serializer::ALIGN_CDR); 00086 00087 ACE_CDR::UShort octetsToNextHeader = 0; 00088 00089 #define CASE_SMKIND(kind, class, name) case kind: { \ 00090 class submessage; \ 00091 if (ser >> submessage) { \ 00092 octetsToNextHeader = submessage.smHeader.submessageLength; \ 00093 submessage_.name##_sm(submessage); \ 00094 valid_ = true; \ 00095 } \ 00096 break; \ 00097 } 00098 00099 switch (kind) { 00100 CASE_SMKIND(PAD, PadSubmessage, pad) 00101 CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack) 00102 CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat) 00103 CASE_SMKIND(GAP, GapSubmessage, gap) 00104 CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts) 00105 CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src) 00106 CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4) 00107 CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst) 00108 CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply) 00109 CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag) 00110 CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag) 00111 CASE_SMKIND(DATA, DataSubmessage, data) 00112 CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag) 00113 00114 #if defined(OPENDDS_SECURITY) 00115 // Each submessage type introduced by the Security spec is treated 00116 // as an opaque octet sequence at this layer. 00117 case SEC_BODY: 00118 case SEC_PREFIX: 00119 case SEC_POSTFIX: 00120 case SRTPS_PREFIX: 00121 case SRTPS_POSTFIX: { 00122 SecuritySubmessage submessage; 00123 if (ser >> submessage) { 00124 octetsToNextHeader = submessage.smHeader.submessageLength; 00125 submessage_.security_sm(submessage); 00126 submessage_._d(kind); 00127 valid_ = true; 00128 } 00129 break; 00130 } 00131 #endif 00132 00133 default: 00134 { 00135 SubmessageHeader submessage; 00136 if (ser >> submessage) { 00137 octetsToNextHeader = submessage.submessageLength; 00138 submessage_.unknown_sm(submessage); 00139 valid_ = true; 00140 } 00141 break; 00142 } 00143 } 00144 #undef CASE_SMKIND 00145 00146 if (valid_) { 00147 00148 frag_ = (kind == DATA_FRAG); 00149 00150 // marshaled_size_ is # of bytes of submessage we have read from "mb" 00151 marshaled_size_ = starting_length - mb.total_length(); 00152 00153 if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) { 00154 // see RTPS v2.1 section 9.4.5.1.3 00155 // In this case the current Submessage extends to the end of Message, 00156 // so we will use the message_length_ that was set in pdu_remaining(). 00157 octetsToNextHeader = 00158 static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ); 00159 } 00160 00161 if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA))) 00162 || kind == DATA_FRAG) { 00163 // These Submessages have a payload which we haven't deserialized yet. 00164 // The TransportReceiveStrategy will know this via message_length(). 00165 // octetsToNextHeader does not count the SubmessageHeader (4 bytes) 00166 message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_; 00167 } else { 00168 // These Submessages _could_ have extra data that we don't know about 00169 // (from a newer minor version of the RTPS spec). Either way, indicate 00170 // to the TransportReceiveStrategy that there is no data payload here. 00171 message_length_ = 0; 00172 ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_); 00173 if (octetsToNextHeader + SMHDR_SZ > marshaled) { 00174 valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled); 00175 marshaled_size_ = octetsToNextHeader + SMHDR_SZ; 00176 } 00177 } 00178 } 00179 }
bool OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample | ( | ReceivedDataSample & | rds | ) |
Definition at line 232 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::DataSampleHeader::message_id_, message_length(), OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::more_fragments_, payload_byte_order(), OpenDDS::RTPS::PID_KEY_HASH, process_iqos(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::setValue(), submessage_, and OpenDDS::DCPS::Transport_debug_level.
00233 { 00234 using namespace OpenDDS::RTPS; 00235 DataSampleHeader& opendds = rds.header_; 00236 00237 switch (submessage_._d()) { 00238 case DATA: { 00239 const DataSubmessage& rtps = submessage_.data_sm(); 00240 opendds.cdr_encapsulation_ = true; 00241 opendds.message_length_ = message_length(); 00242 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low); 00243 opendds.publication_id_.entityId = rtps.writerId; 00244 opendds.message_id_ = SAMPLE_DATA; 00245 00246 process_iqos(opendds, rtps.inlineQos); 00247 00248 if (rtps.smHeader.flags & FLAG_K_IN_DATA) { 00249 opendds.key_fields_only_ = true; 00250 } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) { 00251 // Interoperability note: the Key may be hiding in the "key hash" param 00252 // in the InlineQos. In order to make use of this Key, it mst be 16 00253 // bytes or less. We have observed other DDS implementations only send 00254 // the MD5 hash of a >16 byte key, so we must limit this to Built-in 00255 // endpoints which are assumed to use GUIDs as keys. 00256 if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints 00257 && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) { 00258 for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) { 00259 if (rtps.inlineQos[i]._d() == PID_KEY_HASH) { 00260 rds.sample_.reset(new ACE_Message_Block(20)); 00261 // CDR_BE encapsuation scheme (endianness is not used for key hash) 00262 rds.sample_->copy("\x00\x00\x00\x00", 4); 00263 const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value; 00264 rds.sample_->copy(reinterpret_cast<const char*>(data), 16); 00265 opendds.message_length_ = 00266 static_cast<ACE_UINT32>(rds.sample_->length()); 00267 opendds.key_fields_only_ = true; 00268 if (Transport_debug_level) { 00269 ACE_DEBUG((LM_DEBUG, 00270 "(%P|%t) RtpsSampleHeader::into_received_data_sample()" 00271 " - used KeyHash data as the key-only payload\n")); 00272 } 00273 break; 00274 } 00275 } 00276 } else { 00277 // FUTURE: Handle the case of D = 0 and K = 0 00278 // used for Coherent Sets in PRESENTATION QoS (see 8.7.5) 00279 if (Transport_debug_level) { 00280 ACE_DEBUG((LM_WARNING, 00281 "(%P|%t) RtpsSampleHeader::into_received_data_sample() - " 00282 "Received a DATA Submessage with D = 0 and K = 0, " 00283 "dropping\n")); 00284 } 00285 return false; 00286 } 00287 } 00288 00289 if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) { 00290 // Peek at the byte order from the encapsulation containing the payload. 00291 opendds.byte_order_ = payload_byte_order(rds); 00292 } 00293 00294 break; 00295 } 00296 case DATA_FRAG: { 00297 const DataFragSubmessage& rtps = submessage_.data_frag_sm(); 00298 opendds.cdr_encapsulation_ = true; 00299 opendds.message_length_ = message_length(); 00300 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low); 00301 opendds.publication_id_.entityId = rtps.writerId; 00302 opendds.message_id_ = SAMPLE_DATA; 00303 opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG); 00304 // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble(). 00305 00306 process_iqos(opendds, rtps.inlineQos); 00307 00308 const CORBA::ULong lastFragInSubmsg = 00309 rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage; 00310 if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) { 00311 opendds.more_fragments_ = true; 00312 } 00313 break; 00314 } 00315 default: 00316 break; 00317 } 00318 00319 return true; 00320 }
ACE_INLINE size_t OpenDDS::DCPS::RtpsSampleHeader::marshaled_size | ( | ) |
Definition at line 56 of file RtpsSampleHeader.inl.
References marshaled_size_.
00057 { 00058 return marshaled_size_; 00059 }
static size_t OpenDDS::DCPS::RtpsSampleHeader::max_marshaled_size | ( | ) | [inline, static] |
Definition at line 39 of file RtpsSampleHeader.h.
ACE_INLINE ACE_UINT32 OpenDDS::DCPS::RtpsSampleHeader::message_length | ( | void | ) |
Definition at line 62 of file RtpsSampleHeader.inl.
References message_length_.
Referenced by into_received_data_sample().
00063 { 00064 return static_cast<ACE_UINT32>(message_length_); 00065 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::more_fragments | ( | void | ) | const |
Definition at line 68 of file RtpsSampleHeader.inl.
References frag_.
00069 { 00070 return frag_; 00071 }
ACE_INLINE RtpsSampleHeader & OpenDDS::DCPS::RtpsSampleHeader::operator= | ( | ACE_Message_Block & | mn | ) |
Definition at line 33 of file RtpsSampleHeader.inl.
References frag_, init(), and valid_.
00034 { 00035 frag_ = false; 00036 valid_ = false; 00037 // message_length_ should not be reset here 00038 // marshaled_size_ doesn't need to be reset, init() will set it (if valid_) 00039 init(mb); 00040 return *this; 00041 }
static bool OpenDDS::DCPS::RtpsSampleHeader::partial | ( | const ACE_Message_Block & | ) | [inline, static] |
Definition at line 44 of file RtpsSampleHeader.h.
bool OpenDDS::DCPS::RtpsSampleHeader::payload_byte_order | ( | const ReceivedDataSample & | rds | ) | [static] |
Definition at line 322 of file RtpsSampleHeader.cpp.
References OpenDDS::RTPS::FLAG_E, and OpenDDS::DCPS::ReceivedDataSample::sample_.
Referenced by into_received_data_sample().
00323 { 00324 return rds.sample_->rd_ptr()[1] & RTPS::FLAG_E; 00325 }
ACE_INLINE void OpenDDS::DCPS::RtpsSampleHeader::pdu_remaining | ( | size_t | size | ) |
Definition at line 50 of file RtpsSampleHeader.inl.
References message_length_.
00051 { 00052 message_length_ = size; 00053 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages | ( | RTPS::SubmessageSeq & | subm, | |
const TransportSendControlElement & | tsce, | |||
bool | requires_inline_qos | |||
) | [static] |
Definition at line 439 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::TransportSendControlElement::listener(), LM_INFO, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::TransportSendControlElement::msg_payload(), populate_inline_qos(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00443 { 00444 using namespace OpenDDS::RTPS; 00445 00446 const DataSampleHeader& header = tsce.header(); 00447 const ACE_CDR::Octet flags = header.byte_order_; 00448 add_timestamp(subm, flags, header); 00449 CORBA::ULong i = subm.length(); 00450 00451 static const CORBA::Octet BUILT_IN_WRITER = 0xC2; 00452 00453 DataSubmessage data = { 00454 {DATA, flags, 0}, 00455 0, 00456 DATA_OCTETS_TO_IQOS, 00457 ENTITYID_UNKNOWN, 00458 header.publication_id_.entityId, 00459 {header.sequence_.getHigh(), header.sequence_.getLow()}, 00460 ParameterList() 00461 }; 00462 switch (header.message_id_) { 00463 case INSTANCE_REGISTRATION: { 00464 // NOTE: The RTPS spec is not entirely clear about instance registration. 00465 // We have decided to send a DATA Submessage containing the key and an 00466 // inlineQoS StatusInfo of zero. 00467 data.smHeader.flags |= FLAG_K_IN_DATA; 00468 const int qos_len = data.inlineQos.length(); 00469 data.inlineQos.length(qos_len + 1); 00470 data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER); 00471 break; 00472 } 00473 case UNREGISTER_INSTANCE: { 00474 data.smHeader.flags |= FLAG_K_IN_DATA; 00475 const int qos_len = data.inlineQos.length(); 00476 data.inlineQos.length(qos_len+1); 00477 data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER); 00478 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00479 add_key_hash(data.inlineQos, tsce.msg_payload()); 00480 } 00481 break; 00482 } 00483 case DISPOSE_INSTANCE: { 00484 data.smHeader.flags |= FLAG_K_IN_DATA; 00485 const int qos_len = data.inlineQos.length(); 00486 data.inlineQos.length(qos_len + 1); 00487 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE); 00488 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00489 add_key_hash(data.inlineQos, tsce.msg_payload()); 00490 } 00491 break; 00492 } 00493 case DISPOSE_UNREGISTER_INSTANCE: { 00494 data.smHeader.flags |= FLAG_K_IN_DATA; 00495 const int qos_len = data.inlineQos.length(); 00496 data.inlineQos.length(qos_len + 1); 00497 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER); 00498 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00499 add_key_hash(data.inlineQos, tsce.msg_payload()); 00500 } 00501 break; 00502 } 00503 // update control_message_supported() when adding new cases here 00504 default: 00505 ACE_DEBUG((LM_INFO, 00506 "RtpsSampleHeader::populate_data_control_submessages(): " 00507 "Non-sample messages seen, message_id = %d\n", 00508 header.message_id_)); 00509 break; 00510 } 00511 00512 if (requires_inline_qos) { 00513 TransportSendListener::InlineQosData qos_data; 00514 tsce.listener()->retrieve_inline_qos_data(qos_data); 00515 00516 populate_inline_qos(qos_data, data.inlineQos); 00517 } 00518 00519 if (data.inlineQos.length() > 0) { 00520 data.smHeader.flags |= FLAG_Q; 00521 } 00522 00523 subm.length(i + 1); 00524 subm[i].data_sm(data); 00525 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages | ( | RTPS::SubmessageSeq & | subm, | |
const DataSampleElement & | dsle, | |||
bool | requires_inline_qos | |||
) | [static] |
Definition at line 345 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_id(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, LM_INFO, OpenDDS::DCPS::DataSampleHeader::message_id_, populate_inline_qos(), OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::DataSampleHeader::sequence_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00349 { 00350 using namespace OpenDDS::RTPS; 00351 00352 const ACE_CDR::Octet flags = dsle.get_header().byte_order_; 00353 add_timestamp(subm, flags, dsle.get_header()); 00354 CORBA::ULong i = subm.length(); 00355 00356 EntityId_t readerId = ENTITYID_UNKNOWN; 00357 if (dsle.get_num_subs() == 1) { 00358 readerId = dsle.get_sub_id(0).entityId; 00359 InfoDestinationSubmessage idest; 00360 idest.smHeader.submessageId = INFO_DST; 00361 idest.smHeader.flags = flags; 00362 idest.smHeader.submessageLength = INFO_DST_SZ; 00363 std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix, 00364 sizeof(GuidPrefix_t)); 00365 subm.length(i + 1); 00366 subm[i++].info_dst_sm(idest); 00367 } else { 00368 //Not durability resend, but could have inline gaps 00369 for (CORBA::ULong x = 0; x < i; ++x) { 00370 if (subm[x]._d() == INFO_DST) { 00371 //Need to add INFO_DST 00372 InfoDestinationSubmessage idest; 00373 idest.smHeader.submessageId = INFO_DST; 00374 idest.smHeader.flags = flags; 00375 idest.smHeader.submessageLength = INFO_DST_SZ; 00376 std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN, 00377 sizeof(GuidPrefix_t)); 00378 subm.length(i + 1); 00379 subm[i++].info_dst_sm(idest); 00380 break; 00381 } 00382 } 00383 } 00384 00385 DataSubmessage data = { 00386 {DATA, flags, 0}, 00387 0, 00388 DATA_OCTETS_TO_IQOS, 00389 readerId, 00390 dsle.get_pub_id().entityId, 00391 {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()}, 00392 ParameterList() 00393 }; 00394 const char message_id = dsle.get_header().message_id_; 00395 switch (message_id) { 00396 case SAMPLE_DATA: 00397 // Must be a data message 00398 data.smHeader.flags |= FLAG_D; 00399 break; 00400 default: 00401 ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): " 00402 "Non-sample messages seen, message_id = %d\n", int(message_id))); 00403 break; 00404 } 00405 00406 if (requires_inline_qos) { 00407 TransportSendListener::InlineQosData qos_data; 00408 dsle.get_send_listener()->retrieve_inline_qos_data(qos_data); 00409 00410 populate_inline_qos(qos_data, data.inlineQos); 00411 } 00412 00413 if (data.inlineQos.length() > 0) { 00414 data.smHeader.flags |= FLAG_Q; 00415 } 00416 00417 subm.length(i + 1); 00418 subm[i].data_sm(data); 00419 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_inline_qos | ( | const TransportSendListener::InlineQosData & | qos_data, | |
RTPS::ParameterList & | plist | |||
) | [static] |
Definition at line 535 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::RTPS::PID_TOPIC_NAME, PROCESS_INLINE_QOS, OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, TheServiceParticipant, and OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name.
Referenced by populate_data_control_submessages(), and populate_data_sample_submessages().
00538 { 00539 using namespace OpenDDS::RTPS; 00540 00541 // Always include topic name (per the spec) 00542 { 00543 const int qos_len = plist.length(); 00544 plist.length(qos_len + 1); 00545 plist[qos_len].string_data(qos_data.topic_name.c_str()); 00546 plist[qos_len]._d(PID_TOPIC_NAME); 00547 } 00548 00549 // Conditionally include other QoS inline when the differ from the 00550 // default value. 00551 DDS::PublisherQos default_pub_qos = 00552 TheServiceParticipant->initial_PublisherQos(); 00553 PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos); 00554 PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos); 00555 00556 DDS::DataWriterQos default_dw_qos = 00557 TheServiceParticipant->initial_DataWriterQos(); 00558 PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos); 00559 PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos); 00560 PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos); 00561 PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos); 00562 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00563 PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos); 00564 #endif 00565 PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos); 00566 PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos); 00567 PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos); 00568 PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos); 00569 PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos); 00570 }
void OpenDDS::DCPS::RtpsSampleHeader::process_iqos | ( | DataSampleHeader & | opendds, | |
const OpenDDS::RTPS::ParameterList & | iqos | |||
) | [static, private] |
Definition at line 182 of file RtpsSampleHeader.cpp.
References DDS::PresentationQosPolicy::access_scope, DDS::PresentationQosPolicy::coherent_access, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::INSTANCE_REGISTRATION, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, DDS::PartitionQosPolicy::name, OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::RTPS::PID_ORIGINAL_WRITER_INFO, OpenDDS::RTPS::PID_PARTITION, OpenDDS::RTPS::PID_PRESENTATION, OpenDDS::RTPS::PID_STATUS_INFO, OpenDDS::RTPS::PID_TOPIC_NAME, OpenDDS::DCPS::to_dds_string(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by into_received_data_sample().
00184 { 00185 using namespace OpenDDS::RTPS; 00186 #if defined(OPENDDS_TEST_INLINE_QOS) 00187 OPENDDS_STRING output("into_received_data_sample(): "); 00188 output += to_dds_string(iqos.length()); 00189 output += " inline QoS parameters\n"; 00190 for (CORBA::ULong index = 0; index < iqos.length(); ++index) { 00191 output += " parameter type = "; 00192 output += to_dds_string(iqos[index]._d()); 00193 output += "\n"; 00194 } 00195 ACE_DEBUG((LM_DEBUG, "%C", output.c_str())); 00196 #endif 00197 for (CORBA::ULong i = 0; i < iqos.length(); ++i) { 00198 if (iqos[i]._d() == PID_STATUS_INFO) { 00199 if (iqos[i].status_info() == STATUS_INFO_DISPOSE) { 00200 opendds.message_id_ = DISPOSE_INSTANCE; 00201 } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) { 00202 opendds.message_id_ = UNREGISTER_INSTANCE; 00203 } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) { 00204 opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE; 00205 } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) { 00206 opendds.message_id_ = INSTANCE_REGISTRATION; 00207 } 00208 } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) { 00209 opendds.historic_sample_ = true; 00210 #if defined(OPENDDS_TEST_INLINE_QOS) 00211 } else if (iqos[i]._d() == PID_TOPIC_NAME) { 00212 ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data())); 00213 } else if (iqos[i]._d() == PID_PRESENTATION) { 00214 DDS::PresentationQosPolicy pres_qos = iqos[i].presentation(); 00215 ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, " 00216 "coherent_access = %d, ordered_access = %d\n", 00217 pres_qos.access_scope, pres_qos.coherent_access, 00218 pres_qos.ordered_access)); 00219 } else if (iqos[i]._d() == PID_PARTITION) { 00220 DDS::PartitionQosPolicy part_qos = iqos[i].partition(); 00221 ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length())); 00222 for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) { 00223 ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in())); 00224 } 00225 ACE_DEBUG((LM_DEBUG, "\n")); 00226 #endif 00227 } 00228 } 00229 }
SequenceRange OpenDDS::DCPS::RtpsSampleHeader::split | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
Message_Block_Ptr & | head, | |||
Message_Block_Ptr & | tail | |||
) | [static] |
Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest. Returns a pair containing the largest fragment number in each new header.
Definition at line 621 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::alloc_msgblock(), ACE_Message_Block::cont(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::DATA_FRAG_OCTETS_TO_IQOS, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_Q, FRAG_SIZE, ACE_Message_Block::length(), LM_ERROR, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DataSampleHeader::split_payload(), ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, and write().
Referenced by OpenDDS::DCPS::RtpsCustomizedElement::fragment().
00623 { 00624 using namespace RTPS; 00625 static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(), 00626 SequenceNumber::SEQUENCENUMBER_UNKNOWN()); 00627 size_t data_offset = 0; 00628 const char* rd = orig.rd_ptr(); 00629 ACE_CDR::ULong starting_frag, sample_size; 00630 ACE_CDR::Octet flags; 00631 bool swap_bytes; 00632 00633 // Find the start of the DATA | DATA_FRAG submessage in the orig msg block. 00634 // The submessages from the start of the msg block to this point (data_offset) 00635 // will be copied to both the head and tail fragments. 00636 while (true) { 00637 flags = rd[data_offset + 1]; 00638 swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E); 00639 bool found_data = false; 00640 00641 switch (rd[data_offset]) { 00642 case DATA: 00643 if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) { 00644 if (Transport_debug_level) { 00645 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - " 00646 "attempting to fragment a Data submessage with no payload.\n")); 00647 } 00648 return unknown_range; 00649 } 00650 found_data = true; 00651 starting_frag = 1; 00652 sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length()); 00653 break; 00654 case DATA_FRAG: 00655 found_data = true; 00656 peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes); 00657 peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes); 00658 break; 00659 } 00660 00661 if (found_data) { 00662 break; 00663 } 00664 00665 // Scan for next submessage in orig 00666 ACE_CDR::UShort octetsToNextHeader; 00667 peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes); 00668 00669 data_offset += octetsToNextHeader + SMHDR_SZ; 00670 if (data_offset >= orig.length()) { 00671 if (Transport_debug_level) { 00672 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - " 00673 "invalid octetsToNextHeader encountered while fragmenting.\n")); 00674 } 00675 return unknown_range; 00676 } 00677 } 00678 00679 // Create the "head" message block (of size "sz") containing DATA_FRAG 00680 size_t sz = orig.length(); 00681 ACE_CDR::Octet new_flags = flags; 00682 size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS; 00683 if (rd[data_offset] == DATA) { 00684 sz += 12; // DATA_FRAG is 12 bytes larger than DATA 00685 iqos_offset -= 12; 00686 new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG); 00687 if (flags & FLAG_K_IN_DATA) { 00688 new_flags |= FLAG_K_IN_FRAG; 00689 } 00690 } 00691 head.reset(DataSampleHeader::alloc_msgblock(orig, sz, false)); 00692 00693 head->copy(rd, data_offset); 00694 00695 head->wr_ptr()[0] = DATA_FRAG; 00696 head->wr_ptr()[1] = new_flags; 00697 head->wr_ptr(2); 00698 00699 std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags 00700 head->wr_ptr(4); 00701 00702 write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes); 00703 00704 head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum 00705 00706 write(head, starting_frag, swap_bytes); 00707 const size_t max_data = size - sz, orig_payload = orig.cont()->total_length(); 00708 const ACE_CDR::UShort frags = 00709 static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE); 00710 write(head, frags, swap_bytes); 00711 write(head, FRAG_SIZE, swap_bytes); 00712 write(head, sample_size, swap_bytes); 00713 00714 if (flags & FLAG_Q) { 00715 head->copy(rd + iqos_offset, orig.length() - iqos_offset); 00716 } 00717 00718 // Create the "tail" message block containing DATA_FRAG with Q=0 00719 tail.reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false)); 00720 00721 tail->copy(rd, data_offset); 00722 00723 tail->wr_ptr()[0] = DATA_FRAG; 00724 tail->wr_ptr()[1] = new_flags & ~FLAG_Q; 00725 tail->wr_ptr(2); 00726 00727 std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags 00728 tail->wr_ptr(4); 00729 00730 write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes); 00731 tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum 00732 00733 write(tail, starting_frag + frags, swap_bytes); 00734 const size_t tail_data = orig_payload - frags * FRAG_SIZE; 00735 const ACE_CDR::UShort tail_frags = 00736 static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE); 00737 write(tail, tail_frags, swap_bytes); 00738 write(tail, FRAG_SIZE, swap_bytes); 00739 write(tail, sample_size, swap_bytes); 00740 00741 Message_Block_Ptr payload_head; 00742 Message_Block_Ptr payload_tail; 00743 DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE, 00744 payload_head, payload_tail); 00745 head->cont(payload_head.release()); 00746 tail->cont(payload_tail.release()); 00747 00748 return SequenceRange(starting_frag + frags - 1, 00749 starting_frag + frags + tail_frags - 1); 00750 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::valid | ( | void | ) | const |
Definition at line 44 of file RtpsSampleHeader.inl.
References valid_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header().
00045 { 00046 return valid_; 00047 }
bool OpenDDS::DCPS::RtpsSampleHeader::frag_ [private] |
Definition at line 74 of file RtpsSampleHeader.h.
Referenced by init(), more_fragments(), and operator=().
const ACE_CDR::UShort OpenDDS::DCPS::RtpsSampleHeader::FRAG_SIZE = 1024 [static] |
Definition at line 98 of file RtpsSampleHeader.h.
Referenced by split().
Definition at line 75 of file RtpsSampleHeader.h.
Referenced by init(), and marshaled_size().
Definition at line 75 of file RtpsSampleHeader.h.
Referenced by init(), message_length(), and pdu_remaining().
Definition at line 69 of file RtpsSampleHeader.h.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), into_received_data_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble().
bool OpenDDS::DCPS::RtpsSampleHeader::valid_ [private] |
Definition at line 74 of file RtpsSampleHeader.h.
Referenced by init(), operator=(), and valid().