00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpReceiveStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011 #include "RtpsUdpTransport.h"
00012
00013 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00014 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00015 #include "dds/DCPS/RTPS/MessageTypes.h"
00016
00017 #include "ace/Reactor.h"
00018
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023
00024 RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy(RtpsUdpDataLink* link, const GuidPrefix_t& local_prefix)
00025 : link_(link)
00026 , last_received_()
00027 , recvd_sample_(0)
00028 , receiver_(local_prefix)
00029 #if defined(OPENDDS_SECURITY)
00030 , secure_sample_(0)
00031 #endif
00032 {
00033 #if defined(OPENDDS_SECURITY)
00034 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00035 #endif
00036 }
00037
00038 int
00039 RtpsUdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00040 {
00041 return handle_dds_input(fd);
00042 }
00043
00044 ssize_t
00045 RtpsUdpReceiveStrategy::receive_bytes(iovec iov[],
00046 int n,
00047 ACE_INET_Addr& remote_address,
00048 ACE_HANDLE fd)
00049 {
00050 const ACE_SOCK_Dgram& socket =
00051 (fd == link_->unicast_socket().get_handle())
00052 ? link_->unicast_socket() : link_->multicast_socket();
00053 #ifdef ACE_LACKS_SENDMSG
00054 char buffer[0x10000];
00055 ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
00056 char* iter = buffer;
00057 for (int i = 0; scatter > 0 && i < n; ++i) {
00058 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
00059 static_cast<size_t>(scatter));
00060 std::memcpy(iov[i].iov_base, iter, chunk);
00061 scatter -= chunk;
00062 iter += chunk;
00063 }
00064 const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
00065 #else
00066 const ssize_t ret = socket.recv(iov, n, remote_address);
00067 #endif
00068 remote_address_ = remote_address;
00069 return ret;
00070 }
00071
00072 void
00073 RtpsUdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00074 const ACE_INET_Addr& )
00075 {
00076 using namespace RTPS;
00077
00078 if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
00079 sizeof(GuidPrefix_t))) {
00080
00081 return;
00082 }
00083
00084 const RtpsSampleHeader& rsh = received_sample_header();
00085
00086 #if defined(OPENDDS_SECURITY)
00087 const SubmessageKind kind = rsh.submessage_._d();
00088
00089 if ((secure_prefix_.smHeader.submessageId == SRTPS_PREFIX
00090 && kind != SRTPS_POSTFIX) ||
00091 (secure_prefix_.smHeader.submessageId == SEC_PREFIX
00092 && kind != SEC_POSTFIX)) {
00093
00094 secure_submessages_.push_back(rsh.submessage_);
00095 if (kind == DATA) {
00096
00097
00098 secure_sample_ = sample;
00099 }
00100 return;
00101 }
00102 #endif
00103
00104 deliver_sample_i(sample, rsh.submessage_);
00105 }
00106
00107 void
00108 RtpsUdpReceiveStrategy::deliver_sample_i(ReceivedDataSample& sample,
00109 const RTPS::Submessage& submessage)
00110 {
00111 using namespace RTPS;
00112 const SubmessageKind kind = submessage._d();
00113
00114 switch (kind) {
00115 case INFO_SRC:
00116 case INFO_REPLY_IP4:
00117 case INFO_DST:
00118 case INFO_REPLY:
00119 case INFO_TS:
00120
00121
00122 break;
00123
00124 case DATA: {
00125 receiver_.fill_header(sample.header_);
00126 const DataSubmessage& data = submessage.data_sm();
00127 recvd_sample_ = &sample;
00128 readers_selected_.clear();
00129 readers_withheld_.clear();
00130
00131
00132
00133 link_->received(data, receiver_.source_guid_prefix_);
00134 recvd_sample_ = 0;
00135
00136 if (data.readerId != ENTITYID_UNKNOWN) {
00137 RepoId reader;
00138 std::memcpy(reader.guidPrefix, link_->local_prefix(),
00139 sizeof(GuidPrefix_t));
00140 reader.entityId = data.readerId;
00141 if (!readers_withheld_.count(reader)) {
00142 if (Transport_debug_level > 5) {
00143 GuidConverter reader_conv(reader);
00144 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this,
00145 sample.header_.sequence_.getValue(),
00146 OPENDDS_STRING(reader_conv).c_str()));
00147 }
00148 #if defined(OPENDDS_SECURITY)
00149 if (decode_payload(sample, data)) {
00150 link_->data_received(sample, reader);
00151 }
00152 #else
00153 link_->data_received(sample, reader);
00154 #endif
00155
00156 }
00157
00158 } else {
00159 if (Transport_debug_level > 5) {
00160 OPENDDS_STRING included_ids;
00161 bool first = true;
00162 RepoIdSet::iterator iter = readers_selected_.begin();
00163 while(iter != readers_selected_.end()) {
00164 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00165 first = false;
00166 ++iter;
00167 }
00168 OPENDDS_STRING excluded_ids;
00169 first = true;
00170 RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
00171 while(iter2 != readers_withheld_.end()) {
00172 excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2));
00173 first = false;
00174 ++iter2;
00175 }
00176 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str()));
00177 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str()));
00178 }
00179
00180 if (readers_withheld_.empty() && readers_selected_.empty()) {
00181 if (Transport_debug_level > 5) {
00182 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this,
00183 sample.header_.sequence_.getValue()));
00184 }
00185
00186 #if defined(OPENDDS_SECURITY)
00187 if (decode_payload(sample, data)) {
00188 link_->data_received(sample);
00189 }
00190 #else
00191 link_->data_received(sample);
00192 #endif
00193
00194 } else {
00195 if (Transport_debug_level > 5) {
00196 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this,
00197 sample.header_.sequence_.getValue()));
00198 }
00199
00200 #if defined(OPENDDS_SECURITY)
00201 if (decode_payload(sample, data)) {
00202 link_->data_received_include(sample, readers_selected_);
00203 }
00204 #else
00205 link_->data_received_include(sample, readers_selected_);
00206 #endif
00207
00208 }
00209 }
00210 break;
00211 }
00212 case GAP:
00213 link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_);
00214 break;
00215
00216 case HEARTBEAT:
00217 link_->received(submessage.heartbeat_sm(),
00218 receiver_.source_guid_prefix_);
00219 if (submessage.heartbeat_sm().smHeader.flags & FLAG_L) {
00220
00221 sample.header_.message_id_ = DATAWRITER_LIVELINESS;
00222 receiver_.fill_header(sample.header_);
00223 sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId;
00224 link_->data_received(sample);
00225 }
00226 break;
00227
00228 case ACKNACK:
00229 link_->received(submessage.acknack_sm(),
00230 receiver_.source_guid_prefix_);
00231 break;
00232
00233 case HEARTBEAT_FRAG:
00234 link_->received(submessage.hb_frag_sm(),
00235 receiver_.source_guid_prefix_);
00236 break;
00237
00238 case NACK_FRAG:
00239 link_->received(submessage.nack_frag_sm(),
00240 receiver_.source_guid_prefix_);
00241 break;
00242
00243
00244
00245
00246
00247 #if defined(OPENDDS_SECURITY)
00248 case SRTPS_PREFIX:
00249 case SEC_PREFIX:
00250 secure_prefix_ = submessage.security_sm();
00251 break;
00252
00253 case SRTPS_POSTFIX:
00254 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00255 secure_sample_ = ReceivedDataSample(0);
00256 ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy SRTPS unsupported.\n"));
00257 break;
00258
00259 case SEC_POSTFIX:
00260 deliver_from_secure(submessage);
00261 break;
00262 #endif
00263
00264 default:
00265 break;
00266 }
00267 }
00268
00269 #if defined(OPENDDS_SECURITY)
00270 void
00271 RtpsUdpReceiveStrategy::deliver_from_secure(const RTPS::Submessage& submessage)
00272 {
00273 using namespace DDS::Security;
00274 const ParticipantCryptoHandle local_pch = link_->local_crypto_handle();
00275
00276 RepoId peer;
00277 RTPS::assign(peer.guidPrefix, receiver_.source_guid_prefix_);
00278 peer.entityId = ENTITYID_PARTICIPANT;
00279 const ParticipantCryptoHandle peer_pch = link_->peer_crypto_handle(peer);
00280
00281 CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
00282
00283 DDS::OctetSeq encoded_submsg, plain_submsg;
00284 sec_submsg_to_octets(encoded_submsg, submessage);
00285 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00286 secure_sample_ = ReceivedDataSample(0);
00287
00288 if (local_pch == DDS::HANDLE_NIL || !crypto) {
00289 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpReceiveStrategy SEC_POSTFIX "
00290 "precondition unmet %d %@\n", local_pch, crypto.in()));
00291 return;
00292 }
00293
00294 if (peer_pch == DDS::HANDLE_NIL) {
00295 VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy SEC_POSTFIX "
00296 "no crypto handle for %C\n",
00297 OPENDDS_STRING(GuidConverter(peer)).c_str()), 2);
00298 return;
00299 }
00300
00301 DatawriterCryptoHandle dwch = DDS::HANDLE_NIL;
00302 DatareaderCryptoHandle drch = DDS::HANDLE_NIL;
00303 SecureSubmessageCategory_t category = INFO_SUBMESSAGE;
00304 SecurityException ex = {"", 0, 0};
00305
00306 bool ok = crypto->preprocess_secure_submsg(dwch, drch, category, encoded_submsg,
00307 local_pch, peer_pch, ex);
00308
00309 if (ok && category == DATAWRITER_SUBMESSAGE) {
00310 ok = crypto->decode_datawriter_submessage(plain_submsg, encoded_submsg,
00311 drch, dwch, ex);
00312
00313 } else if (ok && category == DATAREADER_SUBMESSAGE) {
00314 ok = crypto->decode_datareader_submessage(plain_submsg, encoded_submsg,
00315 dwch, drch, ex);
00316
00317 } else if (ok && category == INFO_SUBMESSAGE) {
00318 return;
00319
00320 } else {
00321 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00322 "preprocess_secure_submsg failed RPCH %d, [%d.%d]: %C\n",
00323 peer_pch, ex.code, ex.minor_code, ex.message.in()));
00324 return;
00325 }
00326
00327 if (!ok) {
00328 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00329 "decode_datawriter/reader_submessage failed [%d.%d]: %C\n",
00330 ex.code, ex.minor_code, ex.message.in()));
00331 return;
00332 }
00333
00334 ACE_Message_Block mb(plain_submsg.length());
00335 mb.copy(reinterpret_cast<const char*>(plain_submsg.get_buffer()), mb.size());
00336
00337 if (Transport_debug_level > 5) {
00338 ACE_HEX_DUMP((LM_DEBUG, mb.rd_ptr(), mb.length(),
00339 category == DATAWRITER_SUBMESSAGE ?
00340 "RtpsUdpReceiveStrategy: decoded writer submessage" :
00341 "RtpsUdpReceiveStrategy: decoded reader submessage"));
00342 }
00343
00344 RtpsSampleHeader rsh(mb);
00345 if (check_header(rsh)) {
00346 ReceivedDataSample plain_sample(mb.duplicate());
00347 if (rsh.into_received_data_sample(plain_sample)) {
00348 deliver_sample_i(plain_sample, rsh.submessage_);
00349 }
00350 }
00351 }
00352
00353 void
00354 RtpsUdpReceiveStrategy::sec_submsg_to_octets(DDS::OctetSeq& encoded,
00355 const RTPS::Submessage& postfix)
00356 {
00357 size_t size = 0, padding = 0;
00358 gen_find_size(secure_prefix_, size, padding);
00359
00360 for (size_t i = 0; i < secure_submessages_.size(); ++i) {
00361 gen_find_size(secure_submessages_[i], size, padding);
00362 const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
00363 if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
00364 size += secure_sample_.sample_->size();
00365 }
00366 if ((size + padding) % 4) {
00367 padding += 4 - ((size + padding) % 4);
00368 }
00369 }
00370 gen_find_size(postfix, size, padding);
00371
00372 ACE_Message_Block mb(size + padding);
00373 Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00374 ser << secure_prefix_;
00375 ser.align_r(4);
00376
00377 for (size_t i = 0; i < secure_submessages_.size(); ++i) {
00378 ser << secure_submessages_[i];
00379 const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
00380 if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
00381 const CORBA::Octet* sample_bytes =
00382 reinterpret_cast<const CORBA::Octet*>(secure_sample_.sample_->rd_ptr());
00383 ser.write_octet_array(sample_bytes, secure_sample_.sample_->length());
00384 }
00385 ser.align_r(4);
00386 }
00387 ser << postfix;
00388
00389 encoded.length(mb.length());
00390 std::memcpy(encoded.get_buffer(), mb.rd_ptr(), mb.length());
00391 secure_submessages_.resize(0);
00392 }
00393
00394 bool RtpsUdpReceiveStrategy::decode_payload(ReceivedDataSample& sample,
00395 const RTPS::DataSubmessage& submsg)
00396 {
00397 const DDS::Security::DatawriterCryptoHandle writer_crypto_handle =
00398 link_->writer_crypto_handle(sample.header_.publication_id_);
00399 DDS::Security::CryptoTransform_var crypto =
00400 link_->security_config()->get_crypto_transform();
00401
00402 if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto) {
00403 return true;
00404 }
00405
00406 DDS::OctetSeq encoded, plain, iQos;
00407 encoded.length(sample.sample_->total_length());
00408 ACE_Message_Block* mb(sample.sample_.get());
00409 for (CORBA::ULong i = 0; mb; mb = mb->cont()) {
00410 std::memcpy(encoded.get_buffer() + i, mb->rd_ptr(), mb->length());
00411 i += mb->length();
00412 }
00413
00414 size_t iQosSize = 0, iQosPadding = 0;
00415 gen_find_size(submsg.inlineQos, iQosSize, iQosPadding);
00416 iQos.length(iQosSize + iQosPadding);
00417 const char* iQos_raw = reinterpret_cast<const char*>(iQos.get_buffer());
00418 ACE_Message_Block iQosMb(iQos_raw, iQos.length());
00419 Serializer ser(&iQosMb, ACE_CDR_BYTE_ORDER != (submsg.smHeader.flags & 1),
00420 Serializer::ALIGN_CDR);
00421 ser << submsg.inlineQos;
00422
00423 DDS::Security::SecurityException ex = {"", 0, 0};
00424
00425
00426
00427 const bool ok = crypto->decode_serialized_payload(plain, encoded, iQos,
00428 DDS::HANDLE_NIL,
00429 writer_crypto_handle, ex);
00430 if (ok) {
00431 const unsigned int n = plain.length();
00432 if (encoded.length() == n &&
00433 0 == std::memcmp(plain.get_buffer(), encoded.get_buffer(), n)) {
00434 return true;
00435 }
00436
00437
00438
00439 sample.sample_.reset(new ACE_Message_Block(n));
00440 const char* buffer_raw = reinterpret_cast<const char*>(plain.get_buffer());
00441 sample.sample_->copy(buffer_raw, n);
00442
00443 if (n > 1) {
00444 sample.header_.byte_order_ = RtpsSampleHeader::payload_byte_order(sample);
00445 }
00446
00447 } else {
00448 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00449 "decode_serialized_payload failed [%d.%d]: %C\n",
00450 ex.code, ex.minor_code, ex.message.in()));
00451 }
00452
00453 return ok;
00454 }
00455 #endif
00456
00457 int
00458 RtpsUdpReceiveStrategy::start_i()
00459 {
00460 ACE_Reactor* reactor = link_->get_reactor();
00461 if (reactor == 0) {
00462 ACE_ERROR_RETURN((LM_ERROR,
00463 ACE_TEXT("(%P|%t) ERROR: ")
00464 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00465 ACE_TEXT("NULL reactor reference!\n")),
00466 -1);
00467 }
00468
00469 #ifdef ACE_WIN32
00470
00471
00472
00473
00474
00475 BOOL recv_udp_connreset = FALSE;
00476 link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00477 #endif
00478
00479 if (reactor->register_handler(link_->unicast_socket().get_handle(), this,
00480 ACE_Event_Handler::READ_MASK) != 0) {
00481 ACE_ERROR_RETURN((LM_ERROR,
00482 ACE_TEXT("(%P|%t) ERROR: ")
00483 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00484 ACE_TEXT("failed to register handler for unicast ")
00485 ACE_TEXT("socket %d\n"),
00486 link_->unicast_socket().get_handle()),
00487 -1);
00488 }
00489
00490 if (link_->config().use_multicast_) {
00491 if (reactor->register_handler(link_->multicast_socket().get_handle(), this,
00492 ACE_Event_Handler::READ_MASK) != 0) {
00493 ACE_ERROR_RETURN((LM_ERROR,
00494 ACE_TEXT("(%P|%t) ERROR: ")
00495 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00496 ACE_TEXT("failed to register handler for multicast\n")),
00497 -1);
00498 }
00499 }
00500
00501 return 0;
00502 }
00503
00504 void
00505 RtpsUdpReceiveStrategy::stop_i()
00506 {
00507 ACE_Reactor* reactor = link_->get_reactor();
00508 if (reactor == 0) {
00509 ACE_ERROR((LM_ERROR,
00510 ACE_TEXT("(%P|%t) ERROR: ")
00511 ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ")
00512 ACE_TEXT("NULL reactor reference!\n")));
00513 return;
00514 }
00515
00516 reactor->remove_handler(link_->unicast_socket().get_handle(),
00517 ACE_Event_Handler::READ_MASK);
00518
00519 if (link_->config().use_multicast_) {
00520 reactor->remove_handler(link_->multicast_socket().get_handle(),
00521 ACE_Event_Handler::READ_MASK);
00522 }
00523 }
00524
00525 bool
00526 RtpsUdpReceiveStrategy::check_header(const RtpsTransportHeader& header)
00527 {
00528 receiver_.reset(remote_address_, header.header_);
00529
00530 #if defined(OPENDDS_SECURITY)
00531 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00532 #endif
00533
00534 return header.valid();
00535 }
00536
00537 bool
00538 RtpsUdpReceiveStrategy::check_header(const RtpsSampleHeader& header)
00539 {
00540
00541 #if defined(OPENDDS_SECURITY)
00542 if (secure_prefix_.smHeader.submessageId) {
00543 return header.valid();
00544 }
00545 #endif
00546
00547 receiver_.submsg(header.submessage_);
00548
00549
00550 if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
00551 const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
00552 frags_.first = rtps.fragmentStartingNum.value;
00553 frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
00554 }
00555
00556 return header.valid();
00557 }
00558
00559 const ReceivedDataSample*
00560 RtpsUdpReceiveStrategy::withhold_data_from(const RepoId& sub_id)
00561 {
00562 readers_withheld_.insert(sub_id);
00563 return recvd_sample_;
00564 }
00565
00566 void
00567 RtpsUdpReceiveStrategy::do_not_withhold_data_from(const RepoId& sub_id)
00568 {
00569 readers_selected_.insert(sub_id);
00570 }
00571
00572 bool
00573 RtpsUdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00574 {
00575 using namespace RTPS;
00576 receiver_.fill_header(data.header_);
00577 if (reassembly_.reassemble(frags_, data)) {
00578
00579
00580
00581
00582
00583
00584
00585 data.header_.byte_order_ = data.sample_->rd_ptr()[1] & FLAG_E;
00586
00587 RtpsSampleHeader& rsh = received_sample_header();
00588 const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
00589
00590 const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0)
00591 | (data.header_.key_fields_only_ ? FLAG_K_IN_DATA : FLAG_D);
00592 const DataSubmessage dsm = {
00593 {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
00594 dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
00595 rsh.submessage_.data_sm(dsm);
00596 return true;
00597 }
00598 return false;
00599 }
00600
00601 bool
00602 RtpsUdpReceiveStrategy::remove_frags_from_bitmap(CORBA::Long bitmap[],
00603 CORBA::ULong num_bits,
00604 const SequenceNumber& base,
00605 const RepoId& pub_id)
00606 {
00607 bool modified = false;
00608 for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
00609 if (bit == 32) bit = 0;
00610
00611 if (bit == 0) {
00612 x = static_cast<CORBA::ULong>(bitmap[i / 32]);
00613 if (x == 0) {
00614
00615 i += 31;
00616 bit = 31;
00617
00618
00619
00620 continue;
00621 }
00622 }
00623
00624 const CORBA::ULong mask = 1 << (31 - bit);
00625 if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) {
00626 x &= ~mask;
00627 bitmap[i / 32] = x;
00628 modified = true;
00629 }
00630 }
00631 return modified;
00632 }
00633
00634 void
00635 RtpsUdpReceiveStrategy::remove_fragments(const SequenceRange& range,
00636 const RepoId& pub_id)
00637 {
00638 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00639 reassembly_.data_unavailable(sn, pub_id);
00640 }
00641 }
00642
00643 bool
00644 RtpsUdpReceiveStrategy::has_fragments(const SequenceRange& range,
00645 const RepoId& pub_id,
00646 FragmentInfo* frag_info)
00647 {
00648 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00649 if (reassembly_.has_frags(sn, pub_id)) {
00650 if (frag_info) {
00651 std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
00652 p.first = sn;
00653 frag_info->push_back(p);
00654 RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
00655 missing_frags.bitmap.length(8);
00656 missing_frags.bitmapBase.value =
00657 reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
00658 8, missing_frags.numBits);
00659
00660 missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
00661 } else {
00662 return true;
00663 }
00664 }
00665 }
00666 return frag_info ? !frag_info->empty() : false;
00667 }
00668
00669
00670
00671
00672 RtpsUdpReceiveStrategy::MessageReceiver::MessageReceiver(const GuidPrefix_t& local)
00673 : have_timestamp_(false)
00674 {
00675 RTPS::assign(local_, local);
00676 source_version_.major = source_version_.minor = 0;
00677 source_vendor_.vendorId[0] = source_vendor_.vendorId[1] = 0;
00678 for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00679 source_guid_prefix_[i] = 0;
00680 dest_guid_prefix_[i] = 0;
00681 }
00682 timestamp_.seconds = 0;
00683 timestamp_.fraction = 0;
00684 }
00685
00686 void
00687 RtpsUdpReceiveStrategy::MessageReceiver::reset(const ACE_INET_Addr& addr,
00688 const RTPS::Header& hdr)
00689 {
00690 using namespace RTPS;
00691
00692 source_version_ = hdr.version;
00693 source_vendor_ = hdr.vendorId;
00694
00695 assign(source_guid_prefix_, hdr.guidPrefix);
00696 assign(dest_guid_prefix_, local_);
00697
00698 unicast_reply_locator_list_.length(1);
00699 unicast_reply_locator_list_[0].kind = address_to_kind(addr);
00700 unicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00701 RTPS::address_to_bytes(unicast_reply_locator_list_[0].address, addr);
00702
00703 multicast_reply_locator_list_.length(1);
00704 multicast_reply_locator_list_[0].kind = address_to_kind(addr);
00705 multicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00706 assign(multicast_reply_locator_list_[0].address, LOCATOR_ADDRESS_INVALID);
00707
00708 have_timestamp_ = false;
00709 timestamp_ = TIME_INVALID;
00710 }
00711
00712 void
00713 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::Submessage& s)
00714 {
00715 using namespace RTPS;
00716
00717 switch (s._d()) {
00718 case INFO_TS:
00719 submsg(s.info_ts_sm());
00720 break;
00721
00722 case INFO_SRC:
00723 submsg(s.info_src_sm());
00724 break;
00725
00726 case INFO_REPLY_IP4:
00727 submsg(s.info_reply_ipv4_sm());
00728 break;
00729
00730 case INFO_DST:
00731 submsg(s.info_dst_sm());
00732 break;
00733
00734 case INFO_REPLY:
00735 submsg(s.info_reply_sm());
00736 break;
00737
00738 default:
00739 break;
00740 }
00741 }
00742
00743 void
00744 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00745 const RTPS::InfoDestinationSubmessage& id)
00746 {
00747
00748 for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00749 if (id.guidPrefix[i]) {
00750 RTPS::assign(dest_guid_prefix_, id.guidPrefix);
00751 return;
00752 }
00753 }
00754 RTPS::assign(dest_guid_prefix_, local_);
00755 }
00756
00757 void
00758 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::InfoReplySubmessage& ir)
00759 {
00760
00761 unicast_reply_locator_list_.length(ir.unicastLocatorList.length());
00762 for (CORBA::ULong i = 0; i < ir.unicastLocatorList.length(); ++i) {
00763 unicast_reply_locator_list_[i] = ir.unicastLocatorList[i];
00764 }
00765
00766 if (ir.smHeader.flags & 2 ) {
00767 multicast_reply_locator_list_.length(ir.multicastLocatorList.length());
00768 for (CORBA::ULong i = 0; i < ir.multicastLocatorList.length(); ++i) {
00769 multicast_reply_locator_list_[i] = ir.multicastLocatorList[i];
00770 }
00771
00772 } else {
00773 multicast_reply_locator_list_.length(0);
00774 }
00775 }
00776
00777 void
00778 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00779 const RTPS::InfoReplyIp4Submessage& iri4)
00780 {
00781
00782 unicast_reply_locator_list_.length(1);
00783 unicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00784 unicast_reply_locator_list_[0].port = iri4.unicastLocator.port;
00785 RTPS::assign(unicast_reply_locator_list_[0].address, iri4.unicastLocator.address);
00786
00787 if (iri4.smHeader.flags & 2 ) {
00788 multicast_reply_locator_list_.length(1);
00789 multicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00790 multicast_reply_locator_list_[0].port = iri4.multicastLocator.port;
00791 RTPS::assign(multicast_reply_locator_list_[0].address, iri4.multicastLocator.address);
00792 } else {
00793 multicast_reply_locator_list_.length(0);
00794 }
00795 }
00796
00797 void
00798 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00799 const RTPS::InfoTimestampSubmessage& it)
00800 {
00801
00802 if (!(it.smHeader.flags & 2 )) {
00803 have_timestamp_ = true;
00804 timestamp_ = it.timestamp;
00805 } else {
00806 have_timestamp_ = false;
00807 }
00808 }
00809
00810 void
00811 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00812 const RTPS::InfoSourceSubmessage& is)
00813 {
00814
00815 RTPS::assign(source_guid_prefix_, is.guidPrefix);
00816 source_version_ = is.version;
00817 source_vendor_ = is.vendorId;
00818 unicast_reply_locator_list_.length(1);
00819 unicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00820 multicast_reply_locator_list_.length(1);
00821 multicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00822 have_timestamp_ = false;
00823 }
00824
00825 void
00826 RtpsUdpReceiveStrategy::MessageReceiver::fill_header(
00827 DataSampleHeader& header) const
00828 {
00829 using namespace RTPS;
00830 if (have_timestamp_) {
00831 header.source_timestamp_sec_ = timestamp_.seconds;
00832 header.source_timestamp_nanosec_ =
00833 static_cast<ACE_UINT32>(timestamp_.fraction / NANOS_TO_RTPS_FRACS + .5);
00834 }
00835 assign(header.publication_id_.guidPrefix, source_guid_prefix_);
00836 }
00837
00838 }
00839 }
00840
00841 OPENDDS_END_VERSIONED_NAMESPACE_DECL