Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_DATASAMPLEHEADER_H
9 : #define OPENDDS_DCPS_DATASAMPLEHEADER_H
10 :
11 : #include "Cached_Allocator_With_Overflow_T.h"
12 : #include "Definitions.h"
13 : #include "GuidUtils.h"
14 : #include "Message_Block_Ptr.h"
15 : #include "PoolAllocationBase.h"
16 : #include "SequenceNumber.h"
17 :
18 : #include <ace/Guard_T.h>
19 : #include <ace/Lock.h>
20 :
21 : #include <iosfwd>
22 :
23 : #if !defined (ACE_LACKS_PRAGMA_ONCE)
24 : #pragma once
25 : #endif /* ACE_LACKS_PRAGMA_ONCE */
26 :
27 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
28 :
29 : namespace OpenDDS {
30 : namespace DCPS {
31 :
32 : class ReceivedDataSample;
33 :
34 : /// One byte message id (<256)
35 : enum MessageId {
36 : SAMPLE_DATA,
37 : DATAWRITER_LIVELINESS,
38 : INSTANCE_REGISTRATION,
39 : UNREGISTER_INSTANCE,
40 : DISPOSE_INSTANCE,
41 : GRACEFUL_DISCONNECT,
42 : REQUEST_ACK,
43 : SAMPLE_ACK,
44 : END_COHERENT_CHANGES,
45 : TRANSPORT_CONTROL,
46 : DISPOSE_UNREGISTER_INSTANCE,
47 : END_HISTORIC_SAMPLES,
48 : MESSAGE_ID_MAX // must be the last enumerator
49 : };
50 :
51 : enum SubMessageId {
52 : SUBMESSAGE_NONE,
53 : MULTICAST_SYN,
54 : MULTICAST_SYNACK,
55 : MULTICAST_NAK,
56 : MULTICAST_NAKACK,
57 : SUBMESSAGE_ID_MAX // must be the last enumerator
58 : };
59 :
60 : enum DataSampleHeaderFlag {
61 : BYTE_ORDER_FLAG,
62 : COHERENT_CHANGE_FLAG,
63 : HISTORIC_SAMPLE_FLAG,
64 : LIFESPAN_DURATION_FLAG,
65 : GROUP_COHERENT_FLAG,
66 : CONTENT_FILTER_FLAG,
67 : SEQUENCE_REPAIR_FLAG,
68 : MORE_FRAGMENTS_FLAG
69 : };
70 :
71 : enum DataSampleHeaderFlag2 {
72 : CDR_ENCAP_FLAG,
73 : KEY_ONLY_FLAG
74 : };
75 :
76 : /// The header message of a data sample.
77 : /// This header and the data sample are in different
78 : /// message block and will be chained together.
79 : struct OpenDDS_Dcps_Export DataSampleHeader : public PoolAllocationBase {
80 : enum {
81 : MESSAGE_ID_OFFSET = 0,
82 : SUBMESSAGE_ID_OFFSET = 1,
83 : FLAGS_OFFSET = 2 // message_id_ + submessage_id_
84 : };
85 :
86 : /// The enum MessageId.
87 : char message_id_;
88 :
89 : /// Implementation-specific sub-message Ids.
90 : char submessage_id_;
91 :
92 : /// 0 - Message encoded using big-endian byte order. (see ace/CDR_Base.h)
93 : /// 1 - Message encoded using little-endian byte order.
94 : bool byte_order_ : 1;
95 :
96 : /// The flag indicates the sample belongs to a coherent
97 : /// change set (i.e. PRESENTATION coherent_access == true).
98 : bool coherent_change_ : 1;
99 :
100 : /// This flag indicates a sample has been resent from a
101 : /// non-VOLATILE DataWriter.
102 : bool historic_sample_ : 1;
103 :
104 : /// This flag indicates the sample header contains non-default
105 : /// LIFESPAN duration fields.
106 : bool lifespan_duration_ : 1;
107 :
108 : bool group_coherent_ : 1;
109 :
110 : /// The publishing side has applied content filtering, and the optional
111 : /// content_filter_entries_ field is present in the marshaled header.
112 : bool content_filter_ : 1;
113 :
114 : /// Due to content filtering, a gap in the sequence numbers may be an
115 : /// expected condition. If this bit is set, assume prior sequence numbers
116 : /// were filtered-out and are not missing.
117 : bool sequence_repair_ : 1;
118 :
119 : /// The current "Data Sample" needs reassembly before further processing.
120 : bool more_fragments_ : 1;
121 :
122 : // bools above this line are in the first flags byte, below this line are
123 : // in the second flags byte. To avoid complicating the implementation of
124 : // partial(), flags that impact the size of serialized DataSampleHeader
125 : // should go in the first flags byte.
126 :
127 : /// The data payload uses CDR encapsulation and alignment rules, as defined
128 : /// by the RTPS specification formal/2010-11-01.
129 : bool cdr_encapsulation_ : 1;
130 :
131 : /// Only the key fields of the data sample are present in the payload.
132 : bool key_fields_only_ : 1;
133 :
134 : bool reserved_1 : 1;
135 : bool reserved_2 : 1;
136 : bool reserved_3 : 1;
137 : bool reserved_4 : 1;
138 : bool reserved_5 : 1;
139 : bool reserved_6 : 1;
140 :
141 : /// The size of the data sample (without header). After this header is
142 : /// demarshaled, the transport expects to see this many bytes in the stream
143 : /// before the start of the next header (or end of the Transport PDU).
144 : ACE_UINT32 message_length_;
145 :
146 : /// The sequence number is obtained from the Publisher
147 : /// associated with the DataWriter based on the PRESENTATION
148 : /// requirement for the sequence value (access_scope == GROUP).
149 : SequenceNumber sequence_;
150 :
151 : //{@
152 : /// The SOURCE_TIMESTAMP field is generated from the DataWriter
153 : /// or supplied by the application at the time of the write.
154 : /// This value is derived from the local hosts system clock,
155 : /// which is assumed to be synchronized with the clocks on other
156 : /// hosts within the domain. This field is required for
157 : /// DESTINATION_ORDER and LIFESPAN policy behaviors of subscriptions.
158 : /// It is also required to be present for all data in the
159 : /// SampleInfo structure supplied along with each data sample.
160 : ACE_INT32 source_timestamp_sec_;
161 : ACE_UINT32 source_timestamp_nanosec_; // Corresponding IDL is unsigned.
162 : //@}
163 :
164 : //{@
165 : /// The LIFESPAN duration field is generated from the DataWriter
166 : /// or supplied by the application at the time of the write. This
167 : /// field is used to determine if a given sample is considered
168 : /// 'stale' and should be discarded by associated DataReader.
169 : /// These fields are optional and are controlled by the
170 : /// lifespan_duration_ flag.
171 : ACE_INT32 lifespan_duration_sec_;
172 : ACE_UINT32 lifespan_duration_nanosec_; // Corresponding IDL is unsigned.
173 : //@}
174 :
175 : /// Identify the DataWriter that produced the sample data being
176 : /// sent.
177 : GUID_t publication_id_;
178 :
179 : /// Id representing the coherent group. Optional field that's only present if
180 : /// the flag for group_coherent_ is set.
181 : GUID_t publisher_id_;
182 :
183 : /// Optional field present if the content_filter_ flag bit is set.
184 : /// Indicates which readers should not receive the data.
185 : GUIDSeq content_filter_entries_;
186 :
187 274 : static ACE_UINT8 mask_flag(DataSampleHeaderFlag flag) { return 1 << flag; }
188 66 : static ACE_UINT8 mask_flag(DataSampleHeaderFlag2 flag) { return 1 << flag; }
189 :
190 : static void clear_flag(DataSampleHeaderFlag flag,
191 : ACE_Message_Block* buffer);
192 :
193 : static void set_flag(DataSampleHeaderFlag flag,
194 : ACE_Message_Block* buffer);
195 :
196 : static bool test_flag(DataSampleHeaderFlag flag,
197 : const ACE_Message_Block* buffer);
198 :
199 : /// Does the data in this mb constitute a partial Sample Header?
200 : static bool partial(const ACE_Message_Block& mb);
201 :
202 : /// Marshal the "guids" as an optional header chained as to the continuation
203 : /// of "mb" (which must already be a valid DataSampleHeader serialization).
204 : /// Any existing payload of "mb" (its continuation) will be chained after the
205 : /// new optional header part. "guids" may be null, same serialization as 0.
206 : static void add_cfentries(const GUIDSeq* guids, ACE_Message_Block* mb);
207 :
208 : /// Create two new serialized headers (owned by caller), the "head" having at
209 : /// most "size" bytes (header + data) and the "tail" having the rest.
210 : static void split(const ACE_Message_Block& orig, size_t size,
211 : Message_Block_Ptr& head, Message_Block_Ptr& tail);
212 :
213 : /// If "first" and "second" are two fragments of the same original message
214 : /// (as created by split()), return true and set up the "result" header to
215 : /// match the original header. Joining the data payload is the
216 : /// responsibility of the caller (manipulate the continuation chain).
217 : static bool join(const DataSampleHeader& first,
218 : const DataSampleHeader& second, DataSampleHeader& result);
219 :
220 : DataSampleHeader();
221 :
222 : /// Construct with values extracted from a buffer.
223 : explicit DataSampleHeader(ACE_Message_Block& buffer);
224 :
225 : /// Assignment from an ACE_Message_Block.
226 : DataSampleHeader& operator=(ACE_Message_Block& buffer);
227 :
228 : /// Amount of data read when initializing from a buffer.
229 : size_t get_serialized_size() const;
230 :
231 : /// Similar to IDL compiler generated methods.
232 : static size_t get_max_serialized_size();
233 :
234 : /// Implement load from buffer.
235 : void init(ACE_Message_Block* buffer);
236 :
237 : bool into_received_data_sample(ReceivedDataSample& rds);
238 :
239 : ACE_UINT32 message_length() const { return this->message_length_; }
240 :
241 0 : bool more_fragments() const { return this->more_fragments_; }
242 :
243 : void pdu_remaining(size_t) { /* ignored, only RTPS uses this */ }
244 :
245 : static ACE_Message_Block* alloc_msgblock(const ACE_Message_Block& mb,
246 : size_t size, bool use_data_alloc);
247 :
248 : static void split_payload(const ACE_Message_Block& orig, size_t size,
249 : Message_Block_Ptr& head, Message_Block_Ptr& tail);
250 :
251 : /// Returns true if the sample has a complete serialized payload.
252 : bool valid_data() const;
253 :
254 0 : DDS::InstanceStateKind instance_state() const
255 : {
256 0 : switch (message_id_) {
257 0 : case UNREGISTER_INSTANCE:
258 0 : return DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
259 0 : case DISPOSE_INSTANCE:
260 : case DISPOSE_UNREGISTER_INSTANCE:
261 0 : return DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
262 0 : default:
263 0 : return DDS::ALIVE_INSTANCE_STATE;
264 : }
265 : }
266 :
267 : private:
268 : /// Keep track of the amount of data read from a buffer.
269 : size_t serialized_size_;
270 :
271 : // If the constructor argument is null this object does nothing.
272 : // Otherwise it is an ACE_Guard for the lock constructor argument.
273 : struct MaybeGuard {
274 10 : explicit MaybeGuard(ACE_Lock* a) : guard_(a ? *a : non_lock) {}
275 :
276 : ACE_Guard<ACE_Lock> guard_;
277 :
278 : struct NoOpLock : ACE_Lock {
279 0 : int remove() { return 0; }
280 10 : int acquire() { return 0; }
281 0 : int tryacquire() { return 0; }
282 10 : int release() { return 0; }
283 0 : int acquire_read() { return 0; }
284 0 : int acquire_write() { return 0; }
285 0 : int tryacquire_read() { return 0; }
286 0 : int tryacquire_write() { return 0; }
287 0 : int tryacquire_write_upgrade() { return 0; }
288 : };
289 : static NoOpLock non_lock;
290 : };
291 : };
292 :
293 : typedef Cached_Allocator_With_Overflow<DataSampleHeader, ACE_Null_Mutex> DataSampleHeaderAllocator;
294 :
295 : OpenDDS_Dcps_Export
296 : const char* to_string(MessageId value);
297 : OpenDDS_Dcps_Export
298 : const char* to_string(SubMessageId value);
299 : OpenDDS_Dcps_Export
300 : OPENDDS_STRING to_string(const DataSampleHeader& value);
301 :
302 : /// Marshal/Insertion into a buffer.
303 : OpenDDS_Dcps_Export
304 : bool operator<<(ACE_Message_Block&, const DataSampleHeader& value);
305 :
306 : #ifndef OPENDDS_SAFETY_PROFILE
307 : /// Message Id enumeration insertion onto an ostream.
308 : OpenDDS_Dcps_Export
309 : std::ostream& operator<<(std::ostream& os, MessageId value);
310 :
311 : /// Sub-Message Id enumeration insertion onto an ostream.
312 : OpenDDS_Dcps_Export
313 : std::ostream& operator<<(std::ostream& os, SubMessageId value);
314 :
315 : /// Message header insertion onto an ostream.
316 : OpenDDS_Dcps_Export
317 : std::ostream& operator<<(std::ostream& str, const DataSampleHeader& value);
318 : #endif //OPENDDS_SAFETY_PROFILE
319 :
320 : } // namespace DCPS
321 : } // namespace OpenDDS
322 :
323 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
324 :
325 : #if defined(__ACE_INLINE__)
326 : #include "DataSampleHeader.inl"
327 : #endif /* __ACE_INLINE__ */
328 :
329 : #endif /* OPENDDS_DCPS_DATASAMPLEHEADER_H */
|