25 #ifdef OPENDDS_SECURITY 29 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h> 30 #include <dds/DdsDcpsCoreTypeSupportImpl.h> 39 #ifndef __ACE_INLINE__ 46 static const CORBA::Long ONE_QUARTER_MAX_POSITIVE = 0x20000000;
// Quarter-range thresholds over the positive CORBA::Long domain; used below
// to detect wraparound of a monotonically increasing RTPS submessage count.
47 static const CORBA::Long THREE_QUARTER_MAX_POSITIVE = 0x60000000;
// An incoming count that is <= the stored count is normally stale/duplicate,
// EXCEPT when the stored count is in the top quarter of the range while the
// incoming one is in the bottom quarter: that pattern indicates the counter
// wrapped around, so the incoming value is actually newer and is accepted.
// NOTE(review): policy inferred from these two visible lines only — the
// enclosing function body is not fully visible in this chunk.
48 if (incoming <= existing &&
49 !(incoming < ONE_QUARTER_MAX_POSITIVE && existing > THREE_QUARTER_MAX_POSITIVE)) {
78 , reactor_task_(reactor_task)
80 , event_dispatcher_(transport->event_dispatcher())
86 , multi_buff_(this, config->nak_depth_)
90 , best_effort_heartbeat_count_(0)
94 , transport_statistics_(transport_statistics)
95 , transport_statistics_mutex_(transport_statistics_mutex)
96 #ifdef OPENDDS_SECURITY
97 , security_config_(
Security::SecurityRegistry::instance()->default_config())
99 , ice_agent_(ICE::Agent::instance())
103 #ifdef OPENDDS_SECURITY 134 writer = iter->second;
140 writer->add_elem_awaiting_ack(element);
153 RtpsWriterMap::iterator iter =
writers_.find(pub_id);
155 writer = iter->second;
161 return writer->remove_sample(sample);
170 RtpsWriterMap::iterator iter =
writers_.find(pub_id);
172 writer = iter->second;
178 writer->remove_all_msgs();
192 SingleSendBuffer::BufferVec removed;
// Search the unacked-element multimap for entries keyed by 'seq' and remove
// the first one the match predicate accepts.
214 if (!elems_not_acked_.empty()) {
215 typedef SnToTqeMap::iterator iter_t;
// equal_range: all queued elements sharing this sequence number (the map is
// a multimap, so one SN may have several queued elements, e.g. fragments).
216 for (std::pair<iter_t, iter_t> er = elems_not_acked_.equal_range(seq); er.first != er.second; ++er.first) {
217 if (modp.
matches(*er.first->second)) {
// Capture the matched element for the caller, then drop it from the map.
220 tqe = er.first->second;
221 elems_not_acked_.erase(er.first);
230 send_buff_->remove_acked(to_release, removed);
236 for (
size_t i = 0; i < removed.size(); ++i) {
238 removed[i].first->accept_remove_visitor(visitor);
239 delete removed[i].first;
240 removed[i].second->release();
259 send_buff_->retain_all(
id_);
// Take ownership of the whole unacked map with a swap (O(1), leaves the
// member empty) so the elements can be released/dropped outside of it.
272 SnToTqeMap sn_tqe_map;
273 sn_tqe_map.swap(elems_not_acked_);
278 typedef SnToTqeMap::iterator iter_t;
// First pass: release each *distinct* sequence number from the send buffer;
// the 'prev' comparison skips duplicate keys of the multimap.
279 for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
280 if (it->first != prev) {
281 send_buff_->release_acked(it->first);
// Second pass: notify every queued element that its data was dropped.
288 for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
289 it->second->data_dropped(
true);
302 ipv6_unicast_socket_ = ipv6_unicast_socket;
310 if (cfg->use_multicast_) {
311 #ifdef ACE_HAS_MAC_OSX 321 if (cfg->use_multicast_) {
327 ACE_TEXT(
"failed to set TTL: %d\n"),
338 ACE_TEXT(
"failed to set TTL: %d\n"),
346 if (cfg->send_buffer_size_ > 0) {
347 const int snd_size = cfg->send_buffer_size_;
351 sizeof(snd_size)) < 0
356 ACE_TEXT(
"RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
362 if (ipv6_unicast_socket_.set_option(
SOL_SOCKET,
365 sizeof(snd_size)) < 0
370 ACE_TEXT(
"RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
378 if (cfg->rcv_buffer_size_ > 0) {
379 const int rcv_size = cfg->rcv_buffer_size_;
388 ACE_TEXT(
"RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
394 if (ipv6_unicast_socket_.set_option(
SOL_SOCKET,
402 ACE_TEXT(
"RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
418 ACE_TEXT(
"UdpDataLink::open: start failed!\n")));
431 InternalSampleInfoSequence infos;
436 if (!cfg || !cfg->use_multicast_) {
442 cfg->multicast_interface_,
445 cfg->multicast_group_address(),
448 , cfg->ipv6_multicast_group_address(),
449 ipv6_multicast_socket_
453 if (!samples.empty()) {
470 const RemoteInfoMap::const_iterator pos =
locators_.find(remote_id);
475 return valid_last_recv_addr ? pos->second.last_recv_addr_ :
NetworkAddress();
482 AddrSet& unicast_addresses,
483 AddrSet& multicast_addresses,
487 if (unicast_addresses.empty() && multicast_addresses.empty()) {
509 if (log_unicast_change) {
510 for (AddrSet::const_iterator pos = unicast_addresses.begin(), limit = unicast_addresses.end();
511 pos != limit; ++pos) {
516 if (log_multicast_change) {
517 for (AddrSet::const_iterator pos = multicast_addresses.begin(), limit = multicast_addresses.end();
518 pos != limit; ++pos) {
532 if (w->second.seq < seq) {
534 selected.insert(w->second.readers.begin(), w->second.readers.end());
536 withheld.insert(w->second.readers.begin(), w->second.readers.end());
549 RtpsReaderMap::iterator rr =
readers_.find(lsi);
560 bool local_reliable,
bool remote_reliable,
561 bool local_durable,
bool remote_durable,
566 AddrSet& unicast_addresses,
567 AddrSet& multicast_addresses,
573 update_locators(remote_id, unicast_addresses, multicast_addresses, requires_inline_qos,
true);
574 if (last_addr_hint) {
580 if (!local_reliable) {
586 }
else if (i->second.readers.find(local_id) == i->second.readers.end()) {
587 i->second.readers.insert(local_id);
597 log_progress(
"RTPS writer/reader association", local_id, remote_id, participant_discovered_at);
600 if (remote_reliable) {
603 RtpsWriterMap::iterator rw =
writers_.find(local_id);
609 hb_start = hbc_it->second;
612 RtpsWriter_rch writer = make_rch<RtpsWriter>(client, link, local_id, local_durable,
614 rw =
writers_.insert(RtpsWriterMap::value_type(local_id, writer)).first;
618 const SequenceNumber writer_max_sn = writer->update_max_sn(remote_id, max_sn);
619 writer->add_reader(make_rch<ReaderInfo>(remote_id, remote_durable, participant_discovered_at, participant_flags, writer_max_sn + 1));
624 log_progress(
"RTPS reader/writer association", local_id, remote_id, participant_discovered_at);
632 if (remote_reliable) {
634 RtpsReaderMap::iterator rr =
readers_.find(local_id);
639 rr =
readers_.insert(RtpsReaderMap::value_type(local_id, reader)).first;
645 reader->
add_writer(make_rch<WriterInfo>(remote_id, participant_discovered_at, participant_flags));
665 RemoteInfoMap::iterator pos =
locators_.find(remote_id);
669 --pos->second.ref_count_;
670 if (pos->second.ref_count_ == 0) {
674 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::disassociated: local id %C does not have any locators\n",
LogGuid(local_id).c_str()));
681 const AddrSet& addresses,
687 InterestingRemoteMapType::value_type(
694 if (enableheartbeat) {
709 if (pos->second.localid == writerid) {
720 const AddrSet& addresses,
726 InterestingRemoteMapType::value_type(
730 if (enableheartbeatchecker) {
745 if (pos->second.localid == readerid) {
759 RtpsReaderMap::iterator rr =
readers_.find(localId);
763 if (iter->second->id() == localId) {
779 RepoIdSet::iterator r = w->second.readers.find(localId);
780 if (r != w->second.readers.end()) {
781 w->second.readers.erase(r);
782 if (w->second.readers.empty()) {
796 RtpsWriterMap::iterator pos =
writers_.find(localId);
798 writer = pos->second;
805 writer->pre_stop_helper(to_drop,
true);
807 TqeVector::iterator drop_it = to_drop.begin();
808 while (drop_it != to_drop.end()) {
809 (*drop_it)->data_dropped(
true);
812 writer->remove_all_msgs();
826 typedef SnToTqeMap::iterator iter_t;
// Record whether this is a full stop; later logic (not visible here)
// presumably distinguishes a full stop from a partial one — TODO confirm.
831 stopping_ = true_stop;
// Drain every unacked element: hand the queue elements to the caller via
// 'to_drop' and remember their sequence numbers for send-buffer release.
833 if (!elems_not_acked_.empty()) {
835 iter_t iter = elems_not_acked_.begin();
836 while (iter != elems_not_acked_.end()) {
837 to_drop.push_back(iter->second);
838 sns_to_release.insert(iter->first);
// Erase-and-restart keeps the iteration valid while the map shrinks.
839 elems_not_acked_.erase(iter);
840 iter = elems_not_acked_.begin();
// Release each collected sequence number, then clear the send buffer.
843 while (sns_it != sns_to_release.end()) {
844 send_buff_->release_acked(*sns_it);
849 send_buff_->pre_clear();
856 nack_response_->cancel();
867 RtpsWriterMap writers;
871 for (RtpsWriterMap::const_iterator it = writers.begin(); it != writers.end(); ++it) {
876 RtpsWriterMap::iterator w_iter = writers.begin();
877 while (w_iter != writers.end()) {
878 w_iter->second->pre_stop_helper(to_drop,
true);
879 writers.erase(w_iter++);
882 TqeVector::iterator drop_it = to_drop.begin();
883 while (drop_it != to_drop.end()) {
884 (*drop_it)->data_dropped(
true);
888 RtpsReaderMap readers;
894 RtpsReaderMap::iterator r_iter = readers.begin();
895 while (r_iter != readers.end()) {
896 r_iter->second->pre_stop_helper();
910 RtpsWriterMap::iterator rw =
writers_.find(local_id);
915 writer->remove_reader(remote_id);
916 if (writer->reader_count() == 0) {
917 writer->pre_stop_helper(to_drop,
false);
919 writer->process_acked_by_all();
924 RtpsReaderMap::iterator rr =
readers_.find(local_id);
927 for (pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> iters =
929 iters.first != iters.second;) {
930 if (iters.first->second->id() == local_id) {
945 RepoIdSet::iterator r = w->second.readers.find(local_id);
946 if (r != w->second.readers.end()) {
947 w->second.readers.erase(r);
948 if (w->second.readers.empty()) {
958 for (TqeVector::iterator drop_it = to_drop.begin(); drop_it != to_drop.end(); ++drop_it) {
959 (*drop_it)->data_dropped(
true);
973 ipv6_unicast_socket_.close();
974 ipv6_multicast_socket_.close();
984 const RtpsWriterMap::iterator wi =
writers_.find(pub_id);
986 result = wi->second->get_send_buff();
1010 if (send_buff.
is_nil()) {
1016 "pub_id %C seq %q frag %d\n",
LogGuid(pub_id).c_str(), seq.
getValue(),
1027 " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
1031 send_buff->insert(seq, q, chain);
1039 static_cast<ACE_Message_Block*>(
1081 MetaSubmessageVec& meta_submessages,
1082 bool& deliver_after_send)
1087 if (stopping_ || !link) {
1101 max_sn_ = std::max(max_sn_, seq);
1103 if (!durable_ && !is_pvs_writer() &&
1105 previous_max_sn < max_sn_.previous()) {
1106 add_gap_submsg_i(subm, previous_max_sn + 1);
1111 check_leader_lagger();
1120 bool durable =
false;
1132 subm, *tsce, requires_inline_qos);
1135 end_historic_samples_i(tsce->
header(), msg->
cont(), meta_submessages);
1140 request_ack_i(tsce->
header(), msg->
cont(), meta_submessages);
1141 deliver_after_send =
true;
1145 deliver_after_send =
true;
1159 subm, *dsle, requires_inline_qos);
1169 subm, *dsle, requires_inline_qos);
1174 send_buff_->pre_insert(seq);
1178 #ifdef OPENDDS_SECURITY 1206 ReaderInfoMap::iterator ri = remote_readers_.find(sub);
1207 if (ri != remote_readers_.end()) {
1208 ri->second->durable_data_[rtps->
sequence()] = rtps;
1209 ri->second->durable_timestamp_.set_to_now();
1211 const LogGuid conv(pub_id), sub_conv(sub);
1213 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - " 1214 "storing durable data for local %C remote %C seq %q\n",
1215 conv.c_str(), sub_conv.
c_str(),
1223 send_buff_->pre_insert(seq);
1231 MetaSubmessageVec& meta_submessages,
1232 bool& deliver_after_send,
1254 subm, *tsce, requires_inline_qos);
1257 deliver_after_send =
true;
1271 subm, *dsle, requires_inline_qos);
1279 subm, *dsle, requires_inline_qos);
1285 #ifdef OPENDDS_SECURITY 1314 GUIDSeq_var peers =
peer_ids(pub_id);
1320 const RtpsWriterMap::iterator rw =
writers_.find(pub_id);
1321 MetaSubmessageVec meta_submessages;
1324 bool deliver_after_send =
false;
1326 writer = rw->second;
1328 result = writer->customize_queue_element_helper(element, require_iq, meta_submessages, deliver_after_send);
1336 if (deliver_after_send) {
1346 MetaSubmessageVec& meta_submessages)
1354 std::memcpy(&sub, body->
rd_ptr(),
sizeof(sub));
1356 typedef ReaderInfoMap::iterator iter_t;
1360 "local %C all readers\n",
LogGuid(
id_).c_str()));
1362 for (iter_t iter = remote_readers_.begin();
1363 iter != remote_readers_.end(); ++iter) {
1364 if (iter->second->durable_) {
1365 iter->second->durable_timestamp_ = now;
1367 log_progress(
"durable data queued",
id_, iter->first, iter->second->participant_discovered_at_);
1372 iter_t iter = remote_readers_.find(sub);
1373 if (iter != remote_readers_.end()) {
1374 if (iter->second->durable_) {
1375 iter->second->durable_timestamp_ = now;
1377 log_progress(
"durable data queued",
id_, iter->first, iter->second->participant_discovered_at_);
1381 initialize_heartbeat(proxy, meta_submessage);
1382 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1396 MetaSubmessageVec& meta_submessages)
1402 std::memcpy(&sub, body->
rd_ptr(),
sizeof(sub));
1404 typedef ReaderInfoMap::iterator iter_t;
1406 gather_heartbeats_i(meta_submessages);
1409 "local %C all readers\n",
LogGuid(
id_).c_str()));
1412 iter_t iter = remote_readers_.find(sub);
1413 if (iter != remote_readers_.end()) {
1416 initialize_heartbeat(proxy, meta_submessage);
1417 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1421 " local %C remote %C\n", conv.c_str(), sub_conv.
c_str()));
1438 const RemoteInfoMap::const_iterator iter =
locators_.find(peers[i]);
1439 if (iter !=
locators_.end() && iter->second.requires_inline_qos_) {
1464 MetaSubmessageVec meta_submessages;
1465 gather_heartbeats_i(meta_submessages);
1467 if (!preassociation_readers_.empty() || !lagging_readers_.empty()) {
1469 fallback_.advance();
1471 fallback_.set(initial_fallback_);
1487 MetaSubmessageVec meta_submessages;
1495 gather_nack_replies_i(meta_submessages);
1529 msg[idx].gap_sm(gap);
1540 if (addr == cfg->rtps_relay_address()) {
1544 bool remove_cache =
false;
1547 const RemoteInfoMap::iterator pos =
locators_.find(src);
1549 const bool expired = cfg->receive_address_duration_ < (
MonotonicTimePoint::now() - pos->second.last_recv_time_);
1550 const bool allow_update = expired ||
1551 pos->second.last_recv_addr_ == addr ||
1554 remove_cache = pos->second.last_recv_addr_ != addr;
1555 pos->second.last_recv_addr_ = addr;
1556 pos->second.last_recv_time_ = now;
1581 typedef std::pair<RtpsReaderMultiMap::iterator, RtpsReaderMultiMap::iterator> RRMM_IterRange;
1582 for (RRMM_IterRange iters =
readers_of_writer_.equal_range(src); iters.first != iters.second; ++iters.first) {
1583 to_call.push_back(iters.first->second);
1592 trs->withhold_data_from(*it);
1597 const RtpsReaderMap::iterator rr =
readers_.find(local);
1599 to_call.push_back(rr->second);
1604 trs->withhold_data_from(local);
1609 MetaSubmessageVec meta_submessages;
1611 (*it)->process_data_i(data, src, meta_submessages);
1627 preassociation_writers_.clear();
1628 log_remote_counts(
"pre_stop_helper");
1641 for (WriterInfoMap::iterator it = remote_writers_.begin(); it != remote_writers_.end(); ++it) {
1642 it->second->held_.clear();
1648 preassociation_task_->cancel();
1655 , nackfrag_count_(0)
1696 writer->frags_.erase(seq);
1698 if (writer->recvd_.empty()) {
1701 ACE_TEXT(
"(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1702 ACE_TEXT(
" data seq: %q from %C being from %C expecting heartbeat\n"),
1709 writer->held_.insert(std::make_pair(seq, *sample));
1711 }
else if (writer->recvd_.contains(seq)) {
1713 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C duplicate sample\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
1717 ACE_TEXT(
" data seq: %q from %C being DROPPED from %C because it's ALREADY received\n"),
1724 }
else if (!writer->held_.empty()) {
1729 writer->recvd_.dump();
1731 writer->held_.insert(std::make_pair(seq, *sample));
1732 writer->recvd_.insert(seq);
1734 }
else if (writer->recvd_.disjoint() || writer->recvd_.cumulative_ack() != seq.
previous()) {
1737 ACE_TEXT(
" data seq: %q from %C being WITHHELD from %C because it's EXPECTING more data\n"),
1744 writer->held_.insert(std::make_pair(seq, *sample));
1745 writer->recvd_.insert(seq);
1750 ACE_TEXT(
" data seq: %q from %C to %C OK to deliver\n"),
1755 writer->recvd_.insert(seq);
1761 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C unknown remote writer\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
1765 ACE_TEXT(
" data seq: %q from %C to %C dropped because of unknown writer\n"),
1812 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C unknown remote writer\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
1819 if (writer->recvd_.empty()) {
1821 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C preassociation writer\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
1831 }
else if (start != base) {
1832 ACE_ERROR((
LM_ERROR,
"(%P|%t) RtpsUdpDataLink::RtpsReader::process_gap_i: ERROR - Incoming GAP has inverted start (%q) & base (%q) values, ignoring start value\n", start.
getValue(), base.
getValue()));
1834 writer->recvd_.insert(base, gap.
gapList.numBits, gap.
gapList.bitmap.get_buffer());
1842 if (!gaps.
empty()) {
1843 for (WriterInfo::HeldMap::iterator pos = writer->held_.lower_bound(gaps.
low()),
1844 limit = writer->held_.upper_bound(gaps.
high()); pos != limit;) {
1846 writer->held_.erase(pos++);
1875 MetaSubmessageVec meta_submessages;
1887 pos->second.last_activity = now;
1889 callbacks.push_back(pos->second);
1896 for (
size_t i = 0; i < callbacks.size(); ++i) {
1897 callbacks[i].listener->writer_exists(src, callbacks[i].localid);
1907 MetaSubmessageVec& meta_submessages)
1928 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1935 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C unknown remote writer\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
1942 if (!compare_and_update_counts(heartbeat.
count.
value, writer->heartbeat_recvd_count_)) {
1945 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C stale/duplicate message (%d vs %d)\n",
1948 VDBG((
LM_WARNING,
"(%P|%t) RtpsUdpDataLink::process_heartbeat_i " 1949 "WARNING Count indicates duplicate, dropping\n"));
1957 bool first_ever_hb =
false;
1960 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
// Sanity-check the heartbeat's [first, last] range before using it
// (reject negative/inverted ranges).
1965 if (!(hb_first < 1 || hb_last < 0 || hb_last < hb_first.
previous())) {
// First heartbeat ever seen from this writer (nothing received yet):
// completes the reader/writer association for directed heartbeats.
1966 if (writer->recvd_.empty() && (directed || !writer->sends_directed_hb())) {
1970 log_progress(
"RTPS reader/writer association complete",
id_, writer->id_, writer->participant_discovered_at_);
1975 writer->recvd_.insert(sr);
// Discard held samples at or below the newly-acknowledged range, then mark
// the remaining held sequence numbers as received so they are not nacked.
1976 while (!writer->held_.empty() && writer->held_.begin()->first <= sr.second) {
1977 writer->held_.erase(writer->held_.begin());
1979 for (WriterInfo::HeldMap::const_iterator it = writer->held_.begin(); it != writer->held_.end(); ++it) {
1980 writer->recvd_.insert(it->first);
1983 first_ever_hb =
true;
1987 if (!writer->recvd_.empty()) {
1988 writer->hb_last_ = std::max(writer->hb_last_, hb_last);
1989 gather_ack_nacks_i(writer, link, !is_final, meta_submessages, cumulative_bits_added);
1991 if (cumulative_bits_added) {
1993 if (cfg && cfg->count_messages()) {
2005 if (first_ever_hb) {
// Nack when nothing has been received yet, or the received set has gaps
// below the writer's last heartbeat sequence number.
2018 if (recvd_.empty() || (recvd_.disjoint() && recvd_.cumulative_ack() < hb_last_)) {
// Otherwise nack only if the contiguous received range still falls short of
// the writer's advertised last sequence number.
2021 if (!recvd_.empty()) {
2022 return recvd_.high() < hb_last_;
2042 ReaderInfoMap::const_iterator iter = remote_readers_.find(reader->id_);
2043 if (iter == remote_readers_.end()) {
2044 #ifdef OPENDDS_SECURITY 2045 if (is_pvs_writer_) {
2046 reader->max_pvs_sn_ = max_sn_;
2049 remote_readers_.insert(ReaderInfoMap::value_type(reader->id_, reader));
2050 update_remote_guids_cache_i(
true, reader->id_);
2051 preassociation_readers_.insert(reader);
2052 preassociation_reader_start_sns_.insert(reader->start_sn_);
2060 fallback_.set(initial_fallback_);
2063 if (!reader->durable_) {
2064 MetaSubmessageVec meta_submessages;
2067 initialize_heartbeat(proxy, meta_submessage);
2068 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
2082 return remote_readers_.count(
id) != 0;
2091 bool result =
false;
2094 ReaderInfoMap::iterator it = remote_readers_.find(
id);
2095 if (it != remote_readers_.end()) {
2097 reader->swap_durable_data(dd);
2098 remove_preassociation_reader(reader);
2101 readers_expecting_data_.erase(reader);
2102 readers_expecting_heartbeat_.erase(reader);
2103 snris_erase(acked_sn == max_sn ? leading_readers_ : lagging_readers_, acked_sn, reader);
2104 check_leader_lagger();
2106 #ifdef OPENDDS_SECURITY 2107 if (is_pvs_writer_ &&
2108 !reader->pvs_outstanding_.empty()) {
2112 for (
SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
2114 if (iter != elems_not_acked_.end()) {
2115 send_buff_->release_acked(iter->first);
2116 to_drop.insert(iter->second);
2117 elems_not_acked_.erase(iter);
2124 remote_readers_.erase(it);
2125 update_remote_guids_cache_i(
false,
id);
2131 for (iter_t it = dd.begin(); it != dd.end(); ++it) {
2132 it->second->data_dropped();
2135 for (TqeSet::iterator pos = to_drop.begin(), limit = to_drop.end(); pos != limit; ++pos) {
2136 (*pos)->data_dropped();
2146 return remote_readers_.size();
2158 WriterInfoMap::const_iterator iter =
remote_writers_.find(writer->id_);
2170 MetaSubmessageVec meta_submessages;
2213 if (!info->frags_.empty()) {
2217 if (!info->recvd_.empty()) {
2218 const SequenceRange range(info->recvd_.cumulative_ack() + 1, info->hb_last_);
2235 MetaSubmessageVec meta_submessages;
2246 pos != limit; ++pos) {
2266 const EntityId_t writer_id = writer->id_.entityId;
2280 {writer->heartbeat_recvd_count_}
2283 meta_submessages.push_back(meta_submessage);
2289 bool heartbeat_was_non_final,
2290 MetaSubmessageVec& meta_submessages,
2294 if (writer->should_nack() ||
2295 should_nack_frags) {
2298 const EntityId_t writer_id = writer->id_.entityId;
2310 if (bitmap.length() > 0) {
2311 (void)recvd.
to_bitmap(bitmap.get_buffer(), bitmap.length(),
2312 num_bits, cumulative_bits_added,
true);
2316 if (!recvd.
empty() && hb_high > recvd.
high()) {
2318 (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
2323 if (new_len > old_len) {
2324 bitmap.length(new_len);
2333 bitmap.get_buffer(), new_len,
2334 num_bits, cumulative_bits_added);
2340 const bool frags_modified =
2342 num_bits, ack, writer->id_, cumulative_bits_added);
2343 if (frags_modified) {
2345 if ((i + 1) * 32 <= num_bits) {
2350 if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
2367 {writer->heartbeat_recvd_count_}
2370 meta_submessages.push_back(meta_submessage);
2372 if (should_nack_frags) {
2375 }
else if (heartbeat_was_non_final) {
2382 const EntityId_t writer_id = writer->id_.entityId;
2396 {writer->heartbeat_recvd_count_}
2399 meta_submessages.push_back(meta_submessage);
2403 #ifdef OPENDDS_SECURITY 2412 size_t cache_hits = 0;
2413 size_t cache_misses = 0;
2414 size_t addrset_min_size = std::numeric_limits<size_t>::max();
2415 size_t addrset_max_size = 0;
2421 for (MetaSubmessageVec::iterator it = meta_submessages.begin(), limit = meta_submessages.end(); it != limit; ++it) {
2428 if (entry.is_new_) {
2430 AddrSet& addrs = entry.value().addrs_;
2439 #ifdef OPENDDS_SECURITY 2441 addrs.insert(BUNDLING_PLACEHOLDER);
2444 #if defined ACE_HAS_CPP11 2445 entry.recalculate_hash();
2452 const BundlingCache::ScopedAccess& const_entry = entry;
2453 const AddrSet& addrs = const_entry.value().addrs_;
2454 addrset_min_size = std::min(addrset_min_size, static_cast<size_t>(addrs.size()));
2455 addrset_max_size = std::max(addrset_max_size, static_cast<size_t>(addrs.size()));
2456 if (addrs.empty()) {
2458 #ifdef OPENDDS_SECURITY 2459 }
else if (addrs.size() == 1 && *addrs.begin() == BUNDLING_PLACEHOLDER) {
2466 MetaSubmessageIterVec& vec = dest_map[
make_unknown_guid(it->dst_guid_.guidPrefix)];
2467 vec.reserve(meta_submessages.size());
2471 vec.reserve(meta_submessages.size());
2476 VDBG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::build_meta_submessage_map()" 2477 "- Bundling Cache Stats: hits = %B, misses = %B, min = %B, max = %B\n",
2478 cache_hits, cache_misses, addrset_min_size, addrset_max_size));
2481 #ifdef OPENDDS_SECURITY 2487 using namespace RTPS;
2497 struct BundleHelper {
2499 #ifdef OPENDDS_SECURITY 2506 const Encoding&
encoding,
size_t max_bundle_size,
2507 RtpsUdpDataLink::BundleVec& bundles)
2510 ,
size_(initial_size)
2521 template <
typename T>
2522 bool add_to_bundle(T& submessage)
2524 const size_t prev_size =
size_;
2525 #ifdef OPENDDS_SECURITY 2532 size_ += submessage_size;
2533 #ifdef OPENDDS_SECURITY 2538 size_t compare_size =
size_;
2539 #ifdef OPENDDS_SECURITY 2545 const size_t chunk_size =
size_ - prev_size;
2547 size_ = initial_size + chunk_size;
2563 AddrDestMetaSubmessageMap& addr_map,
2567 using namespace RTPS;
2572 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2577 const bool new_bundle_per_dest_guid = cfg && cfg->rtps_relay_only();
2581 for (AddrDestMetaSubmessageMap::iterator addr_it = addr_map.begin(), limit = addr_map.end(); addr_it != limit; ++addr_it) {
2584 bundles.push_back(
Bundle(addr_it->first));
2588 for (DestMetaSubmessageMap::iterator dest_it = addr_it->second.begin(), limit2 = addr_it->second.end(); dest_it != limit2; ++dest_it) {
2590 if (dest_it->second.empty()) {
2595 if (new_bundle_per_dest_guid && bundles.back().submessages_.size()) {
2596 helper.end_bundle();
2597 bundles.push_back(
Bundle(addr_it->first));
2601 for (MetaSubmessageIterVec::iterator resp_it = dest_it->second.begin(), limit3 = dest_it->second.end(); resp_it != limit3; ++resp_it) {
2604 if (dest_it->first != prev_dst) {
2607 if (!helper.add_to_bundle(idst)) {
2608 bundles.push_back(
Bundle(addr_it->first));
2613 bool result =
false, unique =
false;
2614 ACE_UNUSED_ARG(unique);
2616 switch (res.
sm_._d()) {
2631 result = helper.add_to_bundle(res.
sm_.
gap_sm());
2645 prev_dst = dest_it->first;
2650 bundles.push_back(
Bundle(addr_it->first));
2653 bundles.back().submessages_.push_back(*resp_it);
2656 helper.end_bundle();
2680 dedup(fsq_vec_[idx]);
2682 fsq_vec_[idx].clear();
2696 MetaSubmessageVec vec;
2701 fsq_vec_.resize(fsq_vec_.size() + 1);
2707 if (send_immediately) {
2735 ReaderInfoMap::iterator ri = remote_readers_.find(
id);
2736 if (ri != remote_readers_.end()) {
2737 ri->second->required_acknack_count_ = current;
2747 RtpsWriterMap::iterator rw =
writers_.find(local_id);
2749 writer = rw->second;
2753 writer->update_required_acknack_count(remote_id, current);
2760 using namespace RTPS;
2763 AddrDestMetaSubmessageMap addr_map;
2770 bundles.reserve(meta_submessages.size());
2778 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
2782 it->second.next_directed_unassigned_ = it->second.map_.begin();
2783 it->second.next_undirected_unassigned_ = it->second.map_.begin();
2784 for (CountMap::iterator it2 = it->second.map_.begin(), limit2 = it->second.map_.end(); it2 != limit2; ++it2) {
2785 if (it2->second.undirected_) {
2786 ++(it->second.next_directed_unassigned_);
2793 for (
size_t i = 0; i < bundles.size(); ++i) {
2798 const MetaSubmessageIterVec& bundle_vec = bundles[i].submessages_;
2799 for (MetaSubmessageIterVec::const_iterator it = bundle_vec.begin(), limit = bundle_vec.end(); it != limit; ++it) {
2802 if (dst != prev_dst) {
2809 switch (res.
sm_._d()) {
2836 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: HEARTBEAT: %C -> %C first %q last %q count %d\n",
2845 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: ACKNACK: %C -> %C base %q bits %u count %d\n",
2854 set.erase(
set.begin());
2859 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::bundle_and_send_submessages: NACKFRAG: %C -> %C seq %q base %u bits %u\n",
2876 ss->send_rtps_control(rtps_message, *(mb_bundle.
get()), bundles[i].proxy_.addrs());
2889 typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
2890 RtpsUdpReceiveStrategy::FragmentInfo frag_info;
2899 for (
size_t i = 0; i < missing.size(); ++i) {
2903 if (!wi->recvd_.empty() && wi->recvd_.high() < wi->hb_last_) {
2904 const SequenceRange range(wi->recvd_.high() + 1, wi->hb_last_);
2907 for (
size_t i = 0; i < frag_info.size(); ++i) {
2909 const iter_t heartbeat_frag = wi->frags_.find(frag_info[i].first);
2910 if (heartbeat_frag != wi->frags_.end()) {
2911 extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value, cumulative_bits_added);
2916 const iter_t low = wi->frags_.lower_bound(wi->recvd_.cumulative_ack()),
2917 high = wi->frags_.upper_bound(wi->recvd_.last_ack()),
2918 end = wi->frags_.end();
2919 for (iter_t iter = wi->frags_.begin(); iter != end; ++iter) {
2929 if (!link->
receive_strategy()->has_fragments(range, wi->id_, &frag_info)) {
2931 frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
2932 RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
2933 fnSet.bitmapBase.value = 1;
2934 fnSet.numBits = std::min(
CORBA::ULong(256), iter->second.value);
2935 fnSet.bitmap.length((fnSet.numBits + 31) / 32);
2936 for (
CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
2937 fnSet.bitmap[i] = 0xFFFFFFFF;
2942 if (frag_info.empty()) {
2951 RTPS::FragmentNumberSet(),
2955 meta_submessages.reserve(meta_submessages.size() + frag_info.size());
2956 for (
size_t i = 0; i < frag_info.size(); ++i) {
2963 meta_submessages.push_back(meta_submessage);
2972 if (extent < fnSet.bitmapBase.value) {
2977 extent - fnSet.bitmapBase.value + 1),
2978 len = (new_num_bits + 31) / 32;
2979 if (new_num_bits < fnSet.numBits) {
2982 fnSet.bitmap.length(len);
2985 fnSet.bitmap.get_buffer(), len,
2986 fnSet.numBits, samples_requested);
3003 MetaSubmessageVec& meta_submessages)
3022 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C unknown remote writer\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
3029 if (!compare_and_update_counts(hb_frag.
count.
value, writer->hb_frag_recvd_count_)) {
3031 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C stale/duplicate message\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
3033 VDBG((
LM_WARNING,
"(%P|%t) RtpsUdpDataLink::process_heartbeat_frag_i " 3034 "WARNING Count indicates duplicate, dropping\n"));
3043 if (seq > writer->hb_last_ || !writer->recvd_.contains(seq)) {
3047 if (cumulative_bits_added) {
3049 if (cfg && cfg->count_messages()) {
3081 pos->second.last_activity = now;
3083 if (local == pos->second.localid) {
3085 callbacks.push_back(pos->second.listener);
3092 for (
size_t i = 0; i < callbacks.size(); ++i) {
3093 callbacks[i]->reader_exists(remote, local);
3102 MetaSubmessageVec& meta_submessages)
3104 using namespace RTPS;
3123 if (bitmap.length() > 0) {
3125 (void)gaps.
to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits, cumulative_bits_added);
3135 {gapListBase, num_bits, bitmap}
3147 "GAP with range [%q, %q] from %C\n",
3148 sr.first.getValue(), sr.second.getValue(),
3152 meta_submessages.push_back(meta_submessage);
3175 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3179 ReaderInfoMap::iterator ri = remote_readers_.find(src);
3180 if (ri == remote_readers_.end()) {
3182 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C unknown remote reader\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
3185 "WARNING ReaderInfo not found\n"));
3192 const bool count_is_not_zero = acknack.
count.
value != 0;
3193 const CORBA::Long previous_count = reader->acknack_recvd_count_;
3194 bool dont_schedule_nack_response =
false;
3196 if (count_is_not_zero) {
3197 if (!compare_and_update_counts(acknack.
count.
value, reader->acknack_recvd_count_) &&
3198 (!reader->reflects_heartbeat_count() || acknack.
count.
value != 0 || reader->acknack_recvd_count_ != 0)) {
3200 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale/duplicate message\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str()));
3203 "WARNING Count indicates duplicate, dropping\n"));
3207 if (reader->reflects_heartbeat_count()) {
3208 if (acknack.
count.
value < reader->required_acknack_count_) {
3210 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale message (reflect %d < %d)\n",
LogGuid(src).c_str(),
LogGuid(
id_).c_str(), acknack.
count.
value, reader->required_acknack_count_));
3212 dont_schedule_nack_response =
true;
3217 fallback_.set(initial_fallback_);
3222 if (preassociation_readers_.count(reader)) {
3223 if (is_postassociation) {
3224 remove_preassociation_reader(reader);
3226 log_progress(
"RTPS writer/reader association complete",
id_, reader->id_, reader->participant_discovered_at_);
3232 snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3233 check_leader_lagger();
3241 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3246 bool inform_send_listener =
false;
3249 if (ack >= reader->cur_cumulative_ack_) {
3250 reader->cur_cumulative_ack_ = ack;
3251 inform_send_listener =
true;
3252 }
else if (count_is_not_zero) {
3255 "%C -> %C reset detected count %d > %d ack %q < %q\n",
3257 acknack.
count.
value, previous_count, ack.
getValue(), reader->cur_cumulative_ack_.getValue()));
3259 snris_erase(previous_acked_sn == max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3260 reader->cur_cumulative_ack_ = ack;
3262 snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3263 previous_acked_sn = acked_sn;
3264 check_leader_lagger();
3267 if (reader->durable_) {
3269 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: enqueuing ReplayDurableData\n"));
3271 reader->durable_data_.
swap(pendingCallbacks);
3277 if (!reader->durable_data_.empty()) {
3279 const LogGuid local_conv(
id_), remote_conv(src);
3281 "local %C has durable for remote %C\n",
3283 remote_conv.
c_str()));
3285 const SequenceNumber& dd_last = reader->durable_data_.rbegin()->first;
3288 "check base %q against last durable %q\n",
3291 if (ack > dd_last) {
3293 log_progress(
"durable delivered",
id_, reader->id_, reader->participant_discovered_at_);
3298 "durable data acked\n"));
3300 reader->durable_data_.
swap(pendingCallbacks);
3303 limit = reader->durable_data_.end(); pos != limit && pos->first < ack;) {
3304 pendingCallbacks.insert(*pos);
3305 reader->durable_data_.erase(pos++);
3314 bool schedule_nack_response =
false;
3315 if (!dont_schedule_nack_response) {
3316 if (count_is_not_zero) {
3317 reader->requests_.
reset();
3322 && ack == max_data_seq(proxy, reader)) {
3327 reader->requests_.insert(ack);
3333 if (!reader->requests_.empty()) {
3334 readers_expecting_data_.insert(reader);
3335 schedule_nack_response =
true;
3336 }
else if (reader->requested_frags_.empty()) {
3337 readers_expecting_data_.erase(reader);
3343 readers_expecting_heartbeat_.insert(reader);
3344 schedule_nack_response =
true;
3348 if (preassociation_readers_.count(reader) == 0) {
3349 make_lagger_leader(reader, previous_acked_sn);
3350 check_leader_lagger();
3354 acked_by_all_helper_i(to_deliver);
3356 #ifdef OPENDDS_SECURITY 3357 if (is_pvs_writer_ &&
3358 !reader->pvs_outstanding_.empty() &&
3359 reader->pvs_outstanding_.low() < reader->cur_cumulative_ack_) {
3362 pos != limit && pos->first < reader->cur_cumulative_ack_; ++pos) {
3364 for (
SequenceNumber seq = pos->first; seq <= pos->second && seq < reader->cur_cumulative_ack_; ++seq) {
3365 reader->pvs_outstanding_.erase(seq);
3367 if (iter != elems_not_acked_.end()) {
3368 send_buff_->release_acked(iter->first);
3369 to_deliver.insert(iter->second);
3370 elems_not_acked_.erase(iter);
3377 if (!dont_schedule_nack_response && schedule_nack_response) {
3386 if (inform_send_listener && client) {
3387 client->data_acked(src);
3391 for (iter_t it = pendingCallbacks.begin();
3392 it != pendingCallbacks.end(); ++it) {
3393 it->second->data_delivered();
3396 TqeSet::iterator deliver_iter = to_deliver.begin();
3397 while (deliver_iter != to_deliver.end()) {
3398 (*deliver_iter)->data_delivered();
3414 MetaSubmessageVec& )
3429 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C\n",
3433 const ReaderInfoMap::iterator ri = remote_readers_.find(src);
3434 if (ri == remote_readers_.end()) {
3436 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C unknown remote reader\n",
3444 if (!compare_and_update_counts(nackfrag.
count.
value, reader->nackfrag_recvd_count_)) {
3446 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C stale/duplicate message\n",
3456 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C seq %q base %u bits %u\n",
3461 readers_expecting_data_.insert(reader);
3487 ReaderMap consolidated_request_readers;
3488 RecipientMap consolidated_recipients_unicast;
3489 RecipientMap consolidated_recipients_multicast;
3490 FragmentInfo consolidated_fragment_requests;
3491 ReaderMap consolidated_fragment_request_readers;
3492 RecipientMap consolidated_fragment_recipients_unicast;
3493 RecipientMap consolidated_fragment_recipients_multicast;
3499 size_t cumulative_send_count = 0;
3501 for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3502 pos != limit; ++pos) {
3509 if (reader->expecting_durable_data()) {
3511 if (!reader->requests_.empty() && !reader->durable_data_.empty()) {
3512 const SequenceNumber dd_first = reader->durable_data_.begin()->first;
3513 const SequenceNumber dd_last = reader->durable_data_.rbegin()->first;
3515 if (reader->requests_.high() < dd_first) {
3517 reader->requests_.
reset();
3521 iter != limit && iter->first <= dd_last; ++iter) {
3525 if (dd_iter != reader->durable_data_.end()) {
3530 reader->requests_.erase(s);
3537 typedef RequestedFragSeqMap::const_iterator rfs_iter;
3538 for (rfs_iter rfs = reader->requested_frags_.begin(), limit = reader->requested_frags_.end(); rfs != limit; ++rfs) {
3540 if (dd_iter != reader->durable_data_.end()) {
3541 for (RequestedFragMap::const_iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3544 }
else if ((!reader->durable_data_.empty() && rfs->first < reader->durable_data_.begin()->first)) {
3549 gather_gaps_i(reader, gaps, meta_submessages);
3554 reader->requests_.
reset();
3555 reader->requested_frags_.clear();
3559 const SequenceNumber first_sn = std::max(non_durable_first_sn(proxy), reader->start_sn_);
3560 if (!reader->requests_.empty() &&
3561 reader->requests_.high() < first_sn) {
3565 reader->requests_.
reset();
3570 iter != limit; ++iter) {
3571 for (
SequenceNumber seq = iter->first; seq <= iter->second; ++seq) {
3573 if (proxy.contains(seq, destination)) {
3576 consolidated_requests.
insert(seq);
3577 consolidated_request_readers[seq].insert(reader->id_);
3578 consolidated_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3582 }
else if (destination != reader->id_) {
3591 ++cumulative_send_count;
3594 }
else if (proxy.pre_contains(seq) || seq > max_sn_) {
3599 if (durable_ || is_pvs_writer()) {
3604 consolidated_gaps.
insert(seq);
3609 reader->requests_.
reset();
3611 typedef RequestedFragSeqMap::iterator rfs_iter;
3612 const rfs_iter rfs_end = reader->requested_frags_.end();
3613 for (rfs_iter rfs = reader->requested_frags_.begin(); rfs != rfs_end; ++rfs) {
3616 if (proxy.contains(seq, destination)) {
3618 for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3619 consolidated_fragment_requests[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
3620 rf->second.bitmap.get_buffer());
3622 consolidated_fragment_request_readers[seq].insert(reader->id_);
3623 consolidated_fragment_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3627 }
else if (destination != reader->id_) {
3635 for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3637 x.
insert(rf->second.bitmapBase.value, rf->second.numBits,
3638 rf->second.bitmap.get_buffer());
3639 proxy.resend_fragments_i(seq, x, cumulative_send_count);
3643 }
else if (proxy.pre_contains(seq) || seq > max_sn_) {
3648 if (durable_ || is_pvs_writer()) {
3653 consolidated_gaps.
insert(seq);
3657 reader->requested_frags_.clear();
3660 gather_gaps_i(reader, gaps, meta_submessages);
3667 pos != limit; ++pos) {
3669 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: " 3670 "resend data %q-%q\n", pos->first.getValue(),
3671 pos->second.getValue()));
3673 for (
SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
3674 const AddrSet& uni = consolidated_recipients_unicast[seq];
3675 const AddrSet& multi = consolidated_recipients_multicast[seq];
3676 const RepoIdSet& readers = consolidated_request_readers[seq];
3678 if (proxy.has_frags(seq)) {
3679 if (consolidated_fragment_requests.find(seq) == consolidated_fragment_requests.end()) {
3680 consolidated_fragment_requests[seq].insert(1);
3682 consolidated_fragment_recipients_unicast[seq].insert(uni.begin(), uni.end());
3683 consolidated_fragment_recipients_multicast[seq].insert(multi.begin(), multi.end());
3684 consolidated_fragment_request_readers[seq].insert(readers.begin(), readers.end());
3687 link->
send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3690 ++cumulative_send_count;
3695 for (FragmentInfo::const_iterator pos = consolidated_fragment_requests.begin(),
3696 limit = consolidated_fragment_requests.end(); pos != limit; ++pos) {
3697 const AddrSet& uni = consolidated_fragment_recipients_unicast[pos->first];
3698 const AddrSet& multi = consolidated_fragment_recipients_multicast[pos->first];
3699 const RepoIdSet& readers = consolidated_fragment_request_readers[pos->first];
3701 link->
send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3703 proxy.resend_fragments_i(pos->first, pos->second, cumulative_send_count);
3707 if (cumulative_send_count) {
3709 if (cfg && cfg->count_messages()) {
3716 if (!consolidated_gaps.
empty()) {
3719 "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: GAPs:\n"));
3720 consolidated_gaps.
dump();
3722 gather_gaps_i(
ReaderInfo_rch(), consolidated_gaps, meta_submessages);
3724 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: " 3725 "no GAPs to send\n"));
3729 initialize_heartbeat(proxy, meta_submessage);
3732 for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3733 pos != limit; ++pos) {
3735 readers_expecting_heartbeat_.erase(reader);
3736 if (reader->reflects_heartbeat_count()) {
3738 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3739 reader->required_acknack_count_ = heartbeat_count_;
3742 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3745 readers_expecting_data_.clear();
3749 meta_submessages.reserve(meta_submessages.size() + readers_expecting_heartbeat_.size());
3750 for (ReaderInfoSet::const_iterator pos = readers_expecting_heartbeat_.begin(), limit = readers_expecting_heartbeat_.end();
3751 pos != limit; ++pos) {
3753 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3755 readers_expecting_heartbeat_.clear();
3761 ACE_UNUSED_ARG(reader);
3762 #ifdef OPENDDS_SECURITY 3763 if (is_pvs_writer_) {
3764 return reader->max_pvs_sn_;
3768 #ifdef OPENDDS_SECURITY 3778 SNRIS::iterator pos = snris.find(sn);
3779 if (pos == snris.end()) {
3780 pos = snris.insert(RtpsUdpDataLink::SNRIS::value_type(sn, make_rch<ReaderInfoSetHolder>())).first;
3782 pos->second->readers.insert(reader);
3790 SNRIS::iterator pos = snris.find(sn);
3791 if (pos != snris.end()) {
3792 pos->second->readers.erase(reader);
3793 if (pos->second->readers.empty()) {
3803 ACE_UNUSED_ARG(reader_id);
3814 #ifdef OPENDDS_SECURITY 3815 if (!is_pvs_writer_) {
3817 if (previous_max_sn != max_sn_) {
3820 SNRIS::iterator leading_pos = leading_readers_.find(previous_max_sn);
3821 SNRIS::iterator lagging_pos = lagging_readers_.find(previous_max_sn);
3822 if (leading_pos != leading_readers_.end()) {
3823 if (lagging_pos != lagging_readers_.end()) {
3824 lagging_pos->second->readers.insert(leading_pos->second->readers.begin(), leading_pos->second->readers.end());
3826 lagging_readers_[previous_max_sn] = leading_pos->second;
3828 leading_readers_.erase(leading_pos);
3832 #ifdef OPENDDS_SECURITY 3835 const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3836 if (iter == remote_readers_.end()) {
3841 previous_max_sn = reader->max_pvs_sn_;
3842 reader->max_pvs_sn_ = max_sn_;
3843 if (preassociation_readers_.count(reader)) {
3849 if (acked_sn == previous_max_sn && previous_max_sn != max_sn_) {
3850 snris_erase(leading_readers_, acked_sn, reader);
3851 snris_insert(lagging_readers_, reader);
3868 if (previous_acked_sn == acked_sn) {
return; }
3870 #ifdef OPENDDS_SECURITY 3871 if (is_pvs_writer_ && acked_sn > previous_max_sn) {
3872 reader->max_pvs_sn_ = acked_sn;
3877 snris_erase(previous_acked_sn == previous_max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3878 snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3879 if (acked_sn != max_sn) {
3887 return reader->acked_sn() != expected_max_sn(reader);
3893 return reader->acked_sn() == expected_max_sn(reader);
3899 #ifndef OPENDDS_SAFETY_PROFILE 3902 for (SNRIS::const_iterator pos1 = lagging_readers_.begin(), limit = lagging_readers_.end();
3903 pos1 != limit; ++pos1) {
3906 for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3907 pos2 != limit; ++pos2) {
3916 for (SNRIS::const_iterator pos1 = leading_readers_.begin(), limit = leading_readers_.end();
3917 pos1 != limit; ++pos1) {
3920 for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3921 pos2 != limit; ++pos2) {
3936 ACE_UNUSED_ARG(reader_id);
3937 ACE_UNUSED_ARG(seq);
3938 #ifdef OPENDDS_SECURITY 3939 if (!is_pvs_writer_) {
3943 const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3944 if (iter == remote_readers_.end()) {
3949 reader->pvs_outstanding_.insert(seq);
3959 acked_by_all_helper_i(to_deliver);
3962 TqeSet::iterator deliver_iter = to_deliver.begin();
3963 while (deliver_iter != to_deliver.end()) {
3964 (*deliver_iter)->data_delivered();
3985 if (!preassociation_readers_.empty()) {
3986 all_readers_ack = std::min(all_readers_ack, *preassociation_reader_start_sns_.begin());
3988 if (!lagging_readers_.empty()) {
3989 all_readers_ack = std::min(all_readers_ack, lagging_readers_.begin()->first + 1);
3991 if (!leading_readers_.empty()) {
3996 all_readers_ack = std::min(all_readers_ack, leading_readers_.rbegin()->first + 1);
4005 if (!elems_not_acked_.empty()) {
4006 for (iter_t it = elems_not_acked_.begin(), limit = elems_not_acked_.end();
4007 it != limit && it->first < all_readers_ack;) {
4008 send_buff_->release_acked(it->first);
4009 to_deliver.insert(it->second);
4010 elems_not_acked_.erase(it++);
4016 size_t& cumulative_send_count)
4019 static const RTPS::FragmentNumberSet none = { {0}, 0,
RTPS::LongSeq8(0, buffer) };
4024 const RTPS::FragmentNumberSet& fragmentSet,
4025 size_t& cumulative_send_count)
4031 if (addrs.empty()) {
4034 "(%P|%t) ERROR: RtpsUdpDataLink::durability_resend() - " 4035 "no locator for remote %C\n", conv.c_str()));
4040 if (!
send_strategy()->fragmentation_helper(element, to_send)) {
4045 fragments.
insert(fragmentSet.bitmapBase.value, fragmentSet.numBits,
4046 fragmentSet.bitmap.get_buffer());
4049 const TqeVector::iterator end = to_send.end();
4050 for (TqeVector::iterator i = to_send.begin(); i != end; ++i) {
4053 send_strategy()->send_rtps_control(message, *const_cast<ACE_Message_Block*>((*i)->msg()), addrs);
4054 ++cumulative_send_count;
4057 (*i)->data_delivered();
4075 lastFragment = thisElement.second;
4086 MetaSubmessageVec meta_submessages;
4095 WtaMap writers_to_advertise;
4105 tg = make_rch<ConstSharedRepoIdSet>();
4111 readerDoesNotExistCallbacks.push_back(callback);
4116 for (WtaMap::const_iterator pos = writers_to_advertise.begin(),
4117 limit = writers_to_advertise.end();
4120 RtpsWriterMap::const_iterator wpos =
writers_.find(pos->first);
4122 wpos->second->gather_heartbeats(pos->second, meta_submessages);
4129 pos->first.entityId,
4135 for (RepoIdSet::const_iterator it = pos->second->guids_.begin(),
4136 limit = pos->second->guids_.end(); it != limit; ++it) {
4140 meta_submessages.push_back(meta_submessage);
4149 iter != readerDoesNotExistCallbacks.end(); ++iter) {
4162 const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4181 MetaSubmessageVec& meta_submessages,
4185 const SequenceNumber first_sn = reader->durable_ ? 1 : std::max(non_durable_first_sn(proxy), reader->start_sn_);
4187 #ifdef OPENDDS_SECURITY 4188 if (is_pvs_writer_ && last_sn < first_sn.
previous()) {
4195 meta_submessage.
dst_guid_ = reader->id_;
4201 meta_submessages.push_back(meta_submessage);
4217 if (remote_reader_guids_) {
4218 const_cast<RepoIdSet&
>(temp->guids_) = remote_reader_guids_->guids_;
4223 const_cast<RepoIdSet&
>(temp->guids_).erase(guid);
4225 remote_reader_guids_ = temp;
4234 if (preassociation_readers_.empty() && lagging_readers_.empty()) {
4238 check_leader_lagger();
4250 const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4255 initialize_heartbeat(proxy, meta_submessage);
4258 if (!preassociation_readers_.empty()) {
4259 meta_submessages.reserve(meta_submessages.size() + preassociation_readers_.size());
4260 for (ReaderInfoSet::const_iterator pos = preassociation_readers_.begin(), limit = preassociation_readers_.end();
4261 pos != limit; ++pos) {
4263 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4267 if (!lagging_readers_.empty()) {
4268 if (leading_readers_.empty() && remote_readers_.size() > 1
4269 #ifdef OPENDDS_SECURITY 4280 meta_submessages.push_back(meta_submessage);
4283 for (SNRIS::const_iterator snris_pos = lagging_readers_.begin(), snris_limit = lagging_readers_.end();
4284 snris_pos != snris_limit; ++snris_pos) {
4285 meta_submessages.reserve(meta_submessages.size() + snris_pos->second->readers.size());
4286 for (ReaderInfoSet::const_iterator pos = snris_pos->second->readers.begin(),
4287 limit = snris_pos->second->readers.end();
4288 pos != limit; ++pos) {
4290 gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4299 MetaSubmessageVec& meta_submessages)
4313 initialize_heartbeat(proxy, meta_submessage);
4315 for (RepoIdSet::const_iterator it = additional_guids->guids_.begin(),
4316 limit = additional_guids->guids_.end(); it != limit; ++it) {
4321 meta_submessages.push_back(meta_submessage);
4343 writerDoesNotExistCallbacks.push_back(callback);
4350 for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
4351 const GUID_t& rid = iter->first;
4377 meta_submessages.push_back(meta_submessage);
4391 const SequenceNumber firstSN = durable_ ? 1 : non_durable_first_sn(proxy);
4393 const int counter = ++heartbeat_count_;
4409 meta_submessages.push_back(meta_submessage);
4414 expunge_durable_data();
4420 durable_data_.swap(dd);
4427 for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
4428 it->second->data_dropped();
4436 (durable_timestamp_.is_zero()
4437 || !durable_data_.empty());
4448 bool durable,
SequenceNumber max_sn,
int heartbeat_count,
size_t capacity)
4456 , heartbeat_count_(heartbeat_count)
4457 #ifdef OPENDDS_SECURITY
4464 , fallback_(initial_fallback_)
4473 ACE_TEXT(
"deleting with %d elements left not fully acknowledged\n"),
4488 const SequenceNumber durable_max = ri->durable_data_.empty() ? 0 : ri->durable_data_.rbegin()->first;
4491 return std::max(durable_max, std::max(pre_max, data_max));
4542 const WriterInfoMap::iterator wi = remote_writers_.find(src);
4543 if (wi != remote_writers_.end()) {
4545 const WriterInfo::HeldMap::iterator end = wi->second->held_.upper_bound(ca);
4546 for (WriterInfo::HeldMap::iterator it = wi->second->held_.begin(); it != end; ) {
4547 to_deliver.push_back(it->second);
4548 wi->second->held_.erase(it++);
4560 ACE_TEXT(
" deliver sequence: %q to %C\n"),
4561 it->header_.sequence_.getValue(),
4571 reader_->deliver_held_data(writer_id_);
4621 bool use_peers =
true;
4628 RtpsWriterMap::const_iterator pos =
writers_.find(local);
4630 writer = pos->second;
4636 for (RepoIdSet::const_iterator it = addr_guids->guids_.begin(),
4637 limit = addr_guids->guids_.end(); it != limit; ++it) {
4647 const GUIDSeq_var peers =
peer_ids(local);
4660 if (!last_recv_addr_) {
4663 const ACE_INT16 last_addr_type = last_recv_addr_.get_type();
4666 AddrSet::const_iterator it = unicast_addrs_.lower_bound(limit);
4667 while (it != unicast_addrs_.end() && it->get_type() == last_addr_type) {
4668 if (it->addr_bytes_equal(last_recv_addr_)) {
4679 AddrSet& addresses,
bool prefer_unicast)
const 4686 if (!entry.is_new_) {
4687 addresses.insert(entry.value().addrs_.begin(), entry.value().addrs_.end());
4698 addresses.insert(cfg->rtps_relay_address());
4699 entry.value().addrs_.insert(cfg->rtps_relay_address());
4704 AddrSet normal_addrs;
4707 bool valid_last_recv_addr =
false;
4710 const RemoteInfoMap::const_iterator pos =
locators_.find(remote);
4712 if (prefer_unicast && pos->second.insert_recv_addr(normal_addrs)) {
4713 normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4714 valid_last_recv_addr = (
MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4715 }
else if (prefer_unicast && !pos->second.unicast_addrs_.empty()) {
4716 normal_addrs = pos->second.unicast_addrs_;
4717 }
else if (!pos->second.multicast_addrs_.empty()) {
4719 if (pos->second.last_recv_addr_ != NO_ADDR) {
4720 const AddrSet& mc_addrs = pos->second.multicast_addrs_;
4721 for (AddrSet::const_iterator it = mc_addrs.begin(); it != mc_addrs.end(); ++it) {
4722 if (it->get_type() == pos->second.last_recv_addr_.get_type()) {
4723 normal_addrs.insert(*it);
4727 normal_addrs = pos->second.multicast_addrs_;
4730 normal_addrs = pos->second.multicast_addrs_;
4732 }
else if (pos->second.insert_recv_addr(normal_addrs)) {
4733 normal_addrs_expires = pos->second.last_recv_time_ + cfg->receive_address_duration_;
4734 valid_last_recv_addr = (
MonotonicTimePoint::now() - pos->second.last_recv_time_) <= cfg->receive_address_duration_;
4736 normal_addrs = pos->second.unicast_addrs_;
4744 normal_addrs = ipos->second.addresses;
4750 normal_addrs = ipos->second.addresses;
4755 #ifdef OPENDDS_SECURITY 4758 ice_addr =
ice_agent_->get_address(endpoint, local, remote);
4762 if (ice_addr == NO_ADDR) {
4763 addresses.insert(normal_addrs.begin(), normal_addrs.end());
4764 entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4765 entry.value().expires_ = normal_addrs_expires;
4767 if (!valid_last_recv_addr && relay_addr != NO_ADDR) {
4768 addresses.insert(relay_addr);
4769 entry.value().addrs_.insert(relay_addr);
4774 if (normal_addrs.count(ice_addr) == 0) {
4775 addresses.insert(ice_addr);
4776 entry.value().addrs_.insert(ice_addr);
4780 addresses.insert(normal_addrs.begin(), normal_addrs.end());
4781 entry.value().addrs_.insert(normal_addrs.begin(), normal_addrs.end());
4782 entry.value().expires_ = normal_addrs_expires;
4785 #ifdef OPENDDS_SECURITY 4801 const GUID_t& reader_id)
const 4803 RtpsWriterMap::mapped_type writer;
4807 RtpsWriterMap::const_iterator pos =
writers_.find(writer_id);
4811 writer = pos->second;
4814 return writer->is_leading(reader_id);
4821 "RtpsUdpDataLink::RtpsWriter::%C: " 4822 "%C pre: %b assoc: %b\n",
4832 "RtpsUdpDataLink::RtpsReader::%C: " 4833 "%C pre: %b assoc: %b\n",
4835 preassociation_writers_.size(), remote_writers_.size()));
bool add_writer(const WriterInfo_rch &info)
SequenceNumberSet readerSNState
void remove_all_msgs(const GUID_t &pub_id)
DataSampleHeader header_
The demarshalled sample header.
sequence< Submessage > SubmessageSeq
void swap(MessageBlock &lhs, MessageBlock &rhs)
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
RcHandle< T > rchandle_from(T *pointer)
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
void * malloc(size_t nbytes=sizeof(T))
size_t writer_count() const
TimeDuration heartbeat_period_
bool remove_reader(const GUID_t &id)
void ready_to_send()
Indicate that the queue is ready to send after all pending transactions are complete.
void send_heartbeats(const MonotonicTimePoint &now)
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void send_heartbeats(const MonotonicTimePoint &now)
virtual SequenceNumber sequence() const
void begin_transaction()
Signal that a thread is beginning to send a sequence of submessages.
DCPS::EntityId_t readerId
TransportImpl_rch impl() const
const ACE_CDR::UShort HEARTBEAT_SZ
static const size_t MaxSecureSubmessageLeadingSize
EventDispatcher_rch event_dispatcher()
bool requires_inline_qos(const GUIDSeq_var &peers)
int make_reservation(const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
static const size_t MaxSecureFullMessageFollowingSize
DCPS::EntityId_t readerId
SequenceNumber pre_high() const
void flush_send_queue_i()
virtual bool matches(const TransportQueueElement &candidate) const
const DataSampleHeader & get_header() const
void process_acked_by_all()
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
ACE_Message_Block * submsgs_to_msgblock(const RTPS::SubmessageSeq &subm)
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
bool process_data_i(const RTPS::DataSubmessage &data, const GUID_t &src, MetaSubmessageVec &meta_submessages)
char message_id_
The enum MessageId.
CountMapType heartbeat_counts_
AckNackSubmessage acknack_sm
const ACE_CDR::UShort RTPSHDR_SZ
void unregister_for_reader(const GUID_t &writerid, const GUID_t &readerid)
void register_for_writer(const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
SubmessageHeader smHeader
SequenceNumber previous() const
bool expecting_durable_data() const
virtual bool is_last_fragment() const
Is this QueueElement the last result of fragmentation?
OpenDDS_Dcps_Export TransportDebug transport_debug
RtpsUdpDataLink(const RtpsUdpTransport_rch &transport, const GuidPrefix_t &local_prefix, const RtpsUdpInst_rch &config, const ReactorTask_rch &reactor_task, InternalTransportStatistics &transport_statistics, ACE_Thread_Mutex &transport_statistics_mutex)
void update_last_recv_addr(const GUID_t &src, const NetworkAddress &addr, const MonotonicTimePoint &now=MonotonicTimePoint::now())
bool contains(SequenceNumber seq) const
const TransportSendElement * original_send_element() const
void bundle_and_send_submessages(MetaSubmessageVec &meta_submessages)
virtual bool is_leading(const GUID_t &writer_id, const GUID_t &reader_id) const
bool reflects_heartbeat_count() const
RemoveResult remove_sample(const DataSampleElement *sample)
DCPS::EntityId_t writerId
bool add_reader(const ReaderInfo_rch &reader)
SequenceNumber cumulative_ack() const
ACE_Thread_Mutex fsq_mutex_
void acked_by_all_helper_i(TqeSet &to_deliver)
virtual void pre_stop_i()
const GUID_t GUID_UNKNOWN
Nil value for GUID.
BundlingCache bundling_cache_
bool data_dropped(bool dropped_by_transport=false)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > bundle_allocator_
ReaderInfoMap remote_readers_
RtpsUdpTransport_rch transport()
bool isReader() const
Returns true if the GUID represents a reader entity.
size_t reader_count() const
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
static bool include_fragment(const TransportQueueElement &element, const DisjointSequence &fragments, SequenceNumber &lastFragment)
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
ACE_Thread_Mutex elems_not_acked_mutex_
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
void process_heartbeat_frag_i(const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
GUID_t get_pub_id() const
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
DataBlockAllocator db_allocator_
TransportSendStrategy::LockType LockType
bool is_more_local(const NetworkAddress ¤t, const NetworkAddress &incoming)
void gather_nack_replies_i(MetaSubmessageVec &meta_submessages)
const char * c_str() const
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
RepoIdSet pending_reliable_readers_
void accumulate_addresses(const GUID_t &local, const GUID_t &remote, AddrSet &addresses, bool prefer_unicast=false) const
RtpsUdpReceiveStrategy_rch receive_strategy()
RcHandle< SporadicEvent > harvest_send_queue_sporadic_
#define OPENDDS_ASSERT(C)
void send_nack_responses(const MonotonicTimePoint &now)
void end_historic_samples_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
const ACE_CDR::UShort INFO_DST_SZ
const DataSampleElement * sample() const
Original sample from send listener.
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
const SampleStateMask ANY_SAMPLE_STATE
OpenDDS_Dcps_Export GUID_t make_unknown_guid(const GuidPrefix_t &prefix)
const GUID_t & id() const
virtual GUID_t publication_id() const
Accessor for the publisher id.
DataBlockLock * get_lock()
static bool control_message_supported(char message_id)
virtual SequenceNumber sequence() const
void update_required_acknack_count(const GUID_t &local_id, const GUID_t &remote_id, CORBA::Long current)
void process_nackfrag(const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
#define OPENDDS_MULTIMAP(K, T)
key GuidPrefix_t guidPrefix
ACE_SOCK_Dgram unicast_socket_
bool contains_any(const SequenceRange &range) const
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
reference_wrapper< T > ref(T &r)
DCPS::EntityId_t writerId
ACE_Thread_Mutex & transport_statistics_mutex_
T::rv_reference move(T &p)
CountMap::iterator next_directed_unassigned_
void queue_submessages(MetaSubmessageVec &meta_submessages)
SequenceNumber high() const
int set_option(int level, int option, void *optval, int optlen) const
ACE_Thread_Mutex readers_lock_
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
void disable_response_queue(bool send_immediately)
void swap_durable_data(OPENDDS_MAP(SequenceNumber, TransportQueueElement *)&dd)
SequenceNumber_t writerSN
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
char * rd_ptr(void) const
const size_t max_bundle_size_
RtpsReaderMultiMap readers_of_writer_
void check_leader_lagger() const
void process_acknack(const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
bool is_lagging(const ReaderInfo_rch &reader) const
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
void register_for_reader(const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
Conversion processing and value testing utilities for RTPS GUID_t types.
RcHandle< SingleSendBuffer > send_buff_
bool requires_inline_qos_
void generate_nack_frags_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
const ACE_CDR::UShort SMHDR_SZ
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
RtpsUdpInst_rch config() const
void process_heartbeat_i(const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
typedef OPENDDS_MAP_CMP(GUID_t, RemoteInfo, GUID_tKeyLessThan) RemoteInfoMap
bool bitmapNonEmpty(const SequenceNumberSet &snSet)
static TimePoint_T< MonotonicClock > now()
SequenceNumber max_data_seq(const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
SubmessageHeader smHeader
void send_heartbeats_manual_i(MetaSubmessageVec &meta_submessages)
const GuidPrefix_t & local_prefix() const
void set_type(ACE_INT16 type)
const DataSampleHeader & header() const
InternalTransportStatistics & transport_statistics_
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
CORBA::Long best_effort_heartbeat_count_
void disassociated(const GUID_t &local, const GUID_t &remote)
DCPS::EntityId_t writerId
void remove_all_msgs(const GUID_t &pub_id)
void remove_id(const GUID_t &val)
RtpsReader(const RtpsUdpDataLink_rch &link, const GUID_t &id)
ACE_Reactor * get_reactor()
GuidPrefix_t local_prefix_
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
void add_elem_awaiting_ack(TransportQueueElement *element)
const OpenDDSParticipantFlagsBits_t PFLAGS_DIRECTED_HEARTBEAT
const EntityId_t ENTITYID_PARTICIPANT
bool insert_recv_addr(AddrSet &aset) const
static const TimePoint_T< MonotonicClock > zero_value
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
RemoveResult remove_sample(const DataSampleElement *sample)
virtual bool is_fragment() const
Is this QueueElement the result of fragmentation?
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual TransportQueueElement * customize_queue_element(TransportQueueElement *element)
CORBA::Long inc_heartbeat_count()
DataSample * get_sample() const
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
bool is_leading(const ReaderInfo_rch &reader) const
bool pre_contains(SequenceNumber sequence) const
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > custom_allocator_
NetworkAddress get_last_recv_address(const GUID_t &remote_id)
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
Class to serialize and deserialize data for DDS.
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
void client_stop(const GUID_t &localId)
DCPS::EntityId_t readerId
void send_preassociation_acknacks(const MonotonicTimePoint &now)
static const void * body(MD5_CTX *ctx, const void *data, unsigned long size)
CORBA::Long heartbeat_count_
InterestingRemoteMapType interesting_readers_
RcHandle< InternalDataReader< NetworkInterfaceAddress > > network_interface_address_reader_
void end_transaction(MetaSubmessageVec &vec)
unsigned short submessageLength
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
SequenceNumber_t writerSN
Holds a data sample received by the transport.
SequenceNumber sequence() const
void update_required_acknack_count(const GUID_t &id, CORBA::Long current)
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
GuidCountMap writer_resend_count
SequenceNumber low() const
const InstanceStateMask ANY_INSTANCE_STATE
void durability_resend(TransportQueueElement *element, size_t &cumulative_send_count)
void ignore(const GUID_t &local, const GUID_t &remote)
Mark all queued submessage with the given source and destination as ignored.
TransactionalRtpsSendQueue sq_
void pre_stop_helper(TqeVector &to_drop, bool true_stop)
RcHandle< SporadicEvent > preassociation_task_
void process_gap_i(const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
const ViewStateMask ANY_VIEW_STATE
ACE_Message_Block * cont(void) const
void gather_gaps_i(const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
FragmentNumberSet fragmentNumberState
const char * c_str() const
virtual ACE_Message_Block * duplicate(void) const
const size_t ONE_SAMPLE_PER_PACKET
SequenceNumber last_ack() const
static const TimePoint_T< MonotonicClock > max_value
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
Data structure representing an "interesting" remote entity for static discovery.
Seq::size_type grow(Seq &seq)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
void datawriter_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
DCPS::EntityId_t writerId
bool contains(SequenceNumber value) const
static SequenceNumber ZERO()
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
size_t dedup(MetaSubmessageVec &vec)
ACE_UINT32 message_length_
WriterInfoMap remote_writers_
RcHandle< JobQueue > job_queue_
bool add_on_start_callback(const TransportClient_wrch &client, const GUID_t &remote)
void build_meta_submessage_map(MetaSubmessageVec &meta_submessages, AddrDestMetaSubmessageMap &addr_map)
void ignore_remote(const GUID_t &id)
Mark all queued submessage with the given destination (dst_guid_) as ignored.
IdCountMapping heartbeat_counts_
GUIDSeq * peer_ids(const GUID_t &local_id) const
SnToTqeMap elems_not_acked_
OpenDDS_Dcps_Export void align(size_t &value, size_t by)
Align "value" by "by" if it's not already.
AddrSet get_addresses(const GUID_t &local, const GUID_t &remote) const
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
void expunge_durable_data()
IdCountSet nackfrag_counts_
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
void log_remote_counts(const char *funcname)
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
ACE_Thread_Mutex locators_lock_
MulticastManager multicast_manager_
DiscoveryListener * listener
Callback to invoke.
SubmessageHeader smHeader
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Security::SecurityConfig_rch security_config_
RemoveResult remove_sample(const DataSampleElement *sample)
DCPS::EntityId_t readerId
AddrSet get_addresses_i(const GUID_t &local, const GUID_t &remote) const
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
SequenceNumber_t gapStart
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
WriterInfoSet preassociation_writers_
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.
ACE_SOCK_Dgram & unicast_socket()
bool has_writer(const GUID_t &id) const
bool remove_writer(const GUID_t &id)
static const TimeDuration zero_value
void unregister_for_writer(const GUID_t &readerid, const GUID_t &writerid)
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)
Security::HandleRegistry_rch handle_registry_
void check_heartbeats(const MonotonicTimePoint &now)
std::pair< SequenceNumber, SequenceNumber > SequenceRange
bool open(const ACE_SOCK_Dgram &unicast_socket)
ACE_SOCK_Dgram_Mcast multicast_socket_
FragmentNumber_t lastFragmentNum
MessageBlockAllocator mb_allocator_
DCPS::EntityId_t writerId
sequence< long, 8 > LongSeq8
virtual bool is_last_fragment() const
Is this QueueElement the last result of fragmentation?
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
RcHandle< ICE::Agent > ice_agent_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
HeartBeatSubmessage heartbeat_sm
CountMap::iterator next_undirected_unassigned_
virtual void reader_does_not_exist(const GUID_t &readerid, const GUID_t &writerid)=0
static bool force_inline_qos_
static member used by testing code to force inline qos
std::pair< GUID_t, InterestingRemote > CallbackType
virtual void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
bool align_w(size_t alignment)
ACE_Message_Block * alloc_msgblock(size_t size, ACE_Allocator *data_allocator)
GUID_t localid
id of local entity that is interested in this remote.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
RcHandle< SporadicEvent > flush_send_queue_sporadic_
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
void add_gap_submsg_i(RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
static void extend_bitmap_range(RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
Sequence number abstraction. Only allows positive 64 bit values.
void send_heartbeats_manual_i(const TransportSendControlElement *tsce, MetaSubmessageVec &meta_submessages)
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER
virtual GUID_t subscription_id() const
Accessor for the subscription id, if sent the sample is sent to 1 sub.
const size_t max_bundle_size_
static const ACE_Time_Value zero
bool to_bitmap(ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added, bool invert=false) const
WeakRcHandle< RtpsUdpDataLink > link_
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
static const Value MAX_VALUE
void network_change() const
Adapt the TransportReceiveStrategy for RTPS's "sample" (submessage) Header.
bool log_messages
Log all RTPS messages sent or recieved.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
SequenceNumber update_max_sn(const GUID_t &reader, SequenceNumber seq)
void datareader_dispatch(const T &submessage, const GuidPrefix_t &src_prefix, bool directed, const FN &func)
InterestingRemoteMapType interesting_writers_
void gather_heartbeats(RcHandle< ConstSharedRepoIdSet > additional_guids, MetaSubmessageVec &meta_submessages)
WriterToSeqReadersMap writer_to_seq_best_effort_readers_
bool log_nonfinal_messages
void make_lagger_leader(const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
void update_remote_guids_cache_i(bool add, const GUID_t &guid)
virtual void pre_stop_i()
void log_remote_counts(const char *funcname)
RtpsWriter(const TransportClient_rch &client, const RtpsUdpDataLink_rch &link, const GUID_t &id, bool durable, SequenceNumber max_sn, CORBA::Long heartbeat_count, size_t capacity)
bool enqueue(const MetaSubmessage &ms)
typedef OPENDDS_SET(ReaderInfo_rch) ReaderInfoSet
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
const octet OPENDDS_FLAG_R
void harvest_send_queue(const MonotonicTimePoint &now)
static const size_t MaxSecureFullMessageLeadingSize
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
ACE_UINT64 id_
The id for this DataLink.
ACE_Thread_Mutex writers_lock_
void flush_send_queue(const MonotonicTimePoint &now)
SequenceNumber_t writerSN
RcHandle< SingleSendBuffer > get_writer_send_buffer(const GUID_t &pub_id)
NackFragSubmessage nack_frag_sm
void assign(EntityId_t &dest, const EntityId_t &src)
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
void filterBestEffortReaders(const ReceivedDataSample &ds, RepoIdSet &selected, RepoIdSet &withheld)
CORBA::Long nackfrag_count_
const long LENGTH_UNLIMITED
RtpsUdpSendStrategy_rch send_strategy()
virtual bool dispatch(EventBase_rch event)=0
static bool separate_message(EntityId_t entity)
int insert(Container &c, const ValueType &v)
GuidCountMap reader_nack_count
LocatorCache locator_cache_
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
SequenceNumber last_fragment() const
const EntityId_t ENTITYID_UNKNOWN
bool should_nack_fragments(const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
void job_queue(JobQueue_rch job_queue)
#define TheServiceParticipant
bool isWriter() const
Returns true if the GUID represents a writer entity.
RcHandle< PeriodicEvent > heartbeatchecker_
void remove_locator_and_bundling_cache(const GUID_t &remote_id)
RtpsUdpDataLink::BundleVec & bundles_
static const size_t initial_size
void request_ack_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
bool add_delayed_notification(TransportQueueElement *element)
The Internal API and Implementation of OpenDDS.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
void on_data_available(RcHandle< InternalDataReader< NetworkInterfaceAddress > > reader)
SequenceNumber high() const
DCPS::GuidPrefix_t guidPrefix
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
void received(const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix, const NetworkAddress &remote_addr)
SubmessageSeq submessages
static const size_t MaxSecureSubmessageFollowingSize
unique_ptr< DataBlockLockPool > db_lock_pool_
void ignore_local(const GUID_t &id)
Mark all queued submessage with the given source (src_guid_) as ignored.
RcHandle< PeriodicEvent > heartbeat_
bool log_progress
Log progress for RTPS entity discovery and association.
Base wrapper class around a data/control sample to be sent.
void deliver_held_data(const GUID_t &src)
void record_directed(const GUID_t &reader, SequenceNumber seq)
TransportQueueElement * customize_queue_element_non_reliable_i(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send, ACE_Guard< ACE_Thread_Mutex > &guard)
bool has_reader(const GUID_t &id) const
void bundle_mapped_meta_submessages(const Encoding &encoding, AddrDestMetaSubmessageMap &addr_map, BundleVec &bundles, CountKeeper &counts)
TransportQueueElement * customize_queue_element_helper(TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
bool sends_directed_hb() const
virtual void writer_does_not_exist(const GUID_t &writerid, const GUID_t &readerid)=0
SequenceNumberSet gapList
const size_t SM_ALIGN
Alignment of RTPS Submessage.
const OpenDDSParticipantFlagsBits_t PFLAGS_REFLECT_HEARTBEAT_COUNT
SubmessageHeader smHeader
void align(size_t &value, size_t by=(std::numeric_limits< size_t >::max)()) const
Align "value" to "by" and according to the stream's alignment.
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)
void enable_response_queue()
const Encoding & encoding_
void invoke_on_start_callbacks(bool success)