19 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h> 27 #ifndef __ACE_INLINE__ 33 STATUS_INFO_DISPOSE = { { 0, 0, 0, 1 } },
34 STATUS_INFO_UNREGISTER = { { 0, 0, 0, 2 } },
35 STATUS_INFO_DISPOSE_UNREGISTER = { { 0, 0, 0, 3 } };
43 #ifndef OPENDDS_SAFETY_PROFILE 88 #define CASE_SMKIND(kind, class, name) case kind: { \ 90 if (ser >> submessage) { \ 91 octetsToNextHeader = submessage.smHeader.submessageLength; \ 92 submessage_.name##_sm(submessage); \ 113 #if defined(OPENDDS_SECURITY) 122 if (ser >> submessage) {
124 submessage_.security_sm(submessage);
125 submessage_._d(kind);
135 if (ser >> submessage) {
137 submessage_.unknown_sm(submessage);
148 data_ = (kind ==
DATA);
155 if (octetsToNextHeader == 0 && kind !=
PAD && kind !=
INFO_TS) {
159 octetsToNextHeader = remaining;
161 }
else if (octetsToNextHeader > remaining) {
171 message_length_ = octetsToNextHeader +
SMHDR_SZ - serialized_size_;
178 if (octetsToNextHeader +
SMHDR_SZ > marshaled) {
179 valid_ = ser.
skip(octetsToNextHeader +
SMHDR_SZ - marshaled);
180 serialized_size_ = octetsToNextHeader +
SMHDR_SZ;
191 #if defined(OPENDDS_TEST_INLINE_QOS) 194 output +=
" inline QoS parameters\n";
195 for (
CORBA::ULong index = 0; index < iqos.length(); ++index) {
196 output +=
" parameter type = ";
204 if (iqos[i].status_info() == STATUS_INFO_DISPOSE) {
206 }
else if (iqos[i].status_info() == STATUS_INFO_UNREGISTER) {
208 }
else if (iqos[i].status_info() == STATUS_INFO_DISPOSE_UNREGISTER) {
210 }
else if (iqos[i].status_info() == STATUS_INFO_REGISTER) {
215 #if defined(OPENDDS_TEST_INLINE_QOS) 221 "coherent_access = %d, ordered_access = %d\n",
242 switch (submessage_._d()) {
273 "(%P|%t) RtpsSampleHeader::into_received_data_sample()" 274 " - used KeyHash data as the key-only payload\n"));
284 "(%P|%t) RtpsSampleHeader::into_received_data_sample() - " 285 "Received a DATA Submessage with D = 0 and K = 0, " 343 subm[i].info_ts_sm(ts);
348 RtpsSampleHeader::populate_data_sample_submessages(
351 bool requires_inline_qos)
356 add_timestamp(subm, flags, dsle.
get_header());
369 subm[i - 1].info_dst_sm(idest);
382 subm[i - 1].info_dst_sm(idest);
398 switch (message_id) {
405 "Non-sample messages seen, message_id = %d\n",
int(message_id)));
409 if (requires_inline_qos) {
413 populate_inline_qos(qos_data, data.
inlineQos);
421 subm[i - 1].data_sm(data);
431 static const size_t offset = 8 ;
440 RtpsSampleHeader::populate_data_control_submessages(
443 bool requires_inline_qos)
449 add_timestamp(subm, flags, header);
469 data.
inlineQos[qos_len].status_info(STATUS_INFO_REGISTER);
474 const int qos_len = data.
inlineQos.length();
476 data.
inlineQos[qos_len].status_info(STATUS_INFO_UNREGISTER);
485 data.
inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE);
494 data.
inlineQos[qos_len].status_info(STATUS_INFO_DISPOSE_UNREGISTER);
503 "RtpsSampleHeader::populate_data_control_submessages(): " 504 "Non-sample messages seen, message_id = %d\n",
509 if (requires_inline_qos) {
513 populate_inline_qos(qos_data, data.
inlineQos);
521 subm[idx].data_sm(data);
524 #define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS) \ 525 if (WRITER_QOS.QOS_NAME != DEFAULT_QOS.QOS_NAME) { \ 526 const int idx = DCPS::grow(plist) - 1; \ 527 plist[idx].QOS_NAME(WRITER_QOS.QOS_NAME); \ 531 RtpsSampleHeader::populate_inline_qos(
540 plist[idx].string_data(qos_data.
topic_name.c_str());
557 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 573 plist[idx].reliability(reliability);
580 #undef PROCESS_INLINE_QOS 586 const char* ps =
reinterpret_cast<const char*
>(&s);
597 const char* pi =
reinterpret_cast<const char*
>(&i);
616 void peek(
ACE_CDR::ULong& target,
const char* src,
bool swap_bytes)
625 const size_t FRAG_START_OFFSET = 24, FRAG_SAMPLE_SIZE_OFFSET = 32;
632 using namespace RTPS;
633 size_t data_offset = 0;
634 const char* rd = orig.
rd_ptr();
643 flags = rd[data_offset + 1];
645 bool found_data =
false;
647 switch (rd[data_offset]) {
652 "attempting to fragment a Data submessage with no payload.\n"));
662 peek(starting_frag, rd + data_offset + FRAG_START_OFFSET, swap_bytes);
663 peek(sample_size, rd + data_offset + FRAG_SAMPLE_SIZE_OFFSET, swap_bytes);
673 peek(octetsToNextHeader, rd + data_offset + 2, swap_bytes);
675 data_offset += octetsToNextHeader +
SMHDR_SZ;
676 if (data_offset >= orig.
length()) {
679 "invalid octetsToNextHeader encountered while fragmenting.\n"));
686 size_t sz = orig.
length();
689 if (rd[data_offset] ==
DATA) {
701 head.
reset(DataSampleHeader::alloc_msgblock(orig, sz,
false));
703 head->
copy(rd, data_offset);
706 head->
wr_ptr()[1] = new_flags;
709 std::memset(head->
wr_ptr(), 0, 4);
712 write(head, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
714 head->
copy(rd + data_offset + 8, 16);
716 write(head, starting_frag, swap_bytes);
717 const size_t max_data = size - sz, orig_payload = orig.
cont()->
total_length();
719 static_cast<ACE_CDR::UShort>(std::min(max_data, orig_payload) / FRAG_SIZE);
722 "Number of Fragments is Zero: min(%B, %B) / FRAG_SIZE\n",
723 max_data, orig_payload));
726 write(head, frags, swap_bytes);
727 write(head, FRAG_SIZE, swap_bytes);
728 write(head, sample_size, swap_bytes);
731 head->
copy(rd + iqos_offset, orig.
length() - iqos_offset);
735 tail.
reset(DataSampleHeader::alloc_msgblock(orig, data_offset + 36,
false));
737 tail->
copy(rd, data_offset);
740 tail->
wr_ptr()[1] = new_flags & ~FLAG_Q;
743 std::memset(tail->
wr_ptr(), 0, 4);
746 write(tail, DATA_FRAG_OCTETS_TO_IQOS, swap_bytes);
747 tail->
copy(rd + data_offset + 8, 16);
749 write(tail, starting_frag + frags, swap_bytes);
750 const size_t tail_data = orig_payload - frags * FRAG_SIZE;
753 write(tail, tail_frags, swap_bytes);
754 write(tail, FRAG_SIZE, swap_bytes);
755 write(tail, sample_size, swap_bytes);
759 DataSampleHeader::split_payload(*orig.
cont(), frags * FRAG_SIZE,
760 payload_head, payload_tail);
761 head->
cont(payload_head.release());
765 starting_frag + frags + tail_frags - 1);
DataSampleHeader header_
The demarshalled sample header.
sequence< Submessage > SubmessageSeq
CORBA::ULong get_num_subs() const
const octet FLAG_N_IN_FRAG
OPENDDS_STRING topic_name
const ACE_CDR::UShort DATA_FRAG_OCTETS_TO_IQOS
#define CASE_SMKIND(kind, class, name)
const DataSampleHeader & get_header() const
char message_id_
The enum MessageId.
TransportSendListener * get_send_listener() const
ReliabilityQosPolicy reliability
SubmessageHeader smHeader
size_t length(void) const
Duration_t max_blocking_time
SubmessageHeader smHeader
bool has_data() const
true if at least one Data Block is stored (even if it has 0 useable bytes)
const octet FLAG_K_IN_FRAG
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
void replace(const char *data, size_t size)
Replace all payload bytes with passed-in data Based on the ACE_Message_Block(const char*...
bool skip(size_t n, int size=1)
const ParameterId_t PID_KEY_HASH
String to_dds_string(unsigned short to_convert)
ACE_UINT32 source_timestamp_nanosec_
const ParameterId_t PID_PARTITION
GUID_t get_pub_id() const
PresentationQosPolicyAccessScopeKind access_scope
const ACE_CDR::UShort INFO_DST_SZ
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
key GuidPrefix_t guidPrefix
#define ACE_CDR_BYTE_ORDER
const TransportSendListener * listener() const
DDS::PublisherQos pub_qos
const octet FLAG_N_IN_DATA
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
char * rd_ptr(void) const
OpenDDS::internal::special_serialization typedef sequence< Parameter > ParameterList
const ACE_CDR::UShort INFO_TS_SZ
SubmessageHeader smHeader
OpenDDS_Dcps_Export const SequenceRange unknown_sequence_range
const ACE_CDR::UShort SMHDR_SZ
const DataSampleHeader & header() const
int copy(const char *buf, size_t n)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
const ParameterId_t PID_STATUS_INFO
ReliabilityQosPolicyKind kind
Class to serialize and deserialize data for DDS.
OpenDDS::DCPS::GUID_t get_sub_id(CORBA::ULong index) const
unsigned short submessageLength
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
SequenceNumber_t writerSN
Holds a data sample received by the transport.
bool operator==(const Duration_t &x, const Duration_t &y)
ACE_INLINE OpenDDS_Dcps_Export ACE_UINT32 nanoseconds_to_uint32_fractional_seconds(ACE_UINT32 fraction)
SubmessageHeader smHeader
DDS::Duration_t max_blocking_time
virtual const ACE_Message_Block * msg_payload() const
The marshalled payload only (sample data)
static void swap_2(char const *orig, char *target)
DCPS::EntityId_t writerId
ACE_Message_Block * cont(void) const
SequenceNumber_t writerSN
unsigned char peek(size_t offset) const
Retrieve one byte of data from the payload.
Seq::size_type grow(Seq &seq)
DCPS::EntityId_t writerId
size_t total_length(void) const
char * wr_ptr(void) const
ACE_UINT32 message_length_
bool more_fragments_
The current "Data Sample" needs reassembly before further processing.
unsigned short fragmentsInSubmessage
const ParameterId_t PID_PRESENTATION
DDS::DataWriterQos dw_qos
unsigned short fragmentSize
const ParameterId_t PID_TOPIC_NAME
ACE_INT32 source_timestamp_sec_
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
virtual void retrieve_inline_qos_data(InlineQosData &qos_data) const
FragmentNumber_t fragmentStartingNum
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define PROCESS_INLINE_QOS(QOS_NAME, DEFAULT_QOS, WRITER_QOS)
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
static void swap_4(char const *orig, char *target)
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing...
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
const EntityId_t ENTITYID_UNKNOWN
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
const octet FLAG_K_IN_DATA
DCPS::GuidPrefix_t guidPrefix
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
const ParameterId_t PID_ORIGINAL_WRITER_INFO