40 , reassembly_(link->config()->fragment_reassembly_timeout_)
41 , receiver_(local_prefix)
42 , thread_status_manager_(thread_status_manager)
43 #ifdef OPENDDS_SECURITY
45 , encoded_rtps_(false)
46 , encoded_submsg_(false)
50 const size_t INDEX = 0;
52 if (receive_buffers_[INDEX] == 0) {
54 receive_buffers_[INDEX],
71 #ifdef OPENDDS_SECURITY 82 const size_t INDEX = 0;
92 #pragma warning(disable : 4267) 94 iov.iov_len = cur_rb->
space();
98 iov.iov_base = cur_rb->
wr_ptr();
112 if (bytes_remaining < 0) {
117 cur_rb->
wr_ptr(bytes_remaining);
119 if (bytes_remaining == 0) {
147 const ScopedHeaderProcessing shp(*
this);
148 while (bytes_remaining > 0) {
159 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Attempt reassembly of fragments\n"));
162 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Reassembled complete message\n"));
182 if (receive_buffers_[INDEX]->data_block()->reference_count() > 1) {
185 ACE_DEBUG((
LM_INFO,
"(%P|%t) INFO: RtpsUdpReceiveStrategy::handle_input: reallocating primary receive buffer based on reference count\n"));
189 receive_buffers_[INDEX],
194 receive_buffers_[INDEX],
220 #ifdef OPENDDS_SECURITY
228 const ssize_t ret = socket.
recv(iov, n, remote_address, 0
229 #
if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
239 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper - invalid address size\n"));
243 if (n > 0 && ret > 0 && iov[0].iov_len >= 4 && std::memcmp(iov[0].iov_base,
"RTPS", 4) == 0) {
245 if (cfg && cfg->count_messages()) {
249 tport.transport_statistics_.message_count[
key].recv(ret);
254 #ifdef OPENDDS_SECURITY 256 # ifndef ACE_RECVPKTINFO 257 ACE_ERROR((
LM_ERROR,
"ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper potential STUN message " 258 "received but this version of the ACE library doesn't support the local_address " 259 "extension in ACE_SOCK_Dgram::recv\n"));
260 ACE_UNUSED_ARG(stop);
266 size_t block_size = std::min(bytes, static_cast<size_t>(iov[0].iov_len));
272 for (
int i = 1; i < n && bytes != 0; ++i) {
273 block_size = std::min(bytes, static_cast<size_t>(iov[i].iov_len));
283 message.
block = head;
284 if (serializer >> message) {
286 if (cfg && cfg->count_messages()) {
290 tport.transport_statistics_.message_count[
key].recv(ret);
293 if (tport.relay_srsm().is_response(message)) {
294 tport.process_relay_sra(tport.relay_srsm().receive(message));
295 #ifdef OPENDDS_SECURITY 296 }
else if (endpoint) {
297 ice_agent->receive(endpoint, local_address, remote_address, message);
304 ACE_UNUSED_ARG(stop);
310 #ifdef OPENDDS_SECURITY 316 "from %C %C secure RTPS processing failed: %C\n",
330 return link_->ipv6_multicast_socket();
333 return link_->ipv6_unicast_socket();
350 #ifdef ACE_LACKS_SENDMSG 351 ACE_UNUSED_ARG(stop);
352 char buffer[0x10000];
353 ssize_t scatter = socket.
recv(buffer,
sizeof buffer, remote_address);
355 for (
int i = 0; scatter > 0 && i < n; ++i) {
356 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
357 static_cast<size_t>(scatter));
358 std::memcpy(iov[i].iov_base, iter, chunk);
362 const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
365 #ifdef OPENDDS_SECURITY
372 #ifdef OPENDDS_SECURITY 386 return recv_err(
"no crypto plugin", remote_address, peer, stop);
390 return recv_err(
"message too short", remote_address, peer, stop);
393 const unsigned int encLen =
static_cast<unsigned int>(ret);
395 encoded.length(encLen);
396 unsigned char*
const encBuf = encoded.get_buffer();
398 for (
int i = 0; i < n && copied < encLen; ++i) {
399 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
400 static_cast<size_t>(encLen - copied));
401 std::memcpy(encBuf + copied, iov[i].iov_base, chunk);
405 if (copied != encLen) {
406 return recv_err(
"received bytes didn't fit in iovec array", remote_address, peer, stop);
413 static const int GuidPrefixOffset = 8;
421 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n",
LogGuid(peer).c_str()));
425 ACE_TEXT(
"decode_rtps_message no remote participant crypto handle for %C, dropping\n"),
434 if (!crypto->decode_rtps_message(plain, encoded, receiver, sender, ex)) {
436 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n",
LogGuid(peer).c_str()));
439 ACE_ERROR((
LM_WARNING,
"(%P|%t) {encdec_warn} decode_rtps_message SecurityException [%d.%d]: %C\n",
445 ACE_TEXT(
"decode_rtps_message remote participant has crypto handle but no key, dropping\n")));
450 return recv_err(
"decode_rtps_message failed", remote_address, peer, stop);
454 const size_t plainLen = plain.length();
455 const unsigned char*
const plainBuf = plain.get_buffer();
456 for (
int i = 0; i < n && copied < plainLen; ++i) {
457 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
459 std::memcpy(iov[i].iov_base, plainBuf + copied, chunk);
463 if (copied != plainLen) {
464 return recv_err(
"plaintext doesn't fit in iovec array", remote_address, peer, stop);
477 #ifdef OPENDDS_SECURITY 486 "Full message from %C requires protection, dropping\n",
498 if ((esa & MASK_PROTECT_SUBMSG) == MASK_PROTECT_SUBMSG && !
encoded_submsg_) {
501 "Submessage from %C requires protection, dropping\n",
507 ACE_UNUSED_ARG(sender);
516 using namespace RTPS;
522 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample - not destination\n"));
533 #ifdef OPENDDS_SECURITY 556 using namespace RTPS;
571 const DataSubmessage& data = submessage.
data_sm();
574 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
579 #ifdef OPENDDS_SECURITY 582 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
608 (directedWriteReaders.empty() || directedWriteReaders.find(reader) != directedWriteReaders.end())) {
611 ACE_TEXT(
"calling DataLink::data_received for seq: %q to reader %C\n"),
622 included_ids += (first ?
"" :
"\n") +
LogGuid(*iter).
conv_;
630 excluded_ids += (first ?
"" :
"\n") +
LogGuid(*iter2).
conv_;
635 ACE_TEXT(
" readers_selected ids: %C\n")
636 ACE_TEXT(
" readers_withheld ids: %C\n"),
637 this, included_ids.c_str(), excluded_ids.c_str()));
641 if (directedWriteReaders.empty()) {
644 ACE_TEXT(
"calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n"),
651 ACE_TEXT(
"calling DataLink::data_received_include for seq: %q to directedWriteReaders\n"),
657 if (directedWriteReaders.empty()) {
660 ACE_TEXT(
"calling DataLink::data_received_include for seq: %q to readers_selected_\n"),
667 ACE_TEXT(
"calling DataLink::data_received_include for seq: %q to intersection of readers\n"),
680 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
690 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
707 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
717 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
727 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
738 #ifdef OPENDDS_SECURITY 753 #ifdef OPENDDS_SECURITY 776 ACE_TEXT(
"deliver_from_secure failed to encode submessage %C RPCH %d\n"),
777 LogGuid(peer).c_str(), peer_pch));
789 bool ok = crypto->preprocess_secure_submsg(dwch, drch, category, encoded_submsg,
794 ACE_TEXT(
"dwch is %d and drch is %d\n"), dwch, drch), 4);
798 ok = crypto->decode_datawriter_submessage(plain_submsg, encoded_submsg,
802 ok = crypto->decode_datareader_submessage(plain_submsg, encoded_submsg,
811 ACE_TEXT(
"failed remote %C RPCH %d, [%d.%d]: %C\n"),
821 ACE_TEXT(
"decode %C submessage failed [%d.%d]: \"%C\" ")
822 ACE_TEXT(
"(rpch: %u, local d%cch: %u, remote d%cch: %u)\n"),
823 dw ?
"writer" :
"reader",
836 mb.
copy(reinterpret_cast<const char*>(plain_submsg.get_buffer()), mb.size());
841 ACE_TEXT(
"RtpsUdpReceiveStrategy: decoded writer submessage") :
842 ACE_TEXT(
"RtpsUdpReceiveStrategy: decoded reader submessage")));
850 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Attempt reassembly of decoded fragments\n"));
852 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Reassembled complete message from decoded\n"));
881 for (
size_t i = 0; i < secure_submessages_.size(); ++i) {
901 for (
size_t i = 0; i < secure_submessages_.size(); ++i) {
902 if (!(ser << secure_submessages_[i])) {
915 if (!(ser << postfix)) {
919 encoded.length(static_cast<unsigned int>(mb.
length()));
920 std::memcpy(encoded.get_buffer(), mb.
rd_ptr(), mb.
length());
921 secure_submessages_.resize(0);
938 writer_crypto_handle =
943 writer_crypto_handle =
949 const bool payload_protected = (esa & MASK_PROTECT_PAYLOAD) == MASK_PROTECT_PAYLOAD;
951 if (writer_crypto_handle ==
DDS::HANDLE_NIL || !crypto || !payload_protected) {
961 iQos.length(static_cast<unsigned int>(iQosSize));
962 const char* iQos_raw =
reinterpret_cast<const char*
>(iQos.get_buffer());
971 const bool ok = crypto->decode_serialized_payload(plain, encoded, iQos,
973 writer_crypto_handle, ex);
978 sample.
append(reinterpret_cast<const char*>(plain.get_buffer()), plain.length());
980 if (plain.length() > 1) {
986 "decode_serialized_payload failed [%d.%d]: %C\n",
1016 if (cfg && cfg->use_multicast_) {
1033 #ifdef OPENDDS_SECURITY 1037 return header.
valid();
1044 #ifdef OPENDDS_SECURITY 1046 return header.
valid();
1061 return header.
valid();
1091 directedWriteReaders.clear();
1095 directedWriteReaders.insert(ds.
inlineQos[i].guid());
1098 return !directedWriteReaders.empty();
1109 using namespace RTPS;
1126 const DataSubmessage dsm = {
1128 dfsm.readerId, dfsm.writerId, dfsm.writerSN,
ParameterList()};
1142 bool modified =
false;
1143 for (
CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
1144 if (bit == 32) bit = 0;
1166 --cumulative_bits_added;
1177 for (
SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1191 FragmentInfo* frag_info)
1193 for (
SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1194 ACE_UINT32 total_frags = 0;
1197 if (total_frags > 256) {
1198 static const CORBA::Long empty_buffer[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
1200 ACE_UINT32 numBits = 0;
1202 const ACE_UINT32 base =
reassembly_.
get_gaps(sn, pub_id, &buffer[0], static_cast<CORBA::ULong>(buffer.size()), numBits);
1206 const CORBA::ULong len = std::min(remain, static_cast<CORBA::ULong>(256));
1209 if (std::memcmp(&buffer[idx], &empty_buffer[0], len8) != 0) {
1210 std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1212 p.second = RTPS::FragmentNumberSet();
1213 frag_info->push_back(p);
1214 RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1215 missing_frags.numBits = len;
1216 missing_frags.bitmapBase.value = i;
1217 missing_frags.bitmap.length(len32);
1218 std::memcpy(missing_frags.bitmap.get_buffer(), &buffer[idx], len8);
1223 std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1225 p.second = RTPS::FragmentNumberSet();
1226 frag_info->push_back(p);
1227 RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1228 missing_frags.numBits = 0;
1229 missing_frags.bitmap.length(8);
1230 missing_frags.bitmapBase.value =
1232 8, missing_frags.numBits);
1234 missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
1241 return frag_info ? !frag_info->empty() :
false;
1249 , have_timestamp_(false)
1266 using namespace RTPS;
1290 using namespace RTPS;
1324 if (
id.guidPrefix[i]) {
1406 using namespace RTPS;
GuidPrefix_t source_guid_prefix_
DataSampleHeader header_
The demarshalled sample header.
ACE_UINT16 fragment_size_
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
RtpsUdpReceiveStrategy(RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix, ThreadStatusManager &thread_status_manager)
void * malloc(size_t nbytes=sizeof(T))
const RtpsSampleHeader & received_sample_header() const
DCPS::EntityId_t readerId
void deliver_sample_i(ReceivedDataSample &sample, const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
DCPS::GuidPrefix_t guidPrefix
char message_id_
The enum MessageId.
ThreadStatusManager & thread_status_manager_
AckNackSubmessage acknack_sm
const ACE_CDR::UShort RTPSHDR_SZ
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
SubmessageHeader smHeader
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED
size_t length(void) const
unsigned long ACE_Reactor_Mask
SubmessageHeader smHeader
OpenDDS_Dcps_Export TransportDebug transport_debug
const long LOCATOR_KIND_UDPv4
RTPS::VendorId_t source_vendor_
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
TransportDataBlockAllocator db_allocator_
RtpsUdpTransport_rch transport()
bool isReader() const
Returns true if the GUID represents a reader entity.
void data_received_include(ReceivedDataSample &sample, const RepoIdSet &incl)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
InfoDestinationSubmessage info_dst_sm
RepoIdSet readers_withheld_
virtual int start_i()
Let the subclass start.
RepoIdSet readers_selected_
MessageReceiver receiver_
bool log_dropped_messages
Log received RTPS messages that were dropped.
const ReceivedDataSample * recvd_sample_
ACE_UINT32 source_timestamp_nanosec_
Adapt the TransportReceiveStrategy for RTPS's "transport" (message) Header.
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE fd) const
const char * c_str() const
Security::HandleRegistry_rch handle_registry() const
CommandPtr execute_or_enqueue(CommandPtr command)
void remove_fragments(const SequenceRange &range, const GUID_t &pub_id)
bool has_fragments(const SequenceRange &range, const GUID_t &pub_id, FragmentInfo *frag_info=0)
const OpenDDS::DCPS::Locator_t LOCATOR_INVALID
DDS::OctetSeq copy_data() const
copy the data payload into an OctetSeq
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
key GuidPrefix_t guidPrefix
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
DCPS::EntityId_t writerId
bool remove_frags_from_bitmap(CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const GUID_t &pub_id, ACE_CDR::ULong &samples_requested)
LocatorUDPv4_t multicastLocator
const Time_t TIME_INVALID
InfoSourceSubmessage info_src_sm
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
void submsg(const RTPS::Submessage &s)
SubmessageHeader smHeader
void disable_response_queue(bool send_immediately)
const MessageCountKind MCK_STUN
char * rd_ptr(void) const
ReactorInterceptor_rch get_reactor_interceptor() const
void deliver_from_secure(const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
Conversion processing and value testing utilities for RTPS GUID_t types.
ACE_SOCK_Dgram_Mcast & multicast_socket()
RtpsSampleHeader data_sample_header_
Current data sample header.
bool is_target(const GUID_t &remote_id)
virtual void end_transport_header_processing()
End Current Transport Header Processing.
ProtocolVersion_t version
const ACE_CDR::UShort SMHDR_SZ
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
RtpsUdpInst_rch config() const
SecuritySubmessage security_sm
ACE_HANDLE socket(int protocol_family, int type, int proto)
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
virtual void stop_i()
Let the subclass stop.
const GuidPrefix_t & local_prefix() const
int copy(const char *buf, size_t n)
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
void clear_completed_fragments(const GUID_t &pub_id)
ProtocolVersion_t version
static ssize_t receive_bytes_helper(iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
DCPS::EntityId_t writerId
void reset(const ACE_INET_Addr &remote_address, const RTPS::Header &hdr)
ACE_INLINE OpenDDS_Dcps_Export ACE_UINT32 uint32_fractional_seconds_to_nanoseconds(ACE_UINT32 fraction)
const EntityId_t ENTITYID_PARTICIPANT
NativeCryptoHandle DatawriterCryptoHandle
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
LocatorList multicastLocatorList
virtual int handle_input(ACE_HANDLE fd)
DCPS::GuidPrefix_t guidPrefix
bool write_data(Serializer &ser) const
write the data payload to the Serializer
Class to serialize and deserialize data for DDS.
HeartBeatFragSubmessage hb_frag_sm
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
const long OPENDDS_EXCEPTION_CODE_NO_KEY
DCPS::EntityId_t readerId
void data_unavailable(const FragmentRange &transportSeqDropped)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Holds a data sample received by the transport.
bool sec_submsg_to_octets(DDS::OctetSeq &encoded, const RTPS::Submessage &postfix)
long ParticipantCryptoHandle
SubmessageHeader smHeader
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool into_received_data_sample(ReceivedDataSample &rds)
NativeCryptoHandle DatareaderCryptoHandle
void free(void *ptr)
Return a chunk of memory back to free list cache.
virtual ACE_Message_Block * release(void)
ACE_Message_Block * block
bool align_r(size_t alignment)
unsigned long EndpointSecurityAttributesMask
GuidPrefix_t dest_guid_prefix_
Security::SecurityConfig_rch security_config() const
virtual void begin_transport_header_processing()
Begin Current Transport Header Processing.
ACE_Message_Block * cont(void) const
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID
unsigned char peek(size_t offset) const
Retrieve one byte of data from the payload.
void pdu_remaining(size_t size)
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.
const ReceivedDataSample * withhold_data_from(const GUID_t &sub_id)
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
const unsigned long LOCATOR_PORT_INVALID
ACE_UINT32 message_length()
sequence< octet > OctetSeq
bool getDirectedWriteReaders(RepoIdSet &directedWriteReaders, const RTPS::DataSubmessage &ds) const
LocatorList unicastLocatorList
OpenDDS_Dcps_Export void align(size_t &value, size_t by)
Align "value" by "by" if it's not already.
TransportReassembly reassembly_
virtual void relink(bool do_suspend=true)
unsigned short fragmentsInSubmessage
OpenDDS::RTPS::Header header_
unsigned short fragmentSize
ReceivedDataSample secure_sample_
sequence< Parameter > ParameterList
ACE_INT32 source_timestamp_sec_
ACE_SOCK_Dgram & unicast_socket()
TransportMessageBlockAllocator mb_allocator_
std::pair< SequenceNumber, SequenceNumber > SequenceRange
MessageReceiver(const GuidPrefix_t &local)
void clear_completed(const GUID_t &pub_id)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
DCPS::EntityId_t writerId
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
HeartBeatSubmessage heartbeat_sm
virtual ACE_HANDLE get_handle(void) const
SecureSubmessageCategory_t
OpenDDS_Dcps_Export LogLevel log_level
virtual bool reassemble(ReceivedDataSample &data)
size_t get_serialized_size()
InfoReplyIp4Submessage info_reply_ipv4_sm
FragmentNumber_t fragmentStartingNum
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks
bool more_fragments() const
const long OPENDDS_EXCEPTION_MINOR_CODE_NO_KEY
LocatorUDPv4_t unicastLocator
Sequence number abstraction. Only allows positive 64 bit values.
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
static const ACE_Time_Value zero
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
#define VDBG_LVL(DBG_ARGS, LEVEL)
int get_addr_size(void) const
Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.
bool log_messages
Log all RTPS messages sent or received.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool set_intersect(SetA &sA, const SortedB &sB, LessThan lessThan)
const MessageCountKind MCK_RTPS
CORBA::ULong get_gaps(const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const
bool check_encoded(const EntityId_t &sender)
RTPS::Submessage submessage_
static bool payload_byte_order(const ReceivedDataSample &rds)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
TransportDataAllocator data_allocator_
NackFragSubmessage nack_frag_sm
void assign(EntityId_t &dest, const EntityId_t &src)
InfoTimestampSubmessage info_ts_sm
void fill_header(DataSampleHeader &header) const
ACE_INET_Addr remote_address_
RTPS::ProtocolVersion_t source_version_
void filterBestEffortReaders(const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
RtpsTransportHeader receive_transport_header_
Current receive TransportHeader.
sequence<<%SCOPED%><%TYPE%><%SEQ%> local interface<%TYPE%> out string encoded
ACE_UINT32 fragment_size_
Fragment size used by this sample.
static bool separate_message(EntityId_t entity)
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing...
InfoReplySubmessage info_reply_sm
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
virtual bool check_header(const RtpsTransportHeader &header)
Check the transport header for suitability.
bool decode_payload(ReceivedDataSample &sample, const RTPS::DataSubmessage &submessage)
const EntityId_t ENTITYID_UNKNOWN
DataFragSubmessage data_frag_sm
typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
void do_not_withhold_data_from(const GUID_t &sub_id)
virtual bool reassemble_i(ReceivedDataSample &data, RtpsSampleHeader &rsh)
SubmessageHeader smHeader
The Internal API and Implementation of OpenDDS.
DCPS::LocatorSeq unicast_reply_locator_list_
const octet FLAG_K_IN_DATA
void received(const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
SubmessageSeq submessages
#define ACE_NOTSUP_RETURN(FAILVALUE)
RTPS::SecuritySubmessage secure_prefix_
OpenDDS_Dcps_Export SecurityDebug security_debug
DCPS::LocatorSeq multicast_reply_locator_list_
SubmessageHeader smHeader
DDS::Security::ParticipantSecurityAttributesMask security_attributes_to_bitmask(const DDS::Security::ParticipantSecurityAttributes &sec_attr)
const ParameterId_t PID_DIRECTED_WRITE
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.
void enable_response_queue()