#include <RtpsSampleHeader.h>
Collaboration diagram for OpenDDS::DCPS::RtpsSampleHeader:
Public Member Functions | |
RtpsSampleHeader () | |
RtpsSampleHeader (ACE_Message_Block &mb) | |
RtpsSampleHeader & | operator= (ACE_Message_Block &mn) |
void | pdu_remaining (size_t size) |
size_t | marshaled_size () |
ACE_UINT32 | message_length () |
bool | valid () const |
bool | into_received_data_sample (ReceivedDataSample &rds) |
bool | more_fragments () const |
Static Public Member Functions | |
static size_t | max_marshaled_size () |
static bool | partial (const ACE_Message_Block &) |
static SequenceRange | split (const ACE_Message_Block &orig, size_t size, ACE_Message_Block *&head, ACE_Message_Block *&tail) |
static void | populate_data_sample_submessages (RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos) |
static void | populate_data_control_submessages (RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos) |
static void | populate_inline_qos (const TransportSendListener::InlineQosData &qos_data, RTPS::ParameterList &plist) |
static bool | control_message_supported (char message_id) |
Public Attributes | |
RTPS::Submessage | submessage_ |
Static Public Attributes | |
static const ACE_CDR::UShort | FRAG_SIZE = 1024 |
Private Member Functions | |
void | init (ACE_Message_Block &mb) |
Static Private Member Functions | |
static void | process_iqos (DataSampleHeader &opendds, const OpenDDS::RTPS::ParameterList &iqos) |
Private Attributes | |
bool | valid_ |
bool | frag_ |
size_t | marshaled_size_ |
size_t | message_length_ |
Definition at line 30 of file RtpsSampleHeader.h.
ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader | ( | ) |
Definition at line 12 of file RtpsSampleHeader.inl.
00013 : valid_(false) 00014 , frag_(false) 00015 , marshaled_size_(0) 00016 , message_length_(0) 00017 { 00018 }
ACE_INLINE OpenDDS::DCPS::RtpsSampleHeader::RtpsSampleHeader | ( | ACE_Message_Block & | mb | ) | [explicit] |
Definition at line 21 of file RtpsSampleHeader.inl.
References init().
00022 : valid_(false) 00023 , frag_(false) 00024 , marshaled_size_(0) 00025 , message_length_(0) 00026 { 00027 init(mb); 00028 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::control_message_supported | ( | char | message_id | ) | [static] |
Definition at line 72 of file RtpsSampleHeader.inl.
References OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::INSTANCE_REGISTRATION, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00073 { 00074 switch (message_id) { 00075 case INSTANCE_REGISTRATION: 00076 case UNREGISTER_INSTANCE: 00077 case DISPOSE_INSTANCE: 00078 case DISPOSE_UNREGISTER_INSTANCE: 00079 return true; 00080 default: 00081 return false; 00082 } 00083 }
void OpenDDS::DCPS::RtpsSampleHeader::init | ( | ACE_Message_Block & | mb | ) | [private] |
Definition at line 60 of file RtpsSampleHeader.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, CASE_SMKIND, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, FLAG_K_IN_DATA, frag_, OpenDDS::RTPS::GAP, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, marshaled_size_, message_length_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::PAD, OpenDDS::DCPS::Serializer::skip(), OpenDDS::RTPS::SMHDR_SZ, submessage_, and valid_.
Referenced by operator=(), and RtpsSampleHeader().
00061 { 00062 using namespace OpenDDS::RTPS; 00063 00064 // valid_ is false here, it will only be set to true if there is a Submessage 00065 00066 // Manually grab the first two bytes for the SubmessageKind and the byte order 00067 if (mb.length() == 0) { 00068 return; 00069 } 00070 00071 const SubmessageKind kind = static_cast<SubmessageKind>(*mb.rd_ptr()); 00072 00073 ACE_CDR::Octet flags = 0; 00074 00075 if (mb.length() > 1) { 00076 flags = mb.rd_ptr()[1]; 00077 } else if (mb.cont() && mb.cont()->length() > 0) { 00078 flags = mb.cont()->rd_ptr()[0]; 00079 } else { 00080 return; 00081 } 00082 00083 const bool little_endian = flags & FLAG_E; 00084 const size_t starting_length = mb.total_length(); 00085 Serializer ser(&mb, ACE_CDR_BYTE_ORDER != little_endian, 00086 Serializer::ALIGN_CDR); 00087 00088 ACE_CDR::UShort octetsToNextHeader = 0; 00089 00090 #define CASE_SMKIND(kind, class, name) case kind: { \ 00091 class submessage; \ 00092 if (ser >> submessage) { \ 00093 octetsToNextHeader = submessage.smHeader.submessageLength; \ 00094 submessage_.name##_sm(submessage); \ 00095 valid_ = true; \ 00096 } \ 00097 break; \ 00098 } 00099 00100 switch (kind) { 00101 CASE_SMKIND(PAD, PadSubmessage, pad) 00102 CASE_SMKIND(ACKNACK, AckNackSubmessage, acknack) 00103 CASE_SMKIND(HEARTBEAT, HeartBeatSubmessage, heartbeat) 00104 CASE_SMKIND(GAP, GapSubmessage, gap) 00105 CASE_SMKIND(INFO_TS, InfoTimestampSubmessage, info_ts) 00106 CASE_SMKIND(INFO_SRC, InfoSourceSubmessage, info_src) 00107 CASE_SMKIND(INFO_REPLY_IP4, InfoReplyIp4Submessage, info_reply_ipv4) 00108 CASE_SMKIND(INFO_DST, InfoDestinationSubmessage, info_dst) 00109 CASE_SMKIND(INFO_REPLY, InfoReplySubmessage, info_reply) 00110 CASE_SMKIND(NACK_FRAG, NackFragSubmessage, nack_frag) 00111 CASE_SMKIND(HEARTBEAT_FRAG, HeartBeatFragSubmessage, hb_frag) 00112 CASE_SMKIND(DATA, DataSubmessage, data) 00113 CASE_SMKIND(DATA_FRAG, DataFragSubmessage, data_frag) 00114 default: 00115 { 00116 SubmessageHeader submessage; 00117 if (ser >> submessage) { 00118 octetsToNextHeader = submessage.submessageLength; 00119 submessage_.unknown_sm(submessage); 00120 valid_ = true; 00121 } 00122 break; 00123 } 00124 } 00125 #undef CASE_SMKIND 00126 00127 if (valid_) { 00128 00129 frag_ = (kind == DATA_FRAG); 00130 00131 // marshaled_size_ is # of bytes of submessage we have read from "mb" 00132 marshaled_size_ = starting_length - mb.total_length(); 00133 00134 if (octetsToNextHeader == 0 && kind != PAD && kind != INFO_TS) { 00135 // see RTPS v2.1 section 9.4.5.1.3 00136 // In this case the current Submessage extends to the end of Message, 00137 // so we will use the message_length_ that was set in pdu_remaining(). 00138 octetsToNextHeader = 00139 static_cast<ACE_CDR::UShort>(message_length_ - SMHDR_SZ); 00140 } 00141 00142 if ((kind == DATA && (flags & (FLAG_D | FLAG_K_IN_DATA))) 00143 || kind == DATA_FRAG) { 00144 // These Submessages have a payload which we haven't deserialized yet. 00145 // The TransportReceiveStrategy will know this via message_length(). 00146 // octetsToNextHeader does not count the SubmessageHeader (4 bytes) 00147 message_length_ = octetsToNextHeader + SMHDR_SZ - marshaled_size_; 00148 } else { 00149 // These Submessages _could_ have extra data that we don't know about 00150 // (from a newer minor version of the RTPS spec). Either way, indicate 00151 // to the TransportReceiveStrategy that there is no data payload here. 00152 message_length_ = 0; 00153 ACE_CDR::UShort marshaled = static_cast<ACE_CDR::UShort>(marshaled_size_); 00154 if (octetsToNextHeader + SMHDR_SZ > marshaled) { 00155 valid_ = ser.skip(octetsToNextHeader + SMHDR_SZ - marshaled); 00156 marshaled_size_ = octetsToNextHeader + SMHDR_SZ; 00157 } 00158 } 00159 } 00160 }
bool OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample | ( | ReceivedDataSample & | rds | ) |
Definition at line 213 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, FLAG_K_IN_DATA, FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::DataSampleHeader::message_id_, message_length(), OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::RTPS::PID_KEY_HASH, process_iqos(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::setValue(), submessage_, and OpenDDS::DCPS::Transport_debug_level.
00214 { 00215 using namespace OpenDDS::RTPS; 00216 DataSampleHeader& opendds = rds.header_; 00217 00218 switch (submessage_._d()) { 00219 case DATA: { 00220 const DataSubmessage& rtps = submessage_.data_sm(); 00221 opendds.cdr_encapsulation_ = true; 00222 opendds.message_length_ = message_length(); 00223 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low); 00224 opendds.publication_id_.entityId = rtps.writerId; 00225 opendds.message_id_ = SAMPLE_DATA; 00226 00227 process_iqos(opendds, rtps.inlineQos); 00228 00229 if (rtps.smHeader.flags & FLAG_K_IN_DATA) { 00230 opendds.key_fields_only_ = true; 00231 } else if (!(rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA))) { 00232 // Interoperability note: the Key may be hiding in the "key hash" param 00233 // in the InlineQos. In order to make use of this Key, it mst be 16 00234 // bytes or less. We have observed other DDS implementations only send 00235 // the MD5 hash of a >16 byte key, so we must limit this to Built-in 00236 // endpoints which are assumed to use GUIDs as keys. 00237 if ((rtps.writerId.entityKind & 0xC0) == 0xC0 // Only Built-in endpoints 00238 && (rtps.smHeader.flags & FLAG_Q) && !rds.sample_) { 00239 for (CORBA::ULong i = 0; i < rtps.inlineQos.length(); ++i) { 00240 if (rtps.inlineQos[i]._d() == PID_KEY_HASH) { 00241 rds.sample_ = new ACE_Message_Block(20); 00242 // CDR_BE encapsuation scheme (endianness is not used for key hash) 00243 rds.sample_->copy("\x00\x00\x00\x00", 4); 00244 const CORBA::Octet* data = rtps.inlineQos[i].key_hash().value; 00245 rds.sample_->copy(reinterpret_cast<const char*>(data), 16); 00246 opendds.message_length_ = 00247 static_cast<ACE_UINT32>(rds.sample_->length()); 00248 opendds.key_fields_only_ = true; 00249 if (Transport_debug_level) { 00250 ACE_DEBUG((LM_DEBUG, 00251 "(%P|%t) RtpsSampleHeader::into_received_data_sample()" 00252 " - used KeyHash data as the key-only payload\n")); 00253 } 00254 break; 00255 } 00256 } 00257 } else { 00258 // FUTURE: Handle the case of D = 0 and K = 0 00259 // used for Coherent Sets in PRESENTATION QoS (see 8.7.5) 00260 if (Transport_debug_level) { 00261 ACE_DEBUG((LM_WARNING, 00262 "(%P|%t) RtpsSampleHeader::into_received_data_sample() - " 00263 "Received a DATA Submessage with D = 0 and K = 0, " 00264 "dropping\n")); 00265 } 00266 return false; 00267 } 00268 } 00269 00270 if (rtps.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) { 00271 // Peek at the byte order from the encapsulation containing the payload. 00272 opendds.byte_order_ = rds.sample_->rd_ptr()[1] & FLAG_E; 00273 } 00274 00275 break; 00276 } 00277 case DATA_FRAG: { 00278 const DataFragSubmessage& rtps = submessage_.data_frag_sm(); 00279 opendds.cdr_encapsulation_ = true; 00280 opendds.message_length_ = message_length(); 00281 opendds.sequence_.setValue(rtps.writerSN.high, rtps.writerSN.low); 00282 opendds.publication_id_.entityId = rtps.writerId; 00283 opendds.message_id_ = SAMPLE_DATA; 00284 opendds.key_fields_only_ = (rtps.smHeader.flags & FLAG_K_IN_FRAG); 00285 // opendds.byte_order_ set in RtpsUdpReceiveStrategy::reassemble(). 00286 00287 process_iqos(opendds, rtps.inlineQos); 00288 00289 const CORBA::ULong lastFragInSubmsg = 00290 rtps.fragmentStartingNum.value - 1 + rtps.fragmentsInSubmessage; 00291 if (lastFragInSubmsg * rtps.fragmentSize < rtps.sampleSize) { 00292 opendds.more_fragments_ = true; 00293 } 00294 break; 00295 } 00296 default: 00297 break; 00298 } 00299 00300 return true; 00301 }
ACE_INLINE size_t OpenDDS::DCPS::RtpsSampleHeader::marshaled_size | ( | ) |
Definition at line 54 of file RtpsSampleHeader.inl.
References marshaled_size_.
00055 { 00056 return marshaled_size_; 00057 }
static size_t OpenDDS::DCPS::RtpsSampleHeader::max_marshaled_size | ( | ) | [inline, static] |
ACE_INLINE ACE_UINT32 OpenDDS::DCPS::RtpsSampleHeader::message_length | ( | ) |
Definition at line 60 of file RtpsSampleHeader.inl.
References message_length_.
Referenced by into_received_data_sample().
00061 { 00062 return static_cast<ACE_UINT32>(message_length_); 00063 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::more_fragments | ( | ) | const |
Definition at line 66 of file RtpsSampleHeader.inl.
References frag_.
00067 { 00068 return frag_; 00069 }
ACE_INLINE RtpsSampleHeader & OpenDDS::DCPS::RtpsSampleHeader::operator= | ( | ACE_Message_Block & | mn | ) |
Definition at line 31 of file RtpsSampleHeader.inl.
References frag_, init(), and valid_.
00032 { 00033 frag_ = false; 00034 valid_ = false; 00035 // message_length_ should not be reset here 00036 // marshaled_size_ doesn't need to be reset, init() will set it (if valid_) 00037 init(mb); 00038 return *this; 00039 }
static bool OpenDDS::DCPS::RtpsSampleHeader::partial | ( | const ACE_Message_Block & | ) | [inline, static] |
ACE_INLINE void OpenDDS::DCPS::RtpsSampleHeader::pdu_remaining | ( | size_t | size | ) |
Definition at line 48 of file RtpsSampleHeader.inl.
References message_length_.
00049 { 00050 message_length_ = size; 00051 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages | ( | RTPS::SubmessageSeq & | subm, | |
const TransportSendControlElement & | tsce, | |||
bool | requires_inline_qos | |||
) | [static] |
Definition at line 415 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::add_key_hash(), OpenDDS::DCPS::add_timestamp(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::ENTITYID_UNKNOWN, FLAG_K_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::TransportSendControlElement::header(), header, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::TransportSendControlElement::listener(), OpenDDS::DCPS::TransportSendControlElement::msg_payload(), populate_inline_qos(), OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), STATUS_INFO_DISPOSE, STATUS_INFO_DISPOSE_UNREGISTER, STATUS_INFO_REGISTER, STATUS_INFO_UNREGISTER, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00419 { 00420 using namespace OpenDDS::RTPS; 00421 00422 const DataSampleHeader& header = tsce.header(); 00423 const ACE_CDR::Octet flags = header.byte_order_; 00424 add_timestamp(subm, flags, header); 00425 CORBA::ULong i = subm.length(); 00426 00427 static const CORBA::Octet BUILT_IN_WRITER = 0xC2; 00428 00429 DataSubmessage data = { 00430 {DATA, flags, 0}, 00431 0, 00432 DATA_OCTETS_TO_IQOS, 00433 ENTITYID_UNKNOWN, 00434 header.publication_id_.entityId, 00435 {header.sequence_.getHigh(), header.sequence_.getLow()}, 00436 ParameterList() 00437 }; 00438 switch (header.message_id_) { 00439 case INSTANCE_REGISTRATION: { 00440 // NOTE: The RTPS spec is not entirely clear about instance registration. 00441 // We have decided to send a DATA Submessage containing the key and an 00442 // inlineQoS StatusInfo of zero. 00443 data.smHeader.flags |= FLAG_K_IN_DATA; 00444 const int qos_len = data.inlineQos.length(); 00445 data.inlineQos.length(qos_len + 1); 00446 data.inlineQos[qos_len].status_info(STATUS_INFO_REGISTER); 00447 break; 00448 } 00449 case UNREGISTER_INSTANCE: { 00450 data.smHeader.flags |= FLAG_K_IN_DATA; 00451 const int qos_len = data.inlineQos.length(); 00452 data.inlineQos.length(qos_len+1); 00453 data.inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER); 00454 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00455 add_key_hash(data.inlineQos, tsce.msg_payload()); 00456 } 00457 break; 00458 } 00459 case DISPOSE_INSTANCE: { 00460 data.smHeader.flags |= FLAG_K_IN_DATA; 00461 const int qos_len = data.inlineQos.length(); 00462 data.inlineQos.length(qos_len + 1); 00463 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE); 00464 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00465 add_key_hash(data.inlineQos, tsce.msg_payload()); 00466 } 00467 break; 00468 } 00469 case DISPOSE_UNREGISTER_INSTANCE: { 00470 data.smHeader.flags |= FLAG_K_IN_DATA; 00471 const int qos_len = data.inlineQos.length(); 00472 data.inlineQos.length(qos_len + 1); 00473 data.inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER); 00474 if (header.publication_id_.entityId.entityKind == BUILT_IN_WRITER) { 00475 add_key_hash(data.inlineQos, tsce.msg_payload()); 00476 } 00477 break; 00478 } 00479 // update control_message_supported() when adding new cases here 00480 default: 00481 ACE_DEBUG((LM_INFO, 00482 "RtpsSampleHeader::populate_data_control_submessages(): " 00483 "Non-sample messages seen, message_id = %d\n", 00484 header.message_id_)); 00485 break; 00486 } 00487 00488 if (requires_inline_qos) { 00489 TransportSendListener::InlineQosData qos_data; 00490 tsce.listener()->retrieve_inline_qos_data(qos_data); 00491 00492 populate_inline_qos(qos_data, data.inlineQos); 00493 } 00494 00495 if (data.inlineQos.length() > 0) { 00496 data.smHeader.flags |= FLAG_Q; 00497 } 00498 00499 subm.length(i + 1); 00500 subm[i].data_sm(data); 00501 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages | ( | RTPS::SubmessageSeq & | subm, | |
const DataSampleElement & | dsle, | |||
bool | requires_inline_qos | |||
) | [static] |
Definition at line 321 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::add_timestamp(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_send_listener(), OpenDDS::DCPS::DataSampleElement::get_sub_id(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::GUIDPREFIX_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DataSampleHeader::message_id_, populate_inline_qos(), OpenDDS::DCPS::TransportSendListener::retrieve_inline_qos_data(), OpenDDS::DCPS::SAMPLE_DATA, and OpenDDS::DCPS::DataSampleHeader::sequence_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element().
00325 { 00326 using namespace OpenDDS::RTPS; 00327 00328 const ACE_CDR::Octet flags = dsle.get_header().byte_order_; 00329 add_timestamp(subm, flags, dsle.get_header()); 00330 CORBA::ULong i = subm.length(); 00331 00332 EntityId_t readerId = ENTITYID_UNKNOWN; 00333 if (dsle.get_num_subs() == 1) { 00334 readerId = dsle.get_sub_id(0).entityId; 00335 InfoDestinationSubmessage idest; 00336 idest.smHeader.submessageId = INFO_DST; 00337 idest.smHeader.flags = flags; 00338 idest.smHeader.submessageLength = INFO_DST_SZ; 00339 std::memcpy(idest.guidPrefix, dsle.get_sub_id(0).guidPrefix, 00340 sizeof(GuidPrefix_t)); 00341 subm.length(i + 1); 00342 subm[i++].info_dst_sm(idest); 00343 } else { 00344 //Not durability resend, but could have inline gaps 00345 for (CORBA::ULong x = 0; x < i; ++x) { 00346 if (subm[x]._d() == INFO_DST) { 00347 //Need to add INFO_DST 00348 InfoDestinationSubmessage idest; 00349 idest.smHeader.submessageId = INFO_DST; 00350 idest.smHeader.flags = flags; 00351 idest.smHeader.submessageLength = INFO_DST_SZ; 00352 std::memcpy(idest.guidPrefix, GUIDPREFIX_UNKNOWN, 00353 sizeof(GuidPrefix_t)); 00354 subm.length(i + 1); 00355 subm[i++].info_dst_sm(idest); 00356 break; 00357 } 00358 } 00359 } 00360 00361 DataSubmessage data = { 00362 {DATA, flags, 0}, 00363 0, 00364 DATA_OCTETS_TO_IQOS, 00365 readerId, 00366 dsle.get_pub_id().entityId, 00367 {dsle.get_header().sequence_.getHigh(), dsle.get_header().sequence_.getLow()}, 00368 ParameterList() 00369 }; 00370 const char message_id = dsle.get_header().message_id_; 00371 switch (message_id) { 00372 case SAMPLE_DATA: 00373 // Must be a data message 00374 data.smHeader.flags |= FLAG_D; 00375 break; 00376 default: 00377 ACE_DEBUG((LM_INFO, "(%P|%t) RtpsSampleHeader::populate_submessages(): " 00378 "Non-sample messages seen, message_id = %d\n", int(message_id))); 00379 break; 00380 } 00381 00382 if (requires_inline_qos) { 00383 TransportSendListener::InlineQosData qos_data; 00384 dsle.get_send_listener()->retrieve_inline_qos_data(qos_data); 00385 00386 populate_inline_qos(qos_data, data.inlineQos); 00387 } 00388 00389 if (data.inlineQos.length() > 0) { 00390 data.smHeader.flags |= FLAG_Q; 00391 } 00392 00393 subm.length(i + 1); 00394 subm[i].data_sm(data); 00395 }
void OpenDDS::DCPS::RtpsSampleHeader::populate_inline_qos | ( | const TransportSendListener::InlineQosData & | qos_data, | |
RTPS::ParameterList & | plist | |||
) | [static] |
Definition at line 511 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, OpenDDS::RTPS::PID_TOPIC_NAME, PROCESS_INLINE_QOS, OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, TheServiceParticipant, and OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name.
Referenced by populate_data_control_submessages(), and populate_data_sample_submessages().
00514 { 00515 using namespace OpenDDS::RTPS; 00516 00517 // Always include topic name (per the spec) 00518 { 00519 const int qos_len = plist.length(); 00520 plist.length(qos_len + 1); 00521 plist[qos_len].string_data(qos_data.topic_name.c_str()); 00522 plist[qos_len]._d(PID_TOPIC_NAME); 00523 } 00524 00525 // Conditionally include other QoS inline when the differ from the 00526 // default value. 00527 DDS::PublisherQos default_pub_qos = 00528 TheServiceParticipant->initial_PublisherQos(); 00529 PROCESS_INLINE_QOS(presentation, default_pub_qos, qos_data.pub_qos); 00530 PROCESS_INLINE_QOS(partition, default_pub_qos, qos_data.pub_qos); 00531 00532 DDS::DataWriterQos default_dw_qos = 00533 TheServiceParticipant->initial_DataWriterQos(); 00534 PROCESS_INLINE_QOS(durability, default_dw_qos, qos_data.dw_qos); 00535 PROCESS_INLINE_QOS(deadline, default_dw_qos, qos_data.dw_qos); 00536 PROCESS_INLINE_QOS(latency_budget, default_dw_qos, qos_data.dw_qos); 00537 PROCESS_INLINE_QOS(ownership, default_dw_qos, qos_data.dw_qos); 00538 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00539 PROCESS_INLINE_QOS(ownership_strength, default_dw_qos, qos_data.dw_qos); 00540 #endif 00541 PROCESS_INLINE_QOS(liveliness, default_dw_qos, qos_data.dw_qos); 00542 PROCESS_INLINE_QOS(reliability, default_dw_qos, qos_data.dw_qos); 00543 PROCESS_INLINE_QOS(transport_priority, default_dw_qos, qos_data.dw_qos); 00544 PROCESS_INLINE_QOS(lifespan, default_dw_qos, qos_data.dw_qos); 00545 PROCESS_INLINE_QOS(destination_order, default_dw_qos, qos_data.dw_qos); 00546 }
void OpenDDS::DCPS::RtpsSampleHeader::process_iqos | ( | DataSampleHeader & | opendds, | |
const OpenDDS::RTPS::ParameterList & | iqos | |||
) | [static, private] |
Definition at line 163 of file RtpsSampleHeader.cpp.
References DDS::PresentationQosPolicy::access_scope, DDS::PresentationQosPolicy::coherent_access, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::message_id_, DDS::PartitionQosPolicy::name, OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::RTPS::PID_ORIGINAL_WRITER_INFO, OpenDDS::RTPS::PID_PARTITION, OpenDDS::RTPS::PID_PRESENTATION, OpenDDS::RTPS::PID_STATUS_INFO, OpenDDS::RTPS::PID_TOPIC_NAME, STATUS_INFO_DISPOSE, STATUS_INFO_DISPOSE_UNREGISTER, STATUS_INFO_REGISTER, STATUS_INFO_UNREGISTER, OpenDDS::DCPS::to_dds_string(), and OpenDDS::DCPS::UNREGISTER_INSTANCE.
Referenced by into_received_data_sample().
00165 { 00166 using namespace OpenDDS::RTPS; 00167 #if defined(OPENDDS_TEST_INLINE_QOS) 00168 OPENDDS_STRING output("into_received_data_sample(): "); 00169 output += to_dds_string(iqos.length()); 00170 output += " inline QoS parameters\n"; 00171 for (CORBA::ULong index = 0; index < iqos.length(); ++index) { 00172 output += " parameter type = "; 00173 output += to_dds_string(iqos[index]._d()); 00174 output += "\n"; 00175 } 00176 ACE_DEBUG((LM_DEBUG, "%C", output.c_str())); 00177 #endif 00178 for (CORBA::ULong i = 0; i < iqos.length(); ++i) { 00179 if (iqos[i]._d() == PID_STATUS_INFO) { 00180 if (iqos[i].status_info() == STATUS_INFO_DISPOSE) { 00181 opendds.message_id_ = DISPOSE_INSTANCE; 00182 } else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) { 00183 opendds.message_id_ = UNREGISTER_INSTANCE; 00184 } else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) { 00185 opendds.message_id_ = DISPOSE_UNREGISTER_INSTANCE; 00186 } else if (iqos[i].status_info() == STATUS_INFO_REGISTER) { 00187 opendds.message_id_ = INSTANCE_REGISTRATION; 00188 } 00189 } else if (iqos[i]._d() == PID_ORIGINAL_WRITER_INFO) { 00190 opendds.historic_sample_ = true; 00191 #if defined(OPENDDS_TEST_INLINE_QOS) 00192 } else if (iqos[i]._d() == PID_TOPIC_NAME) { 00193 ACE_DEBUG((LM_DEBUG, "topic_name = %C\n", iqos[i].string_data())); 00194 } else if (iqos[i]._d() == PID_PRESENTATION) { 00195 DDS::PresentationQosPolicy pres_qos = iqos[i].presentation(); 00196 ACE_DEBUG((LM_DEBUG, "presentation qos, access_scope = %d, " 00197 "coherent_access = %d, ordered_access = %d\n", 00198 pres_qos.access_scope, pres_qos.coherent_access, 00199 pres_qos.ordered_access)); 00200 } else if (iqos[i]._d() == PID_PARTITION) { 00201 DDS::PartitionQosPolicy part_qos = iqos[i].partition(); 00202 ACE_DEBUG((LM_DEBUG, "partition qos(%d): ", part_qos.name.length())); 00203 for (CORBA::ULong j = 0; j < part_qos.name.length(); j++) { 00204 ACE_DEBUG((LM_DEBUG, "'%C' ", part_qos.name[j].in())); 00205 } 00206 ACE_DEBUG((LM_DEBUG, "\n")); 00207 #endif 00208 } 00209 } 00210 }
SequenceRange OpenDDS::DCPS::RtpsSampleHeader::split | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
ACE_Message_Block *& | head, | |||
ACE_Message_Block *& | tail | |||
) | [static] |
Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest. Returns a pair containing the largest fragment number in each new header.
Definition at line 597 of file RtpsSampleHeader.cpp.
References OpenDDS::DCPS::DataSampleHeader::alloc_msgblock(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::DATA_FRAG_OCTETS_TO_IQOS, OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, FLAG_K_IN_DATA, FLAG_K_IN_FRAG, OpenDDS::RTPS::FLAG_Q, OpenDDS::DCPS::FRAG_SAMPLE_SIZE_OFFSET, FRAG_SIZE, OpenDDS::DCPS::FRAG_START_OFFSET, OpenDDS::DCPS::peek(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DataSampleHeader::split_payload(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::write().
Referenced by OpenDDS::DCPS::RtpsCustomizedElement::fragment().
00599 { 00600 using namespace RTPS; 00601 static const SequenceRange unknown_range(SequenceNumber::SEQUENCENUMBER_UNKNOWN(), 00602 SequenceNumber::SEQUENCENUMBER_UNKNOWN()); 00603 size_t data_offset = 0; 00604 const char* rd = orig.rd_ptr(); 00605 ACE_CDR::ULong starting_frag, sample_size; 00606 ACE_CDR::Octet flags; 00607 bool swap_bytes; 00608 00609 // Find the start of the DATA | DATA_FRAG submessage in the orig msg block. 00610 // The submessages from the start of the msg block to this point (data_offset) 00611 // will be copied to both the head and tail fragments. 00612 while (true) { 00613 flags = rd[data_offset + 1]; 00614 swap_bytes = ACE_CDR_BYTE_ORDER != bool(flags & FLAG_E); 00615 bool found_data = false; 00616 00617 switch (rd[data_offset]) { 00618 case DATA: 00619 if ((flags & (FLAG_D | FLAG_K_IN_DATA)) == 0) { 00620 if (Transport_debug_level) { 00621 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - " 00622 "attempting to fragment a Data submessage with no payload.\n")); 00623 } 00624 return unknown_range; 00625 } 00626 found_data = true; 00627 starting_frag = 1; 00628 sample_size = static_cast<ACE_CDR::ULong>(orig.cont()->total_length()); 00629 break; 00630 case DATA_FRAG: 00631 found_data = true; 00632 peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes); 00633 peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes); 00634 break; 00635 } 00636 00637 if (found_data) { 00638 break; 00639 } 00640 00641 // Scan for next submessage in orig 00642 ACE_CDR::UShort octetsToNextHeader; 00643 peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes); 00644 00645 data_offset += octetsToNextHeader + SMHDR_SZ; 00646 if (data_offset >= orig.length()) { 00647 if (Transport_debug_level) { 00648 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsSampleHeader::split() ERROR - " 00649 "invalid octetsToNextHeader encountered while fragmenting.\n")); 00650 } 00651 return unknown_range; 00652 } 00653 } 00654 00655 // Create the "head" message block (of size "sz") containing DATA_FRAG 00656 size_t sz = orig.length(); 00657 ACE_CDR::Octet new_flags = flags; 00658 size_t iqos_offset = data_offset + 8 + DATA_FRAG_OCTETS_TO_IQOS; 00659 if (rd[data_offset] == DATA) { 00660 sz += 12; // DATA_FRAG is 12 bytes larger than DATA 00661 iqos_offset -= 12; 00662 new_flags &= ~(FLAG_K_IN_DATA | FLAG_K_IN_FRAG); 00663 if (flags & FLAG_K_IN_DATA) { 00664 new_flags |= FLAG_K_IN_FRAG; 00665 } 00666 } 00667 head = DataSampleHeader::alloc_msgblock(orig, sz, false); 00668 00669 head->copy(rd, data_offset); 00670 00671 head->wr_ptr()[0] = DATA_FRAG; 00672 head->wr_ptr()[1] = new_flags; 00673 head->wr_ptr(2); 00674 00675 std::memset(head->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags 00676 head->wr_ptr(4); 00677 00678 write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes); 00679 00680 head->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum 00681 00682 write(head, starting_frag, swap_bytes); 00683 const size_t max_data = size - sz, orig_payload = orig.cont()->total_length(); 00684 const ACE_CDR::UShort frags = 00685 static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE); 00686 write(head, frags, swap_bytes); 00687 write(head, FRAG_SIZE, swap_bytes); 00688 write(head, sample_size, swap_bytes); 00689 00690 if (flags & FLAG_Q) { 00691 head->copy(rd + iqos_offset, orig.length() - iqos_offset); 00692 } 00693 00694 // Create the "tail" message block containing DATA_FRAG with Q=0 00695 tail = DataSampleHeader::alloc_msgblock(orig, data_offset + 36, false); 00696 00697 tail->copy(rd, data_offset); 00698 00699 tail->wr_ptr()[0] = DATA_FRAG; 00700 tail->wr_ptr()[1] = new_flags & ~FLAG_Q; 00701 tail->wr_ptr(2); 00702 00703 std::memset(tail->wr_ptr(), 0, 4); // octetsToNextHeader, extraFlags 00704 tail->wr_ptr(4); 00705 00706 write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes); 00707 tail->copy(rd + data_offset + 8, 16); // readerId, writerId, sequenceNum 00708 00709 write(tail, starting_frag + frags, swap_bytes); 00710 const size_t tail_data = orig_payload - frags * FRAG_SIZE; 00711 const ACE_CDR::UShort tail_frags = 00712 static_cast<ACE_CDR::UShort>((tail_data + FRAG_SIZE - 1) / FRAG_SIZE); 00713 write(tail, tail_frags, swap_bytes); 00714 write(tail, FRAG_SIZE, swap_bytes); 00715 write(tail, sample_size, swap_bytes); 00716 00717 ACE_Message_Block* payload_head = 0; 00718 ACE_Message_Block* payload_tail; 00719 DataSampleHeader::split_payload(*orig.cont(), frags * FRAG_SIZE, 00720 payload_head, payload_tail); 00721 head->cont(payload_head); 00722 tail->cont(payload_tail); 00723 00724 return SequenceRange(starting_frag + frags - 1, 00725 starting_frag + frags + tail_frags - 1); 00726 }
ACE_INLINE bool OpenDDS::DCPS::RtpsSampleHeader::valid | ( | ) | const |
Definition at line 42 of file RtpsSampleHeader.inl.
References valid_.
00043 { 00044 return valid_; 00045 }
bool OpenDDS::DCPS::RtpsSampleHeader::frag_ [private] |
Definition at line 70 of file RtpsSampleHeader.h.
Referenced by init(), more_fragments(), and operator=().
const ACE_CDR::UShort OpenDDS::DCPS::RtpsSampleHeader::FRAG_SIZE = 1024 [static] |
size_t OpenDDS::DCPS::RtpsSampleHeader::marshaled_size_ [private] |
size_t OpenDDS::DCPS::RtpsSampleHeader::message_length_ [private] |
Definition at line 71 of file RtpsSampleHeader.h.
Referenced by init(), message_length(), and pdu_remaining().
Definition at line 65 of file RtpsSampleHeader.h.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), into_received_data_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble().
bool OpenDDS::DCPS::RtpsSampleHeader::valid_ [private] |