RtpsUdpReceiveStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpReceiveStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011 #include "RtpsUdpTransport.h"
00012 
00013 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00014 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00015 #include "dds/DCPS/RTPS/MessageTypes.h"
00016 
00017 #include "ace/Reactor.h"
00018 
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy(RtpsUdpDataLink* link, const GuidPrefix_t& local_prefix)
00025   : link_(link)
00026   , last_received_()
00027   , recvd_sample_(0)
00028   , receiver_(local_prefix)
00029 #if defined(OPENDDS_SECURITY)
00030   , secure_sample_(0)
00031 #endif
00032 {
00033 #if defined(OPENDDS_SECURITY)
00034   secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00035 #endif
00036 }
00037 
00038 int
00039 RtpsUdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00040 {
00041   return handle_dds_input(fd);
00042 }
00043 
00044 ssize_t
00045 RtpsUdpReceiveStrategy::receive_bytes(iovec iov[],
00046                                       int n,
00047                                       ACE_INET_Addr& remote_address,
00048                                       ACE_HANDLE fd)
00049 {
00050   const ACE_SOCK_Dgram& socket =
00051     (fd == link_->unicast_socket().get_handle())
00052     ? link_->unicast_socket() : link_->multicast_socket();
00053 #ifdef ACE_LACKS_SENDMSG
00054   char buffer[0x10000];
00055   ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
00056   char* iter = buffer;
00057   for (int i = 0; scatter > 0 && i < n; ++i) {
00058     const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS
00059                                   static_cast<size_t>(scatter));
00060     std::memcpy(iov[i].iov_base, iter, chunk);
00061     scatter -= chunk;
00062     iter += chunk;
00063   }
00064   const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
00065 #else
00066   const ssize_t ret = socket.recv(iov, n, remote_address);
00067 #endif
00068   remote_address_ = remote_address;
00069   return ret;
00070 }
00071 
00072 void
00073 RtpsUdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00074                                        const ACE_INET_Addr& /*remote_address*/)
00075 {
00076   using namespace RTPS;
00077 
00078   if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
00079                   sizeof(GuidPrefix_t))) {
00080     // Not our message, we may be on multicast listening to all the others.
00081     return;
00082   }
00083 
00084   const RtpsSampleHeader& rsh = received_sample_header();
00085 
00086 #if defined(OPENDDS_SECURITY)
00087   const SubmessageKind kind = rsh.submessage_._d();
00088 
00089   if ((secure_prefix_.smHeader.submessageId == SRTPS_PREFIX
00090        && kind != SRTPS_POSTFIX) ||
00091       (secure_prefix_.smHeader.submessageId == SEC_PREFIX
00092        && kind != SEC_POSTFIX)) {
00093     // secure envelope in progress, defer processing
00094     secure_submessages_.push_back(rsh.submessage_);
00095     if (kind == DATA) {
00096       // SRTPS: once full-message protection is supported, this technique will
00097       // need to be extended to support > 1 data payload (auth. only)
00098       secure_sample_ = sample;
00099     }
00100     return;
00101   }
00102 #endif
00103 
00104   deliver_sample_i(sample, rsh.submessage_);
00105 }
00106 
00107 void
00108 RtpsUdpReceiveStrategy::deliver_sample_i(ReceivedDataSample& sample,
00109                                          const RTPS::Submessage& submessage)
00110 {
00111   using namespace RTPS;
00112   const SubmessageKind kind = submessage._d();
00113 
00114   switch (kind) {
00115   case INFO_SRC:
00116   case INFO_REPLY_IP4:
00117   case INFO_DST:
00118   case INFO_REPLY:
00119   case INFO_TS:
00120     // No-op: the INFO_* submessages only modify the state of the
00121     // MessageReceiver (see check_header()), they are not passed up to DCPS.
00122     break;
00123 
00124   case DATA: {
00125     receiver_.fill_header(sample.header_);
00126     const DataSubmessage& data = submessage.data_sm();
00127     recvd_sample_ = &sample;
00128     readers_selected_.clear();
00129     readers_withheld_.clear();
00130     // If this sample should be withheld from some readers in order to maintain
00131     // in-order delivery, link_->received() will add it to readers_withheld_ otherwise
00132     // it will be added to readers_selected_
00133     link_->received(data, receiver_.source_guid_prefix_);
00134     recvd_sample_ = 0;
00135 
00136     if (data.readerId != ENTITYID_UNKNOWN) {
00137       RepoId reader;
00138       std::memcpy(reader.guidPrefix, link_->local_prefix(),
00139                   sizeof(GuidPrefix_t));
00140       reader.entityId = data.readerId;
00141       if (!readers_withheld_.count(reader)) {
00142         if (Transport_debug_level > 5) {
00143           GuidConverter reader_conv(reader);
00144           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this,
00145                                sample.header_.sequence_.getValue(),
00146                                OPENDDS_STRING(reader_conv).c_str()));
00147         }
00148 #if defined(OPENDDS_SECURITY)
00149         if (decode_payload(sample, data)) {
00150           link_->data_received(sample, reader);
00151         }
00152 #else
00153         link_->data_received(sample, reader);
00154 #endif
00155 
00156       }
00157 
00158     } else {
00159       if (Transport_debug_level > 5) {
00160         OPENDDS_STRING included_ids;
00161         bool first = true;
00162         RepoIdSet::iterator iter = readers_selected_.begin();
00163         while(iter != readers_selected_.end()) {
00164           included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00165           first = false;
00166           ++iter;
00167         }
00168         OPENDDS_STRING excluded_ids;
00169         first = true;
00170         RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
00171         while(iter2 != readers_withheld_.end()) {
00172             excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2));
00173           first = false;
00174           ++iter2;
00175         }
00176         ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str()));
00177         ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str()));
00178       }
00179 
00180       if (readers_withheld_.empty() && readers_selected_.empty()) {
00181         if (Transport_debug_level > 5) {
00182           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this,
00183                                sample.header_.sequence_.getValue()));
00184         }
00185 
00186 #if defined(OPENDDS_SECURITY)
00187         if (decode_payload(sample, data)) {
00188           link_->data_received(sample);
00189         }
00190 #else
00191         link_->data_received(sample);
00192 #endif
00193 
00194       } else {
00195         if (Transport_debug_level > 5) {
00196           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this,
00197                                sample.header_.sequence_.getValue()));
00198         }
00199 
00200 #if defined(OPENDDS_SECURITY)
00201         if (decode_payload(sample, data)) {
00202           link_->data_received_include(sample, readers_selected_);
00203         }
00204 #else
00205         link_->data_received_include(sample, readers_selected_);
00206 #endif
00207 
00208       }
00209     }
00210     break;
00211   }
00212   case GAP:
00213     link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_);
00214     break;
00215 
00216   case HEARTBEAT:
00217     link_->received(submessage.heartbeat_sm(),
00218                     receiver_.source_guid_prefix_);
00219     if (submessage.heartbeat_sm().smHeader.flags & FLAG_L) {
00220       // Liveliness has been asserted.  Create a DATAWRITER_LIVELINESS message.
00221       sample.header_.message_id_ = DATAWRITER_LIVELINESS;
00222       receiver_.fill_header(sample.header_);
00223       sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId;
00224       link_->data_received(sample);
00225     }
00226     break;
00227 
00228   case ACKNACK:
00229     link_->received(submessage.acknack_sm(),
00230                     receiver_.source_guid_prefix_);
00231     break;
00232 
00233   case HEARTBEAT_FRAG:
00234     link_->received(submessage.hb_frag_sm(),
00235                     receiver_.source_guid_prefix_);
00236     break;
00237 
00238   case NACK_FRAG:
00239     link_->received(submessage.nack_frag_sm(),
00240                     receiver_.source_guid_prefix_);
00241     break;
00242 
00243   /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble()
00244      has successfully reassembled the fragments and we now have a DATA submsg
00245    */
00246 
00247 #if defined(OPENDDS_SECURITY)
00248   case SRTPS_PREFIX:
00249   case SEC_PREFIX:
00250     secure_prefix_ = submessage.security_sm();
00251     break;
00252 
00253   case SRTPS_POSTFIX:
00254     secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00255     secure_sample_ = ReceivedDataSample(0);
00256     ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy SRTPS unsupported.\n"));
00257     break;
00258 
00259   case SEC_POSTFIX:
00260     deliver_from_secure(submessage);
00261     break;
00262 #endif
00263 
00264   default:
00265     break;
00266   }
00267 }
00268 
00269 #if defined(OPENDDS_SECURITY)
00270 void
00271 RtpsUdpReceiveStrategy::deliver_from_secure(const RTPS::Submessage& submessage)
00272 {
00273   using namespace DDS::Security;
00274   const ParticipantCryptoHandle local_pch = link_->local_crypto_handle();
00275 
00276   RepoId peer;
00277   RTPS::assign(peer.guidPrefix, receiver_.source_guid_prefix_);
00278   peer.entityId = ENTITYID_PARTICIPANT;
00279   const ParticipantCryptoHandle peer_pch = link_->peer_crypto_handle(peer);
00280 
00281   CryptoTransform_var crypto = link_->security_config()->get_crypto_transform();
00282 
00283   DDS::OctetSeq encoded_submsg, plain_submsg;
00284   sec_submsg_to_octets(encoded_submsg, submessage);
00285   secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00286   secure_sample_ = ReceivedDataSample(0);
00287 
00288   if (local_pch == DDS::HANDLE_NIL || !crypto) {
00289     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpReceiveStrategy SEC_POSTFIX "
00290                "precondition unmet %d %@\n", local_pch, crypto.in()));
00291     return;
00292   }
00293 
00294   if (peer_pch == DDS::HANDLE_NIL) {
00295     VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy SEC_POSTFIX "
00296               "no crypto handle for %C\n",
00297               OPENDDS_STRING(GuidConverter(peer)).c_str()), 2);
00298     return;
00299   }
00300 
00301   DatawriterCryptoHandle dwch = DDS::HANDLE_NIL;
00302   DatareaderCryptoHandle drch = DDS::HANDLE_NIL;
00303   SecureSubmessageCategory_t category = INFO_SUBMESSAGE;
00304   SecurityException ex = {"", 0, 0};
00305 
00306   bool ok = crypto->preprocess_secure_submsg(dwch, drch, category, encoded_submsg,
00307                                              local_pch, peer_pch, ex);
00308 
00309   if (ok && category == DATAWRITER_SUBMESSAGE) {
00310     ok = crypto->decode_datawriter_submessage(plain_submsg, encoded_submsg,
00311                                               drch, dwch, ex);
00312 
00313   } else if (ok && category == DATAREADER_SUBMESSAGE) {
00314     ok = crypto->decode_datareader_submessage(plain_submsg, encoded_submsg,
00315                                               dwch, drch, ex);
00316 
00317   } else if (ok && category == INFO_SUBMESSAGE) {
00318     return;
00319 
00320   } else {
00321     ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00322                "preprocess_secure_submsg failed RPCH %d, [%d.%d]: %C\n",
00323                peer_pch, ex.code, ex.minor_code, ex.message.in()));
00324     return;
00325   }
00326 
00327   if (!ok) {
00328     ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00329                "decode_datawriter/reader_submessage failed [%d.%d]: %C\n",
00330                ex.code, ex.minor_code, ex.message.in()));
00331     return;
00332   }
00333 
00334   ACE_Message_Block mb(plain_submsg.length());
00335   mb.copy(reinterpret_cast<const char*>(plain_submsg.get_buffer()), mb.size());
00336 
00337   if (Transport_debug_level > 5) {
00338     ACE_HEX_DUMP((LM_DEBUG, mb.rd_ptr(), mb.length(),
00339                   category == DATAWRITER_SUBMESSAGE ?
00340                   "RtpsUdpReceiveStrategy: decoded writer submessage" :
00341                   "RtpsUdpReceiveStrategy: decoded reader submessage"));
00342   }
00343 
00344   RtpsSampleHeader rsh(mb);
00345   if (check_header(rsh)) {
00346     ReceivedDataSample plain_sample(mb.duplicate());
00347     if (rsh.into_received_data_sample(plain_sample)) {
00348       deliver_sample_i(plain_sample, rsh.submessage_);
00349     }
00350   }
00351 }
00352 
00353 void
00354 RtpsUdpReceiveStrategy::sec_submsg_to_octets(DDS::OctetSeq& encoded,
00355                                              const RTPS::Submessage& postfix)
00356 {
00357   size_t size = 0, padding = 0;
00358   gen_find_size(secure_prefix_, size, padding);
00359 
00360   for (size_t i = 0; i < secure_submessages_.size(); ++i) {
00361     gen_find_size(secure_submessages_[i], size, padding);
00362     const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
00363     if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
00364       size += secure_sample_.sample_->size();
00365     }
00366     if ((size + padding) % 4) {
00367       padding += 4 - ((size + padding) % 4);
00368     }
00369   }
00370   gen_find_size(postfix, size, padding);
00371 
00372   ACE_Message_Block mb(size + padding);
00373   Serializer ser(&mb, ACE_CDR_BYTE_ORDER, Serializer::ALIGN_CDR);
00374   ser << secure_prefix_;
00375   ser.align_r(4);
00376 
00377   for (size_t i = 0; i < secure_submessages_.size(); ++i) {
00378     ser << secure_submessages_[i];
00379     const RTPS::SubmessageKind kind = secure_submessages_[i]._d();
00380     if (kind == RTPS::DATA || kind == RTPS::DATA_FRAG) {
00381       const CORBA::Octet* sample_bytes =
00382         reinterpret_cast<const CORBA::Octet*>(secure_sample_.sample_->rd_ptr());
00383       ser.write_octet_array(sample_bytes, secure_sample_.sample_->length());
00384     }
00385     ser.align_r(4);
00386   }
00387   ser << postfix;
00388 
00389   encoded.length(mb.length());
00390   std::memcpy(encoded.get_buffer(), mb.rd_ptr(), mb.length());
00391   secure_submessages_.resize(0);
00392 }
00393 
00394 bool RtpsUdpReceiveStrategy::decode_payload(ReceivedDataSample& sample,
00395                                             const RTPS::DataSubmessage& submsg)
00396 {
00397   const DDS::Security::DatawriterCryptoHandle writer_crypto_handle =
00398     link_->writer_crypto_handle(sample.header_.publication_id_);
00399   DDS::Security::CryptoTransform_var crypto =
00400     link_->security_config()->get_crypto_transform();
00401 
00402   if (writer_crypto_handle == DDS::HANDLE_NIL || !crypto) {
00403     return true;
00404   }
00405 
00406   DDS::OctetSeq encoded, plain, iQos;
00407   encoded.length(sample.sample_->total_length());
00408   ACE_Message_Block* mb(sample.sample_.get());
00409   for (CORBA::ULong i = 0; mb; mb = mb->cont()) {
00410     std::memcpy(encoded.get_buffer() + i, mb->rd_ptr(), mb->length());
00411     i += mb->length();
00412   }
00413 
00414   size_t iQosSize = 0, iQosPadding = 0;
00415   gen_find_size(submsg.inlineQos, iQosSize, iQosPadding);
00416   iQos.length(iQosSize + iQosPadding);
00417   const char* iQos_raw = reinterpret_cast<const char*>(iQos.get_buffer());
00418   ACE_Message_Block iQosMb(iQos_raw, iQos.length());
00419   Serializer ser(&iQosMb, ACE_CDR_BYTE_ORDER != (submsg.smHeader.flags & 1),
00420                  Serializer::ALIGN_CDR);
00421   ser << submsg.inlineQos;
00422 
00423   DDS::Security::SecurityException ex = {"", 0, 0};
00424   // DDS-Security: since origin authentication for payload is not yet supported
00425   // the reader's crypto handle is NIL here (could be multiple readers in this
00426   // participant)
00427   const bool ok = crypto->decode_serialized_payload(plain, encoded, iQos,
00428                                                     DDS::HANDLE_NIL,
00429                                                     writer_crypto_handle, ex);
00430   if (ok) {
00431     const unsigned int n = plain.length();
00432     if (encoded.length() == n &&
00433         0 == std::memcmp(plain.get_buffer(), encoded.get_buffer(), n)) {
00434       return true;
00435     }
00436 
00437     // The sample.sample_ message block uses the transport's data block so it
00438     // can't be modified in-place, instead replace it with a new block.
00439     sample.sample_.reset(new ACE_Message_Block(n));
00440     const char* buffer_raw = reinterpret_cast<const char*>(plain.get_buffer());
00441     sample.sample_->copy(buffer_raw, n);
00442 
00443     if (n > 1) {
00444       sample.header_.byte_order_ = RtpsSampleHeader::payload_byte_order(sample);
00445     }
00446 
00447   } else {
00448     ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpReceiveStrategy: "
00449                "decode_serialized_payload failed [%d.%d]: %C\n",
00450                ex.code, ex.minor_code, ex.message.in()));
00451   }
00452 
00453   return ok;
00454 }
00455 #endif
00456 
00457 int
00458 RtpsUdpReceiveStrategy::start_i()
00459 {
00460   ACE_Reactor* reactor = link_->get_reactor();
00461   if (reactor == 0) {
00462     ACE_ERROR_RETURN((LM_ERROR,
00463                       ACE_TEXT("(%P|%t) ERROR: ")
00464                       ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00465                       ACE_TEXT("NULL reactor reference!\n")),
00466                      -1);
00467   }
00468 
00469 #ifdef ACE_WIN32
00470   // By default Winsock will cause reads to fail with "connection reset"
00471   // when UDP sends result in ICMP "port unreachable" messages.
00472   // The transport framework is not set up for this since returning <= 0
00473   // from our receive_bytes causes the framework to close down the datalink
00474   // which in this case is used to receive from multiple peers.
00475   BOOL recv_udp_connreset = FALSE;
00476   link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00477 #endif
00478 
00479   if (reactor->register_handler(link_->unicast_socket().get_handle(), this,
00480                                 ACE_Event_Handler::READ_MASK) != 0) {
00481     ACE_ERROR_RETURN((LM_ERROR,
00482                       ACE_TEXT("(%P|%t) ERROR: ")
00483                       ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00484                       ACE_TEXT("failed to register handler for unicast ")
00485                       ACE_TEXT("socket %d\n"),
00486                       link_->unicast_socket().get_handle()),
00487                      -1);
00488   }
00489 
00490   if (link_->config().use_multicast_) {
00491     if (reactor->register_handler(link_->multicast_socket().get_handle(), this,
00492                                   ACE_Event_Handler::READ_MASK) != 0) {
00493       ACE_ERROR_RETURN((LM_ERROR,
00494                         ACE_TEXT("(%P|%t) ERROR: ")
00495                         ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00496                         ACE_TEXT("failed to register handler for multicast\n")),
00497                        -1);
00498     }
00499   }
00500 
00501   return 0;
00502 }
00503 
00504 void
00505 RtpsUdpReceiveStrategy::stop_i()
00506 {
00507   ACE_Reactor* reactor = link_->get_reactor();
00508   if (reactor == 0) {
00509     ACE_ERROR((LM_ERROR,
00510                ACE_TEXT("(%P|%t) ERROR: ")
00511                ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ")
00512                ACE_TEXT("NULL reactor reference!\n")));
00513     return;
00514   }
00515 
00516   reactor->remove_handler(link_->unicast_socket().get_handle(),
00517                           ACE_Event_Handler::READ_MASK);
00518 
00519   if (link_->config().use_multicast_) {
00520     reactor->remove_handler(link_->multicast_socket().get_handle(),
00521                             ACE_Event_Handler::READ_MASK);
00522   }
00523 }
00524 
00525 bool
00526 RtpsUdpReceiveStrategy::check_header(const RtpsTransportHeader& header)
00527 {
00528   receiver_.reset(remote_address_, header.header_);
00529 
00530 #if defined(OPENDDS_SECURITY)
00531   secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
00532 #endif
00533 
00534   return header.valid();
00535 }
00536 
00537 bool
00538 RtpsUdpReceiveStrategy::check_header(const RtpsSampleHeader& header)
00539 {
00540 
00541 #if defined(OPENDDS_SECURITY)
00542   if (secure_prefix_.smHeader.submessageId) {
00543     return header.valid();
00544   }
00545 #endif
00546 
00547   receiver_.submsg(header.submessage_);
00548 
00549   // save fragmentation details for use in reassemble()
00550   if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
00551     const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
00552     frags_.first = rtps.fragmentStartingNum.value;
00553     frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
00554   }
00555 
00556   return header.valid();
00557 }
00558 
00559 const ReceivedDataSample*
00560 RtpsUdpReceiveStrategy::withhold_data_from(const RepoId& sub_id)
00561 {
00562   readers_withheld_.insert(sub_id);
00563   return recvd_sample_;
00564 }
00565 
00566 void
00567 RtpsUdpReceiveStrategy::do_not_withhold_data_from(const RepoId& sub_id)
00568 {
00569   readers_selected_.insert(sub_id);
00570 }
00571 
00572 bool
00573 RtpsUdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00574 {
00575   using namespace RTPS;
00576   receiver_.fill_header(data.header_); // set publication_id_.guidPrefix
00577   if (reassembly_.reassemble(frags_, data)) {
00578 
00579     // Reassembly was successful, replace DataFrag with Data.  This doesn't have
00580     // to be a fully-formed DataSubmessage, just enough for this class to use
00581     // in deliver_sample() which ends up calling RtpsUdpDataLink::received().
00582     // In particular we will need the SequenceNumber, but ignore the iQoS.
00583 
00584     // Peek at the byte order from the encapsulation containing the payload.
00585     data.header_.byte_order_ = data.sample_->rd_ptr()[1] & FLAG_E;
00586 
00587     RtpsSampleHeader& rsh = received_sample_header();
00588     const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
00589 
00590     const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0)
00591       | (data.header_.key_fields_only_ ? FLAG_K_IN_DATA : FLAG_D);
00592     const DataSubmessage dsm = {
00593       {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
00594       dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
00595     rsh.submessage_.data_sm(dsm);
00596     return true;
00597   }
00598   return false;
00599 }
00600 
00601 bool
00602 RtpsUdpReceiveStrategy::remove_frags_from_bitmap(CORBA::Long bitmap[],
00603                                                  CORBA::ULong num_bits,
00604                                                  const SequenceNumber& base,
00605                                                  const RepoId& pub_id)
00606 {
00607   bool modified = false;
00608   for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
00609     if (bit == 32) bit = 0;
00610 
00611     if (bit == 0) {
00612       x = static_cast<CORBA::ULong>(bitmap[i / 32]);
00613       if (x == 0) {
00614         // skip an entire Long if it's all 0's (adds 32 due to ++i)
00615         i += 31;
00616         bit = 31;
00617         //FUTURE: this could be generalized with something like the x86 "bsr"
00618         //        instruction using compiler intrinsics, VC++ _BitScanReverse()
00619         //        and GCC __builtin_clz()
00620         continue;
00621       }
00622     }
00623 
00624     const CORBA::ULong mask = 1 << (31 - bit);
00625     if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) {
00626       x &= ~mask;
00627       bitmap[i / 32] = x;
00628       modified = true;
00629     }
00630   }
00631   return modified;
00632 }
00633 
00634 void
00635 RtpsUdpReceiveStrategy::remove_fragments(const SequenceRange& range,
00636                                          const RepoId& pub_id)
00637 {
00638   for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00639     reassembly_.data_unavailable(sn, pub_id);
00640   }
00641 }
00642 
00643 bool
00644 RtpsUdpReceiveStrategy::has_fragments(const SequenceRange& range,
00645                                       const RepoId& pub_id,
00646                                       FragmentInfo* frag_info)
00647 {
00648   for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00649     if (reassembly_.has_frags(sn, pub_id)) {
00650       if (frag_info) {
00651         std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
00652         p.first = sn;
00653         frag_info->push_back(p);
00654         RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
00655         missing_frags.bitmap.length(8); // start at max length
00656         missing_frags.bitmapBase.value =
00657           reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
00658                                8, missing_frags.numBits);
00659         // reduce length in case get_gaps() didn't need all that room
00660         missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
00661       } else {
00662         return true;
00663       }
00664     }
00665   }
00666   return frag_info ? !frag_info->empty() : false;
00667 }
00668 
00669 
00670 // MessageReceiver nested class
00671 
00672 RtpsUdpReceiveStrategy::MessageReceiver::MessageReceiver(const GuidPrefix_t& local)
00673   : have_timestamp_(false)
00674 {
00675   RTPS::assign(local_, local);
00676   source_version_.major = source_version_.minor = 0;
00677   source_vendor_.vendorId[0] = source_vendor_.vendorId[1] = 0;
00678   for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00679     source_guid_prefix_[i] = 0;
00680     dest_guid_prefix_[i] = 0;
00681   }
00682   timestamp_.seconds = 0;
00683   timestamp_.fraction = 0;
00684 }
00685 
00686 void
00687 RtpsUdpReceiveStrategy::MessageReceiver::reset(const ACE_INET_Addr& addr,
00688                                                const RTPS::Header& hdr)
00689 {
00690   using namespace RTPS;
00691   // see RTPS spec v2.1 section 8.3.4 table 8.16 and section 8.3.6.4
00692   source_version_ = hdr.version;
00693   source_vendor_ = hdr.vendorId;
00694 
00695   assign(source_guid_prefix_, hdr.guidPrefix);
00696   assign(dest_guid_prefix_, local_);
00697 
00698   unicast_reply_locator_list_.length(1);
00699   unicast_reply_locator_list_[0].kind = address_to_kind(addr);
00700   unicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00701   RTPS::address_to_bytes(unicast_reply_locator_list_[0].address, addr);
00702 
00703   multicast_reply_locator_list_.length(1);
00704   multicast_reply_locator_list_[0].kind = address_to_kind(addr);
00705   multicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00706   assign(multicast_reply_locator_list_[0].address, LOCATOR_ADDRESS_INVALID);
00707 
00708   have_timestamp_ = false;
00709   timestamp_ = TIME_INVALID;
00710 }
00711 
00712 void
00713 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::Submessage& s)
00714 {
00715   using namespace RTPS;
00716 
00717   switch (s._d()) {
00718   case INFO_TS:
00719     submsg(s.info_ts_sm());
00720     break;
00721 
00722   case INFO_SRC:
00723     submsg(s.info_src_sm());
00724     break;
00725 
00726   case INFO_REPLY_IP4:
00727     submsg(s.info_reply_ipv4_sm());
00728     break;
00729 
00730   case INFO_DST:
00731     submsg(s.info_dst_sm());
00732     break;
00733 
00734   case INFO_REPLY:
00735     submsg(s.info_reply_sm());
00736     break;
00737 
00738   default:
00739     break;
00740   }
00741 }
00742 
00743 void
00744 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00745   const RTPS::InfoDestinationSubmessage& id)
00746 {
00747   // see RTPS spec v2.1 section 8.3.7.7.4
00748   for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00749     if (id.guidPrefix[i]) { // if some byte is > 0, it's not UNKNOWN
00750       RTPS::assign(dest_guid_prefix_, id.guidPrefix);
00751       return;
00752     }
00753   }
00754   RTPS::assign(dest_guid_prefix_, local_);
00755 }
00756 
00757 void
00758 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::InfoReplySubmessage& ir)
00759 {
00760   // see RTPS spec v2.1 section 8.3.7.8.4
00761   unicast_reply_locator_list_.length(ir.unicastLocatorList.length());
00762   for (CORBA::ULong i = 0; i < ir.unicastLocatorList.length(); ++i) {
00763     unicast_reply_locator_list_[i] = ir.unicastLocatorList[i];
00764   }
00765 
00766   if (ir.smHeader.flags & 2 /* MulticastFlag */) {
00767     multicast_reply_locator_list_.length(ir.multicastLocatorList.length());
00768     for (CORBA::ULong i = 0; i < ir.multicastLocatorList.length(); ++i) {
00769       multicast_reply_locator_list_[i] = ir.multicastLocatorList[i];
00770     }
00771 
00772   } else {
00773     multicast_reply_locator_list_.length(0);
00774   }
00775 }
00776 
00777 void
00778 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00779   const RTPS::InfoReplyIp4Submessage& iri4)
00780 {
00781   // see RTPS spec v2.1 sections 8.3.7.8.4 and 9.4.5.14
00782   unicast_reply_locator_list_.length(1);
00783   unicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00784   unicast_reply_locator_list_[0].port = iri4.unicastLocator.port;
00785   RTPS::assign(unicast_reply_locator_list_[0].address, iri4.unicastLocator.address);
00786 
00787   if (iri4.smHeader.flags & 2 /* MulticastFlag */) {
00788     multicast_reply_locator_list_.length(1);
00789     multicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00790     multicast_reply_locator_list_[0].port = iri4.multicastLocator.port;
00791     RTPS::assign(multicast_reply_locator_list_[0].address, iri4.multicastLocator.address);
00792   } else {
00793     multicast_reply_locator_list_.length(0);
00794   }
00795 }
00796 
00797 void
00798 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00799   const RTPS::InfoTimestampSubmessage& it)
00800 {
00801   // see RTPS spec v2.1 section 8.3.7.9.10
00802   if (!(it.smHeader.flags & 2 /* InvalidateFlag */)) {
00803     have_timestamp_ = true;
00804     timestamp_ = it.timestamp;
00805   } else {
00806     have_timestamp_ = false;
00807   }
00808 }
00809 
00810 void
00811 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00812   const RTPS::InfoSourceSubmessage& is)
00813 {
00814   // see RTPS spec v2.1 section 8.3.7.9.4
00815   RTPS::assign(source_guid_prefix_, is.guidPrefix);
00816   source_version_ = is.version;
00817   source_vendor_ = is.vendorId;
00818   unicast_reply_locator_list_.length(1);
00819   unicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00820   multicast_reply_locator_list_.length(1);
00821   multicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00822   have_timestamp_ = false;
00823 }
00824 
00825 void
00826 RtpsUdpReceiveStrategy::MessageReceiver::fill_header(
00827   DataSampleHeader& header) const
00828 {
00829   using namespace RTPS;
00830   if (have_timestamp_) {
00831     header.source_timestamp_sec_ = timestamp_.seconds;
00832     header.source_timestamp_nanosec_ =
00833       static_cast<ACE_UINT32>(timestamp_.fraction / NANOS_TO_RTPS_FRACS + .5);
00834   }
00835   assign(header.publication_id_.guidPrefix, source_guid_prefix_);
00836 }
00837 
00838 } // namespace DCPS
00839 } // namespace OpenDDS
00840 
00841 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1