OpenDDS
Snapshot(2023/04/07-19:43)
A container for instance sample data.
#include <WriteDataContainer.h>
Private Types | |
typedef ConditionVariable< ACE_Recursive_Thread_Mutex > | ConditionVariableType |
typedef ConditionVariable< ACE_Thread_Mutex > | WfaConditionVariableType |
Friends | |
class | DataWriterImpl |
class | ::DDS_TEST |
Additional Inherited Members | |
RcObject () | |
A container for instance sample data.
This container is instantiated per DataWriter. It maintains a list of PublicationInstance objects, each of which is referenced internally by its instance handle.
This container holds threaded lists of all data written to a given DataWriter. The real data sample is represented by the DataSampleElement. The data_holder_ holds every DataSampleElement in writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in PublicationInstance links samples via the next_instance_sample_ thread.
There are three state transition lists used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Any DataSampleElement should be in one of these three lists and SHOULD NOT be shared between them. The normal transition of a DataSampleElement is unsent->sending->sent. A DataSampleElement transitions from unsent to sending during a write operation when get_unsent_data is called. It transitions from sending to sent when data_delivered is called, notifying the container that the transport is finished with the sample and that it can be marked as a historical sample. A DataSampleElement may transition back to unsent from sending if the transport notifies the container that the sample was dropped and should be resent.
A DataSampleElement is removed from the sent list and freed when the instance queue or the container (for another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when an unreliable writer needs space. In this case the transport is notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples. Note: the real data sample is freed when its reference count goes to 0.
The resend list is only used when the datawriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds DataSampleElements that duplicate the data samples on the sending and sent lists, and this list is handed off to the transport.
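The transitions described above can be summarized as a small state machine. The following is only an illustrative sketch, not OpenDDS code: `SendState`, `is_allowed_transition`, and `apply_transition` are hypothetical names, the `Freed` state stands in for an element being removed and released, and removal paths such as dispose or unregister are deliberately left out.

```cpp
#include <cassert>

// Hypothetical model of the three send-state lists; not part of OpenDDS.
enum class SendState { Unsent, Sending, Sent, Freed };

// Returns true if the class description allows moving a sample from
// `from` to `to`. `reliable` models the writer's reliability kind.
inline bool is_allowed_transition(SendState from, SendState to, bool reliable)
{
  switch (from) {
  case SendState::Unsent:
    // get_unsent_data() hands the sample to the transport; an unreliable
    // writer may also drop it directly to make space.
    return to == SendState::Sending || (to == SendState::Freed && !reliable);
  case SendState::Sending:
    // data_delivered() moves it to Sent, data_dropped() moves it back to
    // Unsent, and an unreliable writer may free it directly.
    return to == SendState::Sent || to == SendState::Unsent ||
           (to == SendState::Freed && !reliable);
  case SendState::Sent:
    // Historical samples are freed when space is needed.
    return to == SendState::Freed;
  case SendState::Freed:
    return false;
  }
  return false;
}

// Applies a transition, asserting that it is one of the allowed ones.
inline SendState apply_transition(SendState from, SendState to, bool reliable)
{
  assert(is_allowed_transition(from, to, reliable));
  return to;
}
```

In this model a sample can never jump straight from `Unsent` to `Sent`; it always passes through `Sending` first.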
Definition at line 122 of file WriteDataContainer.h.
|
private |
Definition at line 512 of file WriteDataContainer.h.
|
private |
Definition at line 519 of file WriteDataContainer.h.
OpenDDS::DCPS::WriteDataContainer::WriteDataContainer | ( | DataWriterImpl * | writer, |
CORBA::Long | max_samples_per_instance, | ||
CORBA::Long | history_depth, | ||
CORBA::Long | max_durable_per_instance, | ||
DDS::Duration_t | max_blocking_time, | ||
size_t | n_chunks, | ||
DDS::DomainId_t | domain_id, | ||
char const * | topic_name, | ||
char const * | type_name, | ||
DataDurabilityCache * | durability_cache, | ||
DDS::DurabilityServiceQosPolicy const & | durability_service, | ||
CORBA::Long | max_instances, | ||
CORBA::Long | max_total_samples, | ||
ACE_Recursive_Thread_Mutex & | deadline_status_lock, | ||
DDS::OfferedDeadlineMissedStatus & | deadline_status, | ||
CORBA::Long & | deadline_last_total_count | ||
) |
No default constructor, must be initialized.
writer | The writer which owns this container. |
max_samples_per_instance | Max samples kept within each instance |
max_durable_per_instance | Max durable samples sent for each instance |
max_blocking_time | The timeout for write. |
n_chunks | The number of chunks that the DataSampleElementAllocator needs to allocate. |
domain_id | Domain ID. |
topic_name | Topic name. |
type_name | Type name. |
durability_cache | The data durability cache for unsent data. |
durability_service | DURABILITY_SERVICE QoS specific to the DataWriter. |
max_instances | maximum number of instances, 0 for unlimited |
max_total_samples | maximum total number of samples, 0 for unlimited |
Definition at line 68 of file WriteDataContainer.cpp.
References ACE_DEBUG, acked_sequences_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_UNKNOWN, LM_DEBUG, n_chunks_, sample_list_element_allocator_, and OpenDDS::DCPS::SequenceNumber::ZERO().
OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer | ( | ) |
Definition at line 126 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, deadline_task_, OpenDDS::DCPS::SendStateDataSampleList::dequeue_head(), OpenDDS::DCPS::TransportRegistry::instance(), LM_DEBUG, LM_ERROR, LM_WARNING, orphaned_to_transport_, release_buffer(), OpenDDS::DCPS::TransportRegistry::released(), sending_data_, sent_data_, shutdown_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
|
private |
|
private |
Definition at line 176 of file WriteDataContainer.cpp.
References acked_sequences_, cached_cumulative_ack_valid_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::DisjointSequence::reset(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), wfa_lock_, and OpenDDS::DCPS::SequenceNumber::ZERO().
|
private |
Definition at line 1737 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::PublicationInstance::deadline_, deadline_map_, deadline_period_, deadline_task_, OpenDDS::DCPS::TimeDuration::max_value, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.
Referenced by remove_instance().
|
private |
Definition at line 1370 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_NEW_MALLOC, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::PublicationInstance::durable_samples_remaining_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_head(), OpenDDS::DCPS::DataWriterImpl::filter_out(), LM_DEBUG, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::SendStateDataSampleList::rbegin(), OpenDDS::DCPS::SendStateDataSampleList::rend(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.
Referenced by reenqueue_all().
void OpenDDS::DCPS::WriteDataContainer::data_delivered | ( | const DataSampleElement * | sample | ) |
Acknowledge the delivery of data. The sample that resides in this container is moved from the sending_data_ list to the internal sent_data_ list. If any threads are waiting for available space, this method wakes them up.
Definition at line 651 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_SYNCH_MUTEX, ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::controlTracker, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::InstanceDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), get_cumulative_ack(), OpenDDS::DCPS::DataSampleElement::get_handle(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_num_subs(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::DataSampleElement::get_sub_id(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, LM_DEBUG, LM_WARNING, lock_, max_durable_per_instance_, OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), orphaned_to_transport_, pending_data(), publication_id_, ACE_Guard< ACE_LOCK >::release(), release_buffer(), OpenDDS::DCPS::SendStateDataSampleList::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::PublicationInstance::samples_, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), shutdown_, topic_name_, unsent_data_, update_acked(), wakeup_blocking_writers(), wfa_condition_, wfa_lock_, and writer_.
Referenced by data_dropped().
void OpenDDS::DCPS::WriteDataContainer::data_dropped | ( | const DataSampleElement * | element, |
bool | dropped_by_transport | ||
) |
This method is called by the transport to notify the container that the sample was dropped, which the publication code told the transport to do by calling TransportClient::remove_sample(). If the sample was "sending" then it is moved to the "unsent" list. If any threads are waiting for available space, this method wakes them up. A dropped_by_transport value of true indicates that the drop was initiated by the transport while the transport send strategy was in MODE_TERMINATED. A value of false indicates that the drop was initiated by remove_sample() and that data_dropped() is being called as a result.
Definition at line 817 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SendStateDataSampleList::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, shutdown_, topic_name_, unsent_data_, wakeup_blocking_writers(), and writer_.
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose | ( | DDS::InstanceHandle_t | handle, |
Message_Block_Ptr & | registered_sample, | ||
bool | dup_registered_sample = true |
||
) |
Delete the samples for the provided instance. If dup_registered_sample is true, a shallow copy of the sample data is given to the datawriter as part of the control message.
This method returns an error if the instance is not registered.
Definition at line 482 of file WriteDataContainer.cpp.
References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::find(), instances_, LM_ERROR, lock_, remove_instance(), DDS::RETCODE_ERROR, and DDS::RETCODE_PRECONDITION_NOT_MET.
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue | ( | DataSampleElement * | sample, |
DDS::InstanceHandle_t | instance | ||
) |
Enqueue the data sample in its instance thread. This method assumes that space is available for the sample in the instance list.
Definition at line 294 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), extend_deadline(), get_handle_instance(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, and unsent_data_.
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control | ( | DataSampleElement * | control_sample | ) |
Definition at line 276 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, shutdown_, and unsent_data_.
|
private |
Definition at line 1712 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::PublicationInstance::deadline_, deadline_map_, deadline_period_, deadline_task_, OpenDDS::DCPS::TimeDuration::max_value, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().
Referenced by enqueue(), and register_instance().
|
private |
Definition at line 207 of file WriteDataContainer.cpp.
References acked_sequences_, cached_cumulative_ack_, cached_cumulative_ack_valid_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().
Referenced by data_delivered(), remove_reader_acks(), and sequence_acknowledged_i().
PublicationInstance_rch OpenDDS::DCPS::WriteDataContainer::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) |
Definition at line 1355 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::find(), instances_, and LM_DEBUG.
Referenced by enqueue(), and obtain_buffer().
void OpenDDS::DCPS::WriteDataContainer::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 1513 of file WriteDataContainer.cpp.
References ACE_GUARD, instances_, and lock_.
|
private |
Definition at line 229 of file WriteDataContainer.cpp.
References acked_sequences_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().
SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data | ( | ) |
Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list returned is not put on any SendStateDataSampleList.
Definition at line 622 of file WriteDataContainer.cpp.
References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().
ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data | ( | SendStateDataSampleList & | list | ) |
Obtain a list of data that has not yet been sent. The data on the list returned is moved from the internal unsent_data_ list to the internal sending_data_ list as part of this call. The entire list is linked via the DataSampleElement.next_send_sample_ link as well.
Definition at line 583 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, OpenDDS::DCPS::DataSampleElement::set_transaction_id(), transaction_id_, and unsent_data_.
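As a rough illustration of the hand-off described for get_unsent_data, the sketch below moves everything from an unsent list to the tail of a sending list and gives the caller a view of what was moved. It uses `std::list` rather than the intrusive SendStateDataSampleList, and `Sample`, `SendLists`, and `take_unsent` are hypothetical names. Treating the return value as a transaction identifier is an assumption based on the transaction_id_ member referenced above.

```cpp
#include <cassert>
#include <list>

// Hypothetical stand-in for a DataSampleElement; not the OpenDDS type.
struct Sample {
  int sequence;
  unsigned long long transaction_id;
};

// Hypothetical container holding only the two lists involved here.
struct SendLists {
  std::list<Sample> unsent;
  std::list<Sample> sending;
  unsigned long long transaction_id = 0;

  // Stamps the unsent samples with a new transaction id, copies them into
  // `out` for the caller, then moves them to the tail of `sending`.
  unsigned long long take_unsent(std::list<Sample>& out)
  {
    ++transaction_id;
    for (Sample& s : unsent) {
      s.transaction_id = transaction_id;
    }
    out = unsent;                          // the caller's view of the batch
    sending.splice(sending.end(), unsent); // constant-time move of all nodes
    assert(unsent.empty());                // nothing is left in unsent
    return transaction_id;
  }
};
```

The `splice` call relinks the list nodes instead of copying them, which is loosely analogous to relinking elements between intrusive lists.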
|
private |
Definition at line 1593 of file WriteDataContainer.cpp.
References ACE_DEBUG, instances_, LM_DEBUG, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by wait_pending().
size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples | ( | ) |
Return the number of samples for all instances.
Definition at line 563 of file WriteDataContainer.cpp.
References ACE_GUARD_RETURN, instances_, and lock_.
Referenced by log_send_state_lists(), and obtain_buffer().
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples | ( | DDS::InstanceHandle_t | handle, |
size_t & | size | ||
) |
Return the number of samples for the given instance.
Definition at line 542 of file WriteDataContainer.cpp.
References ACE_GUARD_RETURN, OpenDDS::DCPS::find(), instances_, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer | ( | DataSampleElement *& | element, |
DDS::InstanceHandle_t | handle | ||
) |
Allocate a DataSampleElement object and check the space availability for the newly allocated element according to the QoS settings. In the blocking write case, if resource limits or history QoS limits are reached, this method blocks for up to the max blocking time, waiting for a previous sample to be delivered or dropped by the transport. In the non-blocking write case, if resource limits or history QoS limits are reached, it attempts to remove the oldest samples (forcing the transport to drop samples if necessary) to make space. If several threads are waiting, the first one in the waiting list can enqueue while the others continue waiting. Note: the lock should be held before calling this method.
Definition at line 1138 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), condition_, OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), history_depth_, instances_, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, max_samples_per_instance_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), num_all_samples(), publication_id_, OpenDDS::DCPS::DataWriterImpl::qos_, release_buffer(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, remove_excess_durable(), remove_oldest_sample(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, TheServiceParticipant, OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until(), waiting_on_release_, and writer_.
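The two cases described for obtain_buffer can be sketched as a single decision function. This is a simplified, single-threaded illustration under stated assumptions: `SlotResult`, `History`, and `reserve_slot` are hypothetical names, the actual waiting on a condition variable is omitted, and only a per-instance depth limit is modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical result codes; MustWait models the point where a blocking
// write would wait for up to the max blocking time.
enum class SlotResult { Ok, MustWait };

// Hypothetical per-instance history with the oldest sample at the front.
using History = std::deque<int>;

// Decides how to make room for one more sample in `history`.
// `depth` is the per-instance limit and must be greater than zero.
inline SlotResult reserve_slot(History& history, std::size_t depth,
                               bool blocking_write)
{
  assert(depth > 0);
  if (history.size() < depth) {
    return SlotResult::Ok; // space is already available
  }
  if (blocking_write) {
    return SlotResult::MustWait; // caller waits for delivery or a drop
  }
  while (history.size() >= depth) {
    history.pop_front(); // drop the oldest sample to make space
  }
  return SlotResult::Ok;
}
```

A caller that receives `MustWait` would, in the real container, wait on a condition until data_delivered or data_dropped signals that space may have been freed, or until the timeout expires.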
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control | ( | DataSampleElement *& | element | ) |
Definition at line 1120 of file WriteDataContainer.cpp.
References ACE_NEW_MALLOC_RETURN, DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, and writer_.
|
private |
|
private |
typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
Returns a vector of handles for the instances registered for this data writer.
|
private |
|
private |
Returns whether pending data exists. This includes sending and unsent data.
Definition at line 643 of file WriteDataContainer.cpp.
References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
bool OpenDDS::DCPS::WriteDataContainer::persist_data | ( | ) |
Copy sent data to the data durability cache.
Definition at line 1425 of file WriteDataContainer.cpp.
References ACE_ERROR, ACE_TEXT(), domain_id_, durability_cache_, durability_service_, OpenDDS::DCPS::DataDurabilityCache::insert(), LM_ERROR, sent_data_, topic_name_, and type_name_.
|
private |
Definition at line 1655 of file WriteDataContainer.cpp.
References ACE_GUARD, OpenDDS::DCPS::PublicationInstance::deadline_, deadline_last_total_count_, deadline_map_, deadline_period_, deadline_status_, deadline_status_lock_, deadline_task_, OpenDDS::DCPS::PublicationInstance::instance_handle_, DDS::OfferedDeadlineMissedStatus::last_instance_handle, OpenDDS::DCPS::DataWriterImpl::listener_for(), lock_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_DEADLINE_MISSED_STATUS, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count_change, and writer_.
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all | ( | const GUID_t & | reader_id, |
const DDS::LifespanQosPolicy & | lifespan, | ||
const OPENDDS_STRING & | filterClassName, | ||
const FilterEvaluator * | eval, | ||
const DDS::StringSeq & | params | ||
) |
Create a resend list with copies of all current "sending" and "sent" samples. The samples will be sent to the specified subscriber.
Definition at line 326 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), acked_sequences_, OpenDDS::DCPS::SendStateDataSampleList::begin(), cached_cumulative_ack_valid_, copy_and_prepend(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::DisjointSequence::erase(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::GUID_UNKNOWN, instances_, LM_DEBUG, lock_, max_durable_per_instance_, publication_id_, resend_data_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SendStateDataSampleList::size(), topic_name_, and wfa_lock_.
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance | ( | DDS::InstanceHandle_t & | instance_handle, |
Message_Block_Ptr & | registered_sample | ||
) |
Dynamically allocate a PublicationInstance object and add it to the instances_ list.
Definition at line 397 of file WriteDataContainer.cpp.
References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::bind(), ACE_Message_Block::duplicate(), extend_deadline(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::PublicationInstance::instance_handle_, instances_, LM_ERROR, max_num_instances_, OpenDDS::DCPS::move(), OpenDDS::DCPS::PublicationInstance::registered_sample_, OpenDDS::DCPS::RcHandle< T >::reset(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and writer_.
void OpenDDS::DCPS::WriteDataContainer::release_buffer | ( | DataSampleElement * | element | ) |
Release the memory previously allocated. This method is the counterpart of obtain_buffer. If the memory was allocated by an allocator, it is released back to that allocator.
Definition at line 1310 of file WriteDataContainer.cpp.
References ACE_DES_FREE, data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.
Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().
|
private |
Remove the oldest "n" samples from each instance list that are in a state such that they could only be used for durability purposes (see reenqueue_all). "n" is determined by max_durable_per_instance_, so these samples are truly unneeded – there are max_durable_per_instance_ newer samples available in the instance.
Definition at line 941 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::InstanceDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, instances_, LM_DEBUG, max_durable_per_instance_, OpenDDS::DCPS::InstanceDataSampleList::prev(), publication_id_, release_buffer(), sent_data_, OpenDDS::DCPS::InstanceDataSampleList::tail(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and topic_name_.
Referenced by obtain_buffer().
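The trimming rule described for excess durable samples can be sketched as follows. This is an illustration under stated assumptions, not the OpenDDS implementation: `InstanceSample` and `trim_excess_durable` are hypothetical names, a plain `std::vector` ordered oldest to newest replaces the instance list, and the `historic` field models a sample that is only kept for durability.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical sample record; `historic` models HISTORIC_SAMPLE_FLAG.
struct InstanceSample {
  int sequence;
  bool historic;
};

// Removes historic samples that are older than the newest `max_durable`
// samples of one instance. `samples` is ordered oldest to newest.
// Returns the number of samples removed.
inline std::size_t trim_excess_durable(std::vector<InstanceSample>& samples,
                                       std::size_t max_durable)
{
  if (samples.size() <= max_durable) {
    return 0; // nothing can be in excess
  }
  const std::size_t keep_from = samples.size() - max_durable;
  std::vector<InstanceSample> kept;
  std::size_t removed = 0;
  for (std::size_t i = 0; i < samples.size(); ++i) {
    if (i < keep_from && samples[i].historic) {
      ++removed; // old and only useful for durability
    } else {
      kept.push_back(samples[i]);
    }
  }
  samples.swap(kept); // `kept` now holds the original contents
  assert(samples.size() + removed == kept.size());
  return removed;
}
```

Samples that are not yet historic are left in place even when they are old, since in this model they may still be needed by the transport.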
|
private |
Definition at line 509 of file WriteDataContainer.cpp.
References cancel_deadline(), ACE_Message_Block::duplicate(), OpenDDS::DCPS::PublicationInstance::registered_sample_, remove_oldest_sample(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().
Referenced by dispose(), unregister(), and unregister_all().
|
private |
Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.
Definition at line 983 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), domain_id_, empty_condition_, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::TransportClient::remove_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, and writer_.
Referenced by obtain_buffer(), and remove_instance().
|
private |
Definition at line 191 of file WriteDataContainer.cpp.
References acked_sequences_, cached_cumulative_ack_valid_, get_cumulative_ack(), OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), wfa_condition_, and wfa_lock_.
bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged | ( | const SequenceNumber & | sequence | ) |
Definition at line 1557 of file WriteDataContainer.cpp.
References sequence_acknowledged_i(), and wfa_lock_.
|
private |
Definition at line 1564 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, get_cumulative_ack(), OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, publication_id_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().
Referenced by sequence_acknowledged(), and wait_ack_of_seq().
|
private |
Definition at line 1606 of file WriteDataContainer.cpp.
References deadline_map_, deadline_period_, deadline_task_, instances_, OpenDDS::DCPS::TimeDuration::max_value, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_ASSERT, and OpenDDS::DCPS::swap().
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister | ( | DDS::InstanceHandle_t | handle, |
Message_Block_Ptr & | registered_sample, | ||
bool | dup_registered_sample = true |
||
) |
Remove the provided instance from the instances_ list. The registered sample data is released when the PublicationInstance is deleted. If dup_registered_sample is true, a shallow copy of the sample data is given to the datawriter as part of the control message.
This method returns an error if the instance is not registered.
Definition at line 452 of file WriteDataContainer.cpp.
References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), instances_, LM_ERROR, lock_, remove_instance(), DDS::RETCODE_ERROR, and DDS::RETCODE_PRECONDITION_NOT_MET.
void OpenDDS::DCPS::WriteDataContainer::unregister_all | ( | ) |
Unregister all instances managed by this data container.
Definition at line 1321 of file WriteDataContainer.cpp.
References ACE_ERROR, ACE_GUARD, condition_, DBG_ENTRY_LVL, instances_, LM_ERROR, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::TransportClient::remove_all_msgs(), remove_instance(), DDS::RETCODE_OK, OpenDDS::DCPS::DataWriterImpl::return_handle(), shutdown_, waiting_on_release_, and writer_.
|
private |
Definition at line 245 of file WriteDataContainer.cpp.
References acked_sequences_, cached_cumulative_ack_valid_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), and wfa_condition_.
Referenced by data_delivered().
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq | ( | const MonotonicTimePoint & | abs_deadline, |
bool | deadline_is_infinite, | ||
const SequenceNumber & | sequence | ||
) |
Definition at line 1527 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged_i(), TheServiceParticipant, OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until(), wfa_condition_, and wfa_lock_.
void OpenDDS::DCPS::WriteDataContainer::wait_pending | ( | const MonotonicTimePoint & | deadline | ) |
Block until pending samples have been delivered or dropped, or until the deadline has passed. Blocks indefinitely if deadline is zero.
Definition at line 1465 of file WriteDataContainer.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, empty_condition_, OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), LM_DEBUG, LM_ERROR, LM_INFO, lock_, log_send_state_lists(), pending_data(), TheServiceParticipant, OpenDDS::DCPS::TimePoint_T< AceClock >::value(), and OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until().
|
private |
Called when data has been dropped or delivered and any blocked writers should be notified.
Definition at line 1583 of file WriteDataContainer.cpp.
References condition_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), and waiting_on_release_.
Referenced by data_delivered(), and data_dropped().
|
friend |
Definition at line 356 of file WriteDataContainer.h.
|
friend |
Definition at line 125 of file WriteDataContainer.h.
|
private |
Definition at line 426 of file WriteDataContainer.h.
Referenced by add_reader_acks(), get_cumulative_ack(), get_last_ack(), reenqueue_all(), remove_reader_acks(), update_acked(), and WriteDataContainer().
|
private |
Definition at line 427 of file WriteDataContainer.h.
Referenced by get_cumulative_ack().
|
private |
Definition at line 428 of file WriteDataContainer.h.
Referenced by add_reader_acks(), get_cumulative_ack(), reenqueue_all(), remove_reader_acks(), and update_acked().
|
private |
Definition at line 513 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
|
private |
The list of all samples written to this datawriter in writing order.
Definition at line 454 of file WriteDataContainer.h.
Referenced by obtain_buffer(), and release_buffer().
|
private |
The total_count value when the status was last checked.
Definition at line 571 of file WriteDataContainer.h.
Referenced by process_deadlines().
|
private |
Definition at line 562 of file WriteDataContainer.h.
Referenced by cancel_deadline(), extend_deadline(), process_deadlines(), and set_deadline_period().
|
private |
Definition at line 560 of file WriteDataContainer.h.
Referenced by cancel_deadline(), extend_deadline(), process_deadlines(), and set_deadline_period().
|
private |
Reference to the missed offered deadline status structure.
Definition at line 568 of file WriteDataContainer.h.
Referenced by process_deadlines().
|
private |
Lock for synchronization of the status_ member.
Definition at line 565 of file WriteDataContainer.h.
Referenced by process_deadlines().
|
private |
Timer responsible for reporting missed offered deadlines.
Definition at line 559 of file WriteDataContainer.h.
Referenced by cancel_deadline(), extend_deadline(), process_deadlines(), set_deadline_period(), and ~WriteDataContainer().
|
private |
Domain ID.
Definition at line 535 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), persist_data(), reenqueue_all(), remove_excess_durable(), and remove_oldest_sample().
|
private |
Pointer to the data durability cache.
This is a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.
Definition at line 551 of file WriteDataContainer.h.
Referenced by persist_data().
|
private |
DURABILITY_SERVICE QoS specific to the DataWriter.
Definition at line 554 of file WriteDataContainer.h.
Referenced by persist_data().
|
private |
Definition at line 514 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
|
private |
Definition at line 475 of file WriteDataContainer.h.
Referenced by obtain_buffer().
|
private |
The individual instance queue threads in the data.
Definition at line 463 of file WriteDataContainer.h.
Referenced by dispose(), get_handle_instance(), get_instance_handles(), log_send_state_lists(), num_all_samples(), num_samples(), obtain_buffer(), reenqueue_all(), register_instance(), remove_excess_durable(), set_deadline_period(), unregister(), and unregister_all().
|
mutable private |
This lock protects the container and the map in the type-specific DataWriter. It is accessible via the datawriter and is made globally accessible for performance reasons. The lock is acquired when an external call (e.g. FooDataWriterImpl::write) starts, and the same lock is used by the transport thread to notify the datawriter that the data has been delivered. Other internal operations do not lock.
Definition at line 511 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), dispose(), get_instance_handles(), num_all_samples(), num_samples(), process_deadlines(), reenqueue_all(), unregister(), unregister_all(), and wait_pending().
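The locking discipline described for this member can be illustrated with a small standalone sketch. It is not OpenDDS code: std::recursive_mutex stands in for ACE_Recursive_Thread_Mutex, and the class MiniContainer, its methods, and the use of int as a sample type are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical stand-in for a container guarded by a single shared lock.
class MiniContainer {
public:
  // External entry point: acquires the lock once, then calls an
  // internal helper that does not lock again.
  void write(int sample) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    enqueue_i(sample);
  }

  // Callback that a transport thread would invoke: it takes the same lock.
  void data_delivered(int sample) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    auto it = std::find(unsent_.begin(), unsent_.end(), sample);
    if (it != unsent_.end()) {
      unsent_.erase(it);
      sent_.push_back(sample);
    }
  }

  std::size_t num_sent() const {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    return sent_.size();
  }

private:
  // Internal helper: assumes the caller already holds lock_.
  void enqueue_i(int sample) { unsent_.push_back(sample); }

  mutable std::recursive_mutex lock_;  // mutable so const accessors can lock
  std::vector<int> unsent_;
  std::vector<int> sent_;
};
```

Declaring the mutex mutable, as the real member is, lets const accessors such as num_sent() acquire it.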
|
private |
The maximum time to block on a write operation. This comes from the DataWriter's QoS RELIABILITY.max_blocking_time.
Definition at line 497 of file WriteDataContainer.h.
Referenced by obtain_buffer().
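A bounded wait of this kind might look like the following sketch. It assumes a simple slot counter and uses std::condition_variable rather than the OpenDDS ConditionVariable wrapper; BoundedBuffer, obtain_slot, and release_slot are hypothetical names.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical bounded buffer; not the OpenDDS implementation.
class BoundedBuffer {
public:
  BoundedBuffer(std::size_t capacity, std::chrono::milliseconds max_blocking_time)
    : capacity_(capacity), max_blocking_time_(max_blocking_time), used_(0) {}

  // Returns true if a slot was obtained, false if the wait timed out.
  bool obtain_slot() {
    std::unique_lock<std::mutex> guard(lock_);
    const bool have_space = space_available_.wait_for(
      guard, max_blocking_time_, [this] { return used_ < capacity_; });
    if (have_space) {
      ++used_;
    }
    return have_space;
  }

  // Frees a slot and wakes one blocked writer, if any.
  void release_slot() {
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (used_ > 0) {
        --used_;
      }
    }
    space_available_.notify_one();
  }

private:
  const std::size_t capacity_;
  const std::chrono::milliseconds max_blocking_time_;
  std::size_t used_;
  std::mutex lock_;
  std::condition_variable space_available_;
};
```

The predicate form of wait_for rechecks the condition after every wakeup, so spurious wakeups do not shorten or extend the effective blocking time.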
|
private |
The maximum number of samples from each instance that can be added to resend_data_ for durability.
Definition at line 479 of file WriteDataContainer.h.
Referenced by data_delivered(), reenqueue_all(), and remove_excess_durable().
|
private |
The maximum number of instances allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS.
Definition at line 485 of file WriteDataContainer.h.
Referenced by register_instance().
|
private |
The maximum number of samples allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_samples when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS. It also covers max_instances * max_samples_per_instance.
Definition at line 493 of file WriteDataContainer.h.
Referenced by obtain_buffer().
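Both limits above use zero to mean "unlimited". A check against such a limit could be written as in this minimal sketch; the helper name within_limit is hypothetical and does not appear in OpenDDS.

```cpp
// Hypothetical helper: returns true when one more item may be added.
// A limit of zero is treated as "unlimited".
inline bool within_limit(long limit, long current_count) {
  return limit == 0 || current_count < limit;
}
```

For example, within_limit(3, 2) allows a third item, while within_limit(3, 3) does not.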
|
private |
The maximum size a container should allow for an instance sample list.
Definition at line 473 of file WriteDataContainer.h.
Referenced by obtain_buffer().
|
private |
The number of chunks that sample_list_element_allocator_ needs to initialize.
Definition at line 525 of file WriteDataContainer.h.
Referenced by WriteDataContainer().
|
private |
List of data that has been released by WriteDataContainer but is still being delivered (or dropped) by the transport.
Definition at line 450 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().
|
private |
The publication Id from repo.
Definition at line 466 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), obtain_buffer(), obtain_buffer_for_control(), reenqueue_all(), remove_excess_durable(), remove_oldest_sample(), and sequence_acknowledged_i().
|
private |
List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in the sent and sending lists. This list is passed to the transport for resending.
Definition at line 460 of file WriteDataContainer.h.
Referenced by get_resend_data(), and reenqueue_all().
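The duplication into a separate resend list can be pictured with the sketch below. It is an illustration only: int stands in for a sample, build_resend_list is a hypothetical function, and placing the sent samples ahead of the sending samples is an assumption about ordering.

```cpp
#include <list>

// Hypothetical: builds a resend list from copies of the sent and sending
// samples. The source lists are left untouched.
inline std::list<int> build_resend_list(const std::list<int>& sent,
                                        const std::list<int>& sending) {
  std::list<int> resend(sent);  // copies of samples already sent (assumed first)
  resend.insert(resend.end(), sending.begin(), sending.end());
  return resend;
}
```

Because the result holds copies, handing it to a transport does not disturb the bookkeeping in the original lists.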
|
private |
The cached allocator to allocate DataSampleElement objects.
Definition at line 529 of file WriteDataContainer.h.
Referenced by copy_and_prepend(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().
|
private |
List of data that is currently being sent.
Definition at line 443 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), get_unsent_data(), log_send_state_lists(), pending_data(), reenqueue_all(), remove_oldest_sample(), and ~WriteDataContainer().
|
private |
List of data that has already been sent.
Definition at line 446 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), persist_data(), reenqueue_all(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().
|
private |
Flag indicating that the DataWriter is about to be destroyed.
Definition at line 532 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), obtain_buffer(), unregister_all(), and ~WriteDataContainer().
|
private |
Topic name.
Definition at line 538 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), persist_data(), reenqueue_all(), remove_excess_durable(), and remove_oldest_sample().
|
private |
ID used to keep track of which send transaction the DataWriter is currently creating.
Definition at line 440 of file WriteDataContainer.h.
Referenced by get_unsent_data().
|
private |
|
private |
List of data that has not been sent yet.
Definition at line 436 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), get_unsent_data(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().
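The movement of samples between the unsent, sending, and sent lists can be modelled with std::list::splice, which relinks nodes without copying them. This is a simplified sketch under stated assumptions: SendStateLists is a hypothetical type, int stands in for DataSampleElement, and the method signatures are reduced versions that only borrow the names of the real operations.

```cpp
#include <algorithm>
#include <list>

// Hypothetical model of the three send-state lists.
struct SendStateLists {
  std::list<int> unsent, sending, sent;

  void enqueue(int sample) { unsent.push_back(sample); }

  // Hand every unsent sample to the transport: unsent -> sending.
  void get_unsent_data() { sending.splice(sending.end(), unsent); }

  // Transport finished with a sample: sending -> sent.
  bool data_delivered(int sample) { return move_one(sending, sent, sample); }

  // Transport dropped a sample that should be resent: sending -> unsent.
  bool data_dropped(int sample) { return move_one(sending, unsent, sample); }

private:
  // Relinks one matching element from 'from' to the end of 'to'.
  static bool move_one(std::list<int>& from, std::list<int>& to, int sample) {
    std::list<int>::iterator it = std::find(from.begin(), from.end(), sample);
    if (it == from.end()) {
      return false;
    }
    to.splice(to.end(), from, it);
    return true;
  }
};
```

Since splice moves the node itself, a sample is never present in two of the lists at once, which matches the rule that an element belongs to exactly one state list.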
|
private |
The block waiting flag.
Definition at line 500 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
|
private |
Used to block in wait_for_acks().
Definition at line 521 of file WriteDataContainer.h.
Referenced by data_delivered(), remove_reader_acks(), update_acked(), and wait_ack_of_seq().
|
private |
Lock used for wait_for_acks() processing.
Definition at line 517 of file WriteDataContainer.h.
Referenced by add_reader_acks(), data_delivered(), reenqueue_all(), remove_reader_acks(), sequence_acknowledged(), and wait_ack_of_seq().
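The pairing of a dedicated lock with a condition variable for acknowledgment waits follows a common pattern, sketched here with the standard library. AckTracker is hypothetical, sequence numbers are modelled as plain long values, and the two method names are borrowed from the list above with simplified signatures that are assumptions.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical tracker for the highest acknowledged sequence number.
class AckTracker {
public:
  AckTracker() : acked_(0) {}

  // Records an acknowledgment and wakes every waiter so it can recheck.
  void update_acked(long seq) {
    {
      std::lock_guard<std::mutex> guard(lock_);
      if (seq > acked_) {
        acked_ = seq;
      }
    }
    cond_.notify_all();
  }

  // Blocks until 'seq' is acknowledged or the timeout expires.
  // Returns true if acknowledged, false on timeout.
  bool wait_ack_of_seq(long seq, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> guard(lock_);
    return cond_.wait_for(guard, timeout, [this, seq] { return acked_ >= seq; });
  }

private:
  std::mutex lock_;
  std::condition_variable cond_;
  long acked_;
};
```

Keeping this lock separate from the main container lock means a thread waiting for acknowledgments does not hold up writers.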
|
private |
The writer that owns this container.
Definition at line 469 of file WriteDataContainer.h.
Referenced by copy_and_prepend(), data_delivered(), data_dropped(), obtain_buffer(), obtain_buffer_for_control(), process_deadlines(), register_instance(), remove_oldest_sample(), and unregister_all().