A container for instance sample data.
#include <WriteDataContainer.h>
This container is instantiated once per DataWriter. It maintains a list of PublicationInstance objects, each of which is referenced internally by its instance handle.
This container holds threaded lists of all data written to a given DataWriter. Each data sample is represented by a DataSampleElement. The data_holder_ list holds every DataSampleElement in writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in each PublicationInstance links that instance's samples via the next_instance_sample_ thread.
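The idea of one element being threaded onto several lists at once can be illustrated with a minimal sketch. The types Element, WriterList, and InstanceList below are hypothetical stand-ins, not the OpenDDS classes; only the link names echo the members described above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for DataSampleElement: a single node that carries
// a separate set of links for each list ("thread") it can belong to.
struct Element {
  int id;
  Element* next_writer = nullptr;    // writer-order thread
  Element* prev_writer = nullptr;
  Element* next_instance = nullptr;  // per-instance thread
  explicit Element(int i) : id(i) {}
};

// All samples of the writer, in writing order (doubly linked).
struct WriterList {
  Element* head = nullptr;
  Element* tail = nullptr;
  std::size_t size = 0;
  void enqueue_tail(Element* e) {
    e->next_writer = nullptr;
    e->prev_writer = tail;
    if (tail) tail->next_writer = e; else head = e;
    tail = e;
    ++size;
  }
};

// Samples of one instance only (singly linked).
struct InstanceList {
  Element* head = nullptr;
  Element* tail = nullptr;
  std::size_t size = 0;
  void enqueue_tail(Element* e) {
    e->next_instance = nullptr;
    if (tail) tail->next_instance = e; else head = e;
    tail = e;
    ++size;
  }
};
```

Because the links live inside the element, adding a sample to a second list needs no extra allocation, and each list can be walked independently of the others.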
Three state transition lists are used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Every DataSampleElement should be on exactly one of these three lists and SHOULD NOT be shared between them.

The normal transition of a DataSampleElement is unsent->sending->sent. A DataSampleElement transitions from unsent to sending during a write operation when get_unsent_data is called. It transitions from sending to sent when data_delivered is called, which notifies the container that the transport is finished with the sample and that it can be marked as a historical sample. A DataSampleElement may transition back from sending to unsent if the transport reports that the sample was dropped and should be resent.

A DataSampleElement is removed from the sent list and freed when the instance queue or the container (on behalf of another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when an unreliable writer needs space; in this case the transport is notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples. Note: the real data sample is freed when its reference count reaches 0.

The resend list is only used when the DataWriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds DataSampleElements that duplicate the data samples on the sending and sent lists, and this list is handed off to the transport.
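The transitions described above can be sketched as a small model. This is an illustration under simplifying assumptions, not the OpenDDS implementation: samples are plain integer IDs, the lists are std::deque objects, and the SendLists type and its transfer() helper are hypothetical.

```cpp
#include <algorithm>
#include <cassert>
#include <deque>

// Hypothetical model of the three send-state lists; each sample ID lives
// on exactly one of them at any time.
struct SendLists {
  std::deque<int> unsent, sending, sent;

  // New samples always start on the unsent list.
  void enqueue(int id) { unsent.push_back(id); }

  // unsent -> sending: everything unsent is handed to the transport.
  std::deque<int> get_unsent_data() {
    std::deque<int> batch = unsent;
    sending.insert(sending.end(), unsent.begin(), unsent.end());
    unsent.clear();
    return batch;
  }

  // sending -> sent: the transport is finished with the sample.
  bool data_delivered(int id) { return transfer(sending, sent, id); }

  // sending -> unsent: the sample was dropped and should be resent.
  bool data_dropped(int id) { return transfer(sending, unsent, id); }

private:
  // Move one ID between lists; returns false if it is not on `from`.
  static bool transfer(std::deque<int>& from, std::deque<int>& to, int id) {
    std::deque<int>::iterator it = std::find(from.begin(), from.end(), id);
    if (it == from.end()) return false;
    from.erase(it);
    to.push_back(id);
    return true;
  }
};
```

A delivery or drop notification for a sample that is not on the sending list returns false here; the real container logs a warning or handles orphaned and resend samples in that situation.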
Definition at line 118 of file WriteDataContainer.h.
OpenDDS::DCPS::WriteDataContainer::WriteDataContainer(
    DataWriterImpl* writer,
    CORBA::Long max_samples_per_instance,
    CORBA::Long history_depth,
    CORBA::Long max_durable_per_instance,
    DDS::Duration_t max_blocking_time,
    size_t n_chunks,
    DDS::DomainId_t domain_id,
    char const* topic_name,
    char const* type_name,
    DataDurabilityCache* durability_cache,
    DDS::DurabilityServiceQosPolicy const& durability_service,
    CORBA::Long max_instances,
    CORBA::Long max_total_samples)
There is no default constructor; the container must be initialized with the arguments below.
writer | The writer which owns this container.
max_samples_per_instance | Maximum number of samples kept within each instance.
max_durable_per_instance | Maximum number of durable samples sent for each instance.
max_blocking_time | The timeout for write.
n_chunks | The number of chunks that the DataSampleElementAllocator needs to allocate.
domain_id | Domain ID.
topic_name | Topic name.
type_name | Type name.
durability_cache | The data durability cache for unsent data.
durability_service | DURABILITY_SERVICE QoS specific to the DataWriter.
max_instances | Maximum number of instances, 0 for unlimited.
max_total_samples | Maximum total number of samples, 0 for unlimited.
Definition at line 73 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, n_chunks_, and sample_list_element_allocator_.
  : transaction_id_(0),
    publication_id_(GUID_UNKNOWN),
    writer_(writer),
    max_samples_per_instance_(max_samples_per_instance),
    history_depth_(history_depth),
    max_durable_per_instance_(max_durable_per_instance),
    max_num_instances_(max_instances),
    max_num_samples_(max_total_samples),
    max_blocking_time_(max_blocking_time),
    waiting_on_release_(false),
    condition_(lock_),
    empty_condition_(lock_),
    wfa_condition_(this->wfa_lock_),
    n_chunks_(n_chunks),
    sample_list_element_allocator_(2 * n_chunks_),
    shutdown_(false),
    domain_id_(domain_id),
    topic_name_(topic_name),
    type_name_(type_name)
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
  , durability_cache_(durability_cache)
  , durability_service_(durability_service)
#endif
{

  if (DCPS_debug_level >= 2) {
    ACE_DEBUG((LM_DEBUG,
               "(%P|%t) WriteDataContainer "
               "sample_list_element_allocator %x with %d chunks\n",
               &sample_list_element_allocator_, n_chunks_));
  }
}
OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer()
Definition at line 122 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue_head(), OpenDDS::DCPS::TransportRegistry::instance(), LM_DEBUG, LM_ERROR, LM_WARNING, orphaned_to_transport_, release_buffer(), OpenDDS::DCPS::TransportRegistry::released(), sending_data_, sent_data_, shutdown_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
{
  if (this->unsent_data_.size() > 0) {
    ACE_DEBUG((LM_WARNING,
               ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
               ACE_TEXT("destroyed with %d samples unsent.\n"),
               this->unsent_data_.size()));
  }

  if (this->sending_data_.size() > 0) {
    if (TransportRegistry::instance()->released()) {
      for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
        release_buffer(e);
      }
    }
    if (sending_data_.size() && DCPS_debug_level) {
      ACE_DEBUG((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
                 ACE_TEXT("destroyed with %d samples sending.\n"),
                 this->sending_data_.size()));
    }
  }

  if (this->sent_data_.size() > 0) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
               ACE_TEXT("destroyed with %d samples sent.\n"),
               this->sent_data_.size()));
  }

  if (this->orphaned_to_transport_.size() > 0) {
    if (DCPS_debug_level > 0) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
                 ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
                 this->orphaned_to_transport_.size()));
    }
  }

  if (!shutdown_) {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("(%P|%t) ERROR: ")
               ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
               ACE_TEXT("The container has not been cleaned.\n")));
  }
}
OpenDDS::DCPS::WriteDataContainer::WriteDataContainer(WriteDataContainer const&) [private]
void OpenDDS::DCPS::WriteDataContainer::copy_and_prepend(
    SendStateDataSampleList& list,
    const SendStateDataSampleList& appended,
    const RepoId& reader_id,
    const DDS::LifespanQosPolicy& lifespan,
    const OPENDDS_STRING& filterClassName,
    const FilterEvaluator* eval,
    const DDS::StringSeq& params,
    ssize_t& max_resend_samples) [private]
Definition at line 1265 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::enqueue_head(), OpenDDS::DCPS::DataWriterImpl::filter_out(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::SendStateDataSampleList::rbegin(), OpenDDS::DCPS::SendStateDataSampleList::rend(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.
Referenced by reenqueue_all().
{
  for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
       cur != appended.rend() && max_resend_samples; ++cur) {

    if (resend_data_expired(*cur, lifespan))
      continue;

#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
      continue;
#endif

    PublicationInstance_rch inst = cur->get_handle();

    if (!inst) {
      // *cur is a control message, just skip it
      continue;
    }

    if (inst->durable_samples_remaining_ == 0)
      continue;
    --inst->durable_samples_remaining_;

    DataSampleElement* element = 0;
    ACE_NEW_MALLOC(element,
                   static_cast<DataSampleElement*>(
                     sample_list_element_allocator_.malloc(
                       sizeof(DataSampleElement))),
                   DataSampleElement(*cur));

    element->set_num_subs(1);
    element->set_sub_id(0, reader_id);

    list.enqueue_head(element);
    --max_resend_samples;
  }
}
void OpenDDS::DCPS::WriteDataContainer::data_delivered(const DataSampleElement* sample)
Acknowledge the delivery of data. The sample, which resides in this container, is moved from the sending_data_ list to the internal sent_data_ list. When max_durable_per_instance_ is zero, the sample is released instead of being kept on sent_data_. Any threads waiting for available space are woken.
Definition at line 531 of file WriteDataContainer.cpp.
References ACE_TEXT(), acked_sequences_, ACE_Condition< ACE_Thread_Mutex >::broadcast(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DataWriterImpl::controlTracker, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_handle(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, LM_WARNING, lock_, max_durable_per_instance_, OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), topic_name_, unsent_data_, wakeup_blocking_writers(), wfa_condition_, wfa_lock_, and writer_.
Referenced by data_dropped().
{
  DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);

  if (DCPS_debug_level >= 2) {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
                          ACE_TEXT(" %@\n"), sample));
  }

  ACE_GUARD(ACE_Recursive_Thread_Mutex,
            guard,
            this->lock_);

  // Delivered samples _must_ be on sending_data_ list

  // If it is not found in one of the lists, an invariant
  // exception is declared.

  // The element now needs to be removed from the sending_data_
  // list, and appended to the end of the sent_data_ list here

  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);

  // If sample is on a SendStateDataSampleList it should be on the
  // sending_data_ list signifying it was given to the transport to
  // deliver and now the transport is signaling it has been delivered
  if (!sending_data_.dequeue(sample)) {
    //
    // Should be on sending_data_. If it is in sent_data_
    // or unsent_data there was a problem.
    //
    SendStateDataSampleList* send_lists[] = {
      &sent_data_,
      &unsent_data_,
      &orphaned_to_transport_};
    const SendStateDataSampleList* containing_list =
      SendStateDataSampleList::send_list_containing_element(stale, send_lists);

    if (containing_list == &this->sent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_delivered, ")
                 ACE_TEXT("The delivered sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN sent_data_.\n")));
    } else if (containing_list == &this->unsent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_delivered, ")
                 ACE_TEXT("The delivered sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN unsent_data_ list.\n")));
    } else {

      //No-op: elements may be removed from all WriteDataContainer lists during shutdown
      //and inform transport of their release. Transport will call data-delivered on the
      //elements as it processes the removal but they will already be gone from the send lists.
      if (stale->get_header().message_id_ != SAMPLE_DATA) {
        //this message was a control message so release it
        if (DCPS_debug_level > 9) {
          GuidConverter converter(publication_id_);
          ACE_DEBUG((LM_DEBUG,
                     ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                     ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
                     this->domain_id_,
                     this->topic_name_,
                     OPENDDS_STRING(converter).c_str()));
        }
        writer_->controlTracker.message_delivered();
      }

      if (containing_list == &this->orphaned_to_transport_) {
        orphaned_to_transport_.dequeue(sample);
        release_buffer(stale);

      } else if (!containing_list) {
        // samples that were retrieved from get_resend_data()
        SendStateDataSampleList::remove(stale);
        release_buffer(stale);
      }

      if (!pending_data())
        empty_condition_.broadcast();
    }

    return;
  }
  ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
  SequenceNumber acked_seq = stale->get_header().sequence_;
  SequenceNumber prev_max = acked_sequences_.cumulative_ack();

  if (stale->get_header().message_id_ != SAMPLE_DATA) {
    //this message was a control message so release it
    if (DCPS_debug_level > 9) {
      GuidConverter converter(publication_id_);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 OPENDDS_STRING(converter).c_str()));
    }
    release_buffer(stale);
    stale = 0;
    writer_->controlTracker.message_delivered();
  } else {

    if (max_durable_per_instance_) {
      DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
      sent_data_.enqueue_tail(sample);

    } else {
      if (InstanceDataSampleList::on_some_list(sample)) {
        PublicationInstance_rch inst = sample->get_handle();
        inst->samples_.dequeue(sample);
      }
      release_buffer(stale);
      stale = 0;
    }

    if (DCPS_debug_level > 9) {
      GuidConverter converter(publication_id_);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                 ACE_TEXT("domain %d topic %C publication %C %s.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 OPENDDS_STRING(converter).c_str(),
                 max_durable_per_instance_
                 ? ACE_TEXT("stored for durability")
                 : ACE_TEXT("released")));
    }

    this->wakeup_blocking_writers (stale);
  }
  if (DCPS_debug_level > 9) {
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                         ACE_TEXT("Inserting acked_sequence: %q\n"),
                         acked_seq.getValue()));
  }

  acked_sequences_.insert(acked_seq);

  if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
      prev_max < acked_sequences_.cumulative_ack()) {

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
                 ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
    }

    wfa_condition_.broadcast();
  }

  // Signal if there is no pending data.
  if (!pending_data())
    empty_condition_.broadcast();
}
void OpenDDS::DCPS::WriteDataContainer::data_dropped(
    const DataSampleElement* element,
    bool dropped_by_transport)
The transport calls this method to notify the container that a sample was dropped, typically because the publication code told the transport to drop it by calling TransportClient::remove_sample(). If the sample was on the "sending" list, it is moved to the "unsent" list, and any threads waiting for available space are woken. When dropped_by_transport is true, the drop was initiated by the transport itself because its send strategy is in MODE_TERMINATED; the sample is then handled in the same way as in data_delivered(). When dropped_by_transport is false, the drop was initiated by remove_sample(), and data_dropped() is called as a result of that remove_sample() call.
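The role of the dropped_by_transport flag can be sketched as follows. This is a simplified model rather than the OpenDDS code: samples are integer IDs, Lists and DropResult are hypothetical types, and the delivered path assumes the sample is kept for durability (that is, placed on the sent list).

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Hypothetical outcome of a drop notification.
enum class DropResult { HandledAsDelivered, RequeuedForResend, NotSending };

// Hypothetical model of the send-state lists, holding sample IDs.
struct Lists {
  std::vector<int> unsent, sending, sent;
};

// Remove `id` from `from`; returns false if it is not there.
static bool take(std::vector<int>& from, int id) {
  std::vector<int>::iterator it = std::find(from.begin(), from.end(), id);
  if (it == from.end()) return false;
  from.erase(it);
  return true;
}

DropResult data_dropped(Lists& l, int id, bool dropped_by_transport) {
  if (!take(l.sending, id)) {
    return DropResult::NotSending;  // the real code logs or cleans up here
  }
  if (dropped_by_transport) {
    // Transport-initiated drop: same bookkeeping as a delivery.
    l.sent.push_back(id);
    return DropResult::HandledAsDelivered;
  }
  // Drop requested through remove_sample(): queue the sample for resend.
  l.unsent.push_back(id);
  return DropResult::RequeuedForResend;
}
```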
Definition at line 690 of file WriteDataContainer.cpp.
References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, wakeup_blocking_writers(), and writer_.
{
  DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);

  if (DCPS_debug_level >= 2) {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
                          ACE_TEXT(" sample %X dropped_by_transport %d\n"),
                          sample, dropped_by_transport));
  }

  // If the transport initiates the data dropping, we need do same thing
  // as data_delivered. e.g. remove the sample from the internal list
  // and the instance list. We do not need acquire the lock here since
  // the data_delivered acquires the lock.
  if (dropped_by_transport) {
    this->data_delivered(sample);
    return;
  }

  //The data_dropped could be called from the thread initiating sample remove
  //which already hold the lock. In this case, it's not necessary to acquire
  //lock here. It also could be called from the transport thread in a delayed
  //notification, it's necessary to acquire lock here to protect the internal
  //structures in this class.

  ACE_GUARD (ACE_Recursive_Thread_Mutex,
             guard,
             this->lock_);

  // The dropped sample should be in the sending_data_ list.
  // Otherwise an exception will be raised.
  //
  // We are now been notified by transport, so we can
  // keep the sample from the sending_data_ list still in
  // sample list since we will send it.

  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);

  // If sample is on a SendStateDataSampleList it should be on the
  // sending_data_ list signifying it was given to the transport to
  // deliver and now the transport is signaling it has been dropped

  if (sending_data_.dequeue(sample)) {
    // else: The data_dropped is called as a result of remove_sample()
    // called from reenqueue_all() which supports the TRANSIENT_LOCAL
    // qos. The samples that are sending by transport are dropped from
    // transport and will be moved to the unsent list for resend.
    unsent_data_.enqueue_tail(sample);

  } else {
    //
    // If it is in sent_data_ or unsent_data there was a problem.
    //
    SendStateDataSampleList* send_lists[] = {
      &sent_data_,
      &unsent_data_,
      &orphaned_to_transport_};
    const SendStateDataSampleList* containing_list =
      SendStateDataSampleList::send_list_containing_element(stale, send_lists);

    if (containing_list == &this->sent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_dropped, ")
                 ACE_TEXT("The dropped sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN sent_data_.\n")));
    } else if (containing_list == &this->unsent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_dropped, ")
                 ACE_TEXT("The dropped sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN unsent_data_ list.\n")));
    } else {

      //No-op: elements may be removed from all WriteDataContainer lists during shutdown
      //and inform transport of their release. Transport will call data-dropped on the
      //elements as it processes the removal but they will already be gone from the send lists.
      if (stale->get_header().message_id_ != SAMPLE_DATA) {
        //this message was a control message so release it
        if (DCPS_debug_level > 9) {
          GuidConverter converter(publication_id_);
          ACE_DEBUG((LM_DEBUG,
                     ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
                     ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
                     this->domain_id_,
                     this->topic_name_,
                     OPENDDS_STRING(converter).c_str()));
        }
        writer_->controlTracker.message_dropped();
      }

      if (containing_list == &this->orphaned_to_transport_) {
        orphaned_to_transport_.dequeue(sample);
        release_buffer(stale);
        if (!pending_data())
          empty_condition_.broadcast();

      } else if (!containing_list) {
        // samples that were retrieved from get_resend_data()
        SendStateDataSampleList::remove(stale);
        release_buffer(stale);
      }
    }

    return;
  }

  this->wakeup_blocking_writers (stale);

  if (!pending_data())
    empty_condition_.broadcast();
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose(
    DDS::InstanceHandle_t handle,
    Message_Block_Ptr& registered_sample,
    bool dup_registered_sample = true)
Delete the samples for the provided instance. If dup_registered_sample is true, a shallow copy of the registered sample data is given to the DataWriter for use in the control message.
This method returns DDS::RETCODE_PRECONDITION_NOT_MET if the instance is not registered.
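The control flow of dispose (look up the instance, fail if it is not registered, otherwise drain its sample list) can be sketched as below. The Instance and ReturnCode types and the integer handles are hypothetical simplifications; locking, the registered-sample copy, and the watchdog timer are omitted.

```cpp
#include <cassert>
#include <deque>
#include <map>

// Hypothetical return codes mirroring the two outcomes discussed above.
enum class ReturnCode { Ok, PreconditionNotMet };

// Hypothetical per-instance state: just the queued sample IDs.
struct Instance {
  std::deque<int> samples;
};

ReturnCode dispose(std::map<int, Instance>& instances, int handle) {
  std::map<int, Instance>::iterator it = instances.find(handle);
  if (it == instances.end()) {
    return ReturnCode::PreconditionNotMet;  // instance was never registered
  }
  // Remove samples oldest-first until the instance list is empty.
  while (!it->second.samples.empty()) {
    it->second.samples.pop_front();
  }
  return ReturnCode::Ok;
}
```

Note that in this sketch the instance itself stays in the map after dispose; only its samples are removed.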
Definition at line 367 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, lock_, remove_oldest_sample(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::InstanceDataSampleList::size(), OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by unregister_all().
{
  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                   guard,
                   this->lock_,
                   DDS::RETCODE_ERROR);

  PublicationInstance_rch instance;

  int const find_attempt = find(instances_, instance_handle, instance);

  if (0 != find_attempt) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::dispose, ")
                      ACE_TEXT("The instance(handle=%X) ")
                      ACE_TEXT("is not registered yet.\n"),
                      instance_handle),
                     DDS::RETCODE_PRECONDITION_NOT_MET);
  }

  if (dup_registered_sample) {
    // The registered_sample is shallow copied.
    registered_sample.reset(instance->registered_sample_->duplicate());
  }

  // Note: The DDS specification is unclear as to if samples in the process
  // of being sent should be removed or not.
  // The advantage of calling remove_sample() on them is that the
  // cached allocator memory for them is freed. The disadvantage
  // is that the slow reader may see multiple disposes without
  // any write sample between them and hence not temporarily move into the
  // Alive state.
  // We have chosen to NOT remove the sending samples.

  InstanceDataSampleList& instance_list = instance->samples_;

  while (instance_list.size() > 0) {
    bool released = false;
    DDS::ReturnCode_t ret
      = remove_oldest_sample(instance_list, released);

    if (ret != DDS::RETCODE_OK) {
      return ret;
    }
  }

  if (this->writer_->watchdog_.in())
    this->writer_->watchdog_->cancel_timer(instance);
  return DDS::RETCODE_OK;
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue(
    DataSampleElement* sample,
    DDS::InstanceHandle_t instance)
Enqueue the data sample in its instance thread. This method assumes that space is available for the sample in the instance list.
Definition at line 184 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), get_handle_instance(), ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::in(), DDS::RETCODE_OK, unsent_data_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
{
  // Get the PublicationInstance pointer from InstanceHandle_t.
  PublicationInstance_rch instance =
    get_handle_instance(instance_handle);
  // Extract the instance queue.
  InstanceDataSampleList& instance_list = instance->samples_;

  if (this->writer_->watchdog_.in()) {
    instance->last_sample_tv_ = instance->cur_sample_tv_;
    instance->cur_sample_tv_ = ACE_OS::gettimeofday();
    this->writer_->watchdog_->execute(*this->writer_, instance, false);
  }

  //
  // Enqueue to the next_send_sample_ thread of unsent_data_
  // will link samples with the next_sample/previous_sample and
  // also next_send_sample_.
  // This would save time when we actually send the data.

  unsent_data_.enqueue_tail(sample);

  //
  // Add this sample to the INSTANCE scope list.
  instance_list.enqueue_tail(sample);

  return DDS::RETCODE_OK;
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
Definition at line 170 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_OK, and unsent_data_.
{
  // Enqueue to the next_send_sample_ thread of unsent_data_
  // will link samples with the next_sample/previous_sample and
  // also next_send_sample_.
  // This would save time when we actually send the data.

  unsent_data_.enqueue_tail(control_sample);

  return DDS::RETCODE_OK;
}
PublicationInstance_rch OpenDDS::DCPS::WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
Definition at line 1250 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::find(), instances_, and LM_DEBUG.
Referenced by enqueue(), and obtain_buffer().
{
  PublicationInstance_rch instance;

  if (0 != find(instances_, handle, instance)) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) ")
               ACE_TEXT("WriteDataContainer::get_handle_instance, ")
               ACE_TEXT("lookup for %d failed\n"), handle));
  }

  return instance;
}
void OpenDDS::DCPS::WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
Definition at line 1417 of file WriteDataContainer.cpp.
References instances_, and lock_.
{
  ACE_GUARD(ACE_Recursive_Thread_Mutex,
            guard,
            this->lock_);
  PublicationInstanceMapType::iterator it = instances_.begin();

  while (it != instances_.end()) {
    instance_handles.push_back(it->second->instance_handle_);
    ++it;
  }
}
SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data()
Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the returned list is not placed on any SendStateDataSampleList.
Definition at line 502 of file WriteDataContainer.cpp.
References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().
{
  DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);

  //
  // Copy the samples currently held in resend_data_ into the
  // list that is returned to the caller.
  //
  SendStateDataSampleList list = this->resend_data_;

  //
  // Clear the resend data list.
  //
  this->resend_data_.reset();
  //
  // Return the moved list.
  //
  return list;
}
ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
Obtain a list of data that has not yet been sent. As part of this call, the data on the returned list is moved from the internal unsent_data_ list to the internal sending_data_ list. The entire list is also linked via the DataSampleElement.next_send_sample_ link. The return value is the transaction ID assigned to the samples in this batch.
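The batching behaviour (increment a send counter, stamp every unsent sample with it, then move the whole batch to the sending list) can be sketched as below. Sample and Batcher are hypothetical types used only for illustration, and std::vector stands in for the intrusive lists.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sample record: an ID plus the transaction it was sent in.
struct Sample {
  int id;
  std::uint64_t transaction_id;
};

// Hypothetical model of the unsent/sending hand-off.
struct Batcher {
  std::vector<Sample> unsent, sending;
  std::uint64_t transaction_id = 0;

  // Hands every unsent sample to the caller and returns the batch's ID.
  std::uint64_t get_unsent_data(std::vector<Sample>& out) {
    ++transaction_id;  // one new ID per send operation
    for (Sample& s : unsent) {
      s.transaction_id = transaction_id;
    }
    out = unsent;
    sending.insert(sending.end(), unsent.begin(), unsent.end());
    unsent.clear();
    return transaction_id;
  }
};
```

Stamping the samples lets later code tell which send operation a given sample belonged to.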
Definition at line 463 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, transaction_id_, and unsent_data_.
{
  DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
  //
  // The samples in unsent_data are added to the local datawriter
  // list and enqueued to the sending_data_ signifying they have
  // been passed to the transport to send in a transaction
  //
  list = this->unsent_data_;

  // Increment send counter for this send operation
  ++transaction_id_;

  // Mark all samples with current send counter
  SendStateDataSampleList::iterator iter = list.begin();
  while (iter != list.end()) {
    iter->set_transaction_id(this->transaction_id_);
    ++iter;
  }

  //
  // The unsent_data_ already linked with the
  // next_send_sample during enqueue.
  // Append the unsent_data_ to current sending_data_
  // list.
  sending_data_.enqueue_tail(list);

  //
  // Clear the unsent data list.
  //
  this->unsent_data_.reset();

  //
  // Return the transaction id assigned to the moved list.
  //
  return transaction_id_;
}
void OpenDDS::DCPS::WriteDataContainer::log_send_state_lists(OPENDDS_STRING description) [private]
Definition at line 1497 of file WriteDataContainer.cpp.
References instances_, LM_DEBUG, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by wait_pending().
{
  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
             description.c_str(),
             unsent_data_.size(),
             sending_data_.size(),
             sent_data_.size(),
             orphaned_to_transport_.size(),
             num_all_samples(),
             instances_.size()));
}
size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples()
Return the number of samples for all instances.
Definition at line 443 of file WriteDataContainer.cpp.
References instances_, lock_, and size.
Referenced by log_send_state_lists(), and obtain_buffer().
00444 { 00445 size_t size = 0; 00446 00447 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00448 guard, 00449 this->lock_, 00450 0); 00451 00452 for (PublicationInstanceMapType::iterator iter = instances_.begin(); 00453 iter != instances_.end(); 00454 ++iter) 00455 { 00456 size += iter->second->samples_.size(); 00457 } 00458 00459 return size; 00460 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples(DDS::InstanceHandle_t handle, size_t& size)
Return the number of samples for the given instance.
Definition at line 422 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::find(), instances_, lock_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
{
  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                   guard,
                   this->lock_,
                   DDS::RETCODE_ERROR);
  PublicationInstance_rch instance;

  int const find_attempt = find(instances_, handle, instance);

  if (0 != find_attempt) {
    return DDS::RETCODE_ERROR;

  } else {
    size = instance->samples_.size();
    return DDS::RETCODE_OK;
  }
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer(DataSampleElement*& element, DDS::InstanceHandle_t handle)
Allocate a DataSampleElement object and check the space availability for the newly allocated element according to the QoS settings. In the blocking (reliable) write case, if resource limits or history QoS limits are reached, the call blocks for up to the max blocking time, waiting for a previous sample to be delivered or dropped by the transport. In the non-blocking (best-effort) write case, if resource limits or history QoS limits are reached, it attempts to remove the oldest samples (forcing the transport to drop samples if necessary) to make space. If several threads are waiting, the first one in the waiting list can enqueue; the others continue waiting.
Definition at line 1012 of file WriteDataContainer.cpp.
References ACE_TEXT(), condition_, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_absolute_time_value(), OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), ACE_OS::gettimeofday(), history_depth_, instances_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, max_samples_per_instance_, num_all_samples(), publication_id_, OpenDDS::DCPS::DataWriterImpl::qos_, release_buffer(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, remove_excess_durable(), remove_oldest_sample(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, shutdown_, ACE_Condition< ACE_Recursive_Thread_Mutex >::wait(), waiting_on_release_, and writer_.
{
  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);

  remove_excess_durable();

  PublicationInstance_rch instance = get_handle_instance(handle);

  if (!instance) {
    return DDS::RETCODE_BAD_PARAMETER;
  }

  ACE_NEW_MALLOC_RETURN(
    element,
    static_cast<DataSampleElement*>(
      sample_list_element_allocator_.malloc(
        sizeof(DataSampleElement))),
    DataSampleElement(publication_id_,
                      this->writer_,
                      instance),
    DDS::RETCODE_ERROR);

  // Extract the current instance queue.
  InstanceDataSampleList& instance_list = instance->samples_;
  DDS::ReturnCode_t ret = DDS::RETCODE_OK;

  bool need_to_set_abs_timeout = true;
  ACE_Time_Value abs_timeout;

  //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
  //max_instances and max_instances * depth
  while ((instance_list.size() >= max_samples_per_instance_) ||
         ((this->max_num_samples_ > 0) &&
          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {

    if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
      if (instance_list.size() >= history_depth_) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                     ACE_TEXT(" instance %d attempting to remove")
                     ACE_TEXT(" its oldest sample (reliable)\n"),
                     handle));
        }
        bool oldest_released = false;
        ret = remove_oldest_sample(instance_list, oldest_released);
        if (oldest_released) {
          break;
        }
      }
      // Reliable writers can wait
      if (need_to_set_abs_timeout) {
        abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
        need_to_set_abs_timeout = false;
      }
      if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
                      handle));
        }

        waiting_on_release_ = true;
        int const wait_result = condition_.wait(&abs_timeout);

        if (wait_result == 0) {
          remove_excess_durable();

        } else {
          if (errno == ETIME) {
            if (DCPS_debug_level >= 2) {
              ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                          ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
                          handle));
            }
            ret = DDS::RETCODE_TIMEOUT;

          } else {
            ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
                        ACE_TEXT("%p\n")));
            ret = DDS::RETCODE_ERROR;
          }
        }

      } else {
        //either shutdown has been signaled or max_blocking_time
        //has surpassed so treat as timeout
        ret = DDS::RETCODE_TIMEOUT;
      }

    } else {
      //BEST EFFORT
      bool oldest_released = false;

      //try to remove stale samples from this instance
      // The remove_oldest_sample() method removes the oldest sample
      // from instance list and removes it from the internal lists.
      if (instance_list.size() > 0) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" its oldest sample\n"),
                      handle));
        }
        ret = remove_oldest_sample(instance_list, oldest_released);
      }
      //else try to remove stale samples from other instances which are full
      if (ret == DDS::RETCODE_OK && !oldest_released) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" oldest sample from any full instances\n"),
                      handle));
        }
        PublicationInstanceMapType::iterator it = instances_.begin();

        while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
          if (it->second->samples_.size() >= max_samples_per_instance_) {
            ret = remove_oldest_sample(it->second->samples_, oldest_released);
          }
          ++it;
        }
      }
      //else try to remove stale samples from other non-full instances
      if (ret == DDS::RETCODE_OK && !oldest_released) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" oldest sample from any instance with samples currently\n"),
                      handle));
        }
        PublicationInstanceMapType::iterator it = instances_.begin();

        while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
          if (it->second->samples_.size() > 0) {
            ret = remove_oldest_sample(it->second->samples_, oldest_released);
          }
          ++it;
        }
      }
      if (!oldest_released) {
        //This means that no instances have samples to remove and yet
        //still hitting resource limits.
        ACE_ERROR((LM_ERROR,
                   ACE_TEXT("(%P|%t) ERROR: ")
                   ACE_TEXT("WriteDataContainer::obtain_buffer, ")
                   ACE_TEXT("hitting resource limits with no samples to remove\n")));
        ret = DDS::RETCODE_ERROR;
      }
    } //END BEST EFFORT

    if (ret != DDS::RETCODE_OK) {
      if (DCPS_debug_level >= 2) {
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                    ACE_TEXT(" instance %d could not obtain buffer for sample")
                    ACE_TEXT(" releasing allotted sample and returning\n"),
                    handle));
      }
      this->release_buffer(element);
      return ret;
    }
  } //END WHILE

  data_holder_.enqueue_tail(element);

  return ret;
}
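The best-effort branch of the listing above tries three eviction candidates in a fixed order: the writing instance, then any instance at its per-instance limit, then any instance that still holds samples. A minimal sketch of that ordering follows, assuming a `std::map` of `std::deque<int>` in place of the instance map; `Handle`, `Instances`, and `evict_one` are hypothetical names, and the transport notification performed by remove_oldest_sample is left out.

```cpp
#include <cstddef>
#include <deque>
#include <map>

using Handle = int;                                  // hypothetical handle type
using Instances = std::map<Handle, std::deque<int>>;  // oldest sample at the front

// Free one slot using the order of preference described above.
// Returns true if a sample was evicted.
bool evict_one(Instances& instances, Handle writer, std::size_t max_per_instance) {
  // 1. Oldest sample of the instance being written.
  auto own = instances.find(writer);
  if (own != instances.end() && !own->second.empty()) {
    own->second.pop_front();
    return true;
  }
  // 2. Oldest sample of any instance that is at its limit.
  for (auto& entry : instances) {
    if (entry.second.size() >= max_per_instance) {
      entry.second.pop_front();
      return true;
    }
  }
  // 3. Oldest sample of any instance that has samples at all.
  for (auto& entry : instances) {
    if (!entry.second.empty()) {
      entry.second.pop_front();
      return true;
    }
  }
  return false;  // nothing left to remove, the caller reports an error
}
```

A `false` result corresponds to the "hitting resource limits with no samples to remove" error path in the listing.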
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
Definition at line 994 of file WriteDataContainer.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, and writer_.
{
  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);

  ACE_NEW_MALLOC_RETURN(
    element,
    static_cast<DataSampleElement*>(
      sample_list_element_allocator_.malloc(
        sizeof(DataSampleElement))),
    DataSampleElement(publication_id_,
                      this->writer_,
                      PublicationInstance_rch()),
    DDS::RETCODE_ERROR);

  return DDS::RETCODE_OK;
}
typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR(DDS::InstanceHandle_t)
Returns a vector of handles for the instances registered for this data writer.
WriteDataContainer& OpenDDS::DCPS::WriteDataContainer::operator=(WriteDataContainer const&) [private]
bool OpenDDS::DCPS::WriteDataContainer::pending_data()
Returns true if pending data exists, that is, if any samples remain in the unsent, sending, or orphaned-to-transport lists.
Definition at line 523 of file WriteDataContainer.cpp.
References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
{
  return this->sending_data_.size() != 0
         || this->orphaned_to_transport_.size() != 0
         || this->unsent_data_.size() != 0;
}
bool OpenDDS::DCPS::WriteDataContainer::persist_data()
Copy sent data to the data durability cache.
Definition at line 1315 of file WriteDataContainer.cpp.
References ACE_TEXT(), domain_id_, durability_cache_, durability_service_, OpenDDS::DCPS::DataDurabilityCache::insert(), LM_ERROR, sent_data_, topic_name_, and type_name_.
{
  bool result = true;

  // ------------------------------------------------------------
  // Transfer sent data to data DURABILITY cache.
  // ------------------------------------------------------------
  if (this->durability_cache_) {
    // A data durability cache is available for TRANSIENT or
    // PERSISTENT data durability. Cache the data samples.

    //
    // We only cache data that is not still in use outside of
    // this instance of WriteDataContainer
    // (only cache samples in sent_data_ meaning transport has delivered).
    bool const inserted =
      this->durability_cache_->insert(this->domain_id_,
                                      this->topic_name_,
                                      this->type_name_,
                                      this->sent_data_,
                                      this->durability_service_
                                     );

    result = inserted;

    if (!inserted)
      ACE_ERROR((LM_ERROR,
                 ACE_TEXT("(%P|%t) ERROR: ")
                 ACE_TEXT("WriteDataContainer::persist_data, ")
                 ACE_TEXT("failed to make data durable for ")
                 ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
                 this->domain_id_,
                 this->topic_name_,
                 this->type_name_));
  }

  return result;
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all(const RepoId& reader_id, const DDS::LifespanQosPolicy& lifespan, const OPENDDS_STRING& filterClassName, const FilterEvaluator* eval, const DDS::StringSeq& params)
Create a resend list with the copies of all current "sending" and "sent" samples. The samples will be sent to the subscriber specified.
Definition at line 216 of file WriteDataContainer.cpp.
References ACE_TEXT(), copy_and_prepend(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, instances_, LM_DEBUG, lock_, max_durable_per_instance_, OPENDDS_STRING, publication_id_, resend_data_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and topic_name_.
{
  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                   guard,
                   lock_,
                   DDS::RETCODE_ERROR);

  ssize_t total_size = 0;
  for (PublicationInstanceMapType::iterator it = instances_.begin();
       it != instances_.end(); ++it) {
    const ssize_t durable = std::min(it->second->samples_.size(),
                                     ssize_t(max_durable_per_instance_));
    total_size += durable;
    it->second->durable_samples_remaining_ = durable;
  }

  copy_and_prepend(resend_data_,
                   sending_data_,
                   reader_id,
                   lifespan,
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                   filterClassName, eval, expression_params,
#endif
                   total_size);

  copy_and_prepend(resend_data_,
                   sent_data_,
                   reader_id,
                   lifespan,
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                   filterClassName, eval, expression_params,
#endif
                   total_size);

  if (DCPS_debug_level > 9 && resend_data_.size()) {
    GuidConverter converter(publication_id_);
    GuidConverter reader(reader_id);
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
               ACE_TEXT("domain %d topic %C publication %C copying ")
               ACE_TEXT("sending/sent to resend to %C.\n"),
               domain_id_,
               topic_name_,
               OPENDDS_STRING(converter).c_str(),
               OPENDDS_STRING(reader).c_str()));
  }

  return DDS::RETCODE_OK;
}
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance(DDS::InstanceHandle_t& instance_handle, Message_Block_Ptr& registered_sample)
Dynamically allocate a PublicationInstance object and add it to the instances_ list.
Definition at line 275 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, max_num_instances_, OpenDDS::DCPS::move(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::RcHandle< T >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
{
  PublicationInstance_rch instance;

  if (instance_handle == DDS::HANDLE_NIL) {
    if (max_num_instances_ > 0
        && max_num_instances_ <= (CORBA::Long) instances_.size()) {
      return DDS::RETCODE_OUT_OF_RESOURCES;
    }

    // registered the instance for the first time.
    instance.reset(new PublicationInstance(move(registered_sample)), keep_count());

    instance_handle = this->writer_->get_next_handle();

    int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);

    if (0 != insert_attempt) {
      ACE_ERROR((LM_ERROR,
                 ACE_TEXT("(%P|%t) ERROR: ")
                 ACE_TEXT("WriteDataContainer::register_instance, ")
                 ACE_TEXT("failed to insert instance handle=%X\n"),
                 instance.in()));
      return DDS::RETCODE_ERROR;
    } // if (0 != insert_attempt)

    instance->instance_handle_ = instance_handle;

  } else {

    int const find_attempt = find(instances_, instance_handle, instance);

    if (0 != find_attempt) {
      ACE_ERROR((LM_ERROR,
                 ACE_TEXT("(%P|%t) ERROR: ")
                 ACE_TEXT("WriteDataContainer::register_instance, ")
                 ACE_TEXT("The provided instance handle=%X is not a valid")
                 ACE_TEXT("handle.\n"),
                 instance_handle));

      return DDS::RETCODE_ERROR;
    } // if (0 != find_attempt)

    instance->unregistered_ = false;
  }

  // The registered_sample is shallow copied.
  registered_sample.reset(instance->registered_sample_->duplicate());

  if (this->writer_->watchdog_.in()) {
    this->writer_->watchdog_->schedule_timer(instance);
  }

  return DDS::RETCODE_OK;
}
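The two registration paths in the listing above (a nil handle creates a new instance, any other handle must already be known) can be sketched with a plain map. Everything in this example is hypothetical: `Registry`, `Status`, `Instance`, and `kNilHandle` are illustrative names, the handle counter replaces the writer's get_next_handle(), and the registered sample and watchdog timer are omitted.

```cpp
#include <cstddef>
#include <map>

enum class Status { Ok, Error, OutOfResources };

const int kNilHandle = 0;  // plays the role of DDS::HANDLE_NIL in this sketch

struct Instance {
  bool unregistered = false;
};

class Registry {
public:
  explicit Registry(std::size_t max_instances) : max_instances_(max_instances) {}

  // A nil handle asks for a new instance and receives a fresh handle.
  // Any other handle must already exist and is marked registered again.
  Status register_instance(int& handle) {
    if (handle == kNilHandle) {
      if (max_instances_ > 0 && instances_.size() >= max_instances_) {
        return Status::OutOfResources;
      }
      handle = next_handle_++;
      instances_[handle] = Instance();
      return Status::Ok;
    }
    auto it = instances_.find(handle);
    if (it == instances_.end()) {
      return Status::Error;
    }
    it->second.unregistered = false;
    return Status::Ok;
  }

private:
  std::size_t max_instances_;  // 0 means no limit in this sketch
  int next_handle_ = 1;
  std::map<int, Instance> instances_;
};
```

Passing the handle by reference mirrors the listing, where the caller learns the newly assigned handle through the same parameter it used to request one.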
void OpenDDS::DCPS::WriteDataContainer::release_buffer(DataSampleElement* element)
Release the memory previously allocated. This method is the counterpart of obtain_buffer. If the memory was allocated by an allocator, it is released back to that allocator.
Definition at line 1181 of file WriteDataContainer.cpp.
References data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.
Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().
{
  if (element->get_header().message_id_ == SAMPLE_DATA)
    data_holder_.dequeue(element);
  // Release the memory to the allocator.
  ACE_DES_FREE(element,
               sample_list_element_allocator_.free,
               DataSampleElement);
}
void OpenDDS::DCPS::WriteDataContainer::remove_excess_durable() [private]
Remove the oldest "n" samples from each instance list that are in a state such that they could only be used for durability purposes (see reenqueue_all). "n" is determined by max_durable_per_instance_, so these samples are truly unneeded -- there are max_durable_per_instance_ newer samples available in the instance.
Definition at line 805 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue(), domain_id_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, instances_, LM_DEBUG, max_durable_per_instance_, OPENDDS_STRING, OpenDDS::DCPS::InstanceDataSampleList::prev(), publication_id_, release_buffer(), sent_data_, OpenDDS::DCPS::InstanceDataSampleList::tail(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and topic_name_.
Referenced by obtain_buffer().
{
  if (!max_durable_per_instance_)
    return;

  size_t n_released = 0;

  for (PublicationInstanceMapType::iterator iter = instances_.begin();
       iter != instances_.end();
       ++iter) {

    CORBA::Long durable_allowed = max_durable_per_instance_;
    InstanceDataSampleList& instance_list = iter->second->samples_;

    for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
      prev = InstanceDataSampleList::prev(it);

      if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {

        if (durable_allowed) {
          --durable_allowed;
        } else {
          instance_list.dequeue(it);
          sent_data_.dequeue(it);
          release_buffer(it);
          ++n_released;
        }
      }
    }
  }

  if (n_released && DCPS_debug_level > 9) {
    const GuidConverter converter(publication_id_);
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
               ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
               ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
               OPENDDS_STRING(converter).c_str(), n_released));
  }
}
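The per-instance trimming loop above walks from the newest sample to the oldest, keeps the newest historic samples up to the limit, and removes older historic ones. The following minimal sketch shows the same walk on a `std::list`; `Sample`, its `historic` field (standing in for HISTORIC_SAMPLE_FLAG), and `trim_durable` are hypothetical names, and the sketch handles a single instance only.

```cpp
#include <cstddef>
#include <list>

// Hypothetical sample record: `historic` plays the role of HISTORIC_SAMPLE_FLAG.
struct Sample {
  int seq;
  bool historic;
};

// Walk from newest (back) to oldest (front). Keep the newest `max_durable`
// historic samples, erase older historic ones, and leave non-historic
// samples untouched. Returns the number of samples removed.
std::size_t trim_durable(std::list<Sample>& samples, std::size_t max_durable) {
  if (max_durable == 0) {
    return 0;  // mirrors the early return when no durable limit is set
  }
  std::size_t allowed = max_durable;
  std::size_t removed = 0;
  auto it = samples.end();
  while (it != samples.begin()) {
    --it;
    if (!it->historic) {
      continue;
    }
    if (allowed > 0) {
      --allowed;
    } else {
      // erase() returns the element after the erased one, which has already
      // been visited, so the next --it moves on to the older neighbour.
      it = samples.erase(it);
      ++removed;
    }
  }
  return removed;
}
```

For example, with samples 1 to 5 where only sample 3 is not historic and a limit of 2, samples 4 and 5 are kept as durable history and samples 1 and 2 are removed.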
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample(InstanceDataSampleList& instance_list, bool& released) [private]
Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.
Definition at line 848 of file WriteDataContainer.cpp.
References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::TransportClient::remove_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, and writer_.
Referenced by dispose(), and obtain_buffer().
{
  DataSampleElement* stale = 0;

  //
  // Remove the oldest sample from the instance list.
  //
  if (instance_list.dequeue_head(stale) == false) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("dequeue_head_next_sample failed\n")),
                     DDS::RETCODE_ERROR);
  }

  //
  // Remove the stale data from the next_writer_sample_ list. The
  // sending_data_/next_send_sample_ list is not managed within the
  // container, it is only used external to the container and does
  // not need to be managed internally.
  //
  // The next_writer_sample_ link is being used in one of the sent_data_,
  // sending_data_, or unsent_data lists. Removal from the doubly
  // linked list needs to repair the list only when the stale sample
  // is either the head or tail of the list.
  //

  //
  // Locate the head of the list that the stale data is in.
  //
  SendStateDataSampleList* send_lists[] = {
    &sending_data_,
    &sent_data_,
    &unsent_data_,
    &orphaned_to_transport_};
  const SendStateDataSampleList* containing_list =
    SendStateDataSampleList::send_list_containing_element(stale, send_lists);

  //
  // Identify the list that the stale data is in.
  // The stale data should be in one of the sent_data_, sending_data_
  // or unsent_data_. It should not be in released_data_ list since
  // this function is the only place a sample is moved from
  // sending_data_ to released_data_ list.

  // Remove the element from the internal list.
  bool result = false;

  if (containing_list == &this->sending_data_) {
    if (DCPS_debug_level > 2) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                 ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
    }

    // This means transport is still using the sample that needs to
    // be released currently so notify transport that sample is being removed.

    if (this->writer_->remove_sample(stale)) {
      if (this->sent_data_.dequeue(stale)) {
        release_buffer(stale);
        result = true;
      }

    } else {
      if (this->sending_data_.dequeue(stale)) {
        this->orphaned_to_transport_.enqueue_tail(stale);
      } else if (this->sent_data_.dequeue(stale)) {
        release_buffer(stale);
        result = true;
      }
      result = true;
    }
    released = true;

  } else if (containing_list == &this->sent_data_) {
    // No one is using the data sample, so we can release it back to
    // its allocator.
    //
    result = this->sent_data_.dequeue(stale) != 0;
    release_buffer(stale);
    released = true;

    if (DCPS_debug_level > 9) {
      GuidConverter converter(publication_id_);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
                 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 OPENDDS_STRING(converter).c_str()));
    }

  } else if (containing_list == &this->unsent_data_) {
    //
    // No one is using the data sample, so we can release it back to
    // its allocator.
    //
    result = this->unsent_data_.dequeue(stale) != 0;
    release_buffer(stale);
    released = true;

    if (DCPS_debug_level > 9) {
      GuidConverter converter(publication_id_);
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
                 ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 OPENDDS_STRING(converter).c_str()));
    }
  } else {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("The oldest sample is not in any internal list.\n")),
                     DDS::RETCODE_ERROR);
  }

  // Signal if there is no pending data.
  {
    ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                     guard,
                     this->lock_,
                     DDS::RETCODE_ERROR);

    if (!pending_data())
      empty_condition_.broadcast();
  }

  if (result == false) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
                     DDS::RETCODE_ERROR);

  }

  return DDS::RETCODE_OK;
}
void OpenDDS::DCPS::WriteDataContainer::reschedule_deadline()
Reset time interval for each instance.
Definition at line 1355 of file WriteDataContainer.cpp.
References ACE_TEXT(), instances_, LM_ERROR, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
{
  for (PublicationInstanceMapType::iterator iter = instances_.begin();
       iter != instances_.end();
       ++iter) {
    if (iter->second->deadline_timer_id_ != -1) {
      if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
        ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
                   ACE_TEXT("reset_timer_interval")));
      }
    }
  }
}
bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged(const SequenceNumber sequence)
Definition at line 1466 of file WriteDataContainer.cpp.
References ACE_TEXT(), acked_sequences_, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().
Referenced by wait_ack_of_seq().
{
  if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
    //return true here so that wait_for_acknowledgements doesn't block
    return true;
  }

  SequenceNumber acked = acked_sequences_.cumulative_ack();
  if (DCPS_debug_level >= 10) {
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
               ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
  }
  if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
    //if acked_sequences_ is empty or its cumulative_ack is lower than
    //the requests sequence, return false
    return false;
  }
  return true;
}
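The check above depends on a cumulative acknowledgement: a sequence number counts as acknowledged only once every number up to it has been acknowledged without a gap. The sketch below illustrates that idea with a `std::set`. It is an assumption about the general technique, not a description of how DisjointSequence is implemented; `AckedSet` and `kUnknown` are hypothetical names.

```cpp
#include <cstdint>
#include <set>

// Hypothetical stand-in for a disjoint set of acknowledged sequence numbers.
class AckedSet {
public:
  static constexpr std::int64_t kUnknown = -1;  // "no sequence number" marker

  void insert(std::int64_t seq) { acked_.insert(seq); }

  // Highest number reachable from the lowest entry without a gap, or
  // kUnknown when nothing has been acknowledged yet.
  std::int64_t cumulative_ack() const {
    if (acked_.empty()) {
      return kUnknown;
    }
    auto it = acked_.begin();
    std::int64_t last = *it;
    for (++it; it != acked_.end() && *it == last + 1; ++it) {
      last = *it;
    }
    return last;
  }

  // Same decision structure as the listing: an unknown request never blocks,
  // an empty set or a lower cumulative ack means "not yet acknowledged".
  bool acknowledged(std::int64_t seq) const {
    if (seq == kUnknown) {
      return true;
    }
    const std::int64_t acked = cumulative_ack();
    return acked != kUnknown && acked >= seq;
  }

private:
  std::set<std::int64_t> acked_;
};
```

With acknowledgements for 1, 2, and 4, the cumulative ack is 2, so a wait on sequence 3 keeps blocking until 3 itself arrives.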
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister(DDS::InstanceHandle_t handle, Message_Block_Ptr& registered_sample, bool dup_registered_sample = true)
Remove the provided instance from the instances_ list. The registered sample data will be released upon the deletion of the PublicationInstance. A shallow copy of the sample data is given to the datawriter as part of the control message if dup_registered_sample is true.
This method returns an error (DDS::RETCODE_PRECONDITION_NOT_MET) if the instance is not registered.
Definition at line 334 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by unregister_all().
{
  PublicationInstance_rch instance;

  int const find_attempt = find(instances_, instance_handle, instance);

  if (0 != find_attempt) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::unregister, ")
                      ACE_TEXT("The instance(handle=%X) ")
                      ACE_TEXT("is not registered yet.\n"),
                      instance_handle),
                     DDS::RETCODE_PRECONDITION_NOT_MET);
  } // if (0 != find_attempt)

  instance->unregistered_ = true;

  if (dup_registered_sample) {
    // The registered_sample is shallow copied.
    registered_sample.reset(instance->registered_sample_->duplicate());
  }

  if (this->writer_->watchdog_.in())
    this->writer_->watchdog_->cancel_timer(instance);

  return DDS::RETCODE_OK;
}
void OpenDDS::DCPS::WriteDataContainer::unregister_all()
Unregister all instances managed by this data container.
Definition at line 1192 of file WriteDataContainer.cpp.
References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), condition_, DBG_ENTRY_LVL, dispose(), instances_, LM_ERROR, lock_, OpenDDS::DCPS::TransportClient::remove_all_msgs(), DDS::RETCODE_OK, shutdown_, OpenDDS::DCPS::unbind(), unregister(), waiting_on_release_, and writer_.
{
  DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
  shutdown_ = true;

  {
    //The internal list needs protection since this call may result from the
    //the delete_datawriter call which does not acquire the lock in advance.
    ACE_GUARD(ACE_Recursive_Thread_Mutex,
              guard,
              this->lock_);
    // Tell transport remove all control messages currently
    // transport is processing.
    (void) this->writer_->remove_all_msgs();

    // Broadcast to wake up all waiting threads.
    if (waiting_on_release_) {
      condition_.broadcast();
    }
  }
  DDS::ReturnCode_t ret;
  Message_Block_Ptr registered_sample;
  PublicationInstanceMapType::iterator it = instances_.begin();

  while (it != instances_.end()) {
    // Release the instance data.
    ret = dispose(it->first, registered_sample, false);

    if (ret != DDS::RETCODE_OK) {
      ACE_ERROR((LM_ERROR,
                 ACE_TEXT("(%P|%t) ERROR: ")
                 ACE_TEXT("WriteDataContainer::unregister_all, ")
                 ACE_TEXT("dispose instance %X failed\n"),
                 it->first));
    }
    // Mark the instance unregistered.
    ret = unregister(it->first, registered_sample, false);

    if (ret != DDS::RETCODE_OK) {
      ACE_ERROR((LM_ERROR,
                 ACE_TEXT("(%P|%t) ERROR: ")
                 ACE_TEXT("WriteDataContainer::unregister_all, ")
                 ACE_TEXT("unregister instance %X failed\n"),
                 it->first));
    }

    // Get the next iterator before erase the instance handle.
    PublicationInstanceMapType::iterator it_next = it;
    ++it_next;
    // Remove the instance from the instance list.
    unbind(instances_, it->first);
    it = it_next;
  }

  ACE_UNUSED_ARG(registered_sample);
}
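The loop above saves the next iterator before removing the current entry, because erasing a map element invalidates iterators to it. A minimal sketch of the same drain pattern with `std::map` follows; `drain` is a hypothetical helper, and recording the key is only a placeholder for the dispose and unregister calls.

```cpp
#include <map>
#include <string>
#include <vector>

// Visit every entry, run a per-entry cleanup step, then erase the entry.
// Since C++11, map::erase returns the iterator following the erased element,
// which replaces the manual "save next, remove, advance" sequence.
std::vector<int> drain(std::map<int, std::string>& instances) {
  std::vector<int> visited;
  for (auto it = instances.begin(); it != instances.end(); ) {
    visited.push_back(it->first);  // placeholder for dispose/unregister
    it = instances.erase(it);
  }
  return visited;
}
```

Either form is safe; the important point is that the iterator used to continue the loop is obtained before or as part of the erase, never from the erased element afterwards.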
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence)
Definition at line 1431 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::gettimeofday(), OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged(), ACE_Condition< ACE_Thread_Mutex >::wait(), wfa_condition_, and wfa_lock_.
{
  ACE_Time_Value deadline(abs_deadline);
  DDS::ReturnCode_t ret = DDS::RETCODE_OK;
  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);

  while (ACE_OS::gettimeofday() < deadline) {

    if (!sequence_acknowledged(sequence)) {
      // lock is released while waiting and acquired before returning
      // from wait.
      int const wait_result = wfa_condition_.wait(&deadline);

      if (wait_result != 0) {
        if (errno == ETIME) {
          if (DCPS_debug_level >= 2) {
            ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
                        ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
                        sequence.getValue()));
          }
          ret = DDS::RETCODE_TIMEOUT;
        } else {
          ret = DDS::RETCODE_ERROR;
        }
      }
    } else {
      ret = DDS::RETCODE_OK;
      break;
    }
  }

  return ret;
}
void OpenDDS::DCPS::WriteDataContainer::wait_pending()
Block until pending samples have either been delivered or dropped.
Definition at line 1370 of file WriteDataContainer.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, empty_condition_, ACE_OS::gettimeofday(), LM_DEBUG, LM_INFO, lock_, log_send_state_lists(), pending_data(), TheServiceParticipant, ACE_Condition< ACE_Recursive_Thread_Mutex >::wait(), and ACE_Time_Value::zero.
{
  ACE_Time_Value pending_timeout =
    TheServiceParticipant->pending_timeout();

  ACE_Time_Value* pTimeout = 0;

  if (pending_timeout != ACE_Time_Value::zero) {
    pTimeout = &pending_timeout;
    pending_timeout += ACE_OS::gettimeofday();
  }

  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
  const bool report = DCPS_debug_level > 0 && pending_data();
  if (report) {
    if (pending_timeout == ACE_Time_Value::zero) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending no timeout\n")));
    } else {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
                 ACE_TEXT("at %#T\n"),
                 &pending_timeout));
    }
  }
  while (true) {

    if (!pending_data())
      break;

    if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
      if (DCPS_debug_level) {
        ACE_DEBUG((LM_INFO,
                   ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
                   ACE_TEXT("Timed out waiting for messages to be transported")));
        this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
      }
      break;
    }
  }
  if (report) {
    ACE_DEBUG((LM_DEBUG,
               "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
  }
}
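The listing treats a zero pending timeout as "wait indefinitely" and otherwise converts the relative timeout into an absolute deadline before waiting. A rough standard-library sketch of that behaviour is shown below; PendingTracker and its members are hypothetical, and it uses std::mutex where the original uses a recursive ACE mutex.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical counter of samples still held by the transport.
class PendingTracker {
public:
  void add_pending()
  {
    std::lock_guard<std::mutex> guard(lock_);
    ++pending_;
  }

  // Marks one sample as delivered or dropped; wakes waiters when none remain.
  void complete_one()
  {
    std::lock_guard<std::mutex> guard(lock_);
    if (pending_ > 0 && --pending_ == 0) {
      empty_.notify_all();
    }
  }

  // A zero timeout means "block until nothing is pending".
  // Returns true when the pending count reached zero, false on timeout.
  bool wait_pending(std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> guard(lock_);
    auto drained = [this] { return pending_ == 0; };

    if (timeout == std::chrono::milliseconds::zero()) {
      empty_.wait(guard, drained);
      return true;
    }
    // Convert the relative timeout to an absolute deadline once, up front.
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    return empty_.wait_until(guard, deadline, drained);
  }

private:
  std::mutex lock_;
  std::condition_variable empty_;
  int pending_ = 0;
};
```

Computing the deadline once means that spurious or early wakeups do not extend the overall wait.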
void OpenDDS::DCPS::WriteDataContainer::wakeup_blocking_writers(DataSampleElement* stale) [private]
Called when data has been dropped or delivered and any blocked writers should be notified.
Definition at line 1487 of file WriteDataContainer.cpp.
References ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), condition_, and waiting_on_release_.
Referenced by data_delivered(), and data_dropped().
{
  if (!stale && waiting_on_release_) {
    waiting_on_release_ = false;

    condition_.broadcast();
  }
}
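The guard in the listing broadcasts only when no stale element was passed and a writer is actually waiting, and it clears the flag so later calls do nothing. A small sketch of that guarded broadcast, assuming standard-library primitives: WriterGate is a hypothetical type, and unlike the original, which relies on the caller already holding the container lock, this version takes its own lock.

```cpp
#include <condition_variable>
#include <mutex>

// Hypothetical gate that blocked writers would wait on.
struct WriterGate {
  std::mutex lock;
  std::condition_variable released;
  bool waiting_on_release = false;

  // Returns true when a broadcast was actually issued.
  bool wakeup_blocking_writers(const void* stale)
  {
    std::lock_guard<std::mutex> guard(lock);
    if (!stale && waiting_on_release) {
      waiting_on_release = false;  // clear first so repeat calls are no-ops
      released.notify_all();
      return true;
    }
    return false;
  }
};
```

With waiting_on_release set, a call with a non-null stale pointer leaves the flag untouched, the next call with a null pointer broadcasts once, and further calls return false.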
friend class ::DDS_TEST [friend]
Definition at line 352 of file WriteDataContainer.h.
friend class DataWriterImpl [friend]
Definition at line 121 of file WriteDataContainer.h.
Definition at line 408 of file WriteDataContainer.h.
Referenced by data_delivered(), and sequence_acknowledged().
Definition at line 487 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
The list of all samples written to this datawriter in writing order.
Definition at line 429 of file WriteDataContainer.h.
Referenced by obtain_buffer(), and release_buffer().
DDS::DomainId_t const OpenDDS::DCPS::WriteDataContainer::domain_id_ [private]
Domain ID.
Definition at line 508 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), persist_data(), reenqueue_all(), remove_excess_durable(), and remove_oldest_sample().
DataDurabilityCache* const OpenDDS::DCPS::WriteDataContainer::durability_cache_ [private]
Pointer to the data durability cache.
This is a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.
Definition at line 524 of file WriteDataContainer.h.
Referenced by persist_data().
DDS::DurabilityServiceQosPolicy const& OpenDDS::DCPS::WriteDataContainer::durability_service_ [private]
DURABILITY_SERVICE QoS specific to the DataWriter.
Definition at line 527 of file WriteDataContainer.h.
Referenced by persist_data().
ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::empty_condition_ [private]
Definition at line 488 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
Definition at line 450 of file WriteDataContainer.h.
Referenced by obtain_buffer().
PublicationInstanceMapType OpenDDS::DCPS::WriteDataContainer::instances_ [private]
The individual instance queue threads in the data.
Definition at line 438 of file WriteDataContainer.h.
Referenced by dispose(), get_handle_instance(), get_instance_handles(), log_send_state_lists(), num_all_samples(), num_samples(), obtain_buffer(), reenqueue_all(), register_instance(), remove_excess_durable(), reschedule_deadline(), unregister(), and unregister_all().
This lock protects the container and the map in the type-specific DataWriter, and is accessible via the datawriter. It is made globally accessible for performance reasons. The lock is acquired when an external call (e.g. FooDataWriterImpl::write) starts, and the same lock is used by the transport thread to notify the datawriter that the data has been delivered. Other internal operations do not acquire the lock.
Definition at line 486 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), dispose(), get_instance_handles(), num_all_samples(), num_samples(), reenqueue_all(), remove_oldest_sample(), unregister_all(), and wait_pending().
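Because the lock is recursive, an external call that already holds it can invoke another locking member on the same thread without deadlocking, and a transport callback on a different thread simply waits its turn. A minimal sketch, assuming std::recursive_mutex in place of ACE_Recursive_Thread_Mutex; SampleStore and its members are hypothetical.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical container guarded by a single recursive lock.
class SampleStore {
public:
  // External entry point: takes the lock, then calls a member that locks again.
  std::size_t write(int sample)
  {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    samples_.push_back(sample);
    return num_samples();  // re-entrant acquisition on the same thread
  }

  std::size_t num_samples()
  {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    return samples_.size();
  }

  // Transport-side notification: uses the same lock, possibly from another thread.
  void data_delivered()
  {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    if (!samples_.empty()) {
      samples_.pop_front();
    }
  }

private:
  std::recursive_mutex lock_;
  std::deque<int> samples_;
};
```

With a non-recursive mutex the nested num_samples() call inside write() would deadlock, which is one plausible reason for the recursive type here.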
The maximum time to block on a write operation. This comes from the DataWriter's QoS RELIABILITY.max_blocking_time.
Definition at line 472 of file WriteDataContainer.h.
Referenced by obtain_buffer().
The maximum number of samples from each instance that can be added to the resend_data_ for durability.
Definition at line 454 of file WriteDataContainer.h.
Referenced by data_delivered(), reenqueue_all(), and remove_excess_durable().
The maximum number of instances allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS.
Definition at line 460 of file WriteDataContainer.h.
Referenced by register_instance().
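The "zero means unlimited" convention can be illustrated with a small check. This is only a sketch of the convention described above, not the actual register_instance() logic; has_room_for_instance is a hypothetical helper, and treating negative values like zero is an assumption of the example.

```cpp
#include <cstddef>

// Hypothetical helper: a limit of zero is treated as "unlimited".
// Assumption for this sketch: negative limits are handled the same way.
bool has_room_for_instance(long max_num_instances,
                           std::size_t current_instances)
{
  if (max_num_instances <= 0) {
    return true;  // no limit configured
  }
  return current_instances < static_cast<std::size_t>(max_num_instances);
}
```

Under this convention a limit of 2 admits a second instance but rejects a third, while a limit of 0 never rejects.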
The maximum number of samples allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS. It also covers QoS.RESOURCE_LIMITS.max_samples and max_instances * max_samples_per_instance.
Definition at line 468 of file WriteDataContainer.h.
Referenced by obtain_buffer().
The maximum size a container should allow for an instance sample list.
Definition at line 448 of file WriteDataContainer.h.
Referenced by obtain_buffer().
The number of chunks that sample_list_element_allocator_ needs to initialize.
Definition at line 498 of file WriteDataContainer.h.
Referenced by WriteDataContainer().
List of data that has been released by WriteDataContainer but is still in the process of delivery (or dropping) by the transport.
Definition at line 425 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().
The publication Id from repo.
Definition at line 441 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), obtain_buffer(), obtain_buffer_for_control(), reenqueue_all(), remove_excess_durable(), and remove_oldest_sample().
List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in the sent and sending lists. This list will be passed to the transport for re-sending.
Definition at line 435 of file WriteDataContainer.h.
Referenced by get_resend_data(), and reenqueue_all().
DataSampleElementAllocator OpenDDS::DCPS::WriteDataContainer::sample_list_element_allocator_ [private]
The cached allocator to allocate DataSampleElement objects.
Definition at line 502 of file WriteDataContainer.h.
Referenced by copy_and_prepend(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().
List of data that is currently being sent.
Definition at line 418 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), get_unsent_data(), log_send_state_lists(), pending_data(), reenqueue_all(), remove_oldest_sample(), and ~WriteDataContainer().
List of data that has already been sent.
Definition at line 421 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), persist_data(), reenqueue_all(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().
bool OpenDDS::DCPS::WriteDataContainer::shutdown_ [private]
Flag indicating that the datawriter will be destroyed.
Definition at line 505 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and ~WriteDataContainer().
char const* const OpenDDS::DCPS::WriteDataContainer::topic_name_ [private]
Topic name.
Definition at line 511 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), persist_data(), reenqueue_all(), remove_excess_durable(), and remove_oldest_sample().
Id used to keep track of which send transaction the DataWriter is currently creating.
Definition at line 415 of file WriteDataContainer.h.
Referenced by get_unsent_data().
char const* const OpenDDS::DCPS::WriteDataContainer::type_name_ [private] |
List of data that has not been sent yet.
Definition at line 411 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), get_unsent_data(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().
bool OpenDDS::DCPS::WriteDataContainer::waiting_on_release_ [private]
The block waiting flag.
Definition at line 475 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
Used to block in wait_for_acks().
Definition at line 494 of file WriteDataContainer.h.
Referenced by data_delivered(), and wait_ack_of_seq().
Lock used for wait_for_acks() processing.
Definition at line 491 of file WriteDataContainer.h.
Referenced by data_delivered(), and wait_ack_of_seq().
The writer that owns this container.
Definition at line 444 of file WriteDataContainer.h.
Referenced by copy_and_prepend(), data_delivered(), data_dropped(), dispose(), enqueue(), obtain_buffer(), obtain_buffer_for_control(), register_instance(), remove_oldest_sample(), reschedule_deadline(), unregister(), and unregister_all().