OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Static Public Attributes | Protected Member Functions | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::RtpsUdpSendStrategy Class Reference

#include <RtpsUdpSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpSendStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpSendStrategy:
Collaboration graph
[legend]

Classes

struct  Chunk
 
struct  OverrideToken
 

Public Member Functions

 RtpsUdpSendStrategy (RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix)
 
virtual void stop_i ()
 Let the subclass stop. More...
 
OverrideToken override_destinations (const NetworkAddress &destination)
 
OverrideToken override_destinations (const AddrSet &destinations)
 
void send_rtps_control (RTPS::Message &message, ACE_Message_Block &submessages, const NetworkAddress &destination)
 
void send_rtps_control (RTPS::Message &message, ACE_Message_Block &submessages, const AddrSet &destinations)
 
void append_submessages (const RTPS::SubmessageSeq &submessages)
 
void encode_payload (const GUID_t &pub_id, Message_Block_Ptr &payload, RTPS::SubmessageSeq &submessages)
 
virtual Security::SecurityConfig_rch security_config () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
virtual ~TransportSendStrategy ()
 
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer. More...
 
int start ()
 
void stop ()
 
void send_start ()
 
void send (TransportQueueElement *element, bool relink=true)
 
void send_stop (GUID_t repoId)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
virtual WorkOutcome perform_work ()
 
virtual void relink (bool do_suspend=true)
 
void suspend_send ()
 
void resume_send ()
 
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue. More...
 
virtual void terminate_send_if_suspended ()
 
virtual bool start_i ()
 Let the subclass start. More...
 
void link_released (bool flag)
 
bool isDirectMode ()
 
virtual ACE_HANDLE get_handle ()
 
void deliver_ack_request (TransportQueueElement *element)
 
bool fragmentation_helper (TransportQueueElement *original_element, TqeVector &elements_to_send)
 
void clear (SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
 
SendMode mode () const
 Access the current sending mode. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
virtual ~ThreadSynchWorker ()
 
virtual void schedule_output ()
 Indicate that queued data is available to be sent. More...
 
std::size_t id () const
 DataLink reference value for diagnostics. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Static Public Attributes

static const size_t MaxCryptoHeaderSize = 20
 
static const size_t MaxCryptoFooterSize = 20
 
static const size_t MaxSecurePrefixSize = RTPS::SMHDR_SZ + MaxCryptoHeaderSize
 
static const size_t MaxSubmessagePadding = RTPS::SM_ALIGN - 1
 
static const size_t MaxSecureSuffixSize = RTPS::SMHDR_SZ + MaxCryptoFooterSize
 
static const size_t MaxSecureSubmessageLeadingSize = MaxSecurePrefixSize
 
static const size_t MaxSecureSubmessageFollowingSize
 
static const size_t MaxSecureSubmessageAdditionalSize
 
static const size_t MaxSecureFullMessageLeadingSize
 
static const size_t MaxSecureFullMessageFollowingSize = MaxSecureSuffixSize
 
static const size_t MaxSecureFullMessageAdditionalSize
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportSendStrategy
static const size_t UDP_MAX_MESSAGE_SIZE = 65466
 

Protected Member Functions

virtual ssize_t send_bytes_i (const iovec iov[], int n)
 
ssize_t send_bytes_i_helper (const iovec iov[], int n)
 
virtual size_t max_message_size () const
 
virtual void add_delayed_notification (TransportQueueElement *element)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
 TransportSendStrategy (std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
 
virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
 
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
 
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header. More...
 
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet. More...
 
TransportQueueElement * current_packet_first_element () const
 
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag. More...
 
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
 
virtual RemoveResult do_remove_sample (const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
 Implement framework chain visitations to remove a sample. More...
 
ThreadSynch * synch () const
 
void set_header_source (ACE_INT64 source)
 
- Protected Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
 ThreadSynchWorker (std::size_t id=0)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Member Functions

bool marshal_transport_header (ACE_Message_Block *mb)
 
ssize_t send_multi_i (const iovec iov[], int n, const AddrSet &addrs)
 
const ACE_SOCK_Dgram & choose_send_socket (const NetworkAddress &addr) const
 
ssize_t send_single_i (const iovec iov[], int n, const NetworkAddress &addr)
 
ACE_Message_Block * pre_send_packet (const ACE_Message_Block *plain)
 
bool encode_writer_submessage (const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatawriterCryptoHandle sender_dwch, const char *submessage_start, CORBA::Octet msgId)
 
bool encode_reader_submessage (const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatareaderCryptoHandle sender_drch, const char *submessage_start, CORBA::Octet msgId)
 
ACE_Message_Block * encode_submessages (const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto, bool &stateless_or_volatile)
 
ACE_Message_Block * encode_rtps_message (const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto)
 
ACE_Message_Block * replace_chunks (const ACE_Message_Block *plain, const OPENDDS_VECTOR(Chunk)&replacements)
 

Private Attributes

RtpsUdpDataLink * link_
 
const AddrSet * override_dest_
 
const NetworkAddress * override_single_dest_
 
const size_t max_message_size_
 
RTPS::Message rtps_message_
 
ACE_Thread_Mutex rtps_message_mutex_
 
char rtps_header_data_ [RTPS::RTPSHDR_SZ]
 
ACE_Data_Block rtps_header_db_
 
ACE_Message_Block rtps_header_mb_
 
ACE_Thread_Mutex rtps_header_mb_lock_
 
AtomicBool network_is_unreachable_
 

Friends

struct OverrideToken
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportSendStrategy
enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}
 
typedef BasicQueue< TransportQueueElement > QueueType
 
- Public Types inherited from OpenDDS::DCPS::ThreadSynchWorker
enum  WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE }
 
- Static Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)
 

Detailed Description

Definition at line 30 of file RtpsUdpSendStrategy.h.

Constructor & Destructor Documentation

◆ RtpsUdpSendStrategy()

OpenDDS::DCPS::RtpsUdpSendStrategy::RtpsUdpSendStrategy ( RtpsUdpDataLink * link,
const GuidPrefix_t & local_prefix 
)

Definition at line 34 of file RtpsUdpSendStrategy.cpp.

References code, EADDRNOTAVAIL, ENETUNREACH, ENOBUFS, EPERM, OpenDDS::RTPS::Header::guidPrefix, OpenDDS::RTPS::Message::hdr, OpenDDS::RTPS::Header::prefix, OpenDDS::RTPS::PROTOCOL_RTPS, OpenDDS::RTPS::PROTOCOLVERSION, rtps_header_mb_, rtps_message_, OpenDDS::RTPS::Header::vendorId, OpenDDS::RTPS::VENDORID_OPENDDS, and OpenDDS::RTPS::Header::version.

36  : TransportSendStrategy(0, link->impl(),
37  0, // synch_resource
38  link->transport_priority(),
39  make_rch<NullSynchStrategy>()),
40  link_(link),
41  override_dest_(0),
43  max_message_size_(link->config()->max_message_size_),
48 {
52  std::memcpy(rtps_message_.hdr.guidPrefix, local_prefix,
53  sizeof(GuidPrefix_t));
54  Serializer writer(&rtps_header_mb_, encoding_unaligned_native);
55  // byte order doesn't matter for the RTPS Header
56  writer << rtps_message_.hdr;
57 }
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
const ProtocolVersion_t PROTOCOLVERSION
Definition: MessageTypes.h:67
ProtocolVersion_t version
Definition: RtpsCore.idl:652
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
OctetArray4 prefix
Definition: RtpsCore.idl:651
const NetworkAddress * override_single_dest_
const ACE_CDR::Octet PROTOCOL_RTPS[]
Definition: MessageTypes.h:58
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:654
TransportSendStrategy(std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
char rtps_header_data_[RTPS::RTPSHDR_SZ]
const VendorId_t VENDORID_OPENDDS
Definition: MessageTypes.h:26

Member Function Documentation

◆ add_delayed_notification()

void OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification ( TransportQueueElement * element)
protectedvirtual

◆ append_submessages()

void OpenDDS::DCPS::RtpsUdpSendStrategy::append_submessages ( const RTPS::SubmessageSeq & submessages)

Definition at line 241 of file RtpsUdpSendStrategy.cpp.

References ACE_GUARD, OpenDDS::DCPS::push_back(), rtps_message_, rtps_message_mutex_, and OpenDDS::RTPS::Message::submessages.

242 {
244  for (ACE_CDR::ULong idx = 0; idx != submessages.length(); ++idx) {
245  DCPS::push_back(rtps_message_.submessages, submessages[idx]);
246  }
247 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
SubmessageSeq submessages
Definition: RtpsCore.idl:897
ACE_UINT32 ULong
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138

◆ choose_send_socket()

const ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpSendStrategy::choose_send_socket ( const NetworkAddress & addr) const
private

Definition at line 268 of file RtpsUdpSendStrategy.cpp.

References OpenDDS::DCPS::NetworkAddress::get_type(), link_, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

Referenced by send_single_i().

269 {
270 #ifdef ACE_HAS_IPV6
271  if (addr.get_type() == AF_INET6) {
272  return link_->ipv6_unicast_socket();
273  }
274 #endif
275  ACE_UNUSED_ARG(addr);
276  return link_->unicast_socket();
277 }

◆ encode_payload()

void OpenDDS::DCPS::RtpsUdpSendStrategy::encode_payload ( const GUID_t & pub_id,
Message_Block_Ptr & payload,
RTPS::SubmessageSeq & submessages 
)

Definition at line 385 of file RtpsUdpSendStrategy.cpp.

References ACE_CDR_BYTE_ORDER, ACE_Message_Block::copy(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::FLAG_N_IN_DATA, OpenDDS::RTPS::FLAG_Q, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::RTPS::DataSubmessage::inlineQos, OpenDDS::DCPS::Encoding::KIND_XCDR1, link_, LM_WARNING, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::RtpsUdpDataLink::security_config(), OpenDDS::RTPS::DataSubmessage::smHeader, and VDBG_LVL.

388 {
389  const DDS::Security::DatawriterCryptoHandle writer_crypto_handle =
390  link_->handle_registry()->get_local_datawriter_crypto_handle(pub_id);
391  DDS::Security::CryptoTransform_var crypto =
392  link_->security_config()->get_crypto_transform();
393 
394  if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto) {
395  return;
396  }
397 
398  const DDS::OctetSeq plain = toSeq(payload.get());
399  DDS::OctetSeq encoded, iQos;
400  DDS::Security::SecurityException ex = {"", 0, 0};
401 
402  if (crypto->encode_serialized_payload(encoded, iQos, plain, writer_crypto_handle, ex)) {
403  if (encoded != plain) {
404  payload.reset(new ACE_Message_Block(encoded.length()));
405  const char* raw = reinterpret_cast<const char*>(encoded.get_buffer());
406  payload->copy(raw, encoded.length());
407 
408  // Set FLAG_N flag
409  for (CORBA::ULong i = 0; i < submessages.length(); ++i) {
410  if (submessages[i]._d() == RTPS::DATA) {
411  RTPS::DataSubmessage& data = submessages[i].data_sm();
412  data.smHeader.flags |= RTPS::FLAG_N_IN_DATA;
413  }
414  }
415  }
416 
417  const CORBA::ULong iQosLen = iQos.length();
418  if (iQosLen > 3) {
419  for (CORBA::ULong i = 0; i < submessages.length(); ++i) {
420  if (submessages[i]._d() == RTPS::DATA) {
421  // ParameterList must end in {1, 0, x, x} (LE) or {0, 1, x, x} (BE)
422  // Check for this sentinel and use it for endianness detection
423  if (iQos[iQosLen - 3] + iQos[iQosLen - 4] != 1) {
424  VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
425  "extra_inline_qos is not a valid ParameterList\n"), 2);
426  break;
427  }
428 
429  const bool swapPl = iQos[iQosLen - 4] != ACE_CDR_BYTE_ORDER;
430  const char* rawIQos = reinterpret_cast<const char*>(iQos.get_buffer());
431  ACE_Message_Block mbIQos(rawIQos, iQosLen);
432  Serializer ser(&mbIQos, Encoding::KIND_XCDR1, swapPl);
433 
434  RTPS::DataSubmessage& data = submessages[i].data_sm();
435  if (!(ser >> data.inlineQos)) { // appends to any existing inlineQos
436  VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
437  "extra_inline_qos deserialization failed\n"), 2);
438  break;
439  }
440  data.smHeader.flags |= RTPS::FLAG_Q;
441  break;
442  }
443  }
444  } else if (iQosLen) {
445  VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
446  "extra_inline_qos not enough bytes for ParameterList\n"), 2);
447  }
448  }
449 }
const octet FLAG_Q
Definition: RtpsCore.idl:519
NativeCryptoHandle DatawriterCryptoHandle
const InstanceHandle_t HANDLE_NIL
Security::HandleRegistry_rch handle_registry() const
#define ACE_CDR_BYTE_ORDER
ACE_CDR::ULong ULong
const octet FLAG_N_IN_DATA
Definition: RtpsCore.idl:529
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
#define VDBG_LVL(DBG_ARGS, LEVEL)
Security::SecurityConfig_rch security_config() const

◆ encode_reader_submessage()

bool OpenDDS::DCPS::RtpsUdpSendStrategy::encode_reader_submessage ( const GUID_t & sender,
const GUID_t & receiver,
OPENDDS_VECTOR(Chunk)&  replacements,
DDS::Security::CryptoTransform * crypto,
const DDS::OctetSeq & plain,
DDS::Security::DatareaderCryptoHandle  sender_drch,
const char *  submessage_start,
CORBA::Octet  msgId 
)
private

Definition at line 590 of file RtpsUdpSendStrategy.cpp.

References DDS::Security::CryptoTransform::encode_datareader_submessage(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::encoded_, OpenDDS::DCPS::GUID_UNKNOWN, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::length_, link_, and OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::start_.

Referenced by encode_submessages().

598 {
599  using namespace DDS::Security;
600 
601  if (sender_drch == DDS::HANDLE_NIL) {
602  return true;
603  }
604 
605  DatawriterCryptoHandle dwch = DDS::HANDLE_NIL;
606  DatawriterCryptoHandleSeq writerHandles;
607  if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
608  dwch = link_->handle_registry()->get_remote_datawriter_crypto_handle(receiver);
609  if (dwch != DDS::HANDLE_NIL) {
610  writerHandles.length(1);
611  writerHandles[0] = dwch;
612  }
613  }
614 
615  SecurityException ex = {"", 0, 0};
616  replacements.resize(replacements.size() + 1);
617  Chunk& c = replacements.back();
618  if (crypto->encode_datareader_submessage(c.encoded_, plain, sender_drch, writerHandles, ex)) {
619  if (c.encoded_ != plain) {
620  c.start_ = submessage_start;
621  c.length_ = plain.length();
622  } else {
623  replacements.pop_back();
624  }
625  } else {
626  log_encode_error(msgId, sender_drch, sender, dwch, receiver, ex);
627  replacements.pop_back();
628  return false;
629  }
630  return true;
631 }
sequence< DatawriterCryptoHandle > DatawriterCryptoHandleSeq
NativeCryptoHandle DatawriterCryptoHandle
boolean encode_datareader_submessage(inout OctetSeq encoded_rtps_submessage, in OctetSeq plain_rtps_submessage, in DatareaderCryptoHandle sending_datareader_crypto, in DatawriterCryptoHandleSeq receiving_datawriter_crypto_list, inout SecurityException ex)
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Security::HandleRegistry_rch handle_registry() const

◆ encode_rtps_message()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpSendStrategy::encode_rtps_message ( const ACE_Message_Block * plain,
DDS::Security::CryptoTransform * crypto 
)
private

Definition at line 470 of file RtpsUdpSendStrategy.cpp.

References ACE_ERROR, DDS::Security::SecurityException::code, ACE_Message_Block::copy(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, ACE_Message_Block::duplicate(), DDS::Security::CryptoTransform::encode_rtps_message(), OpenDDS::DCPS::Serializer::encoding(), OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::RTPS::INFO_TS, link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), DDS::Security::SecurityException::message, DDS::Security::SecurityException::minor_code, OpenDDS::RTPS::PAD, OpenDDS::DCPS::Serializer::read_octet_array(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::RTPS::SMHDR_SZ, ACE_Message_Block::space(), OpenDDS::RTPS::SubmessageHeader::submessageId, OpenDDS::RTPS::SubmessageHeader::submessageLength, OpenDDS::DCPS::Transport_debug_level, and ACE_Message_Block::wr_ptr().

Referenced by pre_send_packet().

471 {
472  using namespace DDS::Security;
473  DDS::OctetSeq encoded_rtps_message;
474  const DDS::OctetSeq plain_rtps_message = toSeq(plain);
475  const ParticipantCryptoHandle send_handle = link_->local_crypto_handle();
476  const ParticipantCryptoHandleSeq recv_handles; // unused
477  int idx = 0; // unused
478  SecurityException ex = {"", 0, 0};
479  if (crypto->encode_rtps_message(encoded_rtps_message, plain_rtps_message,
480  send_handle, recv_handles, idx, ex)) {
481  Message_Block_Ptr out(new ACE_Message_Block(encoded_rtps_message.length()));
482  const char* raw = reinterpret_cast<const char*>(encoded_rtps_message.get_buffer());
483  out->copy(raw, encoded_rtps_message.length());
484  return out.release();
485  }
486  if (ex.code == 0 && ex.minor_code == 0) {
487  return plain->duplicate(); // send original pre-encoded msg
488  }
489  if (Transport_debug_level) {
490  ACE_ERROR((LM_ERROR, "RtpsUdpSendStrategy::encode_rtps_message - ERROR "
491  "plugin failed to encode RTPS message from handle %d [%d.%d]: %C\n",
492  send_handle, ex.code, ex.minor_code, ex.message.in()));
493  }
494  return 0; // do not send pre-encoded msg
495 }
#define ACE_ERROR(X)
boolean encode_rtps_message(inout OctetSeq encoded_rtps_message, in OctetSeq plain_rtps_message, in ParticipantCryptoHandle sending_participant_crypto, in ParticipantCryptoHandleSeq receiving_participant_crypto_list, inout long receiving_participant_crypto_list_index, inout SecurityException ex)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
virtual ACE_Message_Block * duplicate(void) const
sequence< ParticipantCryptoHandle > ParticipantCryptoHandleSeq
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ encode_submessages()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpSendStrategy::encode_submessages ( const ACE_Message_Block * plain,
DDS::Security::CryptoTransform * crypto,
bool &  stateless_or_volatile 
)
private

Definition at line 634 of file RtpsUdpSendStrategy.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::assign(), OpenDDS::RTPS::MessageParser::current(), OpenDDS::RTPS::DATA, OpenDDS::RTPS::DATA_FRAG, ACE_Message_Block::duplicate(), encode_reader_submessage(), encode_writer_submessage(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::GAP, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::RTPS::MessageParser::hasNextSubmessage(), OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::INFO_DST, link_, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR(), OpenDDS::RTPS::MessageParser::parseHeader(), OpenDDS::RTPS::MessageParser::parseSubmessageHeader(), OpenDDS::RTPS::MessageParser::remaining(), replace_chunks(), OpenDDS::RTPS::MessageParser::serializer(), OpenDDS::RTPS::MessageParser::skipToNextSubmessage(), OpenDDS::RTPS::MessageParser::submessageHeader(), and OpenDDS::RTPS::SubmessageHeader::submessageId.

Referenced by pre_send_packet().

637 {
638  // 'plain' contains a full RTPS Message on its way to the socket(s).
639  // Let the crypto plugin examine each submessage and replace it with an
640  // encoded version. First, parse through the message using the 'plain'
641  // message block chain. Instead of changing the messsage in place,
642  // modifications are stored in the 'replacements' which will end up
643  // changing the message when the 'out' message block is created in the
644  // helper method replace_chunks().
645  RTPS::MessageParser parser(*plain);
646  bool ok = parser.parseHeader();
647 
648  GUID_t sender = GUID_UNKNOWN;
649  assign(sender.guidPrefix, link_->local_prefix());
650 
651  GUID_t receiver = GUID_UNKNOWN;
652 
653  OPENDDS_VECTOR(Chunk) replacements;
654 
655  while (ok && parser.remaining()) {
656 
657  const char* const submessage_start = parser.current();
658 
659  if (!parser.parseSubmessageHeader()) {
660  ok = false;
661  break;
662  }
663 
664  const unsigned int remaining = static_cast<unsigned int>(parser.remaining());
665  const RTPS::SubmessageHeader smhdr = parser.submessageHeader();
666 
667  CORBA::ULong dataExtra = 0;
668 
669  switch (smhdr.submessageId) {
670  case RTPS::INFO_DST: {
671  GuidPrefix_t_forany guidPrefix(receiver.guidPrefix);
672  if (!(parser >> guidPrefix)) {
673  ok = false;
674  break;
675  }
676  break;
677  }
678  case RTPS::DATA:
679  case RTPS::DATA_FRAG:
680  if (!(parser >> dataExtra)) { // extraFlags|octetsToInlineQos
681  ok = false;
682  break;
683  }
684  // fall-through
685  case RTPS::HEARTBEAT:
686  case RTPS::GAP:
687  case RTPS::HEARTBEAT_FRAG: {
688  if (!(parser >> receiver.entityId)) { // readerId
689  ok = false;
690  break;
691  }
692  if (!(parser >> sender.entityId)) { // writerId
693  ok = false;
694  break;
695  }
696 
697  check_stateless_volatile(sender.entityId, stateless_or_volatile);
698  DDS::OctetSeq plainSm(toSeq(parser.serializer(), smhdr, dataExtra, receiver.entityId, sender.entityId, remaining));
699  if (!encode_writer_submessage(sender, receiver, replacements, crypto, plainSm,
700  link_->handle_registry()->get_local_datawriter_crypto_handle(sender), submessage_start, smhdr.submessageId)) {
701  ok = false;
702  }
703  break;
704  }
705  case RTPS::ACKNACK:
706  case RTPS::NACK_FRAG: {
707  if (!(parser >> sender.entityId)) { // readerId
708  ok = false;
709  break;
710  }
711  if (!(parser >> receiver.entityId)) { // writerId
712  ok = false;
713  break;
714  }
715 
716  check_stateless_volatile(receiver.entityId, stateless_or_volatile);
717  DDS::OctetSeq plainSm(toSeq(parser.serializer(), smhdr, 0, sender.entityId, receiver.entityId, remaining));
718  if (!encode_reader_submessage(sender, receiver, replacements, crypto, plainSm,
719  link_->handle_registry()->get_local_datareader_crypto_handle(sender), submessage_start, smhdr.submessageId)) {
720  ok = false;
721  }
722  break;
723  }
724  default:
725  break;
726  }
727 
728  if (!ok || !parser.hasNextSubmessage()) {
729  break;
730  }
731 
732  if (!parser.skipToNextSubmessage()) {
733  ok = false;
734  }
735  }
736 
737  if (!ok) {
738  return 0;
739  }
740 
741  if (replacements.empty()) {
742  return plain->duplicate();
743  }
744 
745  return replace_chunks(plain, replacements);
746 }
bool encode_reader_submessage(const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatareaderCryptoHandle sender_drch, const char *submessage_start, CORBA::Octet msgId)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Security::HandleRegistry_rch handle_registry() const
const GuidPrefix_t & local_prefix() const
ACE_Message_Block * replace_chunks(const ACE_Message_Block *plain, const OPENDDS_VECTOR(Chunk)&replacements)
ACE_CDR::ULong ULong
bool encode_writer_submessage(const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatawriterCryptoHandle sender_dwch, const char *submessage_start, CORBA::Octet msgId)
virtual ACE_Message_Block * duplicate(void) const
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64

◆ encode_writer_submessage()

bool OpenDDS::DCPS::RtpsUdpSendStrategy::encode_writer_submessage ( const GUID_t & sender,
const GUID_t & receiver,
OPENDDS_VECTOR(Chunk)&  replacements,
DDS::Security::CryptoTransform * crypto,
const DDS::OctetSeq & plain,
DDS::Security::DatawriterCryptoHandle  sender_dwch,
const char *  submessage_start,
CORBA::Octet  msgId 
)
private

Definition at line 545 of file RtpsUdpSendStrategy.cpp.

References DDS::Security::CryptoTransform::encode_datawriter_submessage(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::encoded_, OpenDDS::DCPS::GUID_UNKNOWN, DDS::HANDLE_NIL, OpenDDS::DCPS::RtpsUdpDataLink::handle_registry(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::length_, link_, and OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::start_.

Referenced by encode_submessages().

553 {
554  using namespace DDS::Security;
555 
556  if (sender_dwch == DDS::HANDLE_NIL) {
557  return true;
558  }
559 
560  DatareaderCryptoHandle drch = DDS::HANDLE_NIL;
561  DatareaderCryptoHandleSeq readerHandles;
562  if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
563  drch = link_->handle_registry()->get_remote_datareader_crypto_handle(receiver);
564  if (drch != DDS::HANDLE_NIL) {
565  readerHandles.length(1);
566  readerHandles[0] = drch;
567  }
568  }
569 
570  CORBA::Long idx = 0;
571  SecurityException ex = {"", 0, 0};
572  replacements.resize(replacements.size() + 1);
573  Chunk& c = replacements.back();
574  if (crypto->encode_datawriter_submessage(c.encoded_, plain, sender_dwch, readerHandles, idx, ex)) {
575  if (c.encoded_ != plain) {
576  c.start_ = submessage_start;
577  c.length_ = plain.length();
578  } else {
579  replacements.pop_back();
580  }
581  } else {
582  log_encode_error(msgId, sender_dwch, sender, drch, receiver, ex);
583  replacements.pop_back();
584  return false;
585  }
586  return true;
587 }
ACE_CDR::Long Long
boolean encode_datawriter_submessage(inout OctetSeq encoded_rtps_submessage, in OctetSeq plain_rtps_submessage, in DatawriterCryptoHandle sending_datawriter_crypto, in DatareaderCryptoHandleSeq receiving_datareader_crypto_list, inout long receiving_datareader_crypto_list_index, inout SecurityException ex)
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Security::HandleRegistry_rch handle_registry() const
sequence< DatareaderCryptoHandle > DatareaderCryptoHandleSeq
NativeCryptoHandle DatareaderCryptoHandle

◆ marshal_transport_header()

bool OpenDDS::DCPS::RtpsUdpSendStrategy::marshal_transport_header ( ACE_Message_Block * mb)
privatevirtual

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 143 of file RtpsUdpSendStrategy.cpp.

References ACE_Message_Block::cont(), head_, OpenDDS::DCPS::TransportSendStrategy::lock_, rtps_header_data_, OpenDDS::RTPS::RTPSHDR_SZ, and OpenDDS::DCPS::Serializer::write_octet_array().

144 {
145  Serializer writer(mb, encoding_unaligned_native); // byte order doesn't matter for the RTPS Header
146  return writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_),
148 }
const ACE_CDR::UShort RTPSHDR_SZ
Definition: MessageTypes.h:105
char rtps_header_data_[RTPS::RTPSHDR_SZ]

◆ max_message_size()

size_t OpenDDS::DCPS::RtpsUdpSendStrategy::max_message_size ( void  ) const
protectedvirtual

The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant; for example, UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 799 of file RtpsUdpSendStrategy.cpp.

References max_message_size_, MaxSecureFullMessageAdditionalSize, MaxSecureSubmessageAdditionalSize, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

800 {
801  // TODO: Make this conditional on if the message actually needs to do this.
802  return max_message_size_
803 #ifdef OPENDDS_SECURITY
804  // Worst case scenario is full message encryption plus one submessage encryption.
806 #endif
807  ;
808 }
static const size_t MaxSecureFullMessageAdditionalSize
static const size_t MaxSecureSubmessageAdditionalSize

◆ override_destinations() [1/2]

RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations ( const NetworkAddress & destination)

Definition at line 123 of file RtpsUdpSendStrategy.cpp.

References override_single_dest_, and OverrideToken.

124 {
125  override_single_dest_ = &destination;
126  return OverrideToken(this);
127 }
const NetworkAddress * override_single_dest_

◆ override_destinations() [2/2]

RtpsUdpSendStrategy::OverrideToken OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations ( const AddrSet &  destinations)

Definition at line 130 of file RtpsUdpSendStrategy.cpp.

References override_dest_, and OverrideToken.

131 {
132  override_dest_ = &dest;
133  return OverrideToken(this);
134 }

◆ pre_send_packet()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpSendStrategy::pre_send_packet ( const ACE_Message_Block * m)
privatevirtual

Derived classes can override to transform the data right before it's sent. If the returned value is non-NULL it will be sent instead of sending the parameter. If the returned value is NULL the original message will be dropped.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 452 of file RtpsUdpSendStrategy.cpp.

References ACE_Message_Block::duplicate(), encode_rtps_message(), encode_submessages(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), DDS::HANDLE_NIL, link_, OpenDDS::DCPS::RtpsUdpDataLink::local_crypto_handle(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), and OpenDDS::DCPS::RtpsUdpDataLink::security_config().

Referenced by send_rtps_control().

453 {
454  const DDS::Security::CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
455  if (!crypto) {
456  return plain->duplicate();
457  }
458 
459  bool stateless_or_volatile = false;
460  Message_Block_Ptr submessages(encode_submessages(plain, crypto, stateless_or_volatile));
461 
462  if (!submessages || stateless_or_volatile || link_->local_crypto_handle() == DDS::HANDLE_NIL) {
463  return submessages.release();
464  }
465 
466  return encode_rtps_message(submessages.get(), crypto);
467 }
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
const InstanceHandle_t HANDLE_NIL
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
Security::SecurityConfig_rch security_config() const
ACE_Message_Block * encode_submessages(const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto, bool &stateless_or_volatile)
ACE_Message_Block * encode_rtps_message(const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto)

◆ replace_chunks()

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpSendStrategy::replace_chunks ( const ACE_Message_Block *  plain,
const OPENDDS_VECTOR(Chunk)&  replacements 
)
private

Definition at line 749 of file RtpsUdpSendStrategy.cpp.

References ACE_Message_Block::copy(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::encoded_, OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::length_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::RtpsUdpSendStrategy::Chunk::start_, and ACE_Message_Block::total_length().

Referenced by encode_submessages().

751 {
752  unsigned int out_size = static_cast<unsigned int>(plain->total_length());
753  for (size_t i = 0; i < replacements.size(); ++i) {
754  out_size += replacements[i].encoded_.length();
755  out_size -= replacements[i].length_;
756  }
757 
758  Message_Block_Ptr in(plain->duplicate());
759  ACE_Message_Block* cur = in.get();
760  Message_Block_Ptr out(new ACE_Message_Block(out_size));
761  for (size_t i = 0; i < replacements.size(); ++i) {
762  const Chunk& c = replacements[i];
763  for (; cur && (c.start_ < cur->rd_ptr() || c.start_ >= cur->wr_ptr());
764  cur = cur->cont()) {
765  out->copy(cur->rd_ptr(), cur->length());
766  }
767  if (!cur) {
768  return 0;
769  }
770 
771  const size_t prefix = c.start_ - cur->rd_ptr();
772  out->copy(cur->rd_ptr(), prefix);
773  cur->rd_ptr(prefix);
774 
775  out->copy(reinterpret_cast<const char*>(c.encoded_.get_buffer()), c.encoded_.length());
776  for (size_t n = c.length_; n; cur = cur->cont()) {
777  if (cur->length() > n) {
778  cur->rd_ptr(n);
779  break;
780  } else {
781  n -= cur->length();
782  }
783  }
784  }
785 
786  for (; cur; cur = cur->cont()) {
787  out->copy(cur->rd_ptr(), cur->length());
788  }
789 
790  return out.release();
791 }
virtual ACE_Message_Block * duplicate(void) const
size_t total_length(void) const
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ security_config()

Security::SecurityConfig_rch OpenDDS::DCPS::RtpsUdpSendStrategy::security_config ( ) const
virtual

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 379 of file RtpsUdpSendStrategy.cpp.

References link_, and OpenDDS::DCPS::RtpsUdpDataLink::security_config().

Referenced by send_rtps_control().

380 {
381  return link_->security_config();
382 }
Security::SecurityConfig_rch security_config() const

◆ send_bytes_i()

ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
)
protectedvirtual

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 67 of file RtpsUdpSendStrategy.cpp.

References send_bytes_i_helper().

68 {
69  ssize_t result = send_bytes_i_helper(iov, n);
70 
71  if (result == -1 && shouldWarn(errno)) {
72  // Make the framework think this was a successful send to avoid
73  // putting the send strategy in suspended mode. If reliability
74  // is enabled, the data may be resent later.
75  ssize_t b = 0;
76  for (int i = 0; i < n; ++i) {
77  b += iov[i].iov_len;
78  }
79  result = b;
80  }
81 
82  return result;
83 }
int ssize_t
ssize_t send_bytes_i_helper(const iovec iov[], int n)

◆ send_bytes_i_helper()

ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper ( const iovec  iov[],
int  n 
)
protected

Definition at line 86 of file RtpsUdpSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element(), ENOTCONN, OpenDDS::DCPS::RtpsUdpDataLink::get_addresses(), OpenDDS::DCPS::GUID_UNKNOWN, link_, override_dest_, override_single_dest_, OpenDDS::DCPS::TransportQueueElement::publication_id(), send_multi_i(), send_single_i(), and OpenDDS::DCPS::TransportQueueElement::subscription_id().

Referenced by send_bytes_i().

87 {
88  if (override_single_dest_) {
89  return send_single_i(iov, n, *override_single_dest_);
90  }
91 
92  if (override_dest_) {
93  return send_multi_i(iov, n, *override_dest_);
94  }
95 
96  // determine destination address(es) from TransportQueueElement in progress
97  TransportQueueElement* elem = current_packet_first_element();
98  if (!elem) {
99  errno = ENOTCONN;
100  return -1;
101  }
102 
103  AddrSet addrs;
104  if (elem->subscription_id() != GUID_UNKNOWN) {
105  addrs = link_->get_addresses(elem->publication_id(), elem->subscription_id());
106 
107  } else {
108  addrs = link_->get_addresses(elem->publication_id());
109  }
110 
111  if (addrs.empty()) {
112  ssize_t result = 0;
113  for (int i = 0; i < n; ++i) {
114  result += iov[i].iov_len;
115  }
116  return result;
117  }
118 
119  return send_multi_i(iov, n, addrs);
120 }
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
int ssize_t
const NetworkAddress * override_single_dest_
#define ENOTCONN
ssize_t send_multi_i(const iovec iov[], int n, const AddrSet &addrs)
TransportQueueElement * current_packet_first_element() const
ssize_t send_single_i(const iovec iov[], int n, const NetworkAddress &addr)

◆ send_multi_i()

ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_multi_i ( const iovec  iov[],
int  n,
const AddrSet &  addrs 
)
private

Definition at line 250 of file RtpsUdpSendStrategy.cpp.

References send_single_i().

Referenced by send_bytes_i_helper(), and send_rtps_control().

252 {
253  ssize_t result = -1;
254  typedef AddrSet::const_iterator iter_t;
255  for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
256  if (!*iter) {
257  continue;
258  }
259  const ssize_t result_per_dest = send_single_i(iov, n, *iter);
260  if (result_per_dest >= 0) {
261  result = result_per_dest;
262  }
263  }
264  return result;
265 }
int ssize_t
ssize_t send_single_i(const iovec iov[], int n, const NetworkAddress &addr)

◆ send_rtps_control() [1/2]

void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control ( RTPS::Message &  message,
ACE_Message_Block &  submessages,
const NetworkAddress &  destination 
)

Definition at line 161 of file RtpsUdpSendStrategy.cpp.

References ACE_ERROR, ACE_GUARD, OpenDDS::RTPS::Message::hdr, link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), network_is_unreachable_, pre_send_packet(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), rtps_header_mb_, rtps_header_mb_lock_, rtps_message_, rtps_message_mutex_, security_config(), OpenDDS::DCPS::RtpsUdpDataLink::security_config(), send_single_i(), and VDBG.

164 {
165  {
166  ACE_GUARD(ACE_Thread_Mutex, g, rtps_message_mutex_);
167  message.hdr = rtps_message_.hdr;
168  }
169 
170  const AMB_Continuation cont(rtps_header_mb_lock_, rtps_header_mb_, submessages);
171 
172 #ifdef OPENDDS_SECURITY
173  Message_Block_Ptr alternate;
174  if (security_config()) {
175  const DDS::Security::CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
176  if (crypto) {
177  alternate.reset(pre_send_packet(&rtps_header_mb_));
178  if (!alternate) {
179  VDBG((LM_DEBUG, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control () - "
180  "pre_send_packet returned NULL, dropping.\n"));
181  return;
182  }
183  }
184  }
185  ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
186 #else
187  ACE_Message_Block& use_mb = rtps_header_mb_;
188 #endif
189 
190  iovec iov[MAX_SEND_BLOCKS];
191  const int num_blocks = mb_to_iov(use_mb, iov);
192  const ssize_t result = send_single_i(iov, num_blocks, addr);
193  if (result < 0 && !network_is_unreachable_) {
194  const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
195  ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
196  "failed to send RTPS control message\n"));
197  }
198 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual Security::SecurityConfig_rch security_config() const
int ssize_t
ACE_Message_Block * pre_send_packet(const ACE_Message_Block *plain)
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
#define VDBG(DBG_ARGS)
ACE_Log_Priority
LM_WARNING
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
Security::SecurityConfig_rch security_config() const
LM_ERROR
ssize_t send_single_i(const iovec iov[], int n, const NetworkAddress &addr)

◆ send_rtps_control() [2/2]

void OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control ( RTPS::Message &  message,
ACE_Message_Block &  submessages,
const AddrSet &  destinations 
)

Definition at line 201 of file RtpsUdpSendStrategy.cpp.

References ACE_ERROR, ACE_GUARD, OpenDDS::RTPS::Message::hdr, link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), network_is_unreachable_, pre_send_packet(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), rtps_header_mb_, rtps_header_mb_lock_, rtps_message_, rtps_message_mutex_, security_config(), OpenDDS::DCPS::RtpsUdpDataLink::security_config(), send_multi_i(), and VDBG.

204 {
205  {
206  ACE_GUARD(ACE_Thread_Mutex, g, rtps_message_mutex_);
207  message.hdr = rtps_message_.hdr;
208  }
209 
210  const AMB_Continuation cont(rtps_header_mb_lock_, rtps_header_mb_, submessages);
211 
212 #ifdef OPENDDS_SECURITY
213  Message_Block_Ptr alternate;
214  if (security_config()) {
215  const DDS::Security::CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
216  if (crypto) {
217  alternate.reset(pre_send_packet(&rtps_header_mb_));
218  if (!alternate) {
219  VDBG((LM_DEBUG, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control () - "
220  "pre_send_packet returned NULL, dropping.\n"));
221  return;
222  }
223  }
224  }
225  ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
226 #else
227  ACE_Message_Block& use_mb = rtps_header_mb_;
228 #endif
229 
230  iovec iov[MAX_SEND_BLOCKS];
231  const int num_blocks = mb_to_iov(use_mb, iov);
232  const ssize_t result = send_multi_i(iov, num_blocks, addrs);
233  if (result < 0 && !network_is_unreachable_) {
234  const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
235  ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
236  "failed to send RTPS control message\n"));
237  }
238 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual Security::SecurityConfig_rch security_config() const
int ssize_t
ACE_Message_Block * pre_send_packet(const ACE_Message_Block *plain)
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
#define VDBG(DBG_ARGS)
ACE_Log_Priority
LM_WARNING
ssize_t send_multi_i(const iovec iov[], int n, const AddrSet &addrs)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
Security::SecurityConfig_rch security_config() const
LM_ERROR

◆ send_single_i()

ssize_t OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i ( const iovec  iov[],
int  n,
const NetworkAddress &  addr 
)
private

Definition at line 280 of file RtpsUdpSendStrategy.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, choose_send_socket(), EMSGSIZE, ENETUNREACH, link_, LM_ERROR, LM_WARNING, OpenDDS::DCPS::MCK_RTPS, network_is_unreachable_, OPENDDS_ASSERT, ACE_SOCK_Dgram::send(), socket(), OpenDDS::DCPS::NetworkAddress::to_addr(), OpenDDS::DCPS::RtpsUdpDataLink::transport(), and OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE.

Referenced by send_bytes_i_helper(), send_multi_i(), and send_rtps_control().

282 {
283  OPENDDS_ASSERT(addr);
284 
285  const ACE_SOCK_Dgram& socket = choose_send_socket(addr);
286 
287  RtpsUdpTransport_rch transport = link_->transport();
288  if (!transport) {
289  return 0;
290  }
291 
292  RtpsUdpInst_rch cfg = transport->config();
293  if (!cfg) {
294  return 0;
295  }
296 
297 #ifdef OPENDDS_TESTING_FEATURES
298  ssize_t total_length;
299  if (cfg->should_drop(iov, n, total_length)) {
300  return total_length;
301  }
302 #endif
303 
304 #ifdef ACE_LACKS_SENDMSG
305  char buffer[UDP_MAX_MESSAGE_SIZE];
306  char *iter = buffer;
307  for (int i = 0; i < n; ++i) {
308  if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
309  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
310  "message too large at index %d size %d\n", i, iov[i].iov_len));
311  return -1;
312  }
313  std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
314  iter += iov[i].iov_len;
315  }
316  const ssize_t result = socket.send(buffer, iter - buffer, addr.to_addr());
317 #else
318  const ssize_t result = socket.send(iov, n, addr.to_addr());
319 #endif
320  if (result < 0) {
321  if (cfg->count_messages()) {
322  const InternalMessageCountKey key(addr, MCK_RTPS, addr == NetworkAddress(cfg->rtps_relay_address()));
323  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, transport->transport_statistics_mutex_, -1);
324  transport->transport_statistics_.message_count[key].send_fail(result);
325  }
326  const int err = errno;
327  if (err != ENETUNREACH || !network_is_unreachable_) {
328  errno = err;
329  const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
330  ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
331  "destination %C failed send: %m\n", DCPS::LogAddr(addr).c_str()));
332  if (errno == EMSGSIZE) {
333  for (int i = 0; i < n; ++i) {
334  ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_single_i: "
335  "iovec[%d].iov_len = %B\n", i, size_t(iov[i].iov_len)));
336  }
337  }
338  }
339  if (err == ENETUNREACH) {
340  network_is_unreachable_ = true;
341  }
342  // Reset errno since the rest of framework expects it.
343  errno = err;
344  } else {
345  if (cfg->count_messages()) {
346  const InternalMessageCountKey key(addr, MCK_RTPS, addr == NetworkAddress(cfg->rtps_relay_address()));
347  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, transport->transport_statistics_mutex_, -1);
348  transport->transport_statistics_.message_count[key].send(result);
349  }
350  network_is_unreachable_ = false;
351  }
352  return result;
353 }
#define ACE_ERROR(X)
RtpsUdpTransport_rch transport()
sequence< octet > key
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
int ssize_t
ACE_HANDLE socket(int protocol_family, int type, int proto)
RcHandle< RtpsUdpTransport > RtpsUdpTransport_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Log_Priority
LM_WARNING
const ACE_SOCK_Dgram & choose_send_socket(const NetworkAddress &addr) const
const MessageCountKind MCK_RTPS
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
LM_ERROR
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ stop_i()

void OpenDDS::DCPS::RtpsUdpSendStrategy::stop_i ( )
virtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 795 of file RtpsUdpSendStrategy.cpp.

796 {
797 }

Friends And Related Function Documentation

◆ OverrideToken

friend struct OverrideToken
friend

Definition at line 43 of file RtpsUdpSendStrategy.h.

Referenced by override_destinations().

Member Data Documentation

◆ link_

RtpsUdpDataLink* OpenDDS::DCPS::RtpsUdpSendStrategy::link_
private

◆ max_message_size_

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::max_message_size_
private

Definition at line 139 of file RtpsUdpSendStrategy.h.

Referenced by max_message_size().

◆ MaxCryptoFooterSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxCryptoFooterSize = 20
static

Definition at line 64 of file RtpsUdpSendStrategy.h.

◆ MaxCryptoHeaderSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxCryptoHeaderSize = 20
static

Definition at line 63 of file RtpsUdpSendStrategy.h.

◆ MaxSecureFullMessageAdditionalSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureFullMessageAdditionalSize
static

◆ MaxSecureFullMessageFollowingSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureFullMessageFollowingSize = MaxSecureSuffixSize
static

Definition at line 75 of file RtpsUdpSendStrategy.h.

◆ MaxSecureFullMessageLeadingSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureFullMessageLeadingSize
static
Initial value:

Definition at line 73 of file RtpsUdpSendStrategy.h.

◆ MaxSecurePrefixSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecurePrefixSize = RTPS::SMHDR_SZ + MaxCryptoHeaderSize
static

Definition at line 65 of file RtpsUdpSendStrategy.h.

◆ MaxSecureSubmessageAdditionalSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureSubmessageAdditionalSize
static

◆ MaxSecureSubmessageFollowingSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureSubmessageFollowingSize
static
Initial value:

Definition at line 69 of file RtpsUdpSendStrategy.h.

◆ MaxSecureSubmessageLeadingSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureSubmessageLeadingSize = MaxSecurePrefixSize
static

Definition at line 68 of file RtpsUdpSendStrategy.h.

◆ MaxSecureSuffixSize

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSecureSuffixSize = RTPS::SMHDR_SZ + MaxCryptoFooterSize
static

Definition at line 67 of file RtpsUdpSendStrategy.h.

◆ MaxSubmessagePadding

const size_t OpenDDS::DCPS::RtpsUdpSendStrategy::MaxSubmessagePadding = RTPS::SM_ALIGN - 1
static

Definition at line 66 of file RtpsUdpSendStrategy.h.

◆ network_is_unreachable_

AtomicBool OpenDDS::DCPS::RtpsUdpSendStrategy::network_is_unreachable_
private

Definition at line 146 of file RtpsUdpSendStrategy.h.

Referenced by send_rtps_control(), and send_single_i().

◆ override_dest_

const AddrSet* OpenDDS::DCPS::RtpsUdpSendStrategy::override_dest_
private

Definition at line 136 of file RtpsUdpSendStrategy.h.

Referenced by override_destinations(), and send_bytes_i_helper().

◆ override_single_dest_

const NetworkAddress* OpenDDS::DCPS::RtpsUdpSendStrategy::override_single_dest_
private

Definition at line 137 of file RtpsUdpSendStrategy.h.

Referenced by override_destinations(), and send_bytes_i_helper().

◆ rtps_header_data_

char OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_data_[RTPS::RTPSHDR_SZ]
private

Definition at line 142 of file RtpsUdpSendStrategy.h.

Referenced by marshal_transport_header().

◆ rtps_header_db_

ACE_Data_Block OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_db_
private

Definition at line 143 of file RtpsUdpSendStrategy.h.

◆ rtps_header_mb_

ACE_Message_Block OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_mb_
private

Definition at line 144 of file RtpsUdpSendStrategy.h.

Referenced by RtpsUdpSendStrategy(), and send_rtps_control().

◆ rtps_header_mb_lock_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_header_mb_lock_
private

Definition at line 145 of file RtpsUdpSendStrategy.h.

Referenced by send_rtps_control().

◆ rtps_message_

RTPS::Message OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_message_
private

◆ rtps_message_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpSendStrategy::rtps_message_mutex_
private

Definition at line 141 of file RtpsUdpSendStrategy.h.

Referenced by append_submessages(), and send_rtps_control().


The documentation for this class was generated from the following files: