#include <DataSampleHeader.h>
The header message of a data sample. This header and the data sample are in different message block and will be chained together.
Definition at line 75 of file DataSampleHeader.h.
anonymous enum |
Definition at line 76 of file DataSampleHeader.h.
00076 { 00077 MESSAGE_ID_OFFSET = 0, 00078 SUBMESSAGE_ID_OFFSET = 1, 00079 FLAGS_OFFSET = 2 // message_id_ + submessage_id_ 00080 };
ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader | ( | ) |
Definition at line 9 of file DataSampleHeader.inl.
00010 : message_id_(0) 00011 , submessage_id_(0) 00012 , byte_order_(ACE_CDR_BYTE_ORDER) 00013 , coherent_change_(0) 00014 , historic_sample_(0) 00015 , lifespan_duration_(0) 00016 , group_coherent_(0) 00017 , content_filter_(0) 00018 , sequence_repair_(0) 00019 , more_fragments_(0) 00020 , cdr_encapsulation_(0) 00021 , key_fields_only_(0) 00022 , reserved_1(0) 00023 , reserved_2(0) 00024 , reserved_3(0) 00025 , reserved_4(0) 00026 , reserved_5(0) 00027 , reserved_6(0) 00028 , message_length_(0) 00029 , sequence_() 00030 , source_timestamp_sec_(0) 00031 , source_timestamp_nanosec_(0) 00032 , lifespan_duration_sec_(0) 00033 , lifespan_duration_nanosec_(0) 00034 , publication_id_(GUID_UNKNOWN) 00035 , publisher_id_(GUID_UNKNOWN) 00036 , marshaled_size_(0) 00037 { 00038 }
ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader | ( | ACE_Message_Block & | buffer | ) | [explicit] |
Construct with values extracted from a buffer.
Definition at line 41 of file DataSampleHeader.inl.
References init().
00042 : message_id_(0) 00043 , submessage_id_(0) 00044 , byte_order_(ACE_CDR_BYTE_ORDER) 00045 , coherent_change_(0) 00046 , historic_sample_(0) 00047 , lifespan_duration_(0) 00048 , group_coherent_(0) 00049 , content_filter_(0) 00050 , sequence_repair_(0) 00051 , more_fragments_(0) 00052 , cdr_encapsulation_(0) 00053 , key_fields_only_(0) 00054 , reserved_1(0) 00055 , reserved_2(0) 00056 , reserved_3(0) 00057 , reserved_4(0) 00058 , reserved_5(0) 00059 , reserved_6(0) 00060 , message_length_(0) 00061 , sequence_() 00062 , source_timestamp_sec_(0) 00063 , source_timestamp_nanosec_(0) 00064 , lifespan_duration_sec_(0) 00065 , lifespan_duration_nanosec_(0) 00066 , publication_id_(GUID_UNKNOWN) 00067 , publisher_id_(GUID_UNKNOWN) 00068 { 00069 this->init(&buffer); 00070 }
void OpenDDS::DCPS::DataSampleHeader::add_cfentries | ( | const GUIDSeq * | guids, | |
ACE_Message_Block * | mb | |||
) | [static] |
Marshal the "guids" as an optional header chained as to the continuation of "mb" (which must already be a valid DataSampleHeader serialization). Any existing payload of "mb" (its continuation) will be chained after the new optional header part. "guids" may be null, same serialization as 0.
Definition at line 337 of file DataSampleHeader.cpp.
References alloc_msgblock(), OpenDDS::DCPS::BYTE_ORDER_FLAG, ACE_Message_Block::cont(), OpenDDS::DCPS::gen_find_size(), size, and test_flag().
Referenced by OpenDDS::DCPS::DataLinkSet::send(), and split().
00338 { 00339 size_t size = 0; 00340 if (guids) { 00341 size_t padding = 0; // GUIDs are always aligned 00342 gen_find_size(*guids, size, padding); 00343 } else { 00344 size = sizeof(CORBA::ULong); 00345 } 00346 ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false); 00347 00348 const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb)); 00349 Serializer ser(optHdr, swap); 00350 if (guids) { 00351 ser << *guids; 00352 } else { 00353 ser << CORBA::ULong(0); 00354 } 00355 00356 // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control) 00357 optHdr->cont(mb->cont()); 00358 mb->cont(optHdr); 00359 }
ACE_Message_Block * OpenDDS::DCPS::DataSampleHeader::alloc_msgblock | ( | const ACE_Message_Block & | mb, | |
size_t | size, | |||
bool | use_data_alloc | |||
) | [static] |
Definition at line 80 of file DataSampleHeader.cpp.
References ACE_Message_Block::access_allocators(), OpenDDS::RTPS::DATA, ACE_Message_Block::locking_strategy(), malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, and ACE_Time_Value::zero.
Referenced by add_cfentries(), OpenDDS::DCPS::RtpsSampleHeader::split(), and split().
00082 { 00083 enum { DATA, DB, MB, N_ALLOC }; 00084 ACE_Allocator* allocators[N_ALLOC]; 00085 // It's an ACE bug that access_allocators isn't const 00086 ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb); 00087 mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]); 00088 if (allocators[MB]) { 00089 ACE_Message_Block* result; 00090 ACE_NEW_MALLOC_RETURN(result, 00091 static_cast<ACE_Message_Block*>( 00092 allocators[MB]->malloc(sizeof(ACE_Message_Block))), 00093 ACE_Message_Block(size, 00094 ACE_Message_Block::MB_DATA, 00095 0, // cont 00096 0, // data 00097 use_data_alloc ? allocators[DATA] : 0, 00098 mut_mb.locking_strategy(), // locking_strategy 00099 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00100 ACE_Time_Value::zero, 00101 ACE_Time_Value::max_time, 00102 allocators[DB], 00103 allocators[MB]), 00104 0); 00105 return result; 00106 } else { 00107 return new ACE_Message_Block(size, 00108 ACE_Message_Block::MB_DATA, 00109 0, // cont 00110 0, // data 00111 use_data_alloc ? allocators[DATA] : 0, 00112 mut_mb.locking_strategy(), // locking_strategy 00113 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00114 ACE_Time_Value::zero, 00115 ACE_Time_Value::max_time, 00116 allocators[DB]); 00117 } 00118 }
ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::clear_flag | ( | DataSampleHeaderFlag | flag, | |
ACE_Message_Block * | buffer | |||
) | [static] |
The clear_flag and set_flag methods are a hack to update the header flags after a sample has been serialized without deserializing the entire message. This method will break if the current Serializer behavior changes.
Definition at line 113 of file DataSampleHeader.inl.
References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().
00115 { 00116 char* base = buffer->base(); 00117 00118 // verify sufficient length exists: 00119 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00120 ACE_ERROR((LM_ERROR, 00121 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::clear_flag: ") 00122 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n"))); 00123 return; 00124 } 00125 00126 base[FLAGS_OFFSET] &= ~mask_flag(flag); 00127 }
void OpenDDS::DCPS::DataSampleHeader::init | ( | ACE_Message_Block * | buffer | ) |
Implement load from buffer.
Definition at line 173 of file DataSampleHeader.cpp.
References byte_order_, OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CDR_ENCAP_FLAG, cdr_encapsulation_, coherent_change_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, content_filter_, content_filter_entries_, OpenDDS::DCPS::CONTENT_FILTER_FLAG, OpenDDS::DCPS::gen_find_size(), group_coherent_, OpenDDS::DCPS::GROUP_COHERENT_FLAG, historic_sample_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, key_fields_only_, OpenDDS::DCPS::KEY_ONLY_FLAG, lifespan_duration_, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, lifespan_duration_nanosec_, lifespan_duration_sec_, marshaled_size_, mask_flag(), message_id_, message_length_, more_fragments_, OpenDDS::DCPS::MORE_FRAGMENTS_FLAG, publication_id_, publisher_id_, sequence_, sequence_repair_, OpenDDS::DCPS::SEQUENCE_REPAIR_FLAG, source_timestamp_nanosec_, source_timestamp_sec_, submessage_id_, and OpenDDS::DCPS::Serializer::swap_bytes().
Referenced by DataSampleHeader(), and operator=().
00174 { 00175 this->marshaled_size_ = 0; 00176 00177 Serializer reader(buffer); 00178 00179 // Only byte-sized reads until we get the byte_order_ flag. 00180 00181 if (!(reader >> this->message_id_)) { 00182 return; 00183 } 00184 00185 this->marshaled_size_ += sizeof(this->message_id_); 00186 00187 if (!(reader >> this->submessage_id_)) { 00188 return; 00189 } 00190 00191 this->marshaled_size_ += sizeof(this->submessage_id_); 00192 00193 // Extract the flag values. 00194 ACE_CDR::Octet byte; 00195 if (!(reader >> ACE_InputCDR::to_octet(byte))) { 00196 return; 00197 } 00198 00199 this->marshaled_size_ += sizeof(byte); 00200 00201 this->byte_order_ = byte & mask_flag(BYTE_ORDER_FLAG); 00202 this->coherent_change_ = byte & mask_flag(COHERENT_CHANGE_FLAG); 00203 this->historic_sample_ = byte & mask_flag(HISTORIC_SAMPLE_FLAG); 00204 this->lifespan_duration_ = byte & mask_flag(LIFESPAN_DURATION_FLAG); 00205 this->group_coherent_ = byte & mask_flag(GROUP_COHERENT_FLAG); 00206 this->content_filter_ = byte & mask_flag(CONTENT_FILTER_FLAG); 00207 this->sequence_repair_ = byte & mask_flag(SEQUENCE_REPAIR_FLAG); 00208 this->more_fragments_ = byte & mask_flag(MORE_FRAGMENTS_FLAG); 00209 00210 // Set swap_bytes flag to the Serializer if data sample from 00211 // the publisher is in different byte order. 00212 reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER); 00213 00214 if (!(reader >> ACE_InputCDR::to_octet(byte))) { 00215 return; 00216 } 00217 00218 this->marshaled_size_ += sizeof(byte); 00219 00220 this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG); 00221 this->key_fields_only_ = byte & mask_flag(KEY_ONLY_FLAG); 00222 00223 if (!(reader >> this->message_length_)) { 00224 return; 00225 } 00226 00227 this->marshaled_size_ += sizeof(this->message_length_); 00228 00229 if (!(reader >> this->sequence_)) { 00230 return; 00231 } 00232 00233 size_t padding = 0; 00234 gen_find_size(this->sequence_, this->marshaled_size_, padding); 00235 00236 if (!(reader >> this->source_timestamp_sec_)) { 00237 return; 00238 } 00239 00240 this->marshaled_size_ += sizeof(this->source_timestamp_sec_); 00241 00242 if (!(reader >> this->source_timestamp_nanosec_)) { 00243 return; 00244 } 00245 00246 this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_); 00247 00248 if (this->lifespan_duration_) { 00249 if (!(reader >> this->lifespan_duration_sec_)) { 00250 return; 00251 } 00252 00253 this->marshaled_size_ += sizeof(this->lifespan_duration_sec_); 00254 00255 if (!(reader >> this->lifespan_duration_nanosec_)) { 00256 return; 00257 } 00258 00259 this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_); 00260 } 00261 00262 if (!(reader >> this->publication_id_)) { 00263 return; 00264 } 00265 00266 gen_find_size(this->publication_id_, this->marshaled_size_, padding); 00267 00268 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00269 if (this->group_coherent_) { 00270 if (!(reader >> this->publisher_id_)) { 00271 return; 00272 } 00273 gen_find_size(this->publisher_id_, this->marshaled_size_, padding); 00274 } 00275 #endif 00276 00277 if (this->content_filter_) { 00278 if (!(reader >> this->content_filter_entries_)) { 00279 return; 00280 } 00281 gen_find_size(this->content_filter_entries_, this->marshaled_size_, padding); 00282 } 00283 }
bool OpenDDS::DCPS::DataSampleHeader::into_received_data_sample | ( | ReceivedDataSample & | rds | ) |
Definition at line 743 of file DataSampleHeader.cpp.
References OpenDDS::DCPS::ReceivedDataSample::header_.
bool OpenDDS::DCPS::DataSampleHeader::join | ( | const DataSampleHeader & | first, | |
const DataSampleHeader & | second, | |||
DataSampleHeader & | result | |||
) | [static] |
If "first" and "second" are two fragments of the same original message (as created by split()), return true and set up the "result" header to match the original header. Joining the data payload is the responsibility of the caller (manipulate the continuation chain).
Definition at line 459 of file DataSampleHeader.cpp.
References content_filter_, content_filter_entries_, message_length_, more_fragments_, and sequence_.
Referenced by OpenDDS::DCPS::TransportReassembly::insert().
00461 { 00462 if (!first.more_fragments_ || first.sequence_ != second.sequence_) { 00463 return false; 00464 } 00465 result = second; 00466 result.message_length_ += first.message_length_; 00467 if (first.content_filter_) { 00468 result.content_filter_ = true; 00469 const CORBA::ULong entries = first.content_filter_entries_.length(); 00470 CORBA::ULong x = result.content_filter_entries_.length(); 00471 result.content_filter_entries_.length(x + entries); 00472 for (CORBA::ULong i(entries); i > 0;) { 00473 result.content_filter_entries_[x++] = first.content_filter_entries_[--i]; 00474 } 00475 } 00476 return true; 00477 }
ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size | ( | ) | const |
Amount of data read when initializing from a buffer.
Definition at line 82 of file DataSampleHeader.inl.
References marshaled_size_.
00083 { 00084 return marshaled_size_; 00085 }
static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag | ( | DataSampleHeaderFlag2 | flag | ) | [inline, static] |
Definition at line 184 of file DataSampleHeader.h.
static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag | ( | DataSampleHeaderFlag | flag | ) | [inline, static] |
Definition at line 183 of file DataSampleHeader.h.
Referenced by clear_flag(), init(), partial(), set_flag(), and test_flag().
ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::max_marshaled_size | ( | ) | [static] |
Similar to IDL compiler generated methods.
Definition at line 89 of file DataSampleHeader.inl.
Referenced by OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::UdpDataLink::open(), partial(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and split().
00090 { 00091 return 1 + // message_id_; 00092 1 + // submessage_id_; 00093 2 + // flags 00094 4 + // message_length_; 00095 8 + // sequence_; 00096 4 + // source_timestamp_sec_; 00097 4 + // source_timestamp_nanosec_; 00098 4 + // lifespan_duration_sec_; 00099 4 + // lifespan_duration_nanosec_; 00100 16 + // publication_id_; 00101 16 ; // publisher_id_; 00102 // content_filter_entries_ is not marsahled into the same Data Block 00103 // so it is not part of the max_marshaled_size() which is used to allocate 00104 }
ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::message_length | ( | void | ) | const [inline] |
Definition at line 235 of file DataSampleHeader.h.
00235 { return this->message_length_; }
bool OpenDDS::DCPS::DataSampleHeader::more_fragments | ( | void | ) | const [inline] |
Definition at line 237 of file DataSampleHeader.h.
00237 { return this->more_fragments_; }
ACE_INLINE OpenDDS::DCPS::DataSampleHeader & OpenDDS::DCPS::DataSampleHeader::operator= | ( | ACE_Message_Block & | buffer | ) |
Assignment from an ACE_Message_Block.
Definition at line 74 of file DataSampleHeader.inl.
References init().
00075 { 00076 this->init(&buffer); 00077 return *this; 00078 }
bool OpenDDS::DCPS::DataSampleHeader::partial | ( | const ACE_Message_Block & | mb | ) | [static] |
Does the data in this mb constitute a partial Sample Header?
Definition at line 120 of file DataSampleHeader.cpp.
References OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CONTENT_FILTER_FLAG, FLAGS_OFFSET, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::GROUP_COHERENT_FLAG, len, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, mask_flag(), max_marshaled_size(), OpenDDS::DCPS::MESSAGE_ID_MAX, MESSAGE_ID_OFFSET, OpenDDS::DCPS::SUBMESSAGE_ID_MAX, SUBMESSAGE_ID_OFFSET, and ACE_Message_Block::total_length().
00121 { 00122 static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG), 00123 LIFESPAN_LENGTH = 8, 00124 COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG), 00125 COHERENT_LENGTH = 16, 00126 CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG), 00127 BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG); 00128 00129 const size_t len = mb.total_length(); 00130 00131 if (len <= FLAGS_OFFSET) return true; 00132 00133 unsigned char msg_id; 00134 if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET, 00135 false /*swap ignored for char*/) 00136 || int(msg_id) >= MESSAGE_ID_MAX) { 00137 // This check, and the similar one below for submessage id, are actually 00138 // indicating an invalid header (and not a partial header) but we can 00139 // treat it the same as partial for the sake of the TransportRecvStrategy. 00140 return true; 00141 } 00142 00143 if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET, 00144 false /*swap ignored for char*/) 00145 || int(msg_id) >= SUBMESSAGE_ID_MAX) { 00146 return true; 00147 } 00148 00149 char flags; 00150 if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) { 00151 return true; 00152 } 00153 00154 size_t expected = max_marshaled_size(); 00155 if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH; 00156 if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH; 00157 00158 if (flags & CONTENT_FILT_MASK) { 00159 CORBA::ULong seqLen; 00160 const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER; 00161 if (!mb_peek(seqLen, mb, expected, swap)) { 00162 return true; 00163 } 00164 size_t guidsize = 0, padding = 0; 00165 gen_find_size(GUID_t(), guidsize, padding); 00166 expected += sizeof(seqLen) + guidsize * seqLen; 00167 } 00168 00169 return len < expected; 00170 }
void OpenDDS::DCPS::DataSampleHeader::pdu_remaining | ( | size_t | ) | [inline] |
Definition at line 239 of file DataSampleHeader.h.
ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::set_flag | ( | DataSampleHeaderFlag | flag, | |
ACE_Message_Block * | buffer | |||
) | [static] |
Definition at line 131 of file DataSampleHeader.inl.
References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().
Referenced by OpenDDS::DCPS::WriteDataContainer::data_delivered().
00133 { 00134 char* base = buffer->base(); 00135 00136 // verify sufficient length exists: 00137 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00138 ACE_ERROR((LM_ERROR, 00139 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ") 00140 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n"))); 00141 return; 00142 } 00143 00144 base[FLAGS_OFFSET] |= mask_flag(flag); 00145 }
void OpenDDS::DCPS::DataSampleHeader::split | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
Message_Block_Ptr & | head, | |||
Message_Block_Ptr & | tail | |||
) | [static] |
Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest.
Definition at line 388 of file DataSampleHeader.cpp.
References add_cfentries(), alloc_msgblock(), ACE_Message_Block::cont(), dup(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::gen_max_marshaled_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), ACE_Message_Block::length(), max_marshaled_size(), message_length_, more_fragments_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), split_payload(), and ACE_Message_Block::total_length().
Referenced by OpenDDS::DCPS::TransportQueueElement::fragment().
00390 { 00391 Message_Block_Ptr dup (orig.duplicate()); 00392 00393 const size_t length = dup->total_length(); 00394 DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries) 00395 const size_t hdr_len = length - dup->total_length(); 00396 00397 ACE_Message_Block* payload = dup.get(); 00398 //skip zero length message blocks 00399 ACE_Message_Block* prev = 0; 00400 for (; payload->length() == 0; payload = payload->cont()) { 00401 prev = payload; 00402 } 00403 prev->cont(0); 00404 Message_Block_Ptr payload_head(payload); 00405 00406 if (size < hdr_len) { // need to fragment the content_filter_entries_ 00407 head.reset(alloc_msgblock(*dup, max_marshaled_size(), true)); 00408 hdr.more_fragments_ = true; 00409 hdr.message_length_ = 0; // no room for payload data 00410 *head << hdr; 00411 const size_t avail = size - head->length() - 4 /* sequence length */; 00412 const CORBA::ULong n_entries = 00413 static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t())); 00414 GUIDSeq entries(n_entries); 00415 entries.length(n_entries); 00416 // remove from the end of hdr's entries (order doesn't matter) 00417 for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length()); 00418 i < n_entries; ++i) { 00419 entries[i] = hdr.content_filter_entries_[--x]; 00420 hdr.content_filter_entries_.length(x); 00421 } 00422 add_cfentries(&entries, head.get()); 00423 00424 tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true)); 00425 hdr.more_fragments_ = false; 00426 hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0); 00427 hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length()); 00428 *tail << hdr; 00429 tail->cont(payload); 00430 if (hdr.content_filter_) { 00431 add_cfentries(&hdr.content_filter_entries_, tail.get()); 00432 } 00433 return; 00434 } 00435 00436 Message_Block_Ptr payload_tail; 00437 split_payload(*payload, size - hdr_len, payload_head, payload_tail); 00438 00439 hdr.more_fragments_ = true; 00440 hdr.message_length_ = static_cast<ACE_UINT32>(payload_head->total_length()); 00441 00442 head.reset(alloc_msgblock(*dup, max_marshaled_size(), true)); 00443 *head << hdr; 00444 head->cont(payload_head.release()); 00445 if (hdr.content_filter_) { 00446 add_cfentries(&hdr.content_filter_entries_, head.get()); 00447 } 00448 00449 hdr.more_fragments_ = false; 00450 hdr.content_filter_ = false; 00451 hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length()); 00452 00453 tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true)); 00454 *tail << hdr; 00455 tail->cont(payload_tail.release()); 00456 }
void OpenDDS::DCPS::DataSampleHeader::split_payload | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
Message_Block_Ptr & | head, | |||
Message_Block_Ptr & | tail | |||
) | [static] |
Definition at line 362 of file DataSampleHeader.cpp.
References ACE_Message_Block::cont(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), ACE_Message_Block::length(), ACE_Message_Block::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), and ACE_Message_Block::wr_ptr().
Referenced by OpenDDS::DCPS::RtpsSampleHeader::split(), and split().
00365 { 00366 if (!head) { 00367 head.reset(orig.duplicate()); 00368 } 00369 00370 ACE_Message_Block* frag = head.get(); 00371 size_t frag_remain = size; 00372 for (; frag_remain > frag->length(); frag = frag->cont()) { 00373 frag_remain -= frag->length(); 00374 } 00375 00376 if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary 00377 tail.reset(frag->cont()); 00378 } else { 00379 tail.reset(frag->duplicate()); 00380 frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain); 00381 ACE_Message_Block::release(frag->cont()); 00382 tail->rd_ptr(frag_remain); 00383 } 00384 frag->cont(0); 00385 }
ACE_INLINE bool OpenDDS::DCPS::DataSampleHeader::test_flag | ( | DataSampleHeaderFlag | flag, | |
const ACE_Message_Block * | buffer | |||
) | [static] |
Definition at line 149 of file DataSampleHeader.inl.
References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().
Referenced by add_cfentries(), OpenDDS::DCPS::SingleSendBuffer::insert(), OpenDDS::DCPS::DataDurabilityCache::insert(), OpenDDS::DCPS::WriteDataContainer::remove_excess_durable(), and OpenDDS::DCPS::DataLinkSet::send().
00151 { 00152 char* base = buffer->base(); 00153 00154 // verify sufficient length exists: 00155 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00156 ACE_ERROR_RETURN((LM_ERROR, 00157 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ") 00158 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")), false); 00159 } 00160 00161 // Test flag bit. 00162 return base[FLAGS_OFFSET] & mask_flag(flag); 00163 }
0 - Message encoded using big-endian byte order. (see ace/CDR_Base.h) 1 - Message encoded using little-endian byte order.
Definition at line 90 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), and OpenDDS::DCPS::ReplayerImpl::write().
The data payload uses CDR encapsulation and alignment rules, as defined by the RTPS specification formal/2010-11-01.
Definition at line 125 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance().
The flag indicates the sample belongs to a coherent change set (i.e. PRESENTATION coherent_access == true).
Definition at line 94 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), and OpenDDS::DCPS::DataReaderImpl::writer_activity().
The publishing side has applied content filtering, and the optional content_filter_entries_ field is present in the marshaled header.
Definition at line 108 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), init(), and join().
Optional field present if the content_filter_ flag bit is set. Indicates which readers should not receive the data.
Definition at line 181 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataLink::data_received_i(), init(), and join().
Definition at line 104 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and init().
This flag indicates a sample has been resent from a non-VOLATILE DataWriter.
Definition at line 98 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::DataReaderImpl::deliver_historic(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), init(), and OpenDDS::DCPS::RtpsSampleHeader::process_iqos().
Only the key fields of the data sample are present in the payload.
Definition at line 128 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble().
This flag indicates the sample header contains non-default LIFESPAN duration fields.
Definition at line 102 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), and init().
Definition at line 168 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), and init().
The LIFESPAN duration field is generated from the DataWriter or supplied by the application at the time of the write. This field is used to determine if a given sample is considered 'stale' and should be discarded by associated DataReader. These fields are optional and are controlled by the lifespan_duration_ flag.
Definition at line 167 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), and init().
Keep track of the amount of data read from a buffer.
Definition at line 249 of file DataSampleHeader.h.
Referenced by init(), and marshaled_size().
The enum MessageId.
Definition at line 83 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::DCPS::WriteDataContainer::release_buffer(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::DCPS::TransportClient::send_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(), and OpenDDS::DCPS::DataReaderImpl::writer_activity().
The size of the data sample (without header). After this header is demarshaled, the transport expects to see this many bytes in the stream before the start of the next header (or end of the Transport PDU).
Definition at line 140 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::RtpsUdpDataLink::end_historic_samples(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and split().
The current "Data Sample" needs reassembly before further processing.
Definition at line 116 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::TransportReassembly::data_unavailable(), init(), OpenDDS::DCPS::TransportReassembly::insert(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), and split().
Identify the DataWriter that produced the sample data being sent.
Definition at line 173 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(), OpenDDS::DCPS::TcpDataLink::ack_received(), OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::DataLink::send_control(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data(), OpenDDS::DCPS::ReplayerImpl::write(), and OpenDDS::DCPS::DataReaderImpl::writer_activity().
Id representing the coherent group. Optional field that's only present if the flag for group_coherent_ is set.
Definition at line 177 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), and OpenDDS::DCPS::TcpDataLink::request_ack_received().
Definition at line 130 of file DataSampleHeader.h.
Definition at line 131 of file DataSampleHeader.h.
Definition at line 132 of file DataSampleHeader.h.
Definition at line 133 of file DataSampleHeader.h.
Definition at line 134 of file DataSampleHeader.h.
Definition at line 135 of file DataSampleHeader.h.
The sequence number is obtained from the Publisher associated with the DataWriter based on the PRESENTATION requirement for the sequence value (access_scope == GROUP).
Definition at line 145 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::TcpDataLink::ack_received(), OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::TransportReassembly::data_unavailable(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::TransportSendElement::sequence(), OpenDDS::DCPS::TransportSendControlElement::sequence(), and OpenDDS::DCPS::DataReaderImpl::writer_activity().
Due to content filtering, a gap in the sequence numbers may be an expected condition. If this bit is set, assume prior sequence numbers were filtered-out and are not missing.
Definition at line 113 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and init().
Definition at line 157 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), init(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReceivedDataElement::ReceivedDataElement(), OpenDDS::DCPS::resend_data_expired(), and OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type().
The SOURCE_TIMESTAMP field is generated from the DataWriter or supplied by the application at the time of the write. This value is derived from the local hosts system clock, which is assumed to be synchronized with the clocks on other hosts within the domain. This field is required for DESTINATION_ORDER and LIFESPAN policy behaviors of subscriptions. It is also required to be present for all data in the SampleInfo structure supplied along with each data sample.
Definition at line 156 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), init(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReceivedDataElement::ReceivedDataElement(), OpenDDS::DCPS::resend_data_expired(), and OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type().
Implementation-specific sub-message Ids.
Definition at line 86 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataLink::create_control(), init(), and OpenDDS::DCPS::MulticastDataLink::sample_received().