OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader Class Reference
Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader:
Collaboration graph
[legend]

Public Member Functions

 RtpsReader (const RtpsUdpDataLink_rch &link, const GUID_t &id)
 
virtual ~RtpsReader ()
 
bool add_writer (const WriterInfo_rch &info)
 
bool has_writer (const GUID_t &id) const
 
bool remove_writer (const GUID_t &id)
 
size_t writer_count () const
 
bool should_nack_fragments (const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)
 
void pre_stop_helper ()
 
void process_heartbeat_i (const RTPS::HeartBeatSubmessage &heartbeat, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
bool process_data_i (const RTPS::DataSubmessage &data, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_gap_i (const RTPS::GapSubmessage &gap, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
void process_heartbeat_frag_i (const RTPS::HeartBeatFragSubmessage &hb_frag, const GUID_t &src, bool directed, MetaSubmessageVec &meta_submessages)
 
void deliver_held_data (const GUID_t &src)
 
const GUID_t & id () const
 
void log_remote_counts (const char *funcname)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Member Functions

void send_preassociation_acknacks (const MonotonicTimePoint &now)
 
void gather_preassociation_acknack_i (MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)
 
void gather_ack_nacks_i (const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
 
void generate_nack_frags_i (MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
 

Private Attributes

ACE_Thread_Mutex mutex_
 
WeakRcHandle< RtpsUdpDataLink > link_
 
const GUID_t id_
 
WriterInfoMap remote_writers_
 
WriterInfoSet preassociation_writers_
 
bool stopping_
 
CORBA::Long nackfrag_count_
 
RcHandle< SporadicEvent > preassociation_task_
 
TimeDuration heartbeat_period_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 621 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsReader()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::RtpsReader ( const RtpsUdpDataLink_rch &  link,
const GUID_t &  id 
)

Definition at line 1651 of file RtpsUdpDataLink.cpp.

1652  : link_(link)
1653  , id_(id)
1654  , stopping_(false)
1655  , nackfrag_count_(0)
1656  , preassociation_task_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsReader> >(rchandle_from(this), &RtpsUdpDataLink::RtpsReader::send_preassociation_acknacks)))
1657  , heartbeat_period_(link ? link->config()->heartbeat_period_ : TimeDuration(RtpsUdpInst::DEFAULT_HEARTBEAT_PERIOD_SEC))
1658 {
1659 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
void send_preassociation_acknacks(const MonotonicTimePoint &now)
RcHandle< SporadicEvent > preassociation_task_
WeakRcHandle< RtpsUdpDataLink > link_

◆ ~RtpsReader()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::~RtpsReader ( )
virtual

Definition at line 1661 of file RtpsUdpDataLink.cpp.

1662 {
1663 }

Member Function Documentation

◆ add_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::add_writer ( const WriterInfo_rch &  info)

Definition at line 2150 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, gather_preassociation_acknack_i(), heartbeat_period_, link_, log_remote_counts(), mutex_, preassociation_task_, preassociation_writers_, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), remote_writers_, and stopping_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::associated().

2151 {
2153 
2154  if (stopping_) {
2155  return false;
2156  }
2157 
2158  WriterInfoMap::const_iterator iter = remote_writers_.find(writer->id_);
2159  if (iter == remote_writers_.end()) {
2160  remote_writers_[writer->id_] = writer;
2161  preassociation_writers_.insert(writer);
2162  log_remote_counts("add_writer");
2163 
2164  RtpsUdpDataLink_rch link = link_.lock();
2165  if (!link) {
2166  return false;
2167  }
2168 
2170  MetaSubmessageVec meta_submessages;
2171  gather_preassociation_acknack_i(meta_submessages, writer);
2172  g.release();
2173  link->queue_submessages(meta_submessages);
2174 
2175  return true;
2176  }
2177  return false;
2178 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
RcHandle< SporadicEvent > preassociation_task_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)

◆ deliver_held_data()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::deliver_held_data ( const GUID_t &  src)

Definition at line 4526 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::stopping_, and OpenDDS::DCPS::Transport_debug_level.

4527 {
4529 
4530  if (stopping_) {
4531  return;
4532  }
4533 
4534  RtpsUdpDataLink_rch link = link_.lock();
4535 
4536  if (!link) {
4537  return;
4538  }
4539 
4540  OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
4541 
4542  const WriterInfoMap::iterator wi = remote_writers_.find(src);
4543  if (wi != remote_writers_.end()) {
4544  const SequenceNumber ca = wi->second->recvd_.cumulative_ack();
4545  const WriterInfo::HeldMap::iterator end = wi->second->held_.upper_bound(ca);
4546  for (WriterInfo::HeldMap::iterator it = wi->second->held_.begin(); it != end; /*increment in loop body*/) {
4547  to_deliver.push_back(it->second);
4548  wi->second->held_.erase(it++);
4549  }
4550  }
4551 
4552  const GUID_t dst = id_;
4553 
4554  g.release();
4555 
4556  for (OPENDDS_VECTOR(ReceivedDataSample)::iterator it = to_deliver.begin(); it != to_deliver.end(); ++it) {
4557  if (Transport_debug_level > 5) {
4558  LogGuid reader(dst);
4559  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::DeliverHeldData::~DeliverHeldData -")
4560  ACE_TEXT(" deliver sequence: %q to %C\n"),
4561  it->header_.sequence_.getValue(),
4562  reader.c_str()));
4563  }
4564  link->data_received(*it, dst);
4565  }
4566 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_TEXT("TCP_Factory")
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ gather_ack_nacks_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::gather_ack_nacks_i ( const WriterInfo_rch &  writer,
const RtpsUdpDataLink_rch &  link,
bool  heartbeat_was_non_final,
MetaSubmessageVec &  meta_submessages,
ACE_CDR::ULong &  cumulative_bits_added 
)
private

Definition at line 2287 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::DCPS::DisjointSequence::bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, generate_nack_frags_i(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), id_, OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), should_nack_fragments(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::DisjointSequence::to_bitmap(), and OpenDDS::RTPS::to_rtps_seqnum().

Referenced by process_heartbeat_frag_i(), and process_heartbeat_i().

2292 {
2293  const bool should_nack_frags = should_nack_fragments(link, writer);
2294  if (writer->should_nack() ||
2295  should_nack_frags) {
2296  using namespace OpenDDS::RTPS;
2297  const EntityId_t reader_id = id_.entityId;
2298  const EntityId_t writer_id = writer->id_.entityId;
2299  MetaSubmessage meta_submessage(id_, writer->id_);
2300 
2301  const DisjointSequence& recvd = writer->recvd_;
2302  const SequenceNumber& hb_high = writer->hb_last_;
2303  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2304  const SequenceNumber::Value ack_val = ack.getValue();
2305  CORBA::ULong num_bits = 0;
2306  LongSeq8 bitmap;
2307 
2308  if (recvd.disjoint()) {
2309  bitmap.length(DisjointSequence::bitmap_num_longs(ack, recvd.last_ack().previous()));
2310  if (bitmap.length() > 0) {
2311  (void)recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
2312  num_bits, cumulative_bits_added, true);
2313  }
2314  }
2315 
2316  if (!recvd.empty() && hb_high > recvd.high()) {
2317  const SequenceNumber eff_high =
2318  (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
2319  const SequenceNumber::Value eff_high_val = eff_high.getValue();
2320  // Nack the range between the received high and the effective high.
2321  const CORBA::ULong old_len = bitmap.length(),
2322  new_len = DisjointSequence::bitmap_num_longs(ack, eff_high);
2323  if (new_len > old_len) {
2324  bitmap.length(new_len);
2325  for (CORBA::ULong i = old_len; i < new_len; ++i) {
2326  bitmap[i] = 0;
2327  }
2328  }
2329  const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
2330  idx_recv_high = recvd.disjoint() ?
2331  CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
2332  DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
2333  bitmap.get_buffer(), new_len,
2334  num_bits, cumulative_bits_added);
2335  }
2336 
2337  // If the receive strategy is holding any fragments, those should
2338  // not be "nacked" in the ACKNACK reply. They will be accounted for
2339  // in the NACK_FRAG(s) instead.
2340  const bool frags_modified =
2341  link->receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
2342  num_bits, ack, writer->id_, cumulative_bits_added);
2343  if (frags_modified) {
2344  for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
2345  if ((i + 1) * 32 <= num_bits) {
2346  if (bitmap[i]) {
2347  break;
2348  }
2349  } else {
2350  if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
2351  break;
2352  }
2353  }
2354  }
2355  }
2356 
2357  AckNackSubmessage acknack = {
2358  {ACKNACK,
2360  0 /*length*/},
2361  reader_id,
2362  writer_id,
2363  { // SequenceNumberSet: acking bitmapBase - 1
2364  to_rtps_seqnum(ack),
2365  num_bits, bitmap
2366  },
2367  {writer->heartbeat_recvd_count_}
2368  };
2369  meta_submessage.sm_.acknack_sm(acknack);
2370  meta_submessages.push_back(meta_submessage);
2371 
2372  if (should_nack_frags) {
2373  generate_nack_frags_i(meta_submessages, writer, reader_id, writer_id, cumulative_bits_added);
2374  }
2375  } else if (heartbeat_was_non_final) {
2376  using namespace OpenDDS::RTPS;
2377  const DisjointSequence& recvd = writer->recvd_;
2378  const CORBA::ULong num_bits = 0;
2379  const LongSeq8 bitmap;
2380  const SequenceNumber ack = recvd.empty() ? 1 : ++SequenceNumber(recvd.cumulative_ack());
2381  const EntityId_t reader_id = id_.entityId;
2382  const EntityId_t writer_id = writer->id_.entityId;
2383 
2384  MetaSubmessage meta_submessage(id_, writer->id_);
2385 
2386  AckNackSubmessage acknack = {
2387  {ACKNACK,
2389  0 /*length*/},
2390  reader_id,
2391  writer_id,
2392  { // SequenceNumberSet: acking bitmapBase - 1
2393  to_rtps_seqnum(ack),
2394  num_bits, bitmap
2395  },
2396  {writer->heartbeat_recvd_count_}
2397  };
2398  meta_submessage.sm_.acknack_sm(acknack);
2399  meta_submessages.push_back(meta_submessage);
2400  }
2401 }
const octet FLAG_E
Definition: RtpsCore.idl:521
void generate_nack_frags_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &wi, EntityId_t reader_id, EntityId_t writer_id, ACE_CDR::ULong &cumulative_bits_added)
const octet FLAG_F
Definition: RtpsCore.idl:523
ACE_CDR::ULong ULong
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
ACE_CDR::Octet Octet
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
bool should_nack_fragments(const RcHandle< RtpsUdpDataLink > &link, const WriterInfo_rch &info)
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59

◆ gather_preassociation_acknack_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::gather_preassociation_acknack_i ( MetaSubmessageVec &  meta_submessages,
const WriterInfo_rch &  writer 
)
private

Definition at line 2257 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::FLAG_E, id_, OPENDDS_ASSERT, and OpenDDS::DCPS::MetaSubmessage::sm_.

Referenced by add_writer(), and send_preassociation_acknacks().

2259 {
2260  using namespace OpenDDS::RTPS;
2261 
2262  OPENDDS_ASSERT(writer->recvd_.empty());
2263  const CORBA::ULong num_bits = 0;
2264  const LongSeq8 bitmap;
2265  const EntityId_t reader_id = id_.entityId;
2266  const EntityId_t writer_id = writer->id_.entityId;
2267 
2268  MetaSubmessage meta_submessage(id_, writer->id_);
2269 
2270  AckNackSubmessage acknack = {
2271  {ACKNACK,
2273  0 /*length*/},
2274  reader_id,
2275  writer_id,
2276  { // SequenceNumberSet: acking bitmapBase - 1
2277  {0, 1},
2278  num_bits, bitmap
2279  },
2280  {writer->heartbeat_recvd_count_}
2281  };
2282  meta_submessage.sm_.acknack_sm(acknack);
2283  meta_submessages.push_back(meta_submessage);
2284 }
const octet FLAG_E
Definition: RtpsCore.idl:521
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ACE_CDR::ULong ULong
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
ACE_CDR::Octet Octet
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59

◆ generate_nack_frags_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::generate_nack_frags_i ( MetaSubmessageVec &  meta_submessages,
const WriterInfo_rch &  wi,
EntityId_t  reader_id,
EntityId_t  writer_id,
ACE_CDR::ULong &  cumulative_bits_added 
)
private

Definition at line 2882 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, id_, link_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, nackfrag_count_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), OpenDDS::RTPS::Count_t::value, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by gather_ack_nacks_i().

2887 {
2888  typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
2889  typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
2890  RtpsUdpReceiveStrategy::FragmentInfo frag_info;
2891 
2892  // This is an internal method, locks already locked,
2893  // we just need a local handle to the link
2894  RtpsUdpDataLink_rch link = link_.lock();
2895 
2896  // Populate frag_info with two possible sources of NackFrags:
2897  // 1. sequence #s in the reception gaps that we have partially received
2898  OPENDDS_VECTOR(SequenceRange) missing = wi->recvd_.missing_sequence_ranges();
2899  for (size_t i = 0; i < missing.size(); ++i) {
2900  link->receive_strategy()->has_fragments(missing[i], wi->id_, &frag_info);
2901  }
2902  // 1b. larger than the last received seq# but less than the heartbeat.lastSN
2903  if (!wi->recvd_.empty() && wi->recvd_.high() < wi->hb_last_) {
2904  const SequenceRange range(wi->recvd_.high() + 1, wi->hb_last_);
2905  link->receive_strategy()->has_fragments(range, wi->id_, &frag_info);
2906  }
2907  for (size_t i = 0; i < frag_info.size(); ++i) {
2908  // If we've received a HeartbeatFrag, we know the last (available) frag #
2909  const iter_t heartbeat_frag = wi->frags_.find(frag_info[i].first);
2910  if (heartbeat_frag != wi->frags_.end()) {
2911  extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value, cumulative_bits_added);
2912  }
2913  }
2914 
2915  // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
2916  const iter_t low = wi->frags_.lower_bound(wi->recvd_.cumulative_ack()),
2917  high = wi->frags_.upper_bound(wi->recvd_.last_ack()),
2918  end = wi->frags_.end();
2919  for (iter_t iter = wi->frags_.begin(); iter != end; ++iter) {
2920  if (iter == low) {
2921  // skip over the range covered by step #1 above
2922  if (high == end) {
2923  break;
2924  }
2925  iter = high;
2926  }
2927 
2928  const SequenceRange range(iter->first, iter->first);
2929  if (!link->receive_strategy()->has_fragments(range, wi->id_, &frag_info)) {
2930  // it was not in the recv strategy, so the entire range is "missing"
2931  frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
2932  RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
2933  fnSet.bitmapBase.value = 1;
2934  fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
2935  fnSet.bitmap.length((fnSet.numBits + 31) / 32);
2936  for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
2937  fnSet.bitmap[i] = 0xFFFFFFFF;
2938  }
2939  }
2940  }
2941 
2942  if (frag_info.empty()) {
2943  return;
2944  }
2945 
2946  const RTPS::NackFragSubmessage nackfrag_prototype = {
2947  {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */},
2948  reader_id,
2949  writer_id,
2950  {0, 0}, // writerSN set below
2951  RTPS::FragmentNumberSet(), // fragmentNumberState set below
2952  {0} // count set below
2953  };
2954 
2955  meta_submessages.reserve(meta_submessages.size() + frag_info.size());
2956  for (size_t i = 0; i < frag_info.size(); ++i) {
2957  MetaSubmessage meta_submessage(id_, wi->id_);
2958  meta_submessage.sm_.nack_frag_sm(nackfrag_prototype);
2959  RTPS::NackFragSubmessage& nackfrag = meta_submessage.sm_.nack_frag_sm();
2960  nackfrag.writerSN = to_rtps_seqnum(frag_info[i].first);
2961  nackfrag.fragmentNumberState = frag_info[i].second;
2962  nackfrag.count.value = ++nackfrag_count_;
2963  meta_submessages.push_back(meta_submessage);
2964  }
2965 }
const octet FLAG_E
Definition: RtpsCore.idl:521
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
ACE_CDR::ULong ULong
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
std::pair< SequenceNumber, SequenceNumber > SequenceRange
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
static void extend_bitmap_range(RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent, ACE_CDR::ULong &cumulative_bits_added)
WeakRcHandle< RtpsUdpDataLink > link_
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139

◆ has_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::has_writer ( const GUID_t &  id) const

Definition at line 2181 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, mutex_, and remote_writers_.

2182 {
2184  return remote_writers_.count(id) != 0;
2185 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ id()

const GUID_t& OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id ( void  ) const
inline

◆ log_remote_counts()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts ( const char *  funcname)

Definition at line 4828 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_remote_counts, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OpenDDS::DCPS::transport_debug.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_reader(), add_writer(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack(), process_heartbeat_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_reader(), and remove_writer().

4829 {
4830  if (transport_debug.log_remote_counts) {
4831  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
4832  "RtpsUdpDataLink::RtpsReader::%C: "
4833  "%C pre: %b assoc: %b\n",
4834  funcname, LogGuid(id_).c_str(),
4835  preassociation_writers_.size(), remote_writers_.size()));
4836  }
4837 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.

◆ pre_stop_helper()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::pre_stop_helper ( )

Definition at line 1617 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), and OpenDDS::DCPS::DataLink::strategy_lock_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::client_stop().

1618 {
1620 
1621  if (stopping_) {
1622  return;
1623  }
1624 
1625  stopping_ = true;
1626 
1627  preassociation_writers_.clear();
1628  log_remote_counts("pre_stop_helper");
1629 
1630  RtpsUdpDataLink_rch link = link_.lock();
1631 
1632  if (!link) {
1633  return;
1634  }
1635 
1636  GuardType guard(link->strategy_lock_);
1637  if (link->receive_strategy() == 0) {
1638  return;
1639  }
1640 
1641  for (WriterInfoMap::iterator it = remote_writers_.begin(); it != remote_writers_.end(); ++it) {
1642  it->second->held_.clear();
1643  }
1644 
1645  guard.release();
1646  g.release();
1647 
1648  preassociation_task_->cancel();
1649 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SporadicEvent > preassociation_task_
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ process_data_i()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_data_i ( const RTPS::DataSubmessage &  data,
const GUID_t &  src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1666 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::SequenceNumber::getValue(), id_, link_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, mutex_, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, stopping_, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::swap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, and OpenDDS::RTPS::DataSubmessage::writerSN.

1669 {
1671 
1672  if (stopping_) {
1673  return false;
1674  }
1675 
1676  RtpsUdpDataLink_rch link = link_.lock();
1677 
1678  if (!link) {
1679  return false;
1680  }
1681 
1682  GuardType guard(link->strategy_lock_);
1683  if (link->receive_strategy() == 0) {
1684  return false;
1685  }
1686 
1687  const SequenceNumber seq = to_opendds_seqnum(data.writerSN);
1688  DeliverHeldData dhd;
1689  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1690  if (wi != remote_writers_.end()) {
1691  const WriterInfo_rch& writer = wi->second;
1692 
1693  DeliverHeldData dhd2(rchandle_from(this), src);
1694  std::swap(dhd, dhd2);
1695 
1696  writer->frags_.erase(seq);
1697 
1698  if (writer->recvd_.empty()) {
1699  if (Transport_debug_level > 5) {
1700  ACE_DEBUG((LM_DEBUG,
1701  ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1702  ACE_TEXT(" data seq: %q from %C being from %C expecting heartbeat\n"),
1703  seq.getValue(),
1704  LogGuid(src).c_str(),
1705  LogGuid(id_).c_str()));
1706  }
1707  const ReceivedDataSample* sample =
1708  link->receive_strategy()->withhold_data_from(id_);
1709  writer->held_.insert(std::make_pair(seq, *sample));
1710 
1711  } else if (writer->recvd_.contains(seq)) {
1712  if (transport_debug.log_dropped_messages) {
1713  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C duplicate sample\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1714  }
1715  if (Transport_debug_level > 5) {
1716  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1717  ACE_TEXT(" data seq: %q from %C being DROPPED from %C because it's ALREADY received\n"),
1718  seq.getValue(),
1719  LogGuid(src).c_str(),
1720  LogGuid(id_).c_str()));
1721  }
1722  link->receive_strategy()->withhold_data_from(id_);
1723 
1724  } else if (!writer->held_.empty()) {
1725  const ReceivedDataSample* sample =
1726  link->receive_strategy()->withhold_data_from(id_);
1727  if (Transport_debug_level > 5) {
1728  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) WITHHOLD %q\n", seq.getValue()));
1729  writer->recvd_.dump();
1730  }
1731  writer->held_.insert(std::make_pair(seq, *sample));
1732  writer->recvd_.insert(seq);
1733 
1734  } else if (writer->recvd_.disjoint() || writer->recvd_.cumulative_ack() != seq.previous()) {
1735  if (Transport_debug_level > 5) {
1736  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1737  ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because it's EXPECTING more data\n"),
1738  seq.getValue(),
1739  LogGuid(src).c_str(),
1740  LogGuid(id_).c_str()));
1741  }
1742  const ReceivedDataSample* sample =
1743  link->receive_strategy()->withhold_data_from(id_);
1744  writer->held_.insert(std::make_pair(seq, *sample));
1745  writer->recvd_.insert(seq);
1746 
1747  } else {
1748  if (Transport_debug_level > 5) {
1749  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1750  ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
1751  seq.getValue(),
1752  LogGuid(src).c_str(),
1753  LogGuid(id_).c_str()));
1754  }
1755  writer->recvd_.insert(seq);
1756  link->receive_strategy()->do_not_withhold_data_from(id_);
1757  }
1758 
1759  } else {
1760  if (transport_debug.log_dropped_messages) {
1761  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_data_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1762  }
1763  if (Transport_debug_level > 5) {
1764  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
1765  ACE_TEXT(" data seq: %q from %C to %C dropped because of unknown writer\n"),
1766  to_opendds_seqnum(data.writerSN).getValue(),
1767  LogGuid(src).c_str(),
1768  LogGuid(id_).c_str()));
1769  }
1770  link->receive_strategy()->withhold_data_from(id_);
1771  }
1772 
1773  // Release for delivering held data.
1774  guard.release();
1775  g.release();
1776 
1777  return false;
1778 }
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
RcHandle< WriterInfo > WriterInfo_rch
bool log_dropped_messages
Log received RTPS messages that were dropped.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132

◆ process_gap_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_gap_i ( const RTPS::GapSubmessage &  gap,
const GUID_t &  src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1791 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::DCPS::DisjointSequence::contains(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), id_, OpenDDS::DCPS::DisjointSequence::insert(), link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::DisjointSequence::low(), mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, OpenDDS::DCPS::DataLink::start(), OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), and OpenDDS::DCPS::transport_debug.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

1795 {
1797 
1798  RtpsUdpDataLink_rch link = link_.lock();
1799 
1800  if (!link) {
1801  return;
1802  }
1803 
1804  GuardType guard(link->strategy_lock_);
1805  if (link->receive_strategy() == 0) {
1806  return;
1807  }
1808 
1809  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1810  if (wi == remote_writers_.end()) {
1811  if (transport_debug.log_dropped_messages) {
1812  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1813  }
1814  return;
1815  }
1816 
1817  const WriterInfo_rch& writer = wi->second;
1818 
1819  if (writer->recvd_.empty()) {
1820  if (transport_debug.log_dropped_messages) {
1821  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_gap_i: %C -> %C preassociation writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1822  }
1823  return;
1824  }
1825 
1826  const SequenceNumber start = to_opendds_seqnum(gap.gapStart);
1827  const SequenceNumber base = to_opendds_seqnum(gap.gapList.bitmapBase);
1828 
1829  if (start < base) {
1830  writer->recvd_.insert(SequenceRange(start, base.previous()));
1831  } else if (start != base) {
1832  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_gap_i: ERROR - Incoming GAP has inverted start (%q) & base (%q) values, ignoring start value\n", start.getValue(), base.getValue()));
1833  }
1834  writer->recvd_.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1835 
1836  DisjointSequence gaps;
1837  if (start < base) {
1838  gaps.insert(SequenceRange(start, base.previous()));
1839  }
1840  gaps.insert(base, gap.gapList.numBits, gap.gapList.bitmap.get_buffer());
1841 
1842  if (!gaps.empty()) {
1843  for (WriterInfo::HeldMap::iterator pos = writer->held_.lower_bound(gaps.low()),
1844  limit = writer->held_.upper_bound(gaps.high()); pos != limit;) {
1845  if (gaps.contains(pos->first)) {
1846  writer->held_.erase(pos++);
1847  } else {
1848  ++pos;
1849  }
1850  }
1851  }
1852 
1853  const OPENDDS_VECTOR(SequenceRange) psr = gaps.present_sequence_ranges();
1854  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
1855  link->receive_strategy()->remove_fragments(*pos, writer->id_);
1856  }
1857 
1858  guard.release();
1859  g.release();
1860 
1861  DeliverHeldData dhd(rchandle_from(this), src);
1862 }
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
RcHandle< WriterInfo > WriterInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
std::pair< SequenceNumber, SequenceNumber > SequenceRange
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132

◆ process_heartbeat_frag_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i ( const RTPS::HeartBeatFragSubmessage &  hb_frag,
const GUID_t &  src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 3000 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, gather_ack_nacks_i(), id_, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, link_, LM_DEBUG, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, mutex_, OpenDDS::DCPS::InternalTransportStatistics::reader_nack_count, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), remote_writers_, OpenDDS::RTPS::HeartBeatFragSubmessage::smHeader, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

3004 {
3005  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
3006 
3007  RtpsUdpDataLink_rch link = link_.lock();
3008 
3009  if (!link) {
3010  return;
3011  }
3012 
3013  GuardType guard(link->strategy_lock_);
3014  if (link->receive_strategy() == 0) {
3015  return;
3016  }
3017 
3018  const WriterInfoMap::iterator wi = remote_writers_.find(src);
3019  if (wi == remote_writers_.end()) {
3020  // we may not be associated yet, even if the writer thinks we are
3021  if (transport_debug.log_dropped_messages) {
3022  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3023  }
3024  return;
3025  }
3026 
3027  const WriterInfo_rch& writer = wi->second;
3028 
3029  if (!compare_and_update_counts(hb_frag.count.value, writer->hb_frag_recvd_count_)) {
3030  if (transport_debug.log_dropped_messages) {
3031  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_frag_i: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3032  }
3033  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_frag_i "
3034  "WARNING Count indicates duplicate, dropping\n"));
3035  return;
3036  }
3037 
3038  // If seq is outside the heartbeat range or we haven't completely received
3039  // it yet, send a NackFrag along with the AckNack. The heartbeat range needs
3040  // to be checked first because recvd_ contains the numbers below the
3041  // heartbeat range (so that we don't NACK those).
3042  const SequenceNumber seq = to_opendds_seqnum(hb_frag.writerSN);
3043  if (seq > writer->hb_last_ || !writer->recvd_.contains(seq)) {
3044  writer->frags_[seq] = hb_frag.lastFragmentNum;
3045  ACE_CDR::ULong cumulative_bits_added = 0;
3046  gather_ack_nacks_i(writer, link, !(hb_frag.smHeader.flags & RTPS::FLAG_F), meta_submessages, cumulative_bits_added);
3047  if (cumulative_bits_added) {
3048  RtpsUdpInst_rch cfg = link->config();
3049  if (cfg && cfg->count_messages()) {
3050  ACE_GUARD(ACE_Thread_Mutex, g, link->transport_statistics_mutex_);
3051  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
3052  }
3053  }
3054 
3055  }
3056 }
#define ACE_DEBUG(X)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
RcHandle< WriterInfo > WriterInfo_rch
bool log_dropped_messages
Log received RTPS messages that were dropped.
const octet FLAG_F
Definition: RtpsCore.idl:523
#define VDBG(DBG_ARGS)
ACE_UINT32 ULong
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132

◆ process_heartbeat_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::process_heartbeat_i ( const RTPS::HeartBeatSubmessage &  heartbeat,
const GUID_t &  src,
bool  directed,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 1904 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, gather_ack_nacks_i(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, id_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, log_remote_counts(), mutex_, OPENDDS_ASSERT, preassociation_writers_, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::rchandle_from(), OpenDDS::DCPS::InternalTransportStatistics::reader_nack_count, OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy(), ACE_Guard< ACE_LOCK >::release(), remote_writers_, OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

1908 {
1909  // TODO: Delay responses by heartbeat_response_delay_.
1910  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
1911 
1912  RtpsUdpDataLink_rch link = link_.lock();
1913 
1914  if (!link) {
1915  return;
1916  }
1917 
1918  GuardType guard(link->strategy_lock_);
1919  if (link->receive_strategy() == 0) {
1920  return;
1921  }
1922 
1923  // Heartbeat Sequence Range
1924  const SequenceNumber hb_first = to_opendds_seqnum(heartbeat.firstSN);
1925  const SequenceNumber hb_last = to_opendds_seqnum(heartbeat.lastSN);
1926 
1927  if (Transport_debug_level > 5) {
1928  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1929  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1930  }
1931 
1932  const WriterInfoMap::iterator wi = remote_writers_.find(src);
1933  if (wi == remote_writers_.end()) {
1934  if (transport_debug.log_dropped_messages) {
1935  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C unknown remote writer\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
1936  }
1937  return;
1938  }
1939 
1940  const WriterInfo_rch& writer = wi->second;
1941 
1942  if (!compare_and_update_counts(heartbeat.count.value, writer->heartbeat_recvd_count_)) {
1943  if (transport_debug.log_dropped_messages) {
1944  const GUID_t dst = heartbeat.readerId == DCPS::ENTITYID_UNKNOWN ? GUID_UNKNOWN : id_;
1945  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C stale/duplicate message (%d vs %d)\n",
1946  LogGuid(src).c_str(), LogGuid(dst).c_str(), heartbeat.count.value, writer->heartbeat_recvd_count_));
1947  }
1948  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_heartbeat_i "
1949  "WARNING Count indicates duplicate, dropping\n"));
1950  return;
1951  }
1952 
1953  const bool is_final = heartbeat.smHeader.flags & RTPS::FLAG_F;
1954 
1955  static const SequenceNumber one, zero = SequenceNumber::ZERO();
1956 
1957  bool first_ever_hb = false;
1958 
1959  if (!is_final && transport_debug.log_nonfinal_messages) {
1960  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsReader::process_heartbeat_i - %C -> %C first %q last %q count %d\n",
1961  LogGuid(src).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
1962  }
1963 
1964  // Only valid heartbeats (see spec) will be "fully" applied to writer info
1965  if (!(hb_first < 1 || hb_last < 0 || hb_last < hb_first.previous())) {
1966  if (writer->recvd_.empty() && (directed || !writer->sends_directed_hb())) {
1967  OPENDDS_ASSERT(preassociation_writers_.count(writer));
1968  preassociation_writers_.erase(writer);
1969  if (transport_debug.log_progress) {
1970  log_progress("RTPS reader/writer association complete", id_, writer->id_, writer->participant_discovered_at_);
1971  }
1972  log_remote_counts("process_heartbeat_i");
1973 
1974  const SequenceRange sr(zero, hb_first.previous());
1975  writer->recvd_.insert(sr);
1976  while (!writer->held_.empty() && writer->held_.begin()->first <= sr.second) {
1977  writer->held_.erase(writer->held_.begin());
1978  }
1979  for (WriterInfo::HeldMap::const_iterator it = writer->held_.begin(); it != writer->held_.end(); ++it) {
1980  writer->recvd_.insert(it->first);
1981  }
1982  link->receive_strategy()->remove_fragments(sr, writer->id_);
1983  first_ever_hb = true;
1984  }
1985 
1986  ACE_CDR::ULong cumulative_bits_added = 0;
1987  if (!writer->recvd_.empty()) {
1988  writer->hb_last_ = std::max(writer->hb_last_, hb_last);
1989  gather_ack_nacks_i(writer, link, !is_final, meta_submessages, cumulative_bits_added);
1990  }
1991  if (cumulative_bits_added) {
1992  RtpsUdpInst_rch cfg = link->config();
1993  if (cfg && cfg->count_messages()) {
1994  ACE_Guard<ACE_Thread_Mutex> tsg(link->transport_statistics_mutex_);
1995  link->transport_statistics_.reader_nack_count[id_] += cumulative_bits_added;
1996  }
1997  }
1998  } else {
1999  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsReader::process_heartbeat_i: %C -> %C - INVALID - first %q last %q count %d\n", LogGuid(writer->id_).c_str(), LogGuid(id_).c_str(), hb_first.getValue(), hb_last.getValue(), heartbeat.count.value));
2000  }
2001 
2002  guard.release();
2003  g.release();
2004 
2005  if (first_ever_hb) {
2006  link->invoke_on_start_callbacks(id_, src, true);
2007  }
2008 
2009  DeliverHeldData dhd(rchandle_from(this), src);
2010 
2011  //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
2012  return;
2013 }
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< WriterInfo > WriterInfo_rch
bool log_dropped_messages
Log received RTPS messages that were dropped.
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
const octet FLAG_F
Definition: RtpsCore.idl:523
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
#define VDBG(DBG_ARGS)
static SequenceNumber ZERO()
ACE_UINT32 ULong
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void gather_ack_nacks_i(const WriterInfo_rch &writer, const RtpsUdpDataLink_rch &link, bool heartbeat_was_non_final, MetaSubmessageVec &meta_submessages, ACE_CDR::ULong &cumulative_bits_added)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
bool log_progress
Log progress for RTPS entity discovery and association.

◆ remove_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remove_writer ( const GUID_t &  id)

Definition at line 2188 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, log_remote_counts(), mutex_, preassociation_writers_, and remote_writers_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i().

2189 {
2190  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, false);
2191  WriterInfoMap::iterator pos = remote_writers_.find(id);
2192  if (pos != remote_writers_.end()) {
2193  preassociation_writers_.erase(pos->second);
2194  remote_writers_.erase(pos);
2195  log_remote_counts("remove_writer");
2196  return true;
2197  }
2198 
2199  return false;
2200 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ send_preassociation_acknacks()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::send_preassociation_acknacks ( const MonotonicTimePoint &  now)
private

Definition at line 2228 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, gather_preassociation_acknack_i(), heartbeat_period_, link_, mutex_, preassociation_task_, preassociation_writers_, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), and stopping_.

2229 {
2230  RtpsUdpDataLink_rch link = link_.lock();
2231  if (!link) {
2232  return;
2233  }
2234 
2235  MetaSubmessageVec meta_submessages;
2236  {
2237  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
2238 
2239  if (stopping_ || preassociation_writers_.empty()) {
2240  return;
2241  }
2242 
2243  // We want a heartbeat from these writers.
2244  meta_submessages.reserve(preassociation_writers_.size());
2245  for (WriterInfoSet::const_iterator pos = preassociation_writers_.begin(), limit = preassociation_writers_.end();
2246  pos != limit; ++pos) {
2247  gather_preassociation_acknack_i(meta_submessages, *pos);
2248  }
2249  }
2250 
2251  link->queue_submessages(meta_submessages);
2252 
2253  preassociation_task_->schedule(heartbeat_period_);
2254 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SporadicEvent > preassociation_task_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
void gather_preassociation_acknack_i(MetaSubmessageVec &meta_submessages, const WriterInfo_rch &writer)

◆ should_nack_fragments()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::should_nack_fragments ( const RcHandle< RtpsUdpDataLink > &  link,
const WriterInfo_rch &  info 
)

Definition at line 2210 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy().

Referenced by gather_ack_nacks_i().

2212 {
2213  if (!info->frags_.empty()) {
2214  return true;
2215  }
2216 
2217  if (!info->recvd_.empty()) {
2218  const SequenceRange range(info->recvd_.cumulative_ack() + 1, info->hb_last_);
2219  if (link->receive_strategy()->has_fragments(range, info->id_)) {
2220  return true;
2221  }
2222  }
2223 
2224  return false;
2225 }
RtpsUdpReceiveStrategy_rch receive_strategy()
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ writer_count()

size_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::writer_count ( ) const

Definition at line 2203 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, mutex_, and remote_writers_.

2204 {
2205  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, 0);
2206  return remote_writers_.size();
2207 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

Member Data Documentation

◆ heartbeat_period_

TimeDuration OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::heartbeat_period_
private

Definition at line 679 of file RtpsUdpDataLink.h.

Referenced by add_writer(), and send_preassociation_acknacks().

◆ id_

const GUID_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_
private

◆ link_

WeakRcHandle<RtpsUdpDataLink> OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_
private

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_
mutable private

◆ nackfrag_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::nackfrag_count_
private

Definition at line 677 of file RtpsUdpDataLink.h.

Referenced by generate_nack_frags_i().

◆ preassociation_task_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::preassociation_task_
private

Definition at line 678 of file RtpsUdpDataLink.h.

Referenced by add_writer(), and send_preassociation_acknacks().

◆ preassociation_writers_

WriterInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::preassociation_writers_
private

◆ remote_writers_

WriterInfoMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::remote_writers_
private

◆ stopping_

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_
private

The documentation for this class was generated from the following files: