#include <DataSampleHeader.h>
Collaboration diagram for OpenDDS::DCPS::DataSampleHeader:
Public Types | |
MESSAGE_ID_OFFSET = 0 | |
SUBMESSAGE_ID_OFFSET = 1 | |
FLAGS_OFFSET = 2 | |
enum | { MESSAGE_ID_OFFSET = 0, SUBMESSAGE_ID_OFFSET = 1, FLAGS_OFFSET = 2 } |
Public Member Functions | |
DataSampleHeader () | |
DataSampleHeader (ACE_Message_Block &buffer) | |
Construct with values extracted from a buffer. | |
DataSampleHeader & | operator= (ACE_Message_Block &buffer) |
Assignment from an ACE_Message_Block. | |
size_t | marshaled_size () const |
Amount of data read when initializing from a buffer. | |
void | init (ACE_Message_Block *buffer) |
Implement load from buffer. | |
bool | into_received_data_sample (ReceivedDataSample &rds) |
ACE_UINT32 | message_length () const |
bool | more_fragments () const |
void | pdu_remaining (size_t) |
Static Public Member Functions | |
static ACE_UINT8 | mask_flag (DataSampleHeaderFlag flag) |
static ACE_UINT8 | mask_flag (DataSampleHeaderFlag2 flag) |
static void | clear_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer) |
static void | set_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer) |
static bool | test_flag (DataSampleHeaderFlag flag, const ACE_Message_Block *buffer) |
static bool | partial (const ACE_Message_Block &mb) |
Does the data in this mb constitute a partial Sample Header? | |
static void | add_cfentries (const GUIDSeq *guids, ACE_Message_Block *mb) |
static void | split (const ACE_Message_Block &orig, size_t size, ACE_Message_Block *&head, ACE_Message_Block *&tail) |
static bool | join (const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result) |
static size_t | max_marshaled_size () |
Similar to IDL compiler generated methods. | |
static ACE_Message_Block * | alloc_msgblock (const ACE_Message_Block &mb, size_t size, bool use_data_alloc) |
static void | split_payload (const ACE_Message_Block &orig, size_t size, ACE_Message_Block *&head, ACE_Message_Block *&tail) |
Public Attributes | |
char | message_id_ |
The enum MessageId. | |
char | submessage_id_ |
Implementation-specific sub-message Ids. | |
bool | byte_order_: 1 |
bool | coherent_change_: 1 |
bool | historic_sample_: 1 |
bool | lifespan_duration_: 1 |
bool | group_coherent_: 1 |
bool | content_filter_: 1 |
bool | sequence_repair_: 1 |
bool | more_fragments_: 1 |
The current "Data Sample" needs reassembly before further processing. | |
bool | cdr_encapsulation_: 1 |
bool | key_fields_only_: 1 |
Only the key fields of the data sample are present in the payload. | |
bool | reserved_1: 1 |
bool | reserved_2: 1 |
bool | reserved_3: 1 |
bool | reserved_4: 1 |
bool | reserved_5: 1 |
bool | reserved_6: 1 |
ACE_UINT32 | message_length_ |
SequenceNumber | sequence_ |
ACE_INT32 | source_timestamp_sec_ |
ACE_UINT32 | source_timestamp_nanosec_ |
ACE_INT32 | lifespan_duration_sec_ |
ACE_UINT32 | lifespan_duration_nanosec_ |
PublicationId | publication_id_ |
RepoId | publisher_id_ |
GUIDSeq | content_filter_entries_ |
Private Attributes | |
size_t | marshaled_size_ |
Keep track of the amount of data read from a buffer. |
Definition at line 70 of file DataSampleHeader.h.
anonymous enum |
Definition at line 71 of file DataSampleHeader.h.
00071 { 00072 MESSAGE_ID_OFFSET = 0, 00073 SUBMESSAGE_ID_OFFSET = 1, 00074 FLAGS_OFFSET = 2 // message_id_ + submessage_id_ 00075 };
ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader | ( | ) |
Definition at line 9 of file DataSampleHeader.inl.
00010 : message_id_(0) 00011 , submessage_id_(0) 00012 , byte_order_(ACE_CDR_BYTE_ORDER) 00013 , coherent_change_(0) 00014 , historic_sample_(0) 00015 , lifespan_duration_(0) 00016 , group_coherent_(0) 00017 , content_filter_(0) 00018 , sequence_repair_(0) 00019 , more_fragments_(0) 00020 , cdr_encapsulation_(0) 00021 , key_fields_only_(0) 00022 , reserved_1(0) 00023 , reserved_2(0) 00024 , reserved_3(0) 00025 , reserved_4(0) 00026 , reserved_5(0) 00027 , reserved_6(0) 00028 , message_length_(0) 00029 , sequence_() 00030 , source_timestamp_sec_(0) 00031 , source_timestamp_nanosec_(0) 00032 , lifespan_duration_sec_(0) 00033 , lifespan_duration_nanosec_(0) 00034 , publication_id_(GUID_UNKNOWN) 00035 , publisher_id_(GUID_UNKNOWN) 00036 , marshaled_size_(0) 00037 { 00038 }
ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader | ( | ACE_Message_Block & | buffer | ) | [explicit] |
Construct with values extracted from a buffer.
Definition at line 41 of file DataSampleHeader.inl.
References init().
00042 : message_id_(0) 00043 , submessage_id_(0) 00044 , byte_order_(ACE_CDR_BYTE_ORDER) 00045 , coherent_change_(0) 00046 , historic_sample_(0) 00047 , lifespan_duration_(0) 00048 , group_coherent_(0) 00049 , content_filter_(0) 00050 , sequence_repair_(0) 00051 , more_fragments_(0) 00052 , cdr_encapsulation_(0) 00053 , key_fields_only_(0) 00054 , reserved_1(0) 00055 , reserved_2(0) 00056 , reserved_3(0) 00057 , reserved_4(0) 00058 , reserved_5(0) 00059 , reserved_6(0) 00060 , message_length_(0) 00061 , sequence_() 00062 , source_timestamp_sec_(0) 00063 , source_timestamp_nanosec_(0) 00064 , lifespan_duration_sec_(0) 00065 , lifespan_duration_nanosec_(0) 00066 , publication_id_(GUID_UNKNOWN) 00067 , publisher_id_(GUID_UNKNOWN) 00068 { 00069 this->init(&buffer); 00070 }
void OpenDDS::DCPS::DataSampleHeader::add_cfentries | ( | const GUIDSeq * | guids, | |
ACE_Message_Block * | mb | |||
) | [static] |
Marshal the "guids" as an optional header chained as the continuation of "mb" (which must already be a valid DataSampleHeader serialization). Any existing payload of "mb" (its continuation) will be chained after the new optional header part. "guids" may be null, in which case a sequence length of 0 is serialized (the same as an empty sequence).
Definition at line 318 of file DataSampleHeader.cpp.
References alloc_msgblock(), OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::swap(), and test_flag().
Referenced by OpenDDS::DCPS::DataLinkSet::send(), and split().
00319 { 00320 size_t size = 0; 00321 if (guids) { 00322 size_t padding = 0; // GUIDs are always aligned 00323 gen_find_size(*guids, size, padding); 00324 } else { 00325 size = sizeof(CORBA::ULong); 00326 } 00327 ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false); 00328 00329 const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb)); 00330 Serializer ser(optHdr, swap); 00331 if (guids) { 00332 ser << *guids; 00333 } else { 00334 ser << CORBA::ULong(0); 00335 } 00336 00337 // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control) 00338 optHdr->cont(mb->cont()); 00339 mb->cont(optHdr); 00340 }
ACE_Message_Block * OpenDDS::DCPS::DataSampleHeader::alloc_msgblock | ( | const ACE_Message_Block & | mb, | |
size_t | size, | |||
bool | use_data_alloc | |||
) | [static] |
Definition at line 74 of file DataSampleHeader.cpp.
References OpenDDS::RTPS::DATA.
Referenced by add_cfentries(), OpenDDS::DCPS::RtpsSampleHeader::split(), and split().
00076 { 00077 enum { DATA, DB, MB, N_ALLOC }; 00078 ACE_Allocator* allocators[N_ALLOC]; 00079 // It's an ACE bug that access_allocators isn't const 00080 ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb); 00081 mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]); 00082 if (allocators[MB]) { 00083 ACE_Message_Block* result; 00084 ACE_NEW_MALLOC_RETURN(result, 00085 static_cast<ACE_Message_Block*>( 00086 allocators[MB]->malloc(sizeof(ACE_Message_Block))), 00087 ACE_Message_Block(size, 00088 ACE_Message_Block::MB_DATA, 00089 0, // cont 00090 0, // data 00091 use_data_alloc ? allocators[DATA] : 0, 00092 mut_mb.locking_strategy(), // locking_strategy 00093 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00094 ACE_Time_Value::zero, 00095 ACE_Time_Value::max_time, 00096 allocators[DB], 00097 allocators[MB]), 00098 0); 00099 return result; 00100 } else { 00101 return new ACE_Message_Block(size, 00102 ACE_Message_Block::MB_DATA, 00103 0, // cont 00104 0, // data 00105 use_data_alloc ? allocators[DATA] : 0, 00106 mut_mb.locking_strategy(), // locking_strategy 00107 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00108 ACE_Time_Value::zero, 00109 ACE_Time_Value::max_time, 00110 allocators[DB]); 00111 } 00112 }
ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::clear_flag | ( | DataSampleHeaderFlag | flag, | |
ACE_Message_Block * | buffer | |||
) | [static] |
The clear_flag and set_flag methods are a hack to update the header flags after a sample has been serialized, without deserializing the entire message. These methods will break if the current Serializer behavior changes.
Definition at line 113 of file DataSampleHeader.inl.
References FLAGS_OFFSET, and mask_flag().
00115 { 00116 char* base = buffer->base(); 00117 00118 // verify sufficient length exists: 00119 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00120 ACE_ERROR((LM_ERROR, 00121 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::clear_flag: ") 00122 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n"))); 00123 return; 00124 } 00125 00126 base[FLAGS_OFFSET] &= ~mask_flag(flag); 00127 }
void OpenDDS::DCPS::DataSampleHeader::init | ( | ACE_Message_Block * | buffer | ) |
Implement load from buffer.
Definition at line 167 of file DataSampleHeader.cpp.
References byte_order_, OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CDR_ENCAP_FLAG, cdr_encapsulation_, coherent_change_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, content_filter_, content_filter_entries_, OpenDDS::DCPS::CONTENT_FILTER_FLAG, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::Serializer::good_bit(), group_coherent_, OpenDDS::DCPS::GROUP_COHERENT_FLAG, historic_sample_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, key_fields_only_, OpenDDS::DCPS::KEY_ONLY_FLAG, lifespan_duration_, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, lifespan_duration_nanosec_, lifespan_duration_sec_, marshaled_size_, mask_flag(), message_id_, message_length_, more_fragments_, OpenDDS::DCPS::MORE_FRAGMENTS_FLAG, publication_id_, publisher_id_, sequence_, sequence_repair_, OpenDDS::DCPS::SEQUENCE_REPAIR_FLAG, source_timestamp_nanosec_, source_timestamp_sec_, submessage_id_, and OpenDDS::DCPS::Serializer::swap_bytes().
Referenced by DataSampleHeader(), and operator=().
00168 { 00169 this->marshaled_size_ = 0; 00170 00171 Serializer reader(buffer); 00172 00173 // Only byte-sized reads until we get the byte_order_ flag. 00174 00175 reader >> this->message_id_; 00176 00177 if (!reader.good_bit()) return; 00178 this->marshaled_size_ += sizeof(this->message_id_); 00179 00180 reader >> this->submessage_id_; 00181 00182 if (!reader.good_bit()) return; 00183 this->marshaled_size_ += sizeof(this->submessage_id_); 00184 00185 // Extract the flag values. 00186 ACE_CDR::Octet byte; 00187 reader >> ACE_InputCDR::to_octet(byte); 00188 00189 if (!reader.good_bit()) return; 00190 this->marshaled_size_ += sizeof(byte); 00191 00192 this->byte_order_ = byte & mask_flag(BYTE_ORDER_FLAG); 00193 this->coherent_change_ = byte & mask_flag(COHERENT_CHANGE_FLAG); 00194 this->historic_sample_ = byte & mask_flag(HISTORIC_SAMPLE_FLAG); 00195 this->lifespan_duration_ = byte & mask_flag(LIFESPAN_DURATION_FLAG); 00196 this->group_coherent_ = byte & mask_flag(GROUP_COHERENT_FLAG); 00197 this->content_filter_ = byte & mask_flag(CONTENT_FILTER_FLAG); 00198 this->sequence_repair_ = byte & mask_flag(SEQUENCE_REPAIR_FLAG); 00199 this->more_fragments_ = byte & mask_flag(MORE_FRAGMENTS_FLAG); 00200 00201 // Set swap_bytes flag to the Serializer if data sample from 00202 // the publisher is in different byte order. 
00203 reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER); 00204 00205 reader >> ACE_InputCDR::to_octet(byte); 00206 00207 if (!reader.good_bit()) return; 00208 this->marshaled_size_ += sizeof(byte); 00209 00210 this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG); 00211 this->key_fields_only_ = byte & mask_flag(KEY_ONLY_FLAG); 00212 00213 reader >> this->message_length_; 00214 00215 if (!reader.good_bit()) return; 00216 this->marshaled_size_ += sizeof(this->message_length_); 00217 00218 reader >> this->sequence_; 00219 00220 if (!reader.good_bit()) return; 00221 size_t padding = 0; 00222 gen_find_size(this->sequence_, this->marshaled_size_, padding); 00223 00224 reader >> this->source_timestamp_sec_; 00225 00226 if (!reader.good_bit()) return; 00227 this->marshaled_size_ += sizeof(this->source_timestamp_sec_); 00228 00229 reader >> this->source_timestamp_nanosec_; 00230 00231 if (!reader.good_bit()) return; 00232 this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_); 00233 00234 if (this->lifespan_duration_) { 00235 reader >> this->lifespan_duration_sec_; 00236 00237 if (!reader.good_bit()) return; 00238 this->marshaled_size_ += sizeof(this->lifespan_duration_sec_); 00239 00240 reader >> this->lifespan_duration_nanosec_; 00241 00242 if (!reader.good_bit()) return; 00243 this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_); 00244 } 00245 00246 reader >> this->publication_id_; 00247 00248 if (!reader.good_bit()) return; 00249 gen_find_size(this->publication_id_, this->marshaled_size_, padding); 00250 00251 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00252 if (this->group_coherent_) { 00253 reader >> this->publisher_id_; 00254 if (!reader.good_bit()) return; 00255 gen_find_size(this->publisher_id_, this->marshaled_size_, padding); 00256 } 00257 #endif 00258 00259 if (this->content_filter_) { 00260 reader >> this->content_filter_entries_; 00261 if (!reader.good_bit()) return; 00262 gen_find_size(this->content_filter_entries_, 
this->marshaled_size_, padding); 00263 } 00264 }
bool OpenDDS::DCPS::DataSampleHeader::into_received_data_sample | ( | ReceivedDataSample & | rds | ) |
Definition at line 723 of file DataSampleHeader.cpp.
References OpenDDS::DCPS::ReceivedDataSample::header_.
bool OpenDDS::DCPS::DataSampleHeader::join | ( | const DataSampleHeader & | first, | |
const DataSampleHeader & | second, | |||
DataSampleHeader & | result | |||
) | [static] |
If "first" and "second" are two fragments of the same original message (as created by split()), return true and set up the "result" header to match the original header. Joining the data payload is the responsibility of the caller (manipulate the continuation chain).
Definition at line 439 of file DataSampleHeader.cpp.
References content_filter_, content_filter_entries_, message_length_, more_fragments_, and sequence_.
Referenced by OpenDDS::DCPS::TransportReassembly::insert().
00441 { 00442 if (!first.more_fragments_ || first.sequence_ != second.sequence_) { 00443 return false; 00444 } 00445 result = second; 00446 result.message_length_ += first.message_length_; 00447 if (first.content_filter_) { 00448 result.content_filter_ = true; 00449 const CORBA::ULong entries = first.content_filter_entries_.length(); 00450 CORBA::ULong x = result.content_filter_entries_.length(); 00451 result.content_filter_entries_.length(x + entries); 00452 for (CORBA::ULong i(entries); i > 0;) { 00453 result.content_filter_entries_[x++] = first.content_filter_entries_[--i]; 00454 } 00455 } 00456 return true; 00457 }
ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size | ( | ) | const |
Amount of data read when initializing from a buffer.
Definition at line 82 of file DataSampleHeader.inl.
References marshaled_size_.
00083 { 00084 return marshaled_size_; 00085 }
static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag | ( | DataSampleHeaderFlag2 | flag | ) | [inline, static] |
static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag | ( | DataSampleHeaderFlag | flag | ) | [inline, static] |
Definition at line 178 of file DataSampleHeader.h.
Referenced by clear_flag(), init(), partial(), set_flag(), and test_flag().
ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::max_marshaled_size | ( | ) | [static] |
Similar to IDL compiler generated methods.
Definition at line 89 of file DataSampleHeader.inl.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::UdpDataLink::open(), partial(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), split(), and OpenDDS::RTPS::Sedp::Writer::write_sample().
00090 { 00091 return 1 + // message_id_; 00092 1 + // submessage_id_; 00093 2 + // flags 00094 4 + // message_length_; 00095 8 + // sequence_; 00096 4 + // source_timestamp_sec_; 00097 4 + // source_timestamp_nanosec_; 00098 4 + // lifespan_duration_sec_; 00099 4 + // lifespan_duration_nanosec_; 00100 16 + // publication_id_; 00101 16 ; // publisher_id_; 00102 // content_filter_entries_ is not marsahled into the same Data Block 00103 // so it is not part of the max_marshaled_size() which is used to allocate 00104 }
ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::message_length | ( | ) | const [inline] |
bool OpenDDS::DCPS::DataSampleHeader::more_fragments | ( | ) | const [inline] |
ACE_INLINE OpenDDS::DCPS::DataSampleHeader & OpenDDS::DCPS::DataSampleHeader::operator= | ( | ACE_Message_Block & | buffer | ) |
Assignment from an ACE_Message_Block.
Definition at line 74 of file DataSampleHeader.inl.
References init().
00075 { 00076 this->init(&buffer); 00077 return *this; 00078 }
bool OpenDDS::DCPS::DataSampleHeader::partial | ( | const ACE_Message_Block & | mb | ) | [static] |
Does the data in this mb constitute a partial Sample Header?
Definition at line 114 of file DataSampleHeader.cpp.
References OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CONTENT_FILTER_FLAG, FLAGS_OFFSET, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::GROUP_COHERENT_FLAG, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, mask_flag(), max_marshaled_size(), mb_peek(), OpenDDS::DCPS::MESSAGE_ID_MAX, MESSAGE_ID_OFFSET, OpenDDS::DCPS::SUBMESSAGE_ID_MAX, SUBMESSAGE_ID_OFFSET, and OpenDDS::DCPS::swap().
00115 { 00116 static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG), 00117 LIFESPAN_LENGTH = 8, 00118 COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG), 00119 COHERENT_LENGTH = 16, 00120 CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG), 00121 BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG); 00122 00123 const size_t len = mb.total_length(); 00124 00125 if (len <= FLAGS_OFFSET) return true; 00126 00127 unsigned char msg_id; 00128 if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET, 00129 false /*swap ignored for char*/) 00130 || int(msg_id) >= MESSAGE_ID_MAX) { 00131 // This check, and the similar one below for submessage id, are actually 00132 // indicating an invalid header (and not a partial header) but we can 00133 // treat it the same as partial for the sake of the TransportRecvStrategy. 00134 return true; 00135 } 00136 00137 if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET, 00138 false /*swap ignored for char*/) 00139 || int(msg_id) >= SUBMESSAGE_ID_MAX) { 00140 return true; 00141 } 00142 00143 char flags; 00144 if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) { 00145 return true; 00146 } 00147 00148 size_t expected = max_marshaled_size(); 00149 if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH; 00150 if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH; 00151 00152 if (flags & CONTENT_FILT_MASK) { 00153 CORBA::ULong seqLen; 00154 const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER; 00155 if (!mb_peek(seqLen, mb, expected, swap)) { 00156 return true; 00157 } 00158 size_t guidsize = 0, padding = 0; 00159 gen_find_size(GUID_t(), guidsize, padding); 00160 expected += sizeof(seqLen) + guidsize * seqLen; 00161 } 00162 00163 return len < expected; 00164 }
void OpenDDS::DCPS::DataSampleHeader::pdu_remaining | ( | size_t | ) | [inline] |
ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::set_flag | ( | DataSampleHeaderFlag | flag, | |
ACE_Message_Block * | buffer | |||
) | [static] |
Definition at line 131 of file DataSampleHeader.inl.
References FLAGS_OFFSET, and mask_flag().
Referenced by OpenDDS::DCPS::WriteDataContainer::data_delivered().
00133 { 00134 char* base = buffer->base(); 00135 00136 // verify sufficient length exists: 00137 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00138 ACE_ERROR((LM_ERROR, 00139 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ") 00140 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n"))); 00141 return; 00142 } 00143 00144 base[FLAGS_OFFSET] |= mask_flag(flag); 00145 }
void OpenDDS::DCPS::DataSampleHeader::split | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
ACE_Message_Block *& | head, | |||
ACE_Message_Block *& | tail | |||
) | [static] |
Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest.
Definition at line 369 of file DataSampleHeader.cpp.
References add_cfentries(), alloc_msgblock(), OpenDDS::DCPS::gen_max_marshaled_size(), max_marshaled_size(), message_length_, more_fragments_, and split_payload().
Referenced by OpenDDS::DCPS::TransportQueueElement::fragment().
00371 { 00372 ACE_Message_Block* dup = orig.duplicate(); 00373 AMB_Releaser rel(dup); 00374 00375 const size_t length = dup->total_length(); 00376 DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries) 00377 const size_t hdr_len = length - dup->total_length(); 00378 00379 ACE_Message_Block* payload = dup; 00380 ACE_Message_Block* prev = 0; 00381 for (; payload->length() == 0; payload = payload->cont()) { 00382 prev = payload; 00383 } 00384 prev->cont(0); 00385 00386 if (size < hdr_len) { // need to fragment the content_filter_entries_ 00387 head = alloc_msgblock(*dup, max_marshaled_size(), true); 00388 hdr.more_fragments_ = true; 00389 hdr.message_length_ = 0; // no room for payload data 00390 *head << hdr; 00391 const size_t avail = size - head->length() - 4 /* sequence length */; 00392 const CORBA::ULong n_entries = 00393 static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t())); 00394 GUIDSeq entries(n_entries); 00395 entries.length(n_entries); 00396 // remove from the end of hdr's entries (order doesn't matter) 00397 for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length()); 00398 i < n_entries; ++i) { 00399 entries[i] = hdr.content_filter_entries_[--x]; 00400 hdr.content_filter_entries_.length(x); 00401 } 00402 add_cfentries(&entries, head); 00403 00404 tail = alloc_msgblock(*dup, max_marshaled_size(), true); 00405 hdr.more_fragments_ = false; 00406 hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0); 00407 hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length()); 00408 *tail << hdr; 00409 tail->cont(payload); 00410 if (hdr.content_filter_) { 00411 add_cfentries(&hdr.content_filter_entries_, tail); 00412 } 00413 return; 00414 } 00415 00416 ACE_Message_Block* payload_tail; 00417 split_payload(*payload, size - hdr_len, payload, payload_tail); 00418 00419 hdr.more_fragments_ = true; 00420 hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length()); 00421 00422 head = alloc_msgblock(*dup, 
max_marshaled_size(), true); 00423 *head << hdr; 00424 head->cont(payload); 00425 if (hdr.content_filter_) { 00426 add_cfentries(&hdr.content_filter_entries_, head); 00427 } 00428 00429 hdr.more_fragments_ = false; 00430 hdr.content_filter_ = false; 00431 hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length()); 00432 00433 tail = alloc_msgblock(*dup, max_marshaled_size(), true); 00434 *tail << hdr; 00435 tail->cont(payload_tail); 00436 }
void OpenDDS::DCPS::DataSampleHeader::split_payload | ( | const ACE_Message_Block & | orig, | |
size_t | size, | |||
ACE_Message_Block *& | head, | |||
ACE_Message_Block *& | tail | |||
) | [static] |
Definition at line 343 of file DataSampleHeader.cpp.
Referenced by OpenDDS::DCPS::RtpsSampleHeader::split(), and split().
00346 { 00347 if (!head) { 00348 head = orig.duplicate(); 00349 } 00350 00351 ACE_Message_Block* frag = head; 00352 size_t frag_remain = size; 00353 for (; frag_remain > frag->length(); frag = frag->cont()) { 00354 frag_remain -= frag->length(); 00355 } 00356 00357 if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary 00358 tail = frag->cont(); 00359 } else { 00360 tail = frag->duplicate(); 00361 frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain); 00362 ACE_Message_Block::release(frag->cont()); 00363 tail->rd_ptr(frag_remain); 00364 } 00365 frag->cont(0); 00366 }
ACE_INLINE bool OpenDDS::DCPS::DataSampleHeader::test_flag | ( | DataSampleHeaderFlag | flag, | |
const ACE_Message_Block * | buffer | |||
) | [static] |
Definition at line 149 of file DataSampleHeader.inl.
References FLAGS_OFFSET, and mask_flag().
Referenced by add_cfentries(), OpenDDS::DCPS::DataDurabilityCache::insert(), and OpenDDS::DCPS::DataLinkSet::send().
00151 { 00152 char* base = buffer->base(); 00153 00154 // verify sufficient length exists: 00155 if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) { 00156 ACE_ERROR_RETURN((LM_ERROR, 00157 ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ") 00158 ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")), false); 00159 } 00160 00161 // Test flag bit. 00162 return base[FLAGS_OFFSET] & mask_flag(flag); 00163 }
0 - Message encoded using big-endian byte order; 1 - Message encoded using little-endian byte order (see ace/CDR_Base.h).
Definition at line 85 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
The data payload uses CDR encapsulation and alignment rules, as defined by the RTPS specification formal/2010-11-01.
Definition at line 120 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
The flag indicates the sample belongs to a coherent change set (i.e. PRESENTATION coherent_access == true).
Definition at line 89 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
The publishing side has applied content filtering, and the optional content_filter_entries_ field is present in the marshaled header.
Definition at line 103 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), init(), join(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
Optional field present if the content_filter_ flag bit is set. Indicates which readers should not receive the data.
Definition at line 176 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataLink::data_received_i(), init(), join(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
Definition at line 99 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
This flag indicates a sample has been resent from a non-VOLATILE DataWriter.
Definition at line 93 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::DataReaderImpl::deliver_historic(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
Only the key fields of the data sample are present in the payload.
Definition at line 123 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::to_string().
This flag indicates the sample header contains non-default LIFESPAN duration fields.
Definition at line 97 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
Definition at line 163 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
The LIFESPAN duration fields are generated by the DataWriter or supplied by the application at the time of the write. They are used to determine whether a given sample is considered 'stale' and should be discarded by the associated DataReader. These fields are optional and are controlled by the lifespan_duration_ flag.
Definition at line 162 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size_ [private] |
Keep track of the amount of data read from a buffer.
Definition at line 244 of file DataSampleHeader.h.
Referenced by init(), and marshaled_size().
The enum MessageId.
Definition at line 78 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::DCPS::WriteDataContainer::release_buffer(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::DCPS::TransportClient::send_i(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
The size of the data sample (without header). After this header is demarshaled, the transport expects to see this many bytes in the stream before the start of the next header (or end of the Transport PDU).
Definition at line 135 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), split(), and OpenDDS::DCPS::to_string().
The current "Data Sample" needs reassembly before further processing.
Definition at line 111 of file DataSampleHeader.h.
Referenced by init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::operator<<(), split(), and OpenDDS::DCPS::to_string().
Identify the DataWriter that produced the sample data being sent.
Definition at line 168 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
Id representing the coherent group. Optional field that's only present if the flag for group_coherent_ is set.
Definition at line 172 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
Definition at line 125 of file DataSampleHeader.h.
Definition at line 126 of file DataSampleHeader.h.
Definition at line 127 of file DataSampleHeader.h.
Definition at line 128 of file DataSampleHeader.h.
Definition at line 129 of file DataSampleHeader.h.
Definition at line 130 of file DataSampleHeader.h.
The sequence number is obtained from the Publisher associated with the DataWriter based on the PRESENTATION requirement for the sequence value (access_scope == GROUP).
Definition at line 140 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::DCPS::TransportSendElement::sequence(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
Due to content filtering, a gap in the sequence numbers may be an expected condition. If this bit is set, assume prior sequence numbers were filtered out and are not missing.
Definition at line 108 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
Definition at line 152 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::resend_data_expired(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
The SOURCE_TIMESTAMP field is generated from the DataWriter or supplied by the application at the time of the write. This value is derived from the local host's system clock, which is assumed to be synchronized with the clocks on other hosts within the domain. This field is required for DESTINATION_ORDER and LIFESPAN policy behaviors of subscriptions. It is also required to be present for all data in the SampleInfo structure supplied along with each data sample.
Definition at line 151 of file DataSampleHeader.h.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::resend_data_expired(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
Implementation-specific sub-message Ids.
Definition at line 81 of file DataSampleHeader.h.
Referenced by init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::to_string().