OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter Class Reference
Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter:
Collaboration graph
[legend]

Public Member Functions

 RtpsWriter (const TransportClient_rch &client, const RtpsUdpDataLink_rch &link, const GUID_t &id, bool durable, SequenceNumber max_sn, CORBA::Long heartbeat_count, size_t capacity)
 
virtual ~RtpsWriter ()
 
SequenceNumber max_data_seq (const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
 
SequenceNumber update_max_sn (const GUID_t &reader, SequenceNumber seq)
 
void add_elem_awaiting_ack (TransportQueueElement *element)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs ()
 
bool add_reader (const ReaderInfo_rch &reader)
 
bool has_reader (const GUID_t &id) const
 
bool is_leading (const GUID_t &id) const
 
bool remove_reader (const GUID_t &id)
 
size_t reader_count () const
 
CORBA::Long inc_heartbeat_count ()
 
void pre_stop_helper (TqeVector &to_drop, bool true_stop)
 
TransportQueueElementcustomize_queue_element_helper (TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send)
 
void process_acknack (const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_nackfrag (const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_acked_by_all ()
 
void gather_nack_replies_i (MetaSubmessageVec &meta_submessages)
 
void gather_heartbeats_i (MetaSubmessageVec &meta_submessages)
 
void gather_heartbeats (RcHandle< ConstSharedRepoIdSet > additional_guids, MetaSubmessageVec &meta_submessages)
 
void update_required_acknack_count (const GUID_t &id, CORBA::Long current)
 
RcHandle< SingleSendBufferget_send_buff ()
 
RcHandle< ConstSharedRepoIdSetget_remote_reader_guids ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Private Member Functions

typedef OPENDDS_MULTISET (OpenDDS::DCPS::SequenceNumber) SequenceNumberMultiset
 
typedef OPENDDS_SET_CMP (TransportQueueElement *, TransportQueueElement::OrderBySequenceNumber) TqeSet
 
typedef OPENDDS_MULTIMAP (SequenceNumber, TransportQueueElement *) SnToTqeMap
 
void send_heartbeats (const MonotonicTimePoint &now)
 
void send_nack_responses (const MonotonicTimePoint &now)
 
void add_gap_submsg_i (RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
 
void end_historic_samples_i (const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
 
void request_ack_i (const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
 
void send_heartbeats_manual_i (MetaSubmessageVec &meta_submessages)
 
void gather_gaps_i (const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
 
void acked_by_all_helper_i (TqeSet &to_deliver)
 
SequenceNumber expected_max_sn (const ReaderInfo_rch &reader) const
 
void make_leader_lagger (const GUID_t &reader, SequenceNumber previous_max_sn)
 
void make_lagger_leader (const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
 
bool is_lagging (const ReaderInfo_rch &reader) const
 
bool is_leading (const ReaderInfo_rch &reader) const
 
void check_leader_lagger () const
 
void record_directed (const GUID_t &reader, SequenceNumber seq)
 
void update_remote_guids_cache_i (bool add, const GUID_t &guid)
 
bool is_pvs_writer () const
 
SequenceNumber non_durable_first_sn (const SingleSendBuffer::Proxy &proxy) const
 
void remove_preassociation_reader (const ReaderInfo_rch &reader)
 
void initialize_heartbeat (const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
 
void gather_directed_heartbeat_i (const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
 
void log_remote_counts (const char *funcname)
 

Static Private Member Functions

static void snris_insert (RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
 
static void snris_erase (RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
 

Private Attributes

ReaderInfoMap remote_readers_
 
RcHandle< ConstSharedRepoIdSetremote_reader_guids_
 
ReaderInfoSet preassociation_readers_
 Preassociation readers require a non-final heartbeat. More...
 
SequenceNumberMultiset preassociation_reader_start_sns_
 
SNRIS lagging_readers_
 
SNRIS leading_readers_
 These reader have acked everything they are supposed to have acked. More...
 
ReaderInfoSet readers_expecting_data_
 These readers have sent a nack and are expecting data. More...
 
ReaderInfoSet readers_expecting_heartbeat_
 These readers have sent a non-final ack are are expecting a heartbeat. More...
 
RcHandle< SingleSendBuffersend_buff_
 
SequenceNumber max_sn_
 
SnToTqeMap elems_not_acked_
 
WeakRcHandle< TransportClientclient_
 
WeakRcHandle< RtpsUdpDataLinklink_
 
const GUID_t id_
 
const bool durable_
 
bool stopping_
 
CORBA::Long heartbeat_count_
 
const bool is_pvs_writer_
 Participant Volatile Secure writer. More...
 
const bool is_ps_writer_
 Partcicipant Secure (Reliable SPDP) writer. More...
 
ACE_Thread_Mutex mutex_
 
ACE_Thread_Mutex remote_reader_guids_mutex_
 
ACE_Thread_Mutex elems_not_acked_mutex_
 
RcHandle< SporadicEventheartbeat_
 
RcHandle< SporadicEventnack_response_
 
const TimeDuration initial_fallback_
 
FibonacciSequence< TimeDurationfallback_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 424 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsWriter()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::RtpsWriter ( const TransportClient_rch client,
const RtpsUdpDataLink_rch link,
const GUID_t id,
bool  durable,
SequenceNumber  max_sn,
CORBA::Long  heartbeat_count,
size_t  capacity 
)

Definition at line 4445 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::in(), send_buff_, and OpenDDS::DCPS::RtpsUdpDataLink::send_strategy().

4448  : send_buff_(make_rch<SingleSendBuffer>(capacity, ONE_SAMPLE_PER_PACKET))
4450  , client_(client)
4451  , link_(link)
4452  , id_(id)
4453  , durable_(durable)
4454  , stopping_(false)
4455  , heartbeat_count_(heartbeat_count)
4456 #ifdef OPENDDS_SECURITY
4459 #endif
4460  , heartbeat_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsWriter> >(rchandle_from(this), &RtpsWriter::send_heartbeats)))
4461  , nack_response_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsWriter> >(rchandle_from(this), &RtpsWriter::send_nack_responses)))
4462  , initial_fallback_(link->config()->heartbeat_period_)
4464 {
4465  send_buff_->bind(link->send_strategy().in());
4466 }
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
WeakRcHandle< TransportClient > client_
const bool is_pvs_writer_
Participant Volatile Secure writer.
static SequenceNumber ZERO()
const size_t ONE_SAMPLE_PER_PACKET
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
void send_heartbeats(const MonotonicTimePoint &now)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
FibonacciSequence< TimeDuration > fallback_
const bool is_ps_writer_
Partcicipant Secure (Reliable SPDP) writer.
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER
Definition: MessageTypes.h:87
void send_nack_responses(const MonotonicTimePoint &now)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ ~RtpsWriter()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::~RtpsWriter ( )
virtual

Definition at line 4468 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), elems_not_acked_, and LM_WARNING.

4469 {
4470  if (!elems_not_acked_.empty()) {
4471  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: RtpsWriter::~RtpsWriter - ")
4472  ACE_TEXT("deleting with %d elements left not fully acknowledged\n"),
4473  elems_not_acked_.size()));
4474  }
4475 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

Member Function Documentation

◆ acked_by_all_helper_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::acked_by_all_helper_i ( TqeSet &  to_deliver)
private

Definition at line 3969 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR().

3970 {
3971  using namespace OpenDDS::RTPS;
3972  typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
3973  OPENDDS_VECTOR(GUID_t) to_check;
3974 
3975  RtpsUdpDataLink_rch link = link_.lock();
3976 
3977  if (!link) {
3978  return;
3979  }
3980 
3981  //start with the max sequence number writer knows about and decrease
3982  //by what the min over all readers is
3983  SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
3984  if (!preassociation_readers_.empty()) {
3985  all_readers_ack = std::min(all_readers_ack, *preassociation_reader_start_sns_.begin());
3986  }
3987  if (!lagging_readers_.empty()) {
3988  all_readers_ack = std::min(all_readers_ack, lagging_readers_.begin()->first + 1);
3989  }
3990  if (!leading_readers_.empty()) {
3991  // When is_pvs_writer_ is true, the leading_readers_ will all be
3992  // at different sequence numbers. The minimum could actually be
3993  // before the preassociation readers or lagging readers. Use the
3994  // largest sequence number to avoid holding onto samples.
3995  all_readers_ack = std::min(all_readers_ack, leading_readers_.rbegin()->first + 1);
3996  }
3997 
3998  if (all_readers_ack == SequenceNumber::MAX_VALUE) {
3999  return;
4000  }
4001 
4003 
4004  if (!elems_not_acked_.empty()) {
4005  for (iter_t it = elems_not_acked_.begin(), limit = elems_not_acked_.end();
4006  it != limit && it->first < all_readers_ack;) {
4007  send_buff_->release_acked(it->first);
4008  to_deliver.insert(it->second);
4009  elems_not_acked_.erase(it++);
4010  }
4011  }
4012 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
SequenceNumberMultiset preassociation_reader_start_sns_
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
static const Value MAX_VALUE
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ add_elem_awaiting_ack()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack ( TransportQueueElement element)

Definition at line 4505 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, elems_not_acked_, elems_not_acked_mutex_, and OpenDDS::DCPS::TransportQueueElement::sequence().

4506 {
4508  elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
4509 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ add_gap_submsg_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_gap_submsg_i ( RTPS::SubmessageSeq msg,
SequenceNumber  gap_start 
)
private

Definition at line 1501 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::grow(), OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::Encoding::KIND_XCDR1, OPENDDS_ASSERT, OpenDDS::DCPS::serialized_size(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::GapSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageLength, and OpenDDS::RTPS::to_rtps_seqnum().

1503 {
1504  // These are the GAP submessages that we'll send directly in-line with the
1505  // DATA when we notice that the DataWriter has deliberately skipped seq #s.
1506  // There are other GAP submessages generated in meta_submessage to reader ACKNACKS,
1507  // see send_nack_replies().
1508  using namespace OpenDDS::RTPS;
1509 
1510  const LongSeq8 bitmap;
1511 
1512  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
1513  // [gapStart, gapListBase) and those in the SNSet.
1514  GapSubmessage gap = {
1515  {GAP, FLAG_E, 0 /*length determined below*/},
1516  ENTITYID_UNKNOWN, // readerId: applies to all matched readers
1517  id_.entityId,
1518  to_rtps_seqnum(gap_start),
1519  {to_rtps_seqnum(max_sn_), 0, bitmap}
1520  };
1521  OPENDDS_ASSERT(gap_start < max_sn_);
1522 
1523  const size_t size = serialized_size(Encoding(Encoding::KIND_XCDR1), gap);
1525  static_cast<CORBA::UShort>(size) - SMHDR_SZ;
1526 
1527  const CORBA::ULong idx = grow(msg) - 1;
1528  msg[idx].gap_sm(gap);
1529 }
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
SubmessageHeader smHeader
Definition: RtpsCore.idl:570
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ACE_CDR::ULong ULong
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
ACE_CDR::UShort UShort
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ add_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_reader ( const ReaderInfo_rch reader)

Definition at line 2032 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_.

2033 {
2035  OPENDDS_ASSERT(!reader->durable_ || durable_);
2036 
2037  if (stopping_) {
2038  return false;
2039  }
2040 
2041  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader->id_);
2042  if (iter == remote_readers_.end()) {
2043 #ifdef OPENDDS_SECURITY
2044  if (is_pvs_writer_) {
2045  reader->max_pvs_sn_ = max_sn_;
2046  }
2047 #endif
2048  remote_readers_.insert(ReaderInfoMap::value_type(reader->id_, reader));
2049  update_remote_guids_cache_i(true, reader->id_);
2050  preassociation_readers_.insert(reader);
2051  preassociation_reader_start_sns_.insert(reader->start_sn_);
2052  log_remote_counts("add_reader");
2053 
2054  RtpsUdpDataLink_rch link = link_.lock();
2055  if (!link) {
2056  return false;
2057  }
2058 
2060  heartbeat_->schedule(fallback_.get());
2061  // Durable readers will get their heartbeat from end historic samples.
2062  if (!reader->durable_) {
2063  MetaSubmessageVec meta_submessages;
2064  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
2065  const SingleSendBuffer::Proxy proxy(*send_buff_);
2066  initialize_heartbeat(proxy, meta_submessage);
2067  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
2068  g.release();
2069  link->queue_submessages(meta_submessages);
2070  }
2071 
2072  return true;
2073  }
2074  return false;
2075 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
void update_remote_guids_cache_i(bool add, const GUID_t &guid)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
const bool is_pvs_writer_
Participant Volatile Secure writer.
SequenceNumberMultiset preassociation_reader_start_sns_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
FibonacciSequence< TimeDuration > fallback_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ check_leader_lagger()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::check_leader_lagger ( ) const
private

Definition at line 3896 of file RtpsUdpDataLink.cpp.

References OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by update_max_sn().

3897 {
3898 #ifndef OPENDDS_SAFETY_PROFILE
3899 #ifndef NDEBUG
3900  static const SequenceNumber negative_one = SequenceNumber::ZERO().previous();
3901  for (SNRIS::const_iterator pos1 = lagging_readers_.begin(), limit = lagging_readers_.end();
3902  pos1 != limit; ++pos1) {
3903  const SequenceNumber& sn = pos1->first;
3904  const ReaderInfoSetHolder_rch& readers = pos1->second;
3905  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3906  pos2 != limit; ++pos2) {
3907  const ReaderInfo_rch& reader = *pos2;
3908  OPENDDS_ASSERT(reader->acked_sn() == sn);
3909  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3910  OPENDDS_ASSERT(sn == negative_one || sn < expect_max_sn);
3911  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3912  }
3913  }
3914 
3915  for (SNRIS::const_iterator pos1 = leading_readers_.begin(), limit = leading_readers_.end();
3916  pos1 != limit; ++pos1) {
3917  const SequenceNumber& sn = pos1->first;
3918  const ReaderInfoSetHolder_rch& readers = pos1->second;
3919  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3920  pos2 != limit; ++pos2) {
3921  const ReaderInfo_rch& reader = *pos2;
3922  OPENDDS_ASSERT(reader->acked_sn() == sn);
3923  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3924  OPENDDS_ASSERT(sn == expect_max_sn);
3925  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3926  }
3927  }
3928 #endif
3929 #endif
3930 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
SequenceNumber previous() const
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
static SequenceNumber ZERO()
RcHandle< ReaderInfo > ReaderInfo_rch
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
RcHandle< ReaderInfoSetHolder > ReaderInfoSetHolder_rch

◆ customize_queue_element_helper()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper ( TransportQueueElement element,
bool  requires_inline_qos,
MetaSubmessageVec &  meta_submessages,
bool &  deliver_after_send 
)

Definition at line 1077 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, OpenDDS::DCPS::LogGuid::c_str(), ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::TransportQueueElement::is_fragment(), OpenDDS::DCPS::TransportQueueElement::is_last_fragment(), LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual_i(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::TransportCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::RtpsUdpDataLink::submsgs_to_msgblock(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

1082 {
1084 
1085  RtpsUdpDataLink_rch link = link_.lock();
1086  if (stopping_ || !link) {
1087  g.release();
1088  element->data_dropped(true);
1089  return 0;
1090  }
1091 
1092  OPENDDS_ASSERT(element->publication_id() == id_);
1093 
1094  const SequenceNumber previous_max_sn = max_sn_;
1095  RTPS::SubmessageSeq subm;
1096 
1097  const SequenceNumber seq = element->sequence();
1099  if (!element->is_fragment() || element->is_last_fragment()) {
1100  max_sn_ = std::max(max_sn_, seq);
1101  }
1102  if (!durable_ && !is_pvs_writer() &&
1103  element->subscription_id() == GUID_UNKNOWN &&
1104  previous_max_sn < max_sn_.previous()) {
1105  add_gap_submsg_i(subm, previous_max_sn + 1);
1106  }
1107  }
1108 
1109  make_leader_lagger(element->subscription_id(), previous_max_sn);
1111 
1112  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1113  TransportCustomizedElement* tce =
1114  dynamic_cast<TransportCustomizedElement*>(element);
1115  TransportSendControlElement* tsce =
1116  dynamic_cast<TransportSendControlElement*>(element);
1117 
1118  Message_Block_Ptr data;
1119  bool durable = false;
1120 
1121  const ACE_Message_Block* msg = element->msg();
1122  const GUID_t pub_id = element->publication_id();
1123 
1124  // Based on the type of 'element', find and duplicate the data payload
1125  // continuation block.
1126  if (tsce) { // Control message
1127  if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
1128  data.reset(msg->cont()->duplicate());
1129  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1131  subm, *tsce, requires_inline_qos);
1132  record_directed(element->subscription_id(), seq);
1133  } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
1134  end_historic_samples_i(tsce->header(), msg->cont(), meta_submessages);
1135  g.release();
1136  element->data_delivered();
1137  return 0;
1138  } else if (tsce->header().message_id_ == REQUEST_ACK) {
1139  request_ack_i(tsce->header(), msg->cont(), meta_submessages);
1140  deliver_after_send = true;
1141  return 0;
1142  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
1143  send_heartbeats_manual_i(meta_submessages);
1144  deliver_after_send = true;
1145  return 0;
1146  } else {
1147  g.release();
1148  element->data_dropped(true /*dropped_by_transport*/);
1149  return 0;
1150  }
1151 
1152  } else if (tse) { // Basic data message
1153  // {DataSampleHeader} -> {Data Payload}
1154  data.reset(msg->cont()->duplicate());
1155  const DataSampleElement* dsle = tse->sample();
1156  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1158  subm, *dsle, requires_inline_qos);
1159  record_directed(element->subscription_id(), seq);
1160  durable = dsle->get_header().historic_sample_;
1161 
1162  } else if (tce) { // Customized data message
1163  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1164  data.reset(msg->cont()->cont()->duplicate());
1165  const DataSampleElement* dsle = tce->original_send_element()->sample();
1166  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1168  subm, *dsle, requires_inline_qos);
1169  record_directed(element->subscription_id(), seq);
1170  durable = dsle->get_header().historic_sample_;
1171 
1172  } else {
1173  send_buff_->pre_insert(seq);
1174  return element;
1175  }
1176 
1177 #ifdef OPENDDS_SECURITY
1178  {
1179  GuardType guard(link->strategy_lock_);
1180  if (link->send_strategy_) {
1181  link->send_strategy()->encode_payload(pub_id, data, subm);
1182  }
1183  }
1184 #endif
1185 
1186  if (stopping_) {
1187  g.release();
1188  element->data_dropped(true);
1189  return 0;
1190  }
1191 
1193  link->send_strategy()->append_submessages(subm);
1194  }
1195 
1196  Message_Block_Ptr hdr(link->submsgs_to_msgblock(subm));
1197  hdr->cont(data.release());
1198  RtpsCustomizedElement* rtps =
1199  new RtpsCustomizedElement(element, move(hdr));
1200 
1201  // Handle durability resends
1202  if (durable) {
1203  const GUID_t sub = element->subscription_id();
1204  if (sub != GUID_UNKNOWN) {
1205  ReaderInfoMap::iterator ri = remote_readers_.find(sub);
1206  if (ri != remote_readers_.end()) {
1207  ri->second->durable_data_[rtps->sequence()] = rtps;
1208  ri->second->durable_timestamp_.set_to_now();
1209  if (Transport_debug_level > 3) {
1210  const LogGuid conv(pub_id), sub_conv(sub);
1211  ACE_DEBUG((LM_DEBUG,
1212  "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
1213  "storing durable data for local %C remote %C seq %q\n",
1214  conv.c_str(), sub_conv.c_str(),
1215  rtps->sequence().getValue()));
1216  }
1217  return 0;
1218  }
1219  }
1220  }
1221 
1222  send_buff_->pre_insert(seq);
1223  return rtps;
1224 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:879
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
SequenceNumber previous() const
void reset(void)
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
void end_historic_samples_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
void request_ack_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
void record_directed(const GUID_t &reader, SequenceNumber seq)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void add_gap_submsg_i(RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
static bool control_message_supported(char message_id)
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)
void send_heartbeats_manual_i(MetaSubmessageVec &meta_submessages)
bool log_messages
Log all RTPS messages sent or recieved.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
WeakRcHandle< RtpsUdpDataLink > link_
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
RcHandle< SingleSendBuffer > send_buff_
bool requires_inline_qos(const GUIDSeq_var &peers)

◆ end_historic_samples_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::end_historic_samples_i ( const DataSampleHeader header,
ACE_Message_Block body,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 1343 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLink::id_, LM_DEBUG, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

1346 {
1347  // Set the ReaderInfo::durable_timestamp_ for the case where no
1348  // durable samples exist in the DataWriter.
1349  if (durable_) {
1351  GUID_t sub = GUID_UNKNOWN;
1352  if (body && header.message_length_ >= sizeof(sub)) {
1353  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1354  }
1355  typedef ReaderInfoMap::iterator iter_t;
1356  if (sub == GUID_UNKNOWN) {
1357  if (Transport_debug_level > 3) {
1358  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
1359  "local %C all readers\n", LogGuid(id_).c_str()));
1360  }
1361  for (iter_t iter = remote_readers_.begin();
1362  iter != remote_readers_.end(); ++iter) {
1363  if (iter->second->durable_) {
1364  iter->second->durable_timestamp_ = now;
1366  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1367  }
1368  }
1369  }
1370  } else {
1371  iter_t iter = remote_readers_.find(sub);
1372  if (iter != remote_readers_.end()) {
1373  if (iter->second->durable_) {
1374  iter->second->durable_timestamp_ = now;
1376  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1377  }
1378  const SingleSendBuffer::Proxy proxy(*send_buff_);
1379  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1380  initialize_heartbeat(proxy, meta_submessage);
1381  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1382  if (Transport_debug_level > 3) {
1383  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
1384  " local %C remote %C\n", LogGuid(id_).c_str(), LogGuid(sub).c_str()));
1385  }
1386  }
1387  }
1388  }
1389  }
1390 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
char * rd_ptr(void) const
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
RcHandle< SingleSendBuffer > send_buff_
bool log_progress
Log progress for RTPS entity discovery and association.

◆ expected_max_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_max_sn ( const ReaderInfo_rch reader) const
private

Definition at line 3758 of file RtpsUdpDataLink.cpp.

3759 {
3760  ACE_UNUSED_ARG(reader);
3761 #ifdef OPENDDS_SECURITY
3762  if (is_pvs_writer_) {
3763  return reader->max_pvs_sn_;
3764  } else {
3765 #endif
3766  return max_sn_;
3767 #ifdef OPENDDS_SECURITY
3768  }
3769 #endif
3770 }
const bool is_pvs_writer_
Participant Volatile Secure writer.

◆ gather_directed_heartbeat_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_directed_heartbeat_i ( const SingleSendBuffer::Proxy proxy,
MetaSubmessageVec &  meta_submessages,
MetaSubmessage meta_submessage,
const ReaderInfo_rch reader 
)
private

Definition at line 4179 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::RTPS::Count_t::value.

4183 {
4184  const SequenceNumber first_sn = reader->durable_ ? 1 : std::max(non_durable_first_sn(proxy), reader->start_sn_);
4185  SequenceNumber last_sn = expected_max_sn(reader);
4186 #ifdef OPENDDS_SECURITY
4187  if (is_pvs_writer_ && last_sn < first_sn.previous()) {
4188  // This can happen if the reader get's reset.
4189  // Adjust the heartbeat to be valid.
4190  // Make lagger_leader will eventually correct the problem.
4191  last_sn = first_sn.previous();
4192  }
4193 #endif
4194  meta_submessage.dst_guid_ = reader->id_;
4195  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4196  meta_submessage.sm_.heartbeat_sm().readerId = reader->id_.entityId;
4197  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(first_sn);
4198  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(last_sn);
4199  OPENDDS_ASSERT(!(first_sn < 1 || last_sn < 0 || last_sn < first_sn.previous()));
4200  meta_submessages.push_back(meta_submessage);
4201  meta_submessage.reset_destination();
4202 }
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const bool is_pvs_writer_
Participant Volatile Secure writer.
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139

◆ gather_gaps_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_gaps_i ( const ReaderInfo_rch reader,
const DisjointSequence gaps,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 3099 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DisjointSequence::bitmap_num_longs(), OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::DCPS::Transport_debug_level.

3102 {
3103  using namespace RTPS;
3104 
3105  OPENDDS_ASSERT(reader || !durable_);
3106 
3107  if (gaps.empty()) {
3108  return;
3109  }
3110 
3111  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
3112  // [gapStart, gapListBase) and those in the SNSet.
3113  const SequenceNumber firstMissing = gaps.low(),
3114  base = ++SequenceNumber(gaps.cumulative_ack());
3115  const SequenceNumber_t gapStart = to_rtps_seqnum(firstMissing);
3116  const SequenceNumber_t gapListBase = to_rtps_seqnum(base);
3117  CORBA::ULong num_bits = 0;
3118  LongSeq8 bitmap;
3119 
3120  if (gaps.disjoint()) {
3121  bitmap.length(DisjointSequence::bitmap_num_longs(base, gaps.high()));
3122  if (bitmap.length() > 0) {
3123  ACE_CDR::ULong cumulative_bits_added = 0;
3124  (void)gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits, cumulative_bits_added);
3125  }
3126  }
3127 
3128  MetaSubmessage meta_submessage(id_, reader ? reader->id_ : GUID_UNKNOWN);
3129  GapSubmessage gap = {
3130  {GAP, FLAG_E, 0 /*length determined later*/},
3131  reader ? reader->id_.entityId : ENTITYID_UNKNOWN,
3132  id_.entityId,
3133  gapStart,
3134  {gapListBase, num_bits, bitmap}
3135  };
3136  OPENDDS_ASSERT(firstMissing < base);
3137  meta_submessage.sm_.gap_sm(gap);
3138 
3139  if (Transport_debug_level > 5) {
3140  const LogGuid conv(id_);
3141  SequenceRange sr;
3142  sr.first = to_opendds_seqnum(gap.gapStart);
3143  const SequenceNumber srbase = to_opendds_seqnum(gap.gapList.bitmapBase);
3144  sr.second = srbase.previous();
3145  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_gaps_i "
3146  "GAP with range [%q, %q] from %C\n",
3147  sr.first.getValue(), sr.second.getValue(),
3148  conv.c_str()));
3149  }
3150 
3151  meta_submessages.push_back(meta_submessage);
3152 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
ACE_CDR::ULong ULong
SequenceNumber_t gapStart
Definition: RtpsCore.idl:573
ACE_UINT32 ULong
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
std::pair< SequenceNumber, SequenceNumber > SequenceRange
SequenceNumberSet gapList
Definition: RtpsCore.idl:574

◆ gather_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_heartbeats ( RcHandle< ConstSharedRepoIdSet additional_guids,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 4297 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::Count_t::value.

4299 {
4300  OPENDDS_ASSERT(!additional_guids->guids_.empty());
4301 
4303 
4304  RtpsUdpDataLink_rch link = link_.lock();
4305  if (!link) {
4306  return;
4307  }
4308 
4309  const SingleSendBuffer::Proxy proxy(*send_buff_);
4310 
4311  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4312  initialize_heartbeat(proxy, meta_submessage);
4313 
4314  for (RepoIdSet::const_iterator it = additional_guids->guids_.begin(),
4315  limit = additional_guids->guids_.end(); it != limit; ++it) {
4316 
4317  // Semi-directed (INFO_DST but ENTITYID_UNKNOWN, non-final.
4318  meta_submessage.dst_guid_ = *it;
4319  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4320  meta_submessages.push_back(meta_submessage);
4321  meta_submessage.reset_destination();
4322  }
4323 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ gather_heartbeats_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_heartbeats_i ( MetaSubmessageVec &  meta_submessages)

Definition at line 4231 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::RTPS::Count_t::value.

4232 {
4233  if (preassociation_readers_.empty() && lagging_readers_.empty()) {
4234  return;
4235  }
4236 
4238 
4239  RtpsUdpDataLink_rch link = link_.lock();
4240  if (!link) {
4241  return;
4242  }
4243 
4244  using namespace OpenDDS::RTPS;
4245 
4246  const SingleSendBuffer::Proxy proxy(*send_buff_);
4247 
4248  // Assume no samples are available.
4249  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4250  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
4251  const SequenceNumber lastSN = max_sn_;
4252 
4253  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4254  initialize_heartbeat(proxy, meta_submessage);
4255 
4256  // Directed, non-final.
4257  if (!preassociation_readers_.empty()) {
4258  meta_submessages.reserve(meta_submessages.size() + preassociation_readers_.size());
4259  for (ReaderInfoSet::const_iterator pos = preassociation_readers_.begin(), limit = preassociation_readers_.end();
4260  pos != limit; ++pos) {
4261  const ReaderInfo_rch& reader = *pos;
4262  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4263  }
4264  }
4265 
4266  if (!lagging_readers_.empty()) {
4267  if (leading_readers_.empty() && remote_readers_.size() > 1
4268 #ifdef OPENDDS_SECURITY
4269  && !is_pvs_writer_
4270  && !is_ps_writer_
4271 #endif
4272  ) {
4273  // Every reader is lagging and there is more than one.
4274  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4275  meta_submessage.sm_.heartbeat_sm().readerId = ENTITYID_UNKNOWN;
4276  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(firstSN);
4277  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(lastSN);
4278 
4279  meta_submessages.push_back(meta_submessage);
4280  meta_submessage.reset_destination();
4281  } else {
4282  for (SNRIS::const_iterator snris_pos = lagging_readers_.begin(), snris_limit = lagging_readers_.end();
4283  snris_pos != snris_limit; ++snris_pos) {
4284  meta_submessages.reserve(meta_submessages.size() + snris_pos->second->readers.size());
4285  for (ReaderInfoSet::const_iterator pos = snris_pos->second->readers.begin(),
4286  limit = snris_pos->second->readers.end();
4287  pos != limit; ++pos) {
4288  const ReaderInfo_rch& reader = *pos;
4289  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4290  }
4291  }
4292  }
4293  }
4294 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
const bool is_pvs_writer_
Participant Volatile Secure writer.
RcHandle< ReaderInfo > ReaderInfo_rch
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
const bool is_ps_writer_
Partcicipant Secure (Reliable SPDP) writer.
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ gather_nack_replies_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i ( MetaSubmessageVec &  meta_submessages)

Definition at line 3466 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::accumulate_addresses(), ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::RtpsUdpDataLink::get_addresses(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::locators_lock_, OpenDDS::RTPS::OPENDDS_FLAG_R, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, and OpenDDS::DCPS::InternalTransportStatistics::writer_resend_count.

3467 {
3468  RtpsUdpDataLink_rch link = link_.lock();
3469 
3470  if (!link) {
3471  return;
3472  }
3473 
3474  // Process naks requests from each reader replying with either data or a gap.
3475  // Requests for directed data are answered with a directed reply.
3476  // Requests for undirected data are answered with an undirected and consolidated reply.
3477  // Directed data includes things like the PVS writer.
3478  // The requests_ for a reader should not contain requests for durable data.
3479 
3480  typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
3481 
3482  // Consolidated non-directed requests and address sets to be sent together at the end, after directed replies
3483  typedef OPENDDS_MAP(SequenceNumber, AddrSet) RecipientMap;
3484  typedef OPENDDS_MAP(SequenceNumber, RepoIdSet) ReaderMap;
3485  DisjointSequence consolidated_requests;
3486  ReaderMap consolidated_request_readers;
3487  RecipientMap consolidated_recipients_unicast;
3488  RecipientMap consolidated_recipients_multicast;
3489  FragmentInfo consolidated_fragment_requests;
3490  ReaderMap consolidated_fragment_request_readers;
3491  RecipientMap consolidated_fragment_recipients_unicast;
3492  RecipientMap consolidated_fragment_recipients_multicast;
3493  DisjointSequence consolidated_gaps;
3494 
3495  ACE_GUARD(TransportSendBuffer::LockType, guard, send_buff_->strategy_lock());
3496  SingleSendBuffer::Proxy proxy(*send_buff_);
3497 
3498  size_t cumulative_send_count = 0;
3499 
3500  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3501  pos != limit; ++pos) {
3502 
3503  const ReaderInfo_rch& reader = *pos;
3504  const AddrSet addrs = link->get_addresses(id_, reader->id_);
3505 
3506  DisjointSequence gaps;
3507 
3508  if (reader->expecting_durable_data()) {
3509 
3510  if (!reader->requests_.empty() && !reader->durable_data_.empty()) {
3511  const SequenceNumber dd_first = reader->durable_data_.begin()->first;
3512  const SequenceNumber dd_last = reader->durable_data_.rbegin()->first;
3513 
3514  if (reader->requests_.high() < dd_first) {
3515  gaps.insert(SequenceRange(reader->requests_.low(), dd_first.previous()));
3516  reader->requests_.reset();
3517  } else {
3518  const OPENDDS_VECTOR(SequenceRange) psr = reader->requests_.present_sequence_ranges();
3519  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = psr.begin(), limit = psr.end();
3520  iter != limit && iter->first <= dd_last; ++iter) {
3521  for (SequenceNumber s = iter->first; s <= iter->second; ++s) {
3522  if (s <= dd_last) {
3523  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(s);
3524  if (dd_iter != reader->durable_data_.end()) {
3525  link->durability_resend(dd_iter->second, cumulative_send_count);
3526  } else {
3527  gaps.insert(s);
3528  }
3529  reader->requests_.erase(s);
3530  }
3531  }
3532  }
3533  }
3534  }
3535 
3536  typedef RequestedFragSeqMap::const_iterator rfs_iter;
3537  for (rfs_iter rfs = reader->requested_frags_.begin(), limit = reader->requested_frags_.end(); rfs != limit; ++rfs) {
3538  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(rfs->first);
3539  if (dd_iter != reader->durable_data_.end()) {
3540  for (RequestedFragMap::const_iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3541  link->durability_resend(dd_iter->second, rf->second, cumulative_send_count);
3542  }
3543  } else if ((!reader->durable_data_.empty() && rfs->first < reader->durable_data_.begin()->first)) {
3544  gaps.insert(rfs->first);
3545  }
3546  }
3547 
3548  gather_gaps_i(reader, gaps, meta_submessages);
3549 
3550  // The writer may not be done replaying durable data.
3551  // Do not send a gap.
3552  // TODO: If we have all of the durable data, adjust the request so that we can answer the non-durable part.
3553  reader->requests_.reset();
3554  reader->requested_frags_.clear();
3555  continue;
3556  }
3557 
3558  const SequenceNumber first_sn = std::max(non_durable_first_sn(proxy), reader->start_sn_);
3559  if (!reader->requests_.empty() &&
3560  reader->requests_.high() < first_sn) {
3561  // The reader is not going to get any data.
3562  // Send a gap that is going to to catch them up.
3563  gaps.insert(SequenceRange(reader->requests_.low(), first_sn.previous()));
3564  reader->requests_.reset();
3565  }
3566 
3567  const OPENDDS_VECTOR(SequenceRange) ranges = reader->requests_.present_sequence_ranges();
3568  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = ranges.begin(), limit = ranges.end();
3569  iter != limit; ++iter) {
3570  for (SequenceNumber seq = iter->first; seq <= iter->second; ++seq) {
3571  GUID_t destination;
3572  if (proxy.contains(seq, destination)) {
3573  if (destination == GUID_UNKNOWN) {
3574  // Not directed.
3575  consolidated_requests.insert(seq);
3576  consolidated_request_readers[seq].insert(reader->id_);
3577  consolidated_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3578  ACE_Guard<ACE_Thread_Mutex> g(link->locators_lock_);
3579  link->accumulate_addresses(id_, reader->id_, consolidated_recipients_multicast[seq], false);
3580  continue;
3581  } else if (destination != reader->id_) {
3582  // Directed at another reader.
3583  gaps.insert(seq);
3584  continue;
3585  } else {
3586  // Directed at the reader.
3588  link->send_strategy()->override_destinations(addrs);
3589  proxy.resend_i(SequenceRange(seq, seq), 0, reader->id_);
3590  ++cumulative_send_count;
3591  continue;
3592  }
3593  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3594  // Can't answer, don't gap.
3595  continue;
3596  }
3597 
3598  if (durable_ || is_pvs_writer()) {
3599  // Must send directed gap.
3600  gaps.insert(seq);
3601  } else {
3602  // Non-directed gap.
3603  consolidated_gaps.insert(seq);
3604  }
3605  }
3606  }
3607 
3608  reader->requests_.reset();
3609 
3610  typedef RequestedFragSeqMap::iterator rfs_iter;
3611  const rfs_iter rfs_end = reader->requested_frags_.end();
3612  for (rfs_iter rfs = reader->requested_frags_.begin(); rfs != rfs_end; ++rfs) {
3613  const SequenceNumber& seq = rfs->first;
3614  GUID_t destination;
3615  if (proxy.contains(seq, destination)) {
3616  if (destination == GUID_UNKNOWN) {
3617  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3618  consolidated_fragment_requests[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
3619  rf->second.bitmap.get_buffer());
3620  }
3621  consolidated_fragment_request_readers[seq].insert(reader->id_);
3622  consolidated_fragment_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3623  ACE_Guard<ACE_Thread_Mutex> g(link->locators_lock_);
3624  link->accumulate_addresses(id_, reader->id_, consolidated_fragment_recipients_multicast[seq], false);
3625  continue;
3626  } else if (destination != reader->id_) {
3627  // Directed at another reader.
3628  gaps.insert(seq);
3629  continue;
3630  } else {
3631  // Directed at the reader.
3633  link->send_strategy()->override_destinations(addrs);
3634  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3635  DisjointSequence x;
3636  x.insert(rf->second.bitmapBase.value, rf->second.numBits,
3637  rf->second.bitmap.get_buffer());
3638  proxy.resend_fragments_i(seq, x, cumulative_send_count);
3639  }
3640  continue;
3641  }
3642  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3643  // Can't answer, don't gap.
3644  continue;
3645  }
3646 
3647  if (durable_ || is_pvs_writer()) {
3648  // Must send directed gap.
3649  gaps.insert(seq);
3650  } else {
3651  // Non-directed gap.
3652  consolidated_gaps.insert(seq);
3653  }
3654  }
3655 
3656  reader->requested_frags_.clear();
3657 
3658  // Gather directed gaps.
3659  gather_gaps_i(reader, gaps, meta_submessages);
3660  }
3661 
3662  {
3663  // Send the consolidated requests.
3664  const OPENDDS_VECTOR(SequenceRange) ranges = consolidated_requests.present_sequence_ranges();
3665  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = ranges.begin(), limit = ranges.end();
3666  pos != limit; ++pos) {
3667  if (Transport_debug_level > 5) {
3668  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3669  "resend data %q-%q\n", pos->first.getValue(),
3670  pos->second.getValue()));
3671  }
3672  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
3673  const AddrSet& uni = consolidated_recipients_unicast[seq];
3674  const AddrSet& multi = consolidated_recipients_multicast[seq];
3675  const RepoIdSet& readers = consolidated_request_readers[seq];
3676 
3677  if (proxy.has_frags(seq)) {
3678  if (consolidated_fragment_requests.find(seq) == consolidated_fragment_requests.end()) {
3679  consolidated_fragment_requests[seq].insert(1);
3680  }
3681  consolidated_fragment_recipients_unicast[seq].insert(uni.begin(), uni.end());
3682  consolidated_fragment_recipients_multicast[seq].insert(multi.begin(), multi.end());
3683  consolidated_fragment_request_readers[seq].insert(readers.begin(), readers.end());
3684  } else {
3686  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3687 
3688  proxy.resend_i(SequenceRange(seq, seq));
3689  ++cumulative_send_count;
3690  }
3691  }
3692  }
3693 
3694  for (FragmentInfo::const_iterator pos = consolidated_fragment_requests.begin(),
3695  limit = consolidated_fragment_requests.end(); pos != limit; ++pos) {
3696  const AddrSet& uni = consolidated_fragment_recipients_unicast[pos->first];
3697  const AddrSet& multi = consolidated_fragment_recipients_multicast[pos->first];
3698  const RepoIdSet& readers = consolidated_fragment_request_readers[pos->first];
3700  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3701 
3702  proxy.resend_fragments_i(pos->first, pos->second, cumulative_send_count);
3703  }
3704  }
3705 
3706  if (cumulative_send_count) {
3707  RtpsUdpInst_rch cfg = link->config();
3708  if (cfg && cfg->count_messages()) {
3709  ACE_GUARD(ACE_Thread_Mutex, g, link->transport_statistics_mutex_);
3710  link->transport_statistics_.writer_resend_count[id_] += static_cast<ACE_CDR::ULong>(cumulative_send_count);
3711  }
3712  }
3713 
3714  // Gather the consolidated gaps.
3715  if (!consolidated_gaps.empty()) {
3716  if (Transport_debug_level > 5) {
3717  ACE_DEBUG((LM_DEBUG,
3718  "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: GAPs:\n"));
3719  consolidated_gaps.dump();
3720  }
3721  gather_gaps_i(ReaderInfo_rch(), consolidated_gaps, meta_submessages);
3722  } else if (Transport_debug_level > 5) {
3723  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3724  "no GAPs to send\n"));
3725  }
3726 
3727  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
3728  initialize_heartbeat(proxy, meta_submessage);
3729 
3730  // Directed, non-final.
3731  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3732  pos != limit; ++pos) {
3733  const ReaderInfo_rch& reader = *pos;
3734  readers_expecting_heartbeat_.erase(reader);
3735  if (reader->reflects_heartbeat_count()) {
3736  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::OPENDDS_FLAG_R;
3737  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3738  reader->required_acknack_count_ = heartbeat_count_;
3739  meta_submessage.sm_.heartbeat_sm().smHeader.flags &= ~RTPS::OPENDDS_FLAG_R;
3740  } else {
3741  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3742  }
3743  }
3744  readers_expecting_data_.clear();
3745 
3746  // Directed, final.
3747  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::FLAG_F;
3748  meta_submessages.reserve(meta_submessages.size() + readers_expecting_heartbeat_.size());
3749  for (ReaderInfoSet::const_iterator pos = readers_expecting_heartbeat_.begin(), limit = readers_expecting_heartbeat_.end();
3750  pos != limit; ++pos) {
3751  const ReaderInfo_rch& reader = *pos;
3752  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3753  }
3755 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
GuidSet RepoIdSet
Definition: GuidUtils.h:113
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
void gather_gaps_i(const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
RcHandle< ReaderInfo > ReaderInfo_rch
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
TransportSendStrategy::LockType LockType
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
ACE_UINT32 ULong
const octet FLAG_F
Definition: RtpsCore.idl:520
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack are are expecting a heartbeat.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
const octet OPENDDS_FLAG_R
Definition: RtpsCore.idl:526
RcHandle< SingleSendBuffer > send_buff_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ get_remote_reader_guids()

RcHandle<ConstSharedRepoIdSet> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::get_remote_reader_guids ( )
inline

Definition at line 570 of file RtpsUdpDataLink.h.

◆ get_send_buff()

RcHandle<SingleSendBuffer> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::get_send_buff ( )
inline

Definition at line 569 of file RtpsUdpDataLink.h.

569 { return send_buff_; }
RcHandle< SingleSendBuffer > send_buff_

◆ has_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::has_reader ( const GUID_t id) const

Definition at line 2078 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

2079 {
2081  return remote_readers_.count(id) != 0;
2082 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ inc_heartbeat_count()

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::inc_heartbeat_count ( )

Definition at line 4477 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, heartbeat_count_, and mutex_.

4478 {
4480  return ++heartbeat_count_;
4481 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ initialize_heartbeat()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::initialize_heartbeat ( const SingleSendBuffer::Proxy proxy,
MetaSubmessage meta_submessage 
)
private

Definition at line 4155 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

4157 {
4158  using namespace OpenDDS::RTPS;
4159 
4160  // Assume no samples are available.
4161  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4162  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
4163 
4164  const HeartBeatSubmessage hb = {
4165  {HEARTBEAT,
4166  FLAG_E,
4167  HEARTBEAT_SZ},
4168  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4169  id_.entityId,
4170  to_rtps_seqnum(firstSN),
4172  {0}
4173  };
4174 
4175  meta_submessage.sm_.heartbeat_sm(hb);
4176 }
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ is_lagging()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_lagging ( const ReaderInfo_rch reader) const
private

Definition at line 3884 of file RtpsUdpDataLink.cpp.

3885 {
3886  return reader->acked_sn() != expected_max_sn(reader);
3887 }
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const

◆ is_leading() [1/2]

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_leading ( const ReaderInfo_rch reader) const
private

Definition at line 3890 of file RtpsUdpDataLink.cpp.

Referenced by is_leading().

3891 {
3892  return reader->acked_sn() == expected_max_sn(reader);
3893 }
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const

◆ is_leading() [2/2]

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_leading ( const GUID_t id) const

Definition at line 4512 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, is_leading(), mutex_, and remote_readers_.

4513 {
4515 
4516  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader_id);
4517  if (iter != remote_readers_.end()) {
4518  return is_leading(iter->second);
4519  }
4520 
4521  return false;
4522 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
bool is_leading(const ReaderInfo_rch &reader) const

◆ is_pvs_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_pvs_writer ( ) const
inlineprivate

Definition at line 495 of file RtpsUdpDataLink.h.

495 { return is_pvs_writer_; }
const bool is_pvs_writer_
Participant Volatile Secure writer.

◆ log_remote_counts()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::log_remote_counts ( const char *  funcname)
private

Definition at line 4816 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, id_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_remote_counts, preassociation_readers_, remote_readers_, and OpenDDS::DCPS::transport_debug.

4817 {
4819  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
4820  "RtpsUdpDataLink::RtpsWriter::%C: "
4821  "%C pre: %b assoc: %b\n",
4822  funcname, LogGuid(id_).c_str(),
4823  preassociation_readers_.size(), remote_readers_.size()));
4824  }
4825 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.

◆ make_lagger_leader()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::make_lagger_leader ( const ReaderInfo_rch reader,
const SequenceNumber  previous_acked_sn 
)
private

Definition at line 3858 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_.

3860 {
3861  RtpsUdpDataLink_rch link = link_.lock();
3862  if (!link) {
3863  return;
3864  }
3865 
3866  const SequenceNumber acked_sn = reader->acked_sn();
3867  if (previous_acked_sn == acked_sn) { return; }
3868  const SequenceNumber previous_max_sn = expected_max_sn(reader);
3869 #ifdef OPENDDS_SECURITY
3870  if (is_pvs_writer_ && acked_sn > previous_max_sn) {
3871  reader->max_pvs_sn_ = acked_sn;
3872  }
3873 #endif
3874  const SequenceNumber max_sn = expected_max_sn(reader);
3875 
3876  snris_erase(previous_acked_sn == previous_max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3877  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3878  if (acked_sn != max_sn) {
3879  heartbeat_->schedule(fallback_.get());
3880  }
3881 }
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
FibonacciSequence< TimeDuration > fallback_
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ make_leader_lagger()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::make_leader_lagger ( const GUID_t reader,
SequenceNumber  previous_max_sn 
)
private

Definition at line 3799 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_.

Referenced by update_max_sn().

3801 {
3802  ACE_UNUSED_ARG(reader_id);
3803 
3804  if (stopping_) {
3805  return;
3806  }
3807 
3808  RtpsUdpDataLink_rch link = link_.lock();
3809  if (!link) {
3810  return;
3811  }
3812 
3813 #ifdef OPENDDS_SECURITY
3814  if (!is_pvs_writer_) {
3815 #endif
3816  if (previous_max_sn != max_sn_) {
3817  // All readers that have acked previous_max_sn are now lagging.
3818  // Move leader_readers_[previous_max_sn] to lagging_readers_[previous_max_sn].
3819  SNRIS::iterator leading_pos = leading_readers_.find(previous_max_sn);
3820  SNRIS::iterator lagging_pos = lagging_readers_.find(previous_max_sn);
3821  if (leading_pos != leading_readers_.end()) {
3822  if (lagging_pos != lagging_readers_.end()) {
3823  lagging_pos->second->readers.insert(leading_pos->second->readers.begin(), leading_pos->second->readers.end());
3824  } else {
3825  lagging_readers_[previous_max_sn] = leading_pos->second;
3826  }
3827  leading_readers_.erase(leading_pos);
3828  heartbeat_->schedule(fallback_.get());
3829  }
3830  }
3831 #ifdef OPENDDS_SECURITY
3832  } else {
3833  // Move a specific reader.
3834  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3835  if (iter == remote_readers_.end()) {
3836  return;
3837  }
3838 
3839  const ReaderInfo_rch& reader = iter->second;
3840  previous_max_sn = reader->max_pvs_sn_;
3841  reader->max_pvs_sn_ = max_sn_;
3842  if (preassociation_readers_.count(reader)) {
3843  // Will be inserted once association is complete.
3844  return;
3845  }
3846 
3847  const SequenceNumber acked_sn = reader->acked_sn();
3848  if (acked_sn == previous_max_sn && previous_max_sn != max_sn_) {
3849  snris_erase(leading_readers_, acked_sn, reader);
3850  snris_insert(lagging_readers_, reader);
3851  heartbeat_->schedule(fallback_.get());
3852  }
3853  }
3854 #endif
3855 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
const bool is_pvs_writer_
Participant Volatile Secure writer.
RcHandle< ReaderInfo > ReaderInfo_rch
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
FibonacciSequence< TimeDuration > fallback_
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ max_data_seq()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::max_data_seq ( const SingleSendBuffer::Proxy proxy,
const ReaderInfo_rch ri 
) const

Definition at line 4484 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::SingleSendBuffer::Proxy::empty(), OpenDDS::DCPS::SingleSendBuffer::Proxy::high(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_empty(), and OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_high().

4486 {
4487  const SequenceNumber durable_max = ri->durable_data_.empty() ? 0 : ri->durable_data_.rbegin()->first;
4488  const SequenceNumber pre_max = proxy.pre_empty() ? 0 : proxy.pre_high();
4489  const SequenceNumber data_max = proxy.empty() ? 0 : proxy.high();
4490  return std::max(durable_max, std::max(pre_max, data_max));
4491 }

◆ non_durable_first_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::non_durable_first_sn ( const SingleSendBuffer::Proxy proxy) const
inlineprivate

Definition at line 500 of file RtpsUdpDataLink.h.

References OpenDDS::DCPS::SingleSendBuffer::Proxy::empty(), OpenDDS::DCPS::SingleSendBuffer::Proxy::low(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_empty(), and OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_low().

501  {
502  if (!proxy.empty()) {
503  return proxy.low();
504  }
505  if (!proxy.pre_empty()) {
506  return proxy.pre_low();
507  }
508  return max_sn_ + 1;
509  }

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_MULTIMAP ( SequenceNumber  ,
TransportQueueElement  
)
private

◆ OPENDDS_MULTISET()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_MULTISET ( OpenDDS::DCPS::SequenceNumber  )
private

◆ OPENDDS_SET_CMP()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_SET_CMP ( TransportQueueElement ,
TransportQueueElement::OrderBySequenceNumber   
)
private

◆ pre_stop_helper()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::pre_stop_helper ( TqeVector &  to_drop,
bool  true_stop 
)

Definition at line 823 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET().

824 {
825  typedef SnToTqeMap::iterator iter_t;
826 
829 
830  stopping_ = true_stop;
831 
832  if (!elems_not_acked_.empty()) {
833  OPENDDS_SET(SequenceNumber) sns_to_release;
834  iter_t iter = elems_not_acked_.begin();
835  while (iter != elems_not_acked_.end()) {
836  to_drop.push_back(iter->second);
837  sns_to_release.insert(iter->first);
838  elems_not_acked_.erase(iter);
839  iter = elems_not_acked_.begin();
840  }
841  OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
842  while (sns_it != sns_to_release.end()) {
843  send_buff_->release_acked(*sns_it);
844  ++sns_it;
845  }
846  }
847 
848  send_buff_->pre_clear();
849 
850  g2.release();
851  g.release();
852 
853  if (stopping_) {
854  heartbeat_->cancel();
855  nack_response_->cancel();
856  }
857 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SingleSendBuffer > send_buff_
typedef OPENDDS_SET(ReaderInfo_rch) ReaderInfoSet

◆ process_acked_by_all()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acked_by_all ( )

Definition at line 3953 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

3954 {
3955  TqeSet to_deliver;
3956  {
3958  acked_by_all_helper_i(to_deliver);
3959  }
3960 
3961  TqeSet::iterator deliver_iter = to_deliver.begin();
3962  while (deliver_iter != to_deliver.end()) {
3963  (*deliver_iter)->data_delivered();
3964  ++deliver_iter;
3965  }
3966 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ process_acknack()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack ( const RTPS::AckNackSubmessage acknack,
const GUID_t src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 3155 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::RTPS::SequenceNumberSet::bitmap, OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::RTPS::bitmapNonEmpty(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::SingleSendBuffer::Proxy::contains(), OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC, OpenDDS::DCPS::EventDispatcher::dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher(), OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::RTPS::SequenceNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OPENDDS_MULTIMAP, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_contains(), OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_, OpenDDS::DCPS::RcHandle< T >::swap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

3158 {
3160 
3161  if (stopping_) {
3162  return;
3163  }
3164 
3165  RtpsUdpDataLink_rch link = link_.lock();
3166 
3167  if (!link) {
3168  return;
3169  }
3170 
3171  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
3172 
3173  if (Transport_debug_level > 5) {
3174  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3175  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3176  }
3177 
3178  ReaderInfoMap::iterator ri = remote_readers_.find(src);
3179  if (ri == remote_readers_.end()) {
3181  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C unknown remote reader\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3182  }
3183  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3184  "WARNING ReaderInfo not found\n"));
3185  return;
3186  }
3187 
3188  const ReaderInfo_rch& reader = ri->second;
3189 
3190  SequenceNumber previous_acked_sn = reader->acked_sn();
3191  const bool count_is_not_zero = acknack.count.value != 0;
3192  const CORBA::Long previous_count = reader->acknack_recvd_count_;
3193  bool dont_schedule_nack_response = false;
3194 
3195  if (count_is_not_zero) {
3196  if (!compare_and_update_counts(acknack.count.value, reader->acknack_recvd_count_) &&
3197  (!reader->reflects_heartbeat_count() || acknack.count.value != 0 || reader->acknack_recvd_count_ != 0)) {
3199  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3200  }
3201  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3202  "WARNING Count indicates duplicate, dropping\n"));
3203  return;
3204  }
3205 
3206  if (reader->reflects_heartbeat_count()) {
3207  if (acknack.count.value < reader->required_acknack_count_) {
3209  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale message (reflect %d < %d)\n", LogGuid(src).c_str(), LogGuid(id_).c_str(), acknack.count.value, reader->required_acknack_count_));
3210  }
3211  dont_schedule_nack_response = true;
3212  }
3213  }
3214  }
3215 
3217 
3218  const bool is_final = acknack.smHeader.flags & RTPS::FLAG_F;
3219  const bool is_postassociation = count_is_not_zero && (is_final || bitmapNonEmpty(acknack.readerSNState) || ack != 1);
3220 
3221  if (preassociation_readers_.count(reader)) {
3222  if (is_postassociation) {
3225  log_progress("RTPS writer/reader association complete", id_, reader->id_, reader->participant_discovered_at_);
3226  }
3227  log_remote_counts("process_acknack");
3228 
3229  const SequenceNumber max_sn = expected_max_sn(reader);
3230  const SequenceNumber acked_sn = reader->acked_sn();
3231  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3233  // Heartbeat is already scheduled.
3234  }
3235  }
3236 
3237  OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
3238 
3239  if (!is_final && transport_debug.log_nonfinal_messages) {
3240  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3241  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3242  }
3243 
3244  // Process the ack.
3245  bool inform_send_listener = false;
3247 
3248  if (ack >= reader->cur_cumulative_ack_) {
3249  reader->cur_cumulative_ack_ = ack;
3250  inform_send_listener = true;
3251  } else if (count_is_not_zero) {
3252  // Count increased but ack decreased. Reset.
3253  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING RtpsUdpDataLink::RtpsWriter::process_acknack: "
3254  "%C -> %C reset detected count %d > %d ack %q < %q\n",
3255  LogGuid(id_).c_str(), LogGuid(reader->id_).c_str(),
3256  acknack.count.value, previous_count, ack.getValue(), reader->cur_cumulative_ack_.getValue()));
3257  const SequenceNumber max_sn = expected_max_sn(reader);
3258  snris_erase(previous_acked_sn == max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3259  reader->cur_cumulative_ack_ = ack;
3260  const SequenceNumber acked_sn = reader->acked_sn();
3261  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3262  previous_acked_sn = acked_sn;
3264  heartbeat_->schedule(fallback_.get());
3265 
3266  if (reader->durable_) {
3267  if (Transport_debug_level > 5) {
3268  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: enqueuing ReplayDurableData\n"));
3269  }
3270  reader->durable_data_.swap(pendingCallbacks);
3271  link->event_dispatcher()->dispatch(make_rch<ReplayDurableData>(link_, id_, src));
3272  reader->durable_timestamp_ = MonotonicTimePoint::zero_value;
3273  }
3274  }
3275 
3276  if (!reader->durable_data_.empty()) {
3277  if (Transport_debug_level > 5) {
3278  const LogGuid local_conv(id_), remote_conv(src);
3279  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3280  "local %C has durable for remote %C\n",
3281  local_conv.c_str(),
3282  remote_conv.c_str()));
3283  }
3284  const SequenceNumber& dd_last = reader->durable_data_.rbegin()->first;
3285  if (Transport_debug_level > 5) {
3286  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3287  "check base %q against last durable %q\n",
3288  ack.getValue(), dd_last.getValue()));
3289  }
3290  if (ack > dd_last) {
3292  log_progress("durable delivered", id_, reader->id_, reader->participant_discovered_at_);
3293  }
3294  // Reader acknowledges durable data, we no longer need to store it
3295  if (Transport_debug_level > 5) {
3296  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3297  "durable data acked\n"));
3298  }
3299  reader->durable_data_.swap(pendingCallbacks);
3300  } else {
3301  for (OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator pos = reader->durable_data_.begin(),
3302  limit = reader->durable_data_.end(); pos != limit && pos->first < ack;) {
3303  pendingCallbacks.insert(*pos);
3304  reader->durable_data_.erase(pos++);
3305  }
3306  }
3307  }
3308  } else {
3309  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C invalid acknack\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3310  }
3311 
3312  // Process the nack.
3313  bool schedule_nack_response = false;
3314  if (!dont_schedule_nack_response) {
3315  if (count_is_not_zero) {
3316  reader->requests_.reset();
3317  {
3318  const SingleSendBuffer::Proxy proxy(*send_buff_);
3319  if ((acknack.readerSNState.numBits == 0 ||
3320  (acknack.readerSNState.numBits == 1 && !(acknack.readerSNState.bitmap[0] & 1)))
3321  && ack == max_data_seq(proxy, reader)) {
3322  // Since there is an entry in requested_changes_, the DR must have
3323  // sent a non-final AckNack. If the base value is the high end of
3324  // the heartbeat range, treat it as a request for that seq#.
3325  if (reader->durable_data_.count(ack) || proxy.contains(ack) || proxy.pre_contains(ack)) {
3326  reader->requests_.insert(ack);
3327  }
3328  } else {
3329  reader->requests_.insert(ack, acknack.readerSNState.numBits, acknack.readerSNState.bitmap.get_buffer());
3330  }
3331 
3332  if (!reader->requests_.empty()) {
3333  readers_expecting_data_.insert(reader);
3334  schedule_nack_response = true;
3335  } else if (reader->requested_frags_.empty()) {
3336  readers_expecting_data_.erase(reader);
3337  }
3338  }
3339  }
3340 
3341  if (!is_final) {
3342  readers_expecting_heartbeat_.insert(reader);
3343  schedule_nack_response = true;
3344  }
3345  }
3346 
3347  if (preassociation_readers_.count(reader) == 0) {
3348  make_lagger_leader(reader, previous_acked_sn);
3350  }
3351 
3352  TqeSet to_deliver;
3353  acked_by_all_helper_i(to_deliver);
3354 
3355 #ifdef OPENDDS_SECURITY
3356  if (is_pvs_writer_ &&
3357  !reader->pvs_outstanding_.empty() &&
3358  reader->pvs_outstanding_.low() < reader->cur_cumulative_ack_) {
3359  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
3360  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end();
3361  pos != limit && pos->first < reader->cur_cumulative_ack_; ++pos) {
3363  for (SequenceNumber seq = pos->first; seq <= pos->second && seq < reader->cur_cumulative_ack_; ++seq) {
3364  reader->pvs_outstanding_.erase(seq);
3365  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
3366  if (iter != elems_not_acked_.end()) {
3367  send_buff_->release_acked(iter->first);
3368  to_deliver.insert(iter->second);
3369  elems_not_acked_.erase(iter);
3370  }
3371  }
3372  }
3373  }
3374 #endif
3375 
3376  if (!dont_schedule_nack_response && schedule_nack_response) {
3377  RtpsUdpInst_rch cfg = link->config();
3378  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3379  }
3380 
3381  TransportClient_rch client = client_.lock();
3382 
3383  g.release();
3384 
3385  if (inform_send_listener && client) {
3386  client->data_acked(src);
3387  }
3388 
3389  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
3390  for (iter_t it = pendingCallbacks.begin();
3391  it != pendingCallbacks.end(); ++it) {
3392  it->second->data_delivered();
3393  }
3394 
3395  TqeSet::iterator deliver_iter = to_deliver.begin();
3396  while (deliver_iter != to_deliver.end()) {
3397  (*deliver_iter)->data_delivered();
3398  ++deliver_iter;
3399  }
3400 }
#define ACE_DEBUG(X)
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
Definition: RtpsUdpInst.h:34
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
RcHandle< TransportClient > TransportClient_rch
WeakRcHandle< TransportClient > client_
RcHandle< T > lock() const
Definition: RcObject.h:188
bool log_dropped_messages
Log received RTPS messages that were dropped.
const bool is_pvs_writer_
Participant Volatile Secure writer.
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
bool bitmapNonEmpty(const SequenceNumberSet &snSet)
#define VDBG(DBG_ARGS)
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
const octet FLAG_F
Definition: RtpsCore.idl:520
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
SequenceNumber max_data_seq(const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack are are expecting a heartbeat.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
void remove_preassociation_reader(const ReaderInfo_rch &reader)
FibonacciSequence< TimeDuration > fallback_
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
void make_lagger_leader(const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
std::pair< SequenceNumber, SequenceNumber > SequenceRange
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
bool log_progress
Log progress for RTPS entity discovery and association.

◆ process_nackfrag()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_nackfrag ( const RTPS::NackFragSubmessage nackfrag,
const GUID_t src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 3411 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::Count_t::value, OpenDDS::RTPS::FragmentNumber_t::value, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

3414 {
3416 
3417  if (stopping_) {
3418  return;
3419  }
3420 
3421  RtpsUdpDataLink_rch link = link_.lock();
3422 
3423  if (!link) {
3424  return;
3425  }
3426 
3427  if (Transport_debug_level > 5) {
3428  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C\n",
3429  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3430  }
3431 
3432  const ReaderInfoMap::iterator ri = remote_readers_.find(src);
3433  if (ri == remote_readers_.end()) {
3435  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C unknown remote reader\n",
3436  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3437  }
3438  return;
3439  }
3440 
3441  const ReaderInfo_rch& reader = ri->second;
3442 
3443  if (!compare_and_update_counts(nackfrag.count.value, reader->nackfrag_recvd_count_)) {
3445  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C stale/duplicate message\n",
3446  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3447  }
3448  return;
3449  }
3450 
3451  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
3452 
3453  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
3455  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C seq %q base %u bits %u\n",
3456  LogGuid(src).c_str(), LogGuid(id_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
3457  }
3458 
3459  reader->requested_frags_[seq][nackfrag.fragmentNumberState.bitmapBase.value] = nackfrag.fragmentNumberState;
3460  readers_expecting_data_.insert(reader);
3461  RtpsUdpInst_rch cfg = link->config();
3462  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3463 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
Definition: RtpsUdpInst.h:34
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
bool log_dropped_messages
Log received RTPS messages that were dropped.
RcHandle< ReaderInfo > ReaderInfo_rch
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< RtpsUdpInst > RtpsUdpInst_rch

◆ reader_count()

size_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::reader_count ( ) const

Definition at line 2142 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

2143 {
2145  return remote_readers_.size();
2146 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ record_directed()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::record_directed ( const GUID_t reader,
SequenceNumber  seq 
)
private

Definition at line 3933 of file RtpsUdpDataLink.cpp.

3934 {
3935  ACE_UNUSED_ARG(reader_id);
3936  ACE_UNUSED_ARG(seq);
3937 #ifdef OPENDDS_SECURITY
3938  if (!is_pvs_writer_) {
3939  return;
3940  }
3941 
3942  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3943  if (iter == remote_readers_.end()) {
3944  return;
3945  }
3946 
3947  const ReaderInfo_rch& reader = iter->second;
3948  reader->pvs_outstanding_.insert(seq);
3949 #endif
3950 }
const bool is_pvs_writer_
Participant Volatile Secure writer.
RcHandle< ReaderInfo > ReaderInfo_rch

◆ remove_all_msgs()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_all_msgs ( )

Definition at line 249 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataLink::strategy_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().

250 {
252 
253  RtpsUdpDataLink_rch link = link_.lock();
254  if (!link) {
255  return;
256  }
257 
258  send_buff_->retain_all(id_);
259 
260  {
263  GuardType guard(link->strategy_lock_);
264  if (link->send_strategy_) {
265  link->send_strategy_->remove_all_msgs(id_);
266  }
267  }
268 
270 
271  SnToTqeMap sn_tqe_map;
272  sn_tqe_map.swap(elems_not_acked_);
273 
274  g2.release();
275 
276  SequenceNumber prev = SequenceNumber::ZERO();
277  typedef SnToTqeMap::iterator iter_t;
278  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
279  if (it->first != prev) {
280  send_buff_->release_acked(it->first);
281  prev = it->first;
282  }
283  }
284 
285  g.release();
286 
287  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
288  it->second->data_dropped(true);
289  }
290 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
static SequenceNumber ZERO()
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ remove_preassociation_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_preassociation_reader ( const ReaderInfo_rch reader)
inlineprivate

Definition at line 511 of file RtpsUdpDataLink.h.

References OPENDDS_ASSERT.

512  {
513  if (preassociation_readers_.erase(reader)) {
514  SequenceNumberMultiset::iterator pos = preassociation_reader_start_sns_.find(reader->start_sn_);
517  }
518  }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
SequenceNumberMultiset preassociation_reader_start_sns_

◆ remove_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_reader ( const GUID_t id)

Definition at line 2085 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OPENDDS_MULTIMAP, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR().

2086 {
2087  OPENDDS_MAP(SequenceNumber, TransportQueueElement*) dd;
2088  TqeSet to_drop;
2089 
2090  bool result = false;
2091  {
2093  ReaderInfoMap::iterator it = remote_readers_.find(id);
2094  if (it != remote_readers_.end()) {
2095  const ReaderInfo_rch& reader = it->second;
2096  reader->swap_durable_data(dd);
2098  const SequenceNumber acked_sn = reader->acked_sn();
2099  const SequenceNumber max_sn = expected_max_sn(reader);
2100  readers_expecting_data_.erase(reader);
2101  readers_expecting_heartbeat_.erase(reader);
2102  snris_erase(acked_sn == max_sn ? leading_readers_ : lagging_readers_, acked_sn, reader);
2104 
2105 #ifdef OPENDDS_SECURITY
2106  if (is_pvs_writer_ &&
2107  !reader->pvs_outstanding_.empty()) {
2108  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
2109  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
2111  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
2112  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
2113  if (iter != elems_not_acked_.end()) {
2114  send_buff_->release_acked(iter->first);
2115  to_drop.insert(iter->second);
2116  elems_not_acked_.erase(iter);
2117  }
2118  }
2119  }
2120  }
2121 #endif
2122 
2123  remote_readers_.erase(it);
2124  update_remote_guids_cache_i(false, id);
2125  result = true;
2126  log_remote_counts("remove_reader");
2127  }
2128  }
2129  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
2130  for (iter_t it = dd.begin(); it != dd.end(); ++it) {
2131  it->second->data_dropped();
2132  }
2133 
2134  for (TqeSet::iterator pos = to_drop.begin(), limit = to_drop.end(); pos != limit; ++pos) {
2135  (*pos)->data_dropped();
2136  }
2137 
2138  return result;
2139 }
void update_remote_guids_cache_i(bool add, const GUID_t &guid)
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
const bool is_pvs_writer_
Participant Volatile Secure writer.
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
RcHandle< ReaderInfo > ReaderInfo_rch
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
SNRIS leading_readers_
These reader have acked everything they are supposed to have acked.
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack are are expecting a heartbeat.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
void remove_preassociation_reader(const ReaderInfo_rch &reader)
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
RcHandle< SingleSendBuffer > send_buff_
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ remove_sample()

RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_sample ( const DataSampleElement sample)

Definition at line 182 of file RtpsUdpDataLink.cpp.

References ACE_Message_Block::cont(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TransportQueueElement::MatchOnDataPayload::matches(), OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, ACE_Message_Block::rd_ptr(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::TransportSendStrategy::remove_sample(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::DataLink::strategy_lock_.

183 {
184  bool found = false;
185  SequenceNumber to_release;
186  TransportQueueElement* tqe = 0;
187 
188  const SequenceNumber& seq = sample->get_header().sequence_;
189  const char* const payload = sample->get_sample()->cont()->rd_ptr();
190  const TransportQueueElement::MatchOnDataPayload modp(payload);
191  SingleSendBuffer::BufferVec removed;
192 
194 
195  RtpsUdpDataLink_rch link = link_.lock();
196  if (!link) {
197  return REMOVE_NOT_FOUND;
198  }
199 
201  {
202  GuardType guard(link->strategy_lock_);
203  if (link->send_strategy_) {
206  result = link->send_strategy_->remove_sample(sample);
207  guard.release();
208  }
209  }
210 
212 
213  if (!elems_not_acked_.empty()) {
214  typedef SnToTqeMap::iterator iter_t;
215  for (std::pair<iter_t, iter_t> er = elems_not_acked_.equal_range(seq); er.first != er.second; ++er.first) {
216  if (modp.matches(*er.first->second)) {
217  found = true;
218  to_release = seq;
219  tqe = er.first->second;
220  elems_not_acked_.erase(er.first);
221  break;
222  }
223  }
224  }
225 
226  g2.release();
227 
228  if (found) {
229  send_buff_->remove_acked(to_release, removed);
230  }
231 
232  g.release();
233 
234  if (found) {
235  for (size_t i = 0; i < removed.size(); ++i) {
236  RemoveAllVisitor visitor;
237  removed[i].first->accept_remove_visitor(visitor);
238  delete removed[i].first;
239  removed[i].second->release();
240  }
241  removed.clear();
242  tqe->data_dropped(true);
243  result = REMOVE_FOUND;
244  }
245  return result;
246 }
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ request_ack_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::request_ack_i ( const DataSampleHeader header,
ACE_Message_Block body,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 1393 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLink::id_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_length_, ACE_Message_Block::rd_ptr(), and OpenDDS::DCPS::Transport_debug_level.

1396 {
1397  // Set the ReaderInfo::durable_timestamp_ for the case where no
1398  // durable samples exist in the DataWriter.
1399  GUID_t sub = GUID_UNKNOWN;
1400  if (body && header.message_length_ >= sizeof(sub)) {
1401  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1402  }
1403  typedef ReaderInfoMap::iterator iter_t;
1404  if (sub == GUID_UNKNOWN) {
1405  gather_heartbeats_i(meta_submessages);
1406  if (Transport_debug_level > 3) {
1407  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack "
1408  "local %C all readers\n", LogGuid(id_).c_str()));
1409  }
1410  } else {
1411  iter_t iter = remote_readers_.find(sub);
1412  if (iter != remote_readers_.end()) {
1413  const SingleSendBuffer::Proxy proxy(*send_buff_);
1414  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1415  initialize_heartbeat(proxy, meta_submessage);
1416  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1417  if (Transport_debug_level > 3) {
1418  const LogGuid conv(id_), sub_conv(sub);
1419  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack"
1420  " local %C remote %C\n", conv.c_str(), sub_conv.c_str()));
1421  }
1422  }
1423  }
1424 }
#define ACE_DEBUG(X)
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
char * rd_ptr(void) const
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
RcHandle< SingleSendBuffer > send_buff_

◆ send_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 1449 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages().

1450 {
1452 
1453  if (stopping_) {
1454  return;
1455  }
1456 
1457  RtpsUdpDataLink_rch link = link_.lock();
1458 
1459  if (!link) {
1460  return;
1461  }
1462 
1463  MetaSubmessageVec meta_submessages;
1464  gather_heartbeats_i(meta_submessages);
1465 
1466  if (!preassociation_readers_.empty() || !lagging_readers_.empty()) {
1467  heartbeat_->schedule(fallback_.get());
1468  fallback_.advance();
1469  } else {
1471  }
1472 
1473  g.release();
1474 
1475  link->queue_submessages(meta_submessages);
1476 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
FibonacciSequence< TimeDuration > fallback_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ send_heartbeats_manual_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_heartbeats_manual_i ( MetaSubmessageVec &  meta_submessages)
private

Definition at line 4380 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

4381 {
4382  using namespace OpenDDS::RTPS;
4383 
4384  RtpsUdpDataLink_rch link = link_.lock();
4385  if (!link) {
4386  return;
4387  }
4388 
4389  const SingleSendBuffer::Proxy proxy(*send_buff_);
4390  const SequenceNumber firstSN = durable_ ? 1 : non_durable_first_sn(proxy);
4391  const SequenceNumber lastSN = max_sn_;
4392  const int counter = ++heartbeat_count_;
4393 
4394  const HeartBeatSubmessage hb = {
4395  {HEARTBEAT,
4397  HEARTBEAT_SZ},
4398  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4399  id_.entityId,
4400  to_rtps_seqnum(firstSN),
4401  to_rtps_seqnum(lastSN),
4402  {counter}
4403  };
4404 
4405  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4406  meta_submessage.sm_.heartbeat_sm(hb);
4407 
4408  meta_submessages.push_back(meta_submessage);
4409 }
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const octet FLAG_E
Definition: RtpsCore.idl:518
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_L
Definition: RtpsCore.idl:524
const octet FLAG_F
Definition: RtpsCore.idl:520
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
ACE_CDR::Octet Octet
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_

◆ send_nack_responses()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_nack_responses ( const MonotonicTimePoint now)
private

Definition at line 1479 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages().

1480 {
1481  RtpsUdpDataLink_rch link = link_.lock();
1482  if (!link) {
1483  return;
1484  }
1485 
1486  MetaSubmessageVec meta_submessages;
1487  {
1489 
1490  if (stopping_) {
1491  return;
1492  }
1493 
1494  gather_nack_replies_i(meta_submessages);
1495  }
1496 
1497  link->queue_submessages(meta_submessages);
1498 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void gather_nack_replies_i(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_

◆ snris_erase()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::snris_erase ( RtpsUdpDataLink::SNRIS &  snris,
const SequenceNumber  sn,
const ReaderInfo_rch reader 
)
staticprivate

Definition at line 3785 of file RtpsUdpDataLink.cpp.

3788 {
3789  SNRIS::iterator pos = snris.find(sn);
3790  if (pos != snris.end()) {
3791  pos->second->readers.erase(reader);
3792  if (pos->second->readers.empty()) {
3793  snris.erase(pos);
3794  }
3795  }
3796 }

◆ snris_insert()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::snris_insert ( RtpsUdpDataLink::SNRIS &  snris,
const ReaderInfo_rch reader 
)
staticprivate

Definition at line 3773 of file RtpsUdpDataLink.cpp.

3775 {
3776  const SequenceNumber sn = reader->acked_sn();
3777  SNRIS::iterator pos = snris.find(sn);
3778  if (pos == snris.end()) {
3779  pos = snris.insert(RtpsUdpDataLink::SNRIS::value_type(sn, make_rch<ReaderInfoSetHolder>())).first;
3780  }
3781  pos->second->readers.insert(reader);
3782 }

◆ update_max_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_max_sn ( const GUID_t reader,
SequenceNumber  seq 
)

Definition at line 4494 of file RtpsUdpDataLink.cpp.

References check_leader_lagger(), make_leader_lagger(), max_sn_, and mutex_.

4495 {
4497  SequenceNumber previous_max_sn = max_sn_;
4498  max_sn_ = std::max(max_sn_, seq);
4499  make_leader_lagger(reader, previous_max_sn);
4501  return max_sn_;
4502 }
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)

◆ update_remote_guids_cache_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_remote_guids_cache_i ( bool  add,
const GUID_t guid 
)
private

Definition at line 4204 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::bundling_cache_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, and OpenDDS::DCPS::AddressCache< Key >::remove_id().

4205 {
4206  RtpsUdpDataLink_rch link = link_.lock();
4207  if (!link) {
4208  return;
4209  }
4210 
4211  {
4213 
4214  // We make a new RcHandle to prevent changing what's being pointed to by existing references in the send queue (i.e. to preserve historic values)
4215  RcHandle<ConstSharedRepoIdSet> temp = make_rch<ConstSharedRepoIdSet>();
4216  if (remote_reader_guids_) {
4217  const_cast<RepoIdSet&>(temp->guids_) = remote_reader_guids_->guids_;
4218  }
4219  if (add) {
4220  const_cast<RepoIdSet&>(temp->guids_).insert(guid);
4221  } else {
4222  const_cast<RepoIdSet&>(temp->guids_).erase(guid);
4223  }
4224  remote_reader_guids_ = temp;
4225  }
4226 
4227  link->bundling_cache_.remove_id(GUID_UNKNOWN);
4228 }
GuidSet RepoIdSet
Definition: GuidUtils.h:113
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< ConstSharedRepoIdSet > remote_reader_guids_

◆ update_required_acknack_count()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_required_acknack_count ( const GUID_t id,
CORBA::Long  current 
)

Definition at line 2731 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

2732 {
2734  ReaderInfoMap::iterator ri = remote_readers_.find(id);
2735  if (ri != remote_readers_.end()) {
2736  ri->second->required_acknack_count_ = current;
2737  }
2738 }

Member Data Documentation

◆ client_

WeakRcHandle<TransportClient> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::client_
private

Definition at line 446 of file RtpsUdpDataLink.h.

◆ durable_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_
private

Definition at line 449 of file RtpsUdpDataLink.h.

◆ elems_not_acked_

SnToTqeMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_
private

Definition at line 445 of file RtpsUdpDataLink.h.

Referenced by add_elem_awaiting_ack(), and ~RtpsWriter().

◆ elems_not_acked_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_mutex_
mutableprivate

Definition at line 460 of file RtpsUdpDataLink.h.

Referenced by add_elem_awaiting_ack().

◆ fallback_

FibonacciSequence<TimeDuration> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::fallback_
private

Definition at line 466 of file RtpsUdpDataLink.h.

◆ heartbeat_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_
private

Definition at line 462 of file RtpsUdpDataLink.h.

◆ heartbeat_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_count_
private

Definition at line 451 of file RtpsUdpDataLink.h.

Referenced by inc_heartbeat_count().

◆ id_

const GUID_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_
private

◆ initial_fallback_

const TimeDuration OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::initial_fallback_
private

Definition at line 465 of file RtpsUdpDataLink.h.

◆ is_ps_writer_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_ps_writer_
private

Partcicipant Secure (Reliable SPDP) writer.

Definition at line 456 of file RtpsUdpDataLink.h.

◆ is_pvs_writer_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_pvs_writer_
private

Participant Volatile Secure writer.

Definition at line 454 of file RtpsUdpDataLink.h.

◆ lagging_readers_

SNRIS OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::lagging_readers_
private

These readers have not acked everything they are supposed to have acked.

Definition at line 434 of file RtpsUdpDataLink.h.

◆ leading_readers_

SNRIS OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::leading_readers_
private

These reader have acked everything they are supposed to have acked.

Definition at line 436 of file RtpsUdpDataLink.h.

◆ link_

WeakRcHandle<RtpsUdpDataLink> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::link_
private

◆ max_sn_

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::max_sn_
private

Definition at line 442 of file RtpsUdpDataLink.h.

Referenced by update_max_sn().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::mutex_
mutableprivate

◆ nack_response_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::nack_response_
private

Definition at line 463 of file RtpsUdpDataLink.h.

◆ preassociation_reader_start_sns_

SequenceNumberMultiset OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::preassociation_reader_start_sns_
private

Definition at line 431 of file RtpsUdpDataLink.h.

◆ preassociation_readers_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::preassociation_readers_
private

Preassociation readers require a non-final heartbeat.

Definition at line 429 of file RtpsUdpDataLink.h.

Referenced by log_remote_counts().

◆ readers_expecting_data_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::readers_expecting_data_
private

These readers have sent a nack and are expecting data.

Definition at line 438 of file RtpsUdpDataLink.h.

◆ readers_expecting_heartbeat_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::readers_expecting_heartbeat_
private

These readers have sent a non-final ack are are expecting a heartbeat.

Definition at line 440 of file RtpsUdpDataLink.h.

◆ remote_reader_guids_

RcHandle<ConstSharedRepoIdSet> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_reader_guids_
private

Definition at line 427 of file RtpsUdpDataLink.h.

◆ remote_reader_guids_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_reader_guids_mutex_
mutableprivate

Definition at line 459 of file RtpsUdpDataLink.h.

◆ remote_readers_

ReaderInfoMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_
private

Definition at line 426 of file RtpsUdpDataLink.h.

Referenced by is_leading(), and log_remote_counts().

◆ send_buff_

RcHandle<SingleSendBuffer> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_
private

Definition at line 441 of file RtpsUdpDataLink.h.

Referenced by RtpsWriter().

◆ stopping_

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::stopping_
private

The documentation for this class was generated from the following files: