13 #include <dds/DdsDcpsGuidTypeSupportImpl.h> 18 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h> 38 link->transport_priority(),
42 override_single_dest_(0),
43 max_message_size_(link->config()->max_message_size_),
47 network_is_unreachable_(false)
60 bool shouldWarn(
int code) {
61 return code ==
EPERM || code == EACCES || code == EINTR || code ==
ENOBUFS 71 if (result == -1 && shouldWarn(errno)) {
76 for (
int i = 0; i < n; ++i) {
113 for (
int i = 0; i < n; ++i) {
114 result += iov[i].iov_len;
138 outer_->override_single_dest_ = 0;
139 outer_->override_dest_ = 0;
145 Serializer writer(mb, encoding_unaligned_native);
151 struct AMB_Continuation {
172 #ifdef OPENDDS_SECURITY 175 const DDS::Security::CryptoTransform_var crypto =
link_->
security_config()->get_crypto_transform();
179 VDBG((
LM_DEBUG,
"(%P|%t) RtpsUdpSendStrategy::send_rtps_control () - " 180 "pre_send_packet returned NULL, dropping.\n"));
191 const int num_blocks =
mb_to_iov(use_mb, iov);
195 ACE_ERROR((prio,
"(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 196 "failed to send RTPS control message\n"));
203 const AddrSet& addrs)
212 #ifdef OPENDDS_SECURITY 215 const DDS::Security::CryptoTransform_var crypto =
link_->
security_config()->get_crypto_transform();
219 VDBG((
LM_DEBUG,
"(%P|%t) RtpsUdpSendStrategy::send_rtps_control () - " 220 "pre_send_packet returned NULL, dropping.\n"));
231 const int num_blocks =
mb_to_iov(use_mb, iov);
235 ACE_ERROR((prio,
"(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - " 236 "failed to send RTPS control message\n"));
244 for (
ACE_CDR::ULong idx = 0; idx != submessages.length(); ++idx) {
251 const AddrSet& addrs)
254 typedef AddrSet::const_iterator iter_t;
255 for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
260 if (result_per_dest >= 0) {
261 result = result_per_dest;
272 return link_->ipv6_unicast_socket();
275 ACE_UNUSED_ARG(addr);
297 #ifdef OPENDDS_TESTING_FEATURES 299 if (cfg->should_drop(iov, n, total_length)) {
304 #ifdef ACE_LACKS_SENDMSG 307 for (
int i = 0; i < n; ++i) {
310 "message too large at index %d size %d\n", i, iov[i].iov_len));
313 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
314 iter += iov[i].iov_len;
321 if (cfg->count_messages()) {
324 transport->transport_statistics_.message_count[key].send_fail(result);
326 const int err = errno;
330 ACE_ERROR((prio,
"(%P|%t) RtpsUdpSendStrategy::send_single_i() - " 331 "destination %C failed send: %m\n",
DCPS::LogAddr(addr).c_str()));
333 for (
int i = 0; i < n; ++i) {
334 ACE_ERROR((prio,
"(%P|%t) RtpsUdpSendStrategy::send_single_i: " 335 "iovec[%d].iov_len = %B\n", i,
size_t(iov[i].iov_len)));
345 if (cfg->count_messages()) {
348 transport->transport_statistics_.message_count[key].send(result);
363 #ifdef OPENDDS_SECURITY 368 out.length(static_cast<unsigned int>(mb->
total_length()));
369 unsigned char*
const buffer = out.get_buffer();
370 for (
unsigned int i = 0; mb; mb = mb->
cont()) {
372 i +=
static_cast<unsigned int>(mb->
length());
391 DDS::Security::CryptoTransform_var crypto =
402 if (crypto->encode_serialized_payload(encoded, iQos, plain, writer_crypto_handle, ex)) {
403 if (encoded != plain) {
405 const char* raw =
reinterpret_cast<const char*
>(encoded.get_buffer());
406 payload->
copy(raw, encoded.length());
409 for (
CORBA::ULong i = 0; i < submessages.length(); ++i) {
419 for (
CORBA::ULong i = 0; i < submessages.length(); ++i) {
423 if (iQos[iQosLen - 3] + iQos[iQosLen - 4] != 1) {
425 "extra_inline_qos is not a valid ParameterList\n"), 2);
430 const char* rawIQos =
reinterpret_cast<const char*
>(iQos.get_buffer());
437 "extra_inline_qos deserialization failed\n"), 2);
444 }
else if (iQosLen) {
446 "extra_inline_qos not enough bytes for ParameterList\n"), 2);
454 const DDS::Security::CryptoTransform_var crypto =
link_->
security_config()->get_crypto_transform();
459 bool stateless_or_volatile =
false;
480 send_handle, recv_handles, idx, ex)) {
482 const char* raw =
reinterpret_cast<const char*
>(encoded_rtps_message.get_buffer());
483 out->
copy(raw, encoded_rtps_message.length());
491 "plugin failed to encode RTPS message from handle %d [%d.%d]: %C\n",
510 ser2 << ACE_OutputCDR::from_octet(smHdr.
flags);
518 static_cast<unsigned int>(mb.
space()));
524 const GUID_t& sender_guid,
526 const GUID_t& receiver_guid,
531 "plugin failed to encode submessage 0x%x from handle %d (%C) to %d (%C) [%d.%d]: %C\n",
536 void check_stateless_volatile(
EntityId_t writerId,
bool& stateless_or_volatile)
538 stateless_or_volatile |=
551 const char* submessage_start,
562 if (std::memcmp(&
GUID_UNKNOWN, &receiver,
sizeof receiver)) {
565 readerHandles.length(1);
566 readerHandles[0] = drch;
572 replacements.resize(replacements.size() + 1);
573 Chunk& c = replacements.back();
576 c.
start_ = submessage_start;
579 replacements.pop_back();
582 log_encode_error(msgId, sender_dwch, sender, drch, receiver, ex);
583 replacements.pop_back();
596 const char* submessage_start,
607 if (std::memcmp(&
GUID_UNKNOWN, &receiver,
sizeof receiver)) {
610 writerHandles.length(1);
611 writerHandles[0] = dwch;
616 replacements.resize(replacements.size() + 1);
617 Chunk& c = replacements.back();
620 c.
start_ = submessage_start;
623 replacements.pop_back();
626 log_encode_error(msgId, sender_drch, sender, dwch, receiver, ex);
627 replacements.pop_back();
636 bool& stateless_or_volatile)
657 const char*
const submessage_start = parser.
current();
664 const unsigned int remaining =
static_cast<unsigned int>(parser.
remaining());
671 GuidPrefix_t_forany guidPrefix(receiver.guidPrefix);
672 if (!(parser >> guidPrefix)) {
680 if (!(parser >> dataExtra)) {
688 if (!(parser >> receiver.entityId)) {
697 check_stateless_volatile(sender.
entityId, stateless_or_volatile);
711 if (!(parser >> receiver.entityId)) {
716 check_stateless_volatile(receiver.entityId, stateless_or_volatile);
741 if (replacements.empty()) {
752 unsigned int out_size =
static_cast<unsigned int>(plain->
total_length());
753 for (
size_t i = 0; i < replacements.size(); ++i) {
754 out_size += replacements[i].
encoded_.length();
755 out_size -= replacements[i].
length_;
761 for (
size_t i = 0; i < replacements.size(); ++i) {
762 const Chunk& c = replacements[i];
763 for (; cur && (c.
start_ < cur->rd_ptr() || c.
start_ >= cur->wr_ptr());
765 out->
copy(cur->rd_ptr(), cur->length());
771 const size_t prefix = c.
start_ - cur->rd_ptr();
772 out->
copy(cur->rd_ptr(), prefix);
776 for (
size_t n = c.
length_; n; cur = cur->cont()) {
777 if (cur->length() > n) {
786 for (; cur; cur = cur->cont()) {
787 out->
copy(cur->rd_ptr(), cur->length());
803 #ifdef OPENDDS_SECURITY sequence< Submessage > SubmessageSeq
RtpsUdpSendStrategy(RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_INT16 get_type() const
void send_rtps_control(RTPS::Message &message, ACE_Message_Block &submessages, const NetworkAddress &destination)
const size_t max_message_size_
static const size_t MaxSecureFullMessageAdditionalSize
virtual size_t max_message_size() const
const InstanceHandle_t HANDLE_NIL
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
const ACE_CDR::UShort RTPSHDR_SZ
SubmessageHeader submessageHeader() const
ACE_INET_Addr to_addr() const
size_t length(void) const
bool hasNextSubmessage() const
SubmessageHeader smHeader
sequence< DatareaderCryptoHandle > DatareaderCryptoHandleSeq
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
RtpsUdpTransport_rch transport()
ACE_Message_Block & head_
static const size_t MaxSecureSubmessageAdditionalSize
const VendorId_t VENDORID_OPENDDS
const ACE_CDR::Octet PROTOCOL_RTPS[]
Security::HandleRegistry_rch handle_registry() const
#define OPENDDS_ASSERT(C)
sequence< DatawriterCryptoHandle > DatawriterCryptoHandleSeq
ACE_Thread_Mutex rtps_message_mutex_
friend struct OverrideToken
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused as reported by Andy Elvey and Dan Kosecki *resynced with Christopher Diggins s branch as it exists in tree building code is back Christopher Diggins *resynced codebase with Chris s branch *removed tree building code
key GuidPrefix_t guidPrefix
#define ACE_CDR_BYTE_ORDER
const octet FLAG_N_IN_DATA
bool read_octet_array(ACE_CDR::Octet *x, ACE_CDR::ULong length)
const ProtocolVersion_t PROTOCOLVERSION
ACE_Message_Block * encode_submessages(const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto, bool &stateless_or_volatile)
bool parseSubmessageHeader()
bool encode_writer_submessage(const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatawriterCryptoHandle sender_dwch, const char *submessage_start, CORBA::Octet msgId)
char * rd_ptr(void) const
ssize_t send_bytes_i_helper(const iovec iov[], int n)
const ACE_CDR::UShort SMHDR_SZ
ACE_HANDLE socket(int protocol_family, int type, int proto)
ACE_Message_Block * replace_chunks(const ACE_Message_Block *plain, const OPENDDS_VECTOR(Chunk)&replacements)
bool write_octet_array(const ACE_CDR::Octet *x, ACE_CDR::ULong length)
const GuidPrefix_t & local_prefix() const
int copy(const char *buf, size_t n)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
ProtocolVersion_t version
const ACE_SOCK_Dgram & choose_send_socket(const NetworkAddress &addr) const
NativeCryptoHandle DatawriterCryptoHandle
bool marshal_transport_header(ACE_Message_Block *mb)
DCPS::GuidPrefix_t guidPrefix
void append_submessages(const RTPS::SubmessageSeq &submessages)
virtual void add_delayed_notification(TransportQueueElement *element)
Class to serialize and deserialize data for DDS.
unsigned short submessageLength
long ParticipantCryptoHandle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
NativeCryptoHandle DatareaderCryptoHandle
AtomicBool network_is_unreachable_
virtual void stop_i()
Let the subclass stop.
virtual void add_delayed_notification(TransportQueueElement *element)
Security::SecurityConfig_rch security_config() const
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
ACE_Message_Block * encode_rtps_message(const ACE_Message_Block *plain, DDS::Security::CryptoTransform *crypto)
size_t total_length(void) const
ACE_Message_Block * pre_send_packet(const ACE_Message_Block *plain)
char * wr_ptr(void) const
sequence< octet > OctetSeq
static const size_t UDP_MAX_MESSAGE_SIZE
void encode_payload(const GUID_t &pub_id, Message_Block_Ptr &payload, RTPS::SubmessageSeq &submessages)
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
ACE_Thread_Mutex rtps_header_mb_lock_
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
ssize_t send_multi_i(const iovec iov[], int n, const AddrSet &addrs)
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
char rtps_header_data_[RTPS::RTPSHDR_SZ]
const AddrSet * override_dest_
ACE_SOCK_Dgram & unicast_socket()
const NetworkAddress * override_single_dest_
TransportQueueElement * current_packet_first_element() const
ACE_Message_Block rtps_header_mb_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
virtual GUID_t subscription_id() const
Accessor for the subscription id, if the sample is sent to exactly one subscription.
bool skipToNextSubmessage()
virtual Security::SecurityConfig_rch security_config() const
OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_
const Encoding & encoding() const
sequence< ParticipantCryptoHandle > ParticipantCryptoHandleSeq
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool encode_reader_submessage(const GUID_t &sender, const GUID_t &receiver, OPENDDS_VECTOR(Chunk)&replacements, DDS::Security::CryptoTransform *crypto, const DDS::OctetSeq &plain, DDS::Security::DatareaderCryptoHandle sender_drch, const char *submessage_start, CORBA::Octet msgId)
const MessageCountKind MCK_RTPS
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
const char * current() const
void assign(EntityId_t &dest, const EntityId_t &src)
boolean encode_datawriter_submessage(inout OctetSeq encoded_rtps_submessage, in OctetSeq plain_rtps_submessage, in DatawriterCryptoHandle sending_datawriter_crypto, in DatareaderCryptoHandleSeq receiving_datareader_crypto_list, inout long receiving_datareader_crypto_list_index, inout SecurityException ex)
sequence<<%SCOPED%><%TYPE%><%SEQ%> local interface<%TYPE%> out string encoded
RTPS::Message rtps_message_
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
OverrideToken override_destinations(const NetworkAddress &destination)
bool add_delayed_notification(TransportQueueElement *element)
DCPS::Serializer & serializer()
The Internal API and Implementation of OpenDDS.
boolean encode_rtps_message(inout OctetSeq encoded_rtps_message, in OctetSeq plain_rtps_message, in ParticipantCryptoHandle sending_participant_crypto, in ParticipantCryptoHandleSeq receiving_participant_crypto_list, inout long receiving_participant_crypto_list_index, inout SecurityException ex)
boolean encode_datareader_submessage(inout OctetSeq encoded_rtps_submessage, in OctetSeq plain_rtps_submessage, in DatareaderCryptoHandle sending_datareader_crypto, in DatawriterCryptoHandleSeq receiving_datawriter_crypto_list, inout SecurityException ex)
SubmessageSeq submessages
Base wrapper class around a data/control sample to be sent.
virtual ssize_t send_bytes_i(const iovec iov[], int n)
ssize_t send_single_i(const iovec iov[], int n, const NetworkAddress &addr)