#include <WriteDataContainer.h>
Inheritance diagram for OpenDDS::DCPS::WriteDataContainer:
This container is instantiated per DataWriter. It maintains list of PublicationInstance objects which is internally referenced by the instance handle.
This container contains threaded lists of all data written to a given DataWriter. The real data sample is represented by the DataSampleElement. The data_holder_ holds all DataSampleElement in the writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in PublicationInstance links samples via the next_instance_sample_ thread.
There are three state transition lists used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Any DataSampleElement should be in one of these three lists and SHOULD NOT be shared between these three lists. A normal transition of a DataSampleElement would be unsent->sending->sent. A DataSampleElement transitions from unsent to sent naturally during a write operation when get_unsent_data is called. A DataSampleElement transitions from sending to sent when data_delivered is called notifying the container that the transport is finished with the sample and it can be marked as a historical sample. A DataSampleElement may transition back to unsent from sending if the transport notifies that the sample was dropped and should be resent. A DataSampleElement is removed from sent list and freed when the instance queue or container (for another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when and unreliable writer needs space. In this case the transport will be notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples. Note: The real data sample will be freed when the reference counting goes 0. The resend list is only used when the datawriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements for the data sample duplicates of the sending and sent list and hands this list off to the transport.
Definition at line 116 of file WriteDataContainer.h.
OpenDDS::DCPS::WriteDataContainer::WriteDataContainer | ( | DataWriterImpl * | writer, | |
CORBA::Long | depth, | |||
::DDS::Duration_t | max_blocking_time, | |||
size_t | n_chunks, | |||
DDS::DomainId_t | domain_id, | |||
char const * | topic_name, | |||
char const * | type_name, | |||
DataDurabilityCache * | durability_cache, | |||
DDS::DurabilityServiceQosPolicy const & | durability_service, | |||
CORBA::Long | max_instances, | |||
CORBA::Long | max_total_samples | |||
) |
No default constructor, must be initialized.
writer | The writer which owns this container. |
depth | Depth of the instance sample queue. |
max_blocking_time | The timeout for write. |
n_chunks | The number of chunks that the DataSampleElementAllocator needs allocate. |
domain_id | Domain ID. |
topic_name | Topic name. |
type_name | Type name. |
durability_cache | The data durability cache for unsent data. |
durability_service | DURABILITY_SERVICE QoS specific to the DataWriter. |
max_instances | maximum number of instances, 0 for unlimited |
max_total_samples | maximum total number of samples, 0 for unlimited |
Definition at line 69 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, n_chunks_, sample_list_element_allocator_, and transport_send_element_allocator_.
00083 : transaction_id_(0), 00084 writer_(writer), 00085 depth_(depth), 00086 max_num_instances_(max_instances), 00087 max_num_samples_(max_total_samples), 00088 max_blocking_time_(max_blocking_time), 00089 waiting_on_release_(false), 00090 condition_(lock_), 00091 empty_condition_(lock_), 00092 wfa_condition_(this->wfa_lock_), 00093 n_chunks_(n_chunks), 00094 sample_list_element_allocator_(2 * n_chunks_), 00095 transport_send_element_allocator_(2 * n_chunks_, 00096 sizeof(TransportSendElement)), 00097 transport_customized_element_allocator_(2 * n_chunks_, 00098 sizeof(TransportCustomizedElement)), 00099 shutdown_(false), 00100 domain_id_(domain_id), 00101 topic_name_(topic_name), 00102 type_name_(type_name) 00103 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 00104 , durability_cache_(durability_cache) 00105 , durability_service_(durability_service) 00106 #endif 00107 { 00108 00109 if (DCPS_debug_level >= 2) { 00110 ACE_DEBUG((LM_DEBUG, 00111 "(%P|%t) WriteDataContainer " 00112 "sample_list_element_allocator %x with %d chunks\n", 00113 &sample_list_element_allocator_, n_chunks_)); 00114 ACE_DEBUG((LM_DEBUG, 00115 "(%P|%t) WriteDataContainer " 00116 "transport_send_element_allocator %x with %d chunks\n", 00117 &transport_send_element_allocator_, n_chunks_)); 00118 } 00119 }
OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer | ( | ) |
Default destructor.
Definition at line 121 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, and shutdown_.
00122 { 00123 if (this->unsent_data_.size() > 0) { 00124 ACE_DEBUG((LM_WARNING, 00125 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ") 00126 ACE_TEXT("destroyed with %d samples unsent.\n"), 00127 this->unsent_data_.size())); 00128 } 00129 00130 if (this->sending_data_.size() > 0) { 00131 ACE_DEBUG((LM_WARNING, 00132 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ") 00133 ACE_TEXT("destroyed with %d samples sending.\n"), 00134 this->sending_data_.size())); 00135 } 00136 00137 if (this->sent_data_.size() > 0) { 00138 if (DCPS_debug_level > 0) { 00139 ACE_DEBUG((LM_DEBUG, 00140 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ") 00141 ACE_TEXT("destroyed with %d samples sent.\n"), 00142 this->sent_data_.size())); 00143 } 00144 } 00145 00146 if (this->orphaned_to_transport_.size() > 0) { 00147 if (DCPS_debug_level > 0) { 00148 ACE_DEBUG((LM_DEBUG, 00149 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ") 00150 ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"), 00151 this->orphaned_to_transport_.size())); 00152 } 00153 } 00154 00155 if (!shutdown_) { 00156 ACE_ERROR((LM_ERROR, 00157 ACE_TEXT("(%P|%t) ERROR: ") 00158 ACE_TEXT("WriteDataContainer::~WriteDataContainer, ") 00159 ACE_TEXT("The container has not been cleaned.\n"))); 00160 } 00161 }
OpenDDS::DCPS::WriteDataContainer::WriteDataContainer | ( | WriteDataContainer const & | ) | [private] |
void OpenDDS::DCPS::WriteDataContainer::copy_and_append | ( | SendStateDataSampleList & | list, | |
const SendStateDataSampleList & | appended, | |||
const RepoId & | reader_id, | |||
const DDS::LifespanQosPolicy & | lifespan, | |||
const OPENDDS_STRING & | filterClassName, | |||
const FilterEvaluator * | eval, | |||
const DDS::StringSeq & | params | |||
) | [private] |
Definition at line 1292 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::begin(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataWriterImpl::filter_out(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.
01303 { 01304 for (SendStateDataSampleList::const_iterator cur = appended.begin(); 01305 cur != appended.end(); ++cur) { 01306 01307 // Do not copy and append data that has exceeded the configured 01308 // lifespan. 01309 if (resend_data_expired(*cur, lifespan)) 01310 continue; 01311 01312 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01313 if (eval && writer_->filter_out(*cur, filterClassName, *eval, params)) 01314 continue; 01315 #endif 01316 01317 DataSampleElement* element = 0; 01318 ACE_NEW_MALLOC(element, 01319 static_cast<DataSampleElement*>( 01320 sample_list_element_allocator_.malloc( 01321 sizeof(DataSampleElement))), 01322 DataSampleElement(*cur)); 01323 01324 element->set_num_subs(1); 01325 element->set_sub_id(0, reader_id); 01326 01327 list.enqueue_tail(element); 01328 } 01329 }
void OpenDDS::DCPS::WriteDataContainer::data_delivered | ( | const DataSampleElement * | sample | ) |
Acknowledge the delivery of data. The sample that resides in this container will be moved from sending_data_ list to the internal sent_data_ list. If there are any threads waiting for available space, it wakes up these threads.
Definition at line 533 of file WriteDataContainer.cpp.
References acked_sequences_, OpenDDS::DCPS::DataWriterImpl::controlTracker, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), unsent_data_, wakeup_blocking_writers(), wfa_condition_, and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::data_delivered(), and data_dropped().
00534 { 00535 DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6); 00536 00537 if (DCPS_debug_level >= 2) { 00538 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered") 00539 ACE_TEXT(" %X \n"), sample)); 00540 } 00541 00542 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00543 guard, 00544 this->lock_); 00545 00546 // Delivered samples _must_ be on sending_data_ list 00547 00548 // If it is not found in one of the lists, an invariant 00549 // exception is declared. 00550 00551 // The element now needs to be removed from the sending_data_ 00552 // list, and appended to the end of the sent_data_ list here 00553 00554 DataSampleElement* stale = const_cast<DataSampleElement*>(sample); 00555 00556 // If sample is on a SendStateDataSampleList it should be on the 00557 // sending_data_ list signifying it was given to the transport to 00558 // deliver and now the transport is signaling it has been delivered 00559 if (!sending_data_.dequeue(sample)) { 00560 // 00561 // Should be on sending_data_. If it is in sent_data_ 00562 // or unsent_data there was a problem. 00563 // 00564 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists; 00565 send_lists.push_back(&sent_data_); 00566 send_lists.push_back(&unsent_data_); 00567 send_lists.push_back(&orphaned_to_transport_); 00568 00569 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists); 00570 00571 if (containing_list == &this->sent_data_) { 00572 ACE_ERROR((LM_WARNING, 00573 ACE_TEXT("(%P|%t) WARNING: ") 00574 ACE_TEXT("WriteDataContainer::data_delivered, ") 00575 ACE_TEXT("The delivered sample is not in sending_data_ and ") 00576 ACE_TEXT("WAS IN sent_data_.\n"))); 00577 } else if (containing_list == &this->unsent_data_) { 00578 ACE_ERROR((LM_WARNING, 00579 ACE_TEXT("(%P|%t) WARNING: ") 00580 ACE_TEXT("WriteDataContainer::data_delivered, ") 00581 ACE_TEXT("The delivered sample is not in sending_data_ and ") 00582 ACE_TEXT("WAS IN unsent_data_ list.\n"))); 00583 } else { 00584 00585 if (containing_list == &this->orphaned_to_transport_) { 00586 orphaned_to_transport_.dequeue(sample); 00587 release_buffer(stale); 00588 } 00589 //No-op: elements may be removed from all WriteDataContainer lists during shutdown 00590 //and inform transport of their release. Transport will call data-delivered on the 00591 //elements as it processes the removal but they will already be gone from the send lists. 00592 if (stale->get_header().message_id_ != SAMPLE_DATA) { 00593 //this message was a control message so release it 00594 if (DCPS_debug_level > 9) { 00595 GuidConverter converter(publication_id_); 00596 ACE_DEBUG((LM_DEBUG, 00597 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ") 00598 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"), 00599 this->domain_id_, 00600 this->topic_name_, 00601 OPENDDS_STRING(converter).c_str())); 00602 } 00603 writer_->controlTracker.message_delivered(); 00604 } 00605 00606 if (!pending_data()) 00607 empty_condition_.broadcast(); 00608 } 00609 00610 return; 00611 } 00612 ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_); 00613 SequenceNumber acked_seq = stale->get_header().sequence_; 00614 SequenceNumber prev_max = acked_sequences_.cumulative_ack(); 00615 00616 if (stale->get_header().message_id_ != SAMPLE_DATA) { 00617 //this message was a control message so release it 00618 if (DCPS_debug_level > 9) { 00619 GuidConverter converter(publication_id_); 00620 ACE_DEBUG((LM_DEBUG, 00621 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ") 00622 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"), 00623 this->domain_id_, 00624 this->topic_name_, 00625 OPENDDS_STRING(converter).c_str())); 00626 } 00627 release_buffer(stale); 00628 writer_->controlTracker.message_delivered(); 00629 } else { 00630 if (DCPS_debug_level > 9) { 00631 GuidConverter converter(publication_id_); 00632 ACE_DEBUG((LM_DEBUG, 00633 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ") 00634 ACE_TEXT("domain %d topic %C publication %C pushed to HISTORY.\n"), 00635 this->domain_id_, 00636 this->topic_name_, 00637 OPENDDS_STRING(converter).c_str())); 00638 } 00639 00640 DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample()); 00641 sent_data_.enqueue_tail(sample); 00642 00643 this->wakeup_blocking_writers (stale); 00644 } 00645 if (DCPS_debug_level > 9) { 00646 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ") 00647 ACE_TEXT("Inserting acked_sequence: %q\n"), 00648 acked_seq.getValue())); 00649 } 00650 00651 acked_sequences_.insert(acked_seq); 00652 00653 if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || 00654 prev_max < acked_sequences_.cumulative_ack()) { 00655 00656 if (DCPS_debug_level > 9) { 00657 ACE_DEBUG((LM_DEBUG, 00658 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ") 00659 ACE_TEXT("broadcasting wait_for_acknowledgments update.\n"))); 00660 } 00661 00662 wfa_condition_.broadcast(); 00663 } 00664 00665 // Signal if there is no pending data. 00666 if (!pending_data()) 00667 empty_condition_.broadcast(); 00668 }
void OpenDDS::DCPS::WriteDataContainer::data_dropped | ( | const DataSampleElement * | element, | |
bool | dropped_by_transport | |||
) |
This method is called by the transport to notify the sample is dropped. Which the transport was told to do by the publication code by calling TransportClient::remove_sample(). If the sample was "sending" then it is moved to the "unsent" list. If there are any threads waiting for available space then it needs wake up these threads. The dropped_by_transport flag true indicates the dropping initiated by transport when the transport send strategy is in a MODE_TERMINATED. The dropped_by_transport flag false indicates the dropping is initiated by the remove_sample and data_dropped() is a result of remove_sample().
Definition at line 671 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, unsent_data_, wakeup_blocking_writers(), and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::data_dropped().
00673 { 00674 DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6); 00675 00676 if (DCPS_debug_level >= 2) { 00677 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped") 00678 ACE_TEXT(" sample %X dropped_by_transport %d\n"), 00679 sample, dropped_by_transport)); 00680 } 00681 00682 // If the transport initiates the data dropping, we need do same thing 00683 // as data_delivered. e.g. remove the sample from the internal list 00684 // and the instance list. We do not need acquire the lock here since 00685 // the data_delivered acquires the lock. 00686 if (dropped_by_transport) { 00687 this->data_delivered(sample); 00688 return; 00689 } 00690 00691 //The data_dropped could be called from the thread initiating sample remove 00692 //which already hold the lock. In this case, it's not necessary to acquire 00693 //lock here. It also could be called from the transport thread in a delayed 00694 //notification, it's necessary to acquire lock here to protect the internal 00695 //structures in this class. 00696 00697 ACE_GUARD (ACE_Recursive_Thread_Mutex, 00698 guard, 00699 this->lock_); 00700 00701 // The dropped sample should be in the sending_data_ list. 00702 // Otherwise an exception will be raised. 00703 // 00704 // We are now been notified by transport, so we can 00705 // keep the sample from the sending_data_ list still in 00706 // sample list since we will send it. 00707 00708 DataSampleElement* stale = const_cast<DataSampleElement*>(sample); 00709 00710 // If sample is on a SendStateDataSampleList it should be on the 00711 // sending_data_ list signifying it was given to the transport to 00712 // deliver and now the transport is signaling it has been dropped 00713 00714 if (sending_data_.dequeue(sample)) { 00715 // else: The data_dropped is called as a result of remove_sample() 00716 // called from reenqueue_all() which supports the TRANSIENT_LOCAL 00717 // qos. The samples that are sending by transport are dropped from 00718 // transport and will be moved to the unsent list for resend. 00719 unsent_data_.enqueue_tail(sample); 00720 00721 } else { 00722 // 00723 // If it is in sent_data_ or unsent_data there was a problem. 00724 // 00725 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists; 00726 send_lists.push_back(&sent_data_); 00727 send_lists.push_back(&unsent_data_); 00728 send_lists.push_back(&orphaned_to_transport_); 00729 00730 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists); 00731 00732 if (containing_list == &this->sent_data_) { 00733 ACE_ERROR((LM_WARNING, 00734 ACE_TEXT("(%P|%t) WARNING: ") 00735 ACE_TEXT("WriteDataContainer::data_dropped, ") 00736 ACE_TEXT("The dropped sample is not in sending_data_ and ") 00737 ACE_TEXT("WAS IN sent_data_.\n"))); 00738 } else if (containing_list == &this->unsent_data_) { 00739 ACE_ERROR((LM_WARNING, 00740 ACE_TEXT("(%P|%t) WARNING: ") 00741 ACE_TEXT("WriteDataContainer::data_dropped, ") 00742 ACE_TEXT("The dropped sample is not in sending_data_ and ") 00743 ACE_TEXT("WAS IN unsent_data_ list.\n"))); 00744 } else { 00745 //No-op: elements may be removed from all WriteDataContainer lists during shutdown 00746 //and inform transport of their release. Transport will call data-dropped on the 00747 //elements as it processes the removal but they will already be gone from the send lists. 00748 if (stale->get_header().message_id_ != SAMPLE_DATA) { 00749 //this message was a control message so release it 00750 if (DCPS_debug_level > 9) { 00751 GuidConverter converter(publication_id_); 00752 ACE_DEBUG((LM_DEBUG, 00753 ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ") 00754 ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"), 00755 this->domain_id_, 00756 this->topic_name_, 00757 OPENDDS_STRING(converter).c_str())); 00758 } 00759 writer_->controlTracker.message_dropped(); 00760 } 00761 if (containing_list == &this->orphaned_to_transport_) { 00762 orphaned_to_transport_.dequeue(sample); 00763 release_buffer(stale); 00764 if (!pending_data()) 00765 empty_condition_.broadcast(); 00766 } 00767 } 00768 00769 return; 00770 } 00771 00772 this->wakeup_blocking_writers (stale); 00773 00774 if (!pending_data()) 00775 empty_condition_.broadcast(); 00776 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose | ( | DDS::InstanceHandle_t | handle, | |
DataSample *& | registered_sample, | |||
bool | dup_registered_sample = true | |||
) |
Delete the samples for the provided instance. A shallow copy of the sample data will be given to datawriter as part of the control message if the dup_registered_sample is true.
This method returns error if the instance is not registered.
Definition at line 369 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::find(), instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, remove_oldest_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::PublicationInstance::samples_, OpenDDS::DCPS::InstanceDataSampleList::size(), OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), and unregister_all().
00372 { 00373 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00374 guard, 00375 this->lock_, 00376 DDS::RETCODE_ERROR); 00377 00378 PublicationInstance* instance = 0; 00379 00380 int const find_attempt = find(instances_, instance_handle, instance); 00381 00382 if (0 != find_attempt) { 00383 ACE_ERROR_RETURN((LM_ERROR, 00384 ACE_TEXT("(%P|%t) ERROR: ") 00385 ACE_TEXT("WriteDataContainer::dispose, ") 00386 ACE_TEXT("The instance(handle=%X) ") 00387 ACE_TEXT("is not registered yet.\n"), 00388 instance_handle), 00389 DDS::RETCODE_PRECONDITION_NOT_MET); 00390 } 00391 00392 if (dup_registered_sample) { 00393 // The registered_sample is shallow copied. 00394 registered_sample = instance->registered_sample_->duplicate(); 00395 } 00396 00397 // Note: The DDS specification is unclear as to if samples in the process 00398 // of being sent should be removed or not. 00399 // The advantage of calling remove_sample() on them is that the 00400 // cached allocator memory for them is freed. The disadvantage 00401 // is that the slow reader may see multiple disposes without 00402 // any write sample between them and hence not temporarily move into the 00403 // Alive state. 00404 // We have chosen to NOT remove the sending samples. 00405 00406 InstanceDataSampleList& instance_list = instance->samples_; 00407 00408 while (instance_list.size() > 0) { 00409 bool released = false; 00410 DDS::ReturnCode_t ret 00411 = remove_oldest_sample(instance_list, released); 00412 00413 if (ret != DDS::RETCODE_OK) { 00414 return ret; 00415 } 00416 } 00417 00418 if (this->writer_->watchdog_) 00419 this->writer_->watchdog_->cancel_timer(instance); 00420 return DDS::RETCODE_OK; 00421 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue | ( | DataSampleElement * | sample, | |
DDS::InstanceHandle_t | instance | |||
) |
Enqueue the data sample in its instance thread. This method assumes there is an available space for the sample in the instance list.
Definition at line 178 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::PublicationInstance::cur_sample_tv_, OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), get_handle_instance(), OpenDDS::DCPS::PublicationInstance::last_sample_tv_, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, unsent_data_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::write().
00181 { 00182 // Get the PublicationInstance pointer from InstanceHandle_t. 00183 PublicationInstance* const instance = 00184 get_handle_instance(instance_handle); 00185 // Extract the instance queue. 00186 InstanceDataSampleList& instance_list = instance->samples_; 00187 00188 if (this->writer_->watchdog_) { 00189 instance->last_sample_tv_ = instance->cur_sample_tv_; 00190 instance->cur_sample_tv_ = ACE_OS::gettimeofday(); 00191 this->writer_->watchdog_->execute(instance, false); 00192 } 00193 00194 // 00195 // Enqueue to the next_send_sample_ thread of unsent_data_ 00196 // will link samples with the next_sample/previous_sample and 00197 // also next_send_sample_. 00198 // This would save time when we actually send the data. 00199 00200 unsent_data_.enqueue_tail(sample); 00201 00202 // 00203 // Add this sample to the INSTANCE scope list. 00204 instance_list.enqueue_tail(sample); 00205 00206 return DDS::RETCODE_OK; 00207 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control | ( | DataSampleElement * | control_sample | ) |
Definition at line 164 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_OK, and unsent_data_.
Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), OpenDDS::DCPS::DataWriterImpl::register_instance_i(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().
00165 { 00166 // Enqueue to the next_send_sample_ thread of unsent_data_ 00167 // will link samples with the next_sample/previous_sample and 00168 // also next_send_sample_. 00169 // This would save time when we actually send the data. 00170 00171 unsent_data_.enqueue_tail(control_sample); 00172 00173 return DDS::RETCODE_OK; 00174 }
PublicationInstance * OpenDDS::DCPS::WriteDataContainer::get_handle_instance | ( | DDS::InstanceHandle_t | handle | ) |
Definition at line 1276 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::find(), and instances_.
Referenced by OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), enqueue(), OpenDDS::DCPS::DataWriterImpl::get_handle_instance(), and obtain_buffer().
01277 { 01278 PublicationInstance* instance = 0; 01279 01280 if (0 != find(instances_, handle, instance)) { 01281 ACE_ERROR((LM_ERROR, 01282 ACE_TEXT("(%P|%t) ERROR: ") 01283 ACE_TEXT("WriteDataContainer::get_handle_instance, ") 01284 ACE_TEXT("lookup for %d failed\n"), 01285 handle)); 01286 } 01287 01288 return instance; 01289 }
void OpenDDS::DCPS::WriteDataContainer::get_instance_handles | ( | InstanceHandleVec & | instance_handles | ) |
Definition at line 1436 of file WriteDataContainer.cpp.
References instances_.
Referenced by OpenDDS::DCPS::DataWriterImpl::get_instance_handles().
01437 { 01438 ACE_GUARD(ACE_Recursive_Thread_Mutex, 01439 guard, 01440 this->lock_); 01441 PublicationInstanceMapType::iterator it = instances_.begin(); 01442 01443 while (it != instances_.end()) { 01444 instance_handles.push_back(it->second->instance_handle_); 01445 ++it; 01446 } 01447 }
SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data | ( | ) |
Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list returned is not put on any SendStateDataSampleList.
Definition at line 504 of file WriteDataContainer.cpp.
References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().
00505 { 00506 DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6); 00507 00508 // 00509 // The samples in unsent_data are added to the sending_data 00510 // during enqueue. 00511 // 00512 SendStateDataSampleList list = this->resend_data_; 00513 00514 // 00515 // Clear the unsent data list. 00516 // 00517 this->resend_data_.reset(); 00518 // 00519 // Return the moved list. 00520 // 00521 return list; 00522 }
ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data | ( | SendStateDataSampleList & | list | ) |
Obtain a list of data that has not yet been sent. The data on the list returned is moved from the internal unsent_data_ list to the internal sending_data_ list as part of this call. The entire list is linked via the DataSampleElement.next_send_sample_ link as well.
Definition at line 465 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, transaction_id_, and unsent_data_.
00466 { 00467 DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6); 00468 // 00469 // The samples in unsent_data are added to the local datawriter 00470 // list and enqueued to the sending_data_ signifying they have 00471 // been passed to the transport to send in a transaction 00472 // 00473 list = this->unsent_data_; 00474 00475 // Increment send counter for this send operation 00476 ++transaction_id_; 00477 00478 // Mark all samples with current send counter 00479 SendStateDataSampleList::iterator iter = list.begin(); 00480 while (iter != list.end()) { 00481 iter->set_transaction_id(this->transaction_id_); 00482 ++iter; 00483 } 00484 00485 // 00486 // The unsent_data_ already linked with the 00487 // next_send_sample during enqueue. 00488 // Append the unsent_data_ to current sending_data_ 00489 // list. 00490 sending_data_.enqueue_tail(list); 00491 00492 // 00493 // Clear the unsent data list. 00494 // 00495 this->unsent_data_.reset(); 00496 00497 // 00498 // Return the moved list. 00499 // 00500 return transaction_id_; 00501 }
void OpenDDS::DCPS::WriteDataContainer::log_send_state_lists | ( | OPENDDS_STRING | description | ) | [private] |
Definition at line 1516 of file WriteDataContainer.cpp.
References instances_, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by wait_pending().
01517 { 01518 ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n", 01519 description.c_str(), 01520 unsent_data_.size(), 01521 sending_data_.size(), 01522 sent_data_.size(), 01523 orphaned_to_transport_.size(), 01524 num_all_samples(), 01525 instances_.size())); 01526 }
size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples | ( | ) |
Return the number of samples for all instances.
Definition at line 445 of file WriteDataContainer.cpp.
References instances_.
Referenced by log_send_state_lists().
00446 { 00447 size_t size = 0; 00448 00449 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00450 guard, 00451 this->lock_, 00452 0); 00453 00454 for (PublicationInstanceMapType::iterator iter = instances_.begin(); 00455 iter != instances_.end(); 00456 ++iter) 00457 { 00458 size += iter->second->samples_.size(); 00459 } 00460 00461 return size; 00462 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples | ( | DDS::InstanceHandle_t | handle, | |
size_t & | size | |||
) |
Return the number of samples for the given instance.
Definition at line 424 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::find(), instances_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().
Referenced by OpenDDS::DCPS::DataWriterImpl::num_samples().
00426 { 00427 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00428 guard, 00429 this->lock_, 00430 DDS::RETCODE_ERROR); 00431 PublicationInstance* instance = 0; 00432 00433 int const find_attempt = find(instances_, handle, instance); 00434 00435 if (0 != find_attempt) { 00436 return DDS::RETCODE_ERROR; 00437 00438 } else { 00439 size = instance->samples_.size(); 00440 return DDS::RETCODE_OK; 00441 } 00442 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer | ( | DataSampleElement *& | element, | |
DDS::InstanceHandle_t | handle | |||
) |
Allocate a DataSampleElement object and check the space availability for newly allocated element according to qos settings. For the blocking write case, if resource limits or history qos limits are reached, then it blocks for max blocking time for a previous sample to be delivered or dropped by the transport. In non-blocking write case, if resource limits or history qos limits are reached, will attempt to remove oldest samples (forcing the transport to drop samples if necessary) to make space. If there are several threads waiting then the first one in the waiting list can enqueue, others continue waiting.
Definition at line 1020 of file WriteDataContainer.cpp.
References condition_, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, depth_, OpenDDS::DCPS::duration_to_absolute_time_value(), OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), instances_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, publication_id_, release_buffer(), DDS::RELIABLE_RELIABILITY_QOS, remove_oldest_historical_sample(), remove_oldest_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, transport_customized_element_allocator_, transport_send_element_allocator_, and waiting_on_release_.
Referenced by OpenDDS::DCPS::DataWriterImpl::write().
01022 { 01023 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6); 01024 01025 PublicationInstance* instance = get_handle_instance(handle); 01026 01027 ACE_NEW_MALLOC_RETURN( 01028 element, 01029 static_cast<DataSampleElement*>( 01030 sample_list_element_allocator_.malloc( 01031 sizeof(DataSampleElement))), 01032 DataSampleElement(publication_id_, 01033 this->writer_, 01034 instance, 01035 &transport_send_element_allocator_, 01036 &transport_customized_element_allocator_), 01037 DDS::RETCODE_ERROR); 01038 01039 // Extract the current instance queue. 01040 InstanceDataSampleList& instance_list = instance->samples_; 01041 DDS::ReturnCode_t ret = DDS::RETCODE_OK; 01042 01043 bool need_to_set_abs_timeout = true; 01044 ACE_Time_Value abs_timeout; 01045 01046 //depth_ covers both HistoryQosPolicy kind and depth as well as 01047 //ResourceLimitsQosPolicy max_samples_per_instance 01048 //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and 01049 //max_instances and max_instances * depth 01050 while ((instance_list.size() >= depth_) || 01051 ((this->max_num_samples_ > 0) && 01052 ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) { 01053 01054 //Need to either remove stale samples or wait for space to become available 01055 if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) { 01056 //Remove historical samples already sent 01057 bool removed_historical = false; 01058 if (DCPS_debug_level >= 2) { 01059 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01060 ACE_TEXT(" instance %d attempting to remove") 01061 ACE_TEXT(" its oldest historical sample\n"), 01062 handle)); 01063 } 01064 01065 ret = this->remove_oldest_historical_sample(instance_list, removed_historical); 01066 01067 //else try to remove historical samples from other instances 01068 if (ret == DDS::RETCODE_OK && !removed_historical) { 01069 if (DCPS_debug_level >= 2) { 01070 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01071 ACE_TEXT(" instance %d attempting to remove") 01072 ACE_TEXT(" oldest historical sample from any full instances\n"), 01073 handle)); 01074 } 01075 PublicationInstanceMapType::iterator it = instances_.begin(); 01076 01077 while (!removed_historical && it != instances_.end() && ret == DDS::RETCODE_OK) { 01078 if(it->second->samples_.size() >= depth_) { 01079 ret = this->remove_oldest_historical_sample(it->second->samples_, removed_historical); 01080 } 01081 ++it; 01082 } 01083 } 01084 01085 if (ret == DDS::RETCODE_OK && !removed_historical) { 01086 //Reliable writers can wait 01087 if (need_to_set_abs_timeout) { 01088 abs_timeout = duration_to_absolute_time_value (max_blocking_time_); 01089 need_to_set_abs_timeout = false; 01090 } 01091 if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) { 01092 if (DCPS_debug_level >= 2) { 01093 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01094 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"), 01095 handle)); 01096 } 01097 01098 waiting_on_release_ = true; 01099 // lock is released while waiting and acquired before returning 01100 // from wait. 01101 int const wait_result = condition_.wait(&abs_timeout); 01102 01103 if (wait_result != 0) { 01104 if (errno == ETIME) { 01105 if (DCPS_debug_level >= 2) { 01106 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01107 ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"), 01108 handle)); 01109 } 01110 ret = DDS::RETCODE_TIMEOUT; 01111 } else { 01112 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()") 01113 ACE_TEXT("%p\n"))); 01114 ret = DDS::RETCODE_ERROR; 01115 } 01116 } 01117 } else { 01118 //either shutdown has been signaled or max_blocking_time 01119 //has surpassed so treat as timeout 01120 ret = DDS::RETCODE_TIMEOUT; 01121 } 01122 } 01123 01124 } else { 01125 //BEST EFFORT 01126 bool oldest_released = false; 01127 01128 //try to remove stale samples from this instance 01129 // The remove_oldest_sample() method removes the oldest sample 01130 // from instance list and removes it from the internal lists. 01131 if (instance_list.size() > 0) { 01132 if (DCPS_debug_level >= 2) { 01133 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01134 ACE_TEXT(" instance %d attempting to remove") 01135 ACE_TEXT(" its oldest sample\n"), 01136 handle)); 01137 } 01138 ret = this->remove_oldest_sample(instance_list, oldest_released); 01139 } 01140 //else try to remove stale samples from other instances which are full 01141 if (ret == DDS::RETCODE_OK && !oldest_released) { 01142 if (DCPS_debug_level >= 2) { 01143 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01144 ACE_TEXT(" instance %d attempting to remove") 01145 ACE_TEXT(" oldest sample from any full instances\n"), 01146 handle)); 01147 } 01148 PublicationInstanceMapType::iterator it = instances_.begin(); 01149 01150 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) { 01151 if(it->second->samples_.size() >= depth_) { 01152 ret = this->remove_oldest_sample(it->second->samples_, oldest_released); 01153 } 01154 ++it; 01155 } 01156 } 01157 //else try to remove stale samples from other non-full instances 01158 if (ret == DDS::RETCODE_OK && !oldest_released) { 01159 if (DCPS_debug_level >= 2) { 01160 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01161 ACE_TEXT(" instance %d attempting to remove") 01162 ACE_TEXT(" oldest sample from any instance with samples currently\n"), 01163 handle)); 01164 } 01165 PublicationInstanceMapType::iterator it = instances_.begin(); 01166 01167 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) { 01168 if(it->second->samples_.size() > 0) { 01169 ret = this->remove_oldest_sample(it->second->samples_, oldest_released); 01170 } 01171 ++it; 01172 } 01173 } 01174 if (!oldest_released) { 01175 //This means that no instances have samples to remove and yet 01176 //still hitting resource limits. 01177 ACE_ERROR((LM_ERROR, 01178 ACE_TEXT("(%P|%t) ERROR: ") 01179 ACE_TEXT("WriteDataContainer::obtain_buffer, ") 01180 ACE_TEXT("hitting resource limits with no samples to remove\n"))); 01181 ret = DDS::RETCODE_ERROR; 01182 } 01183 } //END BEST EFFORT 01184 01185 if (ret != DDS::RETCODE_OK) { 01186 if (DCPS_debug_level >= 2) { 01187 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer") 01188 ACE_TEXT(" instance %d could not obtain buffer for sample") 01189 ACE_TEXT(" releasing allotted sample and returning\n"), 01190 handle)); 01191 } 01192 this->release_buffer(element); 01193 return ret; 01194 } 01195 } //END WHILE 01196 01197 data_holder_.enqueue_tail(element); 01198 01199 return ret; 01200 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control | ( | DataSampleElement *& | element | ) |
Definition at line 1000 of file WriteDataContainer.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, transport_customized_element_allocator_, and transport_send_element_allocator_.
Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), OpenDDS::DCPS::DataWriterImpl::register_instance_i(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().
01001 { 01002 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6); 01003 01004 ACE_NEW_MALLOC_RETURN( 01005 element, 01006 static_cast<DataSampleElement*>( 01007 sample_list_element_allocator_.malloc( 01008 sizeof(DataSampleElement))), 01009 DataSampleElement(publication_id_, 01010 this->writer_, 01011 0, 01012 &transport_send_element_allocator_, 01013 &transport_customized_element_allocator_), 01014 DDS::RETCODE_ERROR); 01015 01016 return DDS::RETCODE_OK; 01017 }
typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR | ( | DDS::InstanceHandle_t | ) |
Returns a vector of handles for the instances registered for this data writer.
Referenced by data_delivered(), data_dropped(), remove_oldest_historical_sample(), and remove_oldest_sample().
WriteDataContainer& OpenDDS::DCPS::WriteDataContainer::operator= | ( | WriteDataContainer const & | ) | [private] |
bool OpenDDS::DCPS::WriteDataContainer::pending_data | ( | ) |
Returns if pending data exists. This includes sending, and unsent data.
Definition at line 525 of file WriteDataContainer.cpp.
References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
00526 { 00527 return this->sending_data_.size() != 0 00528 || this->orphaned_to_transport_.size() != 0 00529 || this->unsent_data_.size() != 0; 00530 }
bool OpenDDS::DCPS::WriteDataContainer::persist_data | ( | ) |
Copy sent data to data DURABILITY cache.
Definition at line 1333 of file WriteDataContainer.cpp.
References durability_cache_, and OpenDDS::DCPS::DataDurabilityCache::insert().
Referenced by OpenDDS::DCPS::DataWriterImpl::persist_data().
01334 { 01335 bool result = true; 01336 01337 // ------------------------------------------------------------ 01338 // Transfer sent data to data DURABILITY cache. 01339 // ------------------------------------------------------------ 01340 if (this->durability_cache_) { 01341 // A data durability cache is available for TRANSIENT or 01342 // PERSISTENT data durability. Cache the data samples. 01343 01344 // 01345 // We only cache data that is not still in use outside of 01346 // this instance of WriteDataContainer 01347 // (only cache samples in sent_data_ meaning transport has delivered). 01348 bool const inserted = 01349 this->durability_cache_->insert(this->domain_id_, 01350 this->topic_name_, 01351 this->type_name_, 01352 this->sent_data_, 01353 this->durability_service_ 01354 ); 01355 01356 result = inserted; 01357 01358 if (!inserted) 01359 ACE_ERROR((LM_ERROR, 01360 ACE_TEXT("(%P|%t) ERROR: ") 01361 ACE_TEXT("WriteDataContainer::persist_data, ") 01362 ACE_TEXT("failed to make data durable for ") 01363 ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"), 01364 this->domain_id_, 01365 this->topic_name_, 01366 this->type_name_)); 01367 } 01368 01369 return result; 01370 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all | ( | const RepoId & | reader_id, | |
const DDS::LifespanQosPolicy & | lifespan, | |||
const OPENDDS_STRING & | filterClassName, | |||
const FilterEvaluator * | eval, | |||
const DDS::StringSeq & | params | |||
) |
Create a resend list with the copies of all current "sending" and "sent" samples. The samples will be sent to the subscriber specified.
Definition at line 210 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, and OpenDDS::DCPS::SendStateDataSampleList::size().
Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().
00219 { 00220 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00221 guard, 00222 this->lock_, 00223 DDS::RETCODE_ERROR); 00224 00225 // Make a copy of sending_data_ and sent_data_; 00226 if (sent_data_.size() > 0) { 00227 this->copy_and_append(this->resend_data_, 00228 sent_data_, 00229 reader_id, 00230 lifespan 00231 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00232 , filterClassName, eval, expression_params 00233 #endif 00234 ); 00235 00236 if (sending_data_.size() > 0) { 00237 this->copy_and_append(this->resend_data_, 00238 sending_data_, 00239 reader_id, 00240 lifespan 00241 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00242 , filterClassName, eval, expression_params 00243 #endif 00244 ); 00245 } 00246 00247 if (DCPS_debug_level > 9) { 00248 GuidConverter converter(publication_id_); 00249 GuidConverter reader(reader_id); 00250 ACE_DEBUG((LM_DEBUG, 00251 ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ") 00252 ACE_TEXT("domain %d topic %C publication %C copying HISTORY to resend to %C.\n"), 00253 this->domain_id_, 00254 this->topic_name_, 00255 OPENDDS_STRING(converter).c_str(), 00256 OPENDDS_STRING(reader).c_str())); 00257 } 00258 } 00259 00260 return DDS::RETCODE_OK; 00261 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance | ( | DDS::InstanceHandle_t & | instance_handle, | |
DataSample *& | registered_sample | |||
) |
Dynamically allocate a PublicationInstance object and add to the instances_ list.
Definition at line 264 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::bind(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, instances_, max_num_instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, OpenDDS::DCPS::OfferedDeadlineWatchdog::schedule_timer(), OpenDDS::DCPS::PublicationInstance::unregistered_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::register_instance_i().
00267 { 00268 PublicationInstance* instance = 0; 00269 auto_ptr<PublicationInstance> safe_instance; 00270 00271 if (instance_handle == DDS::HANDLE_NIL) { 00272 if (max_num_instances_ > 0 00273 && max_num_instances_ <= (CORBA::Long) instances_.size()) { 00274 return DDS::RETCODE_OUT_OF_RESOURCES; 00275 } 00276 00277 // registered the instance for the first time. 00278 ACE_NEW_RETURN(instance, 00279 PublicationInstance(registered_sample), 00280 DDS::RETCODE_ERROR); 00281 00282 ACE_auto_ptr_reset(safe_instance, instance); 00283 00284 instance_handle = this->writer_->get_next_handle(); 00285 00286 int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance); 00287 00288 if (0 != insert_attempt) { 00289 ACE_ERROR((LM_ERROR, 00290 ACE_TEXT("(%P|%t) ERROR: ") 00291 ACE_TEXT("WriteDataContainer::register_instance, ") 00292 ACE_TEXT("failed to insert instance handle=%X\n"), 00293 instance)); 00294 return DDS::RETCODE_ERROR; 00295 } // if (0 != insert_attempt) 00296 00297 instance->instance_handle_ = instance_handle; 00298 00299 } else { 00300 int const find_attempt = find(instances_, instance_handle, instance); 00301 ACE_auto_ptr_reset(safe_instance, instance); 00302 00303 if (0 != find_attempt) { 00304 ACE_ERROR((LM_ERROR, 00305 ACE_TEXT("(%P|%t) ERROR: ") 00306 ACE_TEXT("WriteDataContainer::register_instance, ") 00307 ACE_TEXT("The provided instance handle=%X is not a valid") 00308 ACE_TEXT("handle.\n"), 00309 instance_handle)); 00310 00311 return DDS::RETCODE_ERROR; 00312 } // if (0 != find_attempt) 00313 00314 // don't need this - the PublicationInstances already has a sample. 00315 registered_sample->release(); 00316 00317 instance->unregistered_ = false; 00318 } 00319 00320 // The registered_sample is shallow copied. 00321 registered_sample = instance->registered_sample_->duplicate(); 00322 00323 if (this->writer_->watchdog_) { 00324 this->writer_->watchdog_->schedule_timer(instance); 00325 } 00326 00327 safe_instance.release(); // Safe to relinquish ownership. 00328 00329 return DDS::RETCODE_OK; 00330 }
void OpenDDS::DCPS::WriteDataContainer::release_buffer | ( | DataSampleElement * | element | ) |
Release the memory previously allocated. This method is corresponding to the obtain_buffer method. If the memory is allocated by some allocator then the memory needs to be released to the allocator.
Definition at line 1203 of file WriteDataContainer.cpp.
References data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.
Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_oldest_historical_sample(), and remove_oldest_sample().
01204 { 01205 if (element->get_header().message_id_ == SAMPLE_DATA) 01206 data_holder_.dequeue(element); 01207 // Release the memory to the allocator. 01208 ACE_DES_FREE(element, 01209 sample_list_element_allocator_.free, 01210 DataSampleElement); 01211 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_historical_sample | ( | InstanceDataSampleList & | instance_list, | |
bool & | released | |||
) | [private] |
Remove the sample (head) from the instance list if it is only being used for history purposes (located on sent_data_ list). This method also updates the internal lists to reflect the change. The "released" boolean value indicates whether a sample was released.
Definition at line 779 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), OpenDDS::DCPS::InstanceDataSampleList::head(), OPENDDS_STRING, OPENDDS_VECTOR(), publication_id_, release_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), and sent_data_.
Referenced by obtain_buffer().
00782 { 00783 DataSampleElement* stale = 0; 00784 00785 if (instance_list.head() != 0) { 00786 stale = instance_list.head(); 00787 } else { 00788 //it is fine for the instance_list to be empty 00789 return DDS::RETCODE_OK; 00790 } 00791 00792 // Only interested in trying to remove historical samples. 00793 // If the sample is not on the sent_data_ list, simply can't 00794 // be removed -- not an error 00795 00796 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists; 00797 send_lists.push_back(&sent_data_); 00798 00799 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists); 00800 00801 // Identify if the stale data is in the sent_data_ (i.e. HISTORY) 00802 // and can therefore be released 00803 00804 bool result = false; 00805 00806 if (containing_list == &this->sent_data_) { 00807 // No one is using the historical data sample, so we can release it back to 00808 // its allocator. 00809 // First remove the oldest sample from the instance list. 00810 // 00811 if (instance_list.dequeue_head(stale) == false) { 00812 ACE_ERROR_RETURN((LM_ERROR, 00813 ACE_TEXT("(%P|%t) ERROR: ") 00814 ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ") 00815 ACE_TEXT("dequeue_head_next_sample failed\n")), 00816 DDS::RETCODE_ERROR); 00817 } 00818 00819 // Now attempt to remove the sample from the internal list and release its buffer 00820 result = this->sent_data_.dequeue(stale) != 0; 00821 release_buffer(stale); 00822 released = true; 00823 00824 if (DCPS_debug_level > 9) { 00825 GuidConverter converter(publication_id_); 00826 ACE_DEBUG((LM_DEBUG, 00827 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_historical_sample: ") 00828 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"), 00829 this->domain_id_, 00830 this->topic_name_, 00831 OPENDDS_STRING(converter).c_str())); 00832 } 00833 00834 } else { 00835 //Sample is not a historical sample 00836 return DDS::RETCODE_OK; 00837 } 00838 00839 if (result == false) { 00840 ACE_ERROR_RETURN((LM_ERROR, 00841 ACE_TEXT("(%P|%t) ERROR: ") 00842 ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ") 00843 ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")), 00844 DDS::RETCODE_ERROR); 00845 00846 } 00847 00848 return DDS::RETCODE_OK; 00849 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample | ( | InstanceDataSampleList & | instance_list, | |
bool & | released | |||
) | [private] |
Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.
Definition at line 853 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), empty_condition_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, and unsent_data_.
Referenced by dispose(), and obtain_buffer().
00856 { 00857 DataSampleElement* stale = 0; 00858 00859 // 00860 // Remove the oldest sample from the instance list. 00861 // 00862 if (instance_list.dequeue_head(stale) == false) { 00863 ACE_ERROR_RETURN((LM_ERROR, 00864 ACE_TEXT("(%P|%t) ERROR: ") 00865 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ") 00866 ACE_TEXT("dequeue_head_next_sample failed\n")), 00867 DDS::RETCODE_ERROR); 00868 } 00869 00870 // 00871 // Remove the stale data from the next_writer_sample_ list. The 00872 // sending_data_/next_send_sample_ list is not managed within the 00873 // container, it is only used external to the container and does 00874 // not need to be managed internally. 00875 // 00876 // The next_writer_sample_ link is being used in one of the sent_data_, 00877 // sending_data_, or unsent_data lists. Removal from the doubly 00878 // linked list needs to repair the list only when the stale sample 00879 // is either the head or tail of the list. 00880 // 00881 00882 // 00883 // Locate the head of the list that the stale data is in. 00884 // 00885 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists; 00886 send_lists.push_back(&sending_data_); 00887 send_lists.push_back(&sent_data_); 00888 send_lists.push_back(&unsent_data_); 00889 send_lists.push_back(&orphaned_to_transport_); 00890 00891 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists); 00892 00893 00894 // 00895 // Identify the list that the stale data is in. 00896 // The stale data should be in one of the sent_data_, sending_data_ 00897 // or unsent_data_. It should not be in released_data_ list since 00898 // this function is the only place a sample is moved from 00899 // sending_data_ to released_data_ list. 00900 00901 // Remove the element from the internal list. 00902 bool result = false; 00903 00904 if (containing_list == &this->sending_data_) { 00905 if (DCPS_debug_level > 2) { 00906 ACE_ERROR((LM_WARNING, 00907 ACE_TEXT("(%P|%t) WARNING: ") 00908 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ") 00909 ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n"))); 00910 } 00911 00912 // This means transport is still using the sample that needs to 00913 // be released currently so notify transport that sample is being removed. 00914 00915 if (this->writer_->remove_sample(stale)) { 00916 if (this->sent_data_.dequeue(stale)) { 00917 release_buffer(stale); 00918 result = true; 00919 } 00920 00921 } else { 00922 if (this->sending_data_.dequeue(stale)) { 00923 this->orphaned_to_transport_.enqueue_tail(stale); 00924 } else if (this->sent_data_.dequeue(stale)) { 00925 release_buffer(stale); 00926 result = true; 00927 } 00928 result = true; 00929 } 00930 released = true; 00931 00932 } else if (containing_list == &this->sent_data_) { 00933 // No one is using the data sample, so we can release it back to 00934 // its allocator. 00935 // 00936 result = this->sent_data_.dequeue(stale) != 0; 00937 release_buffer(stale); 00938 released = true; 00939 00940 if (DCPS_debug_level > 9) { 00941 GuidConverter converter(publication_id_); 00942 ACE_DEBUG((LM_DEBUG, 00943 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ") 00944 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"), 00945 this->domain_id_, 00946 this->topic_name_, 00947 OPENDDS_STRING(converter).c_str())); 00948 } 00949 00950 } else if (containing_list == &this->unsent_data_) { 00951 // 00952 // No one is using the data sample, so we can release it back to 00953 // its allocator. 00954 // 00955 result = this->unsent_data_.dequeue(stale) != 0; 00956 release_buffer(stale); 00957 released = true; 00958 00959 if (DCPS_debug_level > 9) { 00960 GuidConverter converter(publication_id_); 00961 ACE_DEBUG((LM_DEBUG, 00962 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ") 00963 ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"), 00964 this->domain_id_, 00965 this->topic_name_, 00966 OPENDDS_STRING(converter).c_str())); 00967 } 00968 } else { 00969 ACE_ERROR_RETURN((LM_ERROR, 00970 ACE_TEXT("(%P|%t) ERROR: ") 00971 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ") 00972 ACE_TEXT("The oldest sample is not in any internal list.\n")), 00973 DDS::RETCODE_ERROR); 00974 } 00975 00976 // Signal if there is no pending data. 00977 { 00978 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00979 guard, 00980 this->lock_, 00981 DDS::RETCODE_ERROR); 00982 00983 if (!pending_data()) 00984 empty_condition_.broadcast(); 00985 } 00986 00987 if (result == false) { 00988 ACE_ERROR_RETURN((LM_ERROR, 00989 ACE_TEXT("(%P|%t) ERROR: ") 00990 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ") 00991 ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")), 00992 DDS::RETCODE_ERROR); 00993 00994 } 00995 00996 return DDS::RETCODE_OK; 00997 }
void OpenDDS::DCPS::WriteDataContainer::reschedule_deadline | ( | ) |
Reset time interval for each instance.
Definition at line 1373 of file WriteDataContainer.cpp.
References instances_.
Referenced by OpenDDS::DCPS::DataWriterImpl::reschedule_deadline().
01374 { 01375 for (PublicationInstanceMapType::iterator iter = instances_.begin(); 01376 iter != instances_.end(); 01377 ++iter) { 01378 if (iter->second->deadline_timer_id_ != -1) { 01379 if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) { 01380 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n") 01381 ACE_TEXT("reset_timer_interval"))); 01382 } 01383 } 01384 } 01385 }
bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged | ( | const SequenceNumber | sequence | ) |
Definition at line 1485 of file WriteDataContainer.cpp.
References acked_sequences_, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().
Referenced by wait_ack_of_seq().
01486 { 01487 if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 01488 //return true here so that wait_for_acknowledgements doesn't block 01489 return true; 01490 } 01491 01492 SequenceNumber acked = acked_sequences_.cumulative_ack(); 01493 if (DCPS_debug_level >= 10) { 01494 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ") 01495 ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue())); 01496 } 01497 if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){ 01498 //if acked_sequences_ is empty or its cumulative_ack is lower than 01499 //the requests sequence, return false 01500 return false; 01501 } 01502 return true; 01503 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister | ( | DDS::InstanceHandle_t | handle, | |
DataSample *& | registered_sample, | |||
bool | dup_registered_sample = true | |||
) |
Remove the provided instance from the instances_ list. The registered sample data will be released upon the deletion of the PublicationInstance. A shallow copy of the sample data will be given to datawriter as part of the control message if the dup_registered_sample is true.
This method returns error if the instance is not registered.
Definition at line 333 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::find(), instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::DataWriterImpl::unregistered(), OpenDDS::DCPS::PublicationInstance::unregistered_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.
Referenced by OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), unregister_all(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().
00337 { 00338 PublicationInstance* instance = 0; 00339 00340 int const find_attempt = find(instances_, instance_handle, instance); 00341 00342 if (0 != find_attempt) { 00343 ACE_ERROR_RETURN((LM_ERROR, 00344 ACE_TEXT("(%P|%t) ERROR: ") 00345 ACE_TEXT("WriteDataContainer::unregister, ") 00346 ACE_TEXT("The instance(handle=%X) ") 00347 ACE_TEXT("is not registered yet.\n"), 00348 instance_handle), 00349 DDS::RETCODE_PRECONDITION_NOT_MET); 00350 } // if (0 != find_attempt) 00351 00352 instance->unregistered_ = true; 00353 00354 if (dup_registered_sample) { 00355 // The registered_sample is shallow copied. 00356 registered_sample = instance->registered_sample_->duplicate(); 00357 } 00358 00359 // Unregister the instance with typed DataWriter. 00360 this->writer_->unregistered(instance_handle); 00361 00362 if (this->writer_->watchdog_) 00363 this->writer_->watchdog_->cancel_timer(instance); 00364 00365 return DDS::RETCODE_OK; 00366 }
void OpenDDS::DCPS::WriteDataContainer::unregister_all | ( | ) |
Unregister all instances managed by this data containers.
Definition at line 1214 of file WriteDataContainer.cpp.
References condition_, DBG_ENTRY_LVL, dispose(), instances_, DDS::RETCODE_OK, shutdown_, OpenDDS::DCPS::unbind(), unregister(), and waiting_on_release_.
Referenced by OpenDDS::DCPS::DataWriterImpl::unregister_all().
01215 { 01216 DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6); 01217 shutdown_ = true; 01218 01219 { 01220 //The internal list needs protection since this call may result from the 01221 //the delete_datawriter call which does not acquire the lock in advance. 01222 ACE_GUARD(ACE_Recursive_Thread_Mutex, 01223 guard, 01224 this->lock_); 01225 // Tell transport remove all control messages currently 01226 // transport is processing. 01227 (void) this->writer_->remove_all_msgs(); 01228 01229 // Broadcast to wake up all waiting threads. 01230 if (waiting_on_release_) { 01231 condition_.broadcast(); 01232 } 01233 } 01234 DDS::ReturnCode_t ret; 01235 DataSample* registered_sample; 01236 PublicationInstanceMapType::iterator it = instances_.begin(); 01237 01238 while (it != instances_.end()) { 01239 // Release the instance data. 01240 ret = dispose(it->first, registered_sample, false); 01241 01242 if (ret != DDS::RETCODE_OK) { 01243 ACE_ERROR((LM_ERROR, 01244 ACE_TEXT("(%P|%t) ERROR: ") 01245 ACE_TEXT("WriteDataContainer::unregister_all, ") 01246 ACE_TEXT("dispose instance %X failed\n"), 01247 it->first)); 01248 } 01249 // Mark the instance unregistered. 01250 ret = unregister(it->first, registered_sample, false); 01251 01252 if (ret != DDS::RETCODE_OK) { 01253 ACE_ERROR((LM_ERROR, 01254 ACE_TEXT("(%P|%t) ERROR: ") 01255 ACE_TEXT("WriteDataContainer::unregister_all, ") 01256 ACE_TEXT("unregister instance %X failed\n"), 01257 it->first)); 01258 } 01259 01260 PublicationInstance* instance = it->second; 01261 01262 delete instance; 01263 01264 // Get the next iterator before erase the instance handle. 01265 PublicationInstanceMapType::iterator it_next = it; 01266 ++it_next; 01267 // Remove the instance from the instance list. 01268 unbind(instances_, it->first); 01269 it = it_next; 01270 } 01271 01272 ACE_UNUSED_ARG(registered_sample); 01273 }
DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq | ( | const ACE_Time_Value & | abs_deadline, | |
const SequenceNumber & | sequence | |||
) |
Definition at line 1450 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged(), and wfa_condition_.
Referenced by OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack().
01451 { 01452 ACE_Time_Value deadline(abs_deadline); 01453 DDS::ReturnCode_t ret = DDS::RETCODE_OK; 01454 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR); 01455 01456 while (ACE_OS::gettimeofday() < deadline) { 01457 01458 if (!sequence_acknowledged(sequence)) { 01459 // lock is released while waiting and acquired before returning 01460 // from wait. 01461 int const wait_result = wfa_condition_.wait(&deadline); 01462 01463 if (wait_result != 0) { 01464 if (errno == ETIME) { 01465 if (DCPS_debug_level >= 2) { 01466 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq") 01467 ACE_TEXT(" timed out waiting for sequence %q to be acked\n"), 01468 sequence.getValue())); 01469 } 01470 ret = DDS::RETCODE_TIMEOUT; 01471 } else { 01472 ret = DDS::RETCODE_ERROR; 01473 } 01474 } 01475 } else { 01476 ret = DDS::RETCODE_OK; 01477 break; 01478 } 01479 } 01480 01481 return ret; 01482 }
void OpenDDS::DCPS::WriteDataContainer::wait_pending | ( | ) |
Block until pending samples have either been delivered or dropped.
Definition at line 1388 of file WriteDataContainer.cpp.
References OpenDDS::DCPS::DCPS_debug_level, empty_condition_, log_send_state_lists(), pending_data(), TheServiceParticipant, and OpenDDS::DCPS::MessageTracker::timestamp().
Referenced by OpenDDS::DCPS::DataWriterImpl::wait_pending().
01389 { 01390 ACE_Time_Value pending_timeout = 01391 TheServiceParticipant->pending_timeout(); 01392 01393 ACE_Time_Value* pTimeout = 0; 01394 01395 if (pending_timeout != ACE_Time_Value::zero) { 01396 pTimeout = &pending_timeout; 01397 pending_timeout += ACE_OS::gettimeofday(); 01398 } 01399 01400 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_); 01401 const bool report = DCPS_debug_level > 0 && pending_data(); 01402 if (report) { 01403 ACE_TCHAR date_time[50]; 01404 ACE_TCHAR* const time = 01405 MessageTracker::timestamp(pending_timeout, 01406 date_time, 01407 50); 01408 ACE_DEBUG((LM_DEBUG, 01409 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ") 01410 ACE_TEXT("at %s\n"), 01411 (pending_timeout == ACE_Time_Value::zero ? 01412 ACE_TEXT("(no timeout)") : time))); 01413 } 01414 while (true) { 01415 01416 if (!pending_data()) 01417 break; 01418 01419 if (empty_condition_.wait(pTimeout) == -1 && pending_data()) { 01420 if (DCPS_debug_level) { 01421 ACE_DEBUG((LM_INFO, 01422 ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"), 01423 ACE_TEXT("Timed out waiting for messages to be transported"))); 01424 this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: "); 01425 } 01426 break; 01427 } 01428 } 01429 if (report) { 01430 ACE_DEBUG((LM_DEBUG, 01431 "%T (%P|%t) WriteDataContainer::wait_pending done\n")); 01432 } 01433 }
void OpenDDS::DCPS::WriteDataContainer::wakeup_blocking_writers | ( | DataSampleElement * | stale | ) | [private] |
Called when data has been dropped or delivered and any blocked writers should be notified
Definition at line 1506 of file WriteDataContainer.cpp.
References condition_, and waiting_on_release_.
Referenced by data_delivered(), and data_dropped().
01507 { 01508 if (stale && waiting_on_release_) { 01509 waiting_on_release_ = false; 01510 01511 condition_.broadcast(); 01512 } 01513 }
friend class ::DDS_TEST [friend] |
Definition at line 350 of file WriteDataContainer.h.
friend class DataWriterImpl [friend] |
Definition at line 119 of file WriteDataContainer.h.
Definition at line 408 of file WriteDataContainer.h.
Referenced by data_delivered(), and sequence_acknowledged().
ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::condition_ [private] |
Definition at line 485 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
The list of all samples written to this datawriter in writing order.
Definition at line 429 of file WriteDataContainer.h.
Referenced by obtain_buffer(), and release_buffer().
CORBA::Long OpenDDS::DCPS::WriteDataContainer::depth_ [private] |
The maximum size a container should allow for an instance sample list It corresponds to the QoS.HISTORY.depth value for QoS.HISTORY.kind==KEEP_LAST and corresponds to the QoS.RESOURCE_LIMITS.max_samples_per_instance for the case of QoS.HISTORY.kind==KEEP_ALL.
Definition at line 452 of file WriteDataContainer.h.
Referenced by obtain_buffer().
DDS::DomainId_t const OpenDDS::DCPS::WriteDataContainer::domain_id_ [private] |
DataDurabilityCache* const OpenDDS::DCPS::WriteDataContainer::durability_cache_ [private] |
Pointer to the data durability cache.
This a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.
Definition at line 530 of file WriteDataContainer.h.
Referenced by persist_data().
DDS::DurabilityServiceQosPolicy const& OpenDDS::DCPS::WriteDataContainer::durability_service_ [private] |
DURABILITY_SERVICE QoS specific to the DataWriter.
Definition at line 533 of file WriteDataContainer.h.
ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::empty_condition_ [private] |
Definition at line 486 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().
PublicationInstanceMapType OpenDDS::DCPS::WriteDataContainer::instances_ [private] |
The individual instance queue threads in the data.
Definition at line 438 of file WriteDataContainer.h.
Referenced by dispose(), get_handle_instance(), get_instance_handles(), log_send_state_lists(), num_all_samples(), num_samples(), obtain_buffer(), register_instance(), reschedule_deadline(), unregister(), unregister_all(), and OpenDDS::DCPS::DataWriterImpl::unregister_instances().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::lock_ [private] |
This lock is used to protect the container and the map in the type-specific DataWriter. This lock can be accessible via the datawriter. This lock is made to be globally accessible for performance concern. The lock is acquired as the external call (e.g. FooDataWriterImpl::write) started and the same lock will be used by the transport thread to notify the datawriter the data is delivered. Other internal operations will not lock.
Definition at line 484 of file WriteDataContainer.h.
The maximum time to block on write operation. This comes from DataWriter's QoS HISTORY.max_blocking_time
Definition at line 470 of file WriteDataContainer.h.
Referenced by obtain_buffer().
CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_instances_ [private] |
The maximum number of instances allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
Definition at line 458 of file WriteDataContainer.h.
Referenced by register_instance().
CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_samples_ [private] |
The maximum number of samples allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS It also covers QoS.RESOURCE_LIMITS.max_samples and max_instances * depth
Definition at line 466 of file WriteDataContainer.h.
Referenced by obtain_buffer().
size_t OpenDDS::DCPS::WriteDataContainer::n_chunks_ [private] |
The number of chunks that sample_list_element_allocator_ needs initialize.
Definition at line 496 of file WriteDataContainer.h.
Referenced by WriteDataContainer().
List of data that has been released by WriteDataContainer but is still in process of delivery (or dropping) by transport
Definition at line 425 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), and remove_oldest_sample().
The publication Id from repo.
Definition at line 441 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), OpenDDS::DCPS::DataWriterImpl::enable(), obtain_buffer(), obtain_buffer_for_control(), reenqueue_all(), remove_oldest_historical_sample(), and remove_oldest_sample().
List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in sent and sending list. This list will be passed to the transport for re-sending.
Definition at line 435 of file WriteDataContainer.h.
Referenced by get_resend_data().
DataSampleElementAllocator OpenDDS::DCPS::WriteDataContainer::sample_list_element_allocator_ [private] |
The cached allocator to allocate DataSampleElement objects.
Definition at line 500 of file WriteDataContainer.h.
Referenced by copy_and_append(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().
List of data that is currently being sent.
Definition at line 418 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), get_unsent_data(), log_send_state_lists(), pending_data(), reenqueue_all(), and remove_oldest_sample().
List of data that has already been sent.
Definition at line 421 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), log_send_state_lists(), reenqueue_all(), remove_oldest_historical_sample(), and remove_oldest_sample().
bool OpenDDS::DCPS::WriteDataContainer::shutdown_ [private] |
The flag indicates the datawriter will be destroyed.
Definition at line 511 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and ~WriteDataContainer().
char const* const OpenDDS::DCPS::WriteDataContainer::topic_name_ [private] |
ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::transaction_id_ [private] |
Id used to keep track of which send transaction DataWriter is currently creating
Definition at line 415 of file WriteDataContainer.h.
Referenced by get_unsent_data().
TransportCustomizedElementAllocator OpenDDS::DCPS::WriteDataContainer::transport_customized_element_allocator_ [private] |
Definition at line 508 of file WriteDataContainer.h.
Referenced by obtain_buffer(), and obtain_buffer_for_control().
TransportSendElementAllocator OpenDDS::DCPS::WriteDataContainer::transport_send_element_allocator_ [private] |
The allocator for TransportSendElement. The TransportSendElement allocator is put here because it needs the number of chunks information that WriteDataContainer has.
Definition at line 506 of file WriteDataContainer.h.
Referenced by obtain_buffer(), obtain_buffer_for_control(), and WriteDataContainer().
char const* const OpenDDS::DCPS::WriteDataContainer::type_name_ [private] |
List of data that has not been sent yet.
Definition at line 411 of file WriteDataContainer.h.
Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), get_unsent_data(), log_send_state_lists(), pending_data(), and remove_oldest_sample().
bool OpenDDS::DCPS::WriteDataContainer::waiting_on_release_ [private] |
The block waiting flag.
Definition at line 473 of file WriteDataContainer.h.
Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().
ACE_Condition<ACE_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::wfa_condition_ [private] |
Used to block in wait_for_acks().
Definition at line 492 of file WriteDataContainer.h.
Referenced by data_delivered(), and wait_ack_of_seq().
ACE_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::wfa_lock_ [private] |
The writer that owns this container.
Definition at line 444 of file WriteDataContainer.h.
Referenced by copy_and_append(), data_delivered(), data_dropped(), dispose(), enqueue(), register_instance(), and unregister().