OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader Class Reference
Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader:
Collaboration graph
[legend]

Public Member Functions

 RtpsReader (const RtpsUdpDataLink_rch &link, const GUID_t &id)
 
virtual ~RtpsReader ()
 
bool add_writer (const WriterInfo_rch &info)
 
bool has_writer (const GUID_t &id) const
 
bool remove_writer (const GUID_t &id)
 
size_t writer_count () const
 
bool should_nack_fragments (const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)
 
void pre_stop_helper ()
 
void process_heartbeat_i (const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
bool process_data_i (const RTPS::DataSubmessage &data, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_gap_i (const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
void process_heartbeat_frag_i (const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
void deliver_held_data (const GUID_t &src)
 
const GUID_tid () const
 
void log_remote_counts (const char *funcname)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Private Member Functions

void send_preassociation_acknacks (const MonotonicTimePoint &now)
 
void gather_preassociation_acknack_i (MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
 
void gather_ack_nacks_i (const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
 
void generate_nack_frags_i (MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
 

Private Attributes

ACE_Thread_Mutex mutex_
 
WeakRcHandle< RtpsUdpDataLinklink_
 
const GUID_t id_
 
WriterInfoMap remote_writers_
 
WriterInfoSet preassociation_writers_
 
bool stopping_
 
CORBA::Long nackfrag_count_
 
RcHandle< SporadicEventpreassociation_task_
 
TimeDuration heartbeat_period_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 621 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsReader()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::RtpsReader ( const RtpsUdpDataLink_rch link,
const GUID_t id 
)

Definition at line 1650 of file RtpsUdpDataLink.cpp.

1651  : link_(link)
1652  , id_(id)
1653  , stopping_(false)
1654  , nackfrag_count_(0)
1655  , preassociation_task_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsReader> >(rchandle_from(this), &RtpsUdpDataLink::RtpsReader::send_preassociation_acknacks)))
1656  , heartbeat_period_(link ? link->config()->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC))
1657 {
1658 }
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
RcHandle< SporadicEvent > preassociation_task_
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
void send_preassociation_acknacks(const MonotonicTimePoint &now)

◆ ~RtpsReader()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::~RtpsReader ( )
virtual

Definition at line 1660 of file RtpsUdpDataLink.cpp.

1661 {
1662 }

Member Function Documentation

◆ add_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::add_writer ( const WriterInfo_rch info)

Definition at line 2149 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, gather_preassociation_acknack_i(), heartbeat_period_, link_, log_remote_counts(), mutex_, preassociation_task_, preassociation_writers_, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), remote_writers_, and stopping_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::associated().

2150 {
2152 
2153  if (stopping_) {
2154  return false;
2155  }
2156 
2157  WriterInfoMap::const_iterator iter = remote_writers_.find(writer->id_);
2158  if (iter == remote_writers_.end()) {
2159  remote_writers_[writer->id_] = writer;
2160  preassociation_writers_.insert(writer);
2161  log_remote_counts("add_writer");
2162 
2163  RtpsUdpDataLink_rch link = link_.lock();
2164  if (!link) {
2165  return false;
2166  }
2167 
2169  MetaSubmessageVec meta_submessages;
2170  gather_preassociation_acknack_i(meta_submessages, writer);
2171  g.release();
2172  link->queue_submessages(meta_submessages);
2173 
2174  return true;
2175  }
2176  return false;
2177 }
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
RcHandle< SporadicEvent > preassociation_task_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ deliver_held_data()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::deliver_held_data ( const GUID_t src)

Definition at line 4525 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::stopping_, and OpenDDS::DCPS::Transport_debug_level.

4526 {
4528 
4529  if (stopping_) {
4530  return;
4531  }
4532 
4533  RtpsUdpDataLink_rch link = link_.lock();
4534 
4535  if (!link) {
4536  return;
4537  }
4538 
4539  OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
4540 
4541  const WriterInfoMap::iterator wi = remote_writers_.find(src);
4542  if (wi != remote_writers_.end()) {
4543  const SequenceNumber ca = wi->second->recvd_.cumulative_ack();
4544  const WriterInfo::HeldMap::iterator end = wi->second->held_.upper_bound(ca);
4545  for (WriterInfo::HeldMap::iterator it = wi->second->held_.begin(); it != end; /*increment in loop body*/) {
4546  to_deliver.push_back(it->second);
4547  wi->second->held_.erase(it++);
4548  }
4549  }
4550 
4551  const GUID_t dst = id_;
4552 
4553  g.release();
4554 
4555  for (OPENDDS_VECTOR(ReceivedDataSample)::iterator it = to_deliver.begin(); it != to_deliver.end(); ++it) {
4556  if (Transport_debug_level > 5) {
4557  LogGuid reader(dst);
4558  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::DeliverHeldData::~DeliverHeldData -")
4559  ACE_TEXT(" deliver sequence: %q to %C\n"),
4560  it->header_.sequence_.getValue(),
4561  reader.c_str()));
4562  }
4563  link->data_received(*it, dst);
4564  }
4565 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
WeakRcHandle< RtpsUdpDataLink > link_
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
ACE_TEXT("TCP_Factory")
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ gather_ack_nacks_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::gather_ack_nacks_i ( const WriterInfo_rch writer,
const RtpsUdpDataLink_rch link,
bool  heartbeat_was_non_final,
MetaSubmessageVec &  meta_submessages,
ACE_CDR::ULong cumulative_bits_added 
)
private

Definition at line 2286 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::DCPS::DisjointSequence::bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, generate_nack_frags_i(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), id_, OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), should_nack_fragments(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::DisjointSequence::to_bitmap(), and OpenDDS::RTPS::to_rtps_seqnum().

Referenced by process_heartbeat_frag_i(), and process_heartbeat_i().

2291 {
2292  const bool should_nack_frags = should_nack_fragments(link, writer);
2293  if (writer->should_nack() ||
2294  should_nack_frags) {
2295  using namespace OpenDDS::RTPS;
2296  const EntityId_t reader_id = id_.entityId;
2297  const EntityId_t writer_id = writer->id_.entityId;
2298  MetaSubmessage meta_submessage(id_, writer->id_);
2299 
2300  const DisjointSequence& recvd = writer->recvd_;
2301  const SequenceNumber& hb_high = writer->hb_last_;
2302  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2303  const SequenceNumber::Value ack_val = ack.getValue();
2304  CORBA::ULong num_bits = 0;
2305  LongSeq8 bitmap;
2306 
2307  if (recvd.disjoint()) {
2308  bitmap.length(DisjointSequence::bitmap_num_longs(ack, recvd.last_ack().previous()));
2309  if (bitmap.length() > 0) {
2310  (void)recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
2311  num_bits, cumulative_bits_added, true);
2312  }
2313  }
2314 
2315  if (!recvd.empty() && hb_high > recvd.high()) {
2316  const SequenceNumber eff_high =
2317  (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
2318  const SequenceNumber::Value eff_high_val = eff_high.getValue();
2319  // Nack the range between the received high and the effective high.
2320  const CORBA::ULong old_len = bitmap.length(),
2321  new_len = DisjointSequence::bitmap_num_longs(ack, eff_high);
2322  if (new_len > old_len) {
2323  bitmap.length(new_len);
2324  for (CORBA::ULong i = old_len; i < new_len; ++i) {
2325  bitmap[i] = 0;
2326  }
2327  }
2328  const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
2329  idx_recv_high = recvd.disjoint() ?
2330  CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
2331  DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
2332  bitmap.get_buffer(), new_len,
2333  num_bits, cumulative_bits_added);
2334  }
2335 
2336  // If the receive strategy is holding any fragments, those should
2337  // not be "nacked" in the ACKNACK reply. They will be accounted for
2338  // in the NACK_FRAG(s) instead.
2339  const bool frags_modified =
2340  link->receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
2341  num_bits, ack, writer->id_, cumulative_bits_added);
2342  if (frags_modified) {
2343  for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
2344  if ((i + 1) * 32 <= num_bits) {
2345  if (bitmap[i]) {
2346  break;
2347  }
2348  } else {
2349  if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
2350  break;
2351  }
2352  }
2353  }
2354  }
2355 
2356  AckNackSubmessage acknack = {
2357  {ACKNACK,
2359  0 /*length*/},
2360  reader_id,
2361  writer_id,
2362  { // SequenceNumberSet: acking bitmapBase - 1
2363  to_rtps_seqnum(ack),
2364  num_bits, bitmap
2365  },
2366  {writer->heartbeat_recvd_count_}
2367  };
2368  meta_submessage.sm_.acknack_sm(acknack);
2369  meta_submessages.push_back(meta_submessage);
2370 
2371  if (should_nack_frags) {
2372  generate_nack_frags_i(meta_submessages, writer, reader_id, writer_id, cumulative_bits_added);
2373  }
2374  } else if (heartbeat_was_non_final) {
2375  using namespace OpenDDS::RTPS;
2376  const DisjointSequence& recvd = writer->recvd_;
2377  const CORBA::ULong num_bits = 0;
2378  const LongSeq8 bitmap;
2379  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2380  const EntityId_t reader_id = id_.entityId;
2381  const EntityId_t writer_id = writer->id_.entityId;
2382 
2383  MetaSubmessage meta_submessage(id_, writer->id_);
2384 
2385  AckNackSubmessage acknack = {
2386  {ACKNACK,
2388  0 /*length*/},
2389  reader_id,
2390  writer_id,
2391  { // SequenceNumberSet: acking bitmapBase - 1
2392  to_rtps_seqnum(ack),
2393  num_bits, bitmap
2394  },
2395  {writer->heartbeat_recvd_count_}
2396  };
2397  meta_submessage.sm_.acknack_sm(acknack);
2398  meta_submessages.push_back(meta_submessage);
2399  }
2400 }
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
void generate_nack_frags_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
ACE_CDR::ULong ULong
const octet FLAG_F
Definition: RtpsCore.idl:520
ACE_CDR::Octet Octet
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
bool should_nack_fragments(const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)

◆ gather_preassociation_acknack_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::gather_preassociation_acknack_i ( MetaSubmessageVec &  meta_submessages,
const WriterInfo_rch writer 
)
private

Definition at line 2256 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::FLAG_E, id_, OPENDDS_ASSERT, and OpenDDS::DCPS::MetaSubmessage::sm_.

Referenced by add_writer(), and send_preassociation_acknacks().

2258 {
2259  using namespace OpenDDS::RTPS;
2260 
2261  OPENDDS_ASSERT(writer->recvd_.empty());
2262  const CORBA::ULong num_bits = 0;
2263  const LongSeq8 bitmap;
2264  const EntityId_t reader_id = id_.entityId;
2265  const EntityId_t writer_id = writer->id_.entityId;
2266 
2267  MetaSubmessage meta_submessage(id_, writer->id_);
2268 
2269  AckNackSubmessage acknack = {
2270  {ACKNACK,
2272  0 /*length*/},
2273  reader_id,
2274  writer_id,
2275  { // SequenceNumberSet: acking bitmapBase - 1
2276  {0, 1},
2277  num_bits, bitmap
2278  },
2279  {writer->heartbeat_recvd_count_}
2280  };
2281  meta_submessage.sm_.acknack_sm(acknack);
2282  meta_submessages.push_back(meta_submessage);
2283 }
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ACE_CDR::ULong ULong
ACE_CDR::Octet Octet
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69

◆ generate_nack_frags_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::generate_nack_frags_i ( MetaSubmessageVec &  meta_submessages,
const WriterInfo_rch wi,
EntityId_t  reader_id,
EntityId_t  writer_id,
ACE_CDR::ULong cumulative_bits_added 
)
private

Definition at line 2881 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, id_, link_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, nackfrag_count_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), OpenDDS::RTPS::Count_t::value, OpenDDS::RTPS::FragmentNumber_t::value, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by gather_ack_nacks_i().

2886 {
2887  typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
2888  typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
2889  RtpsUdpReceiveStrategy::FragmentInfo frag_info;
2890 
2891  // This is an internal method, locks already locked,
2892  // we just need a local handle to the link
2893  RtpsUdpDataLink_rch link = link_.lock();
2894 
2895  // Populate frag_info with two possible sources of NackFrags:
2896  // 1. sequence #s in the reception gaps that we have partially received
2897  OPENDDS_VECTOR(SequenceRange) missing = wi->recvd_.missing_sequence_ranges();
2898  for (size_t i = 0; i < missing.size(); ++i) {
2899  link->receive_strategy()->has_fragments(missing[i], wi->id_, &frag_info);
2900  }
2901  // 1b. larger than the last received seq# but less than the heartbeat.lastSN
2902  if (!wi->recvd_.empty() && wi->recvd_.high() < wi->hb_last_) {
2903  const SequenceRange range(wi->recvd_.high() + 1, wi->hb_last_);
2904  link->receive_strategy()->has_fragments(range, wi->id_, &frag_info);
2905  }
2906  for (size_t i = 0; i < frag_info.size(); ++i) {
2907  // If we've received a HeartbeatFrag, we know the last (available) frag #
2908  const iter_t heartbeat_frag = wi->frags_.find(frag_info[i].first);
2909  if (heartbeat_frag != wi->frags_.end()) {
2910  extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value, cumulative_bits_added);
2911  }
2912  }
2913 
2914  // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
2915  const iter_t low = wi->frags_.lower_bound(wi->recvd_.cumulative_ack()),
2916  high = wi->frags_.upper_bound(wi->recvd_.last_ack()),
2917  end = wi->frags_.end();
2918  for (iter_t iter = wi->frags_.begin(); iter != end; ++iter) {
2919  if (iter == low) {
2920  // skip over the range covered by step #1 above
2921  if (high == end) {
2922  break;
2923  }
2924  iter = high;
2925  }
2926 
2927  const SequenceRange range(iter->first, iter->first);
2928  if (!link->receive_strategy()->has_fragments(range, wi->id_, &frag_info)) {
2929  // it was not in the recv strategy, so the entire range is "missing"
2930  frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
2931  RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
2932  fnSet.bitmapBase.value = 1;
2933  fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
2934  fnSet.bitmap.length((fnSet.numBits + 31) / 32);
2935  for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
2936  fnSet.bitmap[i] = 0xFFFFFFFF;
2937  }
2938  }
2939  }
2940 
2941  if (frag_info.empty()) {
2942  return;
2943  }
2944 
2945  const RTPS::NackFragSubmessage nackfrag_prototype = {
2946  {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */},
2947  reader_id,
2948  writer_id,
2949  {0, 0}, // writerSN set below
2950  RTPS::FragmentNumberSet(), // fragmentNumberState set below
2951  {0} // count set below
2952  };
2953 
2954  meta_submessages.reserve(meta_submessages.size() + frag_info.size());
2955  for (size_t i = 0; i < frag_info.size(); ++i) {
2956  MetaSubmessage meta_submessage(id_, wi->id_);
2957  meta_submessage.sm_.nack_frag_sm(nackfrag_prototype);
2958  RTPS::NackFragSubmessage& nackfrag = meta_submessage.sm_.nack_frag_sm();
2959  nackfrag.writerSN = to_rtps_seqnum(frag_info[i].first);
2960  nackfrag.fragmentNumberState = frag_info[i].second;
2961  nackfrag.count.value = ++nackfrag_count_;
2962  meta_submessages.push_back(meta_submessage);
2963  }
2964 }
const octet FLAG_E
Definition: RtpsCore.idl:518
static void extend_bitmap_range(RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
ACE_CDR::ULong ULong
WeakRcHandle< RtpsUdpDataLink > link_
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ has_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::has_writer ( const GUID_t id) const

Definition at line 2180 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, mutex_, and remote_writers_.

2181 {
2183  return remote_writers_.count(id) != 0;
2184 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ id()

const GUID_t& OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id ( void  ) const
inline

◆ log_remote_counts()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts ( const char *  funcname)

Definition at line 4827 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_remote_counts, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OpenDDS::DCPS::transport_debug.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_reader(), add_writer(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack(), process_heartbeat_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_reader(), and remove_writer().

4828 {
4830  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
4831  "RtpsUdpDataLink::RtpsReader::%C: "
4832  "%C pre: %b assoc: %b\n",
4833  funcname, LogGuid(id_).c_str(),
4834  preassociation_writers_.size(), remote_writers_.size()));
4835  }
4836 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.

◆ pre_stop_helper()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::pre_stop_helper ( )

Definition at line 1616 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), and OpenDDS::DCPS::DataLink::strategy_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::client_stop().

1617 {
1619 
1620  if (stopping_) {
1621  return;
1622  }
1623 
1624  stopping_ = true;
1625 
1626  preassociation_writers_.clear();
1627  log_remote_counts("pre_stop_helper");
1628 
1629  RtpsUdpDataLink_rch link = link_.lock();
1630 
1631  if (!link) {
1632  return;
1633  }
1634 
1635  GuardType guard(link->strategy_lock_);
1636  if (link->receive_strategy() == 0) {
1637  return;
1638  }
1639 
1640  for (WriterInfoMap::iterator it = remote_writers_.begin(); it != remote_writers_.end(); ++it) {
1641  it->second->held_.clear();
1642  }
1643 
1644  guard.release();
1645  g.release();
1646 
1647  preassociation_task_->cancel();
1648 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< SporadicEvent > preassociation_task_
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ process_data_i()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_data_i ( const RTPS::DataSubmessage data,
const GUID_t src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1665 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::SequenceNumber::getValue(), id_, link_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, mutex_, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, stopping_, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::swap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, and OpenDDS::RTPS::DataSubmessage::writerSN.

1668 {
1670 
1671  if (stopping_) {
1672  return false;
1673  }
1674 
1675  RtpsUdpDataLink_rch link = link_.lock();
1676 
1677  if (!link) {
1678  return false;
1679  }
1680 
1681  GuardType guard(link->strategy_lock_);
1682  if (link->receive_strategy() == 0) {
1683  return false;
1684  }
1685 
1686  const SequenceNumber seq = to_opendds_seqnum(data.writerSN);
1687  DeliverHeldData dhd;
1688  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1689  if (wi != remote_writers_.end()) {
1690  const WriterInfo_rch& writer = wi->second;
1691 
1692  DeliverHeldData dhd2(rchandle_from(this), src);
1693  std::swap(dhd, dhd2);
1694 
1695  writer->frags_.erase(seq);
1696 
1697  if (writer->recvd_.empty()) {
1698  if (Transport_debug_level > 5) {
1699  ACE_DEBUG((LM_DEBUG,
1700  ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1701  ACE_TEXT(" data seq: %q from %C being from %C expecting heartbeat\n"),
1702  seq.getValue(),
1703  LogGuid(src).c_str(),
1704  LogGuid(id_).c_str()));
1705  }
1706  const ReceivedDataSample* sample =
1707  link->receive_strategy()->withhold_data_from(id_);
1708  writer->held_.insert(std::make_pair(seq, *sample));
1709 
1710  } else if (writer->recvd_.contains(seq)) {
1712  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C duplicate sample\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1713  }
1714  if (Transport_debug_level > 5) {
1715  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1716  ACE_TEXT(" data seq: %q from %C being DROPPED from %C because it's ALREADY received\n"),
1717  seq.getValue(),
1718  LogGuid(src).c_str(),
1719  LogGuid(id_).c_str()));
1720  }
1721  link->receive_strategy()->withhold_data_from(id_);
1722 
1723  } else if (!writer->held_.empty()) {
1724  const ReceivedDataSample* sample =
1725  link->receive_strategy()->withhold_data_from(id_);
1726  if (Transport_debug_level > 5) {
1727  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) WITHHOLD %q\n", seq.getValue()));
1728  writer->recvd_.dump();
1729  }
1730  writer->held_.insert(std::make_pair(seq, *sample));
1731  writer->recvd_.insert(seq);
1732 
1733  } else if (writer->recvd_.disjoint() || writer->recvd_.cumulative_ack() != seq.previous()) {
1734  if (Transport_debug_level > 5) {
1735  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1736  ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because it's EXPECTING more data\n"),
1737  seq.getValue(),
1738  LogGuid(src).c_str(),
1739  LogGuid(id_).c_str()));
1740  }
1741  const ReceivedDataSample* sample =
1742  link->receive_strategy()->withhold_data_from(id_);
1743  writer->held_.insert(std::make_pair(seq, *sample));
1744  writer->recvd_.insert(seq);
1745 
1746  } else {
1747  if (Transport_debug_level > 5) {
1748  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1749  ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
1750  seq.getValue(),
1751  LogGuid(src).c_str(),
1752  LogGuid(id_).c_str()));
1753  }
1754  writer->recvd_.insert(seq);
1755  link->receive_strategy()->do_not_withhold_data_from(id_);
1756  }
1757 
1758  } else {
1760  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1761  }
1762  if (Transport_debug_level > 5) {
1763  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1764  ACE_TEXT(" data seq: %q from %C to %C dropped because of unknown writer\n"),
1765  to_opendds_seqnum(data.writerSN).getValue(),
1766  LogGuid(src).c_str(),
1767  LogGuid(id_).c_str()));
1768  }
1769  link->receive_strategy()->withhold_data_from(id_);
1770  }
1771 
1772  // Release for delivering held data.
1773  guard.release();
1774  g.release();
1775 
1776  return false;
1777 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
bool log_dropped_messages
Log received RTPS messages that were dropped.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
WeakRcHandle< RtpsUdpDataLink > link_
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
void swap(MessageBlock &lhs, MessageBlock &rhs)
RcHandle< WriterInfo > WriterInfo_rch

◆ process_gap_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_gap_i ( const RTPS::GapSubmessage gap,
const GUID_t src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1790 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::RTPS::SequenceNumberSet::bitmap, OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::contains(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), id_, OpenDDS::DCPS::DisjointSequence::insert(), link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::DisjointSequence::low(), mutex_, OpenDDS::RTPS::SequenceNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, OpenDDS::DCPS::DataLink::start(), OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), and OpenDDS::DCPS::transport_debug.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

1794 {
1796 
1797  RtpsUdpDataLink_rch link = link_.lock();
1798 
1799  if (!link) {
1800  return;
1801  }
1802 
1803  GuardType guard(link->strategy_lock_);
1804  if (link->receive_strategy() == 0) {
1805  return;
1806  }
1807 
1808  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1809  if (wi == remote_writers_.end()) {
1811  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1812  }
1813  return;
1814  }
1815 
1816  const WriterInfo_rch& writer = wi->second;
1817 
1818  if (writer->recvd_.empty()) {
1820  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C preassociation writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1821  }
1822  return;
1823  }
1824 
1825  const SequenceNumber start = to_opendds_seqnum(gap.gapStart);
1826  const SequenceNumber base = to_opendds_seqnum(gap.gapList.bitmapBase);
1827 
1828  if (start < base) {
1829  writer->recvd_.insert(SequenceRange(start, base.previous()));
1830  } else if (start != base) {
1831  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_gap_i: ERROR - Incoming GAP has inverted start (%q) & base (%q) values, ignoring start value\n", start.getValue(), base.getValue()));
1832  }
1833  writer->recvd_.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1834 
1835  DisjointSequence gaps;
1836  if (start < base) {
1837  gaps.insert(SequenceRange(start, base.previous()));
1838  }
1839  gaps.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1840 
1841  if (!gaps.empty()) {
1842  for (WriterInfo::HeldMap::iterator pos = writer->held_.lower_bound(gaps.low()),
1843  limit = writer->held_.upper_bound(gaps.high()); pos != limit;) {
1844  if (gaps.contains(pos->first)) {
1845  writer->held_.erase(pos++);
1846  } else {
1847  ++pos;
1848  }
1849  }
1850  }
1851 
1852  const OPENDDS_VECTOR(SequenceRange) psr = gaps.present_sequence_ranges();
1853  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
1854  link->receive_strategy()->remove_fragments(*pos, writer->id_);
1855  }
1856 
1857  guard.release();
1858  g.release();
1859 
1860  DeliverHeldData dhd(rchandle_from(this), src);
1861 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
bool log_dropped_messages
Log received RTPS messages that were dropped.
WeakRcHandle< RtpsUdpDataLink > link_
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
RcHandle< WriterInfo > WriterInfo_rch
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ process_heartbeat_frag_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i ( const RTPS::HeartBeatFragSubmessage hb_frag,
const GUID_t src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 2999 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, gather_ack_nacks_i(), id_, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, link_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, mutex_, OpenDDS::DCPS::InternalTransportStatistics::reader_nack_count, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), remote_writers_, OpenDDS::RTPS::HeartBeatFragSubmessage::smHeader, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

3003 {
3005 
3006  RtpsUdpDataLink_rch link = link_.lock();
3007 
3008  if (!link) {
3009  return;
3010  }
3011 
3012  GuardType guard(link->strategy_lock_);
3013  if (link->receive_strategy() == 0) {
3014  return;
3015  }
3016 
3017  const WriterInfoMap::iterator wi = remote_writers_.find(src);
3018  if (wi == remote_writers_.end()) {
3019  // we may not be associated yet, even if the writer thinks we are
3021  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3022  }
3023  return;
3024  }
3025 
3026  const WriterInfo_rch& writer = wi->second;
3027 
3028  if (!compare_and_update_counts(hb_frag.count.value, writer->hb_frag_recvd_count_)) {
3030  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3031  }
3032  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_frag_i "
3033  "WARNING Count indicates duplicate, dropping\n"));
3034  return;
3035  }
3036 
3037  // If seq is outside the heartbeat range or we haven't completely received
3038  // it yet, send a NackFrag along with the AckNack. The heartbeat range needs
3039  // to be checked first because recvd_ contains the numbers below the
3040  // heartbeat range (so that we don't NACK those).
3041  const SequenceNumber seq = to_opendds_seqnum(hb_frag.writerSN);
3042  if (seq > writer->hb_last_ || !writer->recvd_.contains(seq)) {
3043  writer->frags_[seq] = hb_frag.lastFragmentNum;
3044  ACE_CDR::ULong cumulative_bits_added = 0;
3045  gather_ack_nacks_i(writer, link, !(hb_frag.smHeader.flags & RTPS::FLAG_F), meta_submessages, cumulative_bits_added);
3046  if (cumulative_bits_added) {
3047  RtpsUdpInst_rch cfg = link->config();
3048  if (cfg && cfg->count_messages()) {
3049  ACE_GUARD(ACE_Thread_Mutex, g, link->transport_statistics_mutex_);
3050  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
3051  }
3052  }
3053 
3054  }
3055 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
bool log_dropped_messages
Log received RTPS messages that were dropped.
#define VDBG(DBG_ARGS)
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
WeakRcHandle< RtpsUdpDataLink > link_
ACE_UINT32 ULong
const octet FLAG_F
Definition: RtpsCore.idl:520
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RcHandle< WriterInfo > WriterInfo_rch

◆ process_heartbeat_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_i ( const RTPS::HeartBeatSubmessage heartbeat,
const GUID_t src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1903 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, gather_ack_nacks_i(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, id_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, log_remote_counts(), mutex_, OPENDDS_ASSERT, preassociation_writers_, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::InternalTransportStatistics::reader_nack_count, OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

1907 {
1908  // TODO: Delay responses by heartbeat_response_delay_.
1910 
1911  RtpsUdpDataLink_rch link = link_.lock();
1912 
1913  if (!link) {
1914  return;
1915  }
1916 
1917  GuardType guard(link->strategy_lock_);
1918  if (link->receive_strategy() == 0) {
1919  return;
1920  }
1921 
1922  // Heartbeat Sequence Range
1923  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
1924  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
1925 
1926  if (Transport_debug_level > 5) {
1927  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1928  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1929  }
1930 
1931  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1932  if (wi == remote_writers_.end()) {
1934  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1935  }
1936  return;
1937  }
1938 
1939  const WriterInfo_rch& writer = wi->second;
1940 
1941  if (!compare_and_update_counts(heartbeat.count.value, writer->heartbeat_recvd_count_)) {
1943  const GUID_t dst = heartbeat.readerId == DCPS::ENTITYID_UNKNOWN ? GUID_UNKNOWN : id_;
1944  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C stale/duplicate message (%d vs %d)\n",
1945  LogGuid(src).c_str(), LogGuid(dst).c_str(), heartbeat.count.value, writer->heartbeat_recvd_count_));
1946  }
1947  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_i "
1948  "WARNING Count indicates duplicate, dropping\n"));
1949  return;
1950  }
1951 
1952  const bool is_final = heartbeat.smHeader.flags & RTPS::FLAG_F;
1953 
1954  static const SequenceNumber one, zero = SequenceNumber::ZERO();
1955 
1956  bool first_ever_hb = false;
1957 
1958  if (!is_final && transport_debug.log_nonfinal_messages) {
1959  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1960  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1961  }
1962 
1963  // Only valid heartbeats (see spec) will be "fully" applied to writer info
1964  if (!(hb_first < 1 || hb_last < 0 || hb_last < hb_first.previous())) {
1965  if (writer->recvd_.empty() && (directed || !writer->sends_directed_hb())) {
1966  OPENDDS_ASSERT(preassociation_writers_.count(writer));
1967  preassociation_writers_.erase(writer);
1969  log_progress("RTPS reader/writer association complete", id_, writer->id_, writer->participant_discovered_at_);
1970  }
1971  log_remote_counts("process_heartbeat_i");
1972 
1973  const SequenceRange sr(zero, hb_first.previous());
1974  writer->recvd_.insert(sr);
1975  while (!writer->held_.empty() && writer->held_.begin()->first <= sr.second) {
1976  writer->held_.erase(writer->held_.begin());
1977  }
1978  for (WriterInfo::HeldMap::const_iterator it = writer->held_.begin(); it != writer->held_.end(); ++it) {
1979  writer->recvd_.insert(it->first);
1980  }
1981  link->receive_strategy()->remove_fragments(sr, writer->id_);
1982  first_ever_hb = true;
1983  }
1984 
1985  ACE_CDR::ULong cumulative_bits_added = 0;
1986  if (!writer->recvd_.empty()) {
1987  writer->hb_last_ = std::max(writer->hb_last_, hb_last);
1988  gather_ack_nacks_i(writer, link, !is_final, meta_submessages, cumulative_bits_added);
1989  }
1990  if (cumulative_bits_added) {
1991  RtpsUdpInst_rch cfg = link->config();
1992  if (cfg && cfg->count_messages()) {
1993  ACE_Guard<ACE_Thread_Mutex> tsg(link->transport_statistics_mutex_);
1994  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
1995  }
1996  }
1997  } else {
1998  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C - INVALID - first %q last %q count %d\n", LogGuid(writer->id_).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1999  }
2000 
2001  guard.release();
2002  g.release();
2003 
2004  if (first_ever_hb) {
2005  link->invoke_on_start_callbacks(id_, src, true);
2006  }
2007 
2008  DeliverHeldData dhd(rchandle_from(this), src);
2009 
2010  //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
2011  return;
2012 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
bool log_dropped_messages
Log received RTPS messages that were dropped.
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
static SequenceNumber ZERO()
#define VDBG(DBG_ARGS)
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
WeakRcHandle< RtpsUdpDataLink > link_
ACE_UINT32 ULong
const octet FLAG_F
Definition: RtpsCore.idl:520
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
RcHandle< WriterInfo > WriterInfo_rch
std::pair< SequenceNumber, SequenceNumber > SequenceRange
bool log_progress
Log progress for RTPS entity discovery and association.

◆ remove_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remove_writer ( const GUID_t id)

Definition at line 2187 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, log_remote_counts(), mutex_, preassociation_writers_, and remote_writers_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i().

2188 {
2190  WriterInfoMap::iterator pos = remote_writers_.find(id);
2191  if (pos != remote_writers_.end()) {
2192  preassociation_writers_.erase(pos->second);
2193  remote_writers_.erase(pos);
2194  log_remote_counts("remove_writer");
2195  return true;
2196  }
2197 
2198  return false;
2199 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ send_preassociation_acknacks()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::send_preassociation_acknacks ( const MonotonicTimePoint now)
private

Definition at line 2227 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, gather_preassociation_acknack_i(), heartbeat_period_, link_, mutex_, preassociation_task_, preassociation_writers_, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), and stopping_.

2228 {
2229  RtpsUdpDataLink_rch link = link_.lock();
2230  if (!link) {
2231  return;
2232  }
2233 
2234  MetaSubmessageVec meta_submessages;
2235  {
2237 
2238  if (stopping_ || preassociation_writers_.empty()) {
2239  return;
2240  }
2241 
2242  // We want a heartbeat from these writers.
2243  meta_submessages.reserve(preassociation_writers_.size());
2244  for (WriterInfoSet::const_iterator pos = preassociation_writers_.begin(), limit = preassociation_writers_.end();
2245  pos != limit; ++pos) {
2246  gather_preassociation_acknack_i(meta_submessages, *pos);
2247  }
2248  }
2249 
2250  link->queue_submessages(meta_submessages);
2251 
2253 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
RcHandle< SporadicEvent > preassociation_task_
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ should_nack_fragments()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::should_nack_fragments ( const RcHandle< RtpsUdpDataLink > &  link,
const WriterInfo_rch info 
)

Definition at line 2209 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy().

Referenced by gather_ack_nacks_i().

2211 {
2212  if (!info->frags_.empty()) {
2213  return true;
2214  }
2215 
2216  if (!info->recvd_.empty()) {
2217  const SequenceRange range(info->recvd_.cumulative_ack() + 1, info->hb_last_);
2218  if (link->receive_strategy()->has_fragments(range, info->id_)) {
2219  return true;
2220  }
2221  }
2222 
2223  return false;
2224 }
RtpsUdpReceiveStrategy_rch receive_strategy()
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ writer_count()

size_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::writer_count ( ) const

Definition at line 2202 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, mutex_, and remote_writers_.

2203 {
2205  return remote_writers_.size();
2206 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

Member Data Documentation

◆ heartbeat_period_

TimeDuration OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::heartbeat_period_
private

Definition at line 679 of file RtpsUdpDataLink.h.

Referenced by add_writer(), and send_preassociation_acknacks().

◆ id_

const GUID_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_
private

◆ link_

WeakRcHandle<RtpsUdpDataLink> OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_
private

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_
mutableprivate

◆ nackfrag_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::nackfrag_count_
private

Definition at line 677 of file RtpsUdpDataLink.h.

Referenced by generate_nack_frags_i().

◆ preassociation_task_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::preassociation_task_
private

Definition at line 678 of file RtpsUdpDataLink.h.

Referenced by add_writer(), and send_preassociation_acknacks().

◆ preassociation_writers_

WriterInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::preassociation_writers_
private

◆ remote_writers_

WriterInfoMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remote_writers_
private

◆ stopping_

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_
private

The documentation for this class was generated from the following files: