OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Private Member Functions | Static Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter Class Reference
Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter:
Collaboration graph
[legend]

Public Member Functions

 RtpsWriter (const TransportClient_rch &client, const RtpsUdpDataLink_rch &link, const GUID_t &id, bool durable, SequenceNumber max_sn, CORBA::Long heartbeat_count, size_t capacity)
 
virtual ~RtpsWriter ()
 
SequenceNumber max_data_seq (const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
 
SequenceNumber update_max_sn (const GUID_t &reader, SequenceNumber seq)
 
void add_elem_awaiting_ack (TransportQueueElement *element)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs ()
 
bool add_reader (const ReaderInfo_rch &reader)
 
bool has_reader (const GUID_t &id) const
 
bool is_leading (const GUID_t &id) const
 
bool remove_reader (const GUID_t &id)
 
size_t reader_count () const
 
CORBA::Long inc_heartbeat_count ()
 
void pre_stop_helper (TqeVector &to_drop, bool true_stop)
 
TransportQueueElement * customize_queue_element_helper (TransportQueueElement *element, bool requires_inline_qos, MetaSubmessageVec &meta_submessages, bool &deliver_after_send)
 
void process_acknack (const RTPS::AckNackSubmessage &acknack, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_nackfrag (const RTPS::NackFragSubmessage &nackfrag, const GUID_t &src, MetaSubmessageVec &meta_submessages)
 
void process_acked_by_all ()
 
void gather_nack_replies_i (MetaSubmessageVec &meta_submessages)
 
void gather_heartbeats_i (MetaSubmessageVec &meta_submessages)
 
void gather_heartbeats (RcHandle< ConstSharedRepoIdSet > additional_guids, MetaSubmessageVec &meta_submessages)
 
void update_required_acknack_count (const GUID_t &id, CORBA::Long current)
 
RcHandle< SingleSendBuffer > get_send_buff ()
 
RcHandle< ConstSharedRepoIdSet > get_remote_reader_guids ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Member Functions

typedef OPENDDS_MULTISET (OpenDDS::DCPS::SequenceNumber) SequenceNumberMultiset
 
typedef OPENDDS_SET_CMP (TransportQueueElement *, TransportQueueElement::OrderBySequenceNumber) TqeSet
 
typedef OPENDDS_MULTIMAP (SequenceNumber, TransportQueueElement *) SnToTqeMap
 
void send_heartbeats (const MonotonicTimePoint &now)
 
void send_nack_responses (const MonotonicTimePoint &now)
 
void add_gap_submsg_i (RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
 
void end_historic_samples_i (const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
 
void request_ack_i (const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
 
void send_heartbeats_manual_i (MetaSubmessageVec &meta_submessages)
 
void gather_gaps_i (const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
 
void acked_by_all_helper_i (TqeSet &to_deliver)
 
SequenceNumber expected_max_sn (const ReaderInfo_rch &reader) const
 
void make_leader_lagger (const GUID_t &reader, SequenceNumber previous_max_sn)
 
void make_lagger_leader (const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
 
bool is_lagging (const ReaderInfo_rch &reader) const
 
bool is_leading (const ReaderInfo_rch &reader) const
 
void check_leader_lagger () const
 
void record_directed (const GUID_t &reader, SequenceNumber seq)
 
void update_remote_guids_cache_i (bool add, const GUID_t &guid)
 
bool is_pvs_writer () const
 
SequenceNumber non_durable_first_sn (const SingleSendBuffer::Proxy &proxy) const
 
void remove_preassociation_reader (const ReaderInfo_rch &reader)
 
void initialize_heartbeat (const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
 
void gather_directed_heartbeat_i (const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
 
void log_remote_counts (const char *funcname)
 

Static Private Member Functions

static void snris_insert (RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
 
static void snris_erase (RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
 

Private Attributes

ReaderInfoMap remote_readers_
 
RcHandle< ConstSharedRepoIdSet > remote_reader_guids_
 
ReaderInfoSet preassociation_readers_
 Preassociation readers require a non-final heartbeat. More...
 
SequenceNumberMultiset preassociation_reader_start_sns_
 
SNRIS lagging_readers_
 
SNRIS leading_readers_
 These readers have acked everything they are supposed to have acked. More...
 
ReaderInfoSet readers_expecting_data_
 These readers have sent a nack and are expecting data. More...
 
ReaderInfoSet readers_expecting_heartbeat_
 These readers have sent a non-final ack and are expecting a heartbeat. More...
 
RcHandle< SingleSendBuffer > send_buff_
 
SequenceNumber max_sn_
 
SnToTqeMap elems_not_acked_
 
WeakRcHandle< TransportClient > client_
 
WeakRcHandle< RtpsUdpDataLink > link_
 
const GUID_t id_
 
const bool durable_
 
bool stopping_
 
CORBA::Long heartbeat_count_
 
const bool is_pvs_writer_
 Participant Volatile Secure writer. More...
 
const bool is_ps_writer_
 Participant Secure (Reliable SPDP) writer. More...
 
ACE_Thread_Mutex mutex_
 
ACE_Thread_Mutex remote_reader_guids_mutex_
 
ACE_Thread_Mutex elems_not_acked_mutex_
 
RcHandle< SporadicEvent > heartbeat_
 
RcHandle< SporadicEvent > nack_response_
 
const TimeDuration initial_fallback_
 
FibonacciSequence< TimeDuration > fallback_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 424 of file RtpsUdpDataLink.h.

Constructor & Destructor Documentation

◆ RtpsWriter()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::RtpsWriter ( const TransportClient_rch & client,
const RtpsUdpDataLink_rch & link,
const GUID_t & id,
bool  durable,
SequenceNumber  max_sn,
CORBA::Long  heartbeat_count,
size_t  capacity 
)

Definition at line 4446 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::in(), send_buff_, and OpenDDS::DCPS::RtpsUdpDataLink::send_strategy().

4449  : send_buff_(make_rch<SingleSendBuffer>(capacity, ONE_SAMPLE_PER_PACKET))
4451  , client_(client)
4452  , link_(link)
4453  , id_(id)
4454  , durable_(durable)
4455  , stopping_(false)
4456  , heartbeat_count_(heartbeat_count)
4457 #ifdef OPENDDS_SECURITY
4460 #endif
4461  , heartbeat_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsWriter> >(rchandle_from(this), &RtpsWriter::send_heartbeats)))
4462  , nack_response_(make_rch<SporadicEvent>(link->event_dispatcher(), make_rch<PmfNowEvent<RtpsWriter> >(rchandle_from(this), &RtpsWriter::send_nack_responses)))
4463  , initial_fallback_(link->config()->heartbeat_period_)
4465 {
4466  send_buff_->bind(link->send_strategy().in());
4467 }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
WeakRcHandle< RtpsUdpDataLink > link_
void send_heartbeats(const MonotonicTimePoint &now)
FibonacciSequence< TimeDuration > fallback_
WeakRcHandle< TransportClient > client_
void send_nack_responses(const MonotonicTimePoint &now)
const bool is_ps_writer_
Participant Secure (Reliable SPDP) writer.
RcHandle< SingleSendBuffer > send_buff_
const size_t ONE_SAMPLE_PER_PACKET
static SequenceNumber ZERO()
const bool is_pvs_writer_
Participant Volatile Secure writer.
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER
Definition: MessageTypes.h:87
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59

◆ ~RtpsWriter()

OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::~RtpsWriter ( )
virtual

Definition at line 4469 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_TEXT(), elems_not_acked_, and LM_WARNING.

4470 {
4471  if (!elems_not_acked_.empty()) {
4472  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: RtpsWriter::~RtpsWriter - ")
4473  ACE_TEXT("deleting with %d elements left not fully acknowledged\n"),
4474  elems_not_acked_.size()));
4475  }
4476 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

Member Function Documentation

◆ acked_by_all_helper_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::acked_by_all_helper_i ( TqeSet &  to_deliver)
private

Definition at line 3970 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR().

3971 {
3972  using namespace OpenDDS::RTPS;
3973  typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
3974  OPENDDS_VECTOR(GUID_t) to_check;
3975 
3976  RtpsUdpDataLink_rch link = link_.lock();
3977 
3978  if (!link) {
3979  return;
3980  }
3981 
3982  //start with the max sequence number writer knows about and decrease
3983  //by what the min over all readers is
3984  SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
3985  if (!preassociation_readers_.empty()) {
3986  all_readers_ack = std::min(all_readers_ack, *preassociation_reader_start_sns_.begin());
3987  }
3988  if (!lagging_readers_.empty()) {
3989  all_readers_ack = std::min(all_readers_ack, lagging_readers_.begin()->first + 1);
3990  }
3991  if (!leading_readers_.empty()) {
3992  // When is_pvs_writer_ is true, the leading_readers_ will all be
3993  // at different sequence numbers. The minimum could actually be
3994  // before the preassociation readers or lagging readers. Use the
3995  // largest sequence number to avoid holding onto samples.
3996  all_readers_ack = std::min(all_readers_ack, leading_readers_.rbegin()->first + 1);
3997  }
3998 
3999  if (all_readers_ack == SequenceNumber::MAX_VALUE) {
4000  return;
4001  }
4002 
4004 
4005  if (!elems_not_acked_.empty()) {
4006  for (iter_t it = elems_not_acked_.begin(), limit = elems_not_acked_.end();
4007  it != limit && it->first < all_readers_ack;) {
4008  send_buff_->release_acked(it->first);
4009  to_deliver.insert(it->second);
4010  elems_not_acked_.erase(it++);
4011  }
4012  }
4013 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
RcHandle< SingleSendBuffer > send_buff_
SequenceNumberMultiset preassociation_reader_start_sns_
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
static const Value MAX_VALUE

◆ add_elem_awaiting_ack()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack ( TransportQueueElement * element)

Definition at line 4506 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, elems_not_acked_, elems_not_acked_mutex_, and OpenDDS::DCPS::TransportQueueElement::sequence().

4507 {
4509  elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
4510 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ add_gap_submsg_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_gap_submsg_i ( RTPS::SubmessageSeq & msg,
SequenceNumber  gap_start 
)
private

Definition at line 1502 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::grow(), OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::Encoding::KIND_XCDR1, OPENDDS_ASSERT, OpenDDS::DCPS::serialized_size(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::GapSubmessage::smHeader, OpenDDS::RTPS::SubmessageHeader::submessageLength, and OpenDDS::RTPS::to_rtps_seqnum().

1504 {
1505  // These are the GAP submessages that we'll send directly in-line with the
1506  // DATA when we notice that the DataWriter has deliberately skipped seq #s.
1507  // There are other GAP submessages generated in meta_submessage to reader ACKNACKS,
1508  // see send_nack_replies().
1509  using namespace OpenDDS::RTPS;
1510 
1511  const LongSeq8 bitmap;
1512 
1513  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
1514  // [gapStart, gapListBase) and those in the SNSet.
1515  GapSubmessage gap = {
1516  {GAP, FLAG_E, 0 /*length determined below*/},
1517  ENTITYID_UNKNOWN, // readerId: applies to all matched readers
1518  id_.entityId,
1519  to_rtps_seqnum(gap_start),
1520  {to_rtps_seqnum(max_sn_), 0, bitmap}
1521  };
1522  OPENDDS_ASSERT(gap_start < max_sn_);
1523 
1524  const size_t size = serialized_size(Encoding(Encoding::KIND_XCDR1), gap);
1526  static_cast<CORBA::UShort>(size) - SMHDR_SZ;
1527 
1528  const CORBA::ULong idx = grow(msg) - 1;
1529  msg[idx].gap_sm(gap);
1530 }
const octet FLAG_E
Definition: RtpsCore.idl:521
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
ACE_CDR::ULong ULong
ACE_CDR::UShort UShort
Seq::size_type grow(Seq &seq)
Definition: Util.h:151
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
SubmessageHeader smHeader
Definition: RtpsCore.idl:574

◆ add_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_reader ( const ReaderInfo_rch & reader)

Definition at line 2033 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_.

2034 {
2036  OPENDDS_ASSERT(!reader->durable_ || durable_);
2037 
2038  if (stopping_) {
2039  return false;
2040  }
2041 
2042  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader->id_);
2043  if (iter == remote_readers_.end()) {
2044 #ifdef OPENDDS_SECURITY
2045  if (is_pvs_writer_) {
2046  reader->max_pvs_sn_ = max_sn_;
2047  }
2048 #endif
2049  remote_readers_.insert(ReaderInfoMap::value_type(reader->id_, reader));
2050  update_remote_guids_cache_i(true, reader->id_);
2051  preassociation_readers_.insert(reader);
2052  preassociation_reader_start_sns_.insert(reader->start_sn_);
2053  log_remote_counts("add_reader");
2054 
2055  RtpsUdpDataLink_rch link = link_.lock();
2056  if (!link) {
2057  return false;
2058  }
2059 
2061  heartbeat_->schedule(fallback_.get());
2062  // Durable readers will get their heartbeat from end historic samples.
2063  if (!reader->durable_) {
2064  MetaSubmessageVec meta_submessages;
2065  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
2066  const SingleSendBuffer::Proxy proxy(*send_buff_);
2067  initialize_heartbeat(proxy, meta_submessage);
2068  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
2069  g.release();
2070  link->queue_submessages(meta_submessages);
2071  }
2072 
2073  return true;
2074  }
2075  return false;
2076 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
WeakRcHandle< RtpsUdpDataLink > link_
FibonacciSequence< TimeDuration > fallback_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
RcHandle< SingleSendBuffer > send_buff_
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
SequenceNumberMultiset preassociation_reader_start_sns_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const bool is_pvs_writer_
Participant Volatile Secure writer.
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
void update_remote_guids_cache_i(bool add, const GUID_t &guid)

◆ check_leader_lagger()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::check_leader_lagger ( ) const
private

Definition at line 3897 of file RtpsUdpDataLink.cpp.

References OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by update_max_sn().

3898 {
3899 #ifndef OPENDDS_SAFETY_PROFILE
3900 #ifndef NDEBUG
3901  static const SequenceNumber negative_one = SequenceNumber::ZERO().previous();
3902  for (SNRIS::const_iterator pos1 = lagging_readers_.begin(), limit = lagging_readers_.end();
3903  pos1 != limit; ++pos1) {
3904  const SequenceNumber& sn = pos1->first;
3905  const ReaderInfoSetHolder_rch& readers = pos1->second;
3906  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3907  pos2 != limit; ++pos2) {
3908  const ReaderInfo_rch& reader = *pos2;
3909  OPENDDS_ASSERT(reader->acked_sn() == sn);
3910  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3911  OPENDDS_ASSERT(sn == negative_one || sn < expect_max_sn);
3912  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3913  }
3914  }
3915 
3916  for (SNRIS::const_iterator pos1 = leading_readers_.begin(), limit = leading_readers_.end();
3917  pos1 != limit; ++pos1) {
3918  const SequenceNumber& sn = pos1->first;
3919  const ReaderInfoSetHolder_rch& readers = pos1->second;
3920  for (ReaderInfoSet::const_iterator pos2 = readers->readers.begin(), limit = readers->readers.end();
3921  pos2 != limit; ++pos2) {
3922  const ReaderInfo_rch& reader = *pos2;
3923  OPENDDS_ASSERT(reader->acked_sn() == sn);
3924  const SequenceNumber expect_max_sn = expected_max_sn(reader);
3925  OPENDDS_ASSERT(sn == expect_max_sn);
3926  OPENDDS_ASSERT(preassociation_readers_.count(reader) == 0);
3927  }
3928  }
3929 #endif
3930 #endif
3931 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
SequenceNumber previous() const
RcHandle< ReaderInfo > ReaderInfo_rch
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
RcHandle< ReaderInfoSetHolder > ReaderInfoSetHolder_rch
static SequenceNumber ZERO()
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.

◆ customize_queue_element_helper()

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper ( TransportQueueElement * element,
bool  requires_inline_qos,
MetaSubmessageVec &  meta_submessages,
bool &  deliver_after_send 
)

Definition at line 1078 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, OpenDDS::DCPS::LogGuid::c_str(), ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::TransportQueueElement::is_fragment(), OpenDDS::DCPS::TransportQueueElement::is_last_fragment(), LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_messages, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual_i(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::TransportCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataLink::strategy_lock_, OpenDDS::DCPS::RtpsUdpDataLink::submsgs_to_msgblock(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

1083 {
1085 
1086  RtpsUdpDataLink_rch link = link_.lock();
1087  if (stopping_ || !link) {
1088  g.release();
1089  element->data_dropped(true);
1090  return 0;
1091  }
1092 
1093  OPENDDS_ASSERT(element->publication_id() == id_);
1094 
1095  const SequenceNumber previous_max_sn = max_sn_;
1096  RTPS::SubmessageSeq subm;
1097 
1098  const SequenceNumber seq = element->sequence();
1100  if (!element->is_fragment() || element->is_last_fragment()) {
1101  max_sn_ = std::max(max_sn_, seq);
1102  }
1103  if (!durable_ && !is_pvs_writer() &&
1104  element->subscription_id() == GUID_UNKNOWN &&
1105  previous_max_sn < max_sn_.previous()) {
1106  add_gap_submsg_i(subm, previous_max_sn + 1);
1107  }
1108  }
1109 
1110  make_leader_lagger(element->subscription_id(), previous_max_sn);
1112 
1113  TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
1114  TransportCustomizedElement* tce =
1115  dynamic_cast<TransportCustomizedElement*>(element);
1116  TransportSendControlElement* tsce =
1117  dynamic_cast<TransportSendControlElement*>(element);
1118 
1119  Message_Block_Ptr data;
1120  bool durable = false;
1121 
1122  const ACE_Message_Block* msg = element->msg();
1123  const GUID_t pub_id = element->publication_id();
1124 
1125  // Based on the type of 'element', find and duplicate the data payload
1126  // continuation block.
1127  if (tsce) { // Control message
1128  if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
1129  data.reset(msg->cont()->duplicate());
1130  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1132  subm, *tsce, requires_inline_qos);
1133  record_directed(element->subscription_id(), seq);
1134  } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
1135  end_historic_samples_i(tsce->header(), msg->cont(), meta_submessages);
1136  g.release();
1137  element->data_delivered();
1138  return 0;
1139  } else if (tsce->header().message_id_ == REQUEST_ACK) {
1140  request_ack_i(tsce->header(), msg->cont(), meta_submessages);
1141  deliver_after_send = true;
1142  return 0;
1143  } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
1144  send_heartbeats_manual_i(meta_submessages);
1145  deliver_after_send = true;
1146  return 0;
1147  } else {
1148  g.release();
1149  element->data_dropped(true /*dropped_by_transport*/);
1150  return 0;
1151  }
1152 
1153  } else if (tse) { // Basic data message
1154  // {DataSampleHeader} -> {Data Payload}
1155  data.reset(msg->cont()->duplicate());
1156  const DataSampleElement* dsle = tse->sample();
1157  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1159  subm, *dsle, requires_inline_qos);
1160  record_directed(element->subscription_id(), seq);
1161  durable = dsle->get_header().historic_sample_;
1162 
1163  } else if (tce) { // Customized data message
1164  // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
1165  data.reset(msg->cont()->cont()->duplicate());
1166  const DataSampleElement* dsle = tce->original_send_element()->sample();
1167  // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
1169  subm, *dsle, requires_inline_qos);
1170  record_directed(element->subscription_id(), seq);
1171  durable = dsle->get_header().historic_sample_;
1172 
1173  } else {
1174  send_buff_->pre_insert(seq);
1175  return element;
1176  }
1177 
1178 #ifdef OPENDDS_SECURITY
1179  {
1180  GuardType guard(link->strategy_lock_);
1181  if (link->send_strategy_) {
1182  link->send_strategy()->encode_payload(pub_id, data, subm);
1183  }
1184  }
1185 #endif
1186 
1187  if (stopping_) {
1188  g.release();
1189  element->data_dropped(true);
1190  return 0;
1191  }
1192 
1194  link->send_strategy()->append_submessages(subm);
1195  }
1196 
1197  Message_Block_Ptr hdr(link->submsgs_to_msgblock(subm));
1198  hdr->cont(data.release());
1199  RtpsCustomizedElement* rtps =
1200  new RtpsCustomizedElement(element, move(hdr));
1201 
1202  // Handle durability resends
1203  if (durable) {
1204  const GUID_t sub = element->subscription_id();
1205  if (sub != GUID_UNKNOWN) {
1206  ReaderInfoMap::iterator ri = remote_readers_.find(sub);
1207  if (ri != remote_readers_.end()) {
1208  ri->second->durable_data_[rtps->sequence()] = rtps;
1209  ri->second->durable_timestamp_.set_to_now();
1210  if (Transport_debug_level > 3) {
1211  const LogGuid conv(pub_id), sub_conv(sub);
1212  ACE_DEBUG((LM_DEBUG,
1213  "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
1214  "storing durable data for local %C remote %C seq %q\n",
1215  conv.c_str(), sub_conv.c_str(),
1216  rtps->sequence().getValue()));
1217  }
1218  return 0;
1219  }
1220  }
1221  }
1222 
1223  send_buff_->pre_insert(seq);
1224  return rtps;
1225 }
sequence< Submessage > SubmessageSeq
Definition: RtpsCore.idl:885
#define ACE_DEBUG(X)
static void populate_data_control_submessages(RTPS::SubmessageSeq &subm, const TransportSendControlElement &tsce, bool requires_inline_qos)
WeakRcHandle< RtpsUdpDataLink > link_
bool requires_inline_qos(const GUIDSeq_var &peers)
SequenceNumber previous() const
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void reset(void)
static void populate_data_sample_submessages(RTPS::SubmessageSeq &subm, const DataSampleElement &dsle, bool requires_inline_qos)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
void end_historic_samples_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
static bool control_message_supported(char message_id)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
RcHandle< SingleSendBuffer > send_buff_
void send_heartbeats_manual_i(MetaSubmessageVec &meta_submessages)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
void add_gap_submsg_i(RTPS::SubmessageSeq &msg, SequenceNumber gap_start)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
bool log_messages
Log all RTPS messages sent or received.
void request_ack_i(const DataSampleHeader &header, ACE_Message_Block *body, MetaSubmessageVec &meta_submessages)
void record_directed(const GUID_t &reader, SequenceNumber seq)

◆ end_historic_samples_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::end_historic_samples_i ( const DataSampleHeader & header,
ACE_Message_Block * body,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 1344 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLink::id_, LM_DEBUG, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

1347 {
1348  // Set the ReaderInfo::durable_timestamp_ for the case where no
1349  // durable samples exist in the DataWriter.
1350  if (durable_) {
1352  GUID_t sub = GUID_UNKNOWN;
1353  if (body && header.message_length_ >= sizeof(sub)) {
1354  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1355  }
1356  typedef ReaderInfoMap::iterator iter_t;
1357  if (sub == GUID_UNKNOWN) {
1358  if (Transport_debug_level > 3) {
1359  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
1360  "local %C all readers\n", LogGuid(id_).c_str()));
1361  }
1362  for (iter_t iter = remote_readers_.begin();
1363  iter != remote_readers_.end(); ++iter) {
1364  if (iter->second->durable_) {
1365  iter->second->durable_timestamp_ = now;
1367  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1368  }
1369  }
1370  }
1371  } else {
1372  iter_t iter = remote_readers_.find(sub);
1373  if (iter != remote_readers_.end()) {
1374  if (iter->second->durable_) {
1375  iter->second->durable_timestamp_ = now;
1377  log_progress("durable data queued", id_, iter->first, iter->second->participant_discovered_at_);
1378  }
1379  const SingleSendBuffer::Proxy proxy(*send_buff_);
1380  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1381  initialize_heartbeat(proxy, meta_submessage);
1382  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1383  if (Transport_debug_level > 3) {
1384  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
1385  " local %C remote %C\n", LogGuid(id_).c_str(), LogGuid(sub).c_str()));
1386  }
1387  }
1388  }
1389  }
1390  }
1391 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
char * rd_ptr(void) const
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
RcHandle< SingleSendBuffer > send_buff_
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
bool log_progress
Log progress for RTPS entity discovery and association.

◆ expected_max_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_max_sn ( const ReaderInfo_rch & reader) const
private

Definition at line 3759 of file RtpsUdpDataLink.cpp.

3760 {
3761  ACE_UNUSED_ARG(reader);
3762 #ifdef OPENDDS_SECURITY
3763  if (is_pvs_writer_) {
3764  return reader->max_pvs_sn_;
3765  } else {
3766 #endif
3767  return max_sn_;
3768 #ifdef OPENDDS_SECURITY
3769  }
3770 #endif
3771 }
const bool is_pvs_writer_
Participant Volatile Secure writer.

◆ gather_directed_heartbeat_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_directed_heartbeat_i ( const SingleSendBuffer::Proxy proxy,
MetaSubmessageVec &  meta_submessages,
MetaSubmessage meta_submessage,
const ReaderInfo_rch reader 
)
private

Definition at line 4180 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::RTPS::Count_t::value.

// Fills in the heartbeat carried by meta_submessage for one specific reader
// (destination GUID, reader entity id, count, firstSN, lastSN), appends a copy
// to meta_submessages, and then calls reset_destination() on meta_submessage
// so the caller can reuse it for the next reader.
4184 {
// Durable readers are offered everything from sequence number 1; otherwise the
// range starts at the later of non_durable_first_sn(proxy) and the reader's
// start_sn_.
4185  const SequenceNumber first_sn = reader->durable_ ? 1 : std::max(non_durable_first_sn(proxy), reader->start_sn_);
4186  SequenceNumber last_sn = expected_max_sn(reader);
4187 #ifdef OPENDDS_SECURITY
4188  if (is_pvs_writer_ && last_sn < first_sn.previous()) {
4189  // This can happen if the reader gets reset.
4190  // Adjust the heartbeat to be valid.
4191  // make_lagger_leader() will eventually correct the problem.
4192  last_sn = first_sn.previous();
4193  }
4194 #endif
4195  meta_submessage.dst_guid_ = reader->id_;
// Every directed heartbeat consumes a new value of the writer's heartbeat_count_.
4196  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4197  meta_submessage.sm_.heartbeat_sm().readerId = reader->id_.entityId;
4198  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(first_sn);
4199  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(last_sn);
// Sanity check on the advertised range: first_sn >= 1, last_sn >= 0 and
// last_sn >= first_sn - 1. Note the fields above have already been assigned
// when this check runs.
4200  OPENDDS_ASSERT(!(first_sn < 1 || last_sn < 0 || last_sn < first_sn.previous()));
4201  meta_submessages.push_back(meta_submessage);
4202  meta_submessage.reset_destination();
4203 }
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
const bool is_pvs_writer_
Participant Volatile Secure writer.
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139

◆ gather_gaps_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_gaps_i ( const ReaderInfo_rch reader,
const DisjointSequence gaps,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 3100 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DisjointSequence::bitmap_num_longs(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_ASSERT, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::DCPS::Transport_debug_level.

// Appends one GAP submessage describing `gaps` to meta_submessages.
// When reader is non-null the submessage is addressed to reader->id_;
// otherwise the destination is GUID_UNKNOWN / ENTITYID_UNKNOWN.
// Does nothing when `gaps` is empty.
3103 {
3104  using namespace RTPS;
3105 
// A null reader is only acceptable when this writer is not durable.
3106  OPENDDS_ASSERT(reader || !durable_);
3107 
3108  if (gaps.empty()) {
3109  return;
3110  }
3111 
3112  // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
3113  // [gapStart, gapListBase) and those in the SNSet.
// base is one past gaps.cumulative_ack(); it becomes the bitmap base of the
// gap list.
3114  const SequenceNumber firstMissing = gaps.low(),
3115  base = ++SequenceNumber(gaps.cumulative_ack());
3116  const SequenceNumber_t gapStart = to_rtps_seqnum(firstMissing);
3117  const SequenceNumber_t gapListBase = to_rtps_seqnum(base);
3118  CORBA::ULong num_bits = 0;
3119  LongSeq8 bitmap;
3120 
// A bitmap is only built when the sequence is disjoint; otherwise the gap list
// is sent with num_bits == 0 and an empty bitmap.
3121  if (gaps.disjoint()) {
3122  bitmap.length(DisjointSequence::bitmap_num_longs(base, gaps.high()));
3123  if (bitmap.length() > 0) {
3124  ACE_CDR::ULong cumulative_bits_added = 0;
// The return value of to_bitmap is deliberately discarded.
// NOTE(review): num_bits is presumably filled in by to_bitmap - confirm
// against DisjointSequence::to_bitmap.
3125  (void)gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits, cumulative_bits_added);
3126  }
3127  }
3128 
3129  MetaSubmessage meta_submessage(id_, reader ? reader->id_ : GUID_UNKNOWN);
3130  GapSubmessage gap = {
3131  {GAP, FLAG_E, 0 /*length determined later*/},
3132  reader ? reader->id_.entityId : ENTITYID_UNKNOWN,
3133  id_.entityId,
3134  gapStart,
3135  {gapListBase, num_bits, bitmap}
3136  };
// The start of the gap must precede the bitmap base.
3137  OPENDDS_ASSERT(firstMissing < base);
3138  meta_submessage.sm_.gap_sm(gap);
3139 
// At debug level above 5, log the contiguous part of the gap:
// [gapStart, bitmapBase - 1].
3140  if (Transport_debug_level > 5) {
3141  const LogGuid conv(id_);
3142  SequenceRange sr;
3143  sr.first = to_opendds_seqnum(gap.gapStart);
3144  const SequenceNumber srbase = to_opendds_seqnum(gap.gapList.bitmapBase);
3145  sr.second = srbase.previous();
3146  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_gaps_i "
3147  "GAP with range [%q, %q] from %C\n",
3148  sr.first.getValue(), sr.second.getValue(),
3149  conv.c_str()));
3150  }
3151 
3152  meta_submessages.push_back(meta_submessage);
3153 }
#define ACE_DEBUG(X)
const octet FLAG_E
Definition: RtpsCore.idl:521
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_CDR::ULong ULong
ACE_UINT32 ULong
static ACE_CDR::ULong bitmap_num_longs(const SequenceNumber &low, const SequenceNumber &high)
SequenceNumber_t gapStart
Definition: RtpsCore.idl:577
std::pair< SequenceNumber, SequenceNumber > SequenceRange
sequence< long, 8 > LongSeq8
Definition: RtpsCore.idl:69
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
SequenceNumberSet gapList
Definition: RtpsCore.idl:578

◆ gather_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_heartbeats ( RcHandle< ConstSharedRepoIdSet additional_guids,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 4298 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::MetaSubmessage::dst_guid_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OPENDDS_ASSERT, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::Count_t::value.

4300 {
4301  OPENDDS_ASSERT(!additional_guids->guids_.empty());
4302 
4304 
4305  RtpsUdpDataLink_rch link = link_.lock();
4306  if (!link) {
4307  return;
4308  }
4309 
4310  const SingleSendBuffer::Proxy proxy(*send_buff_);
4311 
4312  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4313  initialize_heartbeat(proxy, meta_submessage);
4314 
4315  for (RepoIdSet::const_iterator it = additional_guids->guids_.begin(),
4316  limit = additional_guids->guids_.end(); it != limit; ++it) {
4317 
4318  // Semi-directed (INFO_DST but ENTITYID_UNKNOWN), non-final.
4319  meta_submessage.dst_guid_ = *it;
4320  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4321  meta_submessages.push_back(meta_submessage);
4322  meta_submessage.reset_destination();
4323  }
4324 }
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
RcHandle< SingleSendBuffer > send_buff_
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ gather_heartbeats_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_heartbeats_i ( MetaSubmessageVec &  meta_submessages)

Definition at line 4232 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatSubmessage::count, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::RTPS::HeartBeatSubmessage::readerId, OpenDDS::DCPS::MetaSubmessage::reset_destination(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::to_rtps_seqnum(), and OpenDDS::RTPS::Count_t::value.

4233 {
4234  if (preassociation_readers_.empty() && lagging_readers_.empty()) {
4235  return;
4236  }
4237 
4239 
4240  RtpsUdpDataLink_rch link = link_.lock();
4241  if (!link) {
4242  return;
4243  }
4244 
4245  using namespace OpenDDS::RTPS;
4246 
4247  const SingleSendBuffer::Proxy proxy(*send_buff_);
4248 
4249  // Assume no samples are available.
4250  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4251  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
4252  const SequenceNumber lastSN = max_sn_;
4253 
4254  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4255  initialize_heartbeat(proxy, meta_submessage);
4256 
4257  // Directed, non-final.
4258  if (!preassociation_readers_.empty()) {
4259  meta_submessages.reserve(meta_submessages.size() + preassociation_readers_.size());
4260  for (ReaderInfoSet::const_iterator pos = preassociation_readers_.begin(), limit = preassociation_readers_.end();
4261  pos != limit; ++pos) {
4262  const ReaderInfo_rch& reader = *pos;
4263  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4264  }
4265  }
4266 
4267  if (!lagging_readers_.empty()) {
4268  if (leading_readers_.empty() && remote_readers_.size() > 1
4269 #ifdef OPENDDS_SECURITY
4270  && !is_pvs_writer_
4271  && !is_ps_writer_
4272 #endif
4273  ) {
4274  // Every reader is lagging and there is more than one.
4275  meta_submessage.sm_.heartbeat_sm().count.value = ++heartbeat_count_;
4276  meta_submessage.sm_.heartbeat_sm().readerId = ENTITYID_UNKNOWN;
4277  meta_submessage.sm_.heartbeat_sm().firstSN = to_rtps_seqnum(firstSN);
4278  meta_submessage.sm_.heartbeat_sm().lastSN = to_rtps_seqnum(lastSN);
4279 
4280  meta_submessages.push_back(meta_submessage);
4281  meta_submessage.reset_destination();
4282  } else {
4283  for (SNRIS::const_iterator snris_pos = lagging_readers_.begin(), snris_limit = lagging_readers_.end();
4284  snris_pos != snris_limit; ++snris_pos) {
4285  meta_submessages.reserve(meta_submessages.size() + snris_pos->second->readers.size());
4286  for (ReaderInfoSet::const_iterator pos = snris_pos->second->readers.begin(),
4287  limit = snris_pos->second->readers.end();
4288  pos != limit; ++pos) {
4289  const ReaderInfo_rch& reader = *pos;
4290  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
4291  }
4292  }
4293  }
4294  }
4295 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
WeakRcHandle< RtpsUdpDataLink > link_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< ReaderInfo > ReaderInfo_rch
const bool is_ps_writer_
Participant Secure (Reliable SPDP) writer.
RcHandle< SingleSendBuffer > send_buff_
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36

◆ gather_nack_replies_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i ( MetaSubmessageVec &  meta_submessages)

Definition at line 3467 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::accumulate_addresses(), ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::RtpsUdpDataLink::get_addresses(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, OpenDDS::DCPS::RtpsUdpDataLink::locators_lock_, OpenDDS::RTPS::OPENDDS_FLAG_R, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), OpenDDS::DCPS::MetaSubmessage::sm_, OpenDDS::RTPS::HeartBeatSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_, OpenDDS::DCPS::RtpsUdpDataLink::transport_statistics_mutex_, and OpenDDS::DCPS::InternalTransportStatistics::writer_resend_count.

3468 {
3469  RtpsUdpDataLink_rch link = link_.lock();
3470 
3471  if (!link) {
3472  return;
3473  }
3474 
3475  // Process NACK requests from each reader, replying with either data or a gap.
3476  // Requests for directed data are answered with a directed reply.
3477  // Requests for undirected data are answered with an undirected and consolidated reply.
3478  // Directed data includes things like the PVS writer.
3479  // The requests_ for a reader should not contain requests for durable data.
3480 
3481  typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
3482 
3483  // Consolidated non-directed requests and address sets to be sent together at the end, after directed replies
3484  typedef OPENDDS_MAP(SequenceNumber, AddrSet) RecipientMap;
3485  typedef OPENDDS_MAP(SequenceNumber, RepoIdSet) ReaderMap;
3486  DisjointSequence consolidated_requests;
3487  ReaderMap consolidated_request_readers;
3488  RecipientMap consolidated_recipients_unicast;
3489  RecipientMap consolidated_recipients_multicast;
3490  FragmentInfo consolidated_fragment_requests;
3491  ReaderMap consolidated_fragment_request_readers;
3492  RecipientMap consolidated_fragment_recipients_unicast;
3493  RecipientMap consolidated_fragment_recipients_multicast;
3494  DisjointSequence consolidated_gaps;
3495 
3496  ACE_GUARD(TransportSendBuffer::LockType, guard, send_buff_->strategy_lock());
3497  SingleSendBuffer::Proxy proxy(*send_buff_);
3498 
3499  size_t cumulative_send_count = 0;
3500 
3501  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3502  pos != limit; ++pos) {
3503 
3504  const ReaderInfo_rch& reader = *pos;
3505  const AddrSet addrs = link->get_addresses(id_, reader->id_);
3506 
3507  DisjointSequence gaps;
3508 
3509  if (reader->expecting_durable_data()) {
3510 
3511  if (!reader->requests_.empty() && !reader->durable_data_.empty()) {
3512  const SequenceNumber dd_first = reader->durable_data_.begin()->first;
3513  const SequenceNumber dd_last = reader->durable_data_.rbegin()->first;
3514 
3515  if (reader->requests_.high() < dd_first) {
3516  gaps.insert(SequenceRange(reader->requests_.low(), dd_first.previous()));
3517  reader->requests_.reset();
3518  } else {
3519  const OPENDDS_VECTOR(SequenceRange) psr = reader->requests_.present_sequence_ranges();
3520  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = psr.begin(), limit = psr.end();
3521  iter != limit && iter->first <= dd_last; ++iter) {
3522  for (SequenceNumber s = iter->first; s <= iter->second; ++s) {
3523  if (s <= dd_last) {
3524  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(s);
3525  if (dd_iter != reader->durable_data_.end()) {
3526  link->durability_resend(dd_iter->second, cumulative_send_count);
3527  } else {
3528  gaps.insert(s);
3529  }
3530  reader->requests_.erase(s);
3531  }
3532  }
3533  }
3534  }
3535  }
3536 
3537  typedef RequestedFragSeqMap::const_iterator rfs_iter;
3538  for (rfs_iter rfs = reader->requested_frags_.begin(), limit = reader->requested_frags_.end(); rfs != limit; ++rfs) {
3539  const OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator dd_iter = reader->durable_data_.find(rfs->first);
3540  if (dd_iter != reader->durable_data_.end()) {
3541  for (RequestedFragMap::const_iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3542  link->durability_resend(dd_iter->second, rf->second, cumulative_send_count);
3543  }
3544  } else if ((!reader->durable_data_.empty() && rfs->first < reader->durable_data_.begin()->first)) {
3545  gaps.insert(rfs->first);
3546  }
3547  }
3548 
3549  gather_gaps_i(reader, gaps, meta_submessages);
3550 
3551  // The writer may not be done replaying durable data.
3552  // Do not send a gap.
3553  // TODO: If we have all of the durable data, adjust the request so that we can answer the non-durable part.
3554  reader->requests_.reset();
3555  reader->requested_frags_.clear();
3556  continue;
3557  }
3558 
3559  const SequenceNumber first_sn = std::max(non_durable_first_sn(proxy), reader->start_sn_);
3560  if (!reader->requests_.empty() &&
3561  reader->requests_.high() < first_sn) {
3562  // The reader is not going to get any data.
3563  // Send a gap that is going to catch them up.
3564  gaps.insert(SequenceRange(reader->requests_.low(), first_sn.previous()));
3565  reader->requests_.reset();
3566  }
3567 
3568  const OPENDDS_VECTOR(SequenceRange) ranges = reader->requests_.present_sequence_ranges();
3569  for (OPENDDS_VECTOR(SequenceRange)::const_iterator iter = ranges.begin(), limit = ranges.end();
3570  iter != limit; ++iter) {
3571  for (SequenceNumber seq = iter->first; seq <= iter->second; ++seq) {
3572  GUID_t destination;
3573  if (proxy.contains(seq, destination)) {
3574  if (destination == GUID_UNKNOWN) {
3575  // Not directed.
3576  consolidated_requests.insert(seq);
3577  consolidated_request_readers[seq].insert(reader->id_);
3578  consolidated_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3579  ACE_Guard<ACE_Thread_Mutex> g(link->locators_lock_);
3580  link->accumulate_addresses(id_, reader->id_, consolidated_recipients_multicast[seq], false);
3581  continue;
3582  } else if (destination != reader->id_) {
3583  // Directed at another reader.
3584  gaps.insert(seq);
3585  continue;
3586  } else {
3587  // Directed at the reader.
3589  link->send_strategy()->override_destinations(addrs);
3590  proxy.resend_i(SequenceRange(seq, seq), 0, reader->id_);
3591  ++cumulative_send_count;
3592  continue;
3593  }
3594  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3595  // Can't answer, don't gap.
3596  continue;
3597  }
3598 
3599  if (durable_ || is_pvs_writer()) {
3600  // Must send directed gap.
3601  gaps.insert(seq);
3602  } else {
3603  // Non-directed gap.
3604  consolidated_gaps.insert(seq);
3605  }
3606  }
3607  }
3608 
3609  reader->requests_.reset();
3610 
3611  typedef RequestedFragSeqMap::iterator rfs_iter;
3612  const rfs_iter rfs_end = reader->requested_frags_.end();
3613  for (rfs_iter rfs = reader->requested_frags_.begin(); rfs != rfs_end; ++rfs) {
3614  const SequenceNumber& seq = rfs->first;
3615  GUID_t destination;
3616  if (proxy.contains(seq, destination)) {
3617  if (destination == GUID_UNKNOWN) {
3618  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3619  consolidated_fragment_requests[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
3620  rf->second.bitmap.get_buffer());
3621  }
3622  consolidated_fragment_request_readers[seq].insert(reader->id_);
3623  consolidated_fragment_recipients_unicast[seq].insert(addrs.begin(), addrs.end());
3624  ACE_Guard<ACE_Thread_Mutex> g(link->locators_lock_);
3625  link->accumulate_addresses(id_, reader->id_, consolidated_fragment_recipients_multicast[seq], false);
3626  continue;
3627  } else if (destination != reader->id_) {
3628  // Directed at another reader.
3629  gaps.insert(seq);
3630  continue;
3631  } else {
3632  // Directed at the reader.
3634  link->send_strategy()->override_destinations(addrs);
3635  for (RequestedFragMap::iterator rf = rfs->second.begin(); rf != rfs->second.end(); ++rf) {
3636  DisjointSequence x;
3637  x.insert(rf->second.bitmapBase.value, rf->second.numBits,
3638  rf->second.bitmap.get_buffer());
3639  proxy.resend_fragments_i(seq, x, cumulative_send_count);
3640  }
3641  continue;
3642  }
3643  } else if (proxy.pre_contains(seq) || seq > max_sn_) {
3644  // Can't answer, don't gap.
3645  continue;
3646  }
3647 
3648  if (durable_ || is_pvs_writer()) {
3649  // Must send directed gap.
3650  gaps.insert(seq);
3651  } else {
3652  // Non-directed gap.
3653  consolidated_gaps.insert(seq);
3654  }
3655  }
3656 
3657  reader->requested_frags_.clear();
3658 
3659  // Gather directed gaps.
3660  gather_gaps_i(reader, gaps, meta_submessages);
3661  }
3662 
3663  {
3664  // Send the consolidated requests.
3665  const OPENDDS_VECTOR(SequenceRange) ranges = consolidated_requests.present_sequence_ranges();
3666  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = ranges.begin(), limit = ranges.end();
3667  pos != limit; ++pos) {
3668  if (Transport_debug_level > 5) {
3669  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3670  "resend data %q-%q\n", pos->first.getValue(),
3671  pos->second.getValue()));
3672  }
3673  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
3674  const AddrSet& uni = consolidated_recipients_unicast[seq];
3675  const AddrSet& multi = consolidated_recipients_multicast[seq];
3676  const RepoIdSet& readers = consolidated_request_readers[seq];
3677 
3678  if (proxy.has_frags(seq)) {
3679  if (consolidated_fragment_requests.find(seq) == consolidated_fragment_requests.end()) {
3680  consolidated_fragment_requests[seq].insert(1);
3681  }
3682  consolidated_fragment_recipients_unicast[seq].insert(uni.begin(), uni.end());
3683  consolidated_fragment_recipients_multicast[seq].insert(multi.begin(), multi.end());
3684  consolidated_fragment_request_readers[seq].insert(readers.begin(), readers.end());
3685  } else {
3687  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3688 
3689  proxy.resend_i(SequenceRange(seq, seq));
3690  ++cumulative_send_count;
3691  }
3692  }
3693  }
3694 
3695  for (FragmentInfo::const_iterator pos = consolidated_fragment_requests.begin(),
3696  limit = consolidated_fragment_requests.end(); pos != limit; ++pos) {
3697  const AddrSet& uni = consolidated_fragment_recipients_unicast[pos->first];
3698  const AddrSet& multi = consolidated_fragment_recipients_multicast[pos->first];
3699  const RepoIdSet& readers = consolidated_fragment_request_readers[pos->first];
3701  link->send_strategy()->override_destinations(readers.size() * 2 > remote_readers_.size() ? multi : uni);
3702 
3703  proxy.resend_fragments_i(pos->first, pos->second, cumulative_send_count);
3704  }
3705  }
3706 
3707  if (cumulative_send_count) {
3708  RtpsUdpInst_rch cfg = link->config();
3709  if (cfg && cfg->count_messages()) {
3710  ACE_GUARD(ACE_Thread_Mutex, g, link->transport_statistics_mutex_);
3711  link->transport_statistics_.writer_resend_count[id_] += static_cast<ACE_CDR::ULong>(cumulative_send_count);
3712  }
3713  }
3714 
3715  // Gather the consolidated gaps.
3716  if (!consolidated_gaps.empty()) {
3717  if (Transport_debug_level > 5) {
3718  ACE_DEBUG((LM_DEBUG,
3719  "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: GAPs:\n"));
3720  consolidated_gaps.dump();
3721  }
3722  gather_gaps_i(ReaderInfo_rch(), consolidated_gaps, meta_submessages);
3723  } else if (Transport_debug_level > 5) {
3724  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::gather_nack_replies_i: "
3725  "no GAPs to send\n"));
3726  }
3727 
3728  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
3729  initialize_heartbeat(proxy, meta_submessage);
3730 
3731  // Directed, non-final.
3732  for (ReaderInfoSet::const_iterator pos = readers_expecting_data_.begin(), limit = readers_expecting_data_.end();
3733  pos != limit; ++pos) {
3734  const ReaderInfo_rch& reader = *pos;
3735  readers_expecting_heartbeat_.erase(reader);
3736  if (reader->reflects_heartbeat_count()) {
3737  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::OPENDDS_FLAG_R;
3738  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3739  reader->required_acknack_count_ = heartbeat_count_;
3740  meta_submessage.sm_.heartbeat_sm().smHeader.flags &= ~RTPS::OPENDDS_FLAG_R;
3741  } else {
3742  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3743  }
3744  }
3745  readers_expecting_data_.clear();
3746 
3747  // Directed, final.
3748  meta_submessage.sm_.heartbeat_sm().smHeader.flags |= RTPS::FLAG_F;
3749  meta_submessages.reserve(meta_submessages.size() + readers_expecting_heartbeat_.size());
3750  for (ReaderInfoSet::const_iterator pos = readers_expecting_heartbeat_.begin(), limit = readers_expecting_heartbeat_.end();
3751  pos != limit; ++pos) {
3752  const ReaderInfo_rch& reader = *pos;
3753  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, reader);
3754  }
3756 }
#define ACE_DEBUG(X)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
TransportSendStrategy::LockType LockType
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< SingleSendBuffer > send_buff_
const octet FLAG_F
Definition: RtpsCore.idl:523
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack and are expecting a heartbeat.
void gather_gaps_i(const ReaderInfo_rch &reader, const DisjointSequence &gaps, MetaSubmessageVec &meta_submessages)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
ACE_UINT32 ULong
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)
std::pair< SequenceNumber, SequenceNumber > SequenceRange
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
const octet OPENDDS_FLAG_R
Definition: RtpsCore.idl:529
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71

◆ get_remote_reader_guids()

RcHandle<ConstSharedRepoIdSet> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::get_remote_reader_guids ( )
inline

Definition at line 570 of file RtpsUdpDataLink.h.

◆ get_send_buff()

RcHandle<SingleSendBuffer> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::get_send_buff ( )
inline

Definition at line 569 of file RtpsUdpDataLink.h.

// Returns the writer's send_buff_ handle.
569 { return send_buff_; }
RcHandle< SingleSendBuffer > send_buff_

◆ has_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::has_reader ( const GUID_t id) const

Definition at line 2079 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

2080 {
2082  return remote_readers_.count(id) != 0;
2083 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ inc_heartbeat_count()

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::inc_heartbeat_count ( )

Definition at line 4478 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, heartbeat_count_, and mutex_.

4479 {
4481  return ++heartbeat_count_;
4482 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ initialize_heartbeat()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::initialize_heartbeat ( const SingleSendBuffer::Proxy proxy,
MetaSubmessage meta_submessage 
)
private

Definition at line 4156 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

4158 {
4159  using namespace OpenDDS::RTPS;
4160 
4161  // Assume no samples are available.
4162  const SequenceNumber nonDurableFirstSN = non_durable_first_sn(proxy);
4163  const SequenceNumber firstSN = durable_ ? 1 : nonDurableFirstSN;
4164 
4165  const HeartBeatSubmessage hb = {
4166  {HEARTBEAT,
4167  FLAG_E,
4168  HEARTBEAT_SZ},
4169  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4170  id_.entityId,
4171  to_rtps_seqnum(firstSN),
4173  {0}
4174  };
4175 
4176  meta_submessage.sm_.heartbeat_sm(hb);
4177 }
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_E
Definition: RtpsCore.idl:521
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59

◆ is_lagging()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_lagging ( const ReaderInfo_rch reader) const
private

Definition at line 3885 of file RtpsUdpDataLink.cpp.

// A reader is lagging when the sequence number it has acknowledged
// (acked_sn()) differs from expected_max_sn(reader).
3886 {
3887  return reader->acked_sn() != expected_max_sn(reader);
3888 }
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const

◆ is_leading() [1/2]

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_leading ( const ReaderInfo_rch reader) const
private

Definition at line 3891 of file RtpsUdpDataLink.cpp.

Referenced by is_leading().

// A reader is leading when the sequence number it has acknowledged
// (acked_sn()) equals expected_max_sn(reader).
3892 {
3893  return reader->acked_sn() == expected_max_sn(reader);
3894 }
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const

◆ is_leading() [2/2]

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_leading ( const GUID_t id) const

Definition at line 4513 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, is_leading(), mutex_, and remote_readers_.

4514 {
4516 
4517  ReaderInfoMap::const_iterator iter = remote_readers_.find(reader_id);
4518  if (iter != remote_readers_.end()) {
4519  return is_leading(iter->second);
4520  }
4521 
4522  return false;
4523 }
bool is_leading(const ReaderInfo_rch &reader) const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ is_pvs_writer()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_pvs_writer ( ) const
inlineprivate

Definition at line 495 of file RtpsUdpDataLink.h.

// Accessor for the is_pvs_writer_ flag.
495 { return is_pvs_writer_; }
const bool is_pvs_writer_
Participant Volatile Secure writer.

◆ log_remote_counts()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::log_remote_counts ( const char *  funcname)
private

Definition at line 4817 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, id_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_remote_counts, preassociation_readers_, remote_readers_, and OpenDDS::DCPS::transport_debug.

// RtpsWriter::log_remote_counts -- emits one LM_DEBUG line containing the
// caller-supplied function name, this writer's GUID (id_), and the sizes of
// preassociation_readers_ ("pre") and remote_readers_ ("assoc").
// NOTE(review): the listing skips line 4819 and the brace on 4825 has no
// visible opener; the References list names TransportDebug::log_remote_counts,
// so the logging is presumably gated on transport_debug.log_remote_counts --
// confirm against RtpsUdpDataLink.cpp.
4818 {
4820  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_remote_counts} "
4821  "RtpsUdpDataLink::RtpsWriter::%C: "
4822  "%C pre: %b assoc: %b\n",
4823  funcname, LogGuid(id_).c_str(),
4824  preassociation_readers_.size(), remote_readers_.size()));
4825  }
4826 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
bool log_remote_counts
Log number of associations and pending associations of RTPS entities.

◆ make_lagger_leader()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::make_lagger_leader ( const ReaderInfo_rch & reader,
const SequenceNumber  previous_acked_sn 
)
private

Definition at line 3859 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_.

// RtpsWriter::make_lagger_leader -- re-files a reader in leading_readers_ /
// lagging_readers_ after its acked sequence number changed.
//   reader            reader whose acked_sn() is compared with previous_acked_sn
//   previous_acked_sn the value acked_sn() had before the change
// Does nothing when the link can no longer be locked or when the acked
// sequence number is unchanged.
3861 {
3862  RtpsUdpDataLink_rch link = link_.lock();
3863  if (!link) {
3864  return;
3865  }
3866 
3867  const SequenceNumber acked_sn = reader->acked_sn();
3868  if (previous_acked_sn == acked_sn) { return; }
3869  const SequenceNumber previous_max_sn = expected_max_sn(reader);
3870 #ifdef OPENDDS_SECURITY
// For a PVS writer, an ack beyond the previously expected maximum raises the
// reader's max_pvs_sn_ to the acked value.
3871  if (is_pvs_writer_ && acked_sn > previous_max_sn) {
3872  reader->max_pvs_sn_ = acked_sn;
3873  }
3874 #endif
// expected_max_sn() is evaluated a second time after the possible
// max_pvs_sn_ update above.
// NOTE(review): presumably it reads max_pvs_sn_ for PVS writers -- confirm.
3875  const SequenceNumber max_sn = expected_max_sn(reader);
3876 
// Remove the reader from the set chosen by its previous state, then insert
// it into the set chosen by its current state.
3877  snris_erase(previous_acked_sn == previous_max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3878  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
// A reader that is still behind causes the heartbeat to be scheduled.
3879  if (acked_sn != max_sn) {
3880  heartbeat_->schedule(fallback_.get());
3881  }
3882 }
WeakRcHandle< RtpsUdpDataLink > link_
FibonacciSequence< TimeDuration > fallback_
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ make_leader_lagger()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::make_leader_lagger ( const GUID_t reader,
SequenceNumber  previous_max_sn 
)
private

Definition at line 3800 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_.

Referenced by update_max_sn().

// RtpsWriter::make_leader_lagger -- moves readers from leading_readers_ to
// lagging_readers_ once max_sn_ has moved past previous_max_sn.
//   reader_id        only used on the PVS-writer path (OPENDDS_SECURITY builds)
//   previous_max_sn  the maximum sequence number before the update; on the
//                    PVS path it is overwritten with the reader's max_pvs_sn_
// Does nothing while stopping_ is set or when the link cannot be locked.
3802 {
3803  ACE_UNUSED_ARG(reader_id);
3804 
3805  if (stopping_) {
3806  return;
3807  }
3808 
3809  RtpsUdpDataLink_rch link = link_.lock();
3810  if (!link) {
3811  return;
3812  }
3813 
3814 #ifdef OPENDDS_SECURITY
3815  if (!is_pvs_writer_) {
3816 #endif
// Non-PVS path: move the whole leading_readers_ entry for previous_max_sn.
3817  if (previous_max_sn != max_sn_) {
3818  // All readers that have acked previous_max_sn are now lagging.
3819  // Move leader_readers_[previous_max_sn] to lagging_readers_[previous_max_sn].
3820  SNRIS::iterator leading_pos = leading_readers_.find(previous_max_sn);
3821  SNRIS::iterator lagging_pos = lagging_readers_.find(previous_max_sn);
3822  if (leading_pos != leading_readers_.end()) {
// Merge into an existing lagging entry if there is one, otherwise reuse the
// leading entry's value under the same key.
3823  if (lagging_pos != lagging_readers_.end()) {
3824  lagging_pos->second->readers.insert(leading_pos->second->readers.begin(), leading_pos->second->readers.end());
3825  } else {
3826  lagging_readers_[previous_max_sn] = leading_pos->second;
3827  }
3828  leading_readers_.erase(leading_pos);
3829  heartbeat_->schedule(fallback_.get());
3830  }
3831  }
3832 #ifdef OPENDDS_SECURITY
3833  } else {
3834  // Move a specific reader.
3835  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3836  if (iter == remote_readers_.end()) {
3837  return;
3838  }
3839 
3840  const ReaderInfo_rch& reader = iter->second;
// The caller-supplied previous_max_sn is replaced by the per-reader value
// before max_pvs_sn_ is advanced to max_sn_.
3841  previous_max_sn = reader->max_pvs_sn_;
3842  reader->max_pvs_sn_ = max_sn_;
3843  if (preassociation_readers_.count(reader)) {
3844  // Will be inserted once association is complete.
3845  return;
3846  }
3847 
3848  const SequenceNumber acked_sn = reader->acked_sn();
// Only a reader that was leading (acked == previous max) and is now behind
// is moved to lagging_readers_.
3849  if (acked_sn == previous_max_sn && previous_max_sn != max_sn_) {
3850  snris_erase(leading_readers_, acked_sn, reader);
3851  snris_insert(lagging_readers_, reader);
3852  heartbeat_->schedule(fallback_.get());
3853  }
3854  }
3855 #endif
3856 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
WeakRcHandle< RtpsUdpDataLink > link_
FibonacciSequence< TimeDuration > fallback_
RcHandle< ReaderInfo > ReaderInfo_rch
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ max_data_seq()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::max_data_seq ( const SingleSendBuffer::Proxy & proxy,
const ReaderInfo_rch & ri 
) const

Definition at line 4485 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::SingleSendBuffer::Proxy::empty(), OpenDDS::DCPS::SingleSendBuffer::Proxy::high(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_empty(), and OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_high().

// RtpsWriter::max_data_seq -- returns the largest of three sequence numbers:
// the last key of the reader's durable_data_, proxy.pre_high() and
// proxy.high(). A source that is empty contributes 0.
4487 {
4488  const SequenceNumber durable_max = ri->durable_data_.empty() ? 0 : ri->durable_data_.rbegin()->first;
4489  const SequenceNumber pre_max = proxy.pre_empty() ? 0 : proxy.pre_high();
4490  const SequenceNumber data_max = proxy.empty() ? 0 : proxy.high();
4491  return std::max(durable_max, std::max(pre_max, data_max));
4492 }

◆ non_durable_first_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::non_durable_first_sn ( const SingleSendBuffer::Proxy & proxy) const
inline private

Definition at line 500 of file RtpsUdpDataLink.h.

References OpenDDS::DCPS::SingleSendBuffer::Proxy::empty(), OpenDDS::DCPS::SingleSendBuffer::Proxy::low(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_empty(), and OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_low().

// RtpsWriter::non_durable_first_sn -- picks the first sequence number in
// priority order: proxy.low() if the proxy is non-empty, else proxy.pre_low()
// if its "pre" part is non-empty, else max_sn_ + 1.
501  {
502  if (!proxy.empty()) {
503  return proxy.low();
504  }
505  if (!proxy.pre_empty()) {
506  return proxy.pre_low();
507  }
508  return max_sn_ + 1;
509  }

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_MULTIMAP ( SequenceNumber  ,
TransportQueueElement  
)
private

◆ OPENDDS_MULTISET()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_MULTISET ( OpenDDS::DCPS::SequenceNumber  )
private

◆ OPENDDS_SET_CMP()

typedef OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::OPENDDS_SET_CMP ( TransportQueueElement ,
TransportQueueElement::OrderBySequenceNumber   
)
private

◆ pre_stop_helper()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::pre_stop_helper ( TqeVector &  to_drop,
bool  true_stop 
)

Definition at line 824 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET().

// RtpsWriter::pre_stop_helper -- empties elems_not_acked_ into to_drop,
// releases the matching sequence numbers in send_buff_, and clears the send
// buffer's "pre" state.
//   to_drop    receives every element removed from elems_not_acked_
//   true_stop  stored into stopping_; when true the heartbeat_ and
//              nack_response_ timers are cancelled after the guards are released
// NOTE(review): the listing skips lines 827-829; `g` and `g2`, released
// below, are presumably the guards declared there (References lists
// ACE_GUARD and mutex_) -- confirm against RtpsUdpDataLink.cpp.
825 {
826  typedef SnToTqeMap::iterator iter_t;
827 
830 
831  stopping_ = true_stop;
832 
833  if (!elems_not_acked_.empty()) {
// Sequence numbers are collected in a set, so each one is released once.
834  OPENDDS_SET(SequenceNumber) sns_to_release;
835  iter_t iter = elems_not_acked_.begin();
836  while (iter != elems_not_acked_.end()) {
837  to_drop.push_back(iter->second);
838  sns_to_release.insert(iter->first);
839  elems_not_acked_.erase(iter);
840  iter = elems_not_acked_.begin();
841  }
842  OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
843  while (sns_it != sns_to_release.end()) {
844  send_buff_->release_acked(*sns_it);
845  ++sns_it;
846  }
847  }
848 
849  send_buff_->pre_clear();
850 
851  g2.release();
852  g.release();
853 
// The timers are cancelled only after both guards have been released.
854  if (stopping_) {
855  heartbeat_->cancel();
856  nack_response_->cancel();
857  }
858 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SingleSendBuffer > send_buff_
typedef OPENDDS_SET(ReaderInfo_rch) ReaderInfoSet

◆ process_acked_by_all()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acked_by_all ( )

Definition at line 3954 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

// RtpsWriter::process_acked_by_all -- collects elements via
// acked_by_all_helper_i() inside an inner scope, then calls data_delivered()
// on each collected element after that scope has closed.
// NOTE(review): the listing skips line 3958; the References list names
// ACE_GUARD and mutex_, so the inner scope presumably holds a guard on
// mutex_ -- confirm against RtpsUdpDataLink.cpp.
3955 {
3956  TqeSet to_deliver;
3957  {
3959  acked_by_all_helper_i(to_deliver);
3960  }
3961 
3962  TqeSet::iterator deliver_iter = to_deliver.begin();
3963  while (deliver_iter != to_deliver.end()) {
3964  (*deliver_iter)->data_delivered();
3965  ++deliver_iter;
3966  }
3967 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ process_acknack()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_acknack ( const RTPS::AckNackSubmessage & acknack,
const GUID_t & src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 3156 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, OpenDDS::RTPS::bitmapNonEmpty(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::SingleSendBuffer::Proxy::contains(), OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC, OpenDDS::DCPS::EventDispatcher::dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::event_dispatcher(), OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::SubmessageHeader::flags, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::log_progress(), OpenDDS::DCPS::TransportDebug::log_progress, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OPENDDS_MULTIMAP, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SingleSendBuffer::Proxy::pre_contains(), OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_, OpenDDS::DCPS::RcHandle< T >::swap(), OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::Count_t::value, VDBG, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

// RtpsWriter::process_acknack -- handles an ACKNACK submessage sent by remote
// reader `src` to this writer.
//   acknack           the received submessage (count, readerSNState, flags)
//   src               GUID of the sending reader, looked up in remote_readers_
//   meta_submessages  not referenced in the lines visible in this listing
// Visible steps, in order: drop stale/unknown input, complete association for
// a preassociation reader, record the cumulative ack (or detect a reset),
// retire acknowledged durable data, record nack requests, update the
// leading/lagging sets, and -- after g.release() -- invoke the client and
// element callbacks.
// NOTE(review): this listing has numbering gaps (3160, 3181, 3199, 3209,
// 3217, 3224-3225, 3233, 3247, 3264, 3292, 3350, 3363), so several `}` and
// one `else` below have no visible opener. `g`, released near the end, is
// presumably the guard declared on the missing line 3160 -- confirm against
// RtpsUdpDataLink.cpp.
3159 {
3161 
3162  if (stopping_) {
3163  return;
3164  }
3165 
3166  RtpsUdpDataLink_rch link = link_.lock();
3167 
3168  if (!link) {
3169  return;
3170  }
3171 
// The bitmap base of the reader's sequence-number state is the ack value
// used throughout the rest of the function.
3172  const SequenceNumber ack = to_opendds_seqnum(acknack.readerSNState.bitmapBase);
3173 
3174  if (Transport_debug_level > 5) {
3175  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3176  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3177  }
3178 
// An ACKNACK from a reader that is not in remote_readers_ is dropped.
3179  ReaderInfoMap::iterator ri = remote_readers_.find(src);
3180  if (ri == remote_readers_.end()) {
3182  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C unknown remote reader\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3183  }
3184  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3185  "WARNING ReaderInfo not found\n"));
3186  return;
3187  }
3188 
3189  const ReaderInfo_rch& reader = ri->second;
3190 
// State captured before any update: used for the reset log message and for
// the leading/lagging bookkeeping further down.
3191  SequenceNumber previous_acked_sn = reader->acked_sn();
3192  const bool count_is_not_zero = acknack.count.value != 0;
3193  const CORBA::Long previous_count = reader->acknack_recvd_count_;
3194  bool dont_schedule_nack_response = false;
3195 
// Count checks apply only to messages with a non-zero count.
// NOTE(review): inside this branch `acknack.count.value != 0` is always true,
// so the parenthesised second operand below is always true here.
3196  if (count_is_not_zero) {
3197  if (!compare_and_update_counts(acknack.count.value, reader->acknack_recvd_count_) &&
3198  (!reader->reflects_heartbeat_count() || acknack.count.value != 0 || reader->acknack_recvd_count_ != 0)) {
3200  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale/duplicate message\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3201  }
3202  VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
3203  "WARNING Count indicates duplicate, dropping\n"));
3204  return;
3205  }
3206 
// For a reader that reflects the heartbeat count, a count below
// required_acknack_count_ suppresses the nack response but the ack part of
// the message is still processed.
3207  if (reader->reflects_heartbeat_count()) {
3208  if (acknack.count.value < reader->required_acknack_count_) {
3210  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C stale message (reflect %d < %d)\n", LogGuid(src).c_str(), LogGuid(id_).c_str(), acknack.count.value, reader->required_acknack_count_));
3211  }
3212  dont_schedule_nack_response = true;
3213  }
3214  }
3215  }
3216 
3218 
3219  const bool is_final = acknack.smHeader.flags & RTPS::FLAG_F;
// A non-zero count combined with a final flag, a non-empty bitmap, or a base
// other than 1 is treated as a post-association message.
3220  const bool is_postassociation = count_is_not_zero && (is_final || bitmapNonEmpty(acknack.readerSNState) || ack != 1);
3221 
// First post-association message from a preassociation reader: the reader is
// inserted into the leading or lagging set.
// NOTE(review): lines 3224-3225 and 3233 are missing from the listing; the
// page lists remove_preassociation_reader() among this function's related
// members, so it is presumably called on one of them -- confirm.
3222  if (preassociation_readers_.count(reader)) {
3223  if (is_postassociation) {
3226  log_progress("RTPS writer/reader association complete", id_, reader->id_, reader->participant_discovered_at_);
3227  }
3228  log_remote_counts("process_acknack");
3229 
3230  const SequenceNumber max_sn = expected_max_sn(reader);
3231  const SequenceNumber acked_sn = reader->acked_sn();
3232  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3234  // Heartbeat is already scheduled.
3235  }
3236  }
3237 
// Durable elements acknowledged by this message are moved here; their
// data_delivered() callbacks run at the end of the function.
3238  OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
3239 
3240  if (!is_final && transport_debug.log_nonfinal_messages) {
3241  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C base %q bits %u count %d\n",
3242  LogGuid(src).c_str(), LogGuid(id_).c_str(), ack.getValue(), acknack.readerSNState.numBits, acknack.count.value));
3243  }
3244 
3245  // Process the ack.
3246  bool inform_send_listener = false;
// NOTE(review): line 3247 is missing; it presumably opens the `if` whose
// `else` on line 3309 reports "invalid acknack" -- confirm the condition.
3248 
3249  if (ack >= reader->cur_cumulative_ack_) {
3250  reader->cur_cumulative_ack_ = ack;
3251  inform_send_listener = true;
3252  } else if (count_is_not_zero) {
3253  // Count increased but ack decreased. Reset.
3254  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING RtpsUdpDataLink::RtpsWriter::process_acknack: "
3255  "%C -> %C reset detected count %d > %d ack %q < %q\n",
3256  LogGuid(id_).c_str(), LogGuid(reader->id_).c_str(),
3257  acknack.count.value, previous_count, ack.getValue(), reader->cur_cumulative_ack_.getValue()));
// Re-file the reader: erase it under its old acked value, accept the lower
// ack, and insert it under the new one. previous_acked_sn is then updated so
// the later make_lagger_leader() call sees no change.
3258  const SequenceNumber max_sn = expected_max_sn(reader);
3259  snris_erase(previous_acked_sn == max_sn ? leading_readers_ : lagging_readers_, previous_acked_sn, reader);
3260  reader->cur_cumulative_ack_ = ack;
3261  const SequenceNumber acked_sn = reader->acked_sn();
3262  snris_insert(acked_sn == max_sn ? leading_readers_ : lagging_readers_, reader);
3263  previous_acked_sn = acked_sn;
3265  heartbeat_->schedule(fallback_.get());
3266 
// For a durable reader, the stored durable data is handed to
// pendingCallbacks and a ReplayDurableData event is dispatched.
3267  if (reader->durable_) {
3268  if (Transport_debug_level > 5) {
3269  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: enqueuing ReplayDurableData\n"));
3270  }
3271  reader->durable_data_.swap(pendingCallbacks);
3272  link->event_dispatcher()->dispatch(make_rch<ReplayDurableData>(link_, id_, src));
3273  reader->durable_timestamp_ = MonotonicTimePoint::zero_value;
3274  }
3275  }
3276 
3277  if (!reader->durable_data_.empty()) {
3278  if (Transport_debug_level > 5) {
3279  const LogGuid local_conv(id_), remote_conv(src);
3280  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3281  "local %C has durable for remote %C\n",
3282  local_conv.c_str(),
3283  remote_conv.c_str()));
3284  }
3285  const SequenceNumber& dd_last = reader->durable_data_.rbegin()->first;
3286  if (Transport_debug_level > 5) {
3287  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3288  "check base %q against last durable %q\n",
3289  ack.getValue(), dd_last.getValue()));
3290  }
// ack beyond the last durable key: all durable data is acknowledged.
// Otherwise only the entries with keys below ack are retired.
3291  if (ack > dd_last) {
3293  log_progress("durable delivered", id_, reader->id_, reader->participant_discovered_at_);
3294  }
3295  // Reader acknowledges durable data, we no longer need to store it
3296  if (Transport_debug_level > 5) {
3297  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_acknack: "
3298  "durable data acked\n"));
3299  }
3300  reader->durable_data_.swap(pendingCallbacks);
3301  } else {
3302  for (OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator pos = reader->durable_data_.begin(),
3303  limit = reader->durable_data_.end(); pos != limit && pos->first < ack;) {
3304  pendingCallbacks.insert(*pos);
3305  reader->durable_data_.erase(pos++);
3306  }
3307  }
3308  }
3309  } else {
3310  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: RtpsUdpDataLink::RtpsWriter::process_acknack: %C -> %C invalid acknack\n", LogGuid(src).c_str(), LogGuid(id_).c_str()));
3311  }
3312 
3313  // Process the nack.
3314  bool schedule_nack_response = false;
3315  if (!dont_schedule_nack_response) {
3316  if (count_is_not_zero) {
// Requests from earlier messages are discarded before recording this one's.
3317  reader->requests_.reset();
3318  {
3319  const SingleSendBuffer::Proxy proxy(*send_buff_);
3320  if ((acknack.readerSNState.numBits == 0 ||
3321  (acknack.readerSNState.numBits == 1 && !(acknack.readerSNState.bitmap[0] & 1)))
3322  && ack == max_data_seq(proxy, reader)) {
3323  // Since there is an entry in requested_changes_, the DR must have
3324  // sent a non-final AckNack. If the base value is the high end of
3325  // the heartbeat range, treat it as a request for that seq#.
3326  if (reader->durable_data_.count(ack) || proxy.contains(ack) || proxy.pre_contains(ack)) {
3327  reader->requests_.insert(ack);
3328  }
3329  } else {
3330  reader->requests_.insert(ack, acknack.readerSNState.numBits, acknack.readerSNState.bitmap.get_buffer());
3331  }
3332 
// A reader stays in readers_expecting_data_ while it has outstanding
// sequence-number or fragment requests.
3333  if (!reader->requests_.empty()) {
3334  readers_expecting_data_.insert(reader);
3335  schedule_nack_response = true;
3336  } else if (reader->requested_frags_.empty()) {
3337  readers_expecting_data_.erase(reader);
3338  }
3339  }
3340  }
3341 
// A non-final ACKNACK marks the reader as expecting a heartbeat.
3342  if (!is_final) {
3343  readers_expecting_heartbeat_.insert(reader);
3344  schedule_nack_response = true;
3345  }
3346  }
3347 
// Leading/lagging bookkeeping applies only to fully associated readers.
3348  if (preassociation_readers_.count(reader) == 0) {
3349  make_lagger_leader(reader, previous_acked_sn);
3351  }
3352 
3353  TqeSet to_deliver;
3354  acked_by_all_helper_i(to_deliver);
3355 
3356 #ifdef OPENDDS_SECURITY
// PVS writer: every outstanding sequence number below the reader's
// cumulative ack is removed from pvs_outstanding_, and a matching element in
// elems_not_acked_ is released and queued for delivery.
3357  if (is_pvs_writer_ &&
3358  !reader->pvs_outstanding_.empty() &&
3359  reader->pvs_outstanding_.low() < reader->cur_cumulative_ack_) {
3360  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
3361  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end();
3362  pos != limit && pos->first < reader->cur_cumulative_ack_; ++pos) {
3364  for (SequenceNumber seq = pos->first; seq <= pos->second && seq < reader->cur_cumulative_ack_; ++seq) {
3365  reader->pvs_outstanding_.erase(seq);
3366  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
3367  if (iter != elems_not_acked_.end()) {
3368  send_buff_->release_acked(iter->first);
3369  to_deliver.insert(iter->second);
3370  elems_not_acked_.erase(iter);
3371  }
3372  }
3373  }
3374  }
3375 #endif
3376 
// The nack response is scheduled with the configured delay, or with the
// default delay when no configuration object is available.
3377  if (!dont_schedule_nack_response && schedule_nack_response) {
3378  RtpsUdpInst_rch cfg = link->config();
3379  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3380  }
3381 
3382  TransportClient_rch client = client_.lock();
3383 
// Everything below runs after g.release(): the client notification and the
// data_delivered() callbacks.
3384  g.release();
3385 
3386  if (inform_send_listener && client) {
3387  client->data_acked(src);
3388  }
3389 
3390  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
3391  for (iter_t it = pendingCallbacks.begin();
3392  it != pendingCallbacks.end(); ++it) {
3393  it->second->data_delivered();
3394  }
3395 
3396  TqeSet::iterator deliver_iter = to_deliver.begin();
3397  while (deliver_iter != to_deliver.end()) {
3398  (*deliver_iter)->data_delivered();
3399  ++deliver_iter;
3400  }
3401 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define ACE_DEBUG(X)
ACE_CDR::Long Long
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
FibonacciSequence< TimeDuration > fallback_
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
WeakRcHandle< TransportClient > client_
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
bool log_dropped_messages
Log received RTPS messages that were dropped.
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
Definition: RtpsUdpInst.h:34
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
RcHandle< SingleSendBuffer > send_buff_
const octet FLAG_F
Definition: RtpsCore.idl:523
bool bitmapNonEmpty(const SequenceNumberSet &snSet)
SequenceNumber max_data_seq(const SingleSendBuffer::Proxy &proxy, const ReaderInfo_rch &) const
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack and are expecting a heartbeat.
#define VDBG(DBG_ARGS)
RcHandle< TransportClient > TransportClient_rch
static void snris_insert(RtpsUdpDataLink::SNRIS &snris, const ReaderInfo_rch &reader)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
std::pair< SequenceNumber, SequenceNumber > SequenceRange
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
void make_lagger_leader(const ReaderInfo_rch &reader, const SequenceNumber previous_acked_sn)
RcHandle< T > lock() const
Definition: RcObject.h:188
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
bool log_progress
Log progress for RTPS entity discovery and association.
void remove_preassociation_reader(const ReaderInfo_rch &reader)
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71

◆ process_nackfrag()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::process_nackfrag ( const RTPS::NackFragSubmessage & nackfrag,
const GUID_t & src,
MetaSubmessageVec &  meta_submessages 
)

Definition at line 3412 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_dropped_messages, OpenDDS::DCPS::TransportDebug::log_nonfinal_messages, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::stopping_, OpenDDS::RTPS::to_opendds_seqnum(), OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, OpenDDS::RTPS::Count_t::value, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::received().

// RtpsWriter::process_nackfrag -- handles a NACK_FRAG submessage sent by
// remote reader `src`.
//   nackfrag          the received submessage (count, writerSN, fragmentNumberState)
//   src               GUID of the sending reader, looked up in remote_readers_
//   meta_submessages  not referenced in the lines visible in this listing
// The message is ignored while stopping_ is set, when the link cannot be
// locked, when the reader is unknown, or when compare_and_update_counts()
// rejects its count. Otherwise the requested fragments are recorded and the
// nack response is scheduled.
// NOTE(review): the listing skips lines 3416, 3435, 3445 and 3455, so three
// `}` below have no visible opener; the missing lines are presumably the
// mutex guard and the transport_debug conditions named in the References
// list -- confirm against RtpsUdpDataLink.cpp.
3415 {
3417 
3418  if (stopping_) {
3419  return;
3420  }
3421 
3422  RtpsUdpDataLink_rch link = link_.lock();
3423 
3424  if (!link) {
3425  return;
3426  }
3427 
3428  if (Transport_debug_level > 5) {
3429  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C\n",
3430  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3431  }
3432 
3433  const ReaderInfoMap::iterator ri = remote_readers_.find(src);
3434  if (ri == remote_readers_.end()) {
3436  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C unknown remote reader\n",
3437  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3438  }
3439  return;
3440  }
3441 
3442  const ReaderInfo_rch& reader = ri->second;
3443 
3444  if (!compare_and_update_counts(nackfrag.count.value, reader->nackfrag_recvd_count_)) {
3446  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_dropped_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C stale/duplicate message\n",
3447  LogGuid(src).c_str(), LogGuid(id_).c_str()));
3448  }
3449  return;
3450  }
3451 
3452  const SequenceNumber seq = to_opendds_seqnum(nackfrag.writerSN);
3453 
3454  // All NackFrag messages are technically 'non-final' since they are only used to negatively acknowledge fragments and expect a response
3456  ACE_DEBUG((LM_DEBUG, "(%P|%t) {transport_debug.log_nonfinal_messages} RtpsUdpDataLink::RtpsWriter::process_nackfrag: %C -> %C seq %q base %u bits %u\n",
3457  LogGuid(src).c_str(), LogGuid(id_).c_str(), seq.getValue(), nackfrag.fragmentNumberState.bitmapBase.value, nackfrag.fragmentNumberState.numBits));
3458  }
3459 
// The fragment request is stored under (sequence number, bitmap base); an
// earlier request with the same pair is overwritten.
3460  reader->requested_frags_[seq][nackfrag.fragmentNumberState.bitmapBase.value] = nackfrag.fragmentNumberState;
3461  readers_expecting_data_.insert(reader);
// Scheduled with the configured delay, or the default when config() is null.
3462  RtpsUdpInst_rch cfg = link->config();
3463  nack_response_->schedule(cfg ? cfg->nak_response_delay_ : TimeDuration(0, RtpsUdpInst::DEFAULT_NAK_RESPONSE_DELAY_USEC));
3464 }
#define ACE_DEBUG(X)
RcHandle< RtpsUdpInst > RtpsUdpInst_rch
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< ReaderInfo > ReaderInfo_rch
bool log_dropped_messages
Log received RTPS messages that were dropped.
static const suseconds_t DEFAULT_NAK_RESPONSE_DELAY_USEC
Definition: RtpsUdpInst.h:34
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132

◆ reader_count()

size_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::reader_count ( ) const

Definition at line 2143 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, and OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

// RtpsWriter::reader_count -- returns the number of entries in remote_readers_.
// NOTE(review): the listing skips line 2145; the References list names
// ACE_GUARD_RETURN and mutex_, so the size is presumably read under a guard
// on mutex_ -- confirm against RtpsUdpDataLink.cpp.
2144 {
2146  return remote_readers_.size();
2147 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ record_directed()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::record_directed ( const GUID_t reader,
SequenceNumber  seq 
)
private

Definition at line 3934 of file RtpsUdpDataLink.cpp.

// RtpsWriter::record_directed -- in OPENDDS_SECURITY builds, inserts `seq`
// into the named reader's pvs_outstanding_ set, but only when this writer is
// a PVS writer and the reader is present in remote_readers_. Without
// OPENDDS_SECURITY the function has no effect; the ACE_UNUSED_ARG calls
// reference both parameters in that configuration.
3935 {
3936  ACE_UNUSED_ARG(reader_id);
3937  ACE_UNUSED_ARG(seq);
3938 #ifdef OPENDDS_SECURITY
3939  if (!is_pvs_writer_) {
3940  return;
3941  }
3942 
3943  const ReaderInfoMap::iterator iter = remote_readers_.find(reader_id);
3944  if (iter == remote_readers_.end()) {
3945  return;
3946  }
3947 
3948  const ReaderInfo_rch& reader = iter->second;
3949  reader->pvs_outstanding_.insert(seq);
3950 #endif
3951 }
RcHandle< ReaderInfo > ReaderInfo_rch
const bool is_pvs_writer_
Participant Volatile Secure writer.

◆ remove_all_msgs()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_all_msgs ( )

Definition at line 250 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::DataLink::id_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataLink::strategy_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().

// RtpsWriter::remove_all_msgs -- removes this writer's messages from the
// link's send strategy, releases every sequence number held in
// elems_not_acked_ from send_buff_, and reports each of those elements as
// dropped via data_dropped(true).
// Does nothing when the link cannot be locked.
// NOTE(review): the listing skips lines 252, 262-263 and 270; `g` and `g2`,
// released below, are presumably the guards declared on 252 and 270
// (References lists ACE_GUARD and mutex_) -- confirm against
// RtpsUdpDataLink.cpp.
251 {
253 
254  RtpsUdpDataLink_rch link = link_.lock();
255  if (!link) {
256  return;
257  }
258 
259  send_buff_->retain_all(id_);
260 
// The send strategy is accessed inside its own scope under strategy_lock_.
261  {
264  GuardType guard(link->strategy_lock_);
265  if (link->send_strategy_) {
266  link->send_strategy_->remove_all_msgs(id_);
267  }
268  }
269 
271 
// elems_not_acked_ is swapped into a local map so the member is left empty
// and the elements can be processed from the local copy.
272  SnToTqeMap sn_tqe_map;
273  sn_tqe_map.swap(elems_not_acked_);
274 
275  g2.release();
276 
// release_acked() is skipped when the key equals the previously released
// one, so consecutive equal keys are released once.
277  SequenceNumber prev = SequenceNumber::ZERO();
278  typedef SnToTqeMap::iterator iter_t;
279  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
280  if (it->first != prev) {
281  send_buff_->release_acked(it->first);
282  prev = it->first;
283  }
284  }
285 
286  g.release();
287 
// The data_dropped() callbacks run after g.release().
288  for (iter_t it = sn_tqe_map.begin(); it != sn_tqe_map.end(); ++it) {
289  it->second->data_dropped(true);
290  }
291 }
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SingleSendBuffer > send_buff_
static SequenceNumber ZERO()
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ remove_preassociation_reader()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_preassociation_reader ( const ReaderInfo_rch & reader)
inline private

Definition at line 511 of file RtpsUdpDataLink.h.

References OPENDDS_ASSERT.

// RtpsWriter::remove_preassociation_reader -- erases the reader from
// preassociation_readers_; when it was present, looks up its start_sn_ in
// preassociation_reader_start_sns_.
// NOTE(review): the listing skips lines 515-516; the References list names
// OPENDDS_ASSERT, so the found position is presumably asserted valid and then
// erased there -- confirm against RtpsUdpDataLink.h.
512  {
513  if (preassociation_readers_.erase(reader)) {
514  SequenceNumberMultiset::iterator pos = preassociation_reader_start_sns_.find(reader->start_sn_);
517  }
518  }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
SequenceNumberMultiset preassociation_reader_start_sns_

◆ remove_reader()

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_reader ( const GUID_t id)

Definition at line 2086 of file RtpsUdpDataLink.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::log_remote_counts(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_, OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP(), OPENDDS_MULTIMAP, and OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR().

2087 {
2088  OPENDDS_MAP(SequenceNumber, TransportQueueElement*) dd;
2089  TqeSet to_drop;
2090 
2091  bool result = false;
2092  {
2094  ReaderInfoMap::iterator it = remote_readers_.find(id);
2095  if (it != remote_readers_.end()) {
2096  const ReaderInfo_rch& reader = it->second;
2097  reader->swap_durable_data(dd);
2099  const SequenceNumber acked_sn = reader->acked_sn();
2100  const SequenceNumber max_sn = expected_max_sn(reader);
2101  readers_expecting_data_.erase(reader);
2102  readers_expecting_heartbeat_.erase(reader);
2103  snris_erase(acked_sn == max_sn ? leading_readers_ : lagging_readers_, acked_sn, reader);
2105 
2106 #ifdef OPENDDS_SECURITY
2107  if (is_pvs_writer_ &&
2108  !reader->pvs_outstanding_.empty()) {
2109  const OPENDDS_VECTOR(SequenceRange) psr = reader->pvs_outstanding_.present_sequence_ranges();
2110  for (OPENDDS_VECTOR(SequenceRange)::const_iterator pos = psr.begin(), limit = psr.end(); pos != limit; ++pos) {
2112  for (SequenceNumber seq = pos->first; seq <= pos->second; ++seq) {
2113  OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter = elems_not_acked_.find(seq);
2114  if (iter != elems_not_acked_.end()) {
2115  send_buff_->release_acked(iter->first);
2116  to_drop.insert(iter->second);
2117  elems_not_acked_.erase(iter);
2118  }
2119  }
2120  }
2121  }
2122 #endif
2123 
2124  remote_readers_.erase(it);
2125  update_remote_guids_cache_i(false, id);
2126  result = true;
2127  log_remote_counts("remove_reader");
2128  }
2129  }
2130  typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
2131  for (iter_t it = dd.begin(); it != dd.end(); ++it) {
2132  it->second->data_dropped();
2133  }
2134 
2135  for (TqeSet::iterator pos = to_drop.begin(), limit = to_drop.end(); pos != limit; ++pos) {
2136  (*pos)->data_dropped();
2137  }
2138 
2139  return result;
2140 }
typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement *) SnToTqeMap
ReaderInfoSet readers_expecting_data_
These readers have sent a nack and are expecting data.
RcHandle< ReaderInfo > ReaderInfo_rch
typedef OPENDDS_VECTOR(MetaSubmessageVec::iterator) MetaSubmessageIterVec
SequenceNumber expected_max_sn(const ReaderInfo_rch &reader) const
static void snris_erase(RtpsUdpDataLink::SNRIS &snris, const SequenceNumber sn, const ReaderInfo_rch &reader)
RcHandle< SingleSendBuffer > send_buff_
ReaderInfoSet readers_expecting_heartbeat_
These readers have sent a non-final ack and are expecting a heartbeat.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
typedef OPENDDS_MAP(FragmentNumberValue, RTPS::FragmentNumberSet) RequestedFragMap
const bool is_pvs_writer_
Participant Volatile Secure writer.
SNRIS leading_readers_
These readers have acked everything they are supposed to have acked.
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void update_remote_guids_cache_i(bool add, const GUID_t &guid)
void remove_preassociation_reader(const ReaderInfo_rch &reader)
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71

◆ remove_sample()

RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_sample ( const DataSampleElement sample)

Definition at line 183 of file RtpsUdpDataLink.cpp.

References ACE_Message_Block::cont(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::TransportQueueElement::MatchOnDataPayload::matches(), OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, ACE_Message_Block::rd_ptr(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::TransportSendStrategy::remove_sample(), OpenDDS::DCPS::DataLink::send_strategy_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::DataLink::strategy_lock_.

184 {
185  bool found = false;
186  SequenceNumber to_release;
187  TransportQueueElement* tqe = 0;
188 
189  const SequenceNumber& seq = sample->get_header().sequence_;
190  const char* const payload = sample->get_sample()->cont()->rd_ptr();
191  const TransportQueueElement::MatchOnDataPayload modp(payload);
192  SingleSendBuffer::BufferVec removed;
193 
195 
196  RtpsUdpDataLink_rch link = link_.lock();
197  if (!link) {
198  return REMOVE_NOT_FOUND;
199  }
200 
202  {
203  GuardType guard(link->strategy_lock_);
204  if (link->send_strategy_) {
207  result = link->send_strategy_->remove_sample(sample);
208  guard.release();
209  }
210  }
211 
213 
214  if (!elems_not_acked_.empty()) {
215  typedef SnToTqeMap::iterator iter_t;
216  for (std::pair<iter_t, iter_t> er = elems_not_acked_.equal_range(seq); er.first != er.second; ++er.first) {
217  if (modp.matches(*er.first->second)) {
218  found = true;
219  to_release = seq;
220  tqe = er.first->second;
221  elems_not_acked_.erase(er.first);
222  break;
223  }
224  }
225  }
226 
227  g2.release();
228 
229  if (found) {
230  send_buff_->remove_acked(to_release, removed);
231  }
232 
233  g.release();
234 
235  if (found) {
236  for (size_t i = 0; i < removed.size(); ++i) {
237  RemoveAllVisitor visitor;
238  removed[i].first->accept_remove_visitor(visitor);
239  delete removed[i].first;
240  removed[i].second->release();
241  }
242  removed.clear();
243  tqe->data_dropped(true);
244  result = REMOVE_FOUND;
245  }
246  return result;
247 }
WeakRcHandle< RtpsUdpDataLink > link_
RcHandle< SingleSendBuffer > send_buff_
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ request_ack_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::request_ack_i ( const DataSampleHeader header,
ACE_Message_Block body,
MetaSubmessageVec &  meta_submessages 
)
private

Definition at line 1394 of file RtpsUdpDataLink.cpp.

References ACE_DEBUG, OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataLink::id_, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_length_, ACE_Message_Block::rd_ptr(), and OpenDDS::DCPS::Transport_debug_level.

1397 {
1398  // Set the ReaderInfo::durable_timestamp_ for the case where no
1399  // durable samples exist in the DataWriter.
1400  GUID_t sub = GUID_UNKNOWN;
1401  if (body && header.message_length_ >= sizeof(sub)) {
1402  std::memcpy(&sub, body->rd_ptr(), sizeof(sub));
1403  }
1404  typedef ReaderInfoMap::iterator iter_t;
1405  if (sub == GUID_UNKNOWN) {
1406  gather_heartbeats_i(meta_submessages);
1407  if (Transport_debug_level > 3) {
1408  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack "
1409  "local %C all readers\n", LogGuid(id_).c_str()));
1410  }
1411  } else {
1412  iter_t iter = remote_readers_.find(sub);
1413  if (iter != remote_readers_.end()) {
1414  const SingleSendBuffer::Proxy proxy(*send_buff_);
1415  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
1416  initialize_heartbeat(proxy, meta_submessage);
1417  gather_directed_heartbeat_i(proxy, meta_submessages, meta_submessage, iter->second);
1418  if (Transport_debug_level > 3) {
1419  const LogGuid conv(id_), sub_conv(sub);
1420  ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::request_ack"
1421  " local %C remote %C\n", conv.c_str(), sub_conv.c_str()));
1422  }
1423  }
1424  }
1425 }
#define ACE_DEBUG(X)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
char * rd_ptr(void) const
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
RcHandle< SingleSendBuffer > send_buff_
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void initialize_heartbeat(const SingleSendBuffer::Proxy &proxy, MetaSubmessage &meta_submessage)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void gather_directed_heartbeat_i(const SingleSendBuffer::Proxy &proxy, MetaSubmessageVec &meta_submessages, MetaSubmessage &meta_submessage, const ReaderInfo_rch &reader)

◆ send_heartbeats()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_heartbeats ( const MonotonicTimePoint now)
private

Definition at line 1450 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages().

1451 {
1453 
1454  if (stopping_) {
1455  return;
1456  }
1457 
1458  RtpsUdpDataLink_rch link = link_.lock();
1459 
1460  if (!link) {
1461  return;
1462  }
1463 
1464  MetaSubmessageVec meta_submessages;
1465  gather_heartbeats_i(meta_submessages);
1466 
1467  if (!preassociation_readers_.empty() || !lagging_readers_.empty()) {
1468  heartbeat_->schedule(fallback_.get());
1469  fallback_.advance();
1470  } else {
1472  }
1473 
1474  g.release();
1475 
1476  link->queue_submessages(meta_submessages);
1477 }
ReaderInfoSet preassociation_readers_
Preassociation readers require a non-final heartbeat.
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
FibonacciSequence< TimeDuration > fallback_
void gather_heartbeats_i(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ send_heartbeats_manual_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_heartbeats_manual_i ( MetaSubmessageVec &  meta_submessages)
private

Definition at line 4381 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::id_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, OpenDDS::DCPS::MetaSubmessage::sm_, and OpenDDS::RTPS::to_rtps_seqnum().

4382 {
4383  using namespace OpenDDS::RTPS;
4384 
4385  RtpsUdpDataLink_rch link = link_.lock();
4386  if (!link) {
4387  return;
4388  }
4389 
4390  const SingleSendBuffer::Proxy proxy(*send_buff_);
4391  const SequenceNumber firstSN = durable_ ? 1 : non_durable_first_sn(proxy);
4392  const SequenceNumber lastSN = max_sn_;
4393  const int counter = ++heartbeat_count_;
4394 
4395  const HeartBeatSubmessage hb = {
4396  {HEARTBEAT,
4398  HEARTBEAT_SZ},
4399  ENTITYID_UNKNOWN, // any matched reader may be interested in this
4400  id_.entityId,
4401  to_rtps_seqnum(firstSN),
4402  to_rtps_seqnum(lastSN),
4403  {counter}
4404  };
4405 
4406  MetaSubmessage meta_submessage(id_, GUID_UNKNOWN);
4407  meta_submessage.sm_.heartbeat_sm(hb);
4408 
4409  meta_submessages.push_back(meta_submessage);
4410 }
WeakRcHandle< RtpsUdpDataLink > link_
const ACE_CDR::UShort HEARTBEAT_SZ
Definition: MessageTypes.h:107
const octet FLAG_E
Definition: RtpsCore.idl:521
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< SingleSendBuffer > send_buff_
const octet FLAG_F
Definition: RtpsCore.idl:523
const octet FLAG_L
Definition: RtpsCore.idl:527
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
ACE_CDR::Octet Octet
SequenceNumber non_durable_first_sn(const SingleSendBuffer::Proxy &proxy) const
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59

◆ send_nack_responses()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_nack_responses ( const MonotonicTimePoint now)
private

Definition at line 1480 of file RtpsUdpDataLink.cpp.

References ACE_GUARD, OpenDDS::DCPS::InternalDataReaderListener< NetworkInterfaceAddress >::mutex_, and OpenDDS::DCPS::RtpsUdpDataLink::queue_submessages().

1481 {
1482  RtpsUdpDataLink_rch link = link_.lock();
1483  if (!link) {
1484  return;
1485  }
1486 
1487  MetaSubmessageVec meta_submessages;
1488  {
1490 
1491  if (stopping_) {
1492  return;
1493  }
1494 
1495  gather_nack_replies_i(meta_submessages);
1496  }
1497 
1498  link->queue_submessages(meta_submessages);
1499 }
WeakRcHandle< RtpsUdpDataLink > link_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void gather_nack_replies_i(MetaSubmessageVec &meta_submessages)
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch

◆ snris_erase()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::snris_erase ( RtpsUdpDataLink::SNRIS &  snris,
const SequenceNumber  sn,
const ReaderInfo_rch reader 
)
staticprivate

Definition at line 3786 of file RtpsUdpDataLink.cpp.

3789 {
3790  SNRIS::iterator pos = snris.find(sn);
3791  if (pos != snris.end()) {
3792  pos->second->readers.erase(reader);
3793  if (pos->second->readers.empty()) {
3794  snris.erase(pos);
3795  }
3796  }
3797 }

◆ snris_insert()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::snris_insert ( RtpsUdpDataLink::SNRIS &  snris,
const ReaderInfo_rch reader 
)
staticprivate

Definition at line 3774 of file RtpsUdpDataLink.cpp.

3776 {
3777  const SequenceNumber sn = reader->acked_sn();
3778  SNRIS::iterator pos = snris.find(sn);
3779  if (pos == snris.end()) {
3780  pos = snris.insert(RtpsUdpDataLink::SNRIS::value_type(sn, make_rch<ReaderInfoSetHolder>())).first;
3781  }
3782  pos->second->readers.insert(reader);
3783 }

◆ update_max_sn()

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_max_sn ( const GUID_t reader,
SequenceNumber  seq 
)

Definition at line 4495 of file RtpsUdpDataLink.cpp.

References check_leader_lagger(), make_leader_lagger(), max_sn_, and mutex_.

4496 {
4498  SequenceNumber previous_max_sn = max_sn_;
4499  max_sn_ = std::max(max_sn_, seq);
4500  make_leader_lagger(reader, previous_max_sn);
4502  return max_sn_;
4503 }
void make_leader_lagger(const GUID_t &reader, SequenceNumber previous_max_sn)

◆ update_remote_guids_cache_i()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_remote_guids_cache_i ( bool  add,
const GUID_t guid 
)
private

Definition at line 4205 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::bundling_cache_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::insert(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::link_, and OpenDDS::DCPS::AddressCache< Key >::remove_id().

4206 {
4207  RtpsUdpDataLink_rch link = link_.lock();
4208  if (!link) {
4209  return;
4210  }
4211 
4212  {
4214 
4215  // We make a new RcHandle to prevent changing what's being pointed to by existing references in the send queue (i.e. to preserve historic values)
4216  RcHandle<ConstSharedRepoIdSet> temp = make_rch<ConstSharedRepoIdSet>();
4217  if (remote_reader_guids_) {
4218  const_cast<RepoIdSet&>(temp->guids_) = remote_reader_guids_->guids_;
4219  }
4220  if (add) {
4221  const_cast<RepoIdSet&>(temp->guids_).insert(guid);
4222  } else {
4223  const_cast<RepoIdSet&>(temp->guids_).erase(guid);
4224  }
4225  remote_reader_guids_ = temp;
4226  }
4227 
4228  link->bundling_cache_.remove_id(GUID_UNKNOWN);
4229 }
WeakRcHandle< RtpsUdpDataLink > link_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< ConstSharedRepoIdSet > remote_reader_guids_
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
int insert(Container &c, const ValueType &v)
Definition: Util.h:105

◆ update_required_acknack_count()

void OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::update_required_acknack_count ( const GUID_t id,
CORBA::Long  current 
)

Definition at line 2732 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsReader::mutex_.

2733 {
2735  ReaderInfoMap::iterator ri = remote_readers_.find(id);
2736  if (ri != remote_readers_.end()) {
2737  ri->second->required_acknack_count_ = current;
2738  }
2739 }

Member Data Documentation

◆ client_

WeakRcHandle<TransportClient> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::client_
private

Definition at line 446 of file RtpsUdpDataLink.h.

◆ durable_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_
private

Definition at line 449 of file RtpsUdpDataLink.h.

◆ elems_not_acked_

SnToTqeMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_
private

Definition at line 445 of file RtpsUdpDataLink.h.

Referenced by add_elem_awaiting_ack(), and ~RtpsWriter().

◆ elems_not_acked_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_mutex_
mutableprivate

Definition at line 460 of file RtpsUdpDataLink.h.

Referenced by add_elem_awaiting_ack().

◆ fallback_

FibonacciSequence<TimeDuration> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::fallback_
private

Definition at line 466 of file RtpsUdpDataLink.h.

◆ heartbeat_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_
private

Definition at line 462 of file RtpsUdpDataLink.h.

◆ heartbeat_count_

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_count_
private

Definition at line 451 of file RtpsUdpDataLink.h.

Referenced by inc_heartbeat_count().

◆ id_

const GUID_t OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::id_
private

◆ initial_fallback_

const TimeDuration OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::initial_fallback_
private

Definition at line 465 of file RtpsUdpDataLink.h.

◆ is_ps_writer_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_ps_writer_
private

Participant Secure (Reliable SPDP) writer.

Definition at line 456 of file RtpsUdpDataLink.h.

◆ is_pvs_writer_

const bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::is_pvs_writer_
private

Participant Volatile Secure writer.

Definition at line 454 of file RtpsUdpDataLink.h.

◆ lagging_readers_

SNRIS OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::lagging_readers_
private

These readers have not acked everything they are supposed to have acked.

Definition at line 434 of file RtpsUdpDataLink.h.

◆ leading_readers_

SNRIS OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::leading_readers_
private

These readers have acked everything they are supposed to have acked.

Definition at line 436 of file RtpsUdpDataLink.h.

◆ link_

WeakRcHandle<RtpsUdpDataLink> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::link_
private

◆ max_sn_

SequenceNumber OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::max_sn_
private

Definition at line 442 of file RtpsUdpDataLink.h.

Referenced by update_max_sn().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::mutex_
mutableprivate

◆ nack_response_

RcHandle<SporadicEvent> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::nack_response_
private

Definition at line 463 of file RtpsUdpDataLink.h.

◆ preassociation_reader_start_sns_

SequenceNumberMultiset OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::preassociation_reader_start_sns_
private

Definition at line 431 of file RtpsUdpDataLink.h.

◆ preassociation_readers_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::preassociation_readers_
private

Preassociation readers require a non-final heartbeat.

Definition at line 429 of file RtpsUdpDataLink.h.

Referenced by log_remote_counts().

◆ readers_expecting_data_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::readers_expecting_data_
private

These readers have sent a nack and are expecting data.

Definition at line 438 of file RtpsUdpDataLink.h.

◆ readers_expecting_heartbeat_

ReaderInfoSet OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::readers_expecting_heartbeat_
private

These readers have sent a non-final ack and are expecting a heartbeat.

Definition at line 440 of file RtpsUdpDataLink.h.

◆ remote_reader_guids_

RcHandle<ConstSharedRepoIdSet> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_reader_guids_
private

Definition at line 427 of file RtpsUdpDataLink.h.

◆ remote_reader_guids_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_reader_guids_mutex_
mutableprivate

Definition at line 459 of file RtpsUdpDataLink.h.

◆ remote_readers_

ReaderInfoMap OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_
private

Definition at line 426 of file RtpsUdpDataLink.h.

Referenced by is_leading(), and log_remote_counts().

◆ send_buff_

RcHandle<SingleSendBuffer> OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_
private

Definition at line 441 of file RtpsUdpDataLink.h.

Referenced by RtpsWriter().

◆ stopping_

bool OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::stopping_
private

The documentation for this class was generated from the following files: