RtpsUdpSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpSendStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011 
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00015 
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019 
00020 #include "dds/DCPS/Serializer.h"
00021 
00022 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00023 
00024 #include <cstring>
00025 
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 RtpsUdpSendStrategy::RtpsUdpSendStrategy(RtpsUdpDataLink* link,
00032                                          const GuidPrefix_t& local_prefix)
00033   : TransportSendStrategy(0, link->impl(),
00034                           0,  // synch_resource
00035                           link->transport_priority(),
00036                           make_rch<NullSynchStrategy>()),
00037     link_(link),
00038     override_dest_(0),
00039     override_single_dest_(0),
00040     rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA,
00041                     rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0),
00042     rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE)
00043 {
00044   rtps_header_.prefix[0] = 'R';
00045   rtps_header_.prefix[1] = 'T';
00046   rtps_header_.prefix[2] = 'P';
00047   rtps_header_.prefix[3] = 'S';
00048   rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION;
00049   rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS;
00050   std::memcpy(rtps_header_.guidPrefix, local_prefix,
00051               sizeof(GuidPrefix_t));
00052   Serializer writer(&rtps_header_mb_);
00053   // byte order doesn't matter for the RTPS Header
00054   writer << rtps_header_;
00055 }
00056 
00057 namespace {
00058   bool shouldWarn(int code) {
00059     return code == EPERM || code == EACCES || code == EINTR || code == ENOBUFS || code == ENOMEM;
00060   }
00061 }
00062 
00063 ssize_t
00064 RtpsUdpSendStrategy::send_bytes_i(const iovec iov[], int n)
00065 {
00066   ssize_t result = send_bytes_i_helper(iov, n);
00067 
00068   if (result == -1 && shouldWarn(errno)) {
00069     // Make the framework think this was a successful send to avoid
00070     // putting the send strategy in suspended mode. If reliability
00071     // is enabled, the data may be resent later.
00072     ssize_t b = 0;
00073     for (int i = 0; i < n; ++i) {
00074       b += iov[i].iov_len;
00075     }
00076     result = b;
00077   }
00078 
00079   return result;
00080 }
00081 
00082 ssize_t
00083 RtpsUdpSendStrategy::send_bytes_i_helper(const iovec iov[], int n)
00084 {
00085   if (override_single_dest_) {
00086     return send_single_i(iov, n, *override_single_dest_);
00087   }
00088 
00089   if (override_dest_) {
00090     return send_multi_i(iov, n, *override_dest_);
00091   }
00092 
00093   // determine destination address(es) from TransportQueueElement in progress
00094   TransportQueueElement* elem = current_packet_first_element();
00095   if (!elem) {
00096     errno = ENOTCONN;
00097     return -1;
00098   }
00099 
00100   const RepoId remote_id = elem->subscription_id();
00101   OPENDDS_SET(ACE_INET_Addr) addrs;
00102 
00103   if (remote_id != GUID_UNKNOWN) {
00104     const ACE_INET_Addr remote = link_->get_locator(remote_id);
00105     if (remote != ACE_INET_Addr()) {
00106       addrs.insert(remote);
00107     }
00108   }
00109 
00110   if (addrs.empty()) {
00111     link_->get_locators(elem->publication_id(), addrs);
00112   }
00113 
00114   if (addrs.empty()) {
00115     errno = ENOTCONN;
00116     return -1;
00117   }
00118 
00119   return send_multi_i(iov, n, addrs);
00120 }
00121 
00122 RtpsUdpSendStrategy::OverrideToken
00123 RtpsUdpSendStrategy::override_destinations(const ACE_INET_Addr& destination)
00124 {
00125   override_single_dest_ = &destination;
00126   return OverrideToken(this);
00127 }
00128 
00129 RtpsUdpSendStrategy::OverrideToken
00130 RtpsUdpSendStrategy::override_destinations(const OPENDDS_SET(ACE_INET_Addr)& dest)
00131 {
00132   override_dest_ = &dest;
00133   return OverrideToken(this);
00134 }
00135 
00136 RtpsUdpSendStrategy::OverrideToken::~OverrideToken()
00137 {
00138   outer_->override_single_dest_ = 0;
00139   outer_->override_dest_ = 0;
00140 }
00141 
00142 bool
00143 RtpsUdpSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
00144 {
00145   Serializer writer(mb); // byte order doesn't matter for the RTPS Header
00146   return writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_),
00147     RTPS::RTPSHDR_SZ);
00148 }
00149 
00150 void
00151 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00152                                        const ACE_INET_Addr& addr)
00153 {
00154   rtps_header_mb_.cont(&submessages);
00155 #if defined(OPENDDS_SECURITY)
00156   Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_));
00157   ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
00158 #else
00159   ACE_Message_Block& use_mb =  rtps_header_mb_;
00160 #endif
00161 
00162   iovec iov[MAX_SEND_BLOCKS];
00163   const int num_blocks = mb_to_iov(use_mb, iov);
00164   const ssize_t result = send_single_i(iov, num_blocks, addr);
00165   if (result < 0) {
00166     const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00167     ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00168       "failed to send RTPS control message\n"));
00169   }
00170 
00171   rtps_header_mb_.cont(0);
00172 }
00173 
00174 void
00175 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00176                                        const OPENDDS_SET(ACE_INET_Addr)& addrs)
00177 {
00178   rtps_header_mb_.cont(&submessages);
00179 #if defined(OPENDDS_SECURITY)
00180   Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_));
00181   ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
00182 #else
00183   ACE_Message_Block& use_mb =  rtps_header_mb_;
00184 #endif
00185 
00186   iovec iov[MAX_SEND_BLOCKS];
00187   const int num_blocks = mb_to_iov(use_mb, iov);
00188   const ssize_t result = send_multi_i(iov, num_blocks, addrs);
00189   if (result < 0) {
00190     const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00191     ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00192       "failed to send RTPS control message\n"));
00193   }
00194 
00195   rtps_header_mb_.cont(0);
00196 }
00197 
00198 ssize_t
00199 RtpsUdpSendStrategy::send_multi_i(const iovec iov[], int n,
00200                                   const OPENDDS_SET(ACE_INET_Addr)& addrs)
00201 {
00202   ssize_t result = -1;
00203   typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00204   for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
00205     const ssize_t result_per_dest = send_single_i(iov, n, *iter);
00206     if (result_per_dest >= 0) {
00207       result = result_per_dest;
00208     }
00209   }
00210   return result;
00211 }
00212 
00213 ssize_t
00214 RtpsUdpSendStrategy::send_single_i(const iovec iov[], int n,
00215                                    const ACE_INET_Addr& addr)
00216 {
00217 #ifdef ACE_LACKS_SENDMSG
00218   char buffer[UDP_MAX_MESSAGE_SIZE];
00219   char *iter = buffer;
00220   for (int i = 0; i < n; ++i) {
00221     if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
00222       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00223                  "message too large at index %d size %d\n", i, iov[i].iov_len));
00224       return -1;
00225     }
00226     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00227     iter += iov[i].iov_len;
00228   }
00229   const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr);
00230 #else
00231   const ssize_t result = link_->unicast_socket().send(iov, n, addr);
00232 #endif
00233   if (result < 0) {
00234     ACE_TCHAR addr_buff[256] = {};
00235     int err = errno;
00236     addr.addr_to_string(addr_buff, 256, 0);
00237     errno = err;
00238     const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00239     ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00240       "destination %s failed %p\n", addr_buff, ACE_TEXT("send")));
00241   }
00242   return result;
00243 }
00244 
00245 void
00246 RtpsUdpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00247 {
00248   if (!link_->add_delayed_notification(element)) {
00249     TransportSendStrategy::add_delayed_notification(element);
00250   }
00251 }
00252 
00253 RemoveResult
00254 RtpsUdpSendStrategy::do_remove_sample(const RepoId& pub_id,
00255   const TransportQueueElement::MatchCriteria& criteria,
00256   void* context)
00257 {
00258   ACE_Guard<ACE_Thread_Mutex>* guard =
00259     static_cast<ACE_Guard<ACE_Thread_Mutex>*>(context);
00260   link_->do_remove_sample(pub_id, criteria, *guard);
00261   return TransportSendStrategy::do_remove_sample(pub_id, criteria, 0);
00262 }
00263 
00264 #if defined(OPENDDS_SECURITY)
00265 void
00266 RtpsUdpSendStrategy::encode_payload(const RepoId& pub_id,
00267                                     Message_Block_Ptr& payload,
00268                                     RTPS::SubmessageSeq& submessages)
00269 {
00270   const DDS::Security::DatawriterCryptoHandle writer_crypto_handle =
00271     link_->writer_crypto_handle(pub_id);
00272   DDS::Security::CryptoTransform_var crypto =
00273     link_->security_config()->get_crypto_transform();
00274 
00275   if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto) {
00276     return;
00277   }
00278 
00279   DDS::OctetSeq encoded, plain, iQos;
00280   plain.length(payload->total_length());
00281   ACE_Message_Block* mb(payload.get());
00282   for (CORBA::ULong i = 0; mb; mb = mb->cont()) {
00283     std::memcpy(plain.get_buffer() + i, mb->rd_ptr(), mb->length());
00284     i += mb->length();
00285   }
00286 
00287   DDS::Security::SecurityException ex = {"", 0, 0};
00288   if (crypto->encode_serialized_payload(encoded, iQos, plain,
00289                                         writer_crypto_handle, ex)) {
00290     if (encoded != plain) {
00291       payload.reset(new ACE_Message_Block(encoded.length()));
00292       const char* raw = reinterpret_cast<const char*>(encoded.get_buffer());
00293       payload->copy(raw, encoded.length());
00294     }
00295 
00296     const CORBA::ULong iQosLen = iQos.length();
00297     if (iQosLen > 3) {
00298       for (CORBA::ULong i = 0; i < submessages.length(); ++i) {
00299         if (submessages[i]._d() == RTPS::DATA) {
00300           // ParameterList must end in {1, 0, x, x} (LE) or {0, 1, x, x} (BE)
00301           // Check for this sentinel and use it for endianness detection
00302           if (iQos[iQosLen - 3] + iQos[iQosLen - 4] != 1) {
00303             VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00304                       "extra_inline_qos is not a valid ParameterList\n"), 2);
00305             break;
00306           }
00307 
00308           const bool swapPl = iQos[iQosLen - 4] != ACE_CDR_BYTE_ORDER;
00309           const char* rawIQos = reinterpret_cast<const char*>(iQos.get_buffer());
00310           ACE_Message_Block mbIQos(rawIQos, iQosLen);
00311           Serializer ser(&mbIQos, swapPl, Serializer::ALIGN_CDR);
00312 
00313           RTPS::DataSubmessage& data = submessages[i].data_sm();
00314           if (!(ser >> data.inlineQos)) { // appends to any existing inlineQos
00315             VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00316                       "extra_inline_qos deserialization failed\n"), 2);
00317             break;
00318           }
00319           data.smHeader.flags |= RTPS::FLAG_Q;
00320           break;
00321         }
00322       }
00323     } else if (iQosLen) {
00324       VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00325                 "extra_inline_qos not enough bytes for ParameterList\n"), 2);
00326     }
00327   }
00328 }
00329 
00330 namespace {
00331   DDS::OctetSeq toSeq(Serializer& ser1, CORBA::Octet msgId, CORBA::Octet flags,
00332                       CORBA::UShort octetsToNextHeader, CORBA::ULong dataWord2,
00333                       EntityId_t readerId, EntityId_t writerId, size_t remain)
00334   {
00335     const bool shortMsg = (msgId == RTPS::PAD || msgId == RTPS::INFO_TS);
00336     CORBA::ULong size = RTPS::SMHDR_SZ +
00337       ((octetsToNextHeader == 0 && !shortMsg) ? remain : octetsToNextHeader);
00338     DDS::OctetSeq out(size);
00339     out.length(size);
00340     ACE_Message_Block mb(reinterpret_cast<const char*>(out.get_buffer()), size);
00341     Serializer ser2(&mb, ser1.swap_bytes(), Serializer::ALIGN_CDR);
00342     ser2 << ACE_OutputCDR::from_octet(msgId);
00343     ser2 << ACE_OutputCDR::from_octet(flags);
00344     ser2 << octetsToNextHeader;
00345     if (msgId == RTPS::DATA || msgId == RTPS::DATA_FRAG) {
00346       ser2 << dataWord2;
00347     }
00348     ser2 << readerId;
00349     ser2 << writerId;
00350     ser1.read_octet_array(reinterpret_cast<CORBA::Octet*>(mb.wr_ptr()),
00351                           mb.space());
00352     return out;
00353   }
00354 
00355   void log_encode_error(CORBA::Octet msgId,
00356                         DDS::Security::NativeCryptoHandle sender,
00357                         const DDS::Security::SecurityException& ex)
00358   {
00359     if (Transport_debug_level) {
00360       ACE_ERROR((LM_ERROR, "RtpsUdpSendStrategy::pre_send_packet - ERROR "
00361                  "plugin failed to encode submessage 0x%x from handle %d "
00362                  "[%d.%d]: %C\n", msgId, sender, ex.code, ex.minor_code,
00363                  ex.message.in()));
00364     }
00365   }
00366 }
00367 
00368 bool
00369 RtpsUdpSendStrategy::encode_writer_submessage(const RepoId& receiver,
00370                                               OPENDDS_VECTOR(Chunk)& replacements,
00371                                               DDS::Security::CryptoTransform* crypto,
00372                                               const DDS::OctetSeq& plain,
00373                                               DDS::Security::DatawriterCryptoHandle sender_dwch,
00374                                               char* submessage_start,
00375                                               CORBA::Octet msgId)
00376 {
00377   using namespace DDS::Security;
00378 
00379   DatareaderCryptoHandleSeq readerHandles;
00380   if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
00381     DatareaderCryptoHandle drch = link_->reader_crypto_handle(receiver);
00382     if (drch != DDS::HANDLE_NIL) {
00383       readerHandles.length(1);
00384       readerHandles[0] = drch;
00385     }
00386   }
00387 
00388   CORBA::Long idx = 0;
00389   SecurityException ex = {"", 0, 0};
00390   replacements.resize(replacements.size() + 1);
00391   Chunk& c = replacements.back();
00392   if (crypto->encode_datawriter_submessage(c.encoded_, plain, sender_dwch,
00393                                            readerHandles, idx, ex)) {
00394     if (c.encoded_ != plain) {
00395       c.start_ = submessage_start;
00396       c.length_ = plain.length();
00397     } else {
00398       replacements.pop_back();
00399     }
00400   } else {
00401     log_encode_error(msgId, sender_dwch, ex);
00402     replacements.pop_back();
00403     return false;
00404   }
00405   return true;
00406 }
00407 
00408 bool
00409 RtpsUdpSendStrategy::encode_reader_submessage(const RepoId& receiver,
00410                                               OPENDDS_VECTOR(Chunk)& replacements,
00411                                               DDS::Security::CryptoTransform* crypto,
00412                                               const DDS::OctetSeq& plain,
00413                                               DDS::Security::DatareaderCryptoHandle sender_drch,
00414                                               char* submessage_start,
00415                                               CORBA::Octet msgId)
00416 {
00417   using namespace DDS::Security;
00418 
00419   DatawriterCryptoHandleSeq writerHandles;
00420   if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
00421     DatawriterCryptoHandle dwch = link_->writer_crypto_handle(receiver);
00422     if (dwch != DDS::HANDLE_NIL) {
00423       writerHandles.length(1);
00424       writerHandles[0] = dwch;
00425     }
00426   }
00427 
00428   SecurityException ex = {"", 0, 0};
00429   replacements.resize(replacements.size() + 1);
00430   Chunk& c = replacements.back();
00431   if (crypto->encode_datareader_submessage(c.encoded_, plain, sender_drch,
00432                                            writerHandles, ex)) {
00433     if (c.encoded_ != plain) {
00434       c.start_ = submessage_start;
00435       c.length_ = plain.length();
00436     } else {
00437       replacements.pop_back();
00438     }
00439   } else {
00440     log_encode_error(msgId, sender_drch, ex);
00441     replacements.pop_back();
00442     return false;
00443   }
00444   return true;
00445 }
00446 
00447 ACE_Message_Block*
00448 RtpsUdpSendStrategy::pre_send_packet(const ACE_Message_Block* plain)
00449 {
00450   using namespace DDS::Security;
00451 
00452   CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
00453   const ParticipantCryptoHandle local_pch = link_->local_crypto_handle();
00454   if (!crypto || local_pch == DDS::HANDLE_NIL) {
00455     return 0;
00456   }
00457 
00458   // 'plain' contains a full RTPS Message on its way to the socket(s).
00459   // Let the crypto plugin examine each submessage and replace it with an
00460   // encoded version.  First, parse through the message using the 'in'
00461   // message block chain.  Instead of changing the messsage in place,
00462   // modifications are stored in the 'replacements' which will end up
00463   // changing the message when the 'out' message block is created in the
00464   // helper method replace_chunks().
00465   Message_Block_Ptr in(plain->duplicate());
00466   ACE_Message_Block* current = in.get();
00467   Serializer ser(current);
00468   RTPS::Header header;
00469   bool ok = ser >> header;
00470 
00471   RepoId sender = GUID_UNKNOWN;
00472   RTPS::assign(sender.guidPrefix, link_->local_prefix());
00473 
00474   RepoId receiver = GUID_UNKNOWN;
00475 
00476   OPENDDS_VECTOR(Chunk) replacements;
00477 
00478   while (ok && in->total_length()) {
00479     while (current && !current->length()) {
00480       current = current->cont();
00481     }
00482     char* submessage_start = current->rd_ptr();
00483 
00484     CORBA::Octet msgId, flags;
00485     if (!(ser >> ACE_InputCDR::to_octet(msgId)) ||
00486         !(ser >> ACE_InputCDR::to_octet(flags))) {
00487       ok = false;
00488       break;
00489     }
00490 
00491     ser.swap_bytes(ACE_CDR_BYTE_ORDER != (flags & RTPS::FLAG_E));
00492     CORBA::UShort octetsToNextHeader;
00493     if (!(ser >> octetsToNextHeader)) {
00494       ok = false;
00495       break;
00496     }
00497 
00498     const size_t remaining = in->total_length();
00499     int read = 0;
00500     CORBA::ULong u2 = 0;
00501 
00502     switch (msgId) {
00503     case RTPS::INFO_DST: {
00504       GuidPrefix_t_forany guidPrefix(receiver.guidPrefix);
00505       if (!(ser >> guidPrefix)) {
00506         ok = false;
00507         break;
00508       }
00509       read += RTPS::INFO_DST_SZ;
00510       break;
00511     }
00512     case RTPS::DATA:
00513     case RTPS::DATA_FRAG:
00514       if (!(ser >> u2)) { // extraFlags|octetsToInlineQos
00515         ok = false;
00516         break;
00517       }
00518       // fall-through
00519     case RTPS::HEARTBEAT:
00520     case RTPS::GAP:
00521     case RTPS::HEARTBEAT_FRAG: {
00522       if (!(ser >> receiver.entityId)) { // readerId
00523         ok = false;
00524         break;
00525       }
00526       if (!(ser >> sender.entityId)) { // writerId
00527         ok = false;
00528         break;
00529       }
00530       DatawriterCryptoHandle sender_dwch = link_->writer_crypto_handle(sender);
00531       if (sender_dwch == DDS::HANDLE_NIL) {
00532         ok = false;
00533         break;
00534       }
00535 
00536       DDS::OctetSeq plain(toSeq(ser, msgId, flags, octetsToNextHeader, u2,
00537                                 receiver.entityId, sender.entityId, remaining));
00538       read = octetsToNextHeader;
00539       if (!encode_writer_submessage(receiver, replacements, crypto, plain,
00540                                     sender_dwch, submessage_start, msgId)) {
00541         ok = false;
00542       }
00543       break;
00544     }
00545     case RTPS::ACKNACK:
00546     case RTPS::NACK_FRAG: {
00547       if (!(ser >> sender.entityId)) { // readerId
00548         ok = false;
00549         break;
00550       }
00551       if (!(ser >> receiver.entityId)) { // writerId
00552         ok = false;
00553         break;
00554       }
00555       DatareaderCryptoHandle sender_drch = link_->reader_crypto_handle(sender);
00556       if (sender_drch == DDS::HANDLE_NIL) {
00557         ok = false;
00558         break;
00559       }
00560 
00561       DDS::OctetSeq plain(toSeq(ser, msgId, flags, octetsToNextHeader, 0,
00562                                 sender.entityId, receiver.entityId, remaining));
00563       read = octetsToNextHeader;
00564       if (!encode_reader_submessage(receiver, replacements, crypto, plain,
00565                                     sender_drch, submessage_start, msgId)) {
00566         ok = false;
00567       }
00568       break;
00569     }
00570     default:
00571       break;
00572     }
00573 
00574     if (!ok || (octetsToNextHeader == 0 && msgId != RTPS::PAD
00575                 && msgId != RTPS::INFO_TS)) {
00576       break;
00577     }
00578 
00579     if (octetsToNextHeader > read) {
00580       if (!ser.skip(octetsToNextHeader - read)) {
00581         ok = false;
00582       }
00583     }
00584   }
00585 
00586   if (!ok || replacements.empty()) {
00587     return 0;
00588   }
00589 
00590   //DDS-Security: SRTPS encoding (including if replacements is empty above)
00591 
00592   return replace_chunks(plain, replacements);
00593 }
00594 
00595 ACE_Message_Block*
00596 RtpsUdpSendStrategy::replace_chunks(const ACE_Message_Block* plain,
00597                                     const OPENDDS_VECTOR(Chunk)& replacements)
00598 {
00599   unsigned int out_size = plain->total_length();
00600   for (size_t i = 0; i < replacements.size(); ++i) {
00601     out_size += replacements[i].encoded_.length();
00602     out_size -= replacements[i].length_;
00603   }
00604 
00605   Message_Block_Ptr in(plain->duplicate());
00606   ACE_Message_Block* cur = in.get();
00607   Message_Block_Ptr out(new ACE_Message_Block(out_size));
00608   for (size_t i = 0; i < replacements.size(); ++i) {
00609     const Chunk& c = replacements[i];
00610     for (; cur && (c.start_ < cur->rd_ptr() || c.start_ >= cur->wr_ptr());
00611          cur = cur->cont()) {
00612       out->copy(cur->rd_ptr(), cur->length());
00613     }
00614     if (!cur) {
00615       return 0;
00616     }
00617 
00618     const size_t prefix = c.start_ - cur->rd_ptr();
00619     out->copy(cur->rd_ptr(), prefix);
00620     cur->rd_ptr(prefix);
00621 
00622     out->copy(reinterpret_cast<const char*>(c.encoded_.get_buffer()),
00623               c.encoded_.length());
00624     for (size_t n = c.length_; n; cur = cur->cont()) {
00625       if (cur->length() > n) {
00626         cur->rd_ptr(n);
00627         break;
00628       } else {
00629         n -= cur->length();
00630       }
00631     }
00632   }
00633 
00634   for (; cur; cur = cur->cont()) {
00635     out->copy(cur->rd_ptr(), cur->length());
00636   }
00637 
00638   return out.release();
00639 }
00640 #endif
00641 
00642 void
00643 RtpsUdpSendStrategy::stop_i()
00644 {
00645 }
00646 
00647 } // namespace DCPS
00648 } // namespace OpenDDS
00649 
00650 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1