OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Static Public Member Functions | Static Public Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsUdpReceiveStrategy Class Reference

#include <RtpsUdpReceiveStrategy.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpReceiveStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpReceiveStrategy:
Collaboration graph
[legend]

Classes

struct  MessageReceiver
 

Public Types

typedef std::pair< SequenceNumber, RTPS::FragmentNumberSetSeqFragPair
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 

Public Member Functions

 RtpsUdpReceiveStrategy (RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix, ThreadStatusManager &thread_status_manager)
 
virtual int handle_input (ACE_HANDLE fd)
 
bool remove_frags_from_bitmap (CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const GUID_t &pub_id, ACE_CDR::ULong &samples_requested)
 
void remove_fragments (const SequenceRange &range, const GUID_t &pub_id)
 
typedef OPENDDS_VECTOR (SeqFragPair) FragmentInfo
 
void clear_completed_fragments (const GUID_t &pub_id)
 
bool has_fragments (const SequenceRange &range, const GUID_t &pub_id, FragmentInfo *frag_info=0)
 
const ReceivedDataSamplewithhold_data_from (const GUID_t &sub_id)
 
void do_not_withhold_data_from (const GUID_t &sub_id)
 
virtual void begin_transport_header_processing ()
 Begin Current Transport Header Processing. More...
 
virtual void end_transport_header_processing ()
 End Current Transport Header Processing. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >
virtual ~TransportReceiveStrategy ()
 
int start ()
 
void stop ()
 
int handle_dds_input (ACE_HANDLE fd)
 
virtual void relink (bool do_suspend=true)
 
const RtpsTransportHeaderreceived_header () const
 
RtpsTransportHeaderreceived_header ()
 
const RtpsSampleHeaderreceived_sample_header () const
 
RtpsSampleHeaderreceived_sample_header ()
 
ACE_Message_Blockto_msgblock (const ReceivedDataSample &sample)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportStrategy
virtual ~TransportStrategy ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 

Static Public Member Functions

static ssize_t receive_bytes_helper (iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 

Static Public Attributes

static const size_t BUFFER_COUNT = 1u
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportReceiveConstants
static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS
 
static const size_t BUFFER_LOW_WATER = 4096
 
static const size_t MESSAGE_BLOCKS = 1000
 
static const size_t DATA_BLOCKS = 100
 

Private Types

typedef TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeaderBaseReceiveStrategy
 

Private Member Functions

bool getDirectedWriteReaders (RepoIdSet &directedWriteReaders, const RTPS::DataSubmessage &ds) const
 
const ACE_SOCK_Dgramchoose_recv_socket (ACE_HANDLE fd) const
 
virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
 Only our subclass knows how to do this. More...
 
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered. More...
 
void deliver_sample_i (ReceivedDataSample &sample, const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
 
virtual int start_i ()
 Let the subclass start. More...
 
virtual void stop_i ()
 Let the subclass stop. More...
 
virtual bool check_header (const RtpsTransportHeader &header)
 Check the transport header for suitability. More...
 
virtual bool check_header (const RtpsSampleHeader &header)
 Check the data sample header for suitability. More...
 
virtual bool reassemble (ReceivedDataSample &data)
 
virtual bool reassemble_i (ReceivedDataSample &data, RtpsSampleHeader &rsh)
 
bool sec_submsg_to_octets (DDS::OctetSeq &encoded, const RTPS::Submessage &postfix)
 
void deliver_from_secure (const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
 
bool decode_payload (ReceivedDataSample &sample, const RTPS::DataSubmessage &submessage)
 
bool check_encoded (const EntityId_t &sender)
 
 OPENDDS_VECTOR (RTPS::Submessage) secure_submessages_
 

Private Attributes

RtpsUdpDataLinklink_
 
SequenceNumber last_received_
 
const ReceivedDataSamplerecvd_sample_
 
RepoIdSet readers_withheld_
 
RepoIdSet readers_selected_
 
ACE_UINT16 fragment_size_
 
FragmentRange frags_
 
ACE_UINT32 total_frags_
 
TransportReassembly reassembly_
 
MessageReceiver receiver_
 
ThreadStatusManagerthread_status_manager_
 
ACE_INET_Addr remote_address_
 
RTPS::Message message_
 
RTPS::SecuritySubmessage secure_prefix_
 
ReceivedDataSample secure_sample_
 
bool encoded_rtps_
 
bool encoded_submsg_
 

Additional Inherited Members

- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >
 TransportReceiveStrategy (const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
 
virtual void finish_message ()
 
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them. More...
 
void reset ()
 
size_t pdu_remaining () const
 
size_t successor_index (size_t index) const
 Manage an index into the receive buffer array. More...
 
void update_buffer_index (bool &done)
 
 OPENDDS_VECTOR (ACE_Message_Block *) receive_buffers_
 Set of receive buffers in use. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >
bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received. More...
 
size_t receive_sample_remaining_
 Bytes remaining in the current DataSample. More...
 
RtpsTransportHeader receive_transport_header_
 Current receive TransportHeader. More...
 
TransportMessageBlockAllocator mb_allocator_
 
TransportDataBlockAllocator db_allocator_
 
TransportDataAllocator data_allocator_
 
ACE_Lock_Adapter< ACE_SYNCH_MUTEXreceive_lock_
 Locking strategy for the allocators. More...
 
size_t buffer_index_
 Current receive buffer index in use. More...
 
RtpsSampleHeader data_sample_header_
 Current data sample header. More...
 
ACE_Message_Blockpayload_
 
bool good_pdu_
 
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet. More...
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 40 of file RtpsUdpReceiveStrategy.h.

Member Typedef Documentation

◆ BaseReceiveStrategy

Definition at line 132 of file RtpsUdpReceiveStrategy.h.

◆ SeqFragPair

Definition at line 65 of file RtpsUdpReceiveStrategy.h.

Constructor & Destructor Documentation

◆ RtpsUdpReceiveStrategy()

OpenDDS::DCPS::RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy ( RtpsUdpDataLink link,
const GuidPrefix_t local_prefix,
ThreadStatusManager thread_status_manager 
)

Definition at line 31 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::db_allocator_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::receive_lock_, secure_prefix_, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::DCPS::SUBMESSAGE_NONE, OpenDDS::RTPS::SubmessageHeader::submessageId, and ACE_Time_Value::zero.

34  : BaseReceiveStrategy(link->config(), BUFFER_COUNT)
35  , link_(link)
36  , last_received_()
37  , recvd_sample_(0)
38  , fragment_size_(0)
39  , total_frags_(0)
40  , reassembly_(link->config()->fragment_reassembly_timeout_)
41  , receiver_(local_prefix)
42  , thread_status_manager_(thread_status_manager)
43 #ifdef OPENDDS_SECURITY
44  , secure_sample_()
45  , encoded_rtps_(false)
46  , encoded_submsg_(false)
47 #endif
48 {
49  // Since BUFFER_COUNT is 1, the index will always be 0
50  const size_t INDEX = 0;
51 
52  if (receive_buffers_[INDEX] == 0) {
54  receive_buffers_[INDEX],
57  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
58  ACE_Message_Block::MB_DATA, // Default
59  0, // Start with no continuation
60  0, // Let the constructor allocate
61  &data_allocator_, // Our buffer cache
62  &receive_lock_, // Our locking strategy
63  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
64  ACE_Time_Value::zero, // Default
65  ACE_Time_Value::max_time, // Default
66  &db_allocator_, // Our data block cache
67  &mb_allocator_ // Our message block cache
68  ));
69  }
70 
71 #ifdef OPENDDS_SECURITY
73 #endif
74 }
TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader > BaseReceiveStrategy
static const ACE_Time_Value max_time
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
static const ACE_Time_Value zero
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.

Member Function Documentation

◆ begin_transport_header_processing()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::begin_transport_header_processing ( )
virtual

◆ check_encoded()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_encoded ( const EntityId_t sender)
private

Definition at line 475 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEBUG, OpenDDS::DCPS::SecurityDebug::encdec_warn, encoded_rtps_, encoded_submsg_, DDS::Security::ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED, DDS::Security::ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::DCPS::GuidConverter::isReader(), link_, LM_WARNING, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), OpenDDS::DCPS::make_id(), OPENDDS_STRING, receiver_, OpenDDS::RTPS::security_attributes_to_bitmask(), OpenDDS::DCPS::security_debug, OpenDDS::DCPS::RtpsUdpDataLink::separate_message(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_.

Referenced by deliver_sample_i().

476 {
477 #ifdef OPENDDS_SECURITY
478  using namespace DDS::Security;
479  const GUID_t sendGuid = make_id(receiver_.source_guid_prefix_, sender);
480  const GuidConverter conv(sendGuid);
481 
485  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy::check_encoded "
486  "Full message from %C requires protection, dropping\n",
487  OPENDDS_STRING(conv).c_str()));
488  }
489  return false;
490  }
491 
493  conv.isReader() ?
494  link_->handle_registry()->get_remote_datareader_security_attributes(sendGuid) :
495  link_->handle_registry()->get_remote_datawriter_security_attributes(sendGuid));
496  static const EndpointSecurityAttributesMask MASK_PROTECT_SUBMSG =
498  if ((esa & MASK_PROTECT_SUBMSG) == MASK_PROTECT_SUBMSG && !encoded_submsg_) {
500  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy::check_encoded "
501  "Submessage from %C requires protection, dropping\n",
502  OPENDDS_STRING(conv).c_str()));
503  }
504  return false;
505  }
506 #else
507  ACE_UNUSED_ARG(sender);
508 #endif
509  return true;
510 }
#define ACE_DEBUG(X)
unsigned long EndpointSecurityAttributesMask
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_SUBMESSAGE_PROTECTED
const InstanceHandle_t HANDLE_NIL
Security::HandleRegistry_rch handle_registry() const
#define OPENDDS_STRING
static bool separate_message(EntityId_t entity)
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
DDS::Security::ParticipantSecurityAttributesMask security_attributes_to_bitmask(const DDS::Security::ParticipantSecurityAttributes &sec_attr)
Definition: MessageUtils.h:177
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ check_header() [1/2]

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsTransportHeader header)
privatevirtual

Check the transport header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 1025 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::Message::hdr, OpenDDS::DCPS::RtpsTransportHeader::header_, OpenDDS::DCPS::TransportDebug::log_messages, message_, receiver_, remote_address_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::reset(), secure_prefix_, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::DCPS::SUBMESSAGE_NONE, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::Message::submessages, OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::RtpsTransportHeader::valid().

Referenced by deliver_from_secure(), and handle_input().

1026 {
1029  message_.submessages.length(0);
1030  message_.hdr = header.header_;
1031  }
1032 
1033 #ifdef OPENDDS_SECURITY
1035 #endif
1036 
1037  return header.valid();
1038 }
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
SubmessageSeq submessages
Definition: RtpsCore.idl:897
void reset(const ACE_INET_Addr &remote_address, const RTPS::Header &hdr)
bool log_messages
Log all RTPS messages sent or recieved.

◆ check_header() [2/2]

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsSampleHeader header)
privatevirtual

Check the data sample header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 1041 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::Submessage::data_frag_sm, fragment_size_, OpenDDS::RTPS::DataFragSubmessage::fragmentsInSubmessage, OpenDDS::RTPS::DataFragSubmessage::fragmentSize, OpenDDS::RTPS::DataFragSubmessage::fragmentStartingNum, frags_, receiver_, OpenDDS::RTPS::DataFragSubmessage::sampleSize, secure_prefix_, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::DCPS::RtpsSampleHeader::submessage_, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::submsg(), total_frags_, OpenDDS::DCPS::RtpsSampleHeader::valid(), and OpenDDS::RTPS::FragmentNumber_t::value.

1042 {
1043 
1044 #ifdef OPENDDS_SECURITY
1046  return header.valid();
1047  }
1048 #endif
1049 
1050  receiver_.submsg(header.submessage_);
1051 
1052  // save fragmentation details for use in reassemble()
1053  if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
1054  const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
1055  frags_.first = rtps.fragmentStartingNum.value;
1056  frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
1057  fragment_size_ = rtps.fragmentSize;
1058  total_frags_ = (rtps.sampleSize / rtps.fragmentSize) + (rtps.sampleSize % rtps.fragmentSize ? 1 : 0);
1059  }
1060 
1061  return header.valid();
1062 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8

◆ choose_recv_socket()

const ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpReceiveStrategy::choose_recv_socket ( ACE_HANDLE  fd) const
private

Definition at line 326 of file RtpsUdpReceiveStrategy.cpp.

References ACE_Event_Handler::get_handle(), ACE_IPC_SAP::get_handle(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

Referenced by receive_bytes().

327 {
328 #ifdef ACE_HAS_IPV6
329  if (fd == link_->ipv6_multicast_socket().get_handle()) {
330  return link_->ipv6_multicast_socket();
331  }
332  if (fd == link_->ipv6_unicast_socket().get_handle()) {
333  return link_->ipv6_unicast_socket();
334  }
335 #endif
336  if (fd == link_->multicast_socket().get_handle()) {
337  return link_->multicast_socket();
338  }
339  return link_->unicast_socket();
340 }
ACE_SOCK_Dgram_Mcast & multicast_socket()
ACE_HANDLE get_handle(void) const
virtual ACE_HANDLE get_handle(void) const

◆ clear_completed_fragments()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::clear_completed_fragments ( const GUID_t pub_id)

◆ decode_payload()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::decode_payload ( ReceivedDataSample sample,
const RTPS::DataSubmessage submessage 
)
private

Definition at line 926 of file RtpsUdpReceiveStrategy.cpp.

References ACE_ERROR, OpenDDS::DCPS::ReceivedDataSample::append(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::ReceivedDataSample::clear(), DDS::Security::SecurityException::code, OpenDDS::DCPS::ReceivedDataSample::copy_data(), OpenDDS::DCPS::SecurityDebug::encdec_warn, OpenDDS::STUN::encoding(), DDS::Security::ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED, DDS::Security::ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::GUID_t::guidPrefix, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::DCPS::Encoding::KIND_XCDR1, link_, LM_WARNING, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::local_, DDS::Security::SecurityException::message, DDS::Security::SecurityException::minor_code, OpenDDS::DCPS::RtpsSampleHeader::payload_byte_order(), OpenDDS::DCPS::DataSampleHeader::publication_id_, receiver_, OpenDDS::RTPS::security_attributes_to_bitmask(), OpenDDS::DCPS::RtpsUdpDataLink::security_config(), OpenDDS::DCPS::security_debug, OpenDDS::DCPS::serialized_size(), and OpenDDS::RTPS::DataSubmessage::smHeader.

Referenced by deliver_sample_i().

928 {
929  using namespace DDS::Security;
930 
931  static const EndpointSecurityAttributesMask MASK_PROTECT_PAYLOAD =
933  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
934  DatawriterCryptoHandle writer_crypto_handle;
936 
937  if (equal_guid_prefixes(sample.header_.publication_id_.guidPrefix, receiver_.local_)) {
938  writer_crypto_handle =
939  link_->handle_registry()->get_local_datawriter_crypto_handle(sample.header_.publication_id_);
940  esa =
941  RTPS::security_attributes_to_bitmask(link_->handle_registry()->get_local_datawriter_security_attributes(sample.header_.publication_id_));
942  } else {
943  writer_crypto_handle =
944  link_->handle_registry()->get_remote_datawriter_crypto_handle(sample.header_.publication_id_);
945  esa =
946  RTPS::security_attributes_to_bitmask(link_->handle_registry()->get_remote_datawriter_security_attributes(sample.header_.publication_id_));
947  }
948 
949  const bool payload_protected = (esa & MASK_PROTECT_PAYLOAD) == MASK_PROTECT_PAYLOAD;
950 
951  if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto || !payload_protected) {
952  return true;
953  }
954 
955  DDS::OctetSeq encoded = sample.copy_data(), plain, iQos;
956 
957  const Encoding encoding(Encoding::KIND_XCDR1,
958  static_cast<Endianness>(submsg.smHeader.flags & 1));
959  size_t iQosSize = 0;
960  serialized_size(encoding, iQosSize, submsg.inlineQos);
961  iQos.length(static_cast<unsigned int>(iQosSize));
962  const char* iQos_raw = reinterpret_cast<const char*>(iQos.get_buffer());
963  ACE_Message_Block iQosMb(iQos_raw, iQos.length());
964  Serializer ser(&iQosMb, encoding);
965  ser << submsg.inlineQos;
966 
967  SecurityException ex = {"", 0, 0};
968  // DDS-Security: since origin authentication for payload is not yet supported
969  // the reader's crypto handle is NIL here (could be multiple readers in this
970  // participant)
971  const bool ok = crypto->decode_serialized_payload(plain, encoded, iQos,
973  writer_crypto_handle, ex);
974  if (ok) {
975  // The ReceivedDataSample's message block uses the transport's data block so it
976  // can't be modified in-place, instead replace it with a new block.
977  sample.clear();
978  sample.append(reinterpret_cast<const char*>(plain.get_buffer()), plain.length());
979 
980  if (plain.length() > 1) {
981  sample.header_.byte_order_ = RtpsSampleHeader::payload_byte_order(sample);
982  }
983 
984  } else if (security_debug.encdec_warn) {
985  ACE_ERROR((LM_WARNING, "(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy: "
986  "decode_serialized_payload failed [%d.%d]: %C\n",
987  ex.code, ex.minor_code, ex.message.in()));
988  }
989 
990  return ok;
991 }
#define ACE_ERROR(X)
unsigned long EndpointSecurityAttributesMask
NativeCryptoHandle DatawriterCryptoHandle
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_VALID
const InstanceHandle_t HANDLE_NIL
Security::HandleRegistry_rch handle_registry() const
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
static bool payload_byte_order(const ReceivedDataSample &rds)
const EndpointSecurityAttributesMask ENDPOINT_SECURITY_ATTRIBUTES_FLAG_IS_PAYLOAD_PROTECTED
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
Security::SecurityConfig_rch security_config() const
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
DDS::Security::ParticipantSecurityAttributesMask security_attributes_to_bitmask(const DDS::Security::ParticipantSecurityAttributes &sec_attr)
Definition: MessageUtils.h:177

◆ deliver_from_secure()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_from_secure ( const RTPS::Submessage submessage,
const NetworkAddress remote_addr 
)
private

Definition at line 755 of file RtpsUdpReceiveStrategy.cpp.

References ACE_ERROR, ACE_HEX_DUMP, ACE_TEXT(), check_header(), DDS::Security::SecurityException::code, ACE_Message_Block::copy(), DDS::Security::DATAREADER_SUBMESSAGE, DDS::Security::DATAWRITER_SUBMESSAGE, deliver_sample_i(), OpenDDS::DCPS::SecurityDebug::encdec_warn, encoded_submsg_, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::GUID_t::guidPrefix, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), DDS::Security::INFO_SUBMESSAGE, OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), link_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::local_, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::make_id(), DDS::Security::SecurityException::message, message_, DDS::Security::SecurityException::minor_code, OpenDDS::DCPS::RtpsSampleHeader::more_fragments(), OpenDDS::DCPS::push_back(), reassemble_i(), receiver_, sec_submsg_to_octets(), secure_prefix_, secure_sample_, OpenDDS::DCPS::RtpsUdpDataLink::security_config(), OpenDDS::DCPS::security_debug, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_, OpenDDS::DCPS::RtpsSampleHeader::submessage_, OpenDDS::DCPS::SUBMESSAGE_NONE, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::Message::submessages, OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

Referenced by deliver_sample_i().

757 {
758  using namespace DDS::Security;
759 
760  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
761  if (!crypto) {
762  // security not enabled for this datalink -- this can be reached
763  // when a secure message is seen on the same multicast group
764  return;
765  }
766 
768  const ParticipantCryptoHandle peer_pch = equal_guid_prefixes(peer.guidPrefix, receiver_.local_) ?
770  link_->handle_registry()->get_remote_participant_crypto_handle(peer);
771 
772  DDS::OctetSeq encoded_submsg, plain_submsg;
773  if (!sec_submsg_to_octets(encoded_submsg, submessage)) {
775  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy: ")
776  ACE_TEXT("deliver_from_secure failed to encode submessage %C RPCH %d\n"),
777  LogGuid(peer).c_str(), peer_pch));
778  }
779  return;
780  }
782  secure_sample_ = ReceivedDataSample();
783 
787  SecurityException ex = {"", 0, 0};
788 
789  bool ok = crypto->preprocess_secure_submsg(dwch, drch, category, encoded_submsg,
790  link_->local_crypto_handle(), peer_pch, ex);
791 
792  if (ok) {
793  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy::deliver_from_secure ")
794  ACE_TEXT("dwch is %d and drch is %d\n"), dwch, drch), 4);
795  }
796 
797  if (ok && category == DATAWRITER_SUBMESSAGE) {
798  ok = crypto->decode_datawriter_submessage(plain_submsg, encoded_submsg,
799  drch, dwch, ex);
800 
801  } else if (ok && category == DATAREADER_SUBMESSAGE) {
802  ok = crypto->decode_datareader_submessage(plain_submsg, encoded_submsg,
803  dwch, drch, ex);
804 
805  } else if (ok && category == INFO_SUBMESSAGE) {
806  return;
807 
808  } else {
810  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::deliver_from_secure ")
811  ACE_TEXT("failed remote %C RPCH %d, [%d.%d]: %C\n"),
812  LogGuid(peer).c_str(), peer_pch, ex.code, ex.minor_code, ex.message.in()));
813  }
814  return;
815  }
816 
817  if (!ok) {
818  bool dw = category == DATAWRITER_SUBMESSAGE;
820  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::deliver_from_secure ")
821  ACE_TEXT("decode %C submessage failed [%d.%d]: \"%C\" ")
822  ACE_TEXT("(rpch: %u, local d%cch: %u, remote d%cch: %u)\n"),
823  dw ? "writer" : "reader",
824  ex.code, ex.minor_code, ex.message.in(),
825  peer_pch,
826  dw ? 'r' : 'w',
827  dw ? drch : dwch,
828  dw ? 'w' : 'r',
829  dw ? dwch : drch
830  ));
831  }
832  return;
833  }
834 
835  ACE_Message_Block mb(plain_submsg.length());
836  mb.copy(reinterpret_cast<const char*>(plain_submsg.get_buffer()), mb.size());
837 
838  if (Transport_debug_level > 5) {
839  ACE_HEX_DUMP((LM_DEBUG, mb.rd_ptr(), mb.length(),
840  category == DATAWRITER_SUBMESSAGE ?
841  ACE_TEXT("RtpsUdpReceiveStrategy: decoded writer submessage") :
842  ACE_TEXT("RtpsUdpReceiveStrategy: decoded reader submessage")));
843  }
844 
845  RtpsSampleHeader rsh(mb);
846  if (check_header(rsh)) {
847  ReceivedDataSample plain_sample(mb);
848  if (rsh.into_received_data_sample(plain_sample)) {
849  if (rsh.more_fragments()) {
850  VDBG((LM_DEBUG, "(%P|%t) DBG: Attempt reassembly of decoded fragments\n"));
851  if (reassemble_i(plain_sample, rsh)) {
852  VDBG((LM_DEBUG, "(%P|%t) DBG: Reassembled complete message from decoded\n"));
853  encoded_submsg_ = true;
855  // Pop the secure envelope.
856  message_.submessages.length(message_.submessages.length() - 3);
857  DCPS::push_back(message_.submessages, rsh.submessage_);
858  }
859  deliver_sample_i(plain_sample, rsh.submessage_, remote_addr);
860  return;
861  }
862  }
863  encoded_submsg_ = true;
865  // Pop the secure envelope.
866  message_.submessages.length(message_.submessages.length() - 3);
867  DCPS::push_back(message_.submessages, rsh.submessage_);
868  }
869  deliver_sample_i(plain_sample, rsh.submessage_, remote_addr);
870  }
871  }
872 }
#define ACE_ERROR(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
NativeCryptoHandle DatawriterCryptoHandle
bool sec_submsg_to_octets(DDS::OctetSeq &encoded, const RTPS::Submessage &postfix)
const InstanceHandle_t HANDLE_NIL
Security::HandleRegistry_rch handle_registry() const
int copy(const char *buf, size_t n)
void deliver_sample_i(ReceivedDataSample &sample, const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
SubmessageSeq submessages
Definition: RtpsCore.idl:897
#define VDBG(DBG_ARGS)
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
virtual bool reassemble_i(ReceivedDataSample &data, RtpsSampleHeader &rsh)
#define ACE_HEX_DUMP(X)
virtual bool check_header(const RtpsTransportHeader &header)
Check the transport header for suitability.
ACE_TEXT("TCP_Factory")
bool log_messages
Log all RTPS messages sent or recieved.
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
#define VDBG_LVL(DBG_ARGS, LEVEL)
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
NativeCryptoHandle DatareaderCryptoHandle
Security::SecurityConfig_rch security_config() const
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ deliver_sample()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
)
privatevirtual

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 513 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEBUG, OpenDDS::RTPS::DATA, deliver_sample_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::dest_guid_prefix_, encoded_submsg_, link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_messages, message_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::received_sample_header(), receiver_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, secure_prefix_, secure_sample_, OpenDDS::RTPS::SecuritySubmessage::smHeader, OpenDDS::DCPS::RtpsSampleHeader::submessage_, OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::Message::submessages, and OpenDDS::DCPS::transport_debug.

Referenced by handle_input().

515 {
516  using namespace RTPS;
517 
518  if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
519  sizeof(GuidPrefix_t))) {
520  // Not our message, we may be on multicast listening to all the others.
522  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample - not destination\n"));
523  }
524  return;
525  }
526 
527  const RtpsSampleHeader& rsh = received_sample_header();
528 
530  DCPS::push_back(message_.submessages, rsh.submessage_);
531  }
532 
533 #ifdef OPENDDS_SECURITY
534  const SubmessageKind kind = rsh.submessage_._d();
535 
537  // secure envelope in progress, defer processing
538  secure_submessages_.push_back(rsh.submessage_);
539  if (kind == DATA) {
540  secure_sample_ = sample;
541  }
542  return;
543  }
544 
545  encoded_submsg_ = false;
546 #endif
547 
548  deliver_sample_i(sample, rsh.submessage_, NetworkAddress(remote_address));
549 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
const GuidPrefix_t & local_prefix() const
bool log_dropped_messages
Log received RTPS messages that were dropped.
void deliver_sample_i(ReceivedDataSample &sample, const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
SubmessageSeq submessages
Definition: RtpsCore.idl:897
bool log_messages
Log all RTPS messages sent or recieved.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138

◆ deliver_sample_i()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i ( ReceivedDataSample sample,
const RTPS::Submessage submessage,
const NetworkAddress remote_addr 
)
private

Definition at line 552 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::DCPS::LogGuid::c_str(), check_encoded(), OpenDDS::DCPS::LogGuid::conv_, OpenDDS::RTPS::DATA, OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DataLink::data_received_include(), OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::DATAWRITER_LIVELINESS, decode_payload(), deliver_from_secure(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::directed_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::DCPS::RtpsUdpDataLink::filterBestEffortReaders(), OpenDDS::RTPS::FLAG_L, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, getDirectedWriteReaders(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Submessage::hb_frag_sm, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::RTPS::AckNackSubmessage::readerId, OpenDDS::RTPS::NackFragSubmessage::readerId, readers_selected_, readers_withheld_, OpenDDS::DCPS::RtpsUdpDataLink::received(), receiver_, recvd_sample_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, secure_prefix_, OpenDDS::RTPS::Submessage::security_sm, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::set_intersect(), OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_, OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::GapSubmessage::writerId, OpenDDS::RTPS::HeartBeatSubmessage::writerId, and OpenDDS::RTPS::HeartBeatFragSubmessage::writerId.

Referenced by deliver_from_secure(), and deliver_sample().

555 {
556  using namespace RTPS;
557  const SubmessageKind kind = submessage._d();
558 
559  switch (kind) {
560  case INFO_SRC:
561  case INFO_REPLY_IP4:
562  case INFO_DST:
563  case INFO_REPLY:
564  case INFO_TS:
565  // No-op: the INFO_* submessages only modify the state of the
566  // MessageReceiver (see check_header()), they are not passed up to DCPS.
567  break;
568 
569  case DATA: {
570  receiver_.fill_header(sample.header_);
571  const DataSubmessage& data = submessage.data_sm();
572  if (!check_encoded(data.writerId)) {
574  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
575  }
576  break;
577  }
578 
579 #ifdef OPENDDS_SECURITY
580  if (!decode_payload(sample, data)) {
582  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
583  }
584  break;
585  }
586 #endif
587 
588  RepoIdSet directedWriteReaders;
589  getDirectedWriteReaders(directedWriteReaders, data);
590 
591  recvd_sample_ = &sample;
592  readers_selected_.clear();
593  readers_withheld_.clear();
594  // If this sample should be withheld from some readers in order to maintain
595  // in-order delivery, link_->received() will add it to readers_withheld_ otherwise
596  // it will be added to readers_selected_
597  link_->received(data, receiver_.source_guid_prefix_, remote_addr);
598  recvd_sample_ = 0;
599 
601 
602  if (data.readerId != ENTITYID_UNKNOWN) {
603  GUID_t reader;
604  std::memcpy(reader.guidPrefix, link_->local_prefix(),
605  sizeof(GuidPrefix_t));
606  reader.entityId = data.readerId;
607  if (!readers_withheld_.count(reader) &&
608  (directedWriteReaders.empty() || directedWriteReaders.find(reader) != directedWriteReaders.end())) {
609  if (Transport_debug_level > 5) {
610  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
611  ACE_TEXT("calling DataLink::data_received for seq: %q to reader %C\n"),
612  this, sample.header_.sequence_.getValue(), LogGuid(reader).c_str()));
613  }
614  link_->data_received(sample, reader);
615  }
616  } else {
617  if (Transport_debug_level > 5) {
618  OPENDDS_STRING included_ids;
619  bool first = true;
620  RepoIdSet::iterator iter = readers_selected_.begin();
621  while (iter != readers_selected_.end()) {
622  included_ids += (first ? "" : "\n") + LogGuid(*iter).conv_;
623  first = false;
624  ++iter;
625  }
626  OPENDDS_STRING excluded_ids;
627  first = true;
628  RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
629  while (iter2 != readers_withheld_.end()) {
630  excluded_ids += (first ? "" : "\n") + LogGuid(*iter2).conv_;
631  first = false;
632  ++iter2;
633  }
634  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i:")
635  ACE_TEXT(" readers_selected ids: %C\n")
636  ACE_TEXT(" readers_withheld ids: %C\n"),
637  this, included_ids.c_str(), excluded_ids.c_str()));
638  }
639 
640  if (readers_withheld_.empty() && readers_selected_.empty()) {
641  if (directedWriteReaders.empty()) {
642  if (Transport_debug_level > 5) {
643  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
644  ACE_TEXT("calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n"),
645  this, sample.header_.sequence_.getValue()));
646  }
647  link_->data_received(sample);
648  } else {
649  if (Transport_debug_level > 5) {
650  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
651  ACE_TEXT("calling DataLink::data_received_include for seq: %q to directedWriteReaders\n"),
652  this, sample.header_.sequence_.getValue()));
653  }
654  link_->data_received_include(sample, directedWriteReaders);
655  }
656  } else {
657  if (directedWriteReaders.empty()) {
658  if (Transport_debug_level > 5) {
659  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
660  ACE_TEXT("calling DataLink::data_received_include for seq: %q to readers_selected_\n"),
661  this, sample.header_.sequence_.getValue()));
662  }
664  } else {
665  if (Transport_debug_level > 5) {
666  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample_i - ")
667  ACE_TEXT("calling DataLink::data_received_include for seq: %q to intersection of readers\n"),
668  this, sample.header_.sequence_.getValue()));
669  }
670  set_intersect(directedWriteReaders, readers_selected_, GUID_tKeyLessThan());
671  link_->data_received_include(sample, directedWriteReaders);
672  }
673  }
674  }
675  break;
676  }
677  case GAP:
678  if (!check_encoded(submessage.gap_sm().writerId)) {
680  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
681  }
682  break;
683  }
684  link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_, receiver_.directed_, remote_addr);
685  break;
686 
687  case HEARTBEAT:
688  if (!check_encoded(submessage.heartbeat_sm().writerId)) {
690  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
691  }
692  break;
693  }
694  link_->received(submessage.heartbeat_sm(), receiver_.source_guid_prefix_, receiver_.directed_, remote_addr);
695  if (submessage.heartbeat_sm().smHeader.flags & FLAG_L) {
696  // Liveliness has been asserted. Create a DATAWRITER_LIVELINESS message.
697  sample.header_.message_id_ = DATAWRITER_LIVELINESS;
698  receiver_.fill_header(sample.header_);
699  sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId;
700  link_->data_received(sample);
701  }
702  break;
703 
704  case ACKNACK:
705  if (!check_encoded(submessage.acknack_sm().readerId)) {
707  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
708  }
709  break;
710  }
711  link_->received(submessage.acknack_sm(), receiver_.source_guid_prefix_, remote_addr);
712  break;
713 
714  case HEARTBEAT_FRAG:
715  if (!check_encoded(submessage.hb_frag_sm().writerId)) {
717  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
718  }
719  break;
720  }
721  link_->received(submessage.hb_frag_sm(), receiver_.source_guid_prefix_, receiver_.directed_, remote_addr);
722  break;
723 
724  case NACK_FRAG:
725  if (!check_encoded(submessage.nack_frag_sm().readerId)) {
727  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::deliver_sample_i - decode error\n"));
728  }
729  break;
730  }
731  link_->received(submessage.nack_frag_sm(), receiver_.source_guid_prefix_, remote_addr);
732  break;
733 
734  /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble()
735  has successfully reassembled the fragments and we now have a DATA submsg
736  */
737 
738 #ifdef OPENDDS_SECURITY
739  case SEC_PREFIX:
740  secure_prefix_ = submessage.security_sm();
741  break;
742 
743  case SEC_POSTFIX:
744  deliver_from_secure(submessage, remote_addr);
745  break;
746 #endif
747 
748  default:
749  break;
750  }
751 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
void received(const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
GuidSet RepoIdSet
Definition: GuidUtils.h:113
void filterBestEffortReaders(const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
void deliver_from_secure(const RTPS::Submessage &submessage, const NetworkAddress &remote_addr)
const GuidPrefix_t & local_prefix() const
bool log_dropped_messages
Log received RTPS messages that were dropped.
#define OPENDDS_STRING
const octet FLAG_L
Definition: RtpsCore.idl:524
bool decode_payload(ReceivedDataSample &sample, const RTPS::DataSubmessage &submessage)
ACE_TEXT("TCP_Factory")
bool set_intersect(SetA &sA, const SortedB &sB, LessThan lessThan)
Definition: Util.h:200
bool getDirectedWriteReaders(RepoIdSet &directedWriteReaders, const RTPS::DataSubmessage &ds) const
void data_received_include(ReceivedDataSample &sample, const RepoIdSet &incl)
Definition: DataLink.cpp:698
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
bool check_encoded(const EntityId_t &sender)

◆ do_not_withhold_data_from()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from ( const GUID_t sub_id)

Definition at line 1084 of file RtpsUdpReceiveStrategy.cpp.

References readers_selected_.

1085 {
1086  readers_selected_.insert(sub_id);
1087 }

◆ end_transport_header_processing()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::end_transport_header_processing ( )
virtual

◆ getDirectedWriteReaders()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::getDirectedWriteReaders ( RepoIdSet directedWriteReaders,
const RTPS::DataSubmessage ds 
) const
private

Definition at line 1089 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::RTPS::ProtocolVersion_t::minor, OpenDDS::RTPS::PID_DIRECTED_WRITE, receiver_, and OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_version_.

Referenced by deliver_sample_i().

1090 {
1091  directedWriteReaders.clear();
1092  for (CORBA::ULong i = 0; i < ds.inlineQos.length(); ++i) {
1093  if (ds.inlineQos[i]._d() == RTPS::PID_DIRECTED_WRITE
1094  && receiver_.source_version_.minor >= 4) {
1095  directedWriteReaders.insert(ds.inlineQos[i].guid());
1096  }
1097  }
1098  return !directedWriteReaders.empty();
1099 }
const ParameterId_t PID_DIRECTED_WRITE
Definition: RtpsCore.idl:300
ACE_CDR::ULong ULong

◆ handle_input()

int OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input ( ACE_HANDLE  fd)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 77 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEBUG, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_DES_FREE, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), check_header(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::data_sample_header_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::db_allocator_, OpenDDS::DCPS::DCPS_debug_level, deliver_sample(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::RtpsSampleHeader::get_serialized_size(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::gracefully_disconnected_, OpenDDS::DCPS::LogLevel::Info, OpenDDS::DCPS::RtpsSampleHeader::into_received_data_sample(), OpenDDS::DCPS::RtpsTransportHeader::last_fragment(), OpenDDS::DCPS::RtpsTransportHeader::length_, LM_DEBUG, LM_INFO, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::RtpsSampleHeader::message_length(), OpenDDS::DCPS::RtpsSampleHeader::more_fragments(), OpenDDS::DCPS::RtpsSampleHeader::pdu_remaining(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::pdu_remaining_, ACE_Message_Block::rd_ptr(), reassemble(), receive_bytes(), OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::receive_lock_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::receive_transport_header_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::relink(), ACE_Message_Block::reset(), ACE_Message_Block::space(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::stop(), thread_status_manager_, OpenDDS::DCPS::RtpsTransportHeader::valid(), VDBG, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

78 {
79  ThreadStatusManager::Event ev(thread_status_manager_);
80 
81  // Since BUFFER_COUNT is 1, the index will always be 0
82  const size_t INDEX = 0;
83 
84  ACE_Message_Block* const cur_rb = receive_buffers_[INDEX];
85  cur_rb->reset();
86 
87  iovec iov;
88 #ifdef _MSC_VER
89 #pragma warning(push)
90 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
91 // since on other platforms iov_len is 64-bit
92 #pragma warning(disable : 4267)
93 #endif
94  iov.iov_len = cur_rb->space();
95 #ifdef _MSC_VER
96 #pragma warning(pop)
97 #endif
98  iov.iov_base = cur_rb->wr_ptr();
99 
100  ACE_INET_Addr remote_address;
101  bool stop = false;
102  ssize_t bytes_remaining = receive_bytes(&iov,
103  1,
104  remote_address,
105  fd,
106  stop);
107 
108  if (stop) {
109  return 0;
110  }
111 
112  if (bytes_remaining < 0) {
113  relink();
114  return -1;
115  }
116 
117  cur_rb->wr_ptr(bytes_remaining);
118 
119  if (bytes_remaining == 0) {
121  return -1;
122  } else {
123  relink();
124  return -1;
125  }
126  }
127 
128  if (!pdu_remaining_) {
129  receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes_remaining);
130  }
131 
132  receive_transport_header_ = *cur_rb;
134  cur_rb->reset();
135  if (DCPS_debug_level > 0) {
136  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: RtpsUdpReceiveStrategy::handle_input: TransportHeader invalid.\n")));
137  }
138  return 0;
139  }
140 
141  bytes_remaining = receive_transport_header_.length_;
143  return 0;
144  }
145 
146  {
147  const ScopedHeaderProcessing shp(*this);
148  while (bytes_remaining > 0) {
149  data_sample_header_.pdu_remaining(bytes_remaining);
150  data_sample_header_ = *cur_rb;
151  bytes_remaining -= data_sample_header_.get_serialized_size();
153  return 0;
154  }
155  ReceivedDataSample rds = data_sample_header_.message_length() ? ReceivedDataSample(*cur_rb) : ReceivedDataSample();
157 
159  VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
160 
161  if (reassemble(rds)) {
162  VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
163  deliver_sample(rds, remote_address);
164  }
165  // If reassemble() returned false, it takes ownership of the data
166  // just like deliver_sample() does.
167 
168  } else {
169  deliver_sample(rds, remote_address);
170  }
171  }
173  bytes_remaining -= data_sample_header_.message_length();
174 
175  // For the reassembly algorithm, the 'last_fragment_' header bit only
176  // applies to the first DataSampleHeader in the TransportHeader
178  }
179  }
180 
181  // If newly selected buffer index still has a reference count, we'll need to allocate a new one for the read
182  if (receive_buffers_[INDEX]->data_block()->reference_count() > 1) {
183 
184  if (log_level >= LogLevel::Info) {
185  ACE_DEBUG((LM_INFO, "(%P|%t) INFO: RtpsUdpReceiveStrategy::handle_input: reallocating primary receive buffer based on reference count\n"));
186  }
187 
188  ACE_DES_FREE(
189  receive_buffers_[INDEX],
192 
194  receive_buffers_[INDEX],
197  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
198  ACE_Message_Block::MB_DATA, // Default
199  0, // Start with no continuation
200  0, // Let the constructor allocate
201  &data_allocator_, // Our buffer cache
202  &receive_lock_, // Our locking strategy
203  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
204  ACE_Time_Value::zero, // Default
205  ACE_Time_Value::max_time, // Default
206  &db_allocator_, // Our data block cache
207  &mb_allocator_ // Our message block cache
208  ),
209  -1);
210  }
211 
212  return 0;
213 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_DEBUG(X)
static const ACE_Time_Value max_time
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
void reset(void)
bool into_received_data_sample(ReceivedDataSample &rds)
int ssize_t
char * rd_ptr(void) const
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
#define VDBG(DBG_ARGS)
char * wr_ptr(void) const
virtual bool check_header(const RtpsTransportHeader &header)
Check the transport header for suitability.
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void free(void *ptr)
Return a chunk of memory back to free list cache.
ACE_TEXT("TCP_Factory")
size_t space(void) const
static const ACE_Time_Value zero
virtual bool reassemble(ReceivedDataSample &data)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.

◆ has_fragments()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments ( const SequenceRange range,
const GUID_t pub_id,
FragmentInfo *  frag_info = 0 
)

Definition at line 1189 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::TransportReassembly::get_gaps(), OpenDDS::DCPS::TransportReassembly::has_frags(), OpenDDS::RTPS::FragmentNumberSet::numBits, OPENDDS_VECTOR(), reassembly_, and OpenDDS::RTPS::FragmentNumber_t::value.

1192 {
1193  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1194  ACE_UINT32 total_frags = 0;
1195  if (reassembly_.has_frags(sn, pub_id, total_frags)) {
1196  if (frag_info) {
1197  if (total_frags > 256) {
1198  static const CORBA::Long empty_buffer[8] = { 0, 0, 0, 0, 0, 0, 0, 0 };
1199  OPENDDS_VECTOR(CORBA::Long) buffer(total_frags + 31 / 32, 0);
1200  ACE_UINT32 numBits = 0;
1201  size_t idx = 0;
1202  const ACE_UINT32 base = reassembly_.get_gaps(sn, pub_id, &buffer[0], static_cast<CORBA::ULong>(buffer.size()), numBits);
1203  const CORBA::ULong end = base + numBits;
1204  for (CORBA::ULong i = base; i <= end; i += 256) {
1205  const CORBA::ULong remain = end - i;
1206  const CORBA::ULong len = std::min(remain, static_cast<CORBA::ULong>(256));
1207  const CORBA::ULong len32 = (len + 31) / 32;
1208  const CORBA::ULong len8 = len32 * 4;
1209  if (std::memcmp(&buffer[idx], &empty_buffer[0], len8) != 0) {
1210  std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1211  p.first = sn;
1212  p.second = RTPS::FragmentNumberSet();
1213  frag_info->push_back(p);
1214  RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1215  missing_frags.numBits = len;
1216  missing_frags.bitmapBase.value = i;
1217  missing_frags.bitmap.length(len32);
1218  std::memcpy(missing_frags.bitmap.get_buffer(), &buffer[idx], len8);
1219  }
1220  idx += 8;
1221  }
1222  } else {
1223  std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
1224  p.first = sn;
1225  p.second = RTPS::FragmentNumberSet();
1226  frag_info->push_back(p);
1227  RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
1228  missing_frags.numBits = 0; // make sure this is a valid number before passing to get_gaps
1229  missing_frags.bitmap.length(8); // start at max length
1230  missing_frags.bitmapBase.value =
1231  reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
1232  8, missing_frags.numBits);
1233  // reduce length in case get_gaps() didn't need all that room
1234  missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
1235  }
1236  } else {
1237  return true;
1238  }
1239  }
1240  }
1241  return frag_info ? !frag_info->empty() : false;
1242 }
ACE_CDR::Long Long
typedef OPENDDS_VECTOR(SeqFragPair) FragmentInfo
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
ACE_CDR::ULong ULong
CORBA::ULong get_gaps(const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const

◆ OPENDDS_VECTOR() [1/2]

typedef OpenDDS::DCPS::RtpsUdpReceiveStrategy::OPENDDS_VECTOR ( SeqFragPair  )

Referenced by has_fragments().

◆ OPENDDS_VECTOR() [2/2]

OpenDDS::DCPS::RtpsUdpReceiveStrategy::OPENDDS_VECTOR ( RTPS::Submessage  )
private

◆ reassemble()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble ( ReceivedDataSample data)
privatevirtual

◆ reassemble_i()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i ( ReceivedDataSample data,
RtpsSampleHeader rsh 
)
privatevirtual

Definition at line 1107 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, OpenDDS::DCPS::ReceivedDataSample::fragment_size_, fragment_size_, frags_, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataLink::is_target(), OpenDDS::DCPS::DataSampleHeader::key_fields_only_, link_, OpenDDS::DCPS::ReceivedDataSample::peek(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, receiver_, OpenDDS::DCPS::RtpsSampleHeader::submessage_, and total_frags_.

Referenced by deliver_from_secure(), and reassemble().

1108 {
1109  using namespace RTPS;
1110  receiver_.fill_header(data.header_); // set publication_id_.guidPrefix
1111  data.fragment_size_ = fragment_size_;
1112  if (link_->is_target(data.header_.publication_id_) && reassembly_.reassemble(frags_, data, total_frags_)) {
1113 
1114  // Reassembly was successful, replace DataFrag with Data. This doesn't have
1115  // to be a fully-formed DataSubmessage, just enough for this class to use
1116  // in deliver_sample() which ends up calling RtpsUdpDataLink::received().
1117  // In particular we will need the SequenceNumber, but ignore the iQoS.
1118 
1119  // Peek at the byte order from the encapsulation containing the payload.
1120  data.header_.byte_order_ = data.peek(1) & FLAG_E;
1121 
1122  const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
1123 
1124  const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0)
1125  | (data.header_.key_fields_only_ ? FLAG_K_IN_DATA : FLAG_D);
1126  const DataSubmessage dsm = {
1127  {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
1128  dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
1129  rsh.submessage_.data_sm(dsm);
1130  return true;
1131  }
1132  return false;
1133 }
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:527
bool is_target(const GUID_t &remote_id)
Definition: DataLink.cpp:1013
const octet FLAG_E
Definition: RtpsCore.idl:518
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
sequence< Parameter > ParameterList
ACE_CDR::Octet Octet
const octet FLAG_D
Definition: RtpsCore.idl:523

◆ receive_bytes()

ssize_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr remote_address,
ACE_HANDLE  fd,
bool &  stop 
)
privatevirtual

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 343 of file RtpsUdpReceiveStrategy.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), choose_recv_socket(), DDS::Security::SecurityException::code, OpenDDS::DCPS::SecurityDebug::encdec_warn, encoded_rtps_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::RtpsUdpDataLink::get_ice_agent(), OpenDDS::DCPS::RtpsUdpDataLink::get_ice_endpoint(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), link_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::local_, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), OpenDDS::DCPS::TransportDebug::log_dropped_messages, DDS::Security::SecurityException::message, DDS::Security::SecurityException::minor_code, DDS::Security::OPENDDS_EXCEPTION_CODE_NO_KEY, DDS::Security::OPENDDS_EXCEPTION_MINOR_CODE_NO_KEY, receive_bytes_helper(), receiver_, ACE_SOCK_Dgram::recv(), remote_address_, OpenDDS::RTPS::RTPSHDR_SZ, OpenDDS::DCPS::RtpsUdpDataLink::security_config(), OpenDDS::DCPS::security_debug, OpenDDS::RTPS::SMHDR_SZ, socket(), OpenDDS::RTPS::SRTPS_PREFIX, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::stop(), OpenDDS::DCPS::RtpsUdpDataLink::transport(), and OpenDDS::DCPS::transport_debug.

Referenced by handle_input().

348 {
350 #ifdef ACE_LACKS_SENDMSG
351  ACE_UNUSED_ARG(stop);
352  char buffer[0x10000];
353  ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
354  char* iter = buffer;
355  for (int i = 0; scatter > 0 && i < n; ++i) {
356  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS
357  static_cast<size_t>(scatter));
358  std::memcpy(iov[i].iov_base, iter, chunk);
359  scatter -= chunk;
360  iter += chunk;
361  }
362  const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
363 #else
364  const ssize_t ret = receive_bytes_helper(iov, n, socket, remote_address,
365 #ifdef OPENDDS_SECURITY
367 #endif
368  *link_->transport(), stop);
369 #endif
370  remote_address_ = remote_address;
371 
372 #ifdef OPENDDS_SECURITY
373  if (stop) {
374  return ret;
375  }
376 
377  using namespace DDS::Security;
379  if (ret > 0 && receiver != DDS::HANDLE_NIL) {
380  encoded_rtps_ = false;
381 
382  GUID_t peer = GUID_UNKNOWN;
383 
384  const CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
385  if (!crypto) {
386  return recv_err("no crypto plugin", remote_address, peer, stop);
387  }
388 
389  if (ret < RTPS::RTPSHDR_SZ + RTPS::SMHDR_SZ) {
390  return recv_err("message too short", remote_address, peer, stop);
391  }
392 
393  const unsigned int encLen = static_cast<unsigned int>(ret);
394  DDS::OctetSeq encoded(encLen);
395  encoded.length(encLen);
396  unsigned char* const encBuf = encoded.get_buffer();
397  size_t copied = 0;
398  for (int i = 0; i < n && copied < encLen; ++i) {
399  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
400  static_cast<size_t>(encLen - copied));
401  std::memcpy(encBuf + copied, iov[i].iov_base, chunk);
402  copied += chunk;
403  }
404 
405  if (copied != encLen) {
406  return recv_err("received bytes didn't fit in iovec array", remote_address, peer, stop);
407  }
408 
409  if (encoded[RTPS::RTPSHDR_SZ] != RTPS::SRTPS_PREFIX) {
410  return ret;
411  }
412 
413  static const int GuidPrefixOffset = 8; // "RTPS", Version(2), Vendor(2)
414  std::memcpy(peer.guidPrefix, encBuf + GuidPrefixOffset, sizeof peer.guidPrefix);
415  peer.entityId = RTPS::ENTITYID_PARTICIPANT;
416  const ParticipantCryptoHandle sender = equal_guid_prefixes(peer.guidPrefix, receiver_.local_) ?
418  link_->handle_registry()->get_remote_participant_crypto_handle(peer);
419  if (sender == DDS::HANDLE_NIL) {
421  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n", LogGuid(peer).c_str()));
422  }
424  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::receive_bytes: ")
425  ACE_TEXT("decode_rtps_message no remote participant crypto handle for %C, dropping\n"),
426  LogGuid(peer).c_str()));
427  }
428  stop = true;
429  return ret;
430  }
431 
432  DDS::OctetSeq plain;
433  SecurityException ex = {"", 0, 0};
434  if (!crypto->decode_rtps_message(plain, encoded, receiver, sender, ex)) {
436  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpReceiveStrategy::receive_bytes - decode error from %C\n", LogGuid(peer).c_str()));
437  }
439  ACE_ERROR((LM_WARNING, "(%P|%t) {encdec_warn} decode_rtps_message SecurityException [%d.%d]: %C\n",
440  ex.code, ex.minor_code, ex.message.in()));
441  }
444  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) {encdec_warn} RtpsUdpReceiveStrategy::receive_bytes: ")
445  ACE_TEXT("decode_rtps_message remote participant has crypto handle but no key, dropping\n")));
446  }
447  stop = true;
448  return ret;
449  }
450  return recv_err("decode_rtps_message failed", remote_address, peer, stop);
451  }
452 
453  copied = 0;
454  const size_t plainLen = plain.length();
455  const unsigned char* const plainBuf = plain.get_buffer();
456  for (int i = 0; i < n && copied < plainLen; ++i) {
457  const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
458  plainLen - copied);
459  std::memcpy(iov[i].iov_base, plainBuf + copied, chunk);
460  copied += chunk;
461  }
462 
463  if (copied != plainLen) {
464  return recv_err("plaintext doesn't fit in iovec array", remote_address, peer, stop);
465  }
466 
467  encoded_rtps_ = true;
468  return plainLen;
469  }
470 #endif
471 
472  return ret;
473 }
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
RtpsUdpTransport_rch transport()
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Security::HandleRegistry_rch handle_registry() const
int ssize_t
const long OPENDDS_EXCEPTION_CODE_NO_KEY
bool log_dropped_messages
Log received RTPS messages that were dropped.
ACE_HANDLE socket(int protocol_family, int type, int proto)
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE fd) const
static ssize_t receive_bytes_helper(iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
ACE_TEXT("TCP_Factory")
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
Security::SecurityConfig_rch security_config() const
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
const long OPENDDS_EXCEPTION_MINOR_CODE_NO_KEY

◆ receive_bytes_helper()

ssize_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes_helper ( iovec  iov[],
int  n,
const ACE_SOCK_Dgram socket,
ACE_INET_Addr remote_address,
DCPS::RcHandle< ICE::Agent agent,
DCPS::WeakRcHandle< ICE::Endpoint endpoint,
RtpsUdpTransport tport,
bool &  stop 
)
static

Definition at line 216 of file RtpsUdpReceiveStrategy.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_NOTSUP_RETURN, OpenDDS::STUN::Message::block, ACE_Message_Block::cont(), OpenDDS::DCPS::SecurityDebug::encdec_warn, OpenDDS::STUN::encoding(), ACE_INET_Addr::get_addr_size(), ACE_Addr::get_size(), ACE_Message_Block::length(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::MCK_RTPS, OpenDDS::DCPS::MCK_STUN, ACE_SOCK_Dgram::recv(), ACE_Message_Block::release(), OpenDDS::DCPS::security_debug, and OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::stop().

Referenced by OpenDDS::DCPS::RtpsUdpTransport::IceEndpoint::handle_input(), and receive_bytes().

226 {
227  ACE_INET_Addr local_address;
228  const ssize_t ret = socket.recv(iov, n, remote_address, 0
229 #if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
230  , &local_address
231 #endif
232  );
233 
234  if (ret == -1) {
235  return ret;
236  }
237 
238  if (remote_address.get_size() > remote_address.get_addr_size()) {
239  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper - invalid address size\n"));
240  return 0;
241  }
242 
243  if (n > 0 && ret > 0 && iov[0].iov_len >= 4 && std::memcmp(iov[0].iov_base, "RTPS", 4) == 0) {
244  RtpsUdpInst_rch cfg = tport.config();
245  if (cfg && cfg->count_messages()) {
246  const NetworkAddress ra(remote_address);
247  const InternalMessageCountKey key(ra, MCK_RTPS, ra == cfg->rtps_relay_address());
248  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, tport.transport_statistics_mutex_, -1);
249  tport.transport_statistics_.message_count[key].recv(ret);
250  }
251  return ret;
252  }
253 
254 #ifdef OPENDDS_SECURITY
255  // Assume STUN
256 # ifndef ACE_RECVPKTINFO
257  ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy::receive_bytes_helper potential STUN message "
258  "received but this version of the ACE library doesn't support the local_address "
259  "extension in ACE_SOCK_Dgram::recv\n"));
260  ACE_UNUSED_ARG(stop);
261  ACE_NOTSUP_RETURN(-1);
262 # else
263 
264  stop = true;
265  size_t bytes = ret;
266  size_t block_size = std::min(bytes, static_cast<size_t>(iov[0].iov_len));
267  ACE_Message_Block* head = new ACE_Message_Block(static_cast<const char*>(iov[0].iov_base), block_size);
268  head->length(block_size);
269  bytes -= block_size;
270 
271  ACE_Message_Block* tail = head;
272  for (int i = 1; i < n && bytes != 0; ++i) {
273  block_size = std::min(bytes, static_cast<size_t>(iov[i].iov_len));
274  ACE_Message_Block* mb = new ACE_Message_Block(static_cast<const char*>(iov[i].iov_base), block_size);
275  mb->length(block_size);
276  tail->cont(mb);
277  tail = mb;
278  bytes -= block_size;
279  }
280 
281  DCPS::Serializer serializer(head, STUN::encoding);
282  STUN::Message message;
283  message.block = head;
284  if (serializer >> message) {
285  RtpsUdpInst_rch cfg = tport.config();
286  if (cfg && cfg->count_messages()) {
287  const NetworkAddress ra(remote_address);
288  const InternalMessageCountKey key(ra, MCK_STUN, ra == cfg->rtps_relay_address());
289  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, tport.transport_statistics_mutex_, -1);
290  tport.transport_statistics_.message_count[key].recv(ret);
291  }
292 
293  if (tport.relay_srsm().is_response(message)) {
294  tport.process_relay_sra(tport.relay_srsm().receive(message));
295 #ifdef OPENDDS_SECURITY
296  } else if (endpoint) {
297  ice_agent->receive(endpoint, local_address, remote_address, message);
298 #endif
299  }
300  }
301  head->release();
302 # endif
303 #else
304  ACE_UNUSED_ARG(stop);
305 #endif
306 
307  return ret;
308 }
const MessageCountKind MCK_STUN
#define ACE_ERROR(X)
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
size_t length(void) const
sequence< octet > key
int get_size(void) const
int ssize_t
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual ACE_Message_Block * release(void)
ACE_Message_Block * cont(void) const
const MessageCountKind MCK_RTPS
int get_addr_size(void) const
#define ACE_NOTSUP_RETURN(FAILVALUE)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ remove_fragments()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments ( const SequenceRange range,
const GUID_t pub_id 
)

Remove any saved fragments. We do not expect to receive any more fragments with sequence numbers in "range" from publication "pub_id".

Definition at line 1174 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::data_unavailable(), and reassembly_.

1176 {
1177  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
1178  reassembly_.data_unavailable(sn, pub_id);
1179  }
1180 }
void data_unavailable(const FragmentRange &transportSeqDropped)

◆ remove_frags_from_bitmap()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap ( CORBA::Long  bitmap[],
CORBA::ULong  num_bits,
const SequenceNumber base,
const GUID_t pub_id,
ACE_CDR::ULong samples_requested 
)

For each "1" bit in the bitmap, change it to a "0" if there are fragments from publication "pub_id" for the sequence number represented by that position in the bitmap. Returns true if the bitmap was changed.

Definition at line 1136 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::has_frags(), and reassembly_.

1141 {
1142  bool modified = false;
1143  for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
1144  if (bit == 32) bit = 0;
1145 
1146  if (bit == 0) {
1147  x = static_cast<CORBA::ULong>(bitmap[i / 32]);
1148  if (x == 0) {
1149  // skip an entire Long if it's all 0's (adds 32 due to ++i)
1150  i += 31;
1151  bit = 31;
1152  //FUTURE: this could be generalized with something like the x86 "bsr"
1153  // instruction using compiler intrinsics, VC++ _BitScanReverse()
1154  // and GCC __builtin_clz()
1155  continue;
1156  }
1157  }
1158 
1159  const CORBA::ULong mask = 1 << (31 - bit);
1160  if (x & mask) {
1161  const bool has_frags = reassembly_.has_frags(base + i, pub_id);
1162  if (has_frags) {
1163  x &= ~mask;
1164  bitmap[i / 32] = x;
1165  modified = true;
1166  --cumulative_bits_added;
1167  }
1168  }
1169  }
1170  return modified;
1171 }
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
ACE_CDR::ULong ULong

◆ sec_submsg_to_octets()

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::sec_submsg_to_octets ( DDS::OctetSeq encoded,
const RTPS::Submessage postfix 
)
private

Definition at line 875 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::align(), OpenDDS::DCPS::Serializer::align_r(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, OpenDDS::DCPS::ReceivedDataSample::data_length(), OpenDDS::STUN::encoding(), OpenDDS::DCPS::ENDIAN_BIG, OpenDDS::DCPS::Encoding::KIND_XCDR1, ACE_Message_Block::length(), ACE_Message_Block::rd_ptr(), secure_prefix_, secure_sample_, OpenDDS::DCPS::serialized_size(), OpenDDS::RTPS::SMHDR_SZ, and OpenDDS::DCPS::ReceivedDataSample::write_data().

Referenced by deliver_from_secure().

877 {
878  const Encoding encoding(Encoding::KIND_XCDR1, ENDIAN_BIG);
879  size_t size = serialized_size(encoding, secure_prefix_);
880 
881  for (size_t i = 0; i < secure_submessages_.size(); ++i) {
882  serialized_size(encoding, size, secure_submessages_[i]);
883  const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
884  if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
885  size += secure_sample_.data_length();
886  }
887  align(size, RTPS::SMHDR_SZ);
888  }
889  serialized_size(encoding, size, postfix);
890 
891  ACE_Message_Block mb(size);
892  Serializer ser(&mb, encoding);
893  if (!(ser << secure_prefix_)) {
894  return false;
895  }
896 
897  if (!ser.align_r(RTPS::SMHDR_SZ)) {
898  return false;
899  }
900 
901  for (size_t i = 0; i < secure_submessages_.size(); ++i) {
902  if (!(ser << secure_submessages_[i])) {
903  return false;
904  }
905  const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
906  if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
907  if (!secure_sample_.write_data(ser)) {
908  return false;
909  }
910  }
911  if (!ser.align_r(RTPS::SMHDR_SZ)) {
912  return false;
913  }
914  }
915  if (!(ser << postfix)) {
916  return false;
917  }
918 
919  encoded.length(static_cast<unsigned int>(mb.length()));
920  std::memcpy(encoded.get_buffer(), mb.rd_ptr(), mb.length());
921  secure_submessages_.resize(0);
922 
923  return true;
924 }
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
bool write_data(Serializer &ser) const
write the data payload to the Serializer
OpenDDS_Dcps_Export void align(size_t &value, size_t by)
Align "value" by "by" if it&#39;s not already.
Definition: Serializer.inl:23
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
size_t data_length() const
total length of usable bytes (between rd_ptr and wr_ptr) of all Data Blocks

◆ start_i()

int OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i ( )
privatevirtual

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 995 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), ACE_IPC_SAP::get_handle(), ACE_Event_Handler::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor_interceptor(), link_, ACE_Event_Handler::READ_MASK, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

996 {
998  ri->execute_or_enqueue(make_rch<RegisterHandler>(link_->unicast_socket().get_handle(), this, static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
999 #ifdef ACE_HAS_IPV6
1000  ri->execute_or_enqueue(make_rch<RegisterHandler>(link_->ipv6_unicast_socket().get_handle(), this, static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1001 #endif
1002 
1003  return 0;
1004 }
unsigned long ACE_Reactor_Mask
ReactorInterceptor_rch get_reactor_interceptor() const
ACE_HANDLE get_handle(void) const
virtual ACE_HANDLE get_handle(void) const
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
CommandPtr execute_or_enqueue(CommandPtr command)

◆ stop_i()

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i ( )
privatevirtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 1007 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), ACE_IPC_SAP::get_handle(), ACE_Event_Handler::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor_interceptor(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_Event_Handler::READ_MASK, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

1008 {
1010  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->unicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1011 #ifdef ACE_HAS_IPV6
1012  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->ipv6_unicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1013 #endif
1014 
1015  RtpsUdpInst_rch cfg = link_->config();
1016  if (cfg && cfg->use_multicast_) {
1017  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->multicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1018 #ifdef ACE_HAS_IPV6
1019  ri->execute_or_enqueue(make_rch<RemoveHandler>(link_->ipv6_multicast_socket().get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
1020 #endif
1021  }
1022 }
unsigned long ACE_Reactor_Mask
ReactorInterceptor_rch get_reactor_interceptor() const
ACE_SOCK_Dgram_Mcast & multicast_socket()
ACE_HANDLE get_handle(void) const
virtual ACE_HANDLE get_handle(void) const
RcHandle< ReactorInterceptor > ReactorInterceptor_rch
CommandPtr execute_or_enqueue(CommandPtr command)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RtpsUdpInst_rch config() const

◆ withhold_data_from()

const ReceivedDataSample * OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from ( const GUID_t sub_id)

Prevent delivery of the currently in-progress data sample to the subscription sub_id. Returns pointer to the in-progress data so it can be stored for later delivery.

Definition at line 1077 of file RtpsUdpReceiveStrategy.cpp.

References readers_withheld_, and recvd_sample_.

1078 {
1079  readers_withheld_.insert(sub_id);
1080  return recvd_sample_;
1081 }

Member Data Documentation

◆ BUFFER_COUNT

const size_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::BUFFER_COUNT = 1u
static

Definition at line 45 of file RtpsUdpReceiveStrategy.h.

◆ encoded_rtps_

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::encoded_rtps_
private

Definition at line 181 of file RtpsUdpReceiveStrategy.h.

Referenced by check_encoded(), and receive_bytes().

◆ encoded_submsg_

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::encoded_submsg_
private

Definition at line 181 of file RtpsUdpReceiveStrategy.h.

Referenced by check_encoded(), deliver_from_secure(), and deliver_sample().

◆ fragment_size_

ACE_UINT16 OpenDDS::DCPS::RtpsUdpReceiveStrategy::fragment_size_
private

Definition at line 140 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and reassemble_i().

◆ frags_

FragmentRange OpenDDS::DCPS::RtpsUdpReceiveStrategy::frags_
private

Definition at line 141 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and reassemble_i().

◆ last_received_

SequenceNumber OpenDDS::DCPS::RtpsUdpReceiveStrategy::last_received_
private

Definition at line 135 of file RtpsUdpReceiveStrategy.h.

◆ link_

RtpsUdpDataLink* OpenDDS::DCPS::RtpsUdpReceiveStrategy::link_
private

◆ message_

RTPS::Message OpenDDS::DCPS::RtpsUdpReceiveStrategy::message_
private

Definition at line 175 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), deliver_from_secure(), and deliver_sample().

◆ readers_selected_

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_selected_
private

Definition at line 138 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and do_not_withhold_data_from().

◆ readers_withheld_

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_withheld_
private

Definition at line 138 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and withhold_data_from().

◆ reassembly_

TransportReassembly OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassembly_
private

◆ receiver_

MessageReceiver OpenDDS::DCPS::RtpsUdpReceiveStrategy::receiver_
private

◆ recvd_sample_

const ReceivedDataSample* OpenDDS::DCPS::RtpsUdpReceiveStrategy::recvd_sample_
private

Definition at line 137 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and withhold_data_from().

◆ remote_address_

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpReceiveStrategy::remote_address_
private

Definition at line 174 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and receive_bytes().

◆ secure_prefix_

RTPS::SecuritySubmessage OpenDDS::DCPS::RtpsUdpReceiveStrategy::secure_prefix_
private

◆ secure_sample_

ReceivedDataSample OpenDDS::DCPS::RtpsUdpReceiveStrategy::secure_sample_
private

◆ thread_status_manager_

ThreadStatusManager& OpenDDS::DCPS::RtpsUdpReceiveStrategy::thread_status_manager_
private

Definition at line 173 of file RtpsUdpReceiveStrategy.h.

Referenced by handle_input().

◆ total_frags_

ACE_UINT32 OpenDDS::DCPS::RtpsUdpReceiveStrategy::total_frags_
private

Definition at line 142 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and reassemble_i().


The documentation for this class was generated from the following files: