OpenDDS::DCPS::DataSampleHeader Struct Reference

#include <DataSampleHeader.h>

Public Types

enum  { MESSAGE_ID_OFFSET = 0, SUBMESSAGE_ID_OFFSET = 1, FLAGS_OFFSET = 2 }

Public Member Functions

 DataSampleHeader ()
 DataSampleHeader (ACE_Message_Block &buffer)
 Construct with values extracted from a buffer.
DataSampleHeader & operator= (ACE_Message_Block &buffer)
 Assignment from an ACE_Message_Block.
size_t marshaled_size () const
 Amount of data read when initializing from a buffer.
void init (ACE_Message_Block *buffer)
 Implement load from buffer.
bool into_received_data_sample (ReceivedDataSample &rds)
ACE_UINT32 message_length () const
bool more_fragments () const
void pdu_remaining (size_t)

Static Public Member Functions

static ACE_UINT8 mask_flag (DataSampleHeaderFlag flag)
static ACE_UINT8 mask_flag (DataSampleHeaderFlag2 flag)
static void clear_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
static void set_flag (DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
static bool test_flag (DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
static bool partial (const ACE_Message_Block &mb)
 Does the data in this mb constitute a partial Sample Header?
static void add_cfentries (const GUIDSeq *guids, ACE_Message_Block *mb)
static void split (const ACE_Message_Block &orig, size_t size, ACE_Message_Block *&head, ACE_Message_Block *&tail)
static bool join (const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result)
static size_t max_marshaled_size ()
 Similar to IDL compiler generated methods.
static ACE_Message_Block * alloc_msgblock (const ACE_Message_Block &mb, size_t size, bool use_data_alloc)
static void split_payload (const ACE_Message_Block &orig, size_t size, ACE_Message_Block *&head, ACE_Message_Block *&tail)

Public Attributes

char message_id_
 The enum MessageId.
char submessage_id_
 Implementation-specific sub-message Ids.
bool byte_order_: 1
bool coherent_change_: 1
bool historic_sample_: 1
bool lifespan_duration_: 1
bool group_coherent_: 1
bool content_filter_: 1
bool sequence_repair_: 1
bool more_fragments_: 1
 The current "Data Sample" needs reassembly before further processing.
bool cdr_encapsulation_: 1
bool key_fields_only_: 1
 Only the key fields of the data sample are present in the payload.
bool reserved_1: 1
bool reserved_2: 1
bool reserved_3: 1
bool reserved_4: 1
bool reserved_5: 1
bool reserved_6: 1
ACE_UINT32 message_length_
SequenceNumber sequence_
ACE_INT32 source_timestamp_sec_
ACE_UINT32 source_timestamp_nanosec_
ACE_INT32 lifespan_duration_sec_
ACE_UINT32 lifespan_duration_nanosec_
PublicationId publication_id_
RepoId publisher_id_
GUIDSeq content_filter_entries_

Private Attributes

size_t marshaled_size_
 Keep track of the amount of data read from a buffer.

Detailed Description

The header message of a data sample. The header and the data sample are held in different message blocks, which are chained together.

Definition at line 70 of file DataSampleHeader.h.


Member Enumeration Documentation

anonymous enum

Enumerator:
MESSAGE_ID_OFFSET 
SUBMESSAGE_ID_OFFSET 
FLAGS_OFFSET 

Definition at line 71 of file DataSampleHeader.h.

00071        {
00072     MESSAGE_ID_OFFSET = 0,
00073     SUBMESSAGE_ID_OFFSET = 1,
00074     FLAGS_OFFSET = 2 // message_id_ + submessage_id_
00075   };


Constructor & Destructor Documentation

ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader (  ) 

Definition at line 9 of file DataSampleHeader.inl.

00010   : message_id_(0)
00011   , submessage_id_(0)
00012   , byte_order_(ACE_CDR_BYTE_ORDER)
00013   , coherent_change_(0)
00014   , historic_sample_(0)
00015   , lifespan_duration_(0)
00016   , group_coherent_(0)
00017   , content_filter_(0)
00018   , sequence_repair_(0)
00019   , more_fragments_(0)
00020   , cdr_encapsulation_(0)
00021   , key_fields_only_(0)
00022   , reserved_1(0)
00023   , reserved_2(0)
00024   , reserved_3(0)
00025   , reserved_4(0)
00026   , reserved_5(0)
00027   , reserved_6(0)
00028   , message_length_(0)
00029   , sequence_()
00030   , source_timestamp_sec_(0)
00031   , source_timestamp_nanosec_(0)
00032   , lifespan_duration_sec_(0)
00033   , lifespan_duration_nanosec_(0)
00034   , publication_id_(GUID_UNKNOWN)
00035   , publisher_id_(GUID_UNKNOWN)
00036   , marshaled_size_(0)
00037 {
00038 }

ACE_INLINE OpenDDS::DCPS::DataSampleHeader::DataSampleHeader ( ACE_Message_Block &  buffer  )  [explicit]

Construct with values extracted from a buffer.

Definition at line 41 of file DataSampleHeader.inl.

References init().

00042   : message_id_(0)
00043   , submessage_id_(0)
00044   , byte_order_(ACE_CDR_BYTE_ORDER)
00045   , coherent_change_(0)
00046   , historic_sample_(0)
00047   , lifespan_duration_(0)
00048   , group_coherent_(0)
00049   , content_filter_(0)
00050   , sequence_repair_(0)
00051   , more_fragments_(0)
00052   , cdr_encapsulation_(0)
00053   , key_fields_only_(0)
00054   , reserved_1(0)
00055   , reserved_2(0)
00056   , reserved_3(0)
00057   , reserved_4(0)
00058   , reserved_5(0)
00059   , reserved_6(0)
00060   , message_length_(0)
00061   , sequence_()
00062   , source_timestamp_sec_(0)
00063   , source_timestamp_nanosec_(0)
00064   , lifespan_duration_sec_(0)
00065   , lifespan_duration_nanosec_(0)
00066   , publication_id_(GUID_UNKNOWN)
00067   , publisher_id_(GUID_UNKNOWN)
00068 {
00069   this->init(&buffer);
00070 }


Member Function Documentation

void OpenDDS::DCPS::DataSampleHeader::add_cfentries ( const GUIDSeq *  guids,
ACE_Message_Block *  mb 
) [static]

Marshal the "guids" as an optional header chained as the continuation of "mb" (which must already be a valid DataSampleHeader serialization). Any existing payload of "mb" (its continuation) will be chained after the new optional header part. "guids" may be null, in which case it is serialized the same as a sequence of length 0.

Definition at line 318 of file DataSampleHeader.cpp.

References alloc_msgblock(), OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::swap(), and test_flag().

Referenced by OpenDDS::DCPS::DataLinkSet::send(), and split().

00319 {
00320   size_t size = 0;
00321   if (guids) {
00322     size_t padding = 0; // GUIDs are always aligned
00323     gen_find_size(*guids, size, padding);
00324   } else {
00325     size = sizeof(CORBA::ULong);
00326   }
00327   ACE_Message_Block* optHdr = alloc_msgblock(*mb, size, false);
00328 
00329   const bool swap = (ACE_CDR_BYTE_ORDER != test_flag(BYTE_ORDER_FLAG, mb));
00330   Serializer ser(optHdr, swap);
00331   if (guids) {
00332     ser << *guids;
00333   } else {
00334     ser << CORBA::ULong(0);
00335   }
00336 
00337   // New chain: mb (DataSampleHeader), optHdr (GUIDSeq), data (Foo or control)
00338   optHdr->cont(mb->cont());
00339   mb->cont(optHdr);
00340 }
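
The chain manipulation at the end of add_cfentries() can be illustrated without ACE. The following is a minimal sketch, not the OpenDDS implementation: Block, splice_after() and flatten() are hypothetical stand-ins introduced here, and ownership of the blocks is not modelled.

```cpp
#include <cassert>
#include <string>

// Hypothetical, minimal stand-in for ACE_Message_Block: some data plus a
// continuation pointer. Ownership and reference counting are not modelled.
struct Block {
  std::string data;
  Block* cont;
};

// Splice "opt_hdr" between "hdr" and whatever "hdr" was chained to,
// mirroring optHdr->cont(mb->cont()); mb->cont(optHdr);
inline void splice_after(Block& hdr, Block& opt_hdr)
{
  opt_hdr.cont = hdr.cont;
  hdr.cont = &opt_hdr;
}

// Concatenate the data of every block in the chain, for inspection.
inline std::string flatten(const Block* b)
{
  std::string out;
  for (; b; b = b->cont) out += b->data;
  return out;
}
```

Starting from a header block chained to a payload block, one call to splice_after() yields the order header, optional header, payload, which matches the "New chain" comment in the listing.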

ACE_Message_Block * OpenDDS::DCPS::DataSampleHeader::alloc_msgblock ( const ACE_Message_Block &  mb,
size_t  size,
bool  use_data_alloc 
) [static]

Definition at line 74 of file DataSampleHeader.cpp.

References OpenDDS::RTPS::DATA.

Referenced by add_cfentries(), OpenDDS::DCPS::RtpsSampleHeader::split(), and split().

00076 {
00077   enum { DATA, DB, MB, N_ALLOC };
00078   ACE_Allocator* allocators[N_ALLOC];
00079   // It's an ACE bug that access_allocators isn't const
00080   ACE_Message_Block& mut_mb = const_cast<ACE_Message_Block&>(mb);
00081   mut_mb.access_allocators(allocators[DATA], allocators[DB], allocators[MB]);
00082   if (allocators[MB]) {
00083     ACE_Message_Block* result;
00084     ACE_NEW_MALLOC_RETURN(result,
00085       static_cast<ACE_Message_Block*>(
00086         allocators[MB]->malloc(sizeof(ACE_Message_Block))),
00087       ACE_Message_Block(size,
00088                         ACE_Message_Block::MB_DATA,
00089                         0, // cont
00090                         0, // data
00091                         use_data_alloc ? allocators[DATA] : 0,
00092                         mut_mb.locking_strategy(), // locking_strategy
00093                         ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00094                         ACE_Time_Value::zero,
00095                         ACE_Time_Value::max_time,
00096                         allocators[DB],
00097                         allocators[MB]),
00098       0);
00099     return result;
00100   } else {
00101     return new ACE_Message_Block(size,
00102                                   ACE_Message_Block::MB_DATA,
00103                                   0, // cont
00104                                   0, // data
00105                                   use_data_alloc ? allocators[DATA] : 0,
00106                                   mut_mb.locking_strategy(), // locking_strategy
00107                                   ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00108                                   ACE_Time_Value::zero,
00109                                   ACE_Time_Value::max_time,
00110                                   allocators[DB]);
00111   }
00112 }

ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::clear_flag ( DataSampleHeaderFlag  flag,
ACE_Message_Block *  buffer 
) [static]

The clear_flag and set_flag methods are a hack to update the header flags after a sample has been serialized, without deserializing the entire message. They will break if the current Serializer behavior changes.

Definition at line 113 of file DataSampleHeader.inl.

References FLAGS_OFFSET, and mask_flag().

00115 {
00116   char* base = buffer->base();
00117 
00118   // verify sufficient length exists:
00119   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00120     ACE_ERROR((LM_ERROR,
00121                ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::clear_flag: ")
00122                ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")));
00123     return;
00124   }
00125 
00126   base[FLAGS_OFFSET] &= ~mask_flag(flag);
00127 }
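
The in-place flag update used by clear_flag(), set_flag() and test_flag() amounts to masking one octet at FLAGS_OFFSET. The sketch below shows the idea on a plain byte vector. The SketchFlag enum and the sketch_* helpers are hypothetical, and the bit positions are an assumption (taken to follow the order of the bit-fields in the struct), not values confirmed by this page.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for DataSampleHeaderFlag. The bit positions are
// assumed to follow the order of the bit-fields declared in the struct.
enum SketchFlag {
  SKETCH_BYTE_ORDER_FLAG = 0,
  SKETCH_COHERENT_CHANGE_FLAG = 1,
  SKETCH_HISTORIC_SAMPLE_FLAG = 2
};

const std::size_t kFlagsOffset = 2; // mirrors FLAGS_OFFSET

inline std::uint8_t sketch_mask(SketchFlag flag)
{
  return static_cast<std::uint8_t>(1u << flag);
}

// Each helper returns false when the buffer has no flags octet; the real
// methods log an error in that case instead.
inline bool sketch_set_flag(SketchFlag flag, std::vector<std::uint8_t>& buf)
{
  if (buf.size() < kFlagsOffset + 1) return false;
  buf[kFlagsOffset] |= sketch_mask(flag);
  return true;
}

inline bool sketch_clear_flag(SketchFlag flag, std::vector<std::uint8_t>& buf)
{
  if (buf.size() < kFlagsOffset + 1) return false;
  buf[kFlagsOffset] &= static_cast<std::uint8_t>(~sketch_mask(flag));
  return true;
}

inline bool sketch_test_flag(SketchFlag flag,
                             const std::vector<std::uint8_t>& buf)
{
  if (buf.size() < kFlagsOffset + 1) return false;
  return (buf[kFlagsOffset] & sketch_mask(flag)) != 0;
}
```

Only the octet at offset 2 is touched, which is why the approach depends on the serialized layout staying as it is.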

void OpenDDS::DCPS::DataSampleHeader::init ( ACE_Message_Block *  buffer  ) 

Implement load from buffer.

Definition at line 167 of file DataSampleHeader.cpp.

References byte_order_, OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CDR_ENCAP_FLAG, cdr_encapsulation_, coherent_change_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, content_filter_, content_filter_entries_, OpenDDS::DCPS::CONTENT_FILTER_FLAG, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::Serializer::good_bit(), group_coherent_, OpenDDS::DCPS::GROUP_COHERENT_FLAG, historic_sample_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, key_fields_only_, OpenDDS::DCPS::KEY_ONLY_FLAG, lifespan_duration_, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, lifespan_duration_nanosec_, lifespan_duration_sec_, marshaled_size_, mask_flag(), message_id_, message_length_, more_fragments_, OpenDDS::DCPS::MORE_FRAGMENTS_FLAG, publication_id_, publisher_id_, sequence_, sequence_repair_, OpenDDS::DCPS::SEQUENCE_REPAIR_FLAG, source_timestamp_nanosec_, source_timestamp_sec_, submessage_id_, and OpenDDS::DCPS::Serializer::swap_bytes().

Referenced by DataSampleHeader(), and operator=().

00168 {
00169   this->marshaled_size_ = 0;
00170 
00171   Serializer reader(buffer);
00172 
00173   // Only byte-sized reads until we get the byte_order_ flag.
00174 
00175   reader >> this->message_id_;
00176 
00177   if (!reader.good_bit()) return;
00178   this->marshaled_size_ += sizeof(this->message_id_);
00179 
00180   reader >> this->submessage_id_;
00181 
00182   if (!reader.good_bit()) return;
00183   this->marshaled_size_ += sizeof(this->submessage_id_);
00184 
00185   // Extract the flag values.
00186   ACE_CDR::Octet byte;
00187   reader >> ACE_InputCDR::to_octet(byte);
00188 
00189   if (!reader.good_bit()) return;
00190   this->marshaled_size_ += sizeof(byte);
00191 
00192   this->byte_order_         = byte & mask_flag(BYTE_ORDER_FLAG);
00193   this->coherent_change_    = byte & mask_flag(COHERENT_CHANGE_FLAG);
00194   this->historic_sample_    = byte & mask_flag(HISTORIC_SAMPLE_FLAG);
00195   this->lifespan_duration_  = byte & mask_flag(LIFESPAN_DURATION_FLAG);
00196   this->group_coherent_     = byte & mask_flag(GROUP_COHERENT_FLAG);
00197   this->content_filter_     = byte & mask_flag(CONTENT_FILTER_FLAG);
00198   this->sequence_repair_    = byte & mask_flag(SEQUENCE_REPAIR_FLAG);
00199   this->more_fragments_     = byte & mask_flag(MORE_FRAGMENTS_FLAG);
00200 
00201   // Set swap_bytes flag to the Serializer if data sample from
00202   // the publisher is in different byte order.
00203   reader.swap_bytes(this->byte_order_ != ACE_CDR_BYTE_ORDER);
00204 
00205   reader >> ACE_InputCDR::to_octet(byte);
00206 
00207   if (!reader.good_bit()) return;
00208   this->marshaled_size_ += sizeof(byte);
00209 
00210   this->cdr_encapsulation_ = byte & mask_flag(CDR_ENCAP_FLAG);
00211   this->key_fields_only_   = byte & mask_flag(KEY_ONLY_FLAG);
00212 
00213   reader >> this->message_length_;
00214 
00215   if (!reader.good_bit()) return;
00216   this->marshaled_size_ += sizeof(this->message_length_);
00217 
00218   reader >> this->sequence_;
00219 
00220   if (!reader.good_bit()) return;
00221   size_t padding = 0;
00222   gen_find_size(this->sequence_, this->marshaled_size_, padding);
00223 
00224   reader >> this->source_timestamp_sec_;
00225 
00226   if (!reader.good_bit()) return;
00227   this->marshaled_size_ += sizeof(this->source_timestamp_sec_);
00228 
00229   reader >> this->source_timestamp_nanosec_;
00230 
00231   if (!reader.good_bit()) return;
00232   this->marshaled_size_ += sizeof(this->source_timestamp_nanosec_);
00233 
00234   if (this->lifespan_duration_) {
00235     reader >> this->lifespan_duration_sec_;
00236 
00237     if (!reader.good_bit()) return;
00238     this->marshaled_size_ += sizeof(this->lifespan_duration_sec_);
00239 
00240     reader >> this->lifespan_duration_nanosec_;
00241 
00242     if (!reader.good_bit()) return;
00243     this->marshaled_size_ += sizeof(this->lifespan_duration_nanosec_);
00244   }
00245 
00246   reader >> this->publication_id_;
00247 
00248   if (!reader.good_bit()) return;
00249   gen_find_size(this->publication_id_, this->marshaled_size_, padding);
00250 
00251 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00252   if (this->group_coherent_) {
00253     reader >> this->publisher_id_;
00254     if (!reader.good_bit()) return;
00255     gen_find_size(this->publisher_id_, this->marshaled_size_, padding);
00256   }
00257 #endif
00258 
00259   if (this->content_filter_) {
00260     reader >> this->content_filter_entries_;
00261     if (!reader.good_bit()) return;
00262     gen_find_size(this->content_filter_entries_, this->marshaled_size_, padding);
00263   }
00264 }
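
init() reads one field at a time and adds to marshaled_size_ only after each successful read, so a truncated buffer leaves a size that reflects exactly what was consumed. A minimal sketch of that pattern for the first four octets follows. HeaderPrefix, read_octet() and parse_prefix() are hypothetical names, and a byte vector stands in for the Serializer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical holder for the byte-sized prefix of the header.
struct HeaderPrefix {
  std::uint8_t message_id = 0;
  std::uint8_t submessage_id = 0;
  std::uint8_t flags = 0;          // first flags octet
  std::uint8_t flags2 = 0;         // second flags octet
  std::size_t marshaled_size = 0;  // octets consumed so far
};

// Read one octet if available; advance the running size only on success.
inline bool read_octet(const std::vector<std::uint8_t>& buf,
                       std::size_t& consumed, std::uint8_t& out)
{
  if (consumed >= buf.size()) return false;
  out = buf[consumed++];
  return true;
}

// Stop at the first failed read, as init() does with good_bit().
inline HeaderPrefix parse_prefix(const std::vector<std::uint8_t>& buf)
{
  HeaderPrefix h;
  if (!read_octet(buf, h.marshaled_size, h.message_id)) return h;
  if (!read_octet(buf, h.marshaled_size, h.submessage_id)) return h;
  if (!read_octet(buf, h.marshaled_size, h.flags)) return h;
  read_octet(buf, h.marshaled_size, h.flags2);
  return h;
}
```

A caller can compare the resulting size with the size it expected in order to detect a short buffer.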

bool OpenDDS::DCPS::DataSampleHeader::into_received_data_sample ( ReceivedDataSample &  rds  ) 

Definition at line 723 of file DataSampleHeader.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_.

00724 {
00725   rds.header_ = *this;
00726   return true;
00727 }

bool OpenDDS::DCPS::DataSampleHeader::join ( const DataSampleHeader &  first,
const DataSampleHeader &  second,
DataSampleHeader &  result 
) [static]

If "first" and "second" are two fragments of the same original message (as created by split()), return true and set up the "result" header to match the original header. Joining the data payload is the responsibility of the caller (manipulate the continuation chain).

Definition at line 439 of file DataSampleHeader.cpp.

References content_filter_, content_filter_entries_, message_length_, more_fragments_, and sequence_.

Referenced by OpenDDS::DCPS::TransportReassembly::insert().

00441 {
00442   if (!first.more_fragments_ || first.sequence_ != second.sequence_) {
00443     return false;
00444   }
00445   result = second;
00446   result.message_length_ += first.message_length_;
00447   if (first.content_filter_) {
00448     result.content_filter_ = true;
00449     const CORBA::ULong entries = first.content_filter_entries_.length();
00450     CORBA::ULong x = result.content_filter_entries_.length();
00451     result.content_filter_entries_.length(x + entries);
00452     for (CORBA::ULong i(entries); i > 0;) {
00453       result.content_filter_entries_[x++] = first.content_filter_entries_[--i];
00454     }
00455   }
00456   return true;
00457 }
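
The join logic can be sketched on a simplified header to show why the entries of "first" are appended in reverse. split() moves entries into the head fragment from the end of the original sequence, so appending them back in reverse order restores the original order. FragHeader and join_sketch() are hypothetical, integers stand in for GUIDs, and the content_filter_ flag is folded into "has entries".

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical, reduced header: only the fields join() looks at.
struct FragHeader {
  bool more_fragments;
  std::int64_t sequence;
  std::uint32_t message_length;
  std::vector<int> filter_entries; // stand-in for GUIDSeq
};

inline bool join_sketch(const FragHeader& first, const FragHeader& second,
                        FragHeader& result)
{
  // Fragments must share a sequence number, and "first" must announce
  // that more fragments follow.
  if (!first.more_fragments || first.sequence != second.sequence) {
    return false;
  }
  result = second;
  result.message_length += first.message_length;
  // Append the entries of "first" in reverse order.
  result.filter_entries.insert(result.filter_entries.end(),
                               first.filter_entries.rbegin(),
                               first.filter_entries.rend());
  return true;
}
```

As with the real method, joining the payload itself is left to the caller.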

ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size (  )  const

Amount of data read when initializing from a buffer.

Definition at line 82 of file DataSampleHeader.inl.

References marshaled_size_.

00083 {
00084   return marshaled_size_;
00085 }

static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag ( DataSampleHeaderFlag2  flag  )  [inline, static]

Definition at line 179 of file DataSampleHeader.h.

00179 { return 1 << flag; }

static ACE_UINT8 OpenDDS::DCPS::DataSampleHeader::mask_flag ( DataSampleHeaderFlag  flag  )  [inline, static]

Definition at line 178 of file DataSampleHeader.h.

Referenced by clear_flag(), init(), partial(), set_flag(), and test_flag().

00178 { return 1 << flag; }

ACE_INLINE size_t OpenDDS::DCPS::DataSampleHeader::max_marshaled_size (  )  [static]

Similar to IDL compiler generated methods.

Definition at line 89 of file DataSampleHeader.inl.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::UdpDataLink::open(), partial(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), split(), and OpenDDS::RTPS::Sedp::Writer::write_sample().

00090 {
00091   return 1 + // message_id_;
00092          1 + // submessage_id_;
00093          2 + // flags
00094          4 + // message_length_;
00095          8 + // sequence_;
00096          4 + // source_timestamp_sec_;
00097          4 + // source_timestamp_nanosec_;
00098          4 + // lifespan_duration_sec_;
00099          4 + // lifespan_duration_nanosec_;
00100         16 + // publication_id_;
00101         16 ; // publisher_id_;
00102   // content_filter_entries_ is not marshaled into the same Data Block
00103   // so it is not part of the max_marshaled_size() which is used to allocate
00104 }
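
For reference, the octet counts in the listing above add up to 64. The sketch below only restates that arithmetic as a compile-time check; the constant and function names are illustrative and do not appear in OpenDDS.

```cpp
#include <cassert>
#include <cstddef>

// Octet counts copied from the listing above; the names are illustrative.
constexpr std::size_t kIds = 1 + 1;        // message_id_, submessage_id_
constexpr std::size_t kFlags = 2;          // two flag octets
constexpr std::size_t kLength = 4;         // message_length_
constexpr std::size_t kSequence = 8;       // sequence_
constexpr std::size_t kTimestamp = 4 + 4;  // source_timestamp_sec_/nanosec_
constexpr std::size_t kLifespan = 4 + 4;   // lifespan_duration_sec_/nanosec_
constexpr std::size_t kGuid = 16;          // publication_id_, publisher_id_

constexpr std::size_t max_header_octets()
{
  return kIds + kFlags + kLength + kSequence + kTimestamp + kLifespan
         + 2 * kGuid;
}

static_assert(max_header_octets() == 64, "all fixed-size fields present");
```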

ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::message_length (  )  const [inline]

Definition at line 230 of file DataSampleHeader.h.

00230 { return this->message_length_; }

bool OpenDDS::DCPS::DataSampleHeader::more_fragments (  )  const [inline]

Definition at line 232 of file DataSampleHeader.h.

00232 { return this->more_fragments_; }

ACE_INLINE OpenDDS::DCPS::DataSampleHeader & OpenDDS::DCPS::DataSampleHeader::operator= ( ACE_Message_Block &  buffer  ) 

Assignment from an ACE_Message_Block.

Definition at line 74 of file DataSampleHeader.inl.

References init().

00075 {
00076   this->init(&buffer);
00077   return *this;
00078 }

bool OpenDDS::DCPS::DataSampleHeader::partial ( const ACE_Message_Block &  mb  )  [static]

Does the data in this mb constitute a partial Sample Header?

Definition at line 114 of file DataSampleHeader.cpp.

References OpenDDS::DCPS::BYTE_ORDER_FLAG, OpenDDS::DCPS::CONTENT_FILTER_FLAG, FLAGS_OFFSET, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::GROUP_COHERENT_FLAG, OpenDDS::DCPS::LIFESPAN_DURATION_FLAG, mask_flag(), max_marshaled_size(), mb_peek(), OpenDDS::DCPS::MESSAGE_ID_MAX, MESSAGE_ID_OFFSET, OpenDDS::DCPS::SUBMESSAGE_ID_MAX, SUBMESSAGE_ID_OFFSET, and OpenDDS::DCPS::swap().

00115 {
00116   static const unsigned int LIFESPAN_MASK = mask_flag(LIFESPAN_DURATION_FLAG),
00117     LIFESPAN_LENGTH = 8,
00118     COHERENT_MASK = mask_flag(GROUP_COHERENT_FLAG),
00119     COHERENT_LENGTH = 16,
00120     CONTENT_FILT_MASK = mask_flag(CONTENT_FILTER_FLAG),
00121     BYTE_ORDER_MASK = mask_flag(BYTE_ORDER_FLAG);
00122 
00123   const size_t len = mb.total_length();
00124 
00125   if (len <= FLAGS_OFFSET) return true;
00126 
00127   unsigned char msg_id;
00128   if (!mb_peek(msg_id, mb, MESSAGE_ID_OFFSET,
00129                false /*swap ignored for char*/)
00130       || int(msg_id) >= MESSAGE_ID_MAX) {
00131     // This check, and the similar one below for submessage id, are actually
00132     // indicating an invalid header (and not a partial header) but we can
00133     // treat it the same as partial for the sake of the TransportRecvStrategy.
00134     return true;
00135   }
00136 
00137   if (!mb_peek(msg_id, mb, SUBMESSAGE_ID_OFFSET,
00138                false /*swap ignored for char*/)
00139       || int(msg_id) >= SUBMESSAGE_ID_MAX) {
00140     return true;
00141   }
00142 
00143   char flags;
00144   if (!mb_peek(flags, mb, FLAGS_OFFSET, false /*swap ignored for char*/)) {
00145     return true;
00146   }
00147 
00148   size_t expected = max_marshaled_size();
00149   if (!(flags & LIFESPAN_MASK)) expected -= LIFESPAN_LENGTH;
00150   if (!(flags & COHERENT_MASK)) expected -= COHERENT_LENGTH;
00151 
00152   if (flags & CONTENT_FILT_MASK) {
00153     CORBA::ULong seqLen;
00154     const bool swap = (flags & BYTE_ORDER_MASK) != ACE_CDR_BYTE_ORDER;
00155     if (!mb_peek(seqLen, mb, expected, swap)) {
00156       return true;
00157     }
00158     size_t guidsize = 0, padding = 0;
00159     gen_find_size(GUID_t(), guidsize, padding);
00160     expected += sizeof(seqLen) + guidsize * seqLen;
00161   }
00162 
00163   return len < expected;
00164 }
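
The size check in partial() starts from the maximum header size and subtracts the optional fields whose flags are clear. A minimal sketch of that computation follows, leaving out the content filter case. The mask values are an assumption (bit positions taken to follow the order of the bit-fields), and expected_header_size() and is_partial() are hypothetical helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Assumed bit positions, following the order of the bit-fields.
constexpr std::uint8_t kLifespanMask = 1u << 3;
constexpr std::uint8_t kGroupCoherentMask = 1u << 4;

constexpr std::size_t kMaxHeader = 64;      // sum from max_marshaled_size()
constexpr std::size_t kLifespanLength = 8;  // LIFESPAN_LENGTH
constexpr std::size_t kCoherentLength = 16; // COHERENT_LENGTH

// Expected fixed header size for a given first flags octet.
inline std::size_t expected_header_size(std::uint8_t flags)
{
  std::size_t expected = kMaxHeader;
  if (!(flags & kLifespanMask)) expected -= kLifespanLength;
  if (!(flags & kGroupCoherentMask)) expected -= kCoherentLength;
  return expected;
}

// A buffer shorter than the expected size holds only a partial header.
inline bool is_partial(std::size_t available, std::uint8_t flags)
{
  return available < expected_header_size(flags);
}
```

With both optional fields absent the expected size is 40 octets; with both present it is the full 64.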

void OpenDDS::DCPS::DataSampleHeader::pdu_remaining ( size_t   )  [inline]

Definition at line 234 of file DataSampleHeader.h.

00234 { /* ignored, only RTPS uses this */ }

ACE_INLINE void OpenDDS::DCPS::DataSampleHeader::set_flag ( DataSampleHeaderFlag  flag,
ACE_Message_Block *  buffer 
) [static]

Definition at line 131 of file DataSampleHeader.inl.

References FLAGS_OFFSET, and mask_flag().

Referenced by OpenDDS::DCPS::WriteDataContainer::data_delivered().

00133 {
00134   char* base = buffer->base();
00135 
00136   // verify sufficient length exists:
00137   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00138     ACE_ERROR((LM_ERROR,
00139                ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ")
00140                ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")));
00141     return;
00142   }
00143 
00144   base[FLAGS_OFFSET] |= mask_flag(flag);
00145 }

void OpenDDS::DCPS::DataSampleHeader::split ( const ACE_Message_Block &  orig,
size_t  size,
ACE_Message_Block *&  head,
ACE_Message_Block *&  tail 
) [static]

Create two new serialized headers (owned by caller), the "head" having at most "size" bytes (header + data) and the "tail" having the rest.

Definition at line 369 of file DataSampleHeader.cpp.

References add_cfentries(), alloc_msgblock(), OpenDDS::DCPS::gen_max_marshaled_size(), max_marshaled_size(), message_length_, more_fragments_, and split_payload().

Referenced by OpenDDS::DCPS::TransportQueueElement::fragment().

00371 {
00372   ACE_Message_Block* dup = orig.duplicate();
00373   AMB_Releaser rel(dup);
00374 
00375   const size_t length = dup->total_length();
00376   DataSampleHeader hdr(*dup); // deserialize entire header (with cfentries)
00377   const size_t hdr_len = length - dup->total_length();
00378 
00379   ACE_Message_Block* payload = dup;
00380   ACE_Message_Block* prev = 0;
00381   for (; payload->length() == 0; payload = payload->cont()) {
00382     prev = payload;
00383   }
00384   prev->cont(0);
00385 
00386   if (size < hdr_len) { // need to fragment the content_filter_entries_
00387     head = alloc_msgblock(*dup, max_marshaled_size(), true);
00388     hdr.more_fragments_ = true;
00389     hdr.message_length_ = 0; // no room for payload data
00390     *head << hdr;
00391     const size_t avail = size - head->length() - 4 /* sequence length */;
00392     const CORBA::ULong n_entries =
00393       static_cast<CORBA::ULong>(avail / gen_max_marshaled_size(GUID_t()));
00394     GUIDSeq entries(n_entries);
00395     entries.length(n_entries);
00396     // remove from the end of hdr's entries (order doesn't matter)
00397     for (CORBA::ULong i(0), x(hdr.content_filter_entries_.length());
00398          i < n_entries; ++i) {
00399       entries[i] = hdr.content_filter_entries_[--x];
00400       hdr.content_filter_entries_.length(x);
00401     }
00402     add_cfentries(&entries, head);
00403 
00404     tail = alloc_msgblock(*dup, max_marshaled_size(), true);
00405     hdr.more_fragments_ = false;
00406     hdr.content_filter_ = (hdr.content_filter_entries_.length() > 0);
00407     hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00408     *tail << hdr;
00409     tail->cont(payload);
00410     if (hdr.content_filter_) {
00411       add_cfentries(&hdr.content_filter_entries_, tail);
00412     }
00413     return;
00414   }
00415 
00416   ACE_Message_Block* payload_tail;
00417   split_payload(*payload, size - hdr_len, payload, payload_tail);
00418 
00419   hdr.more_fragments_ = true;
00420   hdr.message_length_ = static_cast<ACE_UINT32>(payload->total_length());
00421 
00422   head = alloc_msgblock(*dup, max_marshaled_size(), true);
00423   *head << hdr;
00424   head->cont(payload);
00425   if (hdr.content_filter_) {
00426     add_cfentries(&hdr.content_filter_entries_, head);
00427   }
00428 
00429   hdr.more_fragments_ = false;
00430   hdr.content_filter_ = false;
00431   hdr.message_length_ = static_cast<ACE_UINT32>(payload_tail->total_length());
00432 
00433   tail = alloc_msgblock(*dup, max_marshaled_size(), true);
00434   *tail << hdr;
00435   tail->cont(payload_tail);
00436 }

void OpenDDS::DCPS::DataSampleHeader::split_payload ( const ACE_Message_Block &  orig,
size_t  size,
ACE_Message_Block *&  head,
ACE_Message_Block *&  tail 
) [static]

Definition at line 343 of file DataSampleHeader.cpp.

Referenced by OpenDDS::DCPS::RtpsSampleHeader::split(), and split().

00346 {
00347   if (!head) {
00348     head = orig.duplicate();
00349   }
00350 
00351   ACE_Message_Block* frag = head;
00352   size_t frag_remain = size;
00353   for (; frag_remain > frag->length(); frag = frag->cont()) {
00354     frag_remain -= frag->length();
00355   }
00356 
00357   if (frag_remain == frag->length()) { // split at ACE_Message_Block boundary
00358     tail = frag->cont();
00359   } else {
00360     tail = frag->duplicate();
00361     frag->wr_ptr(frag->wr_ptr() - frag->length() + frag_remain);
00362     ACE_Message_Block::release(frag->cont());
00363     tail->rd_ptr(frag_remain);
00364   }
00365   frag->cont(0);
00366 }
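
The effect of split_payload() is to cut a chain of blocks at a byte offset, dividing the one block that straddles the offset. The sketch below reproduces that outcome on a vector of strings, where each string stands in for one message block. Chain and split_chain() are hypothetical, and the sketch copies data instead of adjusting read and write pointers as the real method does.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Each element stands in for one message block in a continuation chain.
typedef std::vector<std::string> Chain;

// Split "orig" so that the head holds the first "size" bytes and the tail
// holds the rest. If "size" exceeds the total length, everything ends up
// in the head.
inline std::pair<Chain, Chain> split_chain(const Chain& orig,
                                           std::size_t size)
{
  Chain head, tail;
  std::size_t remain = size;
  std::size_t i = 0;
  // Whole blocks that fit entirely in the head.
  for (; i < orig.size() && remain >= orig[i].size(); ++i) {
    remain -= orig[i].size();
    head.push_back(orig[i]);
  }
  // A block straddling the split point is divided between both chains.
  if (i < orig.size() && remain > 0) {
    head.push_back(orig[i].substr(0, remain));
    tail.push_back(orig[i].substr(remain));
    ++i;
  }
  // Everything after the split point belongs to the tail.
  for (; i < orig.size(); ++i) tail.push_back(orig[i]);
  return std::make_pair(head, tail);
}
```

When the offset falls exactly on a block boundary no block is divided, which corresponds to the "split at ACE_Message_Block boundary" branch in the listing.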

ACE_INLINE bool OpenDDS::DCPS::DataSampleHeader::test_flag ( DataSampleHeaderFlag  flag,
const ACE_Message_Block *  buffer 
) [static]

Definition at line 149 of file DataSampleHeader.inl.

References FLAGS_OFFSET, and mask_flag().

Referenced by add_cfentries(), OpenDDS::DCPS::DataDurabilityCache::insert(), and OpenDDS::DCPS::DataLinkSet::send().

00151 {
00152   char* base = buffer->base();
00153 
00154   // verify sufficient length exists:
00155   if (static_cast<size_t>(buffer->end() - base) < FLAGS_OFFSET + 1) {
00156     ACE_ERROR_RETURN((LM_ERROR,
00157                       ACE_TEXT("(%P|%t) ERROR: DataSampleHeader::set_flag: ")
00158                       ACE_TEXT("ACE_Message_Block too short (missing flags octet).\n")), false);
00159   }
00160 
00161   // Test flag bit.
00162   return base[FLAGS_OFFSET] & mask_flag(flag);
00163 }


Member Data Documentation

bool OpenDDS::DCPS::DataSampleHeader::byte_order_

0 - Message encoded using big-endian byte order.
1 - Message encoded using little-endian byte order.
(See ace/CDR_Base.h.)

Definition at line 85 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().
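
A reader uses this flag to decide whether multi-byte fields need byte swapping: swapping is required when the sender's byte order differs from the host's. The following sketch shows that decision for a 32-bit value. host_is_little_endian(), swap32() and read_u32() are hypothetical helpers, with host_is_little_endian() playing the role of ACE_CDR_BYTE_ORDER.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>

// True on a little-endian host; plays the role of ACE_CDR_BYTE_ORDER.
inline bool host_is_little_endian()
{
  const std::uint16_t probe = 1;
  unsigned char first;
  std::memcpy(&first, &probe, 1);
  return first == 1;
}

// Reverse the four octets of a 32-bit value.
inline std::uint32_t swap32(std::uint32_t v)
{
  return (v >> 24) | ((v >> 8) & 0x0000FF00u)
       | ((v << 8) & 0x00FF0000u) | (v << 24);
}

// Decode a 32-bit value from four wire octets, given the sender's
// byte_order_ flag (true meaning little-endian).
inline std::uint32_t read_u32(const unsigned char* wire,
                              bool sender_little_endian)
{
  std::uint32_t v;
  std::memcpy(&v, wire, sizeof v);
  const bool swap = sender_little_endian != host_is_little_endian();
  return swap ? swap32(v) : v;
}
```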

bool OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_

The data payload uses CDR encapsulation and alignment rules, as defined by the RTPS specification formal/2010-11-01.

Definition at line 120 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::DataWriterImpl::filter_out(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::coherent_change_

The flag indicates the sample belongs to a coherent change set (i.e. PRESENTATION coherent_access == true).

Definition at line 89 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::content_filter_

The publishing side has applied content filtering, and the optional content_filter_entries_ field is present in the marshaled header.

Definition at line 103 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), init(), join(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

GUIDSeq OpenDDS::DCPS::DataSampleHeader::content_filter_entries_

Optional field present if the content_filter_ flag bit is set. Indicates which readers should not receive the data.

Definition at line 176 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataLink::data_received_i(), init(), join(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::group_coherent_

Definition at line 99 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::historic_sample_

This flag indicates a sample has been resent from a non-VOLATILE DataWriter.

Definition at line 93 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::DataReaderImpl::deliver_historic(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::key_fields_only_

Only the key fields of the data sample are present in the payload.

Definition at line 123 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::lifespan_duration_

This flag indicates the sample header contains non-default LIFESPAN duration fields.

Definition at line 97 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_

Definition at line 163 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

ACE_INT32 OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_

The LIFESPAN duration field is generated from the DataWriter or supplied by the application at the time of the write. This field is used to determine whether a given sample is considered 'stale' and should be discarded by the associated DataReader. These fields are optional and are controlled by the lifespan_duration_ flag.

Definition at line 162 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
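
As a rough illustration of how the LIFESPAN fields relate to the source timestamp, the sketch below computes whether a sample would be considered stale. It is not the OpenDDS implementation: SampleTimes, to_nanos() and is_stale() are hypothetical names, and treating a cleared lifespan_duration_ flag as "never expires" is an assumption based on the DDS default LIFESPAN being infinite.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the four header fields involved; not the OpenDDS type.
struct SampleTimes {
  std::int32_t  source_timestamp_sec;
  std::uint32_t source_timestamp_nanosec;
  std::int32_t  lifespan_duration_sec;
  std::uint32_t lifespan_duration_nanosec;
};

const std::int64_t kNanosPerSec = 1000000000LL;

// Collapse a (sec, nanosec) pair into a single nanosecond count.
std::int64_t to_nanos(std::int32_t sec, std::uint32_t nanosec) {
  assert(nanosec < kNanosPerSec);  // the pair is expected to be normalized
  return static_cast<std::int64_t>(sec) * kNanosPerSec + nanosec;
}

// True when source timestamp + lifespan lies strictly before "now".
// lifespan_flag mirrors lifespan_duration_: when it is false the header
// carries no duration fields, which this sketch assumes means "no expiry".
bool is_stale(const SampleTimes& s, bool lifespan_flag,
              std::int32_t now_sec, std::uint32_t now_nanosec) {
  if (!lifespan_flag) {
    return false;
  }
  const std::int64_t expiry =
      to_nanos(s.source_timestamp_sec, s.source_timestamp_nanosec) +
      to_nanos(s.lifespan_duration_sec, s.lifespan_duration_nanosec);
  return to_nanos(now_sec, now_nanosec) > expiry;
}
```

The comparison depends on the writer's and reader's clocks agreeing, which is the synchronization assumption stated for the SOURCE_TIMESTAMP field.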

size_t OpenDDS::DCPS::DataSampleHeader::marshaled_size_ [private]

Keep track of the amount of data read from a buffer.

Definition at line 244 of file DataSampleHeader.h.

Referenced by init(), and marshaled_size().

char OpenDDS::DCPS::DataSampleHeader::message_id_

The message type, stored as a value of the MessageId enum.

Definition at line 78 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::WriteDataContainer::data_dropped(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal(), OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::TcpReceiveStrategy::deliver_sample(), OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::RtpsSampleHeader::process_iqos(), OpenDDS::DCPS::WriteDataContainer::release_buffer(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::DCPS::TransportClient::send_i(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::message_length_

The size of the data sample (without header). After this header is demarshaled, the transport expects to see this many bytes in the stream before the start of the next header (or end of the Transport PDU).

Definition at line 135 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::DataReaderImpl::data_received(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), split(), and OpenDDS::DCPS::to_string().
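
The framing role of message_length_ can be sketched with a toy stream in which each record is a length followed by that many payload bytes. This is only an illustration under simplifying assumptions: the toy header is a bare 4-byte length in host byte order, whereas the real DataSampleHeader carries more fields and has its own marshaling; append_record() and payload_lengths() are hypothetical helpers, not OpenDDS functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Toy header: just the payload length (assumption; the real header is larger).
const std::size_t kToyHeaderSize = sizeof(std::uint32_t);

// Append one record (toy header + payload) to a PDU buffer.
void append_record(std::vector<unsigned char>& pdu,
                   const std::vector<unsigned char>& payload) {
  assert(payload.size() <= 0xFFFFFFFFu);  // length must fit the 32-bit field
  const std::uint32_t len = static_cast<std::uint32_t>(payload.size());
  unsigned char raw[sizeof len];
  std::memcpy(raw, &len, sizeof len);
  pdu.insert(pdu.end(), raw, raw + sizeof len);
  pdu.insert(pdu.end(), payload.begin(), payload.end());
}

// Walk the PDU: read a header, skip message_length payload bytes, and expect
// the next header (or the end of the PDU) at that position. Stops at the
// first incomplete header or truncated payload.
std::vector<std::uint32_t> payload_lengths(const std::vector<unsigned char>& pdu) {
  std::vector<std::uint32_t> lengths;
  std::size_t pos = 0;
  while (pdu.size() - pos >= kToyHeaderSize) {
    std::uint32_t message_length = 0;
    std::memcpy(&message_length, &pdu[pos], kToyHeaderSize);
    pos += kToyHeaderSize;
    if (pdu.size() - pos < message_length) {
      break;  // payload is incomplete; a real transport would wait for more data
    }
    lengths.push_back(message_length);
    pos += message_length;  // the next header starts here
  }
  return lengths;
}
```

Because the length excludes the header itself, the reader advances by the header size plus message_length to reach the next record.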

bool OpenDDS::DCPS::DataSampleHeader::more_fragments_

The current "Data Sample" needs reassembly before further processing.

Definition at line 111 of file DataSampleHeader.h.

Referenced by init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::operator<<(), split(), and OpenDDS::DCPS::to_string().

PublicationId OpenDDS::DCPS::DataSampleHeader::publication_id_

Identify the DataWriter that produced the sample data being sent.

Definition at line 168 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), OpenDDS::DCPS::DataReaderImpl::data_received(), OpenDDS::DCPS::DataLink::data_received_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

RepoId OpenDDS::DCPS::DataSampleHeader::publisher_id_

Id representing the coherent group. Optional field that's only present if the flag for group_coherent_ is set.

Definition at line 172 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::reserved_1

Definition at line 125 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_2

Definition at line 126 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_3

Definition at line 127 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_4

Definition at line 128 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_5

Definition at line 129 of file DataSampleHeader.h.

bool OpenDDS::DCPS::DataSampleHeader::reserved_6

Definition at line 130 of file DataSampleHeader.h.

SequenceNumber OpenDDS::DCPS::DataSampleHeader::sequence_

The sequence number is obtained from the Publisher associated with the DataWriter based on the PRESENTATION requirement for the sequence value (access_scope == GROUP).

Definition at line 140 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::check_historic(), OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::WriteDataContainer::data_delivered(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), init(), OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), join(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::ReliableSession::ready_to_deliver(), OpenDDS::DCPS::TransportReassembly::reassemble_i(), OpenDDS::DCPS::TransportSendElement::sequence(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

bool OpenDDS::DCPS::DataSampleHeader::sequence_repair_

Due to content filtering, a gap in the sequence numbers may be an expected condition. If this bit is set, assume prior sequence numbers were filtered out and are not missing.

Definition at line 108 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), init(), OpenDDS::DCPS::operator<<(), and OpenDDS::DCPS::to_string().
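
One way to picture the effect of sequence_repair_ on a receiver is the small classifier below. It is a sketch of the documented meaning only, not the logic OpenDDS uses: GapAction and classify() are hypothetical, and sequence numbers are modeled as plain 64-bit integers rather than the SequenceNumber type.

```cpp
#include <cassert>
#include <cstdint>

// Possible outcomes when a new sequence number arrives (hypothetical names).
enum GapAction { IN_ORDER, GAP_FILTERED, GAP_MISSING };

// Compare the incoming sequence number with the last one seen.
// A gap is treated as expected (filtered out by the writer) when the
// sample's sequence_repair bit is set, and as missing data otherwise.
GapAction classify(std::int64_t last_seen, std::int64_t incoming,
                   bool sequence_repair) {
  assert(incoming > last_seen);  // duplicates and reordering are out of scope here
  if (incoming == last_seen + 1) {
    return IN_ORDER;
  }
  return sequence_repair ? GAP_FILTERED : GAP_MISSING;
}
```

With last_seen equal to 4 and incoming equal to 8, the call yields GAP_FILTERED when the bit is set and GAP_MISSING when it is clear.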

ACE_UINT32 OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_

Definition at line 152 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::resend_data_expired(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

ACE_INT32 OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_

The SOURCE_TIMESTAMP field is generated from the DataWriter or supplied by the application at the time of the write. This value is derived from the local host's system clock, which is assumed to be synchronized with the clocks on other hosts within the domain. This field is required for the DESTINATION_ORDER and LIFESPAN policy behaviors of subscriptions. It is also required to be present for all data in the SampleInfo structure supplied along with each data sample.

Definition at line 151 of file DataSampleHeader.h.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_control_message(), OpenDDS::DCPS::ReplayerImpl::create_sample_data_message(), OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), OpenDDS::DCPS::RecorderImpl::data_received(), init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::DataReaderImpl::process_latency(), OpenDDS::DCPS::resend_data_expired(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(), OpenDDS::RTPS::Sedp::Writer::set_header_fields(), and OpenDDS::DCPS::to_string().

char OpenDDS::DCPS::DataSampleHeader::submessage_id_

Implementation-specific sub-message Ids.

Definition at line 81 of file DataSampleHeader.h.

Referenced by init(), OpenDDS::DCPS::operator<<(), OpenDDS::DCPS::MulticastDataLink::sample_received(), and OpenDDS::DCPS::to_string().


The documentation for this struct was generated from the following files:
Generated on Fri Feb 12 20:06:13 2016 for OpenDDS by doxygen 1.4.7