OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::WriteDataContainer Class Reference

A container for instance sample data. More...

#include <WriteDataContainer.h>

Inheritance diagram for OpenDDS::DCPS::WriteDataContainer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::WriteDataContainer:
Collaboration graph
[legend]

Public Member Functions

 WriteDataContainer (DataWriterImpl *writer, CORBA::Long max_samples_per_instance, CORBA::Long history_depth, CORBA::Long max_durable_per_instance, DDS::Duration_t max_blocking_time, size_t n_chunks, DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataDurabilityCache *durability_cache, DDS::DurabilityServiceQosPolicy const &durability_service, CORBA::Long max_instances, CORBA::Long max_total_samples, ACE_Recursive_Thread_Mutex &deadline_status_lock, DDS::OfferedDeadlineMissedStatus &deadline_status, CORBA::Long &deadline_last_total_count)
 
 ~WriteDataContainer ()
 
DDS::ReturnCode_t enqueue_control (DataSampleElement *control_sample)
 
DDS::ReturnCode_t enqueue (DataSampleElement *sample, DDS::InstanceHandle_t instance)
 
DDS::ReturnCode_t reenqueue_all (const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
 
DDS::ReturnCode_t register_instance (DDS::InstanceHandle_t &instance_handle, Message_Block_Ptr &registered_sample)
 
DDS::ReturnCode_t unregister (DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
 
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
 
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
 
size_t num_all_samples ()
 
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
 
SendStateDataSampleList get_resend_data ()
 
void data_delivered (const DataSampleElement *sample)
 
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
 
DDS::ReturnCode_t obtain_buffer_for_control (DataSampleElement *&element)
 
DDS::ReturnCode_t obtain_buffer (DataSampleElement *&element, DDS::InstanceHandle_t handle)
 
void release_buffer (DataSampleElement *element)
 
void unregister_all ()
 
PublicationInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
 
bool persist_data ()
 
void wait_pending (const MonotonicTimePoint &deadline)
 
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
 
void get_instance_handles (InstanceHandleVec &instance_handles)
 
DDS::ReturnCode_t wait_ack_of_seq (const MonotonicTimePoint &abs_deadline, bool deadline_is_infinite, const SequenceNumber &sequence)
 
bool sequence_acknowledged (const SequenceNumber &sequence)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Types

typedef ConditionVariable< ACE_Recursive_Thread_Mutex > ConditionVariableType
 
typedef ConditionVariable< ACE_Thread_Mutex > WfaConditionVariableType
 

Private Member Functions

DDS::ReturnCode_t remove_instance (PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
 
 WriteDataContainer (WriteDataContainer const &)
 
WriteDataContainer & operator= (WriteDataContainer const &)
 
bool pending_data ()
 
void copy_and_prepend (SendStateDataSampleList &list, const SendStateDataSampleList &appended, const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params, ssize_t &max_resend_samples)
 
void remove_excess_durable ()
 
DDS::ReturnCode_t remove_oldest_sample (InstanceDataSampleList &instance_list, bool &released)
 
void wakeup_blocking_writers (DataSampleElement *stale)
 
void add_reader_acks (const GUID_t &reader, const SequenceNumber &base)
 
void remove_reader_acks (const GUID_t &reader)
 
void log_send_state_lists (OPENDDS_STRING description)
 
typedef OPENDDS_MAP_CMP (GUID_t, DisjointSequence, GUID_tKeyLessThan) AckedSequenceMap
 
SequenceNumber get_cumulative_ack ()
 
SequenceNumber get_last_ack ()
 
void update_acked (const SequenceNumber &seq, const GUID_t &id=GUID_UNKNOWN)
 
bool sequence_acknowledged_i (const SequenceNumber &sequence)
 
typedef OPENDDS_MULTIMAP (MonotonicTimePoint, PublicationInstance_rch) DeadlineMapType
 
void set_deadline_period (const TimeDuration &deadline_period)
 
void process_deadlines (const MonotonicTimePoint &now)
 
void extend_deadline (const PublicationInstance_rch &instance)
 
void cancel_deadline (const PublicationInstance_rch &instance)
 

Private Attributes

AckedSequenceMap acked_sequences_
 
SequenceNumber cached_cumulative_ack_
 
bool cached_cumulative_ack_valid_
 
SendStateDataSampleList unsent_data_
 List of data that has not been sent yet. More...
 
ACE_UINT64 transaction_id_
 
SendStateDataSampleList sending_data_
 List of data that is currently being sent. More...
 
SendStateDataSampleList sent_data_
 List of data that has already been sent. More...
 
SendStateDataSampleList orphaned_to_transport_
 
WriterDataSampleList data_holder_
 
SendStateDataSampleList resend_data_
 
PublicationInstanceMapType instances_
 The individual instance queue threads in the data. More...
 
GUID_t publication_id_
 The publication Id from repo. More...
 
DataWriterImpl * writer_
 The writer that owns this container. More...
 
CORBA::Long max_samples_per_instance_
 
CORBA::Long history_depth_
 
CORBA::Long max_durable_per_instance_
 
CORBA::Long max_num_instances_
 
CORBA::Long max_num_samples_
 
DDS::Duration_t max_blocking_time_
 
bool waiting_on_release_
 The block waiting flag. More...
 
ACE_Recursive_Thread_Mutex lock_
 
ConditionVariableType condition_
 
ConditionVariableType empty_condition_
 
ACE_Thread_Mutex wfa_lock_
 Lock used for wait_for_acks() processing. More...
 
WfaConditionVariableType wfa_condition_
 Used to block in wait_for_acks(). More...
 
size_t n_chunks_
 
DataSampleElementAllocator sample_list_element_allocator_
 
bool shutdown_
 The flag indicates the datawriter will be destroyed. More...
 
DDS::DomainId_t const domain_id_
 Domain ID. More...
 
char const *const topic_name_
 Topic name. More...
 
char const *const type_name_
 Type name. More...
 
DataDurabilityCache *const durability_cache_
 Pointer to the data durability cache. More...
 
DDS::DurabilityServiceQosPolicy const & durability_service_
 DURABILITY_SERVICE QoS specific to the DataWriter. More...
 
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
 Timer responsible for reporting missed offered deadlines. More...
 
TimeDuration deadline_period_
 
DeadlineMapType deadline_map_
 
ACE_Recursive_Thread_Mutex & deadline_status_lock_
 Lock for synchronization of status_ member. More...
 
DDS::OfferedDeadlineMissedStatus & deadline_status_
 Reference to the missed requested deadline status structure. More...
 
CORBA::Long & deadline_last_total_count_
 Last total_count when status was last checked. More...
 

Friends

class DataWriterImpl
 
class ::DDS_TEST
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

A container for instance sample data.

This container is instantiated per DataWriter. It maintains a list of PublicationInstance objects, each of which is internally referenced by its instance handle.

This container contains threaded lists of all data written to a given DataWriter. The real data sample is represented by the DataSampleElement. The data_holder_ holds all DataSampleElement in the writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in PublicationInstance links samples via the next_instance_sample_ thread.

There are three state transition lists used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Any DataSampleElement should be in one of these three lists and SHOULD NOT be shared between these three lists. A normal transition of a DataSampleElement would be unsent->sending->sent. A DataSampleElement transitions from unsent to sending naturally during a write operation when get_unsent_data is called. A DataSampleElement transitions from sending to sent when data_delivered is called, notifying the container that the transport is finished with the sample and it can be marked as a historical sample. A DataSampleElement may transition back to unsent from sending if the transport notifies that the sample was dropped and should be resent. A DataSampleElement is removed from the sent list and freed when the instance queue or container (for another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when an unreliable writer needs space. In this case the transport will be notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples. Note: The real data sample will be freed when the reference count goes to 0. The resend list is only used when the datawriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements for the data sample duplicates of the sending and sent list and hands this list off to the transport.

Note
: 1) The PublicationInstance object is not removed from this container until the instance is unregistered. The same instance handle is reused for re-registration. The instance data is deleted when this container is deleted. This would simplify instance data memory management. An alternative way is to remove the handle from the instance list when unregister occurs and delete the instance data after the transport is done with the instance, but we do not have a way to know when the transport is done since we have the sending list for all instances in the same datawriter. 2) This container has the ownership of the instance data samples when the data is written. The type-specific datawriter that allocates the memory for the sample data gives the ownership to its base class. 3) It is the responsibility of the owner of objects of this class to ensure that access to the lists is properly locked. This means using the same lock/condition to access the lists via the enqueue(), get*(), and data_delivered() methods. For the case where these are all called from the same (client) thread, this should be a recursive lock so that: (a) we do not deadlock; and, (b) we incur the cost of obtaining the lock only once.

Definition at line 122 of file WriteDataContainer.h.

Member Typedef Documentation

◆ ConditionVariableType

Definition at line 512 of file WriteDataContainer.h.

◆ WfaConditionVariableType

Definition at line 519 of file WriteDataContainer.h.

Constructor & Destructor Documentation

◆ WriteDataContainer() [1/2]

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( DataWriterImpl * writer,
CORBA::Long  max_samples_per_instance,
CORBA::Long  history_depth,
CORBA::Long  max_durable_per_instance,
DDS::Duration_t  max_blocking_time,
size_t  n_chunks,
DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataDurabilityCache * durability_cache,
DDS::DurabilityServiceQosPolicy const &  durability_service,
CORBA::Long  max_instances,
CORBA::Long  max_total_samples,
ACE_Recursive_Thread_Mutex & deadline_status_lock,
DDS::OfferedDeadlineMissedStatus & deadline_status,
CORBA::Long & deadline_last_total_count 
)

No default constructor, must be initialized.

Parameters
writer: The writer which owns this container.
max_samples_per_instance: Max samples kept within each instance.
max_durable_per_instance: Max durable samples sent for each instance.
max_blocking_time: The timeout for write.
n_chunks: The number of chunks that the DataSampleElementAllocator needs to allocate.
domain_id: Domain ID.
topic_name: Topic name.
type_name: Type name.
durability_cache: The data durability cache for unsent data.
durability_service: DURABILITY_SERVICE QoS specific to the DataWriter.
max_instances: Maximum number of instances, 0 for unlimited.
max_total_samples: Maximum total number of samples, 0 for unlimited.

Definition at line 68 of file WriteDataContainer.cpp.

References ACE_DEBUG, acked_sequences_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_UNKNOWN, LM_DEBUG, n_chunks_, sample_list_element_allocator_, and OpenDDS::DCPS::SequenceNumber::ZERO().

88  , transaction_id_(0)
90  , writer_(writer)
91  , max_samples_per_instance_(max_samples_per_instance)
92  , history_depth_(history_depth)
93  , max_durable_per_instance_(max_durable_per_instance)
94  , max_num_instances_(max_instances)
95  , max_num_samples_(max_total_samples)
96  , max_blocking_time_(max_blocking_time)
97  , waiting_on_release_(false)
98  , condition_(lock_)
101  , n_chunks_(n_chunks)
103  , shutdown_(false)
104  , domain_id_(domain_id)
105  , topic_name_(topic_name)
106  , type_name_(type_name)
107 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
108  , durability_cache_(durability_cache)
109  , durability_service_(durability_service)
110 #endif
111  , deadline_task_(DCPS::make_rch<DCPS::PmfSporadicTask<WriteDataContainer> >(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &WriteDataContainer::process_deadlines))
113  , deadline_status_lock_(deadline_status_lock)
114  , deadline_status_(deadline_status)
115  , deadline_last_total_count_(deadline_last_total_count)
116 {
117  if (DCPS_debug_level >= 2) {
118  ACE_DEBUG((LM_DEBUG,
119  "(%P|%t) WriteDataContainer "
120  "sample_list_element_allocator %x with %d chunks\n",
122  }
124 }
#define ACE_DEBUG(X)
char const *const topic_name_
Topic name.
DataSampleElementAllocator sample_list_element_allocator_
ConditionVariableType empty_condition_
static const TimeDuration max_value
Definition: TimeDuration.h:32
DDS::DurabilityServiceQosPolicy const & durability_service_
DURABILITY_SERVICE QoS specific to the DataWriter.
bool shutdown_
The flag indicates the datawriter will be destroyed.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Recursive_Thread_Mutex & deadline_status_lock_
Lock for synchronization of status_ member.
ACE_Recursive_Thread_Mutex lock_
char const *const type_name_
Type name.
bool waiting_on_release_
The block waiting flag.
static SequenceNumber ZERO()
DDS::DomainId_t const domain_id_
Domain ID.
DataWriterImpl * writer_
The writer that owns this container.
GUID_t publication_id_
The publication Id from repo.
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::OfferedDeadlineMissedStatus & deadline_status_
Reference to the missed requested deadline status structure.
CORBA::Long & deadline_last_total_count_
Last total_count when status was last checked.
void process_deadlines(const MonotonicTimePoint &now)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
#define TheServiceParticipant
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
DataDurabilityCache *const durability_cache_
Pointer to the data durability cache.

◆ ~WriteDataContainer()

OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer ( )

Definition at line 126 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, deadline_task_, OpenDDS::DCPS::SendStateDataSampleList::dequeue_head(), OpenDDS::DCPS::TransportRegistry::instance(), LM_DEBUG, LM_ERROR, LM_WARNING, orphaned_to_transport_, release_buffer(), OpenDDS::DCPS::TransportRegistry::released(), sending_data_, sent_data_, shutdown_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

127 {
128  deadline_task_->cancel();
129 
130  if (this->unsent_data_.size() > 0) {
131  ACE_DEBUG((LM_WARNING,
132  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
133  ACE_TEXT("destroyed with %d samples unsent.\n"),
134  this->unsent_data_.size()));
135  }
136 
137  if (this->sending_data_.size() > 0) {
139  for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
140  release_buffer(e);
141  }
142  }
144  ACE_DEBUG((LM_WARNING,
145  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
146  ACE_TEXT("destroyed with %d samples sending.\n"),
147  this->sending_data_.size()));
148  }
149  }
150 
151  if (this->sent_data_.size() > 0) {
152  ACE_DEBUG((LM_DEBUG,
153  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
154  ACE_TEXT("destroyed with %d samples sent.\n"),
155  this->sent_data_.size()));
156  }
157 
158  if (this->orphaned_to_transport_.size() > 0) {
159  if (DCPS_debug_level > 0) {
160  ACE_DEBUG((LM_DEBUG,
161  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
162  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
163  this->orphaned_to_transport_.size()));
164  }
165  }
166 
167  if (!shutdown_) {
168  ACE_ERROR((LM_ERROR,
169  ACE_TEXT("(%P|%t) ERROR: ")
170  ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
171  ACE_TEXT("The container has not been cleaned.\n")));
172  }
173 }
#define ACE_DEBUG(X)
bool dequeue_head(DataSampleElement *&stale)
#define ACE_ERROR(X)
SendStateDataSampleList orphaned_to_transport_
SendStateDataSampleList sent_data_
List of data that has already been sent.
bool shutdown_
The flag indicates the datawriter will be destroyed.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
static TransportRegistry * instance()
Return a singleton instance of this class.
SendStateDataSampleList sending_data_
List of data that is currently being sent.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
void release_buffer(DataSampleElement *element)
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.

◆ WriteDataContainer() [2/2]

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( WriteDataContainer const &  )
private

Member Function Documentation

◆ add_reader_acks()

void OpenDDS::DCPS::WriteDataContainer::add_reader_acks ( const GUID_t & reader,
const SequenceNumber & base 
)
private

Definition at line 176 of file WriteDataContainer.cpp.

References acked_sequences_, cached_cumulative_ack_valid_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::DisjointSequence::reset(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), wfa_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().

177 {
179 
180  DisjointSequence& ds = acked_sequences_[reader];
181  ds.reset();
183  ds.insert(SequenceNumber::ZERO());
184  } else {
185  ds.insert(SequenceRange(SequenceNumber(), base));
186  }
188 }
static SequenceNumber ZERO()
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ cancel_deadline()

void OpenDDS::DCPS::WriteDataContainer::cancel_deadline ( const PublicationInstance_rch & instance)
private

Definition at line 1737 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::PublicationInstance::deadline_, deadline_map_, deadline_period_, deadline_task_, OpenDDS::DCPS::TimeDuration::max_value, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

Referenced by remove_instance().

1738 {
1739  // Call comes from DataWriterImpl_t which should already have the lock_.
1740 
1742  return;
1743  }
1744 
1745  std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1746  while (r.first != r.second && r.first->second != instance) {
1747  ++r.first;
1748  }
1749  if (r.first != r.second) {
1750  deadline_map_.erase(r.first);
1751  if (deadline_map_.empty()) {
1752  deadline_task_->cancel();
1753  }
1754  }
1755 }
static const TimeDuration max_value
Definition: TimeDuration.h:32
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.

◆ copy_and_prepend()

void OpenDDS::DCPS::WriteDataContainer::copy_and_prepend ( SendStateDataSampleList & list,
const SendStateDataSampleList & appended,
const GUID_t & reader_id,
const DDS::LifespanQosPolicy & lifespan,
const OPENDDS_STRING & filterClassName,
const FilterEvaluator * eval,
const DDS::StringSeq & params,
ssize_t & max_resend_samples 
)
private

Definition at line 1370 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_NEW_MALLOC, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::PublicationInstance::durable_samples_remaining_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_head(), OpenDDS::DCPS::DataWriterImpl::filter_out(), LM_DEBUG, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::SendStateDataSampleList::rbegin(), OpenDDS::DCPS::SendStateDataSampleList::rend(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.

Referenced by reenqueue_all().

1380 {
1381  for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
1382  cur != appended.rend() && max_resend_samples; ++cur) {
1383 
1384  if (resend_data_expired(*cur, lifespan))
1385  continue;
1386 
1387 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1388  if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
1389  continue;
1390 #endif
1391 
1392  PublicationInstance_rch inst = cur->get_handle();
1393 
1394  if (!inst) {
1395  // *cur is a control message, just skip it
1396  continue;
1397  }
1398 
1399  if (inst->durable_samples_remaining_ == 0)
1400  continue;
1402 
1403  DataSampleElement* element = 0;
1404  ACE_NEW_MALLOC(element,
1405  static_cast<DataSampleElement*>(
1407  sizeof(DataSampleElement))),
1408  DataSampleElement(*cur));
1409 
1410  element->set_num_subs(1);
1411  element->set_sub_id(0, reader_id);
1412 
1413  if (DCPS_debug_level > 9) {
1414  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::copy_and_prepend added seq# %q\n",
1415  cur->get_header().sequence_.getValue()));
1416  }
1417 
1418  list.enqueue_head(element);
1419  --max_resend_samples;
1420  }
1421 }
#define ACE_DEBUG(X)
DataSampleElementAllocator sample_list_element_allocator_
ssize_t durable_samples_remaining_
Only used by WriteDataContainer::reenqueue_all() while WDC is locked.
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
bool resend_data_expired(const DataSampleElement &element, const DDS::LifespanQosPolicy &lifespan)
DataWriterImpl * writer_
The writer that owns this container.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
std::reverse_iterator< const_iterator > const_reverse_iterator
RcHandle< PublicationInstance > PublicationInstance_rch

◆ data_delivered()

void OpenDDS::DCPS::WriteDataContainer::data_delivered ( const DataSampleElement * sample)

Acknowledge the delivery of data. The sample that resides in this container will be moved from sending_data_ list to the internal sent_data_ list. If there are any threads waiting for available space, it wakes up these threads.

Definition at line 651 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_SYNCH_MUTEX, ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::controlTracker, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::InstanceDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), get_cumulative_ack(), OpenDDS::DCPS::DataSampleElement::get_handle(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::DataSampleElement::get_sub_id(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, LM_DEBUG, LM_WARNING, lock_, max_durable_per_instance_, OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), orphaned_to_transport_, pending_data(), publication_id_, ACE_Guard< ACE_LOCK >::release(), release_buffer(), OpenDDS::DCPS::SendStateDataSampleList::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::PublicationInstance::samples_, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), shutdown_, topic_name_, unsent_data_, update_acked(), wakeup_blocking_writers(), wfa_condition_, wfa_lock_, and writer_.

Referenced by data_dropped().

652 {
653  DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
654 
655  if (DCPS_debug_level >= 2) {
656  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
657  ACE_TEXT(" %@\n"), sample));
658  }
659 
661  guard,
662  lock_);
663 
664  // Delivered samples _must_ be on sending_data_ list
665 
666  // If it is not found in one of the lists, an invariant
667  // exception is declared.
668 
669  // The element now needs to be removed from the sending_data_
670  // list, and appended to the end of the sent_data_ list here
671 
672  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
673 
674  // If sample is on a SendStateDataSampleList it should be on the
675  // sending_data_ list signifying it was given to the transport to
676  // deliver and now the transport is signaling it has been delivered
677  if (!sending_data_.dequeue(sample)) {
678  //
679  // Should be on sending_data_. If it is in sent_data_
680  // or unsent_data there was a problem.
681  //
682  SendStateDataSampleList* send_lists[] = {
683  &sent_data_,
684  &unsent_data_,
686  const SendStateDataSampleList* containing_list =
688 
689  if (containing_list == &sent_data_) {
690  ACE_ERROR((LM_WARNING,
691  ACE_TEXT("(%P|%t) WARNING: ")
692  ACE_TEXT("WriteDataContainer::data_delivered, ")
693  ACE_TEXT("The delivered sample is not in sending_data_ and ")
694  ACE_TEXT("WAS IN sent_data_.\n")));
695  } else if (containing_list == &unsent_data_) {
696  ACE_ERROR((LM_WARNING,
697  ACE_TEXT("(%P|%t) WARNING: ")
698  ACE_TEXT("WriteDataContainer::data_delivered, ")
699  ACE_TEXT("The delivered sample is not in sending_data_ and ")
700  ACE_TEXT("WAS IN unsent_data_ list.\n")));
701  } else {
702 
703  //No-op: elements may be removed from all WriteDataContainer lists during shutdown
704  //and inform transport of their release. Transport will call data-delivered on the
705  //elements as it processes the removal but they will already be gone from the send lists.
706  if (stale->get_header().message_id_ != SAMPLE_DATA) {
707  //this message was a control message so release it
708  if (DCPS_debug_level > 9) {
709  ACE_DEBUG((LM_DEBUG,
710  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
711  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
712  domain_id_,
713  topic_name_,
714  LogGuid(publication_id_).c_str()));
715  }
717  }
718 
719  if (containing_list == &orphaned_to_transport_) {
720  orphaned_to_transport_.dequeue(sample);
721  release_buffer(stale);
722 
723  } else if (!containing_list) {
724  // samples that were retrieved from get_resend_data()
726  const CORBA::ULong num_subs = stale->get_num_subs();
727  for (CORBA::ULong i = 0; i < num_subs; ++i) {
728  update_acked(stale->get_header().sequence_, stale->get_sub_id(i));
729  }
730  wfa_guard.release();
732  release_buffer(stale);
733  }
734 
735  if (!pending_data()) {
737  }
738  }
739 
740  return;
741  }
742  ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, wfa_lock_);
743  SequenceNumber acked_seq = stale->get_header().sequence_;
744  SequenceNumber prev_max = get_cumulative_ack();
745 
746  if (stale->get_header().message_id_ != SAMPLE_DATA) {
747  //this message was a control message so release it
748  if (DCPS_debug_level > 9) {
749  ACE_DEBUG((LM_DEBUG,
750  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
751  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
752  domain_id_,
753  topic_name_,
754  LogGuid(publication_id_).c_str()));
755  }
756  release_buffer(stale);
757  stale = 0;
759  } else {
760 
762  const_cast<DataSampleElement*>(sample)->get_header().historic_sample_ = true;
763  DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
764  sent_data_.enqueue_tail(sample);
765 
766  } else {
768  PublicationInstance_rch inst = sample->get_handle();
769  inst->samples_.dequeue(sample);
770  }
771  release_buffer(stale);
772  stale = 0;
773  }
774 
775  if (DCPS_debug_level > 9) {
776  ACE_DEBUG((LM_DEBUG,
777  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
778  ACE_TEXT("domain %d topic %C publication %C seq# %q %s.\n"),
779  domain_id_,
780  topic_name_,
781  LogGuid(publication_id_).c_str(),
782  acked_seq.getValue(),
784  ? ACE_TEXT("stored for durability")
785  : ACE_TEXT("released")));
786  }
787 
789  }
790  if (DCPS_debug_level > 9) {
791  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
792  ACE_TEXT("Inserting acked_sequence: %q\n"),
793  acked_seq.getValue()));
794  }
795 
796  update_acked(acked_seq);
797 
798  if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
799  prev_max < get_cumulative_ack()) {
800 
801  if (DCPS_debug_level > 9) {
802  ACE_DEBUG((LM_DEBUG,
803  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
804  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
805  }
806 
808  }
809 
810  // Signal if there is no pending data.
811  if (!pending_data()) {
813  }
814 }
static bool on_some_list(const DataSampleElement *iter)
#define ACE_DEBUG(X)
static void set_flag(DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
char const *const topic_name_
Topic name.
SendStateDataSampleList orphaned_to_transport_
void wakeup_blocking_writers(DataSampleElement *stale)
ConditionVariableType empty_condition_
SendStateDataSampleList sent_data_
List of data that has already been sent.
bool shutdown_
The flag indicates the datawriter will be destroyed.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_Recursive_Thread_Mutex lock_
static void remove(DataSampleElement *stale)
bool dequeue(const DataSampleElement *stale)
ACE_CDR::ULong ULong
SendStateDataSampleList sending_data_
List of data that is currently being sent.
DDS::DomainId_t const domain_id_
Domain ID.
DataWriterImpl * writer_
The writer that owns this container.
static const SendStateDataSampleList * send_list_containing_element(const DataSampleElement *element, SendStateDataSampleList **begin, SendStateDataSampleList **end)
GUID_t publication_id_
The publication Id from repo.
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
bool dequeue(const DataSampleElement *stale)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
InstanceDataSampleList samples_
History of the instance samples.
ACE_TEXT("TCP_Factory")
void update_acked(const SequenceNumber &seq, const GUID_t &id=GUID_UNKNOWN)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
void enqueue_tail(const DataSampleElement *element)
void release_buffer(DataSampleElement *element)
RcHandle< PublicationInstance > PublicationInstance_rch

◆ data_dropped()

void OpenDDS::DCPS::WriteDataContainer::data_dropped ( const DataSampleElement element,
bool  dropped_by_transport 
)

This method is called by the transport to notify that the sample has been dropped, which the transport was told to do by the publication code calling TransportClient::remove_sample(). If the sample was "sending" then it is moved to the "unsent" list. If there are any threads waiting for available space then it needs to wake up these threads. When the dropped_by_transport flag is true, the drop was initiated by the transport because the transport send strategy is in MODE_TERMINATED. When the dropped_by_transport flag is false, the drop was initiated by remove_sample() and data_dropped() is called as a result of remove_sample().

Definition at line 817 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SendStateDataSampleList::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, shutdown_, topic_name_, unsent_data_, wakeup_blocking_writers(), and writer_.

819 {
820  DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
821 
822  if (DCPS_debug_level >= 2) {
823  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
824  ACE_TEXT(" sample %X dropped_by_transport %d\n"),
825  sample, dropped_by_transport));
826  }
827 
828  // If the transport initiates the data dropping, we need do same thing
829  // as data_delivered. e.g. remove the sample from the internal list
830  // and the instance list. We do not need acquire the lock here since
831  // the data_delivered acquires the lock.
832  if (dropped_by_transport) {
833  data_delivered(sample);
834  return;
835  }
836 
837  //The data_dropped could be called from the thread initiating sample remove
838  //which already hold the lock. In this case, it's not necessary to acquire
839  //lock here. It also could be called from the transport thread in a delayed
840  //notification, it's necessary to acquire lock here to protect the internal
841  //structures in this class.
842 
844  guard,
845  lock_);
846 
847  // The dropped sample should be in the sending_data_ list.
848  // Otherwise an exception will be raised.
849  //
850  // We are now been notified by transport, so we can
851  // keep the sample from the sending_data_ list still in
852  // sample list since we will send it.
853 
854  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
855 
856  // If sample is on a SendStateDataSampleList it should be on the
857  // sending_data_ list signifying it was given to the transport to
858  // deliver and now the transport is signaling it has been dropped
859 
860  if (sending_data_.dequeue(sample)) {
861  // else: The data_dropped is called as a result of remove_sample()
862  // called from reenqueue_all() which supports the TRANSIENT_LOCAL
863  // qos. The samples that are sending by transport are dropped from
864  // transport and will be moved to the unsent list for resend.
866  unsent_data_.enqueue_tail(sample);
867  } else {
869  release_buffer(stale);
870  stale = 0;
871  }
872 
873  } else {
874  //
875  // If it is in sent_data_ or unsent_data there was a problem.
876  //
877  SendStateDataSampleList* send_lists[] = {
878  &sent_data_,
879  &unsent_data_,
881  const SendStateDataSampleList* containing_list =
883 
884  if (containing_list == &sent_data_) {
885  ACE_ERROR((LM_WARNING,
886  ACE_TEXT("(%P|%t) WARNING: ")
887  ACE_TEXT("WriteDataContainer::data_dropped, ")
888  ACE_TEXT("The dropped sample is not in sending_data_ and ")
889  ACE_TEXT("WAS IN sent_data_.\n")));
890  } else if (containing_list == &unsent_data_) {
891  ACE_ERROR((LM_WARNING,
892  ACE_TEXT("(%P|%t) WARNING: ")
893  ACE_TEXT("WriteDataContainer::data_dropped, ")
894  ACE_TEXT("The dropped sample is not in sending_data_ and ")
895  ACE_TEXT("WAS IN unsent_data_ list.\n")));
896  } else {
897 
898  //No-op: elements may be removed from all WriteDataContainer lists during shutdown
899  //and inform transport of their release. Transport will call data-dropped on the
900  //elements as it processes the removal but they will already be gone from the send lists.
901  if (stale->get_header().message_id_ != SAMPLE_DATA) {
902  //this message was a control message so release it
903  if (DCPS_debug_level > 9) {
904  ACE_DEBUG((LM_DEBUG,
905  ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
906  ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
907  domain_id_,
908  topic_name_,
909  LogGuid(publication_id_).c_str()));
910  }
912  }
913 
914  if (containing_list == &orphaned_to_transport_) {
915  orphaned_to_transport_.dequeue(sample);
916  release_buffer(stale);
917  stale = 0;
918  if (!pending_data()) {
920  }
921 
922  } else if (!containing_list) {
923  // samples that were retrieved from get_resend_data()
925  release_buffer(stale);
926  stale = 0;
927  }
928  }
929 
930  return;
931  }
932 
934 
935  if (!pending_data()) {
937  }
938 }
static bool on_some_list(const DataSampleElement *iter)
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
char const *const topic_name_
Topic name.
SendStateDataSampleList orphaned_to_transport_
void wakeup_blocking_writers(DataSampleElement *stale)
ConditionVariableType empty_condition_
SendStateDataSampleList sent_data_
List of data that has already been sent.
bool shutdown_
The flag indicates the datawriter will be destroyed.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_Recursive_Thread_Mutex lock_
static void remove(DataSampleElement *stale)
bool dequeue(const DataSampleElement *stale)
SendStateDataSampleList sending_data_
List of data that is currently being sent.
void data_delivered(const DataSampleElement *sample)
DDS::DomainId_t const domain_id_
Domain ID.
DataWriterImpl * writer_
The writer that owns this container.
static const SendStateDataSampleList * send_list_containing_element(const DataSampleElement *element, SendStateDataSampleList **begin, SendStateDataSampleList **end)
GUID_t publication_id_
The publication Id from repo.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void enqueue_tail(const DataSampleElement *element)
void release_buffer(DataSampleElement *element)

◆ dispose()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose ( DDS::InstanceHandle_t  handle,
Message_Block_Ptr registered_sample,
bool  dup_registered_sample = true 
)

Delete the samples for the provided instance. A shallow copy of the sample data will be given to the datawriter as part of the control message if dup_registered_sample is true.

This method returns an error if the instance is not registered.

Definition at line 482 of file WriteDataContainer.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::find(), instances_, LM_ERROR, lock_, remove_instance(), DDS::RETCODE_ERROR, and DDS::RETCODE_PRECONDITION_NOT_MET.

485 {
487  guard,
488  lock_,
490 
491  PublicationInstance_rch instance;
492 
493  int const find_attempt = find(instances_, instance_handle, instance);
494 
495  if (0 != find_attempt) {
496  ACE_ERROR_RETURN((LM_ERROR,
497  ACE_TEXT("(%P|%t) ERROR: ")
498  ACE_TEXT("WriteDataContainer::dispose, ")
499  ACE_TEXT("The instance(handle=%X) ")
500  ACE_TEXT("is not registered yet.\n"),
501  instance_handle),
503  }
504 
505  return remove_instance(instance, registered_sample, dup_registered_sample);
506 }
ACE_Recursive_Thread_Mutex lock_
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
ACE_TEXT("TCP_Factory")
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
#define ACE_ERROR_RETURN(X, Y)
RcHandle< PublicationInstance > PublicationInstance_rch

◆ enqueue()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue ( DataSampleElement sample,
DDS::InstanceHandle_t  instance 
)

Enqueue the data sample in its instance thread. This method assumes there is available space for the sample in the instance list.

Definition at line 294 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), extend_deadline(), get_handle_instance(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, and unsent_data_.

297 {
298  if (shutdown_) {
299  return DDS::RETCODE_ERROR;
300  }
301 
302  // Get the PublicationInstance pointer from InstanceHandle_t.
303  PublicationInstance_rch instance =
304  get_handle_instance(instance_handle);
305  // Extract the instance queue.
306  InstanceDataSampleList& instance_list = instance->samples_;
307 
308  extend_deadline(instance);
309 
310  //
311  // Enqueue to the next_send_sample_ thread of unsent_data_
312  // will link samples with the next_sample/previous_sample and
313  // also next_send_sample_.
314  // This would save time when we actually send the data.
315 
316  unsent_data_.enqueue_tail(sample);
317 
318  //
319  // Add this sample to the INSTANCE scope list.
320  instance_list.enqueue_tail(sample);
321 
322  return DDS::RETCODE_OK;
323 }
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
const ReturnCode_t RETCODE_OK
bool shutdown_
The flag indicates the datawriter will be destroyed.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
void extend_deadline(const PublicationInstance_rch &instance)
const ReturnCode_t RETCODE_ERROR
void enqueue_tail(const DataSampleElement *element)
RcHandle< PublicationInstance > PublicationInstance_rch

◆ enqueue_control()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control ( DataSampleElement control_sample)

Definition at line 276 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, shutdown_, and unsent_data_.

277 {
278  // Enqueue to the next_send_sample_ thread of unsent_data_
279  // will link samples with the next_sample/previous_sample and
280  // also next_send_sample_.
281  // This would save time when we actually send the data.
282 
283  if (shutdown_) {
284  return DDS::RETCODE_ERROR;
285  }
286 
287  unsent_data_.enqueue_tail(control_sample);
288 
289  return DDS::RETCODE_OK;
290 }
const ReturnCode_t RETCODE_OK
bool shutdown_
The flag indicates the datawriter will be destroyed.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
const ReturnCode_t RETCODE_ERROR
void enqueue_tail(const DataSampleElement *element)

◆ extend_deadline()

void OpenDDS::DCPS::WriteDataContainer::extend_deadline ( const PublicationInstance_rch instance)
private

Definition at line 1712 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::PublicationInstance::deadline_, deadline_map_, deadline_period_, deadline_task_, OpenDDS::DCPS::TimeDuration::max_value, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().

Referenced by enqueue(), and register_instance().

1713 {
1714  // Call comes from DataWriterImpl_t which should already have the lock_.
1715 
1717  return;
1718  }
1719 
1720  std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1721  while (r.first != r.second && r.first->second != instance) {
1722  ++r.first;
1723  }
1724  if (r.first != r.second) {
1725  // The instance was in the map.
1726  deadline_map_.erase(r.first);
1727  }
1728  instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
1729  bool schedule = deadline_map_.empty();
1730  deadline_map_.insert(std::make_pair(instance->deadline_, instance));
1731  if (schedule) {
1732  deadline_task_->schedule(deadline_period_);
1733  }
1734 }
static const TimeDuration max_value
Definition: TimeDuration.h:32
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.

◆ get_cumulative_ack()

SequenceNumber OpenDDS::DCPS::WriteDataContainer::get_cumulative_ack ( )
private

Definition at line 207 of file WriteDataContainer.cpp.

References acked_sequences_, cached_cumulative_ack_, cached_cumulative_ack_valid_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by data_delivered(), remove_reader_acks(), and sequence_acknowledged_i().

208 {
209  if (acked_sequences_.empty()) {
211  }
212 
214  return cached_cumulative_ack_;
215  }
216 
217  SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
218  for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
219  if (!it->second.empty()) {
220  result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.cumulative_ack() : std::min(result, it->second.cumulative_ack());
221  }
222  }
223  cached_cumulative_ack_ = result;
225  return result;
226 }
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ get_handle_instance()

PublicationInstance_rch OpenDDS::DCPS::WriteDataContainer::get_handle_instance ( DDS::InstanceHandle_t  handle)
Todo:
remove/document this!

Definition at line 1355 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::find(), instances_, and LM_DEBUG.

Referenced by enqueue(), and obtain_buffer().

1356 {
1357  PublicationInstance_rch instance;
1358 
1359  if (0 != find(instances_, handle, instance)) {
1360  ACE_DEBUG((LM_DEBUG,
1361  ACE_TEXT("(%P|%t) ")
1362  ACE_TEXT("WriteDataContainer::get_handle_instance, ")
1363  ACE_TEXT("lookup for %d failed\n"), handle));
1364  }
1365 
1366  return instance;
1367 }
#define ACE_DEBUG(X)
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
ACE_TEXT("TCP_Factory")
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
RcHandle< PublicationInstance > PublicationInstance_rch

◆ get_instance_handles()

void OpenDDS::DCPS::WriteDataContainer::get_instance_handles ( InstanceHandleVec &  instance_handles)

Definition at line 1513 of file WriteDataContainer.cpp.

References ACE_GUARD, instances_, and lock_.

1514 {
1516  guard,
1517  lock_);
1518  PublicationInstanceMapType::iterator it = instances_.begin();
1519 
1520  while (it != instances_.end()) {
1521  instance_handles.push_back(it->second->instance_handle_);
1522  ++it;
1523  }
1524 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_
PublicationInstanceMapType instances_
The individual instance queue threads in the data.

◆ get_last_ack()

SequenceNumber OpenDDS::DCPS::WriteDataContainer::get_last_ack ( )
private

Definition at line 229 of file WriteDataContainer.cpp.

References acked_sequences_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

230 {
231  if (acked_sequences_.empty()) {
233  }
234 
235  SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
236  for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
237  if (!it->second.empty()) {
238  result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.last_ack() : std::max(result, it->second.last_ack());
239  }
240  }
241  return result;
242 }
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ get_resend_data()

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data ( )

Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list returned is not put on any SendStateDataSampleList.

Definition at line 622 of file WriteDataContainer.cpp.

References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().

623 {
624  DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
625 
626  //
627  // The samples in unsent_data are added to the sending_data
628  // during enqueue.
629  //
630  SendStateDataSampleList list = this->resend_data_;
631 
632  //
633  // Clear the resend data list.
634  //
635  this->resend_data_.reset();
636  //
637  // Return the moved list.
638  //
639  return list;
640 }
SendStateDataSampleList resend_data_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ get_unsent_data()

ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data ( SendStateDataSampleList list)

Obtain a list of data that has not yet been sent. The data on the list returned is moved from the internal unsent_data_ list to the internal sending_data_ list as part of this call. The entire list is linked via the DataSampleElement.next_send_sample_ link as well.

Definition at line 583 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, OpenDDS::DCPS::DataSampleElement::set_transaction_id(), transaction_id_, and unsent_data_.

584 {
585  DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
586  //
587  // The samples in unsent_data are added to the local datawriter
588  // list and enqueued to the sending_data_ signifying they have
589  // been passed to the transport to send in a transaction
590  //
591  list = this->unsent_data_;
592 
593  // Increment send counter for this send operation
594  ++transaction_id_;
595 
596  // Mark all samples with current send counter
597  SendStateDataSampleList::iterator iter = list.begin();
598  while (iter != list.end()) {
599  iter->set_transaction_id(this->transaction_id_);
600  ++iter;
601  }
602 
603  //
604  // The unsent_data_ already linked with the
605  // next_send_sample during enqueue.
606  // Append the unsent_data_ to current sending_data_
607  // list.
609 
610  //
611  // Clear the unsent data list.
612  //
613  this->unsent_data_.reset();
614 
615  //
616  // Return the moved list.
617  //
618  return transaction_id_;
619 }
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
SendStateDataSampleList sending_data_
List of data that is currently being sent.
SendStateDataSampleListIterator iterator
STL-style bidirectional iterator and const-iterator types.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void enqueue_tail(const DataSampleElement *element)
void set_transaction_id(ACE_UINT64 transaction_id)

◆ log_send_state_lists()

void OpenDDS::DCPS::WriteDataContainer::log_send_state_lists ( OPENDDS_STRING  description)
private

Definition at line 1593 of file WriteDataContainer.cpp.

References ACE_DEBUG, instances_, LM_DEBUG, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by wait_pending().

1594 {
1595  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
1596  description.c_str(),
1597  unsent_data_.size(),
1598  sending_data_.size(),
1599  sent_data_.size(),
1601  num_all_samples(),
1602  instances_.size()));
1603 }
#define ACE_DEBUG(X)
SendStateDataSampleList orphaned_to_transport_
SendStateDataSampleList sent_data_
List of data that has already been sent.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
SendStateDataSampleList sending_data_
List of data that is currently being sent.
PublicationInstanceMapType instances_
The individual instance queue threads in the data.

◆ num_all_samples()

size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples ( )

Return the number of samples for all instances.

Definition at line 563 of file WriteDataContainer.cpp.

References ACE_GUARD_RETURN, instances_, and lock_.

Referenced by log_send_state_lists(), and obtain_buffer().

564 {
565  size_t size = 0;
566 
568  guard,
569  lock_,
570  0);
571 
572  for (PublicationInstanceMapType::iterator iter = instances_.begin();
573  iter != instances_.end();
574  ++iter)
575  {
576  size += iter->second->samples_.size();
577  }
578 
579  return size;
580 }
ACE_Recursive_Thread_Mutex lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
PublicationInstanceMapType instances_
The individual instance queue threads in the data.

◆ num_samples()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples ( DDS::InstanceHandle_t  handle,
size_t &  size 
)

Return the number of samples for the given instance.

Definition at line 542 of file WriteDataContainer.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::find(), instances_, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().

544 {
546  guard,
547  lock_,
549  PublicationInstance_rch instance;
550 
551  int const find_attempt = find(instances_, handle, instance);
552 
553  if (0 != find_attempt) {
554  return DDS::RETCODE_ERROR;
555 
556  } else {
557  size = instance->samples_.size();
558  return DDS::RETCODE_OK;
559  }
560 }
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
RcHandle< PublicationInstance > PublicationInstance_rch

◆ obtain_buffer()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer ( DataSampleElement *&  element,
DDS::InstanceHandle_t  handle 
)

Allocate a DataSampleElement object and check space availability for the newly allocated element according to the QoS settings. For the blocking write case, if resource limits or history QoS limits are reached, it blocks for up to the max blocking time waiting for a previous sample to be delivered or dropped by the transport. In the non-blocking write case, if resource limits or history QoS limits are reached, it will attempt to remove the oldest samples (forcing the transport to drop samples if necessary) to make space. If several threads are waiting, the first one in the waiting list can enqueue while the others continue waiting. Note: the lock should be held before calling this method.

Definition at line 1138 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), condition_, OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), history_depth_, instances_, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, max_samples_per_instance_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), num_all_samples(), publication_id_, OpenDDS::DCPS::DataWriterImpl::qos_, release_buffer(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, remove_excess_durable(), remove_oldest_sample(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, TheServiceParticipant, OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until(), waiting_on_release_, and writer_.

1140 {
1141  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
1142 
1144 
1145  PublicationInstance_rch instance = get_handle_instance(handle);
1146 
1147  if (!instance) {
1149  }
1150 
1152  element,
1153  static_cast<DataSampleElement*>(
1155  sizeof(DataSampleElement))),
1156  DataSampleElement(publication_id_,
1157  this->writer_,
1158  instance),
1160 
1161  // Extract the current instance queue.
1162  InstanceDataSampleList& instance_list = instance->samples_;
1164 
1165  bool set_timeout = true;
1166  MonotonicTimePoint timeout;
1167 
1168  //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
1169  //max_instances and max_instances * depth
1170  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1171  while ((instance_list.size() >= max_samples_per_instance_) ||
1172  ((this->max_num_samples_ > 0) &&
1173  ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
1174 
1176  if (instance_list.size() >= history_depth_) {
1177  if (DCPS_debug_level >= 2) {
1178  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1179  ACE_TEXT(" instance %d attempting to remove")
1180  ACE_TEXT(" its oldest sample (reliable)\n"),
1181  handle));
1182  }
1183  bool oldest_released = false;
1184  ret = remove_oldest_sample(instance_list, oldest_released);
1185  if (oldest_released) {
1186  break;
1187  }
1188  }
1189  // Reliable writers can wait
1190  if (set_timeout) {
1191  timeout = MonotonicTimePoint::now() + TimeDuration(max_blocking_time_);
1192  set_timeout = false;
1193  }
1194  if (!shutdown_ && MonotonicTimePoint::now() < timeout) {
1195  if (DCPS_debug_level >= 2) {
1196  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1197  ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
1198  handle));
1199  }
1200 
1201  waiting_on_release_ = true;
1202  switch (condition_.wait_until(timeout, thread_status_manager)) {
1203  case CvStatus_NoTimeout:
1205  break;
1206 
1207  case CvStatus_Timeout:
1208  if (DCPS_debug_level >= 2) {
1209  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1210  ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
1211  handle));
1212  }
1213  ret = DDS::RETCODE_TIMEOUT;
1214  break;
1215 
1216  case CvStatus_Error:
1217  if (DCPS_debug_level) {
1218  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::obtain_buffer: "
1219  "error in wait_until\n"));
1220  }
1221  ret = DDS::RETCODE_ERROR;
1222  break;
1223  }
1224 
1225  } else {
1226  //either shutdown has been signaled or max_blocking_time
1227  //has surpassed so treat as timeout
1228  ret = DDS::RETCODE_TIMEOUT;
1229  }
1230 
1231  } else {
1232  //BEST EFFORT
1233  bool oldest_released = false;
1234 
1235  //try to remove stale samples from this instance
1236  // The remove_oldest_sample() method removes the oldest sample
1237  // from instance list and removes it from the internal lists.
1238  if (instance_list.size() > 0) {
1239  if (DCPS_debug_level >= 2) {
1240  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1241  ACE_TEXT(" instance %d attempting to remove")
1242  ACE_TEXT(" its oldest sample\n"),
1243  handle));
1244  }
1245  ret = remove_oldest_sample(instance_list, oldest_released);
1246  }
1247  //else try to remove stale samples from other instances which are full
1248  if (ret == DDS::RETCODE_OK && !oldest_released) {
1249  if (DCPS_debug_level >= 2) {
1250  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1251  ACE_TEXT(" instance %d attempting to remove")
1252  ACE_TEXT(" oldest sample from any full instances\n"),
1253  handle));
1254  }
1255  PublicationInstanceMapType::iterator it = instances_.begin();
1256 
1257  while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
1258  if (it->second->samples_.size() >= max_samples_per_instance_) {
1259  ret = remove_oldest_sample(it->second->samples_, oldest_released);
1260  }
1261  ++it;
1262  }
1263  }
1264  //else try to remove stale samples from other non-full instances
1265  if (ret == DDS::RETCODE_OK && !oldest_released) {
1266  if (DCPS_debug_level >= 2) {
1267  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1268  ACE_TEXT(" instance %d attempting to remove")
1269  ACE_TEXT(" oldest sample from any instance with samples currently\n"),
1270  handle));
1271  }
1272  PublicationInstanceMapType::iterator it = instances_.begin();
1273 
1274  while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
1275  if (it->second->samples_.size() > 0) {
1276  ret = remove_oldest_sample(it->second->samples_, oldest_released);
1277  }
1278  ++it;
1279  }
1280  }
1281  if (!oldest_released) {
1282  //This means that no instances have samples to remove and yet
1283  //still hitting resource limits.
1284  ACE_ERROR((LM_ERROR,
1285  ACE_TEXT("(%P|%t) ERROR: ")
1286  ACE_TEXT("WriteDataContainer::obtain_buffer, ")
1287  ACE_TEXT("hitting resource limits with no samples to remove\n")));
1288  ret = DDS::RETCODE_ERROR;
1289  }
1290  } //END BEST EFFORT
1291 
1292  if (ret != DDS::RETCODE_OK) {
1293  if (DCPS_debug_level >= 2) {
1294  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1295  ACE_TEXT(" instance %d could not obtain buffer for sample")
1296  ACE_TEXT(" releasing allotted sample and returning\n"),
1297  handle));
1298  }
1299  this->release_buffer(element);
1300  return ret;
1301  }
1302  } //END WHILE
1303 
1304  data_holder_.enqueue_tail(element);
1305 
1306  return ret;
1307 }
#define ACE_DEBUG(X)
ACE_CDR::Long Long
#define ACE_ERROR(X)
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
DataSampleElementAllocator sample_list_element_allocator_
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
ReliabilityQosPolicyKind kind
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool shutdown_
The flag indicates the datawriter will be destroyed.
ReliabilityQosPolicy reliability
bool waiting_on_release_
The block waiting flag.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
DDS::ReturnCode_t remove_oldest_sample(InstanceDataSampleList &instance_list, bool &released)
const ReturnCode_t RETCODE_ERROR
DataWriterImpl * writer_
The writer that owns this container.
The wait has returned because of a timeout.
GUID_t publication_id_
The publication Id from repo.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void enqueue_tail(const DataSampleElement *element)
void release_buffer(DataSampleElement *element)
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
RcHandle< PublicationInstance > PublicationInstance_rch
The wait has returned because it was woken up.
const ReturnCode_t RETCODE_TIMEOUT

◆ obtain_buffer_for_control()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control ( DataSampleElement *&  element)

Definition at line 1120 of file WriteDataContainer.cpp.

References ACE_NEW_MALLOC_RETURN, DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, and writer_.

1121 {
1122  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
1123 
1125  element,
1126  static_cast<DataSampleElement*>(
1128  sizeof(DataSampleElement))),
1129  DataSampleElement(publication_id_,
1130  this->writer_,
1133 
1134  return DDS::RETCODE_OK;
1135 }
DataSampleElementAllocator sample_list_element_allocator_
const ReturnCode_t RETCODE_OK
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
const ReturnCode_t RETCODE_ERROR
DataWriterImpl * writer_
The writer that owns this container.
GUID_t publication_id_
The publication Id from repo.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< PublicationInstance > PublicationInstance_rch

◆ OPENDDS_MAP_CMP()

typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_MAP_CMP ( GUID_t  ,
DisjointSequence  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_MULTIMAP ( MonotonicTimePoint  ,
PublicationInstance_rch   
)
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR ( DDS::InstanceHandle_t  )

Returns a vector of handles for the instances registered for this data writer.

◆ operator=()

WriteDataContainer& OpenDDS::DCPS::WriteDataContainer::operator= ( WriteDataContainer const &  )
private

◆ pending_data()

bool OpenDDS::DCPS::WriteDataContainer::pending_data ( )
private

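Returns true if pending data exists. This includes data that is currently being sent, data that has not been sent yet, and data orphaned to the transport (orphaned_to_transport_).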

Definition at line 643 of file WriteDataContainer.cpp.

References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().

644 {
645  return this->sending_data_.size() != 0
646  || this->orphaned_to_transport_.size() != 0
647  || this->unsent_data_.size() != 0;
648 }
SendStateDataSampleList orphaned_to_transport_
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
SendStateDataSampleList sending_data_
List of data that is currently being sent.

◆ persist_data()

bool OpenDDS::DCPS::WriteDataContainer::persist_data ( )

Copy sent data to data DURABILITY cache.

Definition at line 1425 of file WriteDataContainer.cpp.

References ACE_ERROR, ACE_TEXT(), domain_id_, durability_cache_, durability_service_, OpenDDS::DCPS::DataDurabilityCache::insert(), LM_ERROR, sent_data_, topic_name_, and type_name_.

1426 {
1427  bool result = true;
1428 
1429  // ------------------------------------------------------------
1430  // Transfer sent data to data DURABILITY cache.
1431  // ------------------------------------------------------------
1432  if (this->durability_cache_) {
1433  // A data durability cache is available for TRANSIENT or
1434  // PERSISTENT data durability. Cache the data samples.
1435 
1436  //
1437  // We only cache data that is not still in use outside of
1438  // this instance of WriteDataContainer
1439  // (only cache samples in sent_data_ meaning transport has delivered).
1440  bool const inserted =
1441  this->durability_cache_->insert(this->domain_id_,
1442  this->topic_name_,
1443  this->type_name_,
1444  this->sent_data_,
1445  this->durability_service_
1446  );
1447 
1448  result = inserted;
1449 
1450  if (!inserted)
1451  ACE_ERROR((LM_ERROR,
1452  ACE_TEXT("(%P|%t) ERROR: ")
1453  ACE_TEXT("WriteDataContainer::persist_data, ")
1454  ACE_TEXT("failed to make data durable for ")
1455  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
1456  this->domain_id_,
1457  this->topic_name_,
1458  this->type_name_));
1459  }
1460 
1461  return result;
1462 }
#define ACE_ERROR(X)
char const *const topic_name_
Topic name.
SendStateDataSampleList sent_data_
List of data that has already been sent.
DDS::DurabilityServiceQosPolicy const & durability_service_
DURABILITY_SERVICE QoS specific to the DataWriter.
char const *const type_name_
Type name.
DDS::DomainId_t const domain_id_
Domain ID.
bool insert(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
ACE_TEXT("TCP_Factory")
DataDurabilityCache *const durability_cache_
Pointer to the data durability cache.

◆ process_deadlines()

void OpenDDS::DCPS::WriteDataContainer::process_deadlines ( const MonotonicTimePoint now)
private

Definition at line 1655 of file WriteDataContainer.cpp.

References ACE_GUARD, OpenDDS::DCPS::PublicationInstance::deadline_, deadline_last_total_count_, deadline_map_, deadline_period_, deadline_status_, deadline_status_lock_, deadline_task_, OpenDDS::DCPS::PublicationInstance::instance_handle_, DDS::OfferedDeadlineMissedStatus::last_instance_handle, OpenDDS::DCPS::DataWriterImpl::listener_for(), lock_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_DEADLINE_MISSED_STATUS, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count_change, and writer_.

1656 {
1657  // Lock the DataWriterImpl.
1659  // Lock ourselves.
1661 
1662  if (deadline_map_.empty()) {
1663  return;
1664  }
1665 
1666  bool notify = false;
1667 
1668  for (DeadlineMapType::iterator pos = deadline_map_.begin(), limit = deadline_map_.end();
1669  pos != limit && pos->first < now; pos = deadline_map_.begin()) {
1670 
1671  PublicationInstance_rch instance = pos->second;
1672  deadline_map_.erase(pos);
1673 
1676  deadline_status_.last_instance_handle = instance->instance_handle_;
1677 
1679  notify = true;
1680 
1681  DDS::DataWriterListener_var listener = writer_->listener_for(DDS::OFFERED_DEADLINE_MISSED_STATUS);
1682 
1683  if (listener) {
1684  // Copy before releasing the lock.
1686 
1687  // Release the lock during the upcall.
1689  ACE_GUARD(ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>, rev_dwi_guard, deadline_reverse_status_lock);
1690 
1691  // @todo Will this operation ever throw? If so we may want to
1692  // catch all exceptions, and act accordingly.
1693  listener->on_offered_deadline_missed(writer_, status);
1694 
1695  // We need to update the last total count value to our current total
1696  // so that the next time we will calculate the correct total_count_change;
1698  }
1699 
1700  instance->deadline_ += deadline_period_;
1701  deadline_map_.insert(std::make_pair(instance->deadline_, instance));
1702  }
1703 
1704  if (notify) {
1706  }
1707 
1708  deadline_task_->schedule(deadline_map_.begin()->first - now);
1709 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex & deadline_status_lock_
Lock for synchronization of status_ member.
ACE_Recursive_Thread_Mutex lock_
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
DataWriterImpl * writer_
The writer that owns this container.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
DDS::OfferedDeadlineMissedStatus & deadline_status_
Reference to the offered deadline missed status structure.
CORBA::Long & deadline_last_total_count_
Last total_count when status was last checked.
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.
RcHandle< PublicationInstance > PublicationInstance_rch

◆ reenqueue_all()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all ( const GUID_t reader_id,
const DDS::LifespanQosPolicy lifespan,
const OPENDDS_STRING filterClassName,
const FilterEvaluator eval,
const DDS::StringSeq params 
)

Create a resend list with the copies of all current "sending" and "sent" samples. The samples will be sent to the subscriber specified.

Definition at line 326 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), acked_sequences_, OpenDDS::DCPS::SendStateDataSampleList::begin(), cached_cumulative_ack_valid_, copy_and_prepend(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::DisjointSequence::erase(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::GUID_UNKNOWN, instances_, LM_DEBUG, lock_, max_durable_per_instance_, publication_id_, resend_data_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SendStateDataSampleList::size(), topic_name_, and wfa_lock_.

335 {
337  guard,
338  lock_,
340 
341  ssize_t total_size = 0;
342  for (PublicationInstanceMapType::iterator it = instances_.begin();
343  it != instances_.end(); ++it) {
344  const ssize_t durable = std::min(it->second->samples_.size(),
346  total_size += durable;
347  it->second->durable_samples_remaining_ = durable;
348  }
349 
352  reader_id,
353  lifespan,
354 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
355  filterClassName, eval, expression_params,
356 #endif
357  total_size);
358 
360  sent_data_,
361  reader_id,
362  lifespan,
363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
364  filterClassName, eval, expression_params,
365 #endif
366  total_size);
367 
368  {
371  DisjointSequence& ds = acked_sequences_[reader_id];
373 
374  // Remove exactly what will be sent
376  while (iter != resend_data_.end()) {
377  ds.erase(iter->get_header().sequence_);
378  ++iter;
379  }
380  }
381 
382  if (DCPS_debug_level > 9 && resend_data_.size()) {
383  ACE_DEBUG((LM_DEBUG,
384  ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
385  ACE_TEXT("domain %d topic %C publication %C copying ")
386  ACE_TEXT("sending/sent to resend to %C.\n"),
387  domain_id_,
388  topic_name_,
389  LogGuid(publication_id_).c_str(),
390  LogGuid(reader_id).c_str()));
391  }
392 
393  return DDS::RETCODE_OK;
394 }
#define ACE_DEBUG(X)
char const *const topic_name_
Topic name.
const ReturnCode_t RETCODE_OK
SendStateDataSampleList sent_data_
List of data that has already been sent.
void copy_and_prepend(SendStateDataSampleList &list, const SendStateDataSampleList &appended, const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params, ssize_t &max_resend_samples)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
int ssize_t
ACE_Recursive_Thread_Mutex lock_
SendStateDataSampleList resend_data_
SendStateDataSampleList sending_data_
List of data that is currently being sent.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
DDS::DomainId_t const domain_id_
Domain ID.
GUID_t publication_id_
The publication Id from repo.
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
SendStateDataSampleListIterator iterator
STL-style bidirectional iterator and const-iterator types.
iterator begin()
Return iterator to beginning of list.
iterator end()
Return iterator to end of list.
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.

◆ register_instance()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance ( DDS::InstanceHandle_t instance_handle,
Message_Block_Ptr registered_sample 
)

Dynamically allocate a PublicationInstance object and add it to the instances_ list.

Note
The registered_sample is both an input and an output parameter. A shallow copy of the sample data will be given to the datawriter as part of the control message.

Definition at line 397 of file WriteDataContainer.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::bind(), ACE_Message_Block::duplicate(), extend_deadline(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PublicationInstance::instance_handle_, instances_, LM_ERROR, max_num_instances_, OpenDDS::DCPS::move(), OpenDDS::DCPS::PublicationInstance::registered_sample_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and writer_.

400 {
401  PublicationInstance_rch instance;
402 
403  if (instance_handle == DDS::HANDLE_NIL) {
404  if (max_num_instances_ > 0
405  && max_num_instances_ <= (CORBA::Long) instances_.size()) {
407  }
408 
409  // registered the instance for the first time.
410  instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
411 
412  instance_handle = this->writer_->get_next_handle();
413 
414  int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
415 
416  if (0 != insert_attempt) {
417  ACE_ERROR((LM_ERROR,
418  ACE_TEXT("(%P|%t) ERROR: ")
419  ACE_TEXT("WriteDataContainer::register_instance, ")
420  ACE_TEXT("failed to insert instance handle=%X\n"),
421  instance.in()));
422  return DDS::RETCODE_ERROR;
423  } // if (0 != insert_attempt)
424 
425  instance->instance_handle_ = instance_handle;
426 
427  extend_deadline(instance);
428 
429  } else {
430 
431  int const find_attempt = find(instances_, instance_handle, instance);
432 
433  if (0 != find_attempt) {
434  ACE_ERROR((LM_ERROR,
435  ACE_TEXT("(%P|%t) ERROR: ")
436  ACE_TEXT("WriteDataContainer::register_instance, ")
437  ACE_TEXT("The provided instance handle=%X is not a valid")
438  ACE_TEXT("handle.\n"),
439  instance_handle));
440 
441  return DDS::RETCODE_ERROR;
442  } // if (0 != find_attempt)
443  }
444 
445  // The registered_sample is shallow copied.
446  registered_sample.reset(instance->registered_sample_->duplicate());
447 
448  return DDS::RETCODE_OK;
449 }
ACE_CDR::Long Long
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const InstanceHandle_t HANDLE_NIL
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
void extend_deadline(const PublicationInstance_rch &instance)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
const ReturnCode_t RETCODE_ERROR
DataWriterImpl * writer_
The writer that owns this container.
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
ACE_TEXT("TCP_Factory")
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
DDS::InstanceHandle_t get_next_handle()
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
RcHandle< PublicationInstance > PublicationInstance_rch

◆ release_buffer()

void OpenDDS::DCPS::WriteDataContainer::release_buffer ( DataSampleElement element)

Release the memory previously allocated. This method corresponds to the obtain_buffer method. If the memory was allocated by an allocator, then the memory needs to be released to that allocator.

Definition at line 1310 of file WriteDataContainer.cpp.

References ACE_DES_FREE, data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.

Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().

1311 {
1312  if (element->get_header().message_id_ == SAMPLE_DATA)
1313  data_holder_.dequeue(element);
1314  // Release the memory to the allocator.
1315  ACE_DES_FREE(element,
1317  DataSampleElement);
1318 }
DataSampleElementAllocator sample_list_element_allocator_
void free(void *ptr)
Return a chunk of memory back to free list cache.
bool dequeue(const DataSampleElement *stale)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)

◆ remove_excess_durable()

void OpenDDS::DCPS::WriteDataContainer::remove_excess_durable ( )
private

Remove the oldest "n" samples from each instance list that are in a state such that they could only be used for durability purposes (see reenqueue_all). "n" is determined by max_durable_per_instance_, so these samples are truly unneeded – there are max_durable_per_instance_ newer samples available in the instance.

Definition at line 941 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::InstanceDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, instances_, LM_DEBUG, max_durable_per_instance_, OpenDDS::DCPS::InstanceDataSampleList::prev(), publication_id_, release_buffer(), sent_data_, OpenDDS::DCPS::InstanceDataSampleList::tail(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and topic_name_.

Referenced by obtain_buffer().

942 {
944  return;
945 
946  size_t n_released = 0;
947 
948  for (PublicationInstanceMapType::iterator iter = instances_.begin();
949  iter != instances_.end();
950  ++iter) {
951 
952  CORBA::Long durable_allowed = max_durable_per_instance_;
953  InstanceDataSampleList& instance_list = iter->second->samples_;
954 
955  for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
956  prev = InstanceDataSampleList::prev(it);
957 
958  if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
959 
960  if (durable_allowed) {
961  --durable_allowed;
962  } else {
963  instance_list.dequeue(it);
964  sent_data_.dequeue(it);
965  release_buffer(it);
966  ++n_released;
967  }
968  }
969  }
970  }
971 
972  if (n_released && DCPS_debug_level > 9) {
973  ACE_DEBUG((LM_DEBUG,
974  ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
975  ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
976  ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
977  LogGuid(publication_id_).c_str(), n_released));
978  }
979 }
#define ACE_DEBUG(X)
ACE_CDR::Long Long
char const *const topic_name_
Topic name.
SendStateDataSampleList sent_data_
List of data that has already been sent.
bool dequeue(const DataSampleElement *stale)
DDS::DomainId_t const domain_id_
Domain ID.
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
GUID_t publication_id_
The publication Id from repo.
static DataSampleElement * prev(const DataSampleElement *iter)
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
void release_buffer(DataSampleElement *element)

◆ remove_instance()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_instance ( PublicationInstance_rch  instance,
Message_Block_Ptr registered_sample,
bool  dup_registered_sample 
)
private

Definition at line 509 of file WriteDataContainer.cpp.

References cancel_deadline(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::PublicationInstance::registered_sample_, remove_oldest_sample(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().

Referenced by dispose(), unregister(), and unregister_all().

512 {
513  if (dup_registered_sample) {
514  // The registered_sample is shallow copied.
515  registered_sample.reset(instance->registered_sample_->duplicate());
516  }
517 
518  // Note: The DDS specification is unclear as to if samples in the process
519  // of being sent should be removed or not.
520  // The advantage of calling remove_sample() on them is that the
521  // cached allocator memory for them is freed. The disadvantage
522  // is that the slow reader may see multiple disposes without
523  // any write sample between them and hence not temporarily move into the
524  // Alive state.
525  // We have chosen to NOT remove the sending samples.
526  InstanceDataSampleList& instance_list = instance->samples_;
527 
528  while (instance_list.size() > 0) {
529  bool released = false;
530  const DDS::ReturnCode_t ret = remove_oldest_sample(instance_list, released);
531  if (ret != DDS::RETCODE_OK) {
532  return ret;
533  }
534  }
535 
536  cancel_deadline(instance);
537 
538  return DDS::RETCODE_OK;
539 }
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t remove_oldest_sample(InstanceDataSampleList &instance_list, bool &released)
void cancel_deadline(const PublicationInstance_rch &instance)

◆ remove_oldest_sample()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample ( InstanceDataSampleList instance_list,
bool &  released 
)
private

Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.

Definition at line 983 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), domain_id_, empty_condition_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::TransportClient::remove_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, and writer_.

Referenced by obtain_buffer(), and remove_instance().

986 {
987  DataSampleElement* stale = 0;
988 
989  //
990  // Remove the oldest sample from the instance list.
991  //
992  if (!instance_list.dequeue_head(stale)) {
993  ACE_ERROR_RETURN((LM_ERROR,
994  ACE_TEXT("(%P|%t) ERROR: ")
995  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
996  ACE_TEXT("dequeue_head_next_sample failed\n")),
998  }
999 
1000  //
1001  // Remove the stale data from the next_writer_sample_ list. The
1002  // sending_data_/next_send_sample_ list is not managed within the
1003  // container, it is only used external to the container and does
1004  // not need to be managed internally.
1005  //
1006  // The next_writer_sample_ link is being used in one of the sent_data_,
1007  // sending_data_, or unsent_data lists. Removal from the doubly
1008  // linked list needs to repair the list only when the stale sample
1009  // is either the head or tail of the list.
1010  //
1011 
1012  //
1013  // Locate the head of the list that the stale data is in.
1014  //
1015  SendStateDataSampleList* send_lists[] = {
1016  &sending_data_,
1017  &sent_data_,
1018  &unsent_data_,
1020  const SendStateDataSampleList* containing_list =
1022 
1023  //
1024  // Identify the list that the stale data is in.
1025  // The stale data should be in one of the sent_data_, sending_data_
1026  // or unsent_data_. It should not be in released_data_ list since
1027  // this function is the only place a sample is moved from
1028  // sending_data_ to released_data_ list.
1029 
1030  // Remove the element from the internal list.
1031  bool result = false;
1032 
1033  if (containing_list == &this->sending_data_) {
1034  if (DCPS_debug_level > 2) {
1035  ACE_ERROR((LM_WARNING,
1036  ACE_TEXT("(%P|%t) WARNING: ")
1037  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1038  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
1039  }
1040 
1041  // This means transport is still using the sample that needs to
1042  // be released currently so notify transport that sample is being removed.
1043 
1044  if (this->writer_->remove_sample(stale)) {
1045  if (this->sent_data_.dequeue(stale)) {
1046  release_buffer(stale);
1047  }
1048  result = true;
1049 
1050  } else {
1051  if (this->sending_data_.dequeue(stale)) {
1052  this->orphaned_to_transport_.enqueue_tail(stale);
1053  } else if (this->sent_data_.dequeue(stale)) {
1054  release_buffer(stale);
1055  result = true;
1056  }
1057  result = true;
1058  }
1059  released = true;
1060 
1061  } else if (containing_list == &this->sent_data_) {
1062  // No one is using the data sample, so we can release it back to
1063  // its allocator.
1064  //
1065  result = this->sent_data_.dequeue(stale) != 0;
1066  release_buffer(stale);
1067  released = true;
1068 
1069  if (DCPS_debug_level > 9) {
1070  ACE_DEBUG((LM_DEBUG,
1071  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1072  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
1073  this->domain_id_,
1074  this->topic_name_,
1075  LogGuid(publication_id_).c_str()));
1076  }
1077 
1078  } else if (containing_list == &this->unsent_data_) {
1079  //
1080  // No one is using the data sample, so we can release it back to
1081  // its allocator.
1082  //
1083  result = this->unsent_data_.dequeue(stale) != 0;
1084  release_buffer(stale);
1085  released = true;
1086 
1087  if (DCPS_debug_level > 9) {
1088  ACE_DEBUG((LM_DEBUG,
1089  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1090  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
1091  this->domain_id_,
1092  this->topic_name_,
1093  LogGuid(publication_id_).c_str()));
1094  }
1095  } else {
1096  ACE_ERROR_RETURN((LM_ERROR,
1097  ACE_TEXT("(%P|%t) ERROR: ")
1098  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1099  ACE_TEXT("The oldest sample is not in any internal list.\n")),
1101  }
1102 
1103  if (!pending_data()) {
1105  }
1106 
1107  if (!result) {
1108  ACE_ERROR_RETURN((LM_ERROR,
1109  ACE_TEXT("(%P|%t) ERROR: ")
1110  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1111  ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
1113 
1114  }
1115 
1116  return DDS::RETCODE_OK;
1117 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
char const *const topic_name_
Topic name.
SendStateDataSampleList orphaned_to_transport_
bool remove_sample(const DataSampleElement *sample)
ConditionVariableType empty_condition_
const ReturnCode_t RETCODE_OK
SendStateDataSampleList sent_data_
List of data that has already been sent.
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
bool notify_all()
Unblock all of the threads waiting on this condition.
bool dequeue(const DataSampleElement *stale)
SendStateDataSampleList sending_data_
List of data that is currently being sent.
const ReturnCode_t RETCODE_ERROR
DDS::DomainId_t const domain_id_
Domain ID.
DataWriterImpl * writer_
The writer that owns this container.
static const SendStateDataSampleList * send_list_containing_element(const DataSampleElement *element, SendStateDataSampleList **begin, SendStateDataSampleList **end)
GUID_t publication_id_
The publication Id from repo.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define ACE_ERROR_RETURN(X, Y)
void release_buffer(DataSampleElement *element)

◆ remove_reader_acks()

void OpenDDS::DCPS::WriteDataContainer::remove_reader_acks ( const GUID_t reader)
private

Definition at line 191 of file WriteDataContainer.cpp.

References acked_sequences_, cached_cumulative_ack_valid_, get_cumulative_ack(), OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), wfa_condition_, and wfa_lock_.

192 {
194 
195  const SequenceNumber prev_cum_ack = get_cumulative_ack();
196  const AckedSequenceMap::iterator it = acked_sequences_.find(reader);
197  if (it != acked_sequences_.end()) {
198  acked_sequences_.erase(it);
200  if (prev_cum_ack != get_cumulative_ack()) {
202  }
203  }
204 }
bool notify_all()
Unblock all of the threads waiting on this condition.
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.

◆ sequence_acknowledged()

bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged ( const SequenceNumber sequence)

Definition at line 1557 of file WriteDataContainer.cpp.

References sequence_acknowledged_i(), and wfa_lock_.

1558 {
1560  return sequence_acknowledged_i(sequence);
1561 }
bool sequence_acknowledged_i(const SequenceNumber &sequence)
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.

◆ sequence_acknowledged_i()

bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged_i ( const SequenceNumber sequence)
private

Definition at line 1564 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, get_cumulative_ack(), OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, publication_id_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by sequence_acknowledged(), and wait_ack_of_seq().

1565 {
1566  if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
1567  //return true here so that wait_for_acknowledgments doesn't block
1568  return true;
1569  }
1570 
1571  SequenceNumber acked = get_cumulative_ack();
1572  if (DCPS_debug_level >= 10) {
1573  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged_i ")
1574  ACE_TEXT("- %C cumulative ack is currently: %q\n"), DCPS::LogGuid(publication_id_).c_str(), acked.getValue()));
1575  }
1576  if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
1577  return false;
1578  }
1579  return true;
1580 }
#define ACE_DEBUG(X)
GUID_t publication_id_
The publication Id from repo.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ set_deadline_period()

void OpenDDS::DCPS::WriteDataContainer::set_deadline_period ( const TimeDuration deadline_period)
private

Definition at line 1606 of file WriteDataContainer.cpp.

References deadline_map_, deadline_period_, deadline_task_, instances_, OpenDDS::DCPS::TimeDuration::max_value, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_ASSERT, and OpenDDS::DCPS::swap().

1607 {
1608  // Call comes from DataWriterImpl_t which should already have the lock_.
1609 
1610  // Deadline for all instances starting from now.
1611  const MonotonicTimePoint deadline = MonotonicTimePoint::now() + deadline_period;
1612 
1613  // Reset the deadline timer if the period has changed.
1614  if (deadline_period_ != deadline_period) {
1616  OPENDDS_ASSERT(deadline_map_.empty());
1617 
1618  for (PublicationInstanceMapType::iterator iter = instances_.begin();
1619  iter != instances_.end();
1620  ++iter) {
1621  iter->second->deadline_ = deadline;
1622  deadline_map_.insert(std::make_pair(deadline, iter->second));
1623  }
1624 
1625  if (!deadline_map_.empty()) {
1626  deadline_task_->schedule(deadline_period);
1627  }
1628  } else if (deadline_period == TimeDuration::max_value) {
1629  if (!deadline_map_.empty()) {
1630  deadline_task_->cancel();
1631  }
1632 
1633  deadline_map_.clear();
1634  } else {
1635  DeadlineMapType new_map;
1636  for (PublicationInstanceMapType::iterator iter = instances_.begin();
1637  iter != instances_.end();
1638  ++iter) {
1639  iter->second->deadline_ = deadline;
1640  new_map.insert(std::make_pair(iter->second->deadline_, iter->second));
1641  }
1642  std::swap(new_map, deadline_map_);
1643 
1644  if (!deadline_map_.empty()) {
1645  deadline_task_->cancel();
1646  deadline_task_->schedule(deadline_map_.begin()->first - MonotonicTimePoint::now());
1647  }
1648  }
1649 
1650  deadline_period_ = deadline_period;
1651  }
1652 }
static const TimeDuration max_value
Definition: TimeDuration.h:32
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.
void swap(MessageBlock &lhs, MessageBlock &rhs)

◆ unregister()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister ( DDS::InstanceHandle_t  handle,
Message_Block_Ptr registered_sample,
bool  dup_registered_sample = true 
)

Remove the provided instance from the instances_ list. The registered sample data will be released upon the deletion of the PublicationInstance. A shallow copy of the sample data will be given to the datawriter as part of the control message if dup_registered_sample is true.

This method returns an error if the instance is not registered.

Definition at line 452 of file WriteDataContainer.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), instances_, LM_ERROR, lock_, remove_instance(), DDS::RETCODE_ERROR, and DDS::RETCODE_PRECONDITION_NOT_MET.

456 {
458  guard,
459  lock_,
461 
462  PublicationInstance_rch instance;
463  {
464  PublicationInstanceMapType::iterator pos = instances_.find(instance_handle);
465  if (pos == instances_.end()) {
466  ACE_ERROR_RETURN((LM_ERROR,
467  ACE_TEXT("(%P|%t) ERROR: ")
468  ACE_TEXT("WriteDataContainer::unregister, ")
469  ACE_TEXT("The instance(handle=%X) ")
470  ACE_TEXT("is not registered yet.\n"),
471  instance_handle),
473  }
474  instance = pos->second;
475  instances_.erase(pos);
476  }
477 
478  return remove_instance(instance, registered_sample, dup_registered_sample);
479 }
ACE_Recursive_Thread_Mutex lock_
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
ACE_TEXT("TCP_Factory")
DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
#define ACE_ERROR_RETURN(X, Y)
RcHandle< PublicationInstance > PublicationInstance_rch

◆ unregister_all()

void OpenDDS::DCPS::WriteDataContainer::unregister_all ( )

Unregister all instances managed by this data container.

Definition at line 1321 of file WriteDataContainer.cpp.

References ACE_ERROR, ACE_GUARD, condition_, DBG_ENTRY_LVL, instances_, LM_ERROR, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::TransportClient::remove_all_msgs(), remove_instance(), DDS::RETCODE_OK, OpenDDS::DCPS::DataWriterImpl::return_handle(), shutdown_, waiting_on_release_, and writer_.

1322 {
1323  DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
1324  shutdown_ = true;
1325 
1326  //The internal list needs protection since this call may result from
1327  //the delete_datawriter call which does not acquire the lock in advance.
1329  guard,
1330  lock_);
1331  // Tell the transport to remove all control messages that the
1332  // transport is currently processing.
1333  (void) this->writer_->remove_all_msgs();
1334 
1335  // Broadcast to wake up all waiting threads.
1336  if (waiting_on_release_) {
1338  }
1339 
1340  Message_Block_Ptr registered_sample;
1341 
1342  for (PublicationInstanceMapType::iterator pos = instances_.begin(), limit = instances_.end(); pos != limit;) {
1343  // Release the instance data.
1344  if (remove_instance(pos->second, registered_sample, false) != DDS::RETCODE_OK) {
1345  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::unregister_all, "
1346  "remove_instance %X failed\n", pos->first));
1347  }
1348 
1349  writer_->return_handle(pos->first);
1350  instances_.erase(pos++);
1351  }
1352 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ReturnCode_t RETCODE_OK
bool shutdown_
The flag indicates the datawriter will be destroyed.
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_Recursive_Thread_Mutex lock_
bool waiting_on_release_
The block waiting flag.
void return_handle(DDS::InstanceHandle_t handle)
DataWriterImpl * writer_
The writer that owns this container.
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ update_acked()

void OpenDDS::DCPS::WriteDataContainer::update_acked ( const SequenceNumber seq,
const GUID_t id = GUID_UNKNOWN 
)
private

Definition at line 245 of file WriteDataContainer.cpp.

References acked_sequences_, cached_cumulative_ack_valid_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), and wfa_condition_.

Referenced by data_delivered().

246 {
247  bool do_notify = false;
248  if (id == GUID_UNKNOWN) {
249  for (AckedSequenceMap::iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
250  SequenceNumber prev_cum_ack = it->second.cumulative_ack();
251  it->second.insert(seq);
253  if (prev_cum_ack != it->second.cumulative_ack()) {
254  do_notify = true;
255  }
256  }
257  } else {
258  const AckedSequenceMap::iterator it = acked_sequences_.find(id);
259  if (it != acked_sequences_.end()) {
260  SequenceNumber prev_cum_ack = it->second.cumulative_ack();
261  if (prev_cum_ack < seq) {
262  it->second.insert(SequenceRange(prev_cum_ack, seq));
264  if (prev_cum_ack != it->second.cumulative_ack()) {
265  do_notify = true;
266  }
267  }
268  }
269  }
270  if (do_notify) {
272  }
273 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool notify_all()
Unblock all of the threads waiting on this condition.
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ wait_ack_of_seq()

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq ( const MonotonicTimePoint abs_deadline,
bool  deadline_is_infinite,
const SequenceNumber sequence 
)

Definition at line 1527 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged_i(), TheServiceParticipant, OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until(), wfa_condition_, and wfa_lock_.

1530 {
1532  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1533  while ((deadline_is_infinite || MonotonicTimePoint::now() < deadline) && !sequence_acknowledged_i(sequence)) {
1534  switch (deadline_is_infinite ? wfa_condition_.wait(thread_status_manager) : wfa_condition_.wait_until(deadline, thread_status_manager)) {
1535  case CvStatus_NoTimeout:
1536  break;
1537  case CvStatus_Timeout:
1538  if (DCPS_debug_level >= 2) {
1539  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
1540  ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
1541  sequence.getValue()));
1542  }
1543  return DDS::RETCODE_TIMEOUT;
1544  case CvStatus_Error:
1545  if (DCPS_debug_level) {
1546  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_ack_of_seq: "
1547  "error in wait/wait_until\n"));
1548  }
1549  return DDS::RETCODE_ERROR;
1550  }
1551  }
1552 
1554 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
const ReturnCode_t RETCODE_ERROR
bool sequence_acknowledged_i(const SequenceNumber &sequence)
The wait has returned because of a timeout.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
The wait has returned because it was woken up.
const ReturnCode_t RETCODE_TIMEOUT

◆ wait_pending()

void OpenDDS::DCPS::WriteDataContainer::wait_pending ( const MonotonicTimePoint deadline)

Block until pending samples have either been delivered or dropped, or the deadline has passed. Blocks indefinitely if deadline is zero.

Definition at line 1465 of file WriteDataContainer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, empty_condition_, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, log_send_state_lists(), pending_data(), TheServiceParticipant, OpenDDS::DCPS::TimePoint_T< AceClock >::value(), and OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until().

1466 {
1467  const bool no_deadline = deadline.is_zero();
1469  const bool report = DCPS_debug_level > 0 && pending_data();
1470  if (report) {
1471  if (no_deadline) {
1472  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending no timeout\n")));
1473  } else {
1474  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending ")
1475  ACE_TEXT("timeout at %#T\n"),
1476  &deadline.value()));
1477  }
1478  }
1479 
1480  bool loop = true;
1481  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1482  while (loop && pending_data()) {
1483  switch (empty_condition_.wait_until(deadline, thread_status_manager)) {
1484  case CvStatus_NoTimeout:
1485  break;
1486 
1487  case CvStatus_Timeout:
1488  if (pending_data()) {
1489  if (DCPS_debug_level >= 2) {
1490  ACE_DEBUG((LM_INFO, "(%P|%t) WriteDataContainer::wait_pending: "
1491  "Timed out waiting for messages to be transported\n"));
1492  log_send_state_lists("WriteDataContainer::wait_pending - wait timedout: ");
1493  }
1494  }
1495  loop = false;
1496  break;
1497 
1498  case CvStatus_Error:
1499  if (DCPS_debug_level) {
1500  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_pending: "
1501  "error in wait_until\n"));
1502  }
1503  loop = false;
1504  break;
1505  }
1506  }
1507  if (report) {
1508  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending done\n")));
1509  }
1510 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ConditionVariableType empty_condition_
ACE_Recursive_Thread_Mutex lock_
The wait has returned because of a timeout.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
The wait has returned because it was woken up.
void log_send_state_lists(OPENDDS_STRING description)

◆ wakeup_blocking_writers()

void OpenDDS::DCPS::WriteDataContainer::wakeup_blocking_writers ( DataSampleElement stale)
private

Called when data has been dropped or delivered and any blocked writers should be notified

Definition at line 1583 of file WriteDataContainer.cpp.

References condition_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), and waiting_on_release_.

Referenced by data_delivered(), and data_dropped().

1584 {
1585  if (!stale && waiting_on_release_) {
1586  waiting_on_release_ = false;
1587 
1589  }
1590 }
bool notify_all()
Unblock all of the threads waiting on this condition.
bool waiting_on_release_
The block waiting flag.

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 356 of file WriteDataContainer.h.

◆ DataWriterImpl

friend class DataWriterImpl
friend

Definition at line 125 of file WriteDataContainer.h.

Member Data Documentation

◆ acked_sequences_

AckedSequenceMap OpenDDS::DCPS::WriteDataContainer::acked_sequences_
private

◆ cached_cumulative_ack_

SequenceNumber OpenDDS::DCPS::WriteDataContainer::cached_cumulative_ack_
private

Definition at line 427 of file WriteDataContainer.h.

Referenced by get_cumulative_ack().

◆ cached_cumulative_ack_valid_

bool OpenDDS::DCPS::WriteDataContainer::cached_cumulative_ack_valid_
private

◆ condition_

ConditionVariableType OpenDDS::DCPS::WriteDataContainer::condition_
private

Definition at line 513 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

◆ data_holder_

WriterDataSampleList OpenDDS::DCPS::WriteDataContainer::data_holder_
private

The list of all samples written to this datawriter in writing order.

Definition at line 454 of file WriteDataContainer.h.

Referenced by obtain_buffer(), and release_buffer().

◆ deadline_last_total_count_

CORBA::Long& OpenDDS::DCPS::WriteDataContainer::deadline_last_total_count_
private

Last total_count when status was last checked.

Definition at line 571 of file WriteDataContainer.h.

Referenced by process_deadlines().

◆ deadline_map_

DeadlineMapType OpenDDS::DCPS::WriteDataContainer::deadline_map_
private

◆ deadline_period_

TimeDuration OpenDDS::DCPS::WriteDataContainer::deadline_period_
private

◆ deadline_status_

DDS::OfferedDeadlineMissedStatus& OpenDDS::DCPS::WriteDataContainer::deadline_status_
private

Reference to the offered deadline missed status structure.

Definition at line 568 of file WriteDataContainer.h.

Referenced by process_deadlines().

◆ deadline_status_lock_

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::WriteDataContainer::deadline_status_lock_
private

Lock for synchronization of the deadline_status_ member.

Definition at line 565 of file WriteDataContainer.h.

Referenced by process_deadlines().

◆ deadline_task_

RcHandle<DCPS::PmfSporadicTask<WriteDataContainer> > OpenDDS::DCPS::WriteDataContainer::deadline_task_
private

Timer responsible for reporting missed offered deadlines.

Definition at line 559 of file WriteDataContainer.h.

Referenced by cancel_deadline(), extend_deadline(), process_deadlines(), set_deadline_period(), and ~WriteDataContainer().

◆ domain_id_

DDS::DomainId_t const OpenDDS::DCPS::WriteDataContainer::domain_id_
private

◆ durability_cache_

DataDurabilityCache* const OpenDDS::DCPS::WriteDataContainer::durability_cache_
private

Pointer to the data durability cache.

This is a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.

Definition at line 551 of file WriteDataContainer.h.

Referenced by persist_data().

◆ durability_service_

DDS::DurabilityServiceQosPolicy const& OpenDDS::DCPS::WriteDataContainer::durability_service_
private

DURABILITY_SERVICE QoS specific to the DataWriter.

Definition at line 554 of file WriteDataContainer.h.

Referenced by persist_data().

◆ empty_condition_

ConditionVariableType OpenDDS::DCPS::WriteDataContainer::empty_condition_
private

◆ history_depth_

CORBA::Long OpenDDS::DCPS::WriteDataContainer::history_depth_
private

Definition at line 475 of file WriteDataContainer.h.

Referenced by obtain_buffer().

◆ instances_

PublicationInstanceMapType OpenDDS::DCPS::WriteDataContainer::instances_
private

◆ lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::lock_
mutableprivate

This lock is used to protect the container and the map in the type-specific DataWriter. The lock is accessible via the datawriter; it is made globally accessible for performance reasons. The lock is acquired when an external call (e.g. FooDataWriterImpl::write) starts, and the same lock is used by the transport thread to notify the datawriter that the data has been delivered. Other internal operations do not lock.

Definition at line 511 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), dispose(), get_instance_handles(), num_all_samples(), num_samples(), process_deadlines(), reenqueue_all(), unregister(), unregister_all(), and wait_pending().

◆ max_blocking_time_

DDS::Duration_t OpenDDS::DCPS::WriteDataContainer::max_blocking_time_
private

The maximum time to block on a write operation. This comes from the DataWriter's QoS RELIABILITY.max_blocking_time.

Definition at line 497 of file WriteDataContainer.h.

Referenced by obtain_buffer().

◆ max_durable_per_instance_

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_durable_per_instance_
private

The maximum number of samples from each instance that can be added to the resend_data_ for durability.

Definition at line 479 of file WriteDataContainer.h.

Referenced by data_delivered(), reenqueue_all(), and remove_excess_durable().

◆ max_num_instances_

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_instances_
private

The maximum number of instances allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS

Definition at line 485 of file WriteDataContainer.h.

Referenced by register_instance().

◆ max_num_samples_

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_samples_
private

The maximum number of samples allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS It also covers QoS.RESOURCE_LIMITS.max_samples and max_instances * max_samples_per_instance

Definition at line 493 of file WriteDataContainer.h.

Referenced by obtain_buffer().

◆ max_samples_per_instance_

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_samples_per_instance_
private

The maximum size a container should allow for an instance sample list

Definition at line 473 of file WriteDataContainer.h.

Referenced by obtain_buffer().

◆ n_chunks_

size_t OpenDDS::DCPS::WriteDataContainer::n_chunks_
private

The number of chunks that sample_list_element_allocator_ needs to initialize.

Definition at line 525 of file WriteDataContainer.h.

Referenced by WriteDataContainer().

◆ orphaned_to_transport_

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::orphaned_to_transport_
private

List of data that has been released by WriteDataContainer but is still in process of delivery (or dropping) by transport

Definition at line 450 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().

◆ publication_id_

GUID_t OpenDDS::DCPS::WriteDataContainer::publication_id_
private

◆ resend_data_

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::resend_data_
private

List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in the sent and sending lists. This list will be passed to the transport for re-sending.

Definition at line 460 of file WriteDataContainer.h.

Referenced by get_resend_data(), and reenqueue_all().

◆ sample_list_element_allocator_

DataSampleElementAllocator OpenDDS::DCPS::WriteDataContainer::sample_list_element_allocator_
private

The cached allocator to allocate DataSampleElement objects.

Definition at line 529 of file WriteDataContainer.h.

Referenced by copy_and_prepend(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().

◆ sending_data_

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::sending_data_
private

◆ sent_data_

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::sent_data_
private

◆ shutdown_

bool OpenDDS::DCPS::WriteDataContainer::shutdown_
private

The flag indicates the datawriter will be destroyed.

Definition at line 532 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), obtain_buffer(), unregister_all(), and ~WriteDataContainer().

◆ topic_name_

char const* const OpenDDS::DCPS::WriteDataContainer::topic_name_
private

◆ transaction_id_

ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::transaction_id_
private

Id used to keep track of which send transaction DataWriter is currently creating

Definition at line 440 of file WriteDataContainer.h.

Referenced by get_unsent_data().

◆ type_name_

char const* const OpenDDS::DCPS::WriteDataContainer::type_name_
private

Type name.

Definition at line 541 of file WriteDataContainer.h.

Referenced by persist_data().

◆ unsent_data_

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::unsent_data_
private

◆ waiting_on_release_

bool OpenDDS::DCPS::WriteDataContainer::waiting_on_release_
private

The block waiting flag.

Definition at line 500 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

◆ wfa_condition_

WfaConditionVariableType OpenDDS::DCPS::WriteDataContainer::wfa_condition_
private

Used to block in wait_for_acks().

Definition at line 521 of file WriteDataContainer.h.

Referenced by data_delivered(), remove_reader_acks(), update_acked(), and wait_ack_of_seq().

◆ wfa_lock_

ACE_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::wfa_lock_
private

Lock used for wait_for_acks() processing.

Definition at line 517 of file WriteDataContainer.h.

Referenced by add_reader_acks(), data_delivered(), reenqueue_all(), remove_reader_acks(), sequence_acknowledged(), and wait_ack_of_seq().

◆ writer_

DataWriterImpl* OpenDDS::DCPS::WriteDataContainer::writer_
private

The documentation for this class was generated from the following files: