16 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 51 if (now >= expiration_time) {
55 ACE_TEXT(
"OpenDDS (%P|%t) Data to be sent ")
56 ACE_TEXT(
"expired by %d seconds, %d microseconds.\n"),
76 const char* topic_name,
77 const char* type_name,
78 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
87 : cached_cumulative_ack_valid_(false)
91 , max_samples_per_instance_(max_samples_per_instance)
92 , history_depth_(history_depth)
93 , max_durable_per_instance_(max_durable_per_instance)
94 , max_num_instances_(max_instances)
95 , max_num_samples_(max_total_samples)
96 , max_blocking_time_(max_blocking_time)
97 , waiting_on_release_(false)
99 , empty_condition_(
lock_)
100 , wfa_condition_(wfa_lock_)
101 , n_chunks_(n_chunks)
102 , sample_list_element_allocator_(2 * n_chunks_)
104 , domain_id_(domain_id)
105 , topic_name_(topic_name)
106 , type_name_(type_name)
107 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
108 , durability_cache_(durability_cache)
109 , durability_service_(durability_service)
113 , deadline_status_lock_(deadline_status_lock)
114 , deadline_status_(deadline_status)
115 , deadline_last_total_count_(deadline_last_total_count)
119 "(%P|%t) WriteDataContainer " 120 "sample_list_element_allocator %x with %d chunks\n",
132 ACE_TEXT(
"(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
133 ACE_TEXT(
"destroyed with %d samples unsent.\n"),
145 ACE_TEXT(
"(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
146 ACE_TEXT(
"destroyed with %d samples sending.\n"),
153 ACE_TEXT(
"(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
154 ACE_TEXT(
"destroyed with %d samples sent.\n"),
161 ACE_TEXT(
"(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
162 ACE_TEXT(
"destroyed with %d samples orphaned_to_transport.\n"),
170 ACE_TEXT(
"WriteDataContainer::~WriteDataContainer, ")
171 ACE_TEXT(
"The container has not been cleaned.\n")));
219 if (!it->second.empty()) {
237 if (!it->second.empty()) {
247 bool do_notify =
false;
251 it->second.insert(seq);
253 if (prev_cum_ack != it->second.cumulative_ack()) {
261 if (prev_cum_ack < seq) {
264 if (prev_cum_ack != it->second.cumulative_ack()) {
328 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
342 for (PublicationInstanceMapType::iterator it =
instances_.begin();
344 const ssize_t durable = std::min(it->second->samples_.size(),
346 total_size += durable;
347 it->second->durable_samples_remaining_ = durable;
354 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
355 filterClassName, eval, expression_params,
363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
364 filterClassName, eval, expression_params,
384 ACE_TEXT(
"(%P|%t) WriteDataContainer::reenqueue_all: ")
385 ACE_TEXT(
"domain %d topic %C publication %C copying ")
386 ACE_TEXT(
"sending/sent to resend to %C.\n"),
416 if (0 != insert_attempt) {
419 ACE_TEXT(
"WriteDataContainer::register_instance, ")
420 ACE_TEXT(
"failed to insert instance handle=%X\n"),
431 int const find_attempt =
find(
instances_, instance_handle, instance);
433 if (0 != find_attempt) {
436 ACE_TEXT(
"WriteDataContainer::register_instance, ")
437 ACE_TEXT(
"The provided instance handle=%X is not a valid")
455 bool dup_registered_sample)
464 PublicationInstanceMapType::iterator pos =
instances_.find(instance_handle);
468 ACE_TEXT(
"WriteDataContainer::unregister, ")
469 ACE_TEXT(
"The instance(handle=%X) ")
470 ACE_TEXT(
"is not registered yet.\n"),
474 instance = pos->second;
478 return remove_instance(instance, registered_sample, dup_registered_sample);
484 bool dup_registered_sample)
493 int const find_attempt =
find(
instances_, instance_handle, instance);
495 if (0 != find_attempt) {
498 ACE_TEXT(
"WriteDataContainer::dispose, ")
499 ACE_TEXT(
"The instance(handle=%X) ")
500 ACE_TEXT(
"is not registered yet.\n"),
505 return remove_instance(instance, registered_sample, dup_registered_sample);
511 bool dup_registered_sample)
513 if (dup_registered_sample) {
528 while (instance_list.
size() > 0) {
529 bool released =
false;
553 if (0 != find_attempt) {
572 for (PublicationInstanceMapType::iterator iter =
instances_.begin();
576 size += iter->second->samples_.size();
598 while (iter != list.
end()) {
692 ACE_TEXT(
"WriteDataContainer::data_delivered, ")
693 ACE_TEXT(
"The delivered sample is not in sending_data_ and ")
698 ACE_TEXT(
"WriteDataContainer::data_delivered, ")
699 ACE_TEXT(
"The delivered sample is not in sending_data_ and ")
700 ACE_TEXT(
"WAS IN unsent_data_ list.\n")));
710 ACE_TEXT(
"(%P|%t) WriteDataContainer::data_delivered: ")
711 ACE_TEXT(
"domain %d topic %C publication %C control message delivered.\n"),
719 if (containing_list == &orphaned_to_transport_) {
720 orphaned_to_transport_.dequeue(sample);
723 }
else if (!containing_list) {
750 ACE_TEXT(
"(%P|%t) WriteDataContainer::data_delivered: ")
751 ACE_TEXT(
"domain %d topic %C publication %C control message delivered.\n"),
777 ACE_TEXT(
"(%P|%t) WriteDataContainer::data_delivered: ")
778 ACE_TEXT(
"domain %d topic %C publication %C seq# %q %s.\n"),
785 : ACE_TEXT(
"released")));
792 ACE_TEXT(
"Inserting acked_sequence: %q\n"),
803 ACE_TEXT(
"(%P|%t) WriteDataContainer::data_delivered - ")
804 ACE_TEXT(
"broadcasting wait_for_acknowledgments update.\n")));
818 bool dropped_by_transport)
824 ACE_TEXT(
" sample %X dropped_by_transport %d\n"),
825 sample, dropped_by_transport));
832 if (dropped_by_transport) {
887 ACE_TEXT(
"WriteDataContainer::data_dropped, ")
888 ACE_TEXT(
"The dropped sample is not in sending_data_ and ")
893 ACE_TEXT(
"WriteDataContainer::data_dropped, ")
894 ACE_TEXT(
"The dropped sample is not in sending_data_ and ")
895 ACE_TEXT(
"WAS IN unsent_data_ list.\n")));
905 ACE_TEXT(
"(%P|%t) WriteDataContainer::data_dropped: ")
906 ACE_TEXT(
"domain %d topic %C publication %C control message dropped.\n"),
914 if (containing_list == &orphaned_to_transport_) {
915 orphaned_to_transport_.dequeue(sample);
922 }
else if (!containing_list) {
946 size_t n_released = 0;
948 for (PublicationInstanceMapType::iterator iter =
instances_.begin();
960 if (durable_allowed) {
974 ACE_TEXT(
"(%P|%t) WriteDataContainer::remove_excess_durable: ")
975 ACE_TEXT(
"domain %d topic %C publication %C %B samples removed ")
995 ACE_TEXT(
"WriteDataContainer::remove_oldest_sample, ")
996 ACE_TEXT(
"dequeue_head_next_sample failed\n")),
1031 bool result =
false;
1037 ACE_TEXT(
"WriteDataContainer::remove_oldest_sample, ")
1038 ACE_TEXT(
"removing from sending_data_ so must notify transport to remove sample\n")));
1052 this->orphaned_to_transport_.enqueue_tail(stale);
1061 }
else if (containing_list == &this->
sent_data_) {
1071 ACE_TEXT(
"(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1072 ACE_TEXT(
"domain %d topic %C publication %C sample removed from HISTORY.\n"),
1089 ACE_TEXT(
"(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1090 ACE_TEXT(
"domain %d topic %C publication %C sample removed from unsent.\n"),
1098 ACE_TEXT(
"WriteDataContainer::remove_oldest_sample, ")
1099 ACE_TEXT(
"The oldest sample is not in any internal list.\n")),
1110 ACE_TEXT(
"WriteDataContainer::remove_oldest_sample, ")
1111 ACE_TEXT(
"dequeue_next_send_sample from internal list failed.\n")),
1122 DBG_ENTRY_LVL(
"WriteDataContainer",
"obtain_buffer_for_control", 6);
1126 static_cast<DataSampleElement*>(
1153 static_cast<DataSampleElement*>(
1165 bool set_timeout =
true;
1179 ACE_TEXT(
" instance %d attempting to remove")
1180 ACE_TEXT(
" its oldest sample (reliable)\n"),
1183 bool oldest_released =
false;
1185 if (oldest_released) {
1192 set_timeout =
false;
1197 ACE_TEXT(
" instance %d waiting for samples to be released by transport\n"),
1210 ACE_TEXT(
" instance %d timed out waiting for samples to be released by transport\n"),
1219 "error in wait_until\n"));
1233 bool oldest_released =
false;
1238 if (instance_list.size() > 0) {
1241 ACE_TEXT(
" instance %d attempting to remove")
1251 ACE_TEXT(
" instance %d attempting to remove")
1252 ACE_TEXT(
" oldest sample from any full instances\n"),
1255 PublicationInstanceMapType::iterator it =
instances_.begin();
1268 ACE_TEXT(
" instance %d attempting to remove")
1269 ACE_TEXT(
" oldest sample from any instance with samples currently\n"),
1272 PublicationInstanceMapType::iterator it =
instances_.begin();
1275 if (it->second->samples_.size() > 0) {
1281 if (!oldest_released) {
1286 ACE_TEXT(
"WriteDataContainer::obtain_buffer, ")
1287 ACE_TEXT(
"hitting resource limits with no samples to remove\n")));
1295 ACE_TEXT(
" instance %d could not obtain buffer for sample")
1296 ACE_TEXT(
" releasing allotted sample and returning\n"),
1342 for (PublicationInstanceMapType::iterator pos =
instances_.begin(), limit =
instances_.end(); pos != limit;) {
1346 "remove_instance %X failed\n", pos->first));
1362 ACE_TEXT(
"WriteDataContainer::get_handle_instance, ")
1363 ACE_TEXT(
"lookup for %d failed\n"), handle));
1374 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1382 cur != appended.
rend() && max_resend_samples; ++cur) {
1387 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 1405 static_cast<DataSampleElement*>(
1414 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) WriteDataContainer::copy_and_prepend added seq# %q\n",
1415 cur->get_header().sequence_.getValue()));
1419 --max_resend_samples;
1423 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 1440 bool const inserted =
1453 ACE_TEXT(
"WriteDataContainer::persist_data, ")
1454 ACE_TEXT(
"failed to make data durable for ")
1455 ACE_TEXT(
"(domain, topic, type) = (%d, %C, %C)\n"),
1467 const bool no_deadline = deadline.
is_zero();
1476 &deadline.
value()));
1491 "Timed out waiting for messages to be transported\n"));
1501 "error in wait_until\n"));
1518 PublicationInstanceMapType::iterator it =
instances_.begin();
1521 instance_handles.push_back(it->second->instance_handle_);
1528 bool deadline_is_infinite,
1540 ACE_TEXT(
" timed out waiting for sequence %q to be acked\n"),
1547 "error in wait/wait_until\n"));
1595 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
1596 description.c_str(),
1618 for (PublicationInstanceMapType::iterator iter =
instances_.begin();
1621 iter->second->deadline_ = deadline;
1622 deadline_map_.insert(std::make_pair(deadline, iter->second));
1635 DeadlineMapType new_map;
1636 for (PublicationInstanceMapType::iterator iter =
instances_.begin();
1639 iter->second->deadline_ = deadline;
1640 new_map.insert(std::make_pair(iter->second->deadline_, iter->second));
1666 bool notify =
false;
1669 pos != limit && pos->first < now; pos =
deadline_map_.begin()) {
1693 listener->on_offered_deadline_missed(
writer_, status);
1720 std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r =
deadline_map_.equal_range(instance->
deadline_);
1721 while (r.first != r.second && r.first->second != instance) {
1724 if (r.first != r.second) {
1745 std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r =
deadline_map_.equal_range(instance->
deadline_);
1746 while (r.first != r.second && r.first->second != instance) {
1749 if (r.first != r.second) {
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
DataSampleElementAllocator sample_list_element_allocator_
RcHandle< PublicationInstance > PublicationInstance_rch
CORBA::ULong get_num_subs() const
void swap(MessageBlock &lhs, MessageBlock &rhs)
DDS::Duration_t max_blocking_time_
RcHandle< T > rchandle_from(T *pointer)
void * malloc(size_t nbytes=sizeof(T))
void remove_reader_acks(const GUID_t &reader)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool insert(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
void copy_and_prepend(SendStateDataSampleList &list, const SendStateDataSampleList &appended, const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params, ssize_t &max_resend_samples)
const DataSampleHeader & get_header() const
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
void erase(SequenceNumber value)
bool dequeue(const DataSampleElement *stale)
ReliabilityQosPolicy reliability
AckedSequenceMap acked_sequences_
bool sequence_acknowledged_i(const SequenceNumber &sequence)
iterator end()
Return iterator to end of list.
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
void set_sub_id(CORBA::ULong index, OpenDDS::DCPS::GUID_t id)
DDS::ReturnCode_t remove_oldest_sample(InstanceDataSampleList &instance_list, bool &released)
void log_send_state_lists(OPENDDS_STRING description)
const long DURATION_INFINITE_SEC
DataSampleElement * tail() const
void reset()
Reset to initial state.
DDS::ReturnCode_t wait_ack_of_seq(const MonotonicTimePoint &abs_deadline, bool deadline_is_infinite, const SequenceNumber &sequence)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
static const SendStateDataSampleList * send_list_containing_element(const DataSampleElement *element, SendStateDataSampleList **begin, SendStateDataSampleList **end)
int bind(Container &c, const FirstType &first, const SecondType &second)
GUID_t publication_id_
The publication Id from repo.
bool sequence_acknowledged(const SequenceNumber &sequence)
DDS::ReturnCode_t enqueue(DataSampleElement *sample, DDS::InstanceHandle_t instance)
CORBA::Long max_num_instances_
DDS::ReturnCode_t enqueue_control(DataSampleElement *control_sample)
ACE_UINT32 source_timestamp_nanosec_
SequenceNumber get_cumulative_ack()
void cancel_deadline(const PublicationInstance_rch &instance)
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
bool dequeue(const DataSampleElement *stale)
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.
CORBA::Long max_num_samples_
#define OPENDDS_ASSERT(C)
void update_acked(const SequenceNumber &seq, const GUID_t &id=GUID_UNKNOWN)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
void wait_pending(const MonotonicTimePoint &deadline)
const ACE_Time_Value & value() const
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
SendStateDataSampleList orphaned_to_transport_
T::rv_reference move(T &p)
ACE_Recursive_Thread_Mutex & deadline_status_lock_
Lock for synchronization of status_ member.
DDS::ReturnCode_t register_instance(DDS::InstanceHandle_t &instance_handle, Message_Block_Ptr &registered_sample)
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
ACE_Guard< ACE_Thread_Mutex > lock_
void remove_excess_durable()
void set_transaction_id(ACE_UINT64 transaction_id)
MessageTracker controlTracker
static bool on_some_list(const DataSampleElement *iter)
DDS::InstanceHandle_t get_next_handle()
void release_buffer(DataSampleElement *element)
CORBA::Long history_depth_
static TimePoint_T< SystemClock > now()
MonotonicTimePoint deadline_
Deadline for Deadline QoS.
InstanceHandle_t last_instance_handle
DDS::ReturnCode_t unregister(DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t obtain_buffer(DataSampleElement *&element, DDS::InstanceHandle_t handle)
DOMAINID_TYPE_NATIVE DomainId_t
bool remove_sample(const DataSampleElement *sample)
WriterDataSampleList data_holder_
void enqueue_tail(const DataSampleElement *element)
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
void add_reader_acks(const GUID_t &reader, const SequenceNumber &base)
DataSample * get_sample() const
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
ReliabilityQosPolicyKind kind
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
DDS::DurabilityServiceQosPolicy const & durability_service_
DURABILITY_SERVICE QoS specific to the DataWriter.
OpenDDS::DCPS::GUID_t get_sub_id(CORBA::ULong index) const
CORBA::Long max_samples_per_instance_
PublicationInstance_rch get_handle() const
TimeDuration deadline_period_
bool dequeue_head(DataSampleElement *&stale)
SequenceNumber get_last_ack()
SendStateDataSampleList sending_data_
List of data that is currently being sent.
void data_dropped(const DataSampleElement *element, bool dropped_by_transport)
const ReturnCode_t RETCODE_TIMEOUT
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ssize_t durable_samples_remaining_
Only used by WriteDataContainer::reenqueue_all() while WDC is locked.
void free(void *ptr)
Return a chunk of memory back to free list cache.
void return_handle(DDS::InstanceHandle_t handle)
ACE_INLINE OpenDDS_Dcps_Export ACE_Time_Value time_to_time_value(const DDS::Time_t &t)
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
iterator begin()
Return iterator to beginning of list.
virtual ACE_Message_Block * duplicate(void) const
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
static SequenceNumber ZERO()
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
bool dequeue_head(DataSampleElement *&stale)
WriteDataContainer(DataWriterImpl *writer, CORBA::Long max_samples_per_instance, CORBA::Long history_depth, CORBA::Long max_durable_per_instance, DDS::Duration_t max_blocking_time, size_t n_chunks, DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataDurabilityCache *durability_cache, DDS::DurabilityServiceQosPolicy const &durability_service, CORBA::Long max_instances, CORBA::Long max_total_samples, ACE_Recursive_Thread_Mutex &deadline_status_lock, DDS::OfferedDeadlineMissedStatus &deadline_status, CORBA::Long &deadline_last_total_count)
void set_num_subs(CORBA::ULong num_subs)
SequenceNumber cached_cumulative_ack_
bool dequeue(const DataSampleElement *stale)
void data_delivered(const DataSampleElement *sample)
bool notify_all()
Unblock all of the threads waiting on this condition.
HANDLE_TYPE_NATIVE InstanceHandle_t
ACE_INT32 source_timestamp_sec_
ACE_Recursive_Thread_Mutex lock_
const unsigned long DURATION_INFINITE_NSEC
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
bool waiting_on_release_
The block waiting flag.
bool cached_cumulative_ack_valid_
The wait has returned because of a timeout.
unsigned long long ACE_UINT64
std::pair< SequenceNumber, SequenceNumber > SequenceRange
DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
static void remove(DataSampleElement *stale)
DDS::OfferedDeadlineMissedStatus & deadline_status_
Reference to the missed requested deadline status structure.
bool resend_data_expired(const DataSampleElement &element, const DDS::LifespanQosPolicy &lifespan)
suseconds_t usec(void) const
SendStateDataSampleList resend_data_
static void set_flag(DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
void wakeup_blocking_writers(DataSampleElement *stale)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
void enqueue_head(const DataSampleElement *element)
DataWriterImpl * writer_
The writer that owns this container.
ConditionVariableType condition_
void enqueue_tail(const DataSampleElement *element)
InstanceDataSampleList samples_
History of the instance samples.
CORBA::Long max_durable_per_instance_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
void get_instance_handles(InstanceHandleVec &instance_handles)
SendStateDataSampleList STL-style iterator implementation.
CORBA::Long & deadline_last_total_count_
Last total_count when status was last checked.
Sequence number abstraction. Only allows positive 64 bit values.
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static TransportRegistry * instance()
Return a singleton instance of this class.
void set_deadline_period(const TimeDuration &deadline_period)
SendStateDataSampleList sent_data_
List of data that has already been sent.
const ReturnCode_t RETCODE_OK
void notify_status_condition()
DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
void process_deadlines(const MonotonicTimePoint &now)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
DDS::DomainId_t const domain_id_
Domain ID.
DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
void enqueue_tail(const DataSampleElement *element)
DeadlineMapType deadline_map_
#define ACE_ERROR_RETURN(X, Y)
The wait has returned because it was woken up.
DDS::ReturnCode_t reenqueue_all(const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
std::reverse_iterator< const_iterator > const_reverse_iterator
Message_Block_Ptr registered_sample_
The sample data for registration.
DataDurabilityCache *const durability_cache_
Pointer to the data durability cache.
#define TheServiceParticipant
void extend_deadline(const PublicationInstance_rch &instance)
DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle, size_t &size)
const ACE_Time_Value_T< AceClock > & value() const
char const *const topic_name_
Topic name.
ACE_UINT64 transaction_id_
The Internal API and Implementation of OpenDDS.
A container for instances sample data.
static DataSampleElement * prev(const DataSampleElement *iter)
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
ConditionVariableType empty_condition_
char const *const type_name_
Type name.
reverse_iterator rbegin()
Struct that has information about an instance and the instance sample list.
DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement *&element)
sequence< string > StringSeq
SendStateDataSampleList get_resend_data()
static const TimeDuration max_value
const ReturnCode_t RETCODE_BAD_PARAMETER
bool shutdown_
The flag indicates the datawriter will be destroyed.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.