OpenDDS::DCPS::DataSampleHeader Struct Reference

#include <DataSampleHeader.h>

Public Types

enum  { MESSAGE_ID_OFFSET = 0, SUBMESSAGE_ID_OFFSET = 1, FLAGS_OFFSET = 2 }

Public Member Functions

 DataSampleHeader ()
 DataSampleHeader (ACE_Message_Block &buffer)
 Construct with values extracted from a buffer.
DataSampleHeader & operator= (ACE_Message_Block &buffer)
 Assignment from an ACE_Message_Block.
size_t marshaled_size () const
 Amount of data read when initializing from a buffer.
void init (ACE_Message_Block *buffer)
 Implement load from buffer.
bool into_received_data_sample (ReceivedDataSample &rds)
ACE_UINT32 message_length () const
bool more_fragments () const
void pdu_remaining (size_t)

Static Public Member Functions

static ACE_UINT8 mask_flag (DataSampleHeaderFlag flag)
static ACE_UINT8 mask_flag (DataSampleHeaderFlag2 flag)
static void clear_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
static void set_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
static bool test_flag (DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
static bool partial (const ACE_Message_Block &mb)
 Does the data in this mb constitute a partial Sample Header?
static void add_cfentries (const GUIDSeq *guids, ACE_Message_Block *mb)
static void split (const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
static bool join (const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result)
static size_t max_marshaled_size ()
 Similar to IDL compiler generated methods.
static ACE_Message_Block * alloc_msgblock (const ACE_Message_Block &mb, size_t size, bool use_data_alloc)
static void split_payload (const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)

Public Attributes

char message_id_
 The enum MessageId.
char submessage_id_
 Implementation-specific sub-message Ids.
bool byte_order_: 1
bool coherent_change_: 1
bool historic_sample_: 1
bool lifespan_duration_: 1
bool group_coherent_: 1
bool content_filter_: 1
bool sequence_repair_: 1
bool more_fragments_: 1
 The current "Data Sample" needs reassembly before further processing.
bool cdr_encapsulation_: 1
bool key_fields_only_: 1
 Only the key fields of the data sample are present in the payload.
bool reserved_1: 1
bool reserved_2: 1
bool reserved_3: 1
bool reserved_4: 1
bool reserved_5: 1
bool reserved_6: 1
ACE_UINT32 message_length_
SequenceNumber sequence_
ACE_INT32 source_timestamp_sec_
ACE_UINT32 source_timestamp_nanosec_
ACE_INT32 lifespan_duration_sec_
ACE_UINT32 lifespan_duration_nanosec_
PublicationId publication_id_
RepoId publisher_id_
GUIDSeq content_filter_entries_

Private Attributes

size_t marshaled_size_
 Keep track of the amount of data read from a buffer.

Detailed Description

The header message of a data sample. The header and the data sample live in different message blocks, which are chained together.

Definition at line 75 of file DataSampleHeader.h.


Member Enumeration Documentation

anonymous enum
Enumerator:
MESSAGE_ID_OFFSET 
SUBMESSAGE_ID_OFFSET 
FLAGS_OFFSET 

Definition at line 76 of file DataSampleHeader.h.

00076        {
00077     MESSAGE_ID_OFFSET = 0,
00078     SUBMESSAGE_ID_OFFSET = 1,
00079     FLAGS_OFFSET = 2 // message_id_ + submessage_id_
00080   };


Constructor & Destructor Documentation

ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader (  ) 

Definition at line 9 of file DataSampleHeader.inl.

00010   : message_id_(0)
00011   , submessage_id_(0)
00012   , byte_order_(ACE_CDR_BYTE_ORDER)
00013   , coherent_change_(0)
00014   , historic_sample_(0)
00015   , lifespan_duration_(0)
00016   , group_coherent_(0)
00017   , content_filter_(0)
00018   , sequence_repair_(0)
00019   , more_fragments_(0)
00020   , cdr_encapsulation_(0)
00021   , key_fields_only_(0)
00022   , reserved_1(0)
00023   , reserved_2(0)
00024   , reserved_3(0)
00025   , reserved_4(0)
00026   , reserved_5(0)
00027   , reserved_6(0)
00028   , message_length_(0)
00029   , sequence_()
00030   , source_timestamp_sec_(0)
00031   , source_timestamp_nanosec_(0)
00032   , lifespan_duration_sec_(0)
00033   , lifespan_duration_nanosec_(0)
00034   , publication_id_(GUID_UNKNOWN)
00035   , publisher_id_(GUID_UNKNOWN)
00036   , marshaled_size_(0)
00037 {
00038 }

ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader ( ACE_Message_Block & buffer  )  [explicit]

Construct with values extracted from a buffer.

Definition at line 41 of file DataSampleHeader.inl.

References init().

00042   : message_id_(0)
00043   , submessage_id_(0)
00044   , byte_order_(ACE_CDR_BYTE_ORDER)
00045   , coherent_change_(0)
00046   , historic_sample_(0)
00047   , lifespan_duration_(0)
00048   , group_coherent_(0)
00049   , content_filter_(0)
00050   , sequence_repair_(0)
00051   , more_fragments_(0)
00052   , cdr_encapsulation_(0)
00053   , key_fields_only_(0)
00054   , reserved_1(0)
00055   , reserved_2(0)
00056   , reserved_3(0)
00057   , reserved_4(0)
00058   , reserved_5(0)
00059   , reserved_6(0)
00060   , message_length_(0)
00061   , sequence_()
00062   , source_timestamp_sec_(0)
00063   , source_timestamp_nanosec_(0)
00064   , lifespan_duration_sec_(0)
00065   , lifespan_duration_nanosec_(0)
00066   , publication_id_(GUID_UNKNOWN)
00067   , publisher_id_(GUID_UNKNOWN)
00068 {
00069   this->init(&buffer);
00070 }


Member Function Documentation

void OpenDDS::DCPS::DataSampleHeader::add_cfentries ( const GUIDSeq * guids,
ACE_Message_Block * mb 
) [static]

Marshal "guids" as an optional header chained as the continuation of "mb" (which must already be a valid DataSampleHeader serialization). Any existing payload of "mb" (its continuation) will be chained after the new optional header part. "guids" may be null, in which case it is serialized the same as a sequence of length 0.

Definition at line 337 of file DataSampleHeader.cpp.

References alloc_msgblock(), OpenDDS::DCPS::BYTE_ORDER_FLAG, ACE_Message_Block::cont(), OpenDDS::DCPS::gen_find_size(), size, and test_flag().

Referenced by OpenDDS::DCPS::DataLinkSet::send(), and split().

00338 {
00339   size_t size = 0;
00340   if (guids) {
00341     size_t padding = 0; // GUIDs are always aligned
00342     gen_find_size(*guids, size, padding);
00343   } else {
00344     size = sizeof(CORBA::ULong);
00345   }
00346   ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
00347 
00348   const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb));
00349   Serializer ser(optHdr, swap);
00350   if (guids) {
00351     ser << *guids;
00352   } else {
00353     ser << CORBA::ULong(0);
00354   }
00355 
00356   // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
00357   optHdr->cont(mb->cont());
00358   mb->cont(optHdr);
00359 }
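
The chain manipulation at the end of add_cfentries() can be sketched without ACE. The following is a minimal illustration only: Block, splice_after() and labels() are hypothetical stand-ins invented for this example, not OpenDDS or ACE types, and no serialization or allocation is modeled.

```cpp
#include <cassert>
#include <string>
#include <vector>

namespace sketch {

// Hypothetical stand-in for ACE_Message_Block: a label plus a continuation pointer.
struct Block {
  std::string label;
  Block* cont;
};

// Splice `opt` between `mb` and whatever `mb` previously continued to,
// mirroring optHdr->cont(mb->cont()); mb->cont(optHdr);
inline void splice_after(Block& mb, Block& opt) {
  assert(opt.cont == nullptr);  // sketch precondition: opt is not chained yet
  opt.cont = mb.cont;
  mb.cont = &opt;
}

// Collect the labels along a chain, front to back.
inline std::vector<std::string> labels(const Block& head) {
  std::vector<std::string> out;
  for (const Block* b = &head; b != nullptr; b = b->cont) {
    out.push_back(b->label);
  }
  return out;
}

}  // namespace sketch
```

Starting from a header block that continues to a payload block, calling splice_after(header, optional) yields the order header, optional header, payload, which matches the "New chain" comment in the listing.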

ACE_Message_Block * OpenDDS::DCPS::DataSampleHeader::alloc_msgblock ( const ACE_Message_Block & mb,
size_t  size,
bool  use_data_alloc 
) [static]

Definition at line 80 of file DataSampleHeader.cpp.

References ACE_Message_Block::access_allocators(), OpenDDS::RTPS::DATA, ACE_Message_Block::locking_strategy(), malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, and ACE_Time_Value::zero.

Referenced by add_cfentries(), OpenDDS::DCPS::RtpsSampleHeader::split(), and split().

00082 {
00083   enum { DATA, DB, MB, N_ALLOC };
00084   ACE_Allocator* allocators[N_ALLOC];
00085   // It's an ACE bug that access_allocators isn't const
00086   ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
00087   mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
00088   if (allocators[MB]) {
00089     ACE_Message_Block* result;
00090     ACE_NEW_MALLOC_RETURN(result,
00091       static_cast<ACE_Message_Block*>(
00092         allocators[MB]->malloc(sizeof(ACE_Message_Block))),
00093       ACE_Message_Block(size,
00094                         ACE_Message_Block::MB_DATA,
00095                         0, // cont
00096                         0, // data
00097                         use_data_alloc ? allocators[DATA] : 0,
00098                         mut_mb.locking_strategy(), // locking_strategy
00099                         ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00100                         ACE_Time_Value::zero,
00101                         ACE_Time_Value::max_time,
00102                         allocators[DB],
00103                         allocators[MB]),
00104       0);
00105     return result;
00106   } else {
00107     return new ACE_Message_Block(size,
00108                                   ACE_Message_Block::MB_DATA,
00109                                   0, // cont
00110                                   0, // data
00111                                   use_data_alloc ? allocators[DATA] : 0,
00112                                   mut_mb.locking_strategy(), // locking_strategy
00113                                   ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00114                                   ACE_Time_Value::zero,
00115                                   ACE_Time_Value::max_time,
00116                                   allocators[DB]);
00117   }
00118 }

ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::clear_flag ( DataSampleHeaderFlag  flag,
ACE_Message_Block * buffer 
) [static]

The clear_flag and set_flag methods are a workaround for updating the header flags of an already-serialized sample without deserializing the entire message. They write directly to the flags octet at FLAGS_OFFSET, so they will break if the Serializer's layout of the header changes.

Definition at line 113 of file DataSampleHeader.inl.

References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().

00115 {
00116   char* base = buffer->base();
00117 
00118   // verify sufficient length exists:
00119   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00120     ACE_ERROR((LM_ERROR,
00121                ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::clear_flag: ")
00122                ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")));
00123     return;
00124   }
00125 
00126   base[FLAGS_OFFSET] &= ~mask_flag(flag);
00127 }
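
The in-place flag update used by clear_flag(), set_flag() and test_flag() can be sketched on a plain byte vector. This is an illustration under stated assumptions: SketchFlag and its bit positions are hypothetical (the real DataSampleHeaderFlag values are not shown on this page), and the functions return false on a short buffer instead of logging through ACE_ERROR.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace sketch {

// Hypothetical flag enum; the bit positions are assumptions for illustration.
enum SketchFlag { FLAG_A = 0, FLAG_B = 3 };

// Mirrors FLAGS_OFFSET: the flags octet follows message_id_ and submessage_id_.
constexpr std::size_t kFlagsOffset = 2;

inline std::uint8_t mask_flag(SketchFlag flag) {
  return static_cast<std::uint8_t>(1u << static_cast<unsigned>(flag));
}

// Each function first verifies that the flags octet exists.
inline bool set_flag(SketchFlag flag, std::vector<std::uint8_t>& buf) {
  if (buf.size() < kFlagsOffset + 1) return false;
  buf[kFlagsOffset] = static_cast<std::uint8_t>(buf[kFlagsOffset] | mask_flag(flag));
  return true;
}

inline bool clear_flag(SketchFlag flag, std::vector<std::uint8_t>& buf) {
  if (buf.size() < kFlagsOffset + 1) return false;
  buf[kFlagsOffset] = static_cast<std::uint8_t>(buf[kFlagsOffset] & ~mask_flag(flag));
  return true;
}

inline bool test_flag(SketchFlag flag, const std::vector<std::uint8_t>& buf) {
  if (buf.size() < kFlagsOffset + 1) return false;
  return (buf[kFlagsOffset] & mask_flag(flag)) != 0;
}

}  // namespace sketch
```

Only the single octet at offset 2 is touched, which is why the approach depends on the serialized layout staying fixed.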

void OpenDDS::DCPS::DataSampleHeader::init ( ACE_Message_Block * buffer  ) 

Implement load from buffer.

Definition at line 173 of file DataSampleHeader.cpp.

References byte_order_, OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CDR_ENCAP_FLAG, cdr_encapsulation_, coherent_change_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, content_filter_, content_filter_entries_, OpenDDS::DCPS::CONTENT_FILTER_FLAG, OpenDDS::DCPS::gen_find_size(), group_coherent_, OpenDDS::DCPS::GROUP_COHERENT_FLAG, historic_sample_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, key_fields_only_, OpenDDS::DCPS::KEY_ONLY_FLAG, lifespan_duration_, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, lifespan_duration_nanosec_, lifespan_duration_sec_, marshaled_size_, mask_flag(), message_id_, message_length_, more_fragments_, OpenDDS::DCPS::MORE_FRAGMENTS_FLAG, publication_id_, publisher_id_, sequence_, sequence_repair_, OpenDDS::DCPS::SEQUENCE_REPAIR_FLAG, source_timestamp_nanosec_, source_timestamp_sec_, submessage_id_, and OpenDDS::DCPS::Serializer::swap_bytes().

Referenced by DataSampleHeader(), and operator=().

00174 {
00175   this->marshaled_size_ = 0;
00176 
00177   Serializer reader(buffer);
00178 
00179   // Only byte-sized reads until we get the byte_order_ flag.
00180 
00181   if (!(reader >> this->message_id_)) {
00182     return;
00183   }
00184 
00185   this->marshaled_size_ += sizeof(this->message_id_);
00186 
00187   if (!(reader >> this->submessage_id_)) {
00188     return;
00189   }
00190 
00191   this->marshaled_size_ += sizeof(this->submessage_id_);
00192 
00193   // Extract the flag values.
00194   ACE_CDR::Octet byte;
00195   if (!(reader >> ACE_InputCDR::to_octet(byte))) {
00196     return;
00197   }
00198 
00199   this->marshaled_size_ += sizeof(byte);
00200 
00201   this->byte_order_         = byte & mask_flag(BYTE_ORDER_FLAG);
00202   this->coherent_change_    = byte & mask_flag(COHERENT_CHANGE_FLAG);
00203   this->historic_sample_    = byte & mask_flag(HISTORIC_SAMPLE_FLAG);
00204   this->lifespan_duration_  = byte & mask_flag(LIFESPAN_DURATION_FLAG);
00205   this->group_coherent_     = byte & mask_flag(GROUP_COHERENT_FLAG);
00206   this->content_filter_     = byte & mask_flag(CONTENT_FILTER_FLAG);
00207   this->sequence_repair_    = byte & mask_flag(SEQUENCE_REPAIR_FLAG);
00208   this->more_fragments_     = byte & mask_flag(MORE_FRAGMENTS_FLAG);
00209 
00210   // Set swap_bytes flag to the Serializer if data sample from
00211   // the publisher is in different byte order.
00212   reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
00213 
00214   if (!(reader >> ACE_InputCDR::to_octet(byte))) {
00215     return;
00216   }
00217 
00218   this->marshaled_size_ += sizeof(byte);
00219 
00220   this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG);
00221   this->key_fields_only_   = byte & mask_flag(KEY_ONLY_FLAG);
00222 
00223   if (!(reader >> this->message_length_)) {
00224     return;
00225   }
00226 
00227   this->marshaled_size_ += sizeof(this->message_length_);
00228 
00229   if (!(reader >> this->sequence_)) {
00230     return;
00231   }
00232 
00233   size_t padding = 0;
00234   gen_find_size(this->sequence_, this->marshaled_size_, padding);
00235 
00236   if (!(reader >> this->source_timestamp_sec_)) {
00237     return;
00238   }
00239 
00240   this->marshaled_size_ += sizeof(this->source_timestamp_sec_);
00241 
00242   if (!(reader >> this->source_timestamp_nanosec_)) {
00243     return;
00244   }
00245 
00246   this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_);
00247 
00248   if (this->lifespan_duration_) {
00249     if (!(reader >> this->lifespan_duration_sec_)) {
00250       return;
00251     }
00252 
00253     this->marshaled_size_ += sizeof(this->lifespan_duration_sec_);
00254 
00255     if (!(reader >> this->lifespan_duration_nanosec_)) {
00256       return;
00257     }
00258 
00259     this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_);
00260   }
00261 
00262   if (!(reader >> this->publication_id_)) {
00263     return;
00264   }
00265 
00266   gen_find_size(this->publication_id_, this->marshaled_size_, padding);
00267 
00268 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00269   if (this->group_coherent_) {
00270     if (!(reader >> this->publisher_id_)) {
00271       return;
00272     }
00273     gen_find_size(this->publisher_id_, this->marshaled_size_, padding);
00274   }
00275 #endif
00276 
00277   if (this->content_filter_) {
00278     if (!(reader >> this->content_filter_entries_)) {
00279       return;
00280     }
00281     gen_find_size(this->content_filter_entries_, this->marshaled_size_, padding);
00282   }
00283 }
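
The pattern in init() (read one field at a time, stop at the first failed read, and count only the bytes consumed so far) can be sketched for the first few fields. This is a simplified illustration, not the OpenDDS Serializer: MiniHeader, Reader and parse() are hypothetical names, only the fields up to message_length_ are modeled, and the byte order is decoded explicitly on the assumption that bit 0 of the flags octet set means little-endian, rather than by comparing against the host order as the listing does.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace sketch {

struct MiniHeader {
  std::uint8_t message_id = 0;
  std::uint8_t submessage_id = 0;
  std::uint8_t flags = 0;
  std::uint8_t flags2 = 0;
  std::uint32_t message_length = 0;
  std::size_t marshaled_size = 0;  // bytes successfully consumed
};

class Reader {
 public:
  explicit Reader(const std::vector<std::uint8_t>& buf)
    : buf_(buf), pos_(0), little_endian_(true) {}

  void little_endian(bool le) { little_endian_ = le; }

  bool read_u8(std::uint8_t& out) {
    if (pos_ >= buf_.size()) return false;
    out = buf_[pos_++];
    return true;
  }

  bool read_u32(std::uint32_t& out) {
    if (buf_.size() - pos_ < 4) return false;
    std::uint32_t v = 0;
    for (std::size_t i = 0; i < 4; ++i) {
      const std::size_t shift = 8 * (little_endian_ ? i : 3 - i);
      v |= static_cast<std::uint32_t>(buf_[pos_ + i]) << shift;
    }
    pos_ += 4;
    out = v;
    return true;
  }

 private:
  const std::vector<std::uint8_t>& buf_;
  std::size_t pos_;
  bool little_endian_;
};

// Byte-sized reads only until the byte-order flag is known, as in init().
inline MiniHeader parse(const std::vector<std::uint8_t>& buf) {
  MiniHeader h;
  Reader r(buf);
  if (!r.read_u8(h.message_id)) return h;
  h.marshaled_size += 1;
  if (!r.read_u8(h.submessage_id)) return h;
  h.marshaled_size += 1;
  if (!r.read_u8(h.flags)) return h;
  h.marshaled_size += 1;
  r.little_endian((h.flags & 0x01) != 0);  // assumed meaning of bit 0
  if (!r.read_u8(h.flags2)) return h;
  h.marshaled_size += 1;
  if (!r.read_u32(h.message_length)) return h;
  h.marshaled_size += 4;
  return h;
}

}  // namespace sketch
```

A truncated buffer leaves marshaled_size at the number of bytes read before the failure, which is how a caller can tell how much of the header was actually available.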

bool OpenDDS::DCPS::DataSampleHeader::into_received_data_sample ( ReceivedDataSample & rds  ) 

Definition at line 743 of file DataSampleHeader.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_.

00744 {
00745   rds.header_ = *this;
00746   return true;
00747 }

bool OpenDDS::DCPS::DataSampleHeader::join ( const DataSampleHeader & first,
const DataSampleHeader & second,
DataSampleHeader & result 
) [static]

If "first" and "second" are two fragments of the same original message (as created by split()), return true and set up the "result" header to match the original header. Joining the data payload is the responsibility of the caller (manipulate the continuation chain).

Definition at line 459 of file DataSampleHeader.cpp.

References content_filter_, content_filter_entries_, message_length_, more_fragments_, and sequence_.

Referenced by OpenDDS::DCPS::TransportReassembly::insert().

00461 {
00462   if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
00463     return false;
00464   }
00465   result = second;
00466   result.message_length_ += first.message_length_;
00467   if (first.content_filter_) {
00468     result.content_filter_ = true;
00469     const CORBA::ULong entries = first.content_filter_entries_.length();
00470     CORBA::ULong x = result.content_filter_entries_.length();
00471     result.content_filter_entries_.length(x + entries);
00472     for (CORBA::ULong i(entries); i > 0;) {
00473       result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
00474     }
00475   }
00476   return true;
00477 }
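
The header-joining rule can be sketched with a reduced header type. This is an illustration only: Header here is a hypothetical struct carrying just the fields join() references, and a vector of int stands in for GUIDSeq.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

namespace sketch {

// Hypothetical reduced header with only the fields join() inspects.
struct Header {
  bool more_fragments = false;
  bool content_filter = false;
  std::uint32_t message_length = 0;
  std::int64_t sequence = 0;
  std::vector<int> filter_entries;  // stand-in for GUIDSeq
};

inline bool join(const Header& first, const Header& second, Header& result) {
  // Only a non-final fragment followed by a fragment of the same sample joins.
  if (!first.more_fragments || first.sequence != second.sequence) {
    return false;
  }
  result = second;  // the later fragment decides more_fragments
  result.message_length += first.message_length;
  if (first.content_filter) {
    result.content_filter = true;
    // Append first's entries in reverse order, as the listing does.
    result.filter_entries.insert(result.filter_entries.end(),
                                 first.filter_entries.rbegin(),
                                 first.filter_entries.rend());
  }
  return true;
}

}  // namespace sketch
```

As in the original, only the headers are merged; re-linking the payload blocks is left to the caller.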

ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size (  )  const

Amount of data read when initializing from a buffer.

Definition at line 82 of file DataSampleHeader.inl.

References marshaled_size_.

00083 {
00084   return marshaled_size_;
00085 }

static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag ( DataSampleHeaderFlag2  flag  )  [inline, static]

Definition at line 184 of file DataSampleHeader.h.

00184 { return 1 << flag; }

static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag ( DataSampleHeaderFlag  flag  )  [inline, static]

Definition at line 183 of file DataSampleHeader.h.

Referenced by clear_flag(), init(), partial(), set_flag(), and test_flag().

00183 { return 1 << flag; }

ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::max_marshaled_size (  )  [static]

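Upper bound on the serialized size of the fixed header fields. Similar to the methods generated by the IDL compiler (for example, gen_max_marshaled_size()).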

Definition at line 89 of file DataSampleHeader.inl.

Referenced by OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::UdpDataLink::open(), partial(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and split().

00090 {
00091   return 1 + // message_id_;
00092          1 + // submessage_id_;
00093          2 + // flags
00094          4 + // message_length_;
00095          8 + // sequence_;
00096          4 + // source_timestamp_sec_;
00097          4 + // source_timestamp_nanosec_;
00098          4 + // lifespan_duration_sec_;
00099          4 + // lifespan_duration_nanosec_;
00100         16 + // publication_id_;
00101         16 ; // publisher_id_;
00102   // content_filter_entries_ is not marshaled into the same Data Block
00103   // so it is not part of the max_marshaled_size() which is used to allocate
00104 }
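
As a worked check of the arithmetic above, the field sizes can be summed at compile time. The function name below simply mirrors the original; the namespace is hypothetical and nothing here depends on OpenDDS.

```cpp
#include <cassert>
#include <cstddef>

namespace sketch {

// Same per-field byte counts as the listing, in the same order.
constexpr std::size_t max_marshaled_size() {
  return 1 +   // message_id_
         1 +   // submessage_id_
         2 +   // flags
         4 +   // message_length_
         8 +   // sequence_
         4 +   // source_timestamp_sec_
         4 +   // source_timestamp_nanosec_
         4 +   // lifespan_duration_sec_
         4 +   // lifespan_duration_nanosec_
         16 +  // publication_id_
         16;   // publisher_id_
}

static_assert(max_marshaled_size() == 64,
              "fixed-size header fields sum to 64 bytes");

}  // namespace sketch
```

The total is 64 bytes. The content filter entries are excluded because, per the comment in the listing, they are marshaled into a separate block.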

ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::message_length ( void   )  const [inline]

Definition at line 235 of file DataSampleHeader.h.

00235 { return this->message_length_; }

bool OpenDDS::DCPS::DataSampleHeader::more_fragments ( void   )  const [inline]

Definition at line 237 of file DataSampleHeader.h.

00237 { return this->more_fragments_; }

ACE_INLINE OpenDDS::DCPS::DataSampleHeader & OpenDDS::DCPS::DataSampleHeader::operator= ( ACE_Message_Block & buffer  ) 

Assignment from an ACE_Message_Block.

Definition at line 74 of file DataSampleHeader.inl.

References init().

00075 {
00076   this->init(&buffer);
00077   return *this;
00078 }

bool OpenDDS::DCPS::DataSampleHeader::partial ( const ACE_Message_Block & mb  )  [static]

Does the data in this mb constitute a partial Sample Header?

Definition at line 120 of file DataSampleHeader.cpp.

References OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CONTENT_FILTER_FLAG, FLAGS_OFFSET, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::GROUP_COHERENT_FLAG, len, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, mask_flag(), max_marshaled_size(), OpenDDS::DCPS::MESSAGE_ID_MAX, MESSAGE_ID_OFFSET, OpenDDS::DCPS::SUBMESSAGE_ID_MAX, SUBMESSAGE_ID_OFFSET, and ACE_Message_Block::total_length().

00121 {
00122   static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
00123     LIFESPAN_LENGTH = 8,
00124     COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
00125     COHERENT_LENGTH = 16,
00126     CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
00127     BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
00128 
00129   const size_t len = mb.total_length();
00130 
00131   if (len <= FLAGS_OFFSET) return true;
00132 
00133   unsigned char msg_id;
00134   if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET,
00135                false /*swap ignored for char*/)
00136       || int(msg_id) >= MESSAGE_ID_MAX) {
00137     // This check, and the similar one below for submessage id, are actually
00138     // indicating an invalid header (and not a partial header) but we can
00139     // treat it the same as partial for the sake of the TransportRecvStrategy.
00140     return true;
00141   }
00142 
00143   if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET,
00144                false /*swap ignored for char*/)
00145       || int(msg_id) >= SUBMESSAGE_ID_MAX) {
00146     return true;
00147   }
00148 
00149   char flags;
00150   if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) {
00151     return true;
00152   }
00153 
00154   size_t expected = max_marshaled_size();
00155   if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
00156   if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
00157 
00158   if (flags & CONTENT_FILT_MASK) {
00159     CORBA::ULong seqLen;
00160     const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER;
00161     if (!mb_peek(seqLen, mb, expected, swap)) {
00162       return true;
00163     }
00164     size_t guidsize = 0, padding = 0;
00165     gen_find_size(GUID_t(), guidsize, padding);
00166     expected += sizeof(seqLen) + guidsize * seqLen;
00167   }
00168 
00169   return len < expected;
00170 }
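
The expected-length calculation in partial() can be sketched on a flat byte vector. Several things here are assumptions made for the example: the flag bit positions (taken to follow the order in which init() reads them), the 16-byte GUID size, and the reading of bit 0 as "little-endian". The message id and submessage id range checks from the listing are omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

namespace sketch {

constexpr std::size_t kFlagsOffset = 2;
constexpr std::size_t kMaxFixedSize = 64;    // value of max_marshaled_size()
constexpr std::size_t kLifespanLength = 8;   // LIFESPAN_LENGTH
constexpr std::size_t kCoherentLength = 16;  // COHERENT_LENGTH
constexpr std::size_t kGuidLength = 16;      // assumed serialized GUID size

// Assumed bit positions; the real enum values are not shown on this page.
constexpr std::uint8_t kByteOrderMask = 1u << 0;
constexpr std::uint8_t kLifespanMask = 1u << 3;
constexpr std::uint8_t kGroupCoherentMask = 1u << 4;
constexpr std::uint8_t kContentFilterMask = 1u << 5;

inline bool is_partial(const std::vector<std::uint8_t>& buf) {
  if (buf.size() <= kFlagsOffset) return true;
  const std::uint8_t flags = buf[kFlagsOffset];

  // Start from the maximum and subtract optional fields that are absent.
  std::size_t expected = kMaxFixedSize;
  if (!(flags & kLifespanMask)) expected -= kLifespanLength;
  if (!(flags & kGroupCoherentMask)) expected -= kCoherentLength;

  if (flags & kContentFilterMask) {
    // The sequence length follows the fixed part; it must be readable.
    if (buf.size() < expected + 4) return true;
    const bool little = (flags & kByteOrderMask) != 0;
    std::uint32_t count = 0;
    for (std::size_t i = 0; i < 4; ++i) {
      const std::size_t shift = 8 * (little ? i : 3 - i);
      count |= static_cast<std::uint32_t>(buf[expected + i]) << shift;
    }
    expected += 4 + kGuidLength * count;
  }
  return buf.size() < expected;
}

}  // namespace sketch
```

With no optional fields the fixed part is 64 - 8 - 16 = 40 bytes; one filter entry adds 4 + 16 bytes for a total of 60.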

void OpenDDS::DCPS::DataSampleHeader::pdu_remaining ( size_t   )  [inline]

Definition at line 239 of file DataSampleHeader.h.

00239 { /* ignored, only RTPS uses this */ }

ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::set_flag ( DataSampleHeaderFlag  flag,
ACE_Message_Block * buffer 
) [static]

Definition at line 131 of file DataSampleHeader.inl.

References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().

Referenced by OpenDDS::DCPS::WriteDataContainer::data_delivered().

00133 {
00134   char* base = buffer->base();
00135 
00136   // verify sufficient length exists:
00137   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00138     ACE_ERROR((LM_ERROR,
00139                ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ")
00140                ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")));
00141     return;
00142   }
00143 
00144   base[FLAGS_OFFSET] |= mask_flag(flag);
00145 }

void OpenDDS::DCPS::DataSampleHeader::split ( const ACE_Message_Block & orig,
size_t  size,
Message_Block_Ptr & head,
Message_Block_Ptr & tail 
) [static]

Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest.

Definition at line 388 of file DataSampleHeader.cpp.

References add_cfentries(), alloc_msgblock(), ACE_Message_Block::cont(), dup(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::gen_max_marshaled_size(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), ACE_Message_Block::length(), max_marshaled_size(), message_length_, more_fragments_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), split_payload(), and ACE_Message_Block::total_length().

Referenced by OpenDDS::DCPS::TransportQueueElement::fragment().

00390 {
00391   Message_Block_Ptr dup (orig.duplicate());
00392 
00393   const size_t length = dup->total_length();
00394   DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
00395   const size_t hdr_len = length - dup->total_length();
00396 
00397   ACE_Message_Block* payload = dup.get();
00398   //skip zero length message blocks
00399   ACE_Message_Block* prev = 0;
00400   for (; payload->length() == 0; payload = payload->cont()) {
00401     prev = payload;
00402   }
00403   prev->cont(0);
00404   Message_Block_Ptr payload_head(payload);
00405 
00406   if (size < hdr_len) { // need to fragment the content_filter_entries_
00407     head.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00408     hdr.more_fragments_ = true;
00409     hdr.message_length_ = 0; // no room for payload data
00410     *head << hdr;
00411     const size_t avail = size - head->length() - 4 /* sequence length */;
00412     const CORBA::ULong n_entries =
00413       static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t()));
00414     GUIDSeq entries(n_entries);
00415     entries.length(n_entries);
00416     // remove from the end of hdr's entries (order doesn't matter)
00417     for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
00418          i < n_entries; ++i) {
00419       entries[i] = hdr.content_filter_entries_[--x];
00420       hdr.content_filter_entries_.length(x);
00421     }
00422     add_cfentries(&entries, head.get());
00423 
00424     tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00425     hdr.more_fragments_ = false;
00426     hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
00427     hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00428     *tail << hdr;
00429     tail->cont(payload);
00430     if (hdr.content_filter_) {
00431       add_cfentries(&hdr.content_filter_entries_, tail.get());
00432     }
00433     return;
00434   }
00435 
00436   Message_Block_Ptr payload_tail;
00437   split_payload(*payload, size - hdr_len, payload_head, payload_tail);
00438 
00439   hdr.more_fragments_ = true;
00440   hdr.message_length_ = static_cast<ACE_UINT32>(payload_head->total_length());
00441 
00442   head.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00443   *head << hdr;
00444   head->cont(payload_head.release());
00445   if (hdr.content_filter_) {
00446     add_cfentries(&hdr.content_filter_entries_, head.get());
00447   }
00448 
00449   hdr.more_fragments_ = false;
00450   hdr.content_filter_ = false;
00451   hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
00452 
00453   tail.reset(alloc_msgblock(*dup, max_marshaled_size(), true));
00454   *tail << hdr;
00455   tail->cont(payload_tail.release());
00456 }
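
The common path of split(), where the header fits within "size", can be sketched as follows. This is a simplified illustration: Fragment and split_message() are hypothetical names, the payload is a flat byte vector instead of a message block chain, and the branch that fragments content_filter_entries_ (size < hdr_len) is not modeled. The std::min guard is an addition for the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

namespace sketch {

// Hypothetical fragment: the two header fields split() rewrites, plus payload.
struct Fragment {
  bool more_fragments;
  std::uint32_t message_length;
  std::vector<std::uint8_t> payload;
};

// The head may occupy at most `size` bytes including its `hdr_len`-byte header.
inline std::pair<Fragment, Fragment> split_message(
    const std::vector<std::uint8_t>& payload,
    std::size_t size,
    std::size_t hdr_len) {
  assert(size >= hdr_len);  // sketch precondition: header fits in the head
  const std::size_t head_bytes = std::min(size - hdr_len, payload.size());
  const auto cut = payload.begin() + static_cast<std::ptrdiff_t>(head_bytes);

  Fragment head = {true, static_cast<std::uint32_t>(head_bytes),
                   std::vector<std::uint8_t>(payload.begin(), cut)};
  Fragment tail = {false,
                   static_cast<std::uint32_t>(payload.size() - head_bytes),
                   std::vector<std::uint8_t>(cut, payload.end())};
  return std::make_pair(head, tail);
}

}  // namespace sketch
```

The head is marked with more_fragments set and the tail with it cleared, and the two message_length values add up to the original payload length, which is what join() relies on when reassembling.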

void OpenDDS::DCPS::DataSampleHeader::split_payload ( const ACE_Message_Block & orig,
size_t  size,
Message_Block_Ptr & head,
Message_Block_Ptr & tail 
) [static]

Definition at line 362 of file DataSampleHeader.cpp.

References ACE_Message_Block::cont(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), ACE_Message_Block::length(), ACE_Message_Block::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), and ACE_Message_Block::wr_ptr().

Referenced by OpenDDS::DCPS::RtpsSampleHeader::split(), and split().

00365 {
00366   if (!head) {
00367     head.reset(orig.duplicate());
00368   }
00369 
00370   ACE_Message_Block* frag = head.get();
00371   size_t frag_remain = size;
00372   for (; frag_remain > frag->length(); frag = frag->cont()) {
00373     frag_remain -= frag->length();
00374   }
00375 
00376   if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
00377     tail.reset(frag->cont());
00378   } else {
00379     tail.reset(frag->duplicate());
00380     frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
00381     ACE_Message_Block::release(frag->cont());
00382     tail->rd_ptr(frag_remain);
00383   }
00384   frag->cont(0);
00385 }
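
Splitting a chain of blocks at a byte offset, as split_payload() does, can be sketched with nested vectors. This is an illustration under stated assumptions: Chain and split_chain() are hypothetical names, and the sketch copies bytes, whereas the original duplicates message blocks and adjusts their read and write pointers so the underlying data is shared.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <utility>
#include <vector>

namespace sketch {

using Block = std::vector<std::uint8_t>;
using Chain = std::vector<Block>;  // stand-in for a cont()-linked chain

// Put the first `size` bytes of `orig` in the head chain and the rest in the tail.
inline std::pair<Chain, Chain> split_chain(const Chain& orig, std::size_t size) {
  Chain head, tail;
  std::size_t remain = size;
  bool in_tail = false;
  for (const Block& block : orig) {
    if (in_tail) {
      tail.push_back(block);
    } else if (remain >= block.size()) {
      // Whole block belongs to the head; a zero remainder means the split
      // falls exactly on a block boundary.
      head.push_back(block);
      remain -= block.size();
      in_tail = (remain == 0);
    } else {
      // The split point falls inside this block: cut it in two.
      const auto cut = block.begin() + static_cast<std::ptrdiff_t>(remain);
      head.emplace_back(block.begin(), cut);
      tail.emplace_back(cut, block.end());
      in_tail = true;
    }
  }
  return std::make_pair(std::move(head), std::move(tail));
}

}  // namespace sketch
```

For a chain with block lengths 3, 2 and 1, a split at 4 bytes cuts the second block in two, while a split at 3 bytes lands on a block boundary and leaves every block intact.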

ACE_INLINE bool OpenDDS::DCPS::DataSampleHeader::test_flag ( DataSampleHeaderFlag  flag,
const ACE_Message_Block * buffer 
) [static]

Definition at line 149 of file DataSampleHeader.inl.

References ACE_TEXT(), ACE_Message_Block::base(), ACE_Message_Block::end(), FLAGS_OFFSET, LM_ERROR, and mask_flag().

Referenced by add_cfentries(), OpenDDS::DCPS::SingleSendBuffer::insert(), OpenDDS::DCPS::DataDurabilityCache::insert(), OpenDDS::DCPS::WriteDataContainer::remove_excess_durable(), and OpenDDS::DCPS::DataLinkSet::send().

00151 {
00152   char* base = buffer->base();
00153 
00154   // verify sufficient length exists:
00155   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00156     ACE_ERROR_RETURN((LM_ERROR,
00157                       ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::test_flag: ")
00158                       ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")), false);
00159   }
00160 
00161   // Test flag bit.
00162   return base[FLAGS_OFFSET] & mask_flag(flag);
00163 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

bool OpenDDS::DCPS::DataSampleHeader::content_filter_

The publishing side has applied content filtering, and the optional content_filter_entries_ field is present in the marshaled header.

Definition at line 108 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), init(), and join().

GUIDSeq OpenDDS::DCPS::DataSampleHeader::content_filter_entries_

Optional field present if the content_filter_ flag bit is set. Indicates which readers should not receive the data.

Definition at line 181 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataLink::data_received_i(), init(), and join().

bool OpenDDS::DCPS::DataSampleHeader::lifespan_duration_

This flag indicates the sample header contains non-default LIFESPAN duration fields.

Definition at line 102 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), and init().

ACE_INT32 OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_

The LIFESPAN duration field is generated from the DataWriter or supplied by the application at the time of the write. This field is used to determine if a given sample is considered 'stale' and should be discarded by the associated DataReader. These fields are optional and are controlled by the lifespan_duration_ flag.

Definition at line 167 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), and init().

size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size_  [private]

Keep track of the amount of data read from a buffer.

Definition at line 249 of file DataSampleHeader.h.

Referenced by init(), and marshaled_size().

char OpenDDS::DCPS::DataSampleHeader::message_id_

The enum MessageId.

Definition at line 83 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::MulticastDataLink::check_header(), OpenDDS::DCPS::DataLink::create_control(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::DCPS::WriteDataContainer::release_buffer(), OpenDDS::DCPS::TcpDataLink::request_ack_received(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::DCPS::TransportClient::send_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(), and OpenDDS::DCPS::DataReaderImpl::writer_activity().

RepoId OpenDDS::DCPS::DataSampleHeader::publisher_id_

Id representing the coherent group. Optional field that's only present if the flag for group_coherent_ is set.

Definition at line 177 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::accept_sample_processing(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), and OpenDDS::DCPS::TcpDataLink::request_ack_received().

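bool OpenDDS::DCPS::DataSampleHeader::reserved_1

Definition at line 130 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_2

Definition at line 131 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_3

Definition at line 132 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_4

Definition at line 133 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_5

Definition at line 134 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_6

Definition at line 135 of file DataSampleHeader.h.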

bool OpenDDS::DCPS::DataSampleHeader::sequence_repair_

Due to content filtering, a gap in the sequence numbers may be an expected condition. If this bit is set, assume prior sequence numbers were filtered out and are not missing.

Definition at line 113 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), and init().

ACE_INT32 OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_

The SOURCE_TIMESTAMP field is generated from the DataWriter or supplied by the application at the time of the write. This value is derived from the local host's system clock, which is assumed to be synchronized with the clocks on other hosts within the domain. This field is required for DESTINATION_ORDER and LIFESPAN policy behaviors of subscriptions. It is also required to be present for all data in the SampleInfo structure supplied along with each data sample.

Definition at line 156 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::DCPS::DataReaderImpl::filter_sample(), init(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReceivedDataElement::ReceivedDataElement(), OpenDDS::DCPS::resend_data_expired(), and OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type().

char OpenDDS::DCPS::DataSampleHeader::submessage_id_

Implementation-specific sub-message Ids.

Definition at line 86 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataLink::create_control(), init(), and OpenDDS::DCPS::MulticastDataLink::sample_received().


The documentation for this struct was generated from the following files:
DataSampleHeader.h
DataSampleHeader.inl
DataSampleHeader.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1