00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpReceiveStrategy.h"
00009 #include "RtpsUdpDataLink.h"
00010 #include "RtpsUdpInst.h"
00011 #include "RtpsUdpTransport.h"
00012
00013 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00014 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00015 #include "dds/DCPS/RTPS/MessageTypes.h"
00016
00017 #include "ace/Reactor.h"
00018
00019
00020 namespace OpenDDS {
00021 namespace DCPS {
00022
00023 RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy(RtpsUdpDataLink* link)
00024 : link_(link)
00025 , last_received_()
00026 , recvd_sample_(0)
00027 , receiver_(link->local_prefix())
00028 {
00029 }
00030
00031 int
00032 RtpsUdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00033 {
00034 return handle_dds_input(fd);
00035 }
00036
00037 ssize_t
00038 RtpsUdpReceiveStrategy::receive_bytes(iovec iov[],
00039 int n,
00040 ACE_INET_Addr& remote_address,
00041 ACE_HANDLE fd)
00042 {
00043 const ACE_SOCK_Dgram& socket =
00044 (fd == link_->unicast_socket().get_handle())
00045 ? link_->unicast_socket() : link_->multicast_socket();
00046 #ifdef ACE_LACKS_SENDMSG
00047 char buffer[0x10000];
00048 ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
00049 char* iter = buffer;
00050 for (int i = 0; scatter > 0 && i < n; ++i) {
00051 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len),
00052 static_cast<size_t>(scatter));
00053 std::memcpy(iov[i].iov_base, iter, chunk);
00054 scatter -= chunk;
00055 iter += chunk;
00056 }
00057 const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
00058 #else
00059 const ssize_t ret = socket.recv(iov, n, remote_address);
00060 #endif
00061 remote_address_ = remote_address;
00062 return ret;
00063 }
00064
00065 void
00066 RtpsUdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00067 const ACE_INET_Addr& )
00068 {
00069 using namespace RTPS;
00070
00071 if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
00072 sizeof(GuidPrefix_t))) {
00073
00074 return;
00075 }
00076
00077 const RtpsSampleHeader& rsh = received_sample_header();
00078 const SubmessageKind kind = rsh.submessage_._d();
00079
00080 switch (kind) {
00081 case INFO_SRC:
00082 case INFO_REPLY_IP4:
00083 case INFO_DST:
00084 case INFO_REPLY:
00085 case INFO_TS:
00086
00087
00088 break;
00089
00090 case DATA: {
00091 receiver_.fill_header(sample.header_);
00092 const DataSubmessage& data = rsh.submessage_.data_sm();
00093 recvd_sample_ = &sample;
00094 readers_selected_.clear();
00095 readers_withheld_.clear();
00096
00097
00098
00099 link_->received(data, receiver_.source_guid_prefix_);
00100 recvd_sample_ = 0;
00101
00102 if (data.readerId != ENTITYID_UNKNOWN) {
00103 RepoId reader;
00104 std::memcpy(reader.guidPrefix, link_->local_prefix(),
00105 sizeof(GuidPrefix_t));
00106 reader.entityId = data.readerId;
00107 if (!readers_withheld_.count(reader)) {
00108 if (Transport_debug_level > 5) {
00109 GuidConverter reader_conv(reader);
00110 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this,
00111 sample.header_.sequence_.getValue(),
00112 OPENDDS_STRING(reader_conv).c_str()));
00113 }
00114 link_->data_received(sample, reader);
00115 }
00116
00117 } else {
00118 if (Transport_debug_level > 5) {
00119 OPENDDS_STRING included_ids;
00120 bool first = true;
00121 RepoIdSet::iterator iter = readers_selected_.begin();
00122 while(iter != readers_selected_.end()) {
00123 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00124 first = false;
00125 ++iter;
00126 }
00127 OPENDDS_STRING excluded_ids;
00128 first = true;
00129 RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
00130 while(iter2 != readers_withheld_.end()) {
00131 excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2));
00132 first = false;
00133 ++iter2;
00134 }
00135 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str()));
00136 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str()));
00137 }
00138
00139 if (readers_withheld_.empty() && readers_selected_.empty()) {
00140 if (Transport_debug_level > 5) {
00141 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this,
00142 sample.header_.sequence_.getValue()));
00143 }
00144 link_->data_received(sample);
00145 } else {
00146 if (Transport_debug_level > 5) {
00147 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this,
00148 sample.header_.sequence_.getValue()));
00149 }
00150 link_->data_received_include(sample, readers_selected_);
00151 }
00152 }
00153 break;
00154 }
00155 case GAP:
00156 link_->received(rsh.submessage_.gap_sm(), receiver_.source_guid_prefix_);
00157 break;
00158
00159 case HEARTBEAT:
00160 link_->received(rsh.submessage_.heartbeat_sm(),
00161 receiver_.source_guid_prefix_);
00162 if (rsh.submessage_.heartbeat_sm().smHeader.flags & 4 ) {
00163
00164 sample.header_.message_id_ = DATAWRITER_LIVELINESS;
00165 receiver_.fill_header(sample.header_);
00166 sample.header_.publication_id_.entityId = rsh.submessage_.heartbeat_sm().writerId;
00167 link_->data_received(sample);
00168 }
00169 break;
00170
00171 case ACKNACK:
00172 link_->received(rsh.submessage_.acknack_sm(),
00173 receiver_.source_guid_prefix_);
00174 break;
00175
00176 case HEARTBEAT_FRAG:
00177 link_->received(rsh.submessage_.hb_frag_sm(),
00178 receiver_.source_guid_prefix_);
00179 break;
00180
00181 case NACK_FRAG:
00182 link_->received(rsh.submessage_.nack_frag_sm(),
00183 receiver_.source_guid_prefix_);
00184 break;
00185
00186
00187
00188
00189 default:
00190 break;
00191 }
00192 }
00193
00194 int
00195 RtpsUdpReceiveStrategy::start_i()
00196 {
00197 ACE_Reactor* reactor = link_->get_reactor();
00198 if (reactor == 0) {
00199 ACE_ERROR_RETURN((LM_ERROR,
00200 ACE_TEXT("(%P|%t) ERROR: ")
00201 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00202 ACE_TEXT("NULL reactor reference!\n")),
00203 -1);
00204 }
00205
00206 #ifdef ACE_WIN32
00207
00208
00209
00210
00211
00212 BOOL recv_udp_connreset = FALSE;
00213 link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00214 #endif
00215
00216 if (reactor->register_handler(link_->unicast_socket().get_handle(), this,
00217 ACE_Event_Handler::READ_MASK) != 0) {
00218 ACE_ERROR_RETURN((LM_ERROR,
00219 ACE_TEXT("(%P|%t) ERROR: ")
00220 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00221 ACE_TEXT("failed to register handler for unicast ")
00222 ACE_TEXT("socket %d\n"),
00223 link_->unicast_socket().get_handle()),
00224 -1);
00225 }
00226
00227 if (link_->config()->use_multicast_) {
00228 if (reactor->register_handler(link_->multicast_socket().get_handle(), this,
00229 ACE_Event_Handler::READ_MASK) != 0) {
00230 ACE_ERROR_RETURN((LM_ERROR,
00231 ACE_TEXT("(%P|%t) ERROR: ")
00232 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00233 ACE_TEXT("failed to register handler for multicast\n")),
00234 -1);
00235 }
00236 }
00237
00238 return 0;
00239 }
00240
00241 void
00242 RtpsUdpReceiveStrategy::stop_i()
00243 {
00244 ACE_Reactor* reactor = link_->get_reactor();
00245 if (reactor == 0) {
00246 ACE_ERROR((LM_ERROR,
00247 ACE_TEXT("(%P|%t) ERROR: ")
00248 ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ")
00249 ACE_TEXT("NULL reactor reference!\n")));
00250 return;
00251 }
00252
00253 reactor->remove_handler(link_->unicast_socket().get_handle(),
00254 ACE_Event_Handler::READ_MASK);
00255
00256 if (link_->config()->use_multicast_) {
00257 reactor->remove_handler(link_->multicast_socket().get_handle(),
00258 ACE_Event_Handler::READ_MASK);
00259 }
00260 }
00261
00262 bool
00263 RtpsUdpReceiveStrategy::check_header(const RtpsTransportHeader& header)
00264 {
00265 receiver_.reset(remote_address_, header.header_);
00266 return header.valid();
00267 }
00268
00269 bool
00270 RtpsUdpReceiveStrategy::check_header(const RtpsSampleHeader& header)
00271 {
00272 receiver_.submsg(header.submessage_);
00273
00274
00275 if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
00276 const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
00277 frags_.first = rtps.fragmentStartingNum.value;
00278 frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
00279 }
00280
00281 return header.valid();
00282 }
00283
00284 const ReceivedDataSample*
00285 RtpsUdpReceiveStrategy::withhold_data_from(const RepoId& sub_id)
00286 {
00287 readers_withheld_.insert(sub_id);
00288 return recvd_sample_;
00289 }
00290
00291 void
00292 RtpsUdpReceiveStrategy::do_not_withhold_data_from(const RepoId& sub_id)
00293 {
00294 readers_selected_.insert(sub_id);
00295 }
00296
00297 bool
00298 RtpsUdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00299 {
00300 using namespace RTPS;
00301 receiver_.fill_header(data.header_);
00302 if (reassembly_.reassemble(frags_, data)) {
00303
00304
00305
00306
00307
00308
00309
00310 data.header_.byte_order_ = data.sample_->rd_ptr()[1] & 1 ;
00311
00312 RtpsSampleHeader& rsh = received_sample_header();
00313 const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
00314
00315 const CORBA::Octet data_flags = (data.header_.byte_order_ ? 1 : 0)
00316 | (data.header_.key_fields_only_ ? 8 : 4);
00317 const DataSubmessage dsm = {
00318 {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
00319 dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
00320 rsh.submessage_.data_sm(dsm);
00321 return true;
00322 }
00323 return false;
00324 }
00325
00326 bool
00327 RtpsUdpReceiveStrategy::remove_frags_from_bitmap(CORBA::Long bitmap[],
00328 CORBA::ULong num_bits,
00329 const SequenceNumber& base,
00330 const RepoId& pub_id)
00331 {
00332 bool modified = false;
00333 for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
00334 if (bit == 32) bit = 0;
00335
00336 if (bit == 0) {
00337 x = static_cast<CORBA::ULong>(bitmap[i / 32]);
00338 if (x == 0) {
00339
00340 i += 31;
00341 bit = 31;
00342
00343
00344
00345 continue;
00346 }
00347 }
00348
00349 const CORBA::ULong mask = 1 << (31 - bit);
00350 if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) {
00351 x &= ~mask;
00352 bitmap[i / 32] = x;
00353 modified = true;
00354 }
00355 }
00356 return modified;
00357 }
00358
00359 void
00360 RtpsUdpReceiveStrategy::remove_fragments(const SequenceRange& range,
00361 const RepoId& pub_id)
00362 {
00363 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00364 reassembly_.data_unavailable(sn, pub_id);
00365 }
00366 }
00367
00368 bool
00369 RtpsUdpReceiveStrategy::has_fragments(const SequenceRange& range,
00370 const RepoId& pub_id,
00371 FragmentInfo* frag_info)
00372 {
00373 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00374 if (reassembly_.has_frags(sn, pub_id)) {
00375 if (frag_info) {
00376 std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
00377 p.first = sn;
00378 frag_info->push_back(p);
00379 RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
00380 missing_frags.bitmap.length(8);
00381 missing_frags.bitmapBase.value =
00382 reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
00383 8, missing_frags.numBits);
00384
00385 missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
00386 } else {
00387 return true;
00388 }
00389 }
00390 }
00391 return frag_info ? !frag_info->empty() : false;
00392 }
00393
00394
00395
00396
00397 RtpsUdpReceiveStrategy::MessageReceiver::MessageReceiver(const GuidPrefix_t& local)
00398 : have_timestamp_(false)
00399 {
00400 RTPS::assign(local_, local);
00401 source_version_.major = source_version_.minor = 0;
00402 source_vendor_.vendorId[0] = source_vendor_.vendorId[1] = 0;
00403 for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00404 source_guid_prefix_[i] = 0;
00405 dest_guid_prefix_[i] = 0;
00406 }
00407 timestamp_.seconds = 0;
00408 timestamp_.fraction = 0;
00409 }
00410
00411 void
00412 RtpsUdpReceiveStrategy::MessageReceiver::reset(const ACE_INET_Addr& addr,
00413 const RTPS::Header& hdr)
00414 {
00415 using namespace RTPS;
00416
00417 source_version_ = hdr.version;
00418 source_vendor_ = hdr.vendorId;
00419
00420 assign(source_guid_prefix_, hdr.guidPrefix);
00421 assign(dest_guid_prefix_, local_);
00422
00423 unicast_reply_locator_list_.length(1);
00424 unicast_reply_locator_list_[0].kind = address_to_kind(addr);
00425 unicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00426 RTPS::address_to_bytes(unicast_reply_locator_list_[0].address, addr);
00427
00428 multicast_reply_locator_list_.length(1);
00429 multicast_reply_locator_list_[0].kind = address_to_kind(addr);
00430 multicast_reply_locator_list_[0].port = LOCATOR_PORT_INVALID;
00431 assign(multicast_reply_locator_list_[0].address, LOCATOR_ADDRESS_INVALID);
00432
00433 have_timestamp_ = false;
00434 timestamp_ = TIME_INVALID;
00435 }
00436
00437 void
00438 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::Submessage& s)
00439 {
00440 using namespace RTPS;
00441
00442 switch (s._d()) {
00443 case INFO_TS:
00444 submsg(s.info_ts_sm());
00445 break;
00446
00447 case INFO_SRC:
00448 submsg(s.info_src_sm());
00449 break;
00450
00451 case INFO_REPLY_IP4:
00452 submsg(s.info_reply_ipv4_sm());
00453 break;
00454
00455 case INFO_DST:
00456 submsg(s.info_dst_sm());
00457 break;
00458
00459 case INFO_REPLY:
00460 submsg(s.info_reply_sm());
00461 break;
00462
00463 default:
00464 break;
00465 }
00466 }
00467
00468 void
00469 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00470 const RTPS::InfoDestinationSubmessage& id)
00471 {
00472
00473 for (size_t i = 0; i < sizeof(GuidPrefix_t); ++i) {
00474 if (id.guidPrefix[i]) {
00475 RTPS::assign(dest_guid_prefix_, id.guidPrefix);
00476 return;
00477 }
00478 }
00479 RTPS::assign(dest_guid_prefix_, local_);
00480 }
00481
00482 void
00483 RtpsUdpReceiveStrategy::MessageReceiver::submsg(const RTPS::InfoReplySubmessage& ir)
00484 {
00485
00486 unicast_reply_locator_list_.length(ir.unicastLocatorList.length());
00487 for (CORBA::ULong i = 0; i < ir.unicastLocatorList.length(); ++i) {
00488 unicast_reply_locator_list_[i] = ir.unicastLocatorList[i];
00489 }
00490
00491 if (ir.smHeader.flags & 2 ) {
00492 multicast_reply_locator_list_.length(ir.multicastLocatorList.length());
00493 for (CORBA::ULong i = 0; i < ir.multicastLocatorList.length(); ++i) {
00494 multicast_reply_locator_list_[i] = ir.multicastLocatorList[i];
00495 }
00496
00497 } else {
00498 multicast_reply_locator_list_.length(0);
00499 }
00500 }
00501
00502 void
00503 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00504 const RTPS::InfoReplyIp4Submessage& iri4)
00505 {
00506
00507 unicast_reply_locator_list_.length(1);
00508 unicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00509 unicast_reply_locator_list_[0].port = iri4.unicastLocator.port;
00510 RTPS::assign(unicast_reply_locator_list_[0].address, iri4.unicastLocator.address);
00511
00512 if (iri4.smHeader.flags & 2 ) {
00513 multicast_reply_locator_list_.length(1);
00514 multicast_reply_locator_list_[0].kind = RTPS::LOCATOR_KIND_UDPv4;
00515 multicast_reply_locator_list_[0].port = iri4.multicastLocator.port;
00516 RTPS::assign(multicast_reply_locator_list_[0].address, iri4.multicastLocator.address);
00517 } else {
00518 multicast_reply_locator_list_.length(0);
00519 }
00520 }
00521
00522 void
00523 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00524 const RTPS::InfoTimestampSubmessage& it)
00525 {
00526
00527 if (!(it.smHeader.flags & 2 )) {
00528 have_timestamp_ = true;
00529 timestamp_ = it.timestamp;
00530 } else {
00531 have_timestamp_ = false;
00532 }
00533 }
00534
00535 void
00536 RtpsUdpReceiveStrategy::MessageReceiver::submsg(
00537 const RTPS::InfoSourceSubmessage& is)
00538 {
00539
00540 RTPS::assign(source_guid_prefix_, is.guidPrefix);
00541 source_version_ = is.version;
00542 source_vendor_ = is.vendorId;
00543 unicast_reply_locator_list_.length(1);
00544 unicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00545 multicast_reply_locator_list_.length(1);
00546 multicast_reply_locator_list_[0] = RTPS::LOCATOR_INVALID;
00547 have_timestamp_ = false;
00548 }
00549
00550 void
00551 RtpsUdpReceiveStrategy::MessageReceiver::fill_header(
00552 DataSampleHeader& header) const
00553 {
00554 using namespace RTPS;
00555 if (have_timestamp_) {
00556 header.source_timestamp_sec_ = timestamp_.seconds;
00557 header.source_timestamp_nanosec_ =
00558 static_cast<ACE_UINT32>(timestamp_.fraction / NANOS_TO_RTPS_FRACS + .5);
00559 }
00560 assign(header.publication_id_.guidPrefix, source_guid_prefix_);
00561 }
00562
00563 }
00564 }