00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpSendStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00015
00016 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00017 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00018 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00019
00020 #include "dds/DCPS/Serializer.h"
00021
00022 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00023
00024 #include <cstring>
00025
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031 RtpsUdpSendStrategy::RtpsUdpSendStrategy(RtpsUdpDataLink* link,
00032 const GuidPrefix_t& local_prefix)
00033 : TransportSendStrategy(0, link->impl(),
00034 0,
00035 link->transport_priority(),
00036 make_rch<NullSynchStrategy>()),
00037 link_(link),
00038 override_dest_(0),
00039 override_single_dest_(0),
00040 rtps_header_db_(RTPS::RTPSHDR_SZ, ACE_Message_Block::MB_DATA,
00041 rtps_header_data_, 0, 0, ACE_Message_Block::DONT_DELETE, 0),
00042 rtps_header_mb_(&rtps_header_db_, ACE_Message_Block::DONT_DELETE)
00043 {
00044 rtps_header_.prefix[0] = 'R';
00045 rtps_header_.prefix[1] = 'T';
00046 rtps_header_.prefix[2] = 'P';
00047 rtps_header_.prefix[3] = 'S';
00048 rtps_header_.version = OpenDDS::RTPS::PROTOCOLVERSION;
00049 rtps_header_.vendorId = OpenDDS::RTPS::VENDORID_OPENDDS;
00050 std::memcpy(rtps_header_.guidPrefix, local_prefix,
00051 sizeof(GuidPrefix_t));
00052 Serializer writer(&rtps_header_mb_);
00053
00054 writer << rtps_header_;
00055 }
00056
namespace {
  // Tell whether an errno value from a failed send belongs to the set that
  // this file reports at warning (rather than error) severity:
  // EPERM, EACCES, EINTR, ENOBUFS and ENOMEM.
  bool shouldWarn(int code) {
    static const int warn_codes[] = { EPERM, EACCES, EINTR, ENOBUFS, ENOMEM };
    const unsigned int count = sizeof warn_codes / sizeof warn_codes[0];
    for (unsigned int idx = 0; idx < count; ++idx) {
      if (warn_codes[idx] == code) {
        return true;
      }
    }
    return false;
  }
}
00062
00063 ssize_t
00064 RtpsUdpSendStrategy::send_bytes_i(const iovec iov[], int n)
00065 {
00066 ssize_t result = send_bytes_i_helper(iov, n);
00067
00068 if (result == -1 && shouldWarn(errno)) {
00069
00070
00071
00072 ssize_t b = 0;
00073 for (int i = 0; i < n; ++i) {
00074 b += iov[i].iov_len;
00075 }
00076 result = b;
00077 }
00078
00079 return result;
00080 }
00081
00082 ssize_t
00083 RtpsUdpSendStrategy::send_bytes_i_helper(const iovec iov[], int n)
00084 {
00085 if (override_single_dest_) {
00086 return send_single_i(iov, n, *override_single_dest_);
00087 }
00088
00089 if (override_dest_) {
00090 return send_multi_i(iov, n, *override_dest_);
00091 }
00092
00093
00094 TransportQueueElement* elem = current_packet_first_element();
00095 if (!elem) {
00096 errno = ENOTCONN;
00097 return -1;
00098 }
00099
00100 const RepoId remote_id = elem->subscription_id();
00101 OPENDDS_SET(ACE_INET_Addr) addrs;
00102
00103 if (remote_id != GUID_UNKNOWN) {
00104 const ACE_INET_Addr remote = link_->get_locator(remote_id);
00105 if (remote != ACE_INET_Addr()) {
00106 addrs.insert(remote);
00107 }
00108 }
00109
00110 if (addrs.empty()) {
00111 link_->get_locators(elem->publication_id(), addrs);
00112 }
00113
00114 if (addrs.empty()) {
00115 errno = ENOTCONN;
00116 return -1;
00117 }
00118
00119 return send_multi_i(iov, n, addrs);
00120 }
00121
00122 RtpsUdpSendStrategy::OverrideToken
00123 RtpsUdpSendStrategy::override_destinations(const ACE_INET_Addr& destination)
00124 {
00125 override_single_dest_ = &destination;
00126 return OverrideToken(this);
00127 }
00128
00129 RtpsUdpSendStrategy::OverrideToken
00130 RtpsUdpSendStrategy::override_destinations(const OPENDDS_SET(ACE_INET_Addr)& dest)
00131 {
00132 override_dest_ = &dest;
00133 return OverrideToken(this);
00134 }
00135
00136 RtpsUdpSendStrategy::OverrideToken::~OverrideToken()
00137 {
00138 outer_->override_single_dest_ = 0;
00139 outer_->override_dest_ = 0;
00140 }
00141
00142 bool
00143 RtpsUdpSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
00144 {
00145 Serializer writer(mb);
00146 return writer.write_octet_array(reinterpret_cast<ACE_CDR::Octet*>(rtps_header_data_),
00147 RTPS::RTPSHDR_SZ);
00148 }
00149
00150 void
00151 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00152 const ACE_INET_Addr& addr)
00153 {
00154 rtps_header_mb_.cont(&submessages);
00155 #if defined(OPENDDS_SECURITY)
00156 Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_));
00157 ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
00158 #else
00159 ACE_Message_Block& use_mb = rtps_header_mb_;
00160 #endif
00161
00162 iovec iov[MAX_SEND_BLOCKS];
00163 const int num_blocks = mb_to_iov(use_mb, iov);
00164 const ssize_t result = send_single_i(iov, num_blocks, addr);
00165 if (result < 0) {
00166 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00167 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00168 "failed to send RTPS control message\n"));
00169 }
00170
00171 rtps_header_mb_.cont(0);
00172 }
00173
00174 void
00175 RtpsUdpSendStrategy::send_rtps_control(ACE_Message_Block& submessages,
00176 const OPENDDS_SET(ACE_INET_Addr)& addrs)
00177 {
00178 rtps_header_mb_.cont(&submessages);
00179 #if defined(OPENDDS_SECURITY)
00180 Message_Block_Ptr alternate(pre_send_packet(&rtps_header_mb_));
00181 ACE_Message_Block& use_mb = alternate ? *alternate : rtps_header_mb_;
00182 #else
00183 ACE_Message_Block& use_mb = rtps_header_mb_;
00184 #endif
00185
00186 iovec iov[MAX_SEND_BLOCKS];
00187 const int num_blocks = mb_to_iov(use_mb, iov);
00188 const ssize_t result = send_multi_i(iov, num_blocks, addrs);
00189 if (result < 0) {
00190 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00191 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_rtps_control() - "
00192 "failed to send RTPS control message\n"));
00193 }
00194
00195 rtps_header_mb_.cont(0);
00196 }
00197
00198 ssize_t
00199 RtpsUdpSendStrategy::send_multi_i(const iovec iov[], int n,
00200 const OPENDDS_SET(ACE_INET_Addr)& addrs)
00201 {
00202 ssize_t result = -1;
00203 typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
00204 for (iter_t iter = addrs.begin(); iter != addrs.end(); ++iter) {
00205 const ssize_t result_per_dest = send_single_i(iov, n, *iter);
00206 if (result_per_dest >= 0) {
00207 result = result_per_dest;
00208 }
00209 }
00210 return result;
00211 }
00212
00213 ssize_t
00214 RtpsUdpSendStrategy::send_single_i(const iovec iov[], int n,
00215 const ACE_INET_Addr& addr)
00216 {
00217 #ifdef ACE_LACKS_SENDMSG
00218 char buffer[UDP_MAX_MESSAGE_SIZE];
00219 char *iter = buffer;
00220 for (int i = 0; i < n; ++i) {
00221 if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
00222 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00223 "message too large at index %d size %d\n", i, iov[i].iov_len));
00224 return -1;
00225 }
00226 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00227 iter += iov[i].iov_len;
00228 }
00229 const ssize_t result = link_->unicast_socket().send(buffer, iter - buffer, addr);
00230 #else
00231 const ssize_t result = link_->unicast_socket().send(iov, n, addr);
00232 #endif
00233 if (result < 0) {
00234 ACE_TCHAR addr_buff[256] = {};
00235 int err = errno;
00236 addr.addr_to_string(addr_buff, 256, 0);
00237 errno = err;
00238 const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
00239 ACE_ERROR((prio, "(%P|%t) RtpsUdpSendStrategy::send_single_i() - "
00240 "destination %s failed %p\n", addr_buff, ACE_TEXT("send")));
00241 }
00242 return result;
00243 }
00244
00245 void
00246 RtpsUdpSendStrategy::add_delayed_notification(TransportQueueElement* element)
00247 {
00248 if (!link_->add_delayed_notification(element)) {
00249 TransportSendStrategy::add_delayed_notification(element);
00250 }
00251 }
00252
00253 RemoveResult
00254 RtpsUdpSendStrategy::do_remove_sample(const RepoId& pub_id,
00255 const TransportQueueElement::MatchCriteria& criteria,
00256 void* context)
00257 {
00258 ACE_Guard<ACE_Thread_Mutex>* guard =
00259 static_cast<ACE_Guard<ACE_Thread_Mutex>*>(context);
00260 link_->do_remove_sample(pub_id, criteria, *guard);
00261 return TransportSendStrategy::do_remove_sample(pub_id, criteria, 0);
00262 }
00263
00264 #if defined(OPENDDS_SECURITY)
00265 void
00266 RtpsUdpSendStrategy::encode_payload(const RepoId& pub_id,
00267 Message_Block_Ptr& payload,
00268 RTPS::SubmessageSeq& submessages)
00269 {
00270 const DDS::Security::DatawriterCryptoHandle writer_crypto_handle =
00271 link_->writer_crypto_handle(pub_id);
00272 DDS::Security::CryptoTransform_var crypto =
00273 link_->security_config()->get_crypto_transform();
00274
00275 if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto) {
00276 return;
00277 }
00278
00279 DDS::OctetSeq encoded, plain, iQos;
00280 plain.length(payload->total_length());
00281 ACE_Message_Block* mb(payload.get());
00282 for (CORBA::ULong i = 0; mb; mb = mb->cont()) {
00283 std::memcpy(plain.get_buffer() + i, mb->rd_ptr(), mb->length());
00284 i += mb->length();
00285 }
00286
00287 DDS::Security::SecurityException ex = {"", 0, 0};
00288 if (crypto->encode_serialized_payload(encoded, iQos, plain,
00289 writer_crypto_handle, ex)) {
00290 if (encoded != plain) {
00291 payload.reset(new ACE_Message_Block(encoded.length()));
00292 const char* raw = reinterpret_cast<const char*>(encoded.get_buffer());
00293 payload->copy(raw, encoded.length());
00294 }
00295
00296 const CORBA::ULong iQosLen = iQos.length();
00297 if (iQosLen > 3) {
00298 for (CORBA::ULong i = 0; i < submessages.length(); ++i) {
00299 if (submessages[i]._d() == RTPS::DATA) {
00300
00301
00302 if (iQos[iQosLen - 3] + iQos[iQosLen - 4] != 1) {
00303 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00304 "extra_inline_qos is not a valid ParameterList\n"), 2);
00305 break;
00306 }
00307
00308 const bool swapPl = iQos[iQosLen - 4] != ACE_CDR_BYTE_ORDER;
00309 const char* rawIQos = reinterpret_cast<const char*>(iQos.get_buffer());
00310 ACE_Message_Block mbIQos(rawIQos, iQosLen);
00311 Serializer ser(&mbIQos, swapPl, Serializer::ALIGN_CDR);
00312
00313 RTPS::DataSubmessage& data = submessages[i].data_sm();
00314 if (!(ser >> data.inlineQos)) {
00315 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00316 "extra_inline_qos deserialization failed\n"), 2);
00317 break;
00318 }
00319 data.smHeader.flags |= RTPS::FLAG_Q;
00320 break;
00321 }
00322 }
00323 } else if (iQosLen) {
00324 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpSendStrategy::encode_payload "
00325 "extra_inline_qos not enough bytes for ParameterList\n"), 2);
00326 }
00327 }
00328 }
00329
00330 namespace {
00331 DDS::OctetSeq toSeq(Serializer& ser1, CORBA::Octet msgId, CORBA::Octet flags,
00332 CORBA::UShort octetsToNextHeader, CORBA::ULong dataWord2,
00333 EntityId_t readerId, EntityId_t writerId, size_t remain)
00334 {
00335 const bool shortMsg = (msgId == RTPS::PAD || msgId == RTPS::INFO_TS);
00336 CORBA::ULong size = RTPS::SMHDR_SZ +
00337 ((octetsToNextHeader == 0 && !shortMsg) ? remain : octetsToNextHeader);
00338 DDS::OctetSeq out(size);
00339 out.length(size);
00340 ACE_Message_Block mb(reinterpret_cast<const char*>(out.get_buffer()), size);
00341 Serializer ser2(&mb, ser1.swap_bytes(), Serializer::ALIGN_CDR);
00342 ser2 << ACE_OutputCDR::from_octet(msgId);
00343 ser2 << ACE_OutputCDR::from_octet(flags);
00344 ser2 << octetsToNextHeader;
00345 if (msgId == RTPS::DATA || msgId == RTPS::DATA_FRAG) {
00346 ser2 << dataWord2;
00347 }
00348 ser2 << readerId;
00349 ser2 << writerId;
00350 ser1.read_octet_array(reinterpret_cast<CORBA::Octet*>(mb.wr_ptr()),
00351 mb.space());
00352 return out;
00353 }
00354
00355 void log_encode_error(CORBA::Octet msgId,
00356 DDS::Security::NativeCryptoHandle sender,
00357 const DDS::Security::SecurityException& ex)
00358 {
00359 if (Transport_debug_level) {
00360 ACE_ERROR((LM_ERROR, "RtpsUdpSendStrategy::pre_send_packet - ERROR "
00361 "plugin failed to encode submessage 0x%x from handle %d "
00362 "[%d.%d]: %C\n", msgId, sender, ex.code, ex.minor_code,
00363 ex.message.in()));
00364 }
00365 }
00366 }
00367
00368 bool
00369 RtpsUdpSendStrategy::encode_writer_submessage(const RepoId& receiver,
00370 OPENDDS_VECTOR(Chunk)& replacements,
00371 DDS::Security::CryptoTransform* crypto,
00372 const DDS::OctetSeq& plain,
00373 DDS::Security::DatawriterCryptoHandle sender_dwch,
00374 char* submessage_start,
00375 CORBA::Octet msgId)
00376 {
00377 using namespace DDS::Security;
00378
00379 DatareaderCryptoHandleSeq readerHandles;
00380 if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
00381 DatareaderCryptoHandle drch = link_->reader_crypto_handle(receiver);
00382 if (drch != DDS::HANDLE_NIL) {
00383 readerHandles.length(1);
00384 readerHandles[0] = drch;
00385 }
00386 }
00387
00388 CORBA::Long idx = 0;
00389 SecurityException ex = {"", 0, 0};
00390 replacements.resize(replacements.size() + 1);
00391 Chunk& c = replacements.back();
00392 if (crypto->encode_datawriter_submessage(c.encoded_, plain, sender_dwch,
00393 readerHandles, idx, ex)) {
00394 if (c.encoded_ != plain) {
00395 c.start_ = submessage_start;
00396 c.length_ = plain.length();
00397 } else {
00398 replacements.pop_back();
00399 }
00400 } else {
00401 log_encode_error(msgId, sender_dwch, ex);
00402 replacements.pop_back();
00403 return false;
00404 }
00405 return true;
00406 }
00407
00408 bool
00409 RtpsUdpSendStrategy::encode_reader_submessage(const RepoId& receiver,
00410 OPENDDS_VECTOR(Chunk)& replacements,
00411 DDS::Security::CryptoTransform* crypto,
00412 const DDS::OctetSeq& plain,
00413 DDS::Security::DatareaderCryptoHandle sender_drch,
00414 char* submessage_start,
00415 CORBA::Octet msgId)
00416 {
00417 using namespace DDS::Security;
00418
00419 DatawriterCryptoHandleSeq writerHandles;
00420 if (std::memcmp(&GUID_UNKNOWN, &receiver, sizeof receiver)) {
00421 DatawriterCryptoHandle dwch = link_->writer_crypto_handle(receiver);
00422 if (dwch != DDS::HANDLE_NIL) {
00423 writerHandles.length(1);
00424 writerHandles[0] = dwch;
00425 }
00426 }
00427
00428 SecurityException ex = {"", 0, 0};
00429 replacements.resize(replacements.size() + 1);
00430 Chunk& c = replacements.back();
00431 if (crypto->encode_datareader_submessage(c.encoded_, plain, sender_drch,
00432 writerHandles, ex)) {
00433 if (c.encoded_ != plain) {
00434 c.start_ = submessage_start;
00435 c.length_ = plain.length();
00436 } else {
00437 replacements.pop_back();
00438 }
00439 } else {
00440 log_encode_error(msgId, sender_drch, ex);
00441 replacements.pop_back();
00442 return false;
00443 }
00444 return true;
00445 }
00446
00447 ACE_Message_Block*
00448 RtpsUdpSendStrategy::pre_send_packet(const ACE_Message_Block* plain)
00449 {
00450 using namespace DDS::Security;
00451
00452 CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
00453 const ParticipantCryptoHandle local_pch = link_->local_crypto_handle();
00454 if (!crypto || local_pch == DDS::HANDLE_NIL) {
00455 return 0;
00456 }
00457
00458
00459
00460
00461
00462
00463
00464
00465 Message_Block_Ptr in(plain->duplicate());
00466 ACE_Message_Block* current = in.get();
00467 Serializer ser(current);
00468 RTPS::Header header;
00469 bool ok = ser >> header;
00470
00471 RepoId sender = GUID_UNKNOWN;
00472 RTPS::assign(sender.guidPrefix, link_->local_prefix());
00473
00474 RepoId receiver = GUID_UNKNOWN;
00475
00476 OPENDDS_VECTOR(Chunk) replacements;
00477
00478 while (ok && in->total_length()) {
00479 while (current && !current->length()) {
00480 current = current->cont();
00481 }
00482 char* submessage_start = current->rd_ptr();
00483
00484 CORBA::Octet msgId, flags;
00485 if (!(ser >> ACE_InputCDR::to_octet(msgId)) ||
00486 !(ser >> ACE_InputCDR::to_octet(flags))) {
00487 ok = false;
00488 break;
00489 }
00490
00491 ser.swap_bytes(ACE_CDR_BYTE_ORDER != (flags & RTPS::FLAG_E));
00492 CORBA::UShort octetsToNextHeader;
00493 if (!(ser >> octetsToNextHeader)) {
00494 ok = false;
00495 break;
00496 }
00497
00498 const size_t remaining = in->total_length();
00499 int read = 0;
00500 CORBA::ULong u2 = 0;
00501
00502 switch (msgId) {
00503 case RTPS::INFO_DST: {
00504 GuidPrefix_t_forany guidPrefix(receiver.guidPrefix);
00505 if (!(ser >> guidPrefix)) {
00506 ok = false;
00507 break;
00508 }
00509 read += RTPS::INFO_DST_SZ;
00510 break;
00511 }
00512 case RTPS::DATA:
00513 case RTPS::DATA_FRAG:
00514 if (!(ser >> u2)) {
00515 ok = false;
00516 break;
00517 }
00518
00519 case RTPS::HEARTBEAT:
00520 case RTPS::GAP:
00521 case RTPS::HEARTBEAT_FRAG: {
00522 if (!(ser >> receiver.entityId)) {
00523 ok = false;
00524 break;
00525 }
00526 if (!(ser >> sender.entityId)) {
00527 ok = false;
00528 break;
00529 }
00530 DatawriterCryptoHandle sender_dwch = link_->writer_crypto_handle(sender);
00531 if (sender_dwch == DDS::HANDLE_NIL) {
00532 ok = false;
00533 break;
00534 }
00535
00536 DDS::OctetSeq plain(toSeq(ser, msgId, flags, octetsToNextHeader, u2,
00537 receiver.entityId, sender.entityId, remaining));
00538 read = octetsToNextHeader;
00539 if (!encode_writer_submessage(receiver, replacements, crypto, plain,
00540 sender_dwch, submessage_start, msgId)) {
00541 ok = false;
00542 }
00543 break;
00544 }
00545 case RTPS::ACKNACK:
00546 case RTPS::NACK_FRAG: {
00547 if (!(ser >> sender.entityId)) {
00548 ok = false;
00549 break;
00550 }
00551 if (!(ser >> receiver.entityId)) {
00552 ok = false;
00553 break;
00554 }
00555 DatareaderCryptoHandle sender_drch = link_->reader_crypto_handle(sender);
00556 if (sender_drch == DDS::HANDLE_NIL) {
00557 ok = false;
00558 break;
00559 }
00560
00561 DDS::OctetSeq plain(toSeq(ser, msgId, flags, octetsToNextHeader, 0,
00562 sender.entityId, receiver.entityId, remaining));
00563 read = octetsToNextHeader;
00564 if (!encode_reader_submessage(receiver, replacements, crypto, plain,
00565 sender_drch, submessage_start, msgId)) {
00566 ok = false;
00567 }
00568 break;
00569 }
00570 default:
00571 break;
00572 }
00573
00574 if (!ok || (octetsToNextHeader == 0 && msgId != RTPS::PAD
00575 && msgId != RTPS::INFO_TS)) {
00576 break;
00577 }
00578
00579 if (octetsToNextHeader > read) {
00580 if (!ser.skip(octetsToNextHeader - read)) {
00581 ok = false;
00582 }
00583 }
00584 }
00585
00586 if (!ok || replacements.empty()) {
00587 return 0;
00588 }
00589
00590
00591
00592 return replace_chunks(plain, replacements);
00593 }
00594
00595 ACE_Message_Block*
00596 RtpsUdpSendStrategy::replace_chunks(const ACE_Message_Block* plain,
00597 const OPENDDS_VECTOR(Chunk)& replacements)
00598 {
00599 unsigned int out_size = plain->total_length();
00600 for (size_t i = 0; i < replacements.size(); ++i) {
00601 out_size += replacements[i].encoded_.length();
00602 out_size -= replacements[i].length_;
00603 }
00604
00605 Message_Block_Ptr in(plain->duplicate());
00606 ACE_Message_Block* cur = in.get();
00607 Message_Block_Ptr out(new ACE_Message_Block(out_size));
00608 for (size_t i = 0; i < replacements.size(); ++i) {
00609 const Chunk& c = replacements[i];
00610 for (; cur && (c.start_ < cur->rd_ptr() || c.start_ >= cur->wr_ptr());
00611 cur = cur->cont()) {
00612 out->copy(cur->rd_ptr(), cur->length());
00613 }
00614 if (!cur) {
00615 return 0;
00616 }
00617
00618 const size_t prefix = c.start_ - cur->rd_ptr();
00619 out->copy(cur->rd_ptr(), prefix);
00620 cur->rd_ptr(prefix);
00621
00622 out->copy(reinterpret_cast<const char*>(c.encoded_.get_buffer()),
00623 c.encoded_.length());
00624 for (size_t n = c.length_; n; cur = cur->cont()) {
00625 if (cur->length() > n) {
00626 cur->rd_ptr(n);
00627 break;
00628 } else {
00629 n -= cur->length();
00630 }
00631 }
00632 }
00633
00634 for (; cur; cur = cur->cont()) {
00635 out->copy(cur->rd_ptr(), cur->length());
00636 }
00637
00638 return out.release();
00639 }
00640 #endif
00641
00642 void
00643 RtpsUdpSendStrategy::stop_i()
00644 {
00645 }
00646
00647 }
00648 }
00649
00650 OPENDDS_END_VERSIONED_NAMESPACE_DECL