OpenDDS::DCPS::WriteDataContainer Class Reference

A container for instance sample data.

#include <WriteDataContainer.h>

Public Member Functions

 WriteDataContainer (DataWriterImpl *writer, CORBA::Long depth, ::DDS::Duration_t max_blocking_time, size_t n_chunks, DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataDurabilityCache *durability_cache, DDS::DurabilityServiceQosPolicy const &durability_service, CORBA::Long max_instances, CORBA::Long max_total_samples)
 ~WriteDataContainer ()
DDS::ReturnCode_t enqueue_control (DataSampleElement *control_sample)
DDS::ReturnCode_t enqueue (DataSampleElement *sample, DDS::InstanceHandle_t instance)
DDS::ReturnCode_t reenqueue_all (const RepoId &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
DDS::ReturnCode_t register_instance (DDS::InstanceHandle_t &instance_handle, DataSample *&registered_sample)
DDS::ReturnCode_t unregister (DDS::InstanceHandle_t handle, DataSample *&registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, DataSample *&registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
size_t num_all_samples ()
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
SendStateDataSampleList get_resend_data ()
bool pending_data ()
void data_delivered (const DataSampleElement *sample)
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
DDS::ReturnCode_t obtain_buffer_for_control (DataSampleElement *&element)
DDS::ReturnCode_t obtain_buffer (DataSampleElement *&element, DDS::InstanceHandle_t handle)
void release_buffer (DataSampleElement *element)
void unregister_all ()
PublicationInstance * get_handle_instance (DDS::InstanceHandle_t handle)
bool persist_data ()
void reschedule_deadline ()
 Reset time interval for each instance.
void wait_pending ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
DDS::ReturnCode_t wait_ack_of_seq (const ACE_Time_Value &abs_deadline, const SequenceNumber &sequence)
bool sequence_acknowledged (const SequenceNumber sequence)

Private Member Functions

 WriteDataContainer (WriteDataContainer const &)
WriteDataContainer & operator= (WriteDataContainer const &)
void copy_and_append (SendStateDataSampleList &list, const SendStateDataSampleList &appended, const RepoId &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
DDS::ReturnCode_t remove_oldest_historical_sample (InstanceDataSampleList &instance_list, bool &released)
DDS::ReturnCode_t remove_oldest_sample (InstanceDataSampleList &instance_list, bool &released)
void wakeup_blocking_writers (DataSampleElement *stale)
void log_send_state_lists (OPENDDS_STRING description)

Private Attributes

DisjointSequence acked_sequences_
SendStateDataSampleList unsent_data_
 List of data that has not been sent yet.
ACE_UINT64 transaction_id_
SendStateDataSampleList sending_data_
 List of data that is currently being sent.
SendStateDataSampleList sent_data_
 List of data that has already been sent.
SendStateDataSampleList orphaned_to_transport_
WriterDataSampleList data_holder_
SendStateDataSampleList resend_data_
PublicationInstanceMapType instances_
 The individual instance queue threads in the data.
PublicationId publication_id_
 The publication Id from repo.
DataWriterImpl * writer_
 The writer that owns this container.
CORBA::Long depth_
CORBA::Long max_num_instances_
CORBA::Long max_num_samples_
::DDS::Duration_t max_blocking_time_
bool waiting_on_release_
 The block waiting flag.
ACE_Recursive_Thread_Mutex lock_
ACE_Condition< ACE_Recursive_Thread_Mutex > condition_
ACE_Condition< ACE_Recursive_Thread_Mutex > empty_condition_
ACE_Thread_Mutex wfa_lock_
 Lock used for wait_for_acks() processing.
ACE_Condition< ACE_Thread_Mutex > wfa_condition_
 Used to block in wait_for_acks().
size_t n_chunks_
DataSampleElementAllocator sample_list_element_allocator_
TransportSendElementAllocator transport_send_element_allocator_
TransportCustomizedElementAllocator transport_customized_element_allocator_
bool shutdown_
 Flag indicating that the datawriter will be destroyed.
DDS::DomainId_t const domain_id_
 Domain ID.
char const *const topic_name_
 Topic name.
char const *const type_name_
 Type name.
DataDurabilityCache *const durability_cache_
 Pointer to the data durability cache.
DDS::DurabilityServiceQosPolicy const & durability_service_
 DURABILITY_SERVICE QoS specific to the DataWriter.

Friends

class DataWriterImpl
class ::DDS_TEST

Detailed Description

A container for instance sample data.

This container is instantiated per DataWriter. It maintains a list of PublicationInstance objects, each of which is referenced internally by its instance handle.

This container contains threaded lists of all data written to a given DataWriter. The real data sample is represented by the DataSampleElement. The data_holder_ holds all DataSampleElement in the writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in PublicationInstance links samples via the next_instance_sample_ thread.

There are three state transition lists used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Any DataSampleElement should be in one of these three lists and SHOULD NOT be shared between these three lists. A normal transition of a DataSampleElement would be unsent->sending->sent.

A DataSampleElement transitions from unsent to sending naturally during a write operation when get_unsent_data is called. A DataSampleElement transitions from sending to sent when data_delivered is called, notifying the container that the transport is finished with the sample and that it can be marked as a historical sample. A DataSampleElement may transition back to unsent from sending if the transport notifies the container that the sample was dropped and should be resent. A DataSampleElement is removed from the sent list and freed when the instance queue or the container (for another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when an unreliable writer needs space; in this case the transport will be notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples.

Note: The real data sample will be freed when its reference count reaches 0. The resend list is only used when the datawriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements for the data sample duplicates of the sending and sent lists and hands this list off to the transport.
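The transitions described above can be pictured with a small model. This is only a sketch under stated assumptions: `Sample` and `SendStateModel` are hypothetical stand-ins, `std::list` replaces the intrusive SendStateDataSampleList, and locking, orphaned samples, and control messages are left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <list>

// Hypothetical stand-in for DataSampleElement; only an id is modeled.
struct Sample { int id; };

// Simplified model of the unsent/sending/sent lists (not the OpenDDS types).
class SendStateModel {
public:
  // A newly written sample starts in the unsent list.
  void enqueue(Sample* s) { unsent_.push_back(s); }

  // Models get_unsent_data(): every unsent sample moves to sending,
  // and a copy of the handed-off list is returned to the caller.
  std::list<Sample*> get_unsent_data() {
    std::list<Sample*> handed_off = unsent_;
    sending_.splice(sending_.end(), unsent_);
    return handed_off;
  }

  // Models data_delivered(): sending -> sent.
  bool data_delivered(Sample* s) { return transfer(s, sending_, sent_); }

  // Models a drop that should be resent: sending -> unsent.
  bool data_dropped(Sample* s) { return transfer(s, sending_, unsent_); }

  std::size_t unsent() const { return unsent_.size(); }
  std::size_t sending() const { return sending_.size(); }
  std::size_t sent() const { return sent_.size(); }

private:
  // Moves s from one list to the tail of another; false if s is not in 'from'.
  static bool transfer(Sample* s, std::list<Sample*>& from,
                       std::list<Sample*>& to) {
    std::list<Sample*>::iterator it = std::find(from.begin(), from.end(), s);
    if (it == from.end()) return false;
    to.splice(to.end(), from, it);
    return true;
  }

  std::list<Sample*> unsent_, sending_, sent_;
};
```

In this model a sample is never in two lists at once, because `splice` moves the node rather than copying it.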

Note:
1) The PublicationInstance object is not removed from this container until the instance is unregistered. The same instance handle is reused for re-registration. The instance data is deleted when this container is deleted. This simplifies instance data memory management. An alternative is to remove the handle from the instance list when unregister occurs and delete the instance data after the transport is done with the instance, but we do not have a way to know when the transport is done, since the sending list covers all instances in the same datawriter.
2) This container has ownership of the instance data samples once the data is written. The type-specific datawriter that allocates the memory for the sample data gives the ownership to its base class.
3) It is the responsibility of the owner of objects of this class to ensure that access to the lists is properly locked. This means using the same lock/condition to access the lists via the enqueue(), get*(), and data_delivered() methods. For the case where these are all called from the same (client) thread, this should be a recursive lock so that (a) we do not deadlock and (b) we incur the cost of obtaining the lock only once.
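The recursive-lock requirement in the last note can be illustrated with a minimal sketch. It assumes `std::recursive_mutex` as a stand-in for ACE_Recursive_Thread_Mutex, and `GuardedList` is a hypothetical class, not part of OpenDDS.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical container whose public methods each take the same lock.
class GuardedList {
public:
  void enqueue(int value) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    items_.push_back(value);
  }

  std::size_t size() {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    return items_.size();
  }

  // Holds the lock while calling other locking methods from the same
  // thread. With a non-recursive mutex this re-entry would deadlock.
  std::size_t enqueue_pair(int a, int b) {
    std::lock_guard<std::recursive_mutex> guard(lock_);
    enqueue(a);
    enqueue(b);
    return size();
  }

private:
  std::recursive_mutex lock_;
  std::vector<int> items_;
};
```

Calling `enqueue_pair(1, 2)` on an empty list returns 2: the nested calls re-acquire a lock the thread already owns instead of blocking on it.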

Definition at line 116 of file WriteDataContainer.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( DataWriterImpl *  writer,
CORBA::Long  depth,
::DDS::Duration_t  max_blocking_time,
size_t  n_chunks,
DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataDurabilityCache *  durability_cache,
DDS::DurabilityServiceQosPolicy const &  durability_service,
CORBA::Long  max_instances,
CORBA::Long  max_total_samples 
)

There is no default constructor; the container must be initialized with the arguments below.

Parameters:
writer  The writer which owns this container.
depth  Depth of the instance sample queue.
max_blocking_time  The timeout for write.
n_chunks  The number of chunks that the DataSampleElementAllocator needs to allocate.
domain_id  Domain ID.
topic_name  Topic name.
type_name  Type name.
durability_cache  The data durability cache for unsent data.
durability_service  DURABILITY_SERVICE QoS specific to the DataWriter.
max_instances  Maximum number of instances; 0 means unlimited.
max_total_samples  Maximum total number of samples; 0 means unlimited.

Definition at line 69 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, n_chunks_, sample_list_element_allocator_, and transport_send_element_allocator_.

00083   : transaction_id_(0),
00084     writer_(writer),
00085     depth_(depth),
00086     max_num_instances_(max_instances),
00087     max_num_samples_(max_total_samples),
00088     max_blocking_time_(max_blocking_time),
00089     waiting_on_release_(false),
00090     condition_(lock_),
00091     empty_condition_(lock_),
00092     wfa_condition_(this->wfa_lock_),
00093     n_chunks_(n_chunks),
00094     sample_list_element_allocator_(2 * n_chunks_),
00095     transport_send_element_allocator_(2 * n_chunks_,
00096                                       sizeof(TransportSendElement)),
00097     transport_customized_element_allocator_(2 * n_chunks_,
00098                                             sizeof(TransportCustomizedElement)),
00099     shutdown_(false),
00100     domain_id_(domain_id),
00101     topic_name_(topic_name),
00102     type_name_(type_name)
00103 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00104   , durability_cache_(durability_cache)
00105   , durability_service_(durability_service)
00106 #endif
00107 {
00108 
00109   if (DCPS_debug_level >= 2) {
00110     ACE_DEBUG((LM_DEBUG,
00111                "(%P|%t) WriteDataContainer "
00112                "sample_list_element_allocator %x with %d chunks\n",
00113                &sample_list_element_allocator_, n_chunks_));
00114     ACE_DEBUG((LM_DEBUG,
00115                "(%P|%t) WriteDataContainer "
00116                "transport_send_element_allocator %x with %d chunks\n",
00117                &transport_send_element_allocator_, n_chunks_));
00118   }
00119 }

OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer (  ) 

Default destructor.

Definition at line 121 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, and shutdown_.

00122 {
00123   if (this->unsent_data_.size() > 0) {
00124     ACE_DEBUG((LM_WARNING,
00125                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00126                ACE_TEXT("destroyed with %d samples unsent.\n"),
00127                this->unsent_data_.size()));
00128   }
00129 
00130   if (this->sending_data_.size() > 0) {
00131     ACE_DEBUG((LM_WARNING,
00132                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00133                ACE_TEXT("destroyed with %d samples sending.\n"),
00134                this->sending_data_.size()));
00135   }
00136 
00137   if (this->sent_data_.size() > 0) {
00138     if (DCPS_debug_level > 0) {
00139       ACE_DEBUG((LM_DEBUG,
00140                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00141                  ACE_TEXT("destroyed with %d samples sent.\n"),
00142                  this->sent_data_.size()));
00143     }
00144   }
00145 
00146   if (this->orphaned_to_transport_.size() > 0) {
00147     if (DCPS_debug_level > 0) {
00148       ACE_DEBUG((LM_DEBUG,
00149                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00150                  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00151                  this->orphaned_to_transport_.size()));
00152     }
00153   }
00154 
00155   if (!shutdown_) {
00156     ACE_ERROR((LM_ERROR,
00157                ACE_TEXT("(%P|%t) ERROR: ")
00158                ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00159                ACE_TEXT("The container has not been cleaned.\n")));
00160   }
00161 }

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( WriteDataContainer const &   )  [private]


Member Function Documentation

void OpenDDS::DCPS::WriteDataContainer::copy_and_append ( SendStateDataSampleList &  list,
const SendStateDataSampleList &  appended,
const RepoId &  reader_id,
const DDS::LifespanQosPolicy &  lifespan,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator *  eval,
const DDS::StringSeq &  params 
) [private]

Definition at line 1292 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::begin(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataWriterImpl::filter_out(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.

01303 {
01304   for (SendStateDataSampleList::const_iterator cur = appended.begin();
01305        cur != appended.end(); ++cur) {
01306 
01307     // Do not copy and append data that has exceeded the configured
01308     // lifespan.
01309     if (resend_data_expired(*cur, lifespan))
01310       continue;
01311 
01312 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01313     if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01314       continue;
01315 #endif
01316 
01317     DataSampleElement* element = 0;
01318     ACE_NEW_MALLOC(element,
01319                     static_cast<DataSampleElement*>(
01320                       sample_list_element_allocator_.malloc(
01321                         sizeof(DataSampleElement))),
01322                     DataSampleElement(*cur));
01323 
01324     element->set_num_subs(1);
01325     element->set_sub_id(0, reader_id);
01326 
01327     list.enqueue_tail(element);
01328   }
01329 }

void OpenDDS::DCPS::WriteDataContainer::data_delivered ( const DataSampleElement *  sample  ) 

Acknowledge the delivery of data. The sample that resides in this container is moved from the sending_data_ list to the internal sent_data_ list. If any threads are waiting for available space, they are woken up.

Definition at line 533 of file WriteDataContainer.cpp.

References acked_sequences_, OpenDDS::DCPS::DataWriterImpl::controlTracker, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), unsent_data_, wakeup_blocking_writers(), wfa_condition_, and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::data_delivered(), and data_dropped().

00534 {
00535   DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00536 
00537   if (DCPS_debug_level >= 2) {
00538     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00539                           ACE_TEXT(" %X \n"), sample));
00540   }
00541 
00542   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00543             guard,
00544             this->lock_);
00545 
00546   // Delivered samples _must_ be on sending_data_ list
00547 
00548   // If it is not found in one of the lists, an invariant
00549   // exception is declared.
00550 
00551   // The element now needs to be removed from the sending_data_
00552   // list, and appended to the end of the sent_data_ list here
00553 
00554   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00555 
00556   // If sample is on a SendStateDataSampleList it should be on the
00557   // sending_data_ list signifying it was given to the transport to
00558   // deliver and now the transport is signaling it has been delivered
00559   if (!sending_data_.dequeue(sample)) {
00560     //
00561     // Should be on sending_data_.  If it is in sent_data_
00562     // or unsent_data there was a problem.
00563     //
00564     OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00565     send_lists.push_back(&sent_data_);
00566     send_lists.push_back(&unsent_data_);
00567     send_lists.push_back(&orphaned_to_transport_);
00568 
00569     const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00570 
00571     if (containing_list == &this->sent_data_) {
00572       ACE_ERROR((LM_WARNING,
00573                  ACE_TEXT("(%P|%t) WARNING: ")
00574                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00575                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00576                  ACE_TEXT("WAS IN sent_data_.\n")));
00577     } else if (containing_list == &this->unsent_data_) {
00578       ACE_ERROR((LM_WARNING,
00579                  ACE_TEXT("(%P|%t) WARNING: ")
00580                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00581                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00582                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00583     } else {
00584 
00585       if (containing_list == &this->orphaned_to_transport_) {
00586         orphaned_to_transport_.dequeue(sample);
00587         release_buffer(stale);
00588       }
00589       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00590       //and inform transport of their release.  Transport will call data-delivered on the
00591       //elements as it processes the removal but they will already be gone from the send lists.
00592       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00593         //this message was a control message so release it
00594         if (DCPS_debug_level > 9) {
00595           GuidConverter converter(publication_id_);
00596           ACE_DEBUG((LM_DEBUG,
00597                      ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00598                      ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00599                      this->domain_id_,
00600                      this->topic_name_,
00601                      OPENDDS_STRING(converter).c_str()));
00602         }
00603         writer_->controlTracker.message_delivered();
00604       }
00605 
00606       if (!pending_data())
00607         empty_condition_.broadcast();
00608     }
00609 
00610     return;
00611   }
00612   ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00613   SequenceNumber acked_seq = stale->get_header().sequence_;
00614   SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00615 
00616   if (stale->get_header().message_id_ != SAMPLE_DATA) {
00617     //this message was a control message so release it
00618     if (DCPS_debug_level > 9) {
00619       GuidConverter converter(publication_id_);
00620       ACE_DEBUG((LM_DEBUG,
00621                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00622                  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00623                  this->domain_id_,
00624                  this->topic_name_,
00625                  OPENDDS_STRING(converter).c_str()));
00626     }
00627     release_buffer(stale);
00628     writer_->controlTracker.message_delivered();
00629   } else {
00630     if (DCPS_debug_level > 9) {
00631       GuidConverter converter(publication_id_);
00632       ACE_DEBUG((LM_DEBUG,
00633                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00634                  ACE_TEXT("domain %d topic %C publication %C pushed to HISTORY.\n"),
00635                  this->domain_id_,
00636                  this->topic_name_,
00637                  OPENDDS_STRING(converter).c_str()));
00638     }
00639 
00640     DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00641     sent_data_.enqueue_tail(sample);
00642 
00643     this->wakeup_blocking_writers (stale);
00644   }
00645   if (DCPS_debug_level > 9) {
00646     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00647                          ACE_TEXT("Inserting acked_sequence: %q\n"),
00648                          acked_seq.getValue()));
00649   }
00650 
00651   acked_sequences_.insert(acked_seq);
00652 
00653   if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00654       prev_max < acked_sequences_.cumulative_ack()) {
00655 
00656     if (DCPS_debug_level > 9) {
00657       ACE_DEBUG((LM_DEBUG,
00658                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00659                  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00660     }
00661 
00662     wfa_condition_.broadcast();
00663   }
00664 
00665   // Signal if there is no pending data.
00666   if (!pending_data())
00667     empty_condition_.broadcast();
00668 }
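The listing above broadcasts on wfa_condition_ only when the cumulative acknowledgment advances (or was previously unknown). A minimal sketch of that rule follows. `AckTracker` and `kUnknownSeq` are hypothetical names, and the semantics assumed here for the cumulative ack (the end of the first contiguous run of acknowledged sequence numbers) are an assumption about DisjointSequence, not taken from this page.

```cpp
#include <cassert>
#include <cstdint>
#include <set>

// Assumed sentinel playing the role of SEQUENCENUMBER_UNKNOWN().
const std::int64_t kUnknownSeq = -1;

// Hypothetical, simplified tracker of acknowledged sequence numbers.
class AckTracker {
public:
  // Records seq and returns true when waiters should be notified:
  // either nothing was acked before, or the cumulative ack moved forward.
  bool insert(std::int64_t seq) {
    const std::int64_t prev_max = cumulative_ack();
    acked_.insert(seq);
    return prev_max == kUnknownSeq || prev_max < cumulative_ack();
  }

  // End of the first contiguous run of acked numbers, or kUnknownSeq.
  std::int64_t cumulative_ack() const {
    if (acked_.empty()) return kUnknownSeq;
    std::set<std::int64_t>::const_iterator it = acked_.begin();
    std::int64_t high = *it;
    for (++it; it != acked_.end() && *it == high + 1; ++it) {
      high = *it;
    }
    return high;
  }

private:
  std::set<std::int64_t> acked_;
};
```

Acknowledging 1, then 3, then 2 notifies on the first and third insert only: sequence 3 leaves a gap, so the cumulative ack stays at 1 until 2 arrives.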

void OpenDDS::DCPS::WriteDataContainer::data_dropped ( const DataSampleElement *  element,
bool  dropped_by_transport 
)

This method is called by the transport to notify the container that a sample was dropped, which the publication code told the transport to do by calling TransportClient::remove_sample(). If the sample was in the "sending" list, it is moved to the "unsent" list. Any threads waiting for available space are then woken up. When dropped_by_transport is true, the drop was initiated by the transport because its send strategy is in MODE_TERMINATED. When dropped_by_transport is false, the drop was initiated by remove_sample(), and data_dropped() is called as a result of it.

Definition at line 671 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, unsent_data_, wakeup_blocking_writers(), and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::data_dropped().

00673 {
00674   DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00675 
00676   if (DCPS_debug_level >= 2) {
00677     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00678                           ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00679                           sample, dropped_by_transport));
00680   }
00681 
00682   // If the transport initiates the data dropping, we need do same thing
00683   // as data_delivered. e.g. remove the sample from the internal list
00684   // and the instance list. We do not need acquire the lock here since
00685   // the data_delivered acquires the lock.
00686   if (dropped_by_transport) {
00687     this->data_delivered(sample);
00688     return;
00689   }
00690 
00691   //The data_dropped could be called from the thread initiating sample remove
00692   //which already hold the lock. In this case, it's not necessary to acquire
00693   //lock here. It also could be called from the transport thread in a delayed
00694   //notification, it's necessary to acquire lock here to protect the internal
00695   //structures in this class.
00696 
00697   ACE_GUARD (ACE_Recursive_Thread_Mutex,
00698     guard,
00699     this->lock_);
00700 
00701   // The dropped sample should be in the sending_data_ list.
00702   // Otherwise an exception will be raised.
00703   //
00704   // We are now been notified by transport, so we can
00705   // keep the sample from the sending_data_ list still in
00706   // sample list since we will send it.
00707 
00708   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00709 
00710   // If sample is on a SendStateDataSampleList it should be on the
00711   // sending_data_ list signifying it was given to the transport to
00712   // deliver and now the transport is signaling it has been dropped
00713 
00714   if (sending_data_.dequeue(sample)) {
00715     // else: The data_dropped is called as a result of remove_sample()
00716     // called from reenqueue_all() which supports the TRANSIENT_LOCAL
00717     // qos. The samples that are sending by transport are dropped from
00718     // transport and will be moved to the unsent list for resend.
00719     unsent_data_.enqueue_tail(sample);
00720 
00721   } else {
00722     //
00723     // If it is in sent_data_ or unsent_data there was a problem.
00724     //
00725     OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00726     send_lists.push_back(&sent_data_);
00727     send_lists.push_back(&unsent_data_);
00728     send_lists.push_back(&orphaned_to_transport_);
00729 
00730     const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00731 
00732     if (containing_list == &this->sent_data_) {
00733       ACE_ERROR((LM_WARNING,
00734                  ACE_TEXT("(%P|%t) WARNING: ")
00735                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00736                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00737                  ACE_TEXT("WAS IN sent_data_.\n")));
00738     } else if (containing_list == &this->unsent_data_) {
00739       ACE_ERROR((LM_WARNING,
00740                  ACE_TEXT("(%P|%t) WARNING: ")
00741                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00742                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00743                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00744     } else {
00745       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00746       //and inform transport of their release.  Transport will call data-dropped on the
00747       //elements as it processes the removal but they will already be gone from the send lists.
00748       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00749         //this message was a control message so release it
00750         if (DCPS_debug_level > 9) {
00751           GuidConverter converter(publication_id_);
00752           ACE_DEBUG((LM_DEBUG,
00753                      ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00754                      ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00755                      this->domain_id_,
00756                      this->topic_name_,
00757                      OPENDDS_STRING(converter).c_str()));
00758         }
00759         writer_->controlTracker.message_dropped();
00760       }
00761       if (containing_list == &this->orphaned_to_transport_) {
00762         orphaned_to_transport_.dequeue(sample);
00763         release_buffer(stale);
00764         if (!pending_data())
00765           empty_condition_.broadcast();
00766       }
00767     }
00768 
00769     return;
00770   }
00771 
00772   this->wakeup_blocking_writers (stale);
00773 
00774   if (!pending_data())
00775     empty_condition_.broadcast();
00776 }
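The branch on dropped_by_transport in the listing above can be summarized with a small model. This is a sketch only: `DropModel` and `DropOutcome` are hypothetical, samples are reduced to integer ids, and the orphaned list, control messages, and locking are omitted.

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

enum class DropOutcome { TreatedAsDelivered, MovedToUnsent, NotInSending };

// Hypothetical model: each vector holds the ids of samples in that state.
struct DropModel {
  std::vector<int> unsent, sending, sent;

  DropOutcome data_dropped(int id, bool dropped_by_transport) {
    std::vector<int>::iterator it =
        std::find(sending.begin(), sending.end(), id);
    if (it == sending.end()) return DropOutcome::NotInSending;
    sending.erase(it);

    if (dropped_by_transport) {
      // Transport-initiated drop: handled the same way as a delivery.
      sent.push_back(id);
      return DropOutcome::TreatedAsDelivered;
    }
    // Drop requested through remove_sample(): keep the sample for resend.
    unsent.push_back(id);
    return DropOutcome::MovedToUnsent;
  }
};
```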

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose ( DDS::InstanceHandle_t  handle,
DataSample *&  registered_sample,
bool  dup_registered_sample = true 
)

Delete the samples for the provided instance. If dup_registered_sample is true, a shallow copy of the registered sample data is given to the datawriter as part of the control message.

This method returns DDS::RETCODE_PRECONDITION_NOT_MET if the instance is not registered.

Definition at line 369 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::find(), instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, remove_oldest_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::PublicationInstance::samples_, OpenDDS::DCPS::InstanceDataSampleList::size(), OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), and unregister_all().

00372 {
00373   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00374                    guard,
00375                    this->lock_,
00376                    DDS::RETCODE_ERROR);
00377 
00378   PublicationInstance* instance = 0;
00379 
00380   int const find_attempt = find(instances_, instance_handle, instance);
00381 
00382   if (0 != find_attempt) {
00383     ACE_ERROR_RETURN((LM_ERROR,
00384                       ACE_TEXT("(%P|%t) ERROR: ")
00385                       ACE_TEXT("WriteDataContainer::dispose, ")
00386                       ACE_TEXT("The instance(handle=%X) ")
00387                       ACE_TEXT("is not registered yet.\n"),
00388                       instance_handle),
00389                      DDS::RETCODE_PRECONDITION_NOT_MET);
00390   }
00391 
00392   if (dup_registered_sample) {
00393     // The registered_sample is shallow copied.
00394     registered_sample = instance->registered_sample_->duplicate();
00395   }
00396 
00397   // Note: The DDS specification is unclear as to if samples in the process
00398   // of being sent should be removed or not.
00399   // The advantage of calling remove_sample() on them is that the
00400   // cached allocator memory for them is freed.  The disadvantage
00401   // is that the slow reader may see multiple disposes without
00402   // any write sample between them and hence not temporarily move into the
00403   // Alive state.
00404   // We have chosen to NOT remove the sending samples.
00405 
00406   InstanceDataSampleList& instance_list = instance->samples_;
00407 
00408   while (instance_list.size() > 0) {
00409     bool released = false;
00410     DDS::ReturnCode_t ret
00411     = remove_oldest_sample(instance_list, released);
00412 
00413     if (ret != DDS::RETCODE_OK) {
00414       return ret;
00415     }
00416   }
00417 
00418   if (this->writer_->watchdog_)
00419     this->writer_->watchdog_->cancel_timer(instance);
00420   return DDS::RETCODE_OK;
00421 }
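As a rough illustration of the control flow in the listing above (look up the instance, fail if it is not registered, otherwise drain its sample queue), here is a minimal sketch. `InstanceModel` and `ReturnCode` are hypothetical, handles and samples are plain integers, and the watchdog and registered-sample duplication are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>

enum class ReturnCode { Ok, PreconditionNotMet };

// Hypothetical model of per-instance sample queues keyed by handle.
class InstanceModel {
public:
  void register_instance(int handle) {
    instances_.emplace(handle, std::deque<int>());
  }

  // Assumes the handle was registered first; operator[] would otherwise
  // create the instance implicitly, which the real container does not do.
  void write(int handle, int sample) { instances_[handle].push_back(sample); }

  // Removes samples oldest-first; the instance itself stays registered.
  ReturnCode dispose(int handle) {
    std::map<int, std::deque<int> >::iterator it = instances_.find(handle);
    if (it == instances_.end()) return ReturnCode::PreconditionNotMet;
    while (!it->second.empty()) {
      it->second.pop_front();
    }
    return ReturnCode::Ok;
  }

  std::size_t num_samples(int handle) const {
    std::map<int, std::deque<int> >::const_iterator it =
        instances_.find(handle);
    return it == instances_.end() ? 0 : it->second.size();
  }

private:
  std::map<int, std::deque<int> > instances_;
};
```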

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue ( DataSampleElement *  sample,
DDS::InstanceHandle_t  instance 
)

Enqueue the data sample in its instance thread. This method assumes that space is available for the sample in the instance list.

Definition at line 178 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::PublicationInstance::cur_sample_tv_, OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(), get_handle_instance(), OpenDDS::DCPS::PublicationInstance::last_sample_tv_, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, unsent_data_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::write().

00181 {
00182   // Get the PublicationInstance pointer from InstanceHandle_t.
00183   PublicationInstance* const instance =
00184     get_handle_instance(instance_handle);
00185   // Extract the instance queue.
00186   InstanceDataSampleList& instance_list = instance->samples_;
00187 
00188   if (this->writer_->watchdog_) {
00189     instance->last_sample_tv_ = instance->cur_sample_tv_;
00190     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00191     this->writer_->watchdog_->execute(instance, false);
00192   }
00193 
00194   //
00195   // Enqueue to the next_send_sample_ thread of unsent_data_
00196   // will link samples with the next_sample/previous_sample and
00197   // also next_send_sample_.
00198   // This would save time when we actually send the data.
00199 
00200   unsent_data_.enqueue_tail(sample);
00201 
00202   //
00203   // Add this sample to the INSTANCE scope list.
00204   instance_list.enqueue_tail(sample);
00205 
00206   return DDS::RETCODE_OK;
00207 }
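
The double bookkeeping performed by enqueue() can be illustrated with a minimal sketch. This is not the OpenDDS implementation: Sample, Instance, and MiniContainer are hypothetical stand-ins, and std::deque replaces the intrusive lists that link DataSampleElement objects. Unlike the listing above, the sketch also rejects an unknown handle instead of assuming the lookup succeeds.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>

// Hypothetical stand-in for DataSampleElement.
struct Sample { int value; };

// Hypothetical stand-in for PublicationInstance.
struct Instance {
  std::deque<Sample*> samples;  // per-instance history
};

class MiniContainer {
public:
  MiniContainer() : next_handle_(1) {}

  // Create an empty instance and return its handle.
  int register_instance() {
    int handle = next_handle_++;
    instances_[handle];  // default-constructs the Instance
    return handle;
  }

  // Put the sample on the container-wide unsent list and on the
  // list of its instance. Returns false if the handle is unknown.
  bool enqueue(Sample* sample, int handle) {
    std::map<int, Instance>::iterator it = instances_.find(handle);
    if (it == instances_.end()) return false;
    unsent_.push_back(sample);
    it->second.samples.push_back(sample);
    return true;
  }

  std::size_t unsent_size() const { return unsent_.size(); }

  std::size_t instance_size(int handle) const {
    std::map<int, Instance>::const_iterator it = instances_.find(handle);
    return it == instances_.end() ? 0 : it->second.samples.size();
  }

private:
  int next_handle_;
  std::map<int, Instance> instances_;
  std::deque<Sample*> unsent_;
};
```

Each sample is therefore reachable two ways: through the send-state list that tracks its progress to the transport, and through the history of its instance.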

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control ( DataSampleElement *  control_sample  ) 

Definition at line 164 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_OK, and unsent_data_.

Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), OpenDDS::DCPS::DataWriterImpl::register_instance_i(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().

00165 {
00166   // Enqueue to the next_send_sample_ thread of unsent_data_
00167   // will link samples with the next_sample/previous_sample and
00168   // also next_send_sample_.
00169   // This would save time when we actually send the data.
00170 
00171   unsent_data_.enqueue_tail(control_sample);
00172 
00173   return DDS::RETCODE_OK;
00174 }

PublicationInstance * OpenDDS::DCPS::WriteDataContainer::get_handle_instance ( DDS::InstanceHandle_t  handle  ) 

Todo:
remove/document this!

Definition at line 1276 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::find(), and instances_.

Referenced by OpenDDS::DCPS::DataWriterImpl::create_sample_data_message(), enqueue(), OpenDDS::DCPS::DataWriterImpl::get_handle_instance(), and obtain_buffer().

01277 {
01278   PublicationInstance* instance = 0;
01279 
01280   if (0 != find(instances_, handle, instance)) {
01281     ACE_ERROR((LM_ERROR,
01282                ACE_TEXT("(%P|%t) ERROR: ")
01283                ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01284                ACE_TEXT("lookup for %d failed\n"),
01285                handle));
01286   }
01287 
01288   return instance;
01289 }

void OpenDDS::DCPS::WriteDataContainer::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 1436 of file WriteDataContainer.cpp.

References instances_.

Referenced by OpenDDS::DCPS::DataWriterImpl::get_instance_handles().

01437 {
01438   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01439             guard,
01440             this->lock_);
01441   PublicationInstanceMapType::iterator it = instances_.begin();
01442 
01443   while (it != instances_.end()) {
01444     instance_handles.push_back(it->second->instance_handle_);
01445     ++it;
01446   }
01447 }

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data (  ) 

Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list returned is not put on any SendStateDataSampleList.

Definition at line 504 of file WriteDataContainer.cpp.

References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().

00505 {
00506   DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00507 
00508   //
00509   // The samples in resend_data_ are copies placed there by
00510   // reenqueue_all().
00511   //
00512   SendStateDataSampleList list = this->resend_data_;
00513 
00514   //
00515   // Clear the resend data list.
00516   //
00517   this->resend_data_.reset();
00518   //
00519   // Return the moved list.
00520   //
00521   return list;
00522 }

ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data ( SendStateDataSampleList &  list  ) 

Obtain a list of data that has not yet been sent. The data on the list returned is moved from the internal unsent_data_ list to the internal sending_data_ list as part of this call. The entire list is linked via the DataSampleElement.next_send_sample_ link as well.

Definition at line 465 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, transaction_id_, and unsent_data_.

00466 {
00467   DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00468   //
00469   // The samples in unsent_data are added to the local datawriter
00470   // list and enqueued to the sending_data_ signifying they have
00471   // been passed to the transport to send in a transaction
00472   //
00473   list = this->unsent_data_;
00474 
00475   // Increment send counter for this send operation
00476   ++transaction_id_;
00477 
00478   // Mark all samples with current send counter
00479   SendStateDataSampleList::iterator iter = list.begin();
00480   while (iter != list.end()) {
00481     iter->set_transaction_id(this->transaction_id_);
00482     ++iter;
00483   }
00484 
00485   //
00486   // The unsent_data_ already linked with the
00487   // next_send_sample during enqueue.
00488   // Append the unsent_data_ to current sending_data_
00489   // list.
00490   sending_data_.enqueue_tail(list);
00491 
00492   //
00493   // Clear the unsent data list.
00494   //
00495   this->unsent_data_.reset();
00496 
00497   //
00498   // Return the id of the current send transaction.
00499   //
00500   return transaction_id_;
00501 }
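
The hand-off from the unsent list to the sending list can be sketched as follows. This is a simplified model under stated assumptions, not the OpenDDS code: Sample and SendLists are hypothetical names, and the sketch copies samples into the caller's list, whereas the real container hands out a view of the same linked elements.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <list>

// Hypothetical stand-in for DataSampleElement.
struct Sample {
  int value;
  std::uint64_t transaction_id;
};

class SendLists {
public:
  SendLists() : transaction_id_(0) {}

  void enqueue(int value) { unsent_.push_back(Sample{value, 0u}); }

  // Tag every pending sample with a fresh transaction id, give the
  // caller the batch, move the batch to the sending list, and return
  // the id that identifies this send operation.
  std::uint64_t get_unsent_data(std::list<Sample>& out) {
    ++transaction_id_;
    for (Sample& s : unsent_) s.transaction_id = transaction_id_;
    out = unsent_;                             // caller's batch (a copy here)
    sending_.splice(sending_.end(), unsent_);  // leaves unsent_ empty
    return transaction_id_;
  }

  std::size_t unsent_size() const { return unsent_.size(); }
  std::size_t sending_size() const { return sending_.size(); }

private:
  std::uint64_t transaction_id_;
  std::list<Sample> unsent_;
  std::list<Sample> sending_;
};
```

Because every sample in one batch carries the same id, a later delivery or drop notification can be matched to the send operation that produced it.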

void OpenDDS::DCPS::WriteDataContainer::log_send_state_lists ( OPENDDS_STRING  description  )  [private]

Definition at line 1516 of file WriteDataContainer.cpp.

References instances_, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by wait_pending().

01517 {
01518   ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01519              description.c_str(),
01520              unsent_data_.size(),
01521              sending_data_.size(),
01522              sent_data_.size(),
01523              orphaned_to_transport_.size(),
01524              num_all_samples(),
01525              instances_.size()));
01526 }

size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples (  ) 

Return the number of samples for all instances.

Definition at line 445 of file WriteDataContainer.cpp.

References instances_.

Referenced by log_send_state_lists().

00446 {
00447   size_t size = 0;
00448 
00449   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00450                    guard,
00451                    this->lock_,
00452                    0);
00453 
00454   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00455        iter != instances_.end();
00456        ++iter)
00457   {
00458     size += iter->second->samples_.size();
00459   }
00460 
00461   return size;
00462 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples ( DDS::InstanceHandle_t  handle,
size_t &  size 
)

Return the number of samples for the given instance.

Definition at line 424 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::find(), instances_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::PublicationInstance::samples_, and OpenDDS::DCPS::InstanceDataSampleList::size().

Referenced by OpenDDS::DCPS::DataWriterImpl::num_samples().

00426 {
00427   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00428                    guard,
00429                    this->lock_,
00430                    DDS::RETCODE_ERROR);
00431   PublicationInstance* instance = 0;
00432 
00433   int const find_attempt = find(instances_, handle, instance);
00434 
00435   if (0 != find_attempt) {
00436     return DDS::RETCODE_ERROR;
00437 
00438   } else {
00439     size = instance->samples_.size();
00440     return DDS::RETCODE_OK;
00441   }
00442 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer ( DataSampleElement *&  element,
DDS::InstanceHandle_t  handle 
)

Allocate a DataSampleElement object and check space availability for the newly allocated element according to the QoS settings. In the blocking write case (reliable writers), if resource limits or history QoS limits are reached, the call blocks for up to the max blocking time, waiting for a previous sample to be delivered or dropped by the transport. In the non-blocking write case (best effort), if resource limits or history QoS limits are reached, it attempts to remove the oldest samples (forcing the transport to drop samples if necessary) to make space. If several threads are waiting, the first one in the waiting list can enqueue; the others continue waiting.

Definition at line 1020 of file WriteDataContainer.cpp.

References condition_, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, depth_, OpenDDS::DCPS::duration_to_absolute_time_value(), OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), instances_, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, publication_id_, release_buffer(), DDS::RELIABLE_RELIABILITY_QOS, remove_oldest_historical_sample(), remove_oldest_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, OpenDDS::DCPS::PublicationInstance::samples_, shutdown_, transport_customized_element_allocator_, transport_send_element_allocator_, and waiting_on_release_.

Referenced by OpenDDS::DCPS::DataWriterImpl::write().

01022 {
01023   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01024 
01025   PublicationInstance* instance = get_handle_instance(handle);
01026 
01027   ACE_NEW_MALLOC_RETURN(
01028     element,
01029     static_cast<DataSampleElement*>(
01030       sample_list_element_allocator_.malloc(
01031         sizeof(DataSampleElement))),
01032     DataSampleElement(publication_id_,
01033                           this->writer_,
01034                           instance,
01035                           &transport_send_element_allocator_,
01036                           &transport_customized_element_allocator_),
01037     DDS::RETCODE_ERROR);
01038 
01039   // Extract the current instance queue.
01040   InstanceDataSampleList& instance_list = instance->samples_;
01041   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01042 
01043   bool need_to_set_abs_timeout = true;
01044   ACE_Time_Value abs_timeout;
01045 
01046   //depth_ covers both HistoryQosPolicy kind and depth as well as
01047   //ResourceLimitsQosPolicy max_samples_per_instance
01048   //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
01049   //max_instances and max_instances * depth
01050   while ((instance_list.size() >= depth_) ||
01051          ((this->max_num_samples_ > 0) &&
01052          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01053 
01054     //Need to either remove stale samples or wait for space to become available
01055     if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01056       //Remove historical samples already sent
01057       bool removed_historical = false;
01058       if (DCPS_debug_level >= 2) {
01059         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01060                               ACE_TEXT(" instance %d attempting to remove")
01061                               ACE_TEXT(" its oldest historical sample\n"),
01062                               handle));
01063       }
01064 
01065       ret = this->remove_oldest_historical_sample(instance_list, removed_historical);
01066 
01067       //else try to remove historical samples from other instances
01068       if (ret == DDS::RETCODE_OK && !removed_historical) {
01069         if (DCPS_debug_level >= 2) {
01070           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01071                                 ACE_TEXT(" instance %d attempting to remove")
01072                                 ACE_TEXT(" oldest historical sample from any full instances\n"),
01073                                 handle));
01074         }
01075         PublicationInstanceMapType::iterator it = instances_.begin();
01076 
01077         while (!removed_historical && it != instances_.end() && ret == DDS::RETCODE_OK) {
01078           if(it->second->samples_.size() >= depth_) {
01079             ret = this->remove_oldest_historical_sample(it->second->samples_, removed_historical);
01080           }
01081           ++it;
01082         }
01083       }
01084 
01085       if (ret == DDS::RETCODE_OK && !removed_historical) {
01086         //Reliable writers can wait
01087         if (need_to_set_abs_timeout) {
01088           abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01089           need_to_set_abs_timeout = false;
01090         }
01091         if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01092           if (DCPS_debug_level >= 2) {
01093             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01094                                   ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01095                                   handle));
01096           }
01097 
01098           waiting_on_release_ = true;
01099           // lock is released while waiting and acquired before returning
01100           // from wait.
01101           int const wait_result = condition_.wait(&abs_timeout);
01102 
01103           if (wait_result != 0) {
01104             if (errno == ETIME) {
01105               if (DCPS_debug_level >= 2) {
01106                 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01107                                       ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01108                                       handle));
01109               }
01110               ret = DDS::RETCODE_TIMEOUT;
01111             } else {
01112               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01113                                     ACE_TEXT("%p\n")));
01114               ret = DDS::RETCODE_ERROR;
01115             }
01116           }
01117         } else {
01118           //either shutdown has been signaled or max_blocking_time
01119           //has surpassed so treat as timeout
01120           ret = DDS::RETCODE_TIMEOUT;
01121         }
01122       }
01123 
01124     } else {
01125       //BEST EFFORT
01126       bool oldest_released = false;
01127 
01128       //try to remove stale samples from this instance
01129       // The remove_oldest_sample() method removes the oldest sample
01130       // from instance list and removes it from the internal lists.
01131       if (instance_list.size() > 0) {
01132         if (DCPS_debug_level >= 2) {
01133           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01134                                 ACE_TEXT(" instance %d attempting to remove")
01135                                 ACE_TEXT(" its oldest sample\n"),
01136                                 handle));
01137         }
01138         ret = this->remove_oldest_sample(instance_list, oldest_released);
01139       }
01140       //else try to remove stale samples from other instances which are full
01141       if (ret == DDS::RETCODE_OK && !oldest_released) {
01142         if (DCPS_debug_level >= 2) {
01143           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01144                                 ACE_TEXT(" instance %d attempting to remove")
01145                                 ACE_TEXT(" oldest sample from any full instances\n"),
01146                                 handle));
01147         }
01148         PublicationInstanceMapType::iterator it = instances_.begin();
01149 
01150         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01151           if(it->second->samples_.size() >= depth_) {
01152             ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01153           }
01154           ++it;
01155         }
01156       }
01157       //else try to remove stale samples from other non-full instances
01158       if (ret == DDS::RETCODE_OK && !oldest_released) {
01159         if (DCPS_debug_level >= 2) {
01160           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01161                                 ACE_TEXT(" instance %d attempting to remove")
01162                                 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01163                                 handle));
01164         }
01165         PublicationInstanceMapType::iterator it = instances_.begin();
01166 
01167         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01168           if(it->second->samples_.size() > 0) {
01169             ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01170           }
01171           ++it;
01172         }
01173       }
01174       if (!oldest_released) {
01175         //This means that no instances have samples to remove and yet
01176         //still hitting resource limits.
01177         ACE_ERROR((LM_ERROR,
01178                    ACE_TEXT("(%P|%t) ERROR: ")
01179                    ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01180                    ACE_TEXT("hitting resource limits with no samples to remove\n")));
01181         ret = DDS::RETCODE_ERROR;
01182       }
01183     }  //END BEST EFFORT
01184 
01185     if (ret != DDS::RETCODE_OK) {
01186       if (DCPS_debug_level >= 2) {
01187         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01188                               ACE_TEXT(" instance %d could not obtain buffer for sample")
01189                               ACE_TEXT(" releasing allotted sample and returning\n"),
01190                               handle));
01191       }
01192       this->release_buffer(element);
01193       return ret;
01194     }
01195   }  //END WHILE
01196 
01197   data_holder_.enqueue_tail(element);
01198 
01199   return ret;
01200 }
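
The difference between the reliable and best-effort branches of obtain_buffer() can be modelled without threads. The sketch below is an illustration only: Outcome, Slot, and BoundedHistory are hypothetical names, a single instance is assumed, the depth is assumed to be positive, and the point where the real code waits on its condition variable (and may return DDS::RETCODE_TIMEOUT) is represented by Outcome::WouldBlock.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

enum class Outcome { Ok, WouldBlock, Error };

// Hypothetical history entry; sent == true means the transport is
// done with the sample and it is kept only for history.
struct Slot {
  int id;
  bool sent;
};

class BoundedHistory {
public:
  BoundedHistory(std::size_t depth, bool reliable)
    : depth_(depth), reliable_(reliable) {}

  // Make room if the history is full, then store the new sample.
  Outcome write(int id) {
    while (slots_.size() >= depth_) {
      if (slots_.empty()) return Outcome::Error;  // nothing left to remove
      // Reliable: only a purely historical sample may be removed.
      if (reliable_ && !slots_.front().sent) return Outcome::WouldBlock;
      // Best effort (or historical sample): drop the oldest entry.
      slots_.pop_front();
    }
    slots_.push_back(Slot{id, false});
    return Outcome::Ok;
  }

  // Record that the transport delivered the sample with this id.
  void mark_delivered(int id) {
    for (Slot& s : slots_) {
      if (s.id == id) s.sent = true;
    }
  }

  std::size_t size() const { return slots_.size(); }

  int oldest() const {
    assert(!slots_.empty());
    return slots_.front().id;
  }

private:
  std::size_t depth_;
  bool reliable_;
  std::deque<Slot> slots_;
};
```

A reliable writer never discards a sample that has not been delivered, so it has to wait; a best-effort writer trades the oldest sample for the newest one.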

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control ( DataSampleElement *&  element  ) 

Definition at line 1000 of file WriteDataContainer.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, transport_customized_element_allocator_, and transport_send_element_allocator_.

Referenced by OpenDDS::DCPS::DataWriterImpl::dispose(), OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), OpenDDS::DCPS::DataWriterImpl::register_instance_i(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().

01001 {
01002   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
01003 
01004   ACE_NEW_MALLOC_RETURN(
01005     element,
01006     static_cast<DataSampleElement*>(
01007       sample_list_element_allocator_.malloc(
01008         sizeof(DataSampleElement))),
01009     DataSampleElement(publication_id_,
01010                           this->writer_,
01011                           0,
01012                           &transport_send_element_allocator_,
01013                           &transport_customized_element_allocator_),
01014     DDS::RETCODE_ERROR);
01015 
01016   return DDS::RETCODE_OK;
01017 }

typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

Returns a vector of handles for the instances registered for this data writer.

Referenced by data_delivered(), data_dropped(), remove_oldest_historical_sample(), and remove_oldest_sample().

WriteDataContainer& OpenDDS::DCPS::WriteDataContainer::operator= ( WriteDataContainer const &   )  [private]

bool OpenDDS::DCPS::WriteDataContainer::pending_data (  ) 

Returns true if pending data exists. This includes sending and unsent data, as well as samples orphaned to the transport.

Definition at line 525 of file WriteDataContainer.cpp.

References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().

00526 {
00527   return this->sending_data_.size() != 0
00528          || this->orphaned_to_transport_.size() != 0
00529          || this->unsent_data_.size() != 0;
00530 }

bool OpenDDS::DCPS::WriteDataContainer::persist_data (  ) 

Copy sent data to data DURABILITY cache.

Definition at line 1333 of file WriteDataContainer.cpp.

References durability_cache_, and OpenDDS::DCPS::DataDurabilityCache::insert().

Referenced by OpenDDS::DCPS::DataWriterImpl::persist_data().

01334 {
01335   bool result = true;
01336 
01337   // ------------------------------------------------------------
01338   // Transfer sent data to data DURABILITY cache.
01339   // ------------------------------------------------------------
01340   if (this->durability_cache_) {
01341     // A data durability cache is available for TRANSIENT or
01342     // PERSISTENT data durability.  Cache the data samples.
01343 
01344     //
01345     //  We only cache data that is not still in use outside of
01346     //  this instance of WriteDataContainer
01347     //  (only cache samples in sent_data_ meaning transport has delivered).
01348     bool const inserted =
01349       this->durability_cache_->insert(this->domain_id_,
01350                                       this->topic_name_,
01351                                       this->type_name_,
01352                                       this->sent_data_,
01353                                       this->durability_service_
01354                                      );
01355 
01356     result = inserted;
01357 
01358     if (!inserted)
01359       ACE_ERROR((LM_ERROR,
01360                  ACE_TEXT("(%P|%t) ERROR: ")
01361                  ACE_TEXT("WriteDataContainer::persist_data, ")
01362                  ACE_TEXT("failed to make data durable for ")
01363                  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01364                  this->domain_id_,
01365                  this->topic_name_,
01366                  this->type_name_));
01367   }
01368 
01369   return result;
01370 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all ( const RepoId &  reader_id,
const DDS::LifespanQosPolicy &  lifespan,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator *  eval,
const DDS::StringSeq &  params 
)

Create a resend list with the copies of all current "sending" and "sent" samples. The samples will be sent to the subscriber specified.

Definition at line 210 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OPENDDS_STRING, publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, and OpenDDS::DCPS::SendStateDataSampleList::size().

Referenced by OpenDDS::DCPS::DataWriterImpl::association_complete_i().

00219 {
00220   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00221                    guard,
00222                    this->lock_,
00223                    DDS::RETCODE_ERROR);
00224 
00225   // Make a copy of sending_data_ and sent_data_;
00226   if (sent_data_.size() > 0) {
00227     this->copy_and_append(this->resend_data_,
00228                           sent_data_,
00229                           reader_id,
00230                           lifespan
00231 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00232                           , filterClassName, eval, expression_params
00233 #endif
00234                           );
00235 
00236   if (sending_data_.size() > 0) {
00237     this->copy_and_append(this->resend_data_,
00238                           sending_data_,
00239                           reader_id,
00240                           lifespan
00241 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00242                           , filterClassName, eval, expression_params
00243 #endif
00244                           );
00245   }
00246 
00247     if (DCPS_debug_level > 9) {
00248       GuidConverter converter(publication_id_);
00249       GuidConverter reader(reader_id);
00250       ACE_DEBUG((LM_DEBUG,
00251                  ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00252                  ACE_TEXT("domain %d topic %C publication %C copying HISTORY to resend to %C.\n"),
00253                  this->domain_id_,
00254                  this->topic_name_,
00255                  OPENDDS_STRING(converter).c_str(),
00256                  OPENDDS_STRING(reader).c_str()));
00257     }
00258   }
00259 
00260   return DDS::RETCODE_OK;
00261 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance ( DDS::InstanceHandle_t &  instance_handle,
DataSample *&  registered_sample 
)

Dynamically allocate a PublicationInstance object and add it to the instances_ map.

Note:
The registered_sample is an input and output parameter. A shallow copy of the sample data will be given to the datawriter as part of the control message.

Definition at line 264 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::bind(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, instances_, max_num_instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, OpenDDS::DCPS::OfferedDeadlineWatchdog::schedule_timer(), OpenDDS::DCPS::PublicationInstance::unregistered_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::register_instance_i().

00267 {
00268   PublicationInstance* instance = 0;
00269   auto_ptr<PublicationInstance> safe_instance;
00270 
00271   if (instance_handle == DDS::HANDLE_NIL) {
00272     if (max_num_instances_ > 0
00273         && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00274       return DDS::RETCODE_OUT_OF_RESOURCES;
00275     }
00276 
00277     // registered the instance for the first time.
00278     ACE_NEW_RETURN(instance,
00279                    PublicationInstance(registered_sample),
00280                    DDS::RETCODE_ERROR);
00281 
00282     ACE_auto_ptr_reset(safe_instance, instance);
00283 
00284     instance_handle = this->writer_->get_next_handle();
00285 
00286     int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00287 
00288     if (0 != insert_attempt) {
00289       ACE_ERROR((LM_ERROR,
00290                  ACE_TEXT("(%P|%t) ERROR: ")
00291                  ACE_TEXT("WriteDataContainer::register_instance, ")
00292                  ACE_TEXT("failed to insert instance handle=%X\n"),
00293                  instance));
00294       return DDS::RETCODE_ERROR;
00295     } // if (0 != insert_attempt)
00296 
00297     instance->instance_handle_ = instance_handle;
00298 
00299   } else {
00300     int const find_attempt = find(instances_, instance_handle, instance);
00301     ACE_auto_ptr_reset(safe_instance, instance);
00302 
00303     if (0 != find_attempt) {
00304       ACE_ERROR((LM_ERROR,
00305                  ACE_TEXT("(%P|%t) ERROR: ")
00306                  ACE_TEXT("WriteDataContainer::register_instance, ")
00307                  ACE_TEXT("The provided instance handle=%X is not a valid ")
00308                  ACE_TEXT("handle.\n"),
00309                  instance_handle));
00310 
00311       return DDS::RETCODE_ERROR;
00312     } // if (0 != find_attempt)
00313 
00314     // don't need this - the PublicationInstances already has a sample.
00315     registered_sample->release();
00316 
00317     instance->unregistered_ = false;
00318   }
00319 
00320   // The registered_sample is shallow copied.
00321   registered_sample = instance->registered_sample_->duplicate();
00322 
00323   if (this->writer_->watchdog_) {
00324     this->writer_->watchdog_->schedule_timer(instance);
00325   }
00326 
00327   safe_instance.release();  // Safe to relinquish ownership.
00328 
00329   return DDS::RETCODE_OK;
00330 }
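
The two paths through register_instance(), first registration with a nil handle and re-registration with an existing handle, can be sketched as follows. The names Ret, HANDLE_NIL (as an int constant), and Registry are hypothetical simplifications; the sketch leaves out the registered sample, the deadline watchdog, and the ownership handling of the real method.

```cpp
#include <cassert>
#include <cstddef>
#include <map>

enum class Ret { Ok, Error, OutOfResources };

// Hypothetical nil handle; the real code uses DDS::HANDLE_NIL.
const int HANDLE_NIL = 0;

class Registry {
public:
  // max_instances <= 0 means "no limit", as in the listing above.
  explicit Registry(long max_instances)
    : max_instances_(max_instances), next_handle_(1) {}

  Ret register_instance(int& handle) {
    if (handle == HANDLE_NIL) {
      // First registration: enforce the instance limit, then assign
      // a fresh handle and report it through the in/out parameter.
      if (max_instances_ > 0 &&
          static_cast<long>(unregistered_.size()) >= max_instances_) {
        return Ret::OutOfResources;
      }
      handle = next_handle_++;
      unregistered_[handle] = false;
      return Ret::Ok;
    }
    // Re-registration: the handle must already be known.
    std::map<int, bool>::iterator it = unregistered_.find(handle);
    if (it == unregistered_.end()) return Ret::Error;
    it->second = false;  // the instance is registered again
    return Ret::Ok;
  }

  void mark_unregistered(int handle) {
    std::map<int, bool>::iterator it = unregistered_.find(handle);
    if (it != unregistered_.end()) it->second = true;
  }

  bool is_unregistered(int handle) const {
    std::map<int, bool>::const_iterator it = unregistered_.find(handle);
    return it != unregistered_.end() && it->second;
  }

private:
  long max_instances_;
  int next_handle_;
  std::map<int, bool> unregistered_;  // handle -> unregistered flag
};
```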

void OpenDDS::DCPS::WriteDataContainer::release_buffer ( DataSampleElement *  element  ) 

Release the memory previously allocated. This method is the counterpart of obtain_buffer(). If the memory was allocated by an allocator, it must be released to that allocator.

Definition at line 1203 of file WriteDataContainer.cpp.

References data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.

Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_oldest_historical_sample(), and remove_oldest_sample().

01204 {
01205   if (element->get_header().message_id_ == SAMPLE_DATA)
01206     data_holder_.dequeue(element);
01207   // Release the memory to the allocator.
01208   ACE_DES_FREE(element,
01209                sample_list_element_allocator_.free,
01210                DataSampleElement);
01211 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_historical_sample ( InstanceDataSampleList &  instance_list,
bool &  released 
) [private]

Remove the sample (head) from the instance list if it is only being used for history purposes (located on sent_data_ list). This method also updates the internal lists to reflect the change. The "released" boolean value indicates whether a sample was released.

Definition at line 779 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), OpenDDS::DCPS::InstanceDataSampleList::head(), OPENDDS_STRING, OPENDDS_VECTOR(), publication_id_, release_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), and sent_data_.

Referenced by obtain_buffer().

00782 {
00783   DataSampleElement* stale = 0;
00784 
00785   if (instance_list.head() != 0) {
00786     stale = instance_list.head();
00787   } else {
00788     //it is fine for the instance_list to be empty
00789     return DDS::RETCODE_OK;
00790   }
00791 
00792   // Only interested in trying to remove historical samples.
00793   // If the sample is not on the sent_data_ list, simply can't
00794   // be removed  -- not an error
00795 
00796   OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00797   send_lists.push_back(&sent_data_);
00798 
00799   const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00800 
00801   // Identify if the stale data is in the sent_data_ (i.e. HISTORY)
00802   // and can therefore be released
00803 
00804   bool result = false;
00805 
00806   if (containing_list == &this->sent_data_) {
00807     // No one is using the historical data sample, so we can release it back to
00808     // its allocator.
00809     // First remove the oldest sample from the instance list.
00810     //
00811     if (instance_list.dequeue_head(stale) == false) {
00812       ACE_ERROR_RETURN((LM_ERROR,
00813                         ACE_TEXT("(%P|%t) ERROR: ")
00814                         ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00815                         ACE_TEXT("dequeue_head_next_sample failed\n")),
00816                        DDS::RETCODE_ERROR);
00817     }
00818 
00819     // Now attempt to remove the sample from the internal list and release its buffer
00820     result = this->sent_data_.dequeue(stale) != 0;
00821     release_buffer(stale);
00822     released = true;
00823 
00824     if (DCPS_debug_level > 9) {
00825       GuidConverter converter(publication_id_);
00826       ACE_DEBUG((LM_DEBUG,
00827                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_historical_sample: ")
00828                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00829                  this->domain_id_,
00830                  this->topic_name_,
00831                  OPENDDS_STRING(converter).c_str()));
00832     }
00833 
00834   } else {
00835     //Sample is not a historical sample
00836     return DDS::RETCODE_OK;
00837   }
00838 
00839   if (result == false) {
00840     ACE_ERROR_RETURN((LM_ERROR,
00841                       ACE_TEXT("(%P|%t) ERROR: ")
00842                       ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00843                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00844                      DDS::RETCODE_ERROR);
00845 
00846   }
00847 
00848   return DDS::RETCODE_OK;
00849 }

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample ( InstanceDataSampleList instance_list,
bool &  released 
) [private]

Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.

Definition at line 853 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), empty_condition_, OPENDDS_STRING, OPENDDS_VECTOR(), orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, and unsent_data_.

Referenced by dispose(), and obtain_buffer().

00856 {
00857   DataSampleElement* stale = 0;
00858 
00859   //
00860   // Remove the oldest sample from the instance list.
00861   //
00862   if (instance_list.dequeue_head(stale) == false) {
00863     ACE_ERROR_RETURN((LM_ERROR,
00864                       ACE_TEXT("(%P|%t) ERROR: ")
00865                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00866                       ACE_TEXT("dequeue_head_next_sample failed\n")),
00867                      DDS::RETCODE_ERROR);
00868   }
00869 
00870   //
00871   // Remove the stale data from the next_writer_sample_ list.  The
00872   // sending_data_/next_send_sample_ list is not managed within the
00873   // container, it is only used external to the container and does
00874   // not need to be managed internally.
00875   //
00876   // The next_writer_sample_ link is being used in one of the sent_data_,
00877   // sending_data_, or unsent_data lists.  Removal from the doubly
00878   // linked list needs to repair the list only when the stale sample
00879   // is either the head or tail of the list.
00880   //
00881 
00882   //
00883   // Locate the head of the list that the stale data is in.
00884   //
00885   OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00886   send_lists.push_back(&sending_data_);
00887   send_lists.push_back(&sent_data_);
00888   send_lists.push_back(&unsent_data_);
00889   send_lists.push_back(&orphaned_to_transport_);
00890 
00891   const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00892 
00893 
00894   //
00895   // Identify the list that the stale data is in.
00896   // The stale data should be in one of the sent_data_, sending_data_
00897   // or unsent_data_. It should not be in released_data_ list since
00898   // this function is the only place a sample is moved from
00899   // sending_data_ to released_data_ list.
00900 
00901   // Remove the element from the internal list.
00902   bool result = false;
00903 
00904   if (containing_list == &this->sending_data_) {
00905     if (DCPS_debug_level > 2) {
00906       ACE_ERROR((LM_WARNING,
00907                  ACE_TEXT("(%P|%t) WARNING: ")
00908                  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00909                  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00910     }
00911 
00912     // This means transport is still using the sample that needs to
00913     // be released currently so notify transport that sample is being removed.
00914 
00915     if (this->writer_->remove_sample(stale)) {
00916       if (this->sent_data_.dequeue(stale)) {
00917         release_buffer(stale);
00918         result = true;
00919       }
00920 
00921     } else {
00922       if (this->sending_data_.dequeue(stale)) {
00923         this->orphaned_to_transport_.enqueue_tail(stale);
00924       } else if (this->sent_data_.dequeue(stale)) {
00925         release_buffer(stale);
00926         result = true;
00927       }
00928       result = true;
00929     }
00930     released = true;
00931 
00932   } else if (containing_list == &this->sent_data_) {
00933     // No one is using the data sample, so we can release it back to
00934     // its allocator.
00935     //
00936     result = this->sent_data_.dequeue(stale) != 0;
00937     release_buffer(stale);
00938     released = true;
00939 
00940     if (DCPS_debug_level > 9) {
00941       GuidConverter converter(publication_id_);
00942       ACE_DEBUG((LM_DEBUG,
00943                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00944                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00945                  this->domain_id_,
00946                  this->topic_name_,
00947                  OPENDDS_STRING(converter).c_str()));
00948     }
00949 
00950   } else if (containing_list == &this->unsent_data_) {
00951     //
00952     // No one is using the data sample, so we can release it back to
00953     // its allocator.
00954     //
00955     result = this->unsent_data_.dequeue(stale) != 0;
00956     release_buffer(stale);
00957     released = true;
00958 
00959     if (DCPS_debug_level > 9) {
00960       GuidConverter converter(publication_id_);
00961       ACE_DEBUG((LM_DEBUG,
00962                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00963                  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00964                  this->domain_id_,
00965                  this->topic_name_,
00966                  OPENDDS_STRING(converter).c_str()));
00967     }
00968   } else {
00969     ACE_ERROR_RETURN((LM_ERROR,
00970                       ACE_TEXT("(%P|%t) ERROR: ")
00971                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00972                       ACE_TEXT("The oldest sample is not in any internal list.\n")),
00973                      DDS::RETCODE_ERROR);
00974   }
00975 
00976   // Signal if there is no pending data.
00977   {
00978     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00979                      guard,
00980                      this->lock_,
00981                      DDS::RETCODE_ERROR);
00982 
00983     if (!pending_data())
00984       empty_condition_.broadcast();
00985   }
00986 
00987   if (result == false) {
00988     ACE_ERROR_RETURN((LM_ERROR,
00989                       ACE_TEXT("(%P|%t) ERROR: ")
00990                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00991                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00992                      DDS::RETCODE_ERROR);
00993 
00994   }
00995 
00996   return DDS::RETCODE_OK;
00997 }
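
The branch structure of remove_oldest_sample() can be hard to follow in the listing above, so here is a minimal sketch of the same dispatch. It is a simplified model, not the OpenDDS implementation: samples are plain ints, the four send-state lists are std::list objects, and SendLists, Outcome, take() and remove_oldest() are hypothetical names introduced only for illustration. The transport_removed flag stands in for the result of writer_->remove_sample(); the model releases the sample directly in that case instead of dequeuing it from sent_data_.

```cpp
#include <algorithm>
#include <list>

// Hypothetical stand-in for the container's send-state lists.
// Samples are modeled as ints rather than DataSampleElement pointers.
struct SendLists {
  std::list<int> unsent, sending, sent, orphaned;
};

enum class Outcome { Released, Orphaned, NotFound };

// Removes `sample` from `from`; returns true if it was present.
inline bool take(std::list<int>& from, int sample) {
  std::list<int>::iterator it = std::find(from.begin(), from.end(), sample);
  if (it == from.end()) return false;
  from.erase(it);
  return true;
}

// Models the dispatch: a sample still being sent is either released
// (the transport gave it up) or parked on the orphaned list; a sample
// on the sent or unsent list is released immediately.
inline Outcome remove_oldest(SendLists& l, int sample, bool transport_removed) {
  if (take(l.sending, sample)) {
    if (transport_removed) return Outcome::Released;
    l.orphaned.push_back(sample);  // transport still owns it
    return Outcome::Orphaned;
  }
  if (take(l.sent, sample) || take(l.unsent, sample)) {
    return Outcome::Released;
  }
  return Outcome::NotFound;  // corresponds to the RETCODE_ERROR branch
}
```

In this model the Orphaned outcome corresponds to the path that moves the element onto orphaned_to_transport_, where it remains until the transport reports delivery or a drop.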

void OpenDDS::DCPS::WriteDataContainer::reschedule_deadline (  ) 

Reset time interval for each instance.

Definition at line 1373 of file WriteDataContainer.cpp.

References instances_.

Referenced by OpenDDS::DCPS::DataWriterImpl::reschedule_deadline().

01374 {
01375   for (PublicationInstanceMapType::iterator iter = instances_.begin();
01376        iter != instances_.end();
01377        ++iter) {
01378     if (iter->second->deadline_timer_id_ != -1) {
01379       if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01380         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01381                    ACE_TEXT("reset_timer_interval")));
01382       }
01383     }
01384   }
01385 }

bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged ( const SequenceNumber  sequence  ) 

Definition at line 1485 of file WriteDataContainer.cpp.

References acked_sequences_, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by wait_ack_of_seq().

01486 {
01487   if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01488     //return true here so that wait_for_acknowledgements doesn't block
01489     return true;
01490   }
01491 
01492   SequenceNumber acked = acked_sequences_.cumulative_ack();
01493   if (DCPS_debug_level >= 10) {
01494     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01495                           ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01496   }
01497   if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01498     //if acked_sequences_ is empty or its cumulative_ack is lower than
01499     //the requested sequence, return false
01500     return false;
01501   }
01502   return true;
01503 }
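
The decision made by sequence_acknowledged() reduces to three cases, which the following sketch isolates. It assumes sequence numbers can be modeled as 64-bit integers and uses a hypothetical sentinel kUnknown in place of SequenceNumber::SEQUENCENUMBER_UNKNOWN(); is_acknowledged() is likewise an illustrative name, not part of OpenDDS.

```cpp
#include <cstdint>

// Hypothetical sentinel standing in for SEQUENCENUMBER_UNKNOWN().
const std::int64_t kUnknown = -1;

// Returns true when a waiter for `requested` may stop waiting.
inline bool is_acknowledged(std::int64_t cumulative_ack, std::int64_t requested) {
  // An unknown request must never block the caller.
  if (requested == kUnknown) return true;
  // Nothing acknowledged yet, or the cumulative ack is still behind.
  if (cumulative_ack == kUnknown || cumulative_ack < requested) return false;
  return true;
}
```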

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister ( DDS::InstanceHandle_t  handle,
DataSample *&  registered_sample,
bool  dup_registered_sample = true 
)

Remove the provided instance from the instances_ list. The registered sample data will be released upon the deletion of the PublicationInstance. A shallow copy of the sample data will be given to datawriter as part of the control message if the dup_registered_sample is true.

This method returns DDS::RETCODE_PRECONDITION_NOT_MET if the instance is not registered.

Definition at line 333 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(), OpenDDS::DCPS::find(), instances_, OpenDDS::DCPS::PublicationInstance::registered_sample_, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::DataWriterImpl::unregistered(), OpenDDS::DCPS::PublicationInstance::unregistered_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister(), unregister_all(), and OpenDDS::DCPS::DataWriterImpl::unregister_instance_i().

00337 {
00338   PublicationInstance* instance = 0;
00339 
00340   int const find_attempt = find(instances_, instance_handle, instance);
00341 
00342   if (0 != find_attempt) {
00343     ACE_ERROR_RETURN((LM_ERROR,
00344                       ACE_TEXT("(%P|%t) ERROR: ")
00345                       ACE_TEXT("WriteDataContainer::unregister, ")
00346                       ACE_TEXT("The instance(handle=%X) ")
00347                       ACE_TEXT("is not registered yet.\n"),
00348                       instance_handle),
00349                      DDS::RETCODE_PRECONDITION_NOT_MET);
00350   } // if (0 != find_attempt)
00351 
00352   instance->unregistered_ = true;
00353 
00354   if (dup_registered_sample) {
00355     // The registered_sample is shallow copied.
00356     registered_sample = instance->registered_sample_->duplicate();
00357   }
00358 
00359   // Unregister the instance with typed DataWriter.
00360   this->writer_->unregistered(instance_handle);
00361 
00362   if (this->writer_->watchdog_)
00363     this->writer_->watchdog_->cancel_timer(instance);
00364 
00365   return DDS::RETCODE_OK;
00366 }

void OpenDDS::DCPS::WriteDataContainer::unregister_all (  ) 

Unregister all instances managed by this data container.

Definition at line 1214 of file WriteDataContainer.cpp.

References condition_, DBG_ENTRY_LVL, dispose(), instances_, DDS::RETCODE_OK, shutdown_, OpenDDS::DCPS::unbind(), unregister(), and waiting_on_release_.

Referenced by OpenDDS::DCPS::DataWriterImpl::unregister_all().

01215 {
01216   DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01217   shutdown_ = true;
01218 
01219   {
01220     //The internal list needs protection since this call may result from the
01221     //delete_datawriter call which does not acquire the lock in advance.
01222     ACE_GUARD(ACE_Recursive_Thread_Mutex,
01223               guard,
01224               this->lock_);
01225     // Tell the transport to remove all control messages that it
01226     // is currently processing.
01227     (void) this->writer_->remove_all_msgs();
01228 
01229     // Broadcast to wake up all waiting threads.
01230     if (waiting_on_release_) {
01231       condition_.broadcast();
01232     }
01233   }
01234   DDS::ReturnCode_t ret;
01235   DataSample* registered_sample;
01236   PublicationInstanceMapType::iterator it = instances_.begin();
01237 
01238   while (it != instances_.end()) {
01239     // Release the instance data.
01240     ret = dispose(it->first, registered_sample, false);
01241 
01242     if (ret != DDS::RETCODE_OK) {
01243       ACE_ERROR((LM_ERROR,
01244                  ACE_TEXT("(%P|%t) ERROR: ")
01245                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01246                  ACE_TEXT("dispose instance %X failed\n"),
01247                  it->first));
01248     }
01249     // Mark the instance unregistered.
01250     ret = unregister(it->first, registered_sample, false);
01251 
01252     if (ret != DDS::RETCODE_OK) {
01253       ACE_ERROR((LM_ERROR,
01254                  ACE_TEXT("(%P|%t) ERROR: ")
01255                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01256                  ACE_TEXT("unregister instance %X failed\n"),
01257                  it->first));
01258     }
01259 
01260     PublicationInstance* instance = it->second;
01261 
01262     delete instance;
01263 
01264     // Get the next iterator before erase the instance handle.
01265     PublicationInstanceMapType::iterator it_next = it;
01266     ++it_next;
01267     // Remove the instance from the instance list.
01268     unbind(instances_, it->first);
01269     it = it_next;
01270   }
01271 
01272   ACE_UNUSED_ARG(registered_sample);
01273 }
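
The loop in unregister_all() erases map entries while iterating, which is safe only because the successor iterator is saved before the current entry is removed. A minimal sketch of that pattern follows, assuming a std::map keyed by an integer handle; Instance and release_all() are hypothetical names, and the dispose/unregister calls of the real method are omitted.

```cpp
#include <cstddef>
#include <map>

// Hypothetical stand-in for PublicationInstance.
struct Instance {
  bool unregistered;
  Instance() : unregistered(false) {}
};

// Deletes every owned instance and empties the map.
// Returns the number of instances released.
inline std::size_t release_all(std::map<int, Instance*>& instances) {
  std::size_t released = 0;
  std::map<int, Instance*>::iterator it = instances.begin();
  while (it != instances.end()) {
    // Save the successor first: erasing invalidates only `it`.
    std::map<int, Instance*>::iterator next = it;
    ++next;
    delete it->second;
    instances.erase(it);
    it = next;
    ++released;
  }
  return released;
}
```

For std::map, erasing one element leaves iterators to all other elements valid, which is what makes the saved successor usable after the erase.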

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq ( const ACE_Time_Value &  abs_deadline,
const SequenceNumber sequence 
)

Definition at line 1450 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged(), and wfa_condition_.

Referenced by OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack().

01451 {
01452   ACE_Time_Value deadline(abs_deadline);
01453   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01454   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01455 
01456   while (ACE_OS::gettimeofday() < deadline) {
01457 
01458     if (!sequence_acknowledged(sequence)) {
01459       // lock is released while waiting and acquired before returning
01460       // from wait.
01461       int const wait_result = wfa_condition_.wait(&deadline);
01462 
01463       if (wait_result != 0) {
01464         if (errno == ETIME) {
01465           if (DCPS_debug_level >= 2) {
01466             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01467                                   ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01468                                   sequence.getValue()));
01469           }
01470           ret = DDS::RETCODE_TIMEOUT;
01471         } else {
01472           ret = DDS::RETCODE_ERROR;
01473         }
01474       }
01475     } else {
01476       ret = DDS::RETCODE_OK;
01477       break;
01478     }
01479   }
01480 
01481   return ret;
01482 }
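
As a rough illustration of the wait-until-acknowledged pattern used by wait_ack_of_seq(), the sketch below uses std::condition_variable from the C++ standard library rather than the ACE primitives in the listing. AckTracker, WaitResult, ack() and wait_for() are hypothetical names, and sequence numbers are modeled as 64-bit integers. One deliberate difference: this sketch checks the predicate even when the deadline has already passed, whereas the listing above skips its loop in that case.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

enum class WaitResult { Ok, Timeout };

// Hypothetical tracker of the highest acknowledged sequence number.
class AckTracker {
public:
  AckTracker() : acked_(-1) {}

  // Records an acknowledgement and wakes every waiter.
  void ack(std::int64_t seq) {
    {
      std::lock_guard<std::mutex> guard(mutex_);
      if (seq > acked_) acked_ = seq;
    }
    cv_.notify_all();
  }

  // Blocks until `seq` is acknowledged or `deadline` passes.
  // The mutex is released while waiting and reacquired on wakeup.
  WaitResult wait_for(std::int64_t seq,
                      std::chrono::steady_clock::time_point deadline) {
    std::unique_lock<std::mutex> lock(mutex_);
    const bool acked =
      cv_.wait_until(lock, deadline, [&] { return acked_ >= seq; });
    return acked ? WaitResult::Ok : WaitResult::Timeout;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::int64_t acked_;
};
```

The predicate overload of wait_until re-checks the condition after every wakeup, so spurious wakeups do not cause an early return.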

void OpenDDS::DCPS::WriteDataContainer::wait_pending (  ) 

Block until pending samples have either been delivered or dropped.

Definition at line 1388 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, empty_condition_, log_send_state_lists(), pending_data(), TheServiceParticipant, and OpenDDS::DCPS::MessageTracker::timestamp().

Referenced by OpenDDS::DCPS::DataWriterImpl::wait_pending().

01389 {
01390   ACE_Time_Value pending_timeout =
01391     TheServiceParticipant->pending_timeout();
01392 
01393   ACE_Time_Value* pTimeout = 0;
01394 
01395   if (pending_timeout != ACE_Time_Value::zero) {
01396     pTimeout = &pending_timeout;
01397     pending_timeout += ACE_OS::gettimeofday();
01398   }
01399 
01400   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01401   const bool report = DCPS_debug_level > 0 && pending_data();
01402   if (report) {
01403     ACE_TCHAR date_time[50];
01404     ACE_TCHAR* const time =
01405       MessageTracker::timestamp(pending_timeout,
01406                                 date_time,
01407                                 50);
01408     ACE_DEBUG((LM_DEBUG,
01409                ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01410                ACE_TEXT("at %s\n"),
01411                (pending_timeout == ACE_Time_Value::zero ?
01412                   ACE_TEXT("(no timeout)") : time)));
01413   }
01414   while (true) {
01415 
01416     if (!pending_data())
01417       break;
01418 
01419     if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01420       if (DCPS_debug_level) {
01421         ACE_DEBUG((LM_INFO,
01422                    ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01423                    ACE_TEXT("Timed out waiting for messages to be transported")));
01424         this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01425       }
01426       break;
01427     }
01428   }
01429   if (report) {
01430     ACE_DEBUG((LM_DEBUG,
01431                "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01432   }
01433 }
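
The start of wait_pending() turns a relative timeout into an absolute deadline and treats a zero timeout as "wait indefinitely". The sketch below isolates that conversion using std::chrono in place of ACE_Time_Value; Deadline and make_deadline() are hypothetical names introduced for illustration.

```cpp
#include <chrono>

// Hypothetical result type: `bounded == false` means no timeout,
// matching the null pTimeout pointer in the listing.
struct Deadline {
  bool bounded;
  std::chrono::steady_clock::time_point at;
};

// Converts a relative timeout into an absolute deadline.
// A zero timeout yields an unbounded wait.
inline Deadline make_deadline(std::chrono::milliseconds timeout,
                              std::chrono::steady_clock::time_point now) {
  Deadline d;
  d.bounded = timeout != std::chrono::milliseconds::zero();
  d.at = d.bounded ? now + timeout : std::chrono::steady_clock::time_point();
  return d;
}
```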

void OpenDDS::DCPS::WriteDataContainer::wakeup_blocking_writers ( DataSampleElement stale  )  [private]

Called when data has been dropped or delivered and any blocked writers should be notified.

Definition at line 1506 of file WriteDataContainer.cpp.

References condition_, and waiting_on_release_.

Referenced by data_delivered(), and data_dropped().

01507 {
01508   if (stale && waiting_on_release_) {
01509     waiting_on_release_ = false;
01510 
01511     condition_.broadcast();
01512   }
01513 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Definition at line 350 of file WriteDataContainer.h.

friend class DataWriterImpl [friend]

Definition at line 119 of file WriteDataContainer.h.


Member Data Documentation

DisjointSequence OpenDDS::DCPS::WriteDataContainer::acked_sequences_ [private]

Definition at line 408 of file WriteDataContainer.h.

Referenced by data_delivered(), and sequence_acknowledged().

ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::condition_ [private]

Definition at line 485 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

WriterDataSampleList OpenDDS::DCPS::WriteDataContainer::data_holder_ [private]

The list of all samples written to this datawriter in writing order.

Definition at line 429 of file WriteDataContainer.h.

Referenced by obtain_buffer(), and release_buffer().

CORBA::Long OpenDDS::DCPS::WriteDataContainer::depth_ [private]

The maximum size a container should allow for an instance sample list. It corresponds to the QoS.HISTORY.depth value when QoS.HISTORY.kind==KEEP_LAST and to QoS.RESOURCE_LIMITS.max_samples_per_instance when QoS.HISTORY.kind==KEEP_ALL.

Definition at line 452 of file WriteDataContainer.h.

Referenced by obtain_buffer().
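
The rule described for depth_ can be written as a one-line selection. The following is a sketch only: HistoryKind, HistoryLimits and instance_list_limit() are hypothetical names, not OpenDDS types, and the struct carries just the three QoS values the description mentions.

```cpp
enum class HistoryKind { KeepLast, KeepAll };

// Hypothetical subset of the DataWriter QoS relevant to depth_.
struct HistoryLimits {
  HistoryKind kind;               // QoS.HISTORY.kind
  long history_depth;             // QoS.HISTORY.depth
  long max_samples_per_instance;  // QoS.RESOURCE_LIMITS.max_samples_per_instance
};

// Picks the per-instance sample limit according to the history kind.
inline long instance_list_limit(const HistoryLimits& q) {
  return q.kind == HistoryKind::KeepLast ? q.history_depth
                                         : q.max_samples_per_instance;
}
```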

DDS::DomainId_t const OpenDDS::DCPS::WriteDataContainer::domain_id_ [private]

Domain ID.

Definition at line 514 of file WriteDataContainer.h.

DataDurabilityCache* const OpenDDS::DCPS::WriteDataContainer::durability_cache_ [private]

Pointer to the data durability cache.

This is a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.

Definition at line 530 of file WriteDataContainer.h.

Referenced by persist_data().

DDS::DurabilityServiceQosPolicy const& OpenDDS::DCPS::WriteDataContainer::durability_service_ [private]

DURABILITY_SERVICE QoS specific to the DataWriter.

Definition at line 533 of file WriteDataContainer.h.

ACE_Condition<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::empty_condition_ [private]

Definition at line 486 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().

PublicationInstanceMapType OpenDDS::DCPS::WriteDataContainer::instances_ [private]

The individual instance queue threads in the data.

Definition at line 438 of file WriteDataContainer.h.

Referenced by dispose(), get_handle_instance(), get_instance_handles(), log_send_state_lists(), num_all_samples(), num_samples(), obtain_buffer(), register_instance(), reschedule_deadline(), unregister(), unregister_all(), and OpenDDS::DCPS::DataWriterImpl::unregister_instances().

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::lock_ [private]

This lock protects the container and the map in the type-specific DataWriter. It is accessible via the datawriter and is made globally accessible for performance reasons. The lock is acquired when an external call (e.g. FooDataWriterImpl::write) starts, and the same lock is used by the transport thread to notify the datawriter that data has been delivered. Other internal operations do not lock.

Definition at line 484 of file WriteDataContainer.h.

::DDS::Duration_t OpenDDS::DCPS::WriteDataContainer::max_blocking_time_ [private]

The maximum time to block on a write operation. This comes from the DataWriter's QoS RELIABILITY.max_blocking_time.

Definition at line 470 of file WriteDataContainer.h.

Referenced by obtain_buffer().

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_instances_ [private]

The maximum number of instances allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS.

Definition at line 458 of file WriteDataContainer.h.

Referenced by register_instance().

CORBA::Long OpenDDS::DCPS::WriteDataContainer::max_num_samples_ [private]

The maximum number of samples allowed, or zero to indicate unlimited. It corresponds to QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS. It also covers QoS.RESOURCE_LIMITS.max_samples and max_instances * depth.

Definition at line 466 of file WriteDataContainer.h.

Referenced by obtain_buffer().

size_t OpenDDS::DCPS::WriteDataContainer::n_chunks_ [private]

The number of chunks that sample_list_element_allocator_ needs to initialize.

Definition at line 496 of file WriteDataContainer.h.

Referenced by WriteDataContainer().

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::orphaned_to_transport_ [private]

List of data that has been released by WriteDataContainer but is still in the process of delivery (or dropping) by the transport.

Definition at line 425 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), and remove_oldest_sample().

PublicationId OpenDDS::DCPS::WriteDataContainer::publication_id_ [private]

The publication Id from repo.

Definition at line 441 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), OpenDDS::DCPS::DataWriterImpl::enable(), obtain_buffer(), obtain_buffer_for_control(), reenqueue_all(), remove_oldest_historical_sample(), and remove_oldest_sample().

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::resend_data_ [private]

List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in sent and sending list. This list will be passed to the transport for re-sending.

Definition at line 435 of file WriteDataContainer.h.

Referenced by get_resend_data().

DataSampleElementAllocator OpenDDS::DCPS::WriteDataContainer::sample_list_element_allocator_ [private]

The cached allocator to allocate DataSampleElement objects.

Definition at line 500 of file WriteDataContainer.h.

Referenced by copy_and_append(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::sending_data_ [private]

List of data that is currently being sent.

Definition at line 418 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), get_unsent_data(), log_send_state_lists(), pending_data(), reenqueue_all(), and remove_oldest_sample().

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::sent_data_ [private]

List of data that has already been sent.

Definition at line 421 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), log_send_state_lists(), reenqueue_all(), remove_oldest_historical_sample(), and remove_oldest_sample().

bool OpenDDS::DCPS::WriteDataContainer::shutdown_ [private]

The flag indicates the datawriter will be destroyed.

Definition at line 511 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and ~WriteDataContainer().

char const* const OpenDDS::DCPS::WriteDataContainer::topic_name_ [private]

Topic name.

Definition at line 517 of file WriteDataContainer.h.

ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::transaction_id_ [private]

Id used to keep track of which send transaction the DataWriter is currently creating.

Definition at line 415 of file WriteDataContainer.h.

Referenced by get_unsent_data().

TransportCustomizedElementAllocator OpenDDS::DCPS::WriteDataContainer::transport_customized_element_allocator_ [private]

Definition at line 508 of file WriteDataContainer.h.

Referenced by obtain_buffer(), and obtain_buffer_for_control().

TransportSendElementAllocator OpenDDS::DCPS::WriteDataContainer::transport_send_element_allocator_ [private]

The allocator for TransportSendElement. The TransportSendElement allocator is put here because it needs the number of chunks information that WriteDataContainer has.

Definition at line 506 of file WriteDataContainer.h.

Referenced by obtain_buffer(), obtain_buffer_for_control(), and WriteDataContainer().

char const* const OpenDDS::DCPS::WriteDataContainer::type_name_ [private]

Type name.

Definition at line 520 of file WriteDataContainer.h.

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::unsent_data_ [private]

List of data that has not been sent yet.

Definition at line 411 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), enqueue(), enqueue_control(), get_unsent_data(), log_send_state_lists(), pending_data(), and remove_oldest_sample().

bool OpenDDS::DCPS::WriteDataContainer::waiting_on_release_ [private]

The block waiting flag.

Definition at line 473 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

ACE_Condition<ACE_Thread_Mutex> OpenDDS::DCPS::WriteDataContainer::wfa_condition_ [private]

Used to block in wait_for_acks().

Definition at line 492 of file WriteDataContainer.h.

Referenced by data_delivered(), and wait_ack_of_seq().

ACE_Thread_Mutex OpenDDS::DCPS::WriteDataContainer::wfa_lock_ [private]

Lock used for wait_for_acks() processing.

Definition at line 489 of file WriteDataContainer.h.

DataWriterImpl* OpenDDS::DCPS::WriteDataContainer::writer_ [private]

The writer that owns this container.

Definition at line 444 of file WriteDataContainer.h.

Referenced by copy_and_append(), data_delivered(), data_dropped(), dispose(), enqueue(), register_instance(), and unregister().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:41 2016 for OpenDDS by  doxygen 1.4.7