OpenDDS::DCPS::WriteDataContainer Class Reference

A container for instances sample data. More...

#include <WriteDataContainer.h>

Inheritance diagram for OpenDDS::DCPS::WriteDataContainer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::WriteDataContainer:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 WriteDataContainer (DataWriterImpl *writer, CORBA::Long max_samples_per_instance, CORBA::Long history_depth, CORBA::Long max_durable_per_instance, DDS::Duration_t max_blocking_time, size_t n_chunks, DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataDurabilityCache *durability_cache, DDS::DurabilityServiceQosPolicy const &durability_service, CORBA::Long max_instances, CORBA::Long max_total_samples)
 ~WriteDataContainer ()
DDS::ReturnCode_t enqueue_control (DataSampleElement *control_sample)
DDS::ReturnCode_t enqueue (DataSampleElement *sample, DDS::InstanceHandle_t instance)
DDS::ReturnCode_t reenqueue_all (const RepoId &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
DDS::ReturnCode_t register_instance (DDS::InstanceHandle_t &instance_handle, Message_Block_Ptr &registered_sample)
DDS::ReturnCode_t unregister (DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
size_t num_all_samples ()
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
SendStateDataSampleList get_resend_data ()
bool pending_data ()
void data_delivered (const DataSampleElement *sample)
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
DDS::ReturnCode_t obtain_buffer_for_control (DataSampleElement *&element)
DDS::ReturnCode_t obtain_buffer (DataSampleElement *&element, DDS::InstanceHandle_t handle)
void release_buffer (DataSampleElement *element)
void unregister_all ()
PublicationInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
bool persist_data ()
void reschedule_deadline ()
 Reset time interval for each instance.
void wait_pending ()
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
void get_instance_handles (InstanceHandleVec &instance_handles)
DDS::ReturnCode_t wait_ack_of_seq (const ACE_Time_Value &abs_deadline, const SequenceNumber &sequence)
bool sequence_acknowledged (const SequenceNumber sequence)

Private Member Functions

 WriteDataContainer (WriteDataContainer const &)
WriteDataContaineroperator= (WriteDataContainer const &)
void copy_and_prepend (SendStateDataSampleList &list, const SendStateDataSampleList &appended, const RepoId &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params, ssize_t &max_resend_samples)
void remove_excess_durable ()
DDS::ReturnCode_t remove_oldest_sample (InstanceDataSampleList &instance_list, bool &released)
void wakeup_blocking_writers (DataSampleElement *stale)
void log_send_state_lists (OPENDDS_STRING description)

Private Attributes

DisjointSequence acked_sequences_
SendStateDataSampleList unsent_data_
 List of data that has not been sent yet.
ACE_UINT64 transaction_id_
SendStateDataSampleList sending_data_
 List of data that is currently being sent.
SendStateDataSampleList sent_data_
 List of data that has already been sent.
SendStateDataSampleList orphaned_to_transport_
WriterDataSampleList data_holder_
SendStateDataSampleList resend_data_
PublicationInstanceMapType instances_
 The individual instance queue threads in the data.
PublicationId publication_id_
 The publication Id from repo.
DataWriterImplwriter_
 The writer that owns this container.
CORBA::Long max_samples_per_instance_
CORBA::Long history_depth_
CORBA::Long max_durable_per_instance_
CORBA::Long max_num_instances_
CORBA::Long max_num_samples_
DDS::Duration_t max_blocking_time_
bool waiting_on_release_
 The block waiting flag.
ACE_Recursive_Thread_Mutex lock_
ACE_Condition
< ACE_Recursive_Thread_Mutex
condition_
ACE_Condition
< ACE_Recursive_Thread_Mutex
empty_condition_
ACE_Thread_Mutex wfa_lock_
 Lock used for wait_for_acks() processing.
ACE_Condition< ACE_Thread_Mutexwfa_condition_
 Used to block in wait_for_acks().
size_t n_chunks_
DataSampleElementAllocator sample_list_element_allocator_
bool shutdown_
 The flag indicates the datawriter will be destroyed.
DDS::DomainId_t const domain_id_
 Domain ID.
char const *const topic_name_
 Topic name.
char const *const type_name_
 Type name.
DataDurabilityCache *const durability_cache_
 Pointer to the data durability cache.
DDS::DurabilityServiceQosPolicy
const & 
durability_service_
 DURABILITY_SERVICE QoS specific to the DataWriter.

Friends

class DataWriterImpl
class ::DDS_TEST

Detailed Description

A container for instances sample data.

This container is instantiated per DataWriter. It maintains list of PublicationInstance objects which is internally referenced by the instance handle.

This container contains threaded lists of all data written to a given DataWriter. The real data sample is represented by the DataSampleElement. The data_holder_ holds all DataSampleElement in the writing order via the next_writer_sample_/previous_writer_sample_ thread. The instance list in PublicationInstance links samples via the next_instance_sample_ thread.

There are three state transition lists used during write operations: unsent, sending, and sent. These lists are linked via the next_send_sample_/previous_send_sample_ thread. Any DataSampleElement should be in one of these three lists and SHOULD NOT be shared between these three lists. A normal transition of a DataSampleElement would be unsent->sending->sent. A DataSampleElement transitions from unsent to sent naturally during a write operation when get_unsent_data is called. A DataSampleElement transitions from sending to sent when data_delivered is called notifying the container that the transport is finished with the sample and it can be marked as a historical sample. A DataSampleElement may transition back to unsent from sending if the transport notifies that the sample was dropped and should be resent. A DataSampleElement is removed from sent list and freed when the instance queue or container (for another instance) needs more space. A DataSampleElement may be removed and freed directly from sending when and unreliable writer needs space. In this case the transport will be notified that the sample should be removed. A DataSampleElement may also be removed directly from unsent if an unreliable writer needs space for new samples. Note: The real data sample will be freed when the reference counting goes 0. The resend list is only used when the datawriter uses TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements for the data sample duplicates of the sending and sent list and hands this list off to the transport.

Note:
: 1) The PublicationInstance object is not removed from this container until the instance is unregistered. The same instance handle is reused for re-registration. The instance data is deleted when this container is deleted. This would simplify instance data memory management. An alternative way is to remove the handle from the instance list when unregister occurs and delete the instance data after the transport is done with the instance, but we do not have a way to know when the transport is done since we have the sending list for all instances in the same datawriter. 2) This container has the ownership of the instance data samples when the data is written. The type-specific datawriter that allocates the memory for the sample data gives the ownership to its base class. 3) It is the responsibility of the owner of objects of this class to ensure that access to the lists are properly locked. This means using the same lock/condition to access the lists via the enqueue(), get*(), and data_delivered() methods. For the case where these are all called from the same (client) thread, this should be a recursive lock so that: 1) we do not deadlock; and, 2) we incur the cost of obtaining the lock only once.

Definition at line 118 of file WriteDataContainer.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( DataWriterImpl writer,
CORBA::Long  max_samples_per_instance,
CORBA::Long  history_depth,
CORBA::Long  max_durable_per_instance,
DDS::Duration_t  max_blocking_time,
size_t  n_chunks,
DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataDurabilityCache durability_cache,
DDS::DurabilityServiceQosPolicy const &  durability_service,
CORBA::Long  max_instances,
CORBA::Long  max_total_samples 
)

No default constructor, must be initialized.

Parameters:
writer The writer which owns this container.
max_samples_per_instance Max samples kept within each instance
max_durable_per_instance Max durable samples sent for each instance
max_blocking_time The timeout for write.
n_chunks The number of chunks that the DataSampleElementAllocator needs allocate.
domain_id Domain ID.
topic_name Topic name.
type_name Type name.
durability_cache The data durability cache for unsent data.
durability_service DURABILITY_SERVICE QoS specific to the DataWriter.
max_instances maximum number of instances, 0 for unlimited
max_total_samples maximum total number of samples, 0 for unlimited

Definition at line 73 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, n_chunks_, and sample_list_element_allocator_.

00089   : transaction_id_(0),
00090     publication_id_(GUID_UNKNOWN),
00091     writer_(writer),
00092     max_samples_per_instance_(max_samples_per_instance),
00093     history_depth_(history_depth),
00094     max_durable_per_instance_(max_durable_per_instance),
00095     max_num_instances_(max_instances),
00096     max_num_samples_(max_total_samples),
00097     max_blocking_time_(max_blocking_time),
00098     waiting_on_release_(false),
00099     condition_(lock_),
00100     empty_condition_(lock_),
00101     wfa_condition_(this->wfa_lock_),
00102     n_chunks_(n_chunks),
00103     sample_list_element_allocator_(2 * n_chunks_),
00104     shutdown_(false),
00105     domain_id_(domain_id),
00106     topic_name_(topic_name),
00107     type_name_(type_name)
00108 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00109   , durability_cache_(durability_cache)
00110   , durability_service_(durability_service)
00111 #endif
00112 {
00113 
00114   if (DCPS_debug_level >= 2) {
00115     ACE_DEBUG((LM_DEBUG,
00116                "(%P|%t) WriteDataContainer "
00117                "sample_list_element_allocator %x with %d chunks\n",
00118                &sample_list_element_allocator_, n_chunks_));
00119   }
00120 }

OpenDDS::DCPS::WriteDataContainer::~WriteDataContainer (  ) 

Definition at line 122 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue_head(), OpenDDS::DCPS::TransportRegistry::instance(), LM_DEBUG, LM_ERROR, LM_WARNING, orphaned_to_transport_, release_buffer(), OpenDDS::DCPS::TransportRegistry::released(), sending_data_, sent_data_, shutdown_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

00123 {
00124   if (this->unsent_data_.size() > 0) {
00125     ACE_DEBUG((LM_WARNING,
00126                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00127                ACE_TEXT("destroyed with %d samples unsent.\n"),
00128                this->unsent_data_.size()));
00129   }
00130 
00131   if (this->sending_data_.size() > 0) {
00132     if (TransportRegistry::instance()->released()) {
00133       for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
00134         release_buffer(e);
00135       }
00136     }
00137     if (sending_data_.size() && DCPS_debug_level) {
00138       ACE_DEBUG((LM_WARNING,
00139                  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00140                  ACE_TEXT("destroyed with %d samples sending.\n"),
00141                  this->sending_data_.size()));
00142     }
00143   }
00144 
00145   if (this->sent_data_.size() > 0) {
00146     ACE_DEBUG((LM_DEBUG,
00147                ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00148                ACE_TEXT("destroyed with %d samples sent.\n"),
00149                this->sent_data_.size()));
00150   }
00151 
00152   if (this->orphaned_to_transport_.size() > 0) {
00153     if (DCPS_debug_level > 0) {
00154       ACE_DEBUG((LM_DEBUG,
00155                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00156                  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00157                  this->orphaned_to_transport_.size()));
00158     }
00159   }
00160 
00161   if (!shutdown_) {
00162     ACE_ERROR((LM_ERROR,
00163                ACE_TEXT("(%P|%t) ERROR: ")
00164                ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00165                ACE_TEXT("The container has not been cleaned.\n")));
00166   }
00167 }

Here is the call graph for this function:

OpenDDS::DCPS::WriteDataContainer::WriteDataContainer ( WriteDataContainer const &   )  [private]

Member Function Documentation

void OpenDDS::DCPS::WriteDataContainer::copy_and_prepend ( SendStateDataSampleList list,
const SendStateDataSampleList appended,
const RepoId reader_id,
const DDS::LifespanQosPolicy lifespan,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator eval,
const DDS::StringSeq params,
ssize_t max_resend_samples 
) [private]

Definition at line 1265 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::enqueue_head(), OpenDDS::DCPS::DataWriterImpl::filter_out(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), OpenDDS::DCPS::SendStateDataSampleList::rbegin(), OpenDDS::DCPS::SendStateDataSampleList::rend(), OpenDDS::DCPS::resend_data_expired(), sample_list_element_allocator_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and writer_.

Referenced by reenqueue_all().

01275 {
01276   for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
01277        cur != appended.rend() && max_resend_samples; ++cur) {
01278 
01279     if (resend_data_expired(*cur, lifespan))
01280       continue;
01281 
01282 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01283     if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01284       continue;
01285 #endif
01286 
01287     PublicationInstance_rch inst = cur->get_handle();
01288 
01289     if (!inst) {
01290       // *cur is a control message, just skip it
01291       continue;
01292     }
01293 
01294     if (inst->durable_samples_remaining_ == 0)
01295       continue;
01296     --inst->durable_samples_remaining_;
01297 
01298     DataSampleElement* element = 0;
01299     ACE_NEW_MALLOC(element,
01300                    static_cast<DataSampleElement*>(
01301                      sample_list_element_allocator_.malloc(
01302                        sizeof(DataSampleElement))),
01303                    DataSampleElement(*cur));
01304 
01305     element->set_num_subs(1);
01306     element->set_sub_id(0, reader_id);
01307 
01308     list.enqueue_head(element);
01309     --max_resend_samples;
01310   }
01311 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::data_delivered ( const DataSampleElement sample  ) 

Acknowledge the delivery of data. The sample that resides in this container will be moved from sending_data_ list to the internal sent_data_ list. If there are any threads waiting for available space, it wakes up these threads.

Definition at line 531 of file WriteDataContainer.cpp.

References ACE_TEXT(), acked_sequences_, ACE_Condition< ACE_Thread_Mutex >::broadcast(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DataWriterImpl::controlTracker, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_handle(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, LM_WARNING, lock_, max_durable_per_instance_, OpenDDS::DCPS::MessageTracker::message_delivered(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::InstanceDataSampleList::on_some_list(), OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::set_flag(), topic_name_, unsent_data_, wakeup_blocking_writers(), wfa_condition_, wfa_lock_, and writer_.

Referenced by data_dropped().

00532 {
00533   DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00534 
00535   if (DCPS_debug_level >= 2) {
00536     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00537                           ACE_TEXT(" %@\n"), sample));
00538   }
00539 
00540   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00541             guard,
00542             this->lock_);
00543 
00544   // Delivered samples _must_ be on sending_data_ list
00545 
00546   // If it is not found in one of the lists, an invariant
00547   // exception is declared.
00548 
00549   // The element now needs to be removed from the sending_data_
00550   // list, and appended to the end of the sent_data_ list here
00551 
00552   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00553 
00554   // If sample is on a SendStateDataSampleList it should be on the
00555   // sending_data_ list signifying it was given to the transport to
00556   // deliver and now the transport is signaling it has been delivered
00557   if (!sending_data_.dequeue(sample)) {
00558     //
00559     // Should be on sending_data_.  If it is in sent_data_
00560     // or unsent_data there was a problem.
00561     //
00562     SendStateDataSampleList* send_lists[] = {
00563       &sent_data_,
00564       &unsent_data_,
00565       &orphaned_to_transport_};
00566     const SendStateDataSampleList* containing_list =
00567       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00568 
00569     if (containing_list == &this->sent_data_) {
00570       ACE_ERROR((LM_WARNING,
00571                  ACE_TEXT("(%P|%t) WARNING: ")
00572                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00573                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00574                  ACE_TEXT("WAS IN sent_data_.\n")));
00575     } else if (containing_list == &this->unsent_data_) {
00576       ACE_ERROR((LM_WARNING,
00577                  ACE_TEXT("(%P|%t) WARNING: ")
00578                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00579                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00580                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00581     } else {
00582 
00583       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00584       //and inform transport of their release.  Transport will call data-delivered on the
00585       //elements as it processes the removal but they will already be gone from the send lists.
00586       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00587         //this message was a control message so release it
00588         if (DCPS_debug_level > 9) {
00589           GuidConverter converter(publication_id_);
00590           ACE_DEBUG((LM_DEBUG,
00591                      ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00592                      ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00593                      this->domain_id_,
00594                      this->topic_name_,
00595                      OPENDDS_STRING(converter).c_str()));
00596         }
00597         writer_->controlTracker.message_delivered();
00598       }
00599 
00600       if (containing_list == &this->orphaned_to_transport_) {
00601         orphaned_to_transport_.dequeue(sample);
00602         release_buffer(stale);
00603 
00604       } else if (!containing_list) {
00605         // samples that were retrieved from get_resend_data()
00606         SendStateDataSampleList::remove(stale);
00607         release_buffer(stale);
00608       }
00609 
00610       if (!pending_data())
00611         empty_condition_.broadcast();
00612     }
00613 
00614     return;
00615   }
00616   ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00617   SequenceNumber acked_seq = stale->get_header().sequence_;
00618   SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00619 
00620   if (stale->get_header().message_id_ != SAMPLE_DATA) {
00621     //this message was a control message so release it
00622     if (DCPS_debug_level > 9) {
00623       GuidConverter converter(publication_id_);
00624       ACE_DEBUG((LM_DEBUG,
00625                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00626                  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00627                  this->domain_id_,
00628                  this->topic_name_,
00629                  OPENDDS_STRING(converter).c_str()));
00630     }
00631     release_buffer(stale);
00632     stale = 0;
00633     writer_->controlTracker.message_delivered();
00634   } else {
00635 
00636     if (max_durable_per_instance_) {
00637       DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00638       sent_data_.enqueue_tail(sample);
00639 
00640     } else {
00641       if (InstanceDataSampleList::on_some_list(sample)) {
00642         PublicationInstance_rch inst = sample->get_handle();
00643         inst->samples_.dequeue(sample);
00644       }
00645       release_buffer(stale);
00646       stale = 0;
00647     }
00648 
00649     if (DCPS_debug_level > 9) {
00650       GuidConverter converter(publication_id_);
00651       ACE_DEBUG((LM_DEBUG,
00652                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00653                  ACE_TEXT("domain %d topic %C publication %C %s.\n"),
00654                  this->domain_id_,
00655                  this->topic_name_,
00656                  OPENDDS_STRING(converter).c_str(),
00657                  max_durable_per_instance_
00658                  ? ACE_TEXT("stored for durability")
00659                  : ACE_TEXT("released")));
00660     }
00661 
00662     this->wakeup_blocking_writers (stale);
00663   }
00664   if (DCPS_debug_level > 9) {
00665     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00666                          ACE_TEXT("Inserting acked_sequence: %q\n"),
00667                          acked_seq.getValue()));
00668   }
00669 
00670   acked_sequences_.insert(acked_seq);
00671 
00672   if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00673       prev_max < acked_sequences_.cumulative_ack()) {
00674 
00675     if (DCPS_debug_level > 9) {
00676       ACE_DEBUG((LM_DEBUG,
00677                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00678                  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00679     }
00680 
00681     wfa_condition_.broadcast();
00682   }
00683 
00684   // Signal if there is no pending data.
00685   if (!pending_data())
00686     empty_condition_.broadcast();
00687 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::data_dropped ( const DataSampleElement element,
bool  dropped_by_transport 
)

This method is called by the transport to notify the sample is dropped. Which the transport was told to do by the publication code by calling TransportClient::remove_sample(). If the sample was "sending" then it is moved to the "unsent" list. If there are any threads waiting for available space then it needs wake up these threads. The dropped_by_transport flag true indicates the dropping initiated by transport when the transport send strategy is in a MODE_TERMINATED. The dropped_by_transport flag false indicates the dropping is initiated by the remove_sample and data_dropped() is a result of remove_sample().

Definition at line 690 of file WriteDataContainer.cpp.

References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DataWriterImpl::controlTracker, data_delivered(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), LM_DEBUG, LM_WARNING, lock_, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::DataSampleHeader::message_id_, OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::remove(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, wakeup_blocking_writers(), and writer_.

00692 {
00693   DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00694 
00695   if (DCPS_debug_level >= 2) {
00696     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00697                           ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00698                           sample, dropped_by_transport));
00699   }
00700 
00701   // If the transport initiates the data dropping, we need do same thing
00702   // as data_delivered. e.g. remove the sample from the internal list
00703   // and the instance list. We do not need acquire the lock here since
00704   // the data_delivered acquires the lock.
00705   if (dropped_by_transport) {
00706     this->data_delivered(sample);
00707     return;
00708   }
00709 
00710   //The data_dropped could be called from the thread initiating sample remove
00711   //which already hold the lock. In this case, it's not necessary to acquire
00712   //lock here. It also could be called from the transport thread in a delayed
00713   //notification, it's necessary to acquire lock here to protect the internal
00714   //structures in this class.
00715 
00716   ACE_GUARD (ACE_Recursive_Thread_Mutex,
00717     guard,
00718     this->lock_);
00719 
00720   // The dropped sample should be in the sending_data_ list.
00721   // Otherwise an exception will be raised.
00722   //
00723   // We are now been notified by transport, so we can
00724   // keep the sample from the sending_data_ list still in
00725   // sample list since we will send it.
00726 
00727   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00728 
00729   // If sample is on a SendStateDataSampleList it should be on the
00730   // sending_data_ list signifying it was given to the transport to
00731   // deliver and now the transport is signaling it has been dropped
00732 
00733   if (sending_data_.dequeue(sample)) {
00734     // else: The data_dropped is called as a result of remove_sample()
00735     // called from reenqueue_all() which supports the TRANSIENT_LOCAL
00736     // qos. The samples that are sending by transport are dropped from
00737     // transport and will be moved to the unsent list for resend.
00738     unsent_data_.enqueue_tail(sample);
00739 
00740   } else {
00741     //
00742     // If it is in sent_data_ or unsent_data there was a problem.
00743     //
00744     SendStateDataSampleList* send_lists[] = {
00745       &sent_data_,
00746       &unsent_data_,
00747       &orphaned_to_transport_};
00748     const SendStateDataSampleList* containing_list =
00749       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00750 
00751     if (containing_list == &this->sent_data_) {
00752       ACE_ERROR((LM_WARNING,
00753                  ACE_TEXT("(%P|%t) WARNING: ")
00754                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00755                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00756                  ACE_TEXT("WAS IN sent_data_.\n")));
00757     } else if (containing_list == &this->unsent_data_) {
00758       ACE_ERROR((LM_WARNING,
00759                  ACE_TEXT("(%P|%t) WARNING: ")
00760                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00761                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00762                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00763     } else {
00764 
00765       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00766       //and inform transport of their release.  Transport will call data-dropped on the
00767       //elements as it processes the removal but they will already be gone from the send lists.
00768       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00769         //this message was a control message so release it
00770         if (DCPS_debug_level > 9) {
00771           GuidConverter converter(publication_id_);
00772           ACE_DEBUG((LM_DEBUG,
00773                      ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00774                      ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00775                      this->domain_id_,
00776                      this->topic_name_,
00777                      OPENDDS_STRING(converter).c_str()));
00778         }
00779         writer_->controlTracker.message_dropped();
00780       }
00781 
00782       if (containing_list == &this->orphaned_to_transport_) {
00783         orphaned_to_transport_.dequeue(sample);
00784         release_buffer(stale);
00785         if (!pending_data())
00786           empty_condition_.broadcast();
00787 
00788       } else if (!containing_list) {
00789         // samples that were retrieved from get_resend_data()
00790         SendStateDataSampleList::remove(stale);
00791         release_buffer(stale);
00792       }
00793     }
00794 
00795     return;
00796   }
00797 
00798   this->wakeup_blocking_writers (stale);
00799 
00800   if (!pending_data())
00801     empty_condition_.broadcast();
00802 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::dispose ( DDS::InstanceHandle_t  handle,
Message_Block_Ptr registered_sample,
bool  dup_registered_sample = true 
)

Delete the samples for the provided instance. A shallow copy of the sample data will be given to datawriter as part of the control message if the dup_registered_sample is true.

This method returns error if the instance is not registered.

Definition at line 367 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, lock_, remove_oldest_sample(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::InstanceDataSampleList::size(), OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by unregister_all().

00370 {
00371   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00372                    guard,
00373                    this->lock_,
00374                    DDS::RETCODE_ERROR);
00375 
00376   PublicationInstance_rch instance;
00377 
00378   int const find_attempt = find(instances_, instance_handle, instance);
00379 
00380   if (0 != find_attempt) {
00381     ACE_ERROR_RETURN((LM_ERROR,
00382                       ACE_TEXT("(%P|%t) ERROR: ")
00383                       ACE_TEXT("WriteDataContainer::dispose, ")
00384                       ACE_TEXT("The instance(handle=%X) ")
00385                       ACE_TEXT("is not registered yet.\n"),
00386                       instance_handle),
00387                      DDS::RETCODE_PRECONDITION_NOT_MET);
00388   }
00389 
00390   if (dup_registered_sample) {
00391     // The registered_sample is shallow copied.
00392     registered_sample.reset(instance->registered_sample_->duplicate());
00393   }
00394 
00395   // Note: The DDS specification is unclear as to if samples in the process
00396   // of being sent should be removed or not.
00397   // The advantage of calling remove_sample() on them is that the
00398   // cached allocator memory for them is freed.  The disadvantage
00399   // is that the slow reader may see multiple disposes without
00400   // any write sample between them and hence not temporarily move into the
00401   // Alive state.
00402   // We have chosen to NOT remove the sending samples.
00403 
00404   InstanceDataSampleList& instance_list = instance->samples_;
00405 
00406   while (instance_list.size() > 0) {
00407     bool released = false;
00408     DDS::ReturnCode_t ret
00409     = remove_oldest_sample(instance_list, released);
00410 
00411     if (ret != DDS::RETCODE_OK) {
00412       return ret;
00413     }
00414   }
00415 
00416   if (this->writer_->watchdog_.in())
00417     this->writer_->watchdog_->cancel_timer(instance);
00418   return DDS::RETCODE_OK;
00419 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue ( DataSampleElement sample,
DDS::InstanceHandle_t  instance 
)

Enqueue the data sample in its instance thread. This method assumes there is an available space for the sample in the instance list.

Definition at line 184 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::InstanceDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), get_handle_instance(), ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::in(), DDS::RETCODE_OK, unsent_data_, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

00187 {
00188   // Get the PublicationInstance pointer from InstanceHandle_t.
00189   PublicationInstance_rch instance =
00190     get_handle_instance(instance_handle);
00191   // Extract the instance queue.
00192   InstanceDataSampleList& instance_list = instance->samples_;
00193 
00194   if (this->writer_->watchdog_.in()) {
00195     instance->last_sample_tv_ = instance->cur_sample_tv_;
00196     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00197     this->writer_->watchdog_->execute(*this->writer_, instance, false);
00198   }
00199 
00200   //
00201   // Enqueue to the next_send_sample_ thread of unsent_data_
00202   // will link samples with the next_sample/previous_sample and
00203   // also next_send_sample_.
00204   // This would save time when we actually send the data.
00205 
00206   unsent_data_.enqueue_tail(sample);
00207 
00208   //
00209   // Add this sample to the INSTANCE scope list.
00210   instance_list.enqueue_tail(sample);
00211 
00212   return DDS::RETCODE_OK;
00213 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::enqueue_control ( DataSampleElement control_sample  ) 

Definition at line 170 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), DDS::RETCODE_OK, and unsent_data_.

00171 {
00172   // Enqueue to the next_send_sample_ thread of unsent_data_
00173   // will link samples with the next_sample/previous_sample and
00174   // also next_send_sample_.
00175   // This would save time when we actually send the data.
00176 
00177   unsent_data_.enqueue_tail(control_sample);
00178 
00179   return DDS::RETCODE_OK;
00180 }

Here is the call graph for this function:

PublicationInstance_rch OpenDDS::DCPS::WriteDataContainer::get_handle_instance ( DDS::InstanceHandle_t  handle  ) 
Todo:
remove/document this!

Definition at line 1250 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::find(), instances_, and LM_DEBUG.

Referenced by enqueue(), and obtain_buffer().

01251 {
01252   PublicationInstance_rch instance;
01253 
01254   if (0 != find(instances_, handle, instance)) {
01255     ACE_DEBUG((LM_DEBUG,
01256                ACE_TEXT("(%P|%t) ")
01257                ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01258                ACE_TEXT("lookup for %d failed\n"), handle));
01259   }
01260 
01261   return instance;
01262 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::get_instance_handles ( InstanceHandleVec &  instance_handles  ) 

Definition at line 1417 of file WriteDataContainer.cpp.

References instances_, and lock_.

01418 {
01419   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01420             guard,
01421             this->lock_);
01422   PublicationInstanceMapType::iterator it = instances_.begin();
01423 
01424   while (it != instances_.end()) {
01425     instance_handles.push_back(it->second->instance_handle_);
01426     ++it;
01427   }
01428 }

SendStateDataSampleList OpenDDS::DCPS::WriteDataContainer::get_resend_data (  ) 

Obtain a list of data for resending. This is only used when TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list returned is not put on any SendStateDataSampleList.

Definition at line 502 of file WriteDataContainer.cpp.

References DBG_ENTRY_LVL, resend_data_, and OpenDDS::DCPS::SendStateDataSampleList::reset().

00503 {
00504   DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00505 
00506   //
00507   // The samples in unsent_data are added to the sending_data
00508   // during enqueue.
00509   //
00510   SendStateDataSampleList list = this->resend_data_;
00511 
00512   //
00513   // Clear the unsent data list.
00514   //
00515   this->resend_data_.reset();
00516   //
00517   // Return the moved list.
00518   //
00519   return list;
00520 }

Here is the call graph for this function:

ACE_UINT64 OpenDDS::DCPS::WriteDataContainer::get_unsent_data ( SendStateDataSampleList list  ) 

Obtain a list of data that has not yet been sent. The data on the list returned is moved from the internal unsent_data_ list to the internal sending_data_ list as part of this call. The entire list is linked via the DataSampleElement.next_send_sample_ link as well.

Definition at line 463 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::SendStateDataSampleList::begin(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::SendStateDataSampleList::reset(), sending_data_, transaction_id_, and unsent_data_.

00464 {
00465   DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00466   //
00467   // The samples in unsent_data are added to the local datawriter
00468   // list and enqueued to the sending_data_ signifying they have
00469   // been passed to the transport to send in a transaction
00470   //
00471   list = this->unsent_data_;
00472 
00473   // Increment send counter for this send operation
00474   ++transaction_id_;
00475 
00476   // Mark all samples with current send counter
00477   SendStateDataSampleList::iterator iter = list.begin();
00478   while (iter != list.end()) {
00479     iter->set_transaction_id(this->transaction_id_);
00480     ++iter;
00481   }
00482 
00483   //
00484   // The unsent_data_ already linked with the
00485   // next_send_sample during enqueue.
00486   // Append the unsent_data_ to current sending_data_
00487   // list.
00488   sending_data_.enqueue_tail(list);
00489 
00490   //
00491   // Clear the unsent data list.
00492   //
00493   this->unsent_data_.reset();
00494 
00495   //
00496   // Return the moved list.
00497   //
00498   return transaction_id_;
00499 }

Here is the call graph for this function:

void OpenDDS::DCPS::WriteDataContainer::log_send_state_lists ( OPENDDS_STRING  description  )  [private]

Definition at line 1497 of file WriteDataContainer.cpp.

References instances_, LM_DEBUG, num_all_samples(), orphaned_to_transport_, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by wait_pending().

01498 {
01499   ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01500              description.c_str(),
01501              unsent_data_.size(),
01502              sending_data_.size(),
01503              sent_data_.size(),
01504              orphaned_to_transport_.size(),
01505              num_all_samples(),
01506              instances_.size()));
01507 }

Here is the call graph for this function:

Here is the caller graph for this function:

size_t OpenDDS::DCPS::WriteDataContainer::num_all_samples (  ) 

Return the number of samples for all instances.

Definition at line 443 of file WriteDataContainer.cpp.

References instances_, lock_, and size.

Referenced by log_send_state_lists(), and obtain_buffer().

00444 {
00445   size_t size = 0;
00446 
00447   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00448                    guard,
00449                    this->lock_,
00450                    0);
00451 
00452   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00453        iter != instances_.end();
00454        ++iter)
00455   {
00456     size += iter->second->samples_.size();
00457   }
00458 
00459   return size;
00460 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::num_samples ( DDS::InstanceHandle_t  handle,
size_t size 
)

Return the number of samples for the given instance.

Definition at line 422 of file WriteDataContainer.cpp.

References OpenDDS::DCPS::find(), instances_, lock_, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

00424 {
00425   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00426                    guard,
00427                    this->lock_,
00428                    DDS::RETCODE_ERROR);
00429   PublicationInstance_rch instance;
00430 
00431   int const find_attempt = find(instances_, handle, instance);
00432 
00433   if (0 != find_attempt) {
00434     return DDS::RETCODE_ERROR;
00435 
00436   } else {
00437     size = instance->samples_.size();
00438     return DDS::RETCODE_OK;
00439   }
00440 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer ( DataSampleElement *&  element,
DDS::InstanceHandle_t  handle 
)

Allocate a DataSampleElement object and check the space availability for newly allocated element according to qos settings. For the blocking write case, if resource limits or history qos limits are reached, then it blocks for max blocking time for a previous sample to be delivered or dropped by the transport. In non-blocking write case, if resource limits or history qos limits are reached, will attempt to remove oldest samples (forcing the transport to drop samples if necessary) to make space. If there are several threads waiting then the first one in the waiting list can enqueue, others continue waiting.

Definition at line 1012 of file WriteDataContainer.cpp.

References ACE_TEXT(), condition_, data_holder_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_absolute_time_value(), OpenDDS::DCPS::WriterDataSampleList::enqueue_tail(), get_handle_instance(), ACE_OS::gettimeofday(), history_depth_, instances_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), max_blocking_time_, max_num_samples_, max_samples_per_instance_, num_all_samples(), publication_id_, OpenDDS::DCPS::DataWriterImpl::qos_, release_buffer(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, remove_excess_durable(), remove_oldest_sample(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sample_list_element_allocator_, shutdown_, ACE_Condition< ACE_Recursive_Thread_Mutex >::wait(), waiting_on_release_, and writer_.

01014 {
01015   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01016 
01017   remove_excess_durable();
01018 
01019   PublicationInstance_rch instance = get_handle_instance(handle);
01020 
01021   if (!instance) {
01022     return DDS::RETCODE_BAD_PARAMETER;
01023   }
01024 
01025   ACE_NEW_MALLOC_RETURN(
01026     element,
01027     static_cast<DataSampleElement*>(
01028       sample_list_element_allocator_.malloc(
01029         sizeof(DataSampleElement))),
01030     DataSampleElement(publication_id_,
01031                           this->writer_,
01032                           instance),
01033     DDS::RETCODE_ERROR);
01034 
01035   // Extract the current instance queue.
01036   InstanceDataSampleList& instance_list = instance->samples_;
01037   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01038 
01039   bool need_to_set_abs_timeout = true;
01040   ACE_Time_Value abs_timeout;
01041 
01042   //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
01043   //max_instances and max_instances * depth
01044   while ((instance_list.size() >= max_samples_per_instance_) ||
01045          ((this->max_num_samples_ > 0) &&
01046          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01047 
01048     if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01049       if (instance_list.size() >= history_depth_) {
01050         if (DCPS_debug_level >= 2) {
01051           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01052                      ACE_TEXT(" instance %d attempting to remove")
01053                      ACE_TEXT(" its oldest sample (reliable)\n"),
01054                      handle));
01055         }
01056         bool oldest_released = false;
01057         ret = remove_oldest_sample(instance_list, oldest_released);
01058         if (oldest_released) {
01059           break;
01060         }
01061       }
01062       // Reliable writers can wait
01063       if (need_to_set_abs_timeout) {
01064         abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01065         need_to_set_abs_timeout = false;
01066       }
01067       if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01068         if (DCPS_debug_level >= 2) {
01069           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01070                                 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01071                       handle));
01072         }
01073 
01074         waiting_on_release_ = true;
01075         int const wait_result = condition_.wait(&abs_timeout);
01076 
01077         if (wait_result == 0) {
01078           remove_excess_durable();
01079 
01080         } else {
01081           if (errno == ETIME) {
01082             if (DCPS_debug_level >= 2) {
01083               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01084                                     ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01085                           handle));
01086             }
01087             ret = DDS::RETCODE_TIMEOUT;
01088 
01089           } else {
01090             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01091                                   ACE_TEXT("%p\n")));
01092             ret = DDS::RETCODE_ERROR;
01093           }
01094         }
01095 
01096       } else {
01097         //either shutdown has been signaled or max_blocking_time
01098         //has surpassed so treat as timeout
01099         ret = DDS::RETCODE_TIMEOUT;
01100       }
01101 
01102     } else {
01103       //BEST EFFORT
01104       bool oldest_released = false;
01105 
01106       //try to remove stale samples from this instance
01107       // The remove_oldest_sample() method removes the oldest sample
01108       // from instance list and removes it from the internal lists.
01109       if (instance_list.size() > 0) {
01110         if (DCPS_debug_level >= 2) {
01111           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01112                                 ACE_TEXT(" instance %d attempting to remove")
01113                                 ACE_TEXT(" its oldest sample\n"),
01114                                 handle));
01115         }
01116         ret = remove_oldest_sample(instance_list, oldest_released);
01117       }
01118       //else try to remove stale samples from other instances which are full
01119       if (ret == DDS::RETCODE_OK && !oldest_released) {
01120         if (DCPS_debug_level >= 2) {
01121           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01122                                 ACE_TEXT(" instance %d attempting to remove")
01123                                 ACE_TEXT(" oldest sample from any full instances\n"),
01124                                 handle));
01125         }
01126         PublicationInstanceMapType::iterator it = instances_.begin();
01127 
01128         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01129           if (it->second->samples_.size() >= max_samples_per_instance_) {
01130             ret = remove_oldest_sample(it->second->samples_, oldest_released);
01131           }
01132           ++it;
01133         }
01134       }
01135       //else try to remove stale samples from other non-full instances
01136       if (ret == DDS::RETCODE_OK && !oldest_released) {
01137         if (DCPS_debug_level >= 2) {
01138           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01139                                 ACE_TEXT(" instance %d attempting to remove")
01140                                 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01141                                 handle));
01142         }
01143         PublicationInstanceMapType::iterator it = instances_.begin();
01144 
01145         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01146           if (it->second->samples_.size() > 0) {
01147             ret = remove_oldest_sample(it->second->samples_, oldest_released);
01148           }
01149           ++it;
01150         }
01151       }
01152       if (!oldest_released) {
01153         //This means that no instances have samples to remove and yet
01154         //still hitting resource limits.
01155         ACE_ERROR((LM_ERROR,
01156                    ACE_TEXT("(%P|%t) ERROR: ")
01157                    ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01158                    ACE_TEXT("hitting resource limits with no samples to remove\n")));
01159         ret = DDS::RETCODE_ERROR;
01160       }
01161     }  //END BEST EFFORT
01162 
01163     if (ret != DDS::RETCODE_OK) {
01164       if (DCPS_debug_level >= 2) {
01165         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01166                               ACE_TEXT(" instance %d could not obtain buffer for sample")
01167                               ACE_TEXT(" releasing allotted sample and returning\n"),
01168                               handle));
01169       }
01170       this->release_buffer(element);
01171       return ret;
01172     }
01173   }  //END WHILE
01174 
01175   data_holder_.enqueue_tail(element);
01176 
01177   return ret;
01178 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::obtain_buffer_for_control ( DataSampleElement *&  element  ) 

Definition at line 994 of file WriteDataContainer.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), publication_id_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, and writer_.

00995 {
00996   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
00997 
00998   ACE_NEW_MALLOC_RETURN(
00999     element,
01000     static_cast<DataSampleElement*>(
01001       sample_list_element_allocator_.malloc(
01002         sizeof(DataSampleElement))),
01003     DataSampleElement(publication_id_,
01004                       this->writer_,
01005                       PublicationInstance_rch()),
01006     DDS::RETCODE_ERROR);
01007 
01008   return DDS::RETCODE_OK;
01009 }

Here is the call graph for this function:

typedef OpenDDS::DCPS::WriteDataContainer::OPENDDS_VECTOR ( DDS::InstanceHandle_t   ) 

Returns a vector of handles for the instances registered for this data writer.

WriteDataContainer& OpenDDS::DCPS::WriteDataContainer::operator= ( WriteDataContainer const &   )  [private]
bool OpenDDS::DCPS::WriteDataContainer::pending_data (  ) 

Returns if pending data exists. This includes sending, and unsent data.

Definition at line 523 of file WriteDataContainer.cpp.

References orphaned_to_transport_, sending_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and unsent_data_.

Referenced by data_delivered(), data_dropped(), remove_oldest_sample(), and wait_pending().

00524 {
00525   return this->sending_data_.size() != 0
00526          || this->orphaned_to_transport_.size() != 0
00527          || this->unsent_data_.size() != 0;
00528 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::WriteDataContainer::persist_data (  ) 

Copy sent data to data DURABILITY cache.

Definition at line 1315 of file WriteDataContainer.cpp.

References ACE_TEXT(), domain_id_, durability_cache_, durability_service_, OpenDDS::DCPS::DataDurabilityCache::insert(), LM_ERROR, sent_data_, topic_name_, and type_name_.

01316 {
01317   bool result = true;
01318 
01319   // ------------------------------------------------------------
01320   // Transfer sent data to data DURABILITY cache.
01321   // ------------------------------------------------------------
01322   if (this->durability_cache_) {
01323     // A data durability cache is available for TRANSIENT or
01324     // PERSISTENT data durability.  Cache the data samples.
01325 
01326     //
01327     //  We only cache data that is not still in use outside of
01328     //  this instance of WriteDataContainer
01329     //  (only cache samples in sent_data_ meaning transport has delivered).
01330     bool const inserted =
01331       this->durability_cache_->insert(this->domain_id_,
01332                                       this->topic_name_,
01333                                       this->type_name_,
01334                                       this->sent_data_,
01335                                       this->durability_service_
01336                                      );
01337 
01338     result = inserted;
01339 
01340     if (!inserted)
01341       ACE_ERROR((LM_ERROR,
01342                  ACE_TEXT("(%P|%t) ERROR: ")
01343                  ACE_TEXT("WriteDataContainer::persist_data, ")
01344                  ACE_TEXT("failed to make data durable for ")
01345                  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01346                  this->domain_id_,
01347                  this->topic_name_,
01348                  this->type_name_));
01349   }
01350 
01351   return result;
01352 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::reenqueue_all ( const RepoId reader_id,
const DDS::LifespanQosPolicy lifespan,
const OPENDDS_STRING &  filterClassName,
const FilterEvaluator eval,
const DDS::StringSeq params 
)

Create a resend list with the copies of all current "sending" and "sent" samples. The samples will be sent to the subscriber specified.

Definition at line 216 of file WriteDataContainer.cpp.

References ACE_TEXT(), copy_and_prepend(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, instances_, LM_DEBUG, lock_, max_durable_per_instance_, OPENDDS_STRING, publication_id_, resend_data_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sending_data_, sent_data_, OpenDDS::DCPS::SendStateDataSampleList::size(), and topic_name_.

00225 {
00226   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00227                    guard,
00228                    lock_,
00229                    DDS::RETCODE_ERROR);
00230 
00231   ssize_t total_size = 0;
00232   for (PublicationInstanceMapType::iterator it = instances_.begin();
00233        it != instances_.end(); ++it) {
00234     const ssize_t durable = std::min(it->second->samples_.size(),
00235                                      ssize_t(max_durable_per_instance_));
00236     total_size += durable;
00237     it->second->durable_samples_remaining_ = durable;
00238   }
00239 
00240   copy_and_prepend(resend_data_,
00241                    sending_data_,
00242                    reader_id,
00243                    lifespan,
00244 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00245                    filterClassName, eval, expression_params,
00246 #endif
00247                    total_size);
00248 
00249   copy_and_prepend(resend_data_,
00250                    sent_data_,
00251                    reader_id,
00252                    lifespan,
00253 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00254                    filterClassName, eval, expression_params,
00255 #endif
00256                    total_size);
00257 
00258   if (DCPS_debug_level > 9 && resend_data_.size()) {
00259     GuidConverter converter(publication_id_);
00260     GuidConverter reader(reader_id);
00261     ACE_DEBUG((LM_DEBUG,
00262                ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00263                ACE_TEXT("domain %d topic %C publication %C copying ")
00264                ACE_TEXT("sending/sent to resend to %C.\n"),
00265                domain_id_,
00266                topic_name_,
00267                OPENDDS_STRING(converter).c_str(),
00268                OPENDDS_STRING(reader).c_str()));
00269   }
00270 
00271   return DDS::RETCODE_OK;
00272 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::register_instance ( DDS::InstanceHandle_t instance_handle,
Message_Block_Ptr registered_sample 
)

Dynamically allocate a PublicationInstance object and add to the instances_ list.

Note:
: The registered_sample is an input and output parameter. A shallow copy of the sample data will be given to datawriter as part of the control message.

Definition at line 275 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::bind(), OpenDDS::DCPS::find(), OpenDDS::DCPS::DataWriterImpl::get_next_handle(), DDS::HANDLE_NIL, OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, max_num_instances_, OpenDDS::DCPS::move(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::RcHandle< T >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

00278 {
00279   PublicationInstance_rch instance;
00280 
00281   if (instance_handle == DDS::HANDLE_NIL) {
00282     if (max_num_instances_ > 0
00283         && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00284       return DDS::RETCODE_OUT_OF_RESOURCES;
00285     }
00286 
00287     // registered the instance for the first time.
00288     instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
00289 
00290     instance_handle = this->writer_->get_next_handle();
00291 
00292     int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00293 
00294     if (0 != insert_attempt) {
00295       ACE_ERROR((LM_ERROR,
00296                  ACE_TEXT("(%P|%t) ERROR: ")
00297                  ACE_TEXT("WriteDataContainer::register_instance, ")
00298                  ACE_TEXT("failed to insert instance handle=%X\n"),
00299                  instance.in()));
00300       return DDS::RETCODE_ERROR;
00301     } // if (0 != insert_attempt)
00302 
00303     instance->instance_handle_ = instance_handle;
00304 
00305   } else {
00306 
00307     int const find_attempt = find(instances_, instance_handle, instance);
00308 
00309     if (0 != find_attempt) {
00310       ACE_ERROR((LM_ERROR,
00311                  ACE_TEXT("(%P|%t) ERROR: ")
00312                  ACE_TEXT("WriteDataContainer::register_instance, ")
00313                  ACE_TEXT("The provided instance handle=%X is not a valid")
00314                  ACE_TEXT("handle.\n"),
00315                  instance_handle));
00316 
00317       return DDS::RETCODE_ERROR;
00318     } // if (0 != find_attempt)
00319 
00320     instance->unregistered_ = false;
00321   }
00322 
00323   // The registered_sample is shallow copied.
00324   registered_sample.reset(instance->registered_sample_->duplicate());
00325 
00326   if (this->writer_->watchdog_.in()) {
00327     this->writer_->watchdog_->schedule_timer(instance);
00328   }
00329 
00330   return DDS::RETCODE_OK;
00331 }

Here is the call graph for this function:

void OpenDDS::DCPS::WriteDataContainer::release_buffer ( DataSampleElement element  ) 

Release the memory previously allocated. This method is corresponding to the obtain_buffer method. If the memory is allocated by some allocator then the memory needs to be released to the allocator.

Definition at line 1181 of file WriteDataContainer.cpp.

References data_holder_, OpenDDS::DCPS::WriterDataSampleList::dequeue(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::SAMPLE_DATA, and sample_list_element_allocator_.

Referenced by data_delivered(), data_dropped(), obtain_buffer(), remove_excess_durable(), remove_oldest_sample(), and ~WriteDataContainer().

01182 {
01183   if (element->get_header().message_id_ == SAMPLE_DATA)
01184     data_holder_.dequeue(element);
01185   // Release the memory to the allocator.
01186   ACE_DES_FREE(element,
01187                sample_list_element_allocator_.free,
01188                DataSampleElement);
01189 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::remove_excess_durable (  )  [private]

Remove the oldest "n" samples from each instance list that are in a state such that they could only be used for durability purposes (see reenqueue_all). "n" is determined by max_durable_per_instance_, so these samples are truly unneeded -- there are max_durable_per_instance_ newer samples available in the instance.

Definition at line 805 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue(), domain_id_, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, instances_, LM_DEBUG, max_durable_per_instance_, OPENDDS_STRING, OpenDDS::DCPS::InstanceDataSampleList::prev(), publication_id_, release_buffer(), sent_data_, OpenDDS::DCPS::InstanceDataSampleList::tail(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and topic_name_.

Referenced by obtain_buffer().

00806 {
00807   if (!max_durable_per_instance_)
00808     return;
00809 
00810   size_t n_released = 0;
00811 
00812   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00813        iter != instances_.end();
00814        ++iter) {
00815 
00816     CORBA::Long durable_allowed = max_durable_per_instance_;
00817     InstanceDataSampleList& instance_list = iter->second->samples_;
00818 
00819     for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
00820       prev = InstanceDataSampleList::prev(it);
00821 
00822       if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
00823 
00824         if (durable_allowed) {
00825           --durable_allowed;
00826         } else {
00827           instance_list.dequeue(it);
00828           sent_data_.dequeue(it);
00829           release_buffer(it);
00830           ++n_released;
00831         }
00832       }
00833     }
00834   }
00835 
00836   if (n_released && DCPS_debug_level > 9) {
00837     const GuidConverter converter(publication_id_);
00838     ACE_DEBUG((LM_DEBUG,
00839                ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
00840                ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
00841                ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
00842                OPENDDS_STRING(converter).c_str(), n_released));
00843   }
00844 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::remove_oldest_sample ( InstanceDataSampleList instance_list,
bool &  released 
) [private]

Remove the oldest sample (head) from the instance history list. This method also updates the internal lists to reflect the change. If the sample is in the unsent_data_ or sent_data_ list then it will be released. If the sample is in the sending_data_ list then the transport will be notified to release the sample, then the sample will be released. Otherwise an error is returned. The "released" boolean value indicates whether the sample is released.

Definition at line 848 of file WriteDataContainer.cpp.

References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::InstanceDataSampleList::dequeue_head(), domain_id_, empty_condition_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OPENDDS_STRING, orphaned_to_transport_, pending_data(), publication_id_, release_buffer(), OpenDDS::DCPS::TransportClient::remove_sample(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SendStateDataSampleList::send_list_containing_element(), sending_data_, sent_data_, topic_name_, unsent_data_, and writer_.

Referenced by dispose(), and obtain_buffer().

00851 {
00852   DataSampleElement* stale = 0;
00853 
00854   //
00855   // Remove the oldest sample from the instance list.
00856   //
00857   if (instance_list.dequeue_head(stale) == false) {
00858     ACE_ERROR_RETURN((LM_ERROR,
00859                       ACE_TEXT("(%P|%t) ERROR: ")
00860                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00861                       ACE_TEXT("dequeue_head_next_sample failed\n")),
00862                      DDS::RETCODE_ERROR);
00863   }
00864 
00865   //
00866   // Remove the stale data from the next_writer_sample_ list.  The
00867   // sending_data_/next_send_sample_ list is not managed within the
00868   // container, it is only used external to the container and does
00869   // not need to be managed internally.
00870   //
00871   // The next_writer_sample_ link is being used in one of the sent_data_,
00872   // sending_data_, or unsent_data lists.  Removal from the doubly
00873   // linked list needs to repair the list only when the stale sample
00874   // is either the head or tail of the list.
00875   //
00876 
00877   //
00878   // Locate the head of the list that the stale data is in.
00879   //
00880   SendStateDataSampleList* send_lists[] = {
00881     &sending_data_,
00882     &sent_data_,
00883     &unsent_data_,
00884     &orphaned_to_transport_};
00885   const SendStateDataSampleList* containing_list =
00886     SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00887 
00888   //
00889   // Identify the list that the stale data is in.
00890   // The stale data should be in one of the sent_data_, sending_data_
00891   // or unsent_data_. It should not be in released_data_ list since
00892   // this function is the only place a sample is moved from
00893   // sending_data_ to released_data_ list.
00894 
00895   // Remove the element from the internal list.
00896   bool result = false;
00897 
00898   if (containing_list == &this->sending_data_) {
00899     if (DCPS_debug_level > 2) {
00900       ACE_ERROR((LM_WARNING,
00901                  ACE_TEXT("(%P|%t) WARNING: ")
00902                  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00903                  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00904     }
00905 
00906     // This means transport is still using the sample that needs to
00907     // be released currently so notify transport that sample is being removed.
00908 
00909     if (this->writer_->remove_sample(stale)) {
00910       if (this->sent_data_.dequeue(stale)) {
00911         release_buffer(stale);
00912         result = true;
00913       }
00914 
00915     } else {
00916       if (this->sending_data_.dequeue(stale)) {
00917         this->orphaned_to_transport_.enqueue_tail(stale);
00918       } else if (this->sent_data_.dequeue(stale)) {
00919         release_buffer(stale);
00920         result = true;
00921       }
00922       result = true;
00923     }
00924     released = true;
00925 
00926   } else if (containing_list == &this->sent_data_) {
00927     // No one is using the data sample, so we can release it back to
00928     // its allocator.
00929     //
00930     result = this->sent_data_.dequeue(stale) != 0;
00931     release_buffer(stale);
00932     released = true;
00933 
00934     if (DCPS_debug_level > 9) {
00935       GuidConverter converter(publication_id_);
00936       ACE_DEBUG((LM_DEBUG,
00937                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00938                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00939                  this->domain_id_,
00940                  this->topic_name_,
00941                  OPENDDS_STRING(converter).c_str()));
00942     }
00943 
00944   } else if (containing_list == &this->unsent_data_) {
00945     //
00946     // No one is using the data sample, so we can release it back to
00947     // its allocator.
00948     //
00949     result = this->unsent_data_.dequeue(stale) != 0;
00950     release_buffer(stale);
00951     released = true;
00952 
00953     if (DCPS_debug_level > 9) {
00954       GuidConverter converter(publication_id_);
00955       ACE_DEBUG((LM_DEBUG,
00956                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00957                  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00958                  this->domain_id_,
00959                  this->topic_name_,
00960                  OPENDDS_STRING(converter).c_str()));
00961     }
00962   } else {
00963     ACE_ERROR_RETURN((LM_ERROR,
00964                       ACE_TEXT("(%P|%t) ERROR: ")
00965                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00966                       ACE_TEXT("The oldest sample is not in any internal list.\n")),
00967                      DDS::RETCODE_ERROR);
00968   }
00969 
00970   // Signal if there is no pending data.
00971   {
00972     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00973                      guard,
00974                      this->lock_,
00975                      DDS::RETCODE_ERROR);
00976 
00977     if (!pending_data())
00978       empty_condition_.broadcast();
00979   }
00980 
00981   if (result == false) {
00982     ACE_ERROR_RETURN((LM_ERROR,
00983                       ACE_TEXT("(%P|%t) ERROR: ")
00984                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00985                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00986                      DDS::RETCODE_ERROR);
00987 
00988   }
00989 
00990   return DDS::RETCODE_OK;
00991 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::reschedule_deadline (  ) 

Reset time interval for each instance.

Definition at line 1355 of file WriteDataContainer.cpp.

References ACE_TEXT(), instances_, LM_ERROR, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

01356 {
01357   for (PublicationInstanceMapType::iterator iter = instances_.begin();
01358        iter != instances_.end();
01359        ++iter) {
01360     if (iter->second->deadline_timer_id_ != -1) {
01361       if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01362         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01363                    ACE_TEXT("reset_timer_interval")));
01364       }
01365     }
01366   }
01367 }

Here is the call graph for this function:

bool OpenDDS::DCPS::WriteDataContainer::sequence_acknowledged ( const SequenceNumber  sequence  ) 

Definition at line 1466 of file WriteDataContainer.cpp.

References ACE_TEXT(), acked_sequences_, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by wait_ack_of_seq().

01467 {
01468   if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01469     //return true here so that wait_for_acknowledgements doesn't block
01470     return true;
01471   }
01472 
01473   SequenceNumber acked = acked_sequences_.cumulative_ack();
01474   if (DCPS_debug_level >= 10) {
01475     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01476                           ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01477   }
01478   if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01479     //if acked_sequences_ is empty or its cumulative_ack is lower than
01480     //the requests sequence, return false
01481     return false;
01482   }
01483   return true;
01484 }

Here is the call graph for this function:

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::unregister ( DDS::InstanceHandle_t  handle,
Message_Block_Ptr registered_sample,
bool  dup_registered_sample = true 
)

Remove the provided instance from the instances_ list. The registered sample data will be released upon the deletion of the PublicationInstance. A shallow copy of the sample data will be given to datawriter as part of the control message if the dup_registered_sample is true.

This method returns error if the instance is not registered.

Definition at line 334 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::find(), OpenDDS::DCPS::RcHandle< T >::in(), instances_, LM_ERROR, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::DataWriterImpl::watchdog_, and writer_.

Referenced by unregister_all().

00338 {
00339   PublicationInstance_rch instance;
00340 
00341   int const find_attempt = find(instances_, instance_handle, instance);
00342 
00343   if (0 != find_attempt) {
00344     ACE_ERROR_RETURN((LM_ERROR,
00345                       ACE_TEXT("(%P|%t) ERROR: ")
00346                       ACE_TEXT("WriteDataContainer::unregister, ")
00347                       ACE_TEXT("The instance(handle=%X) ")
00348                       ACE_TEXT("is not registered yet.\n"),
00349                       instance_handle),
00350                      DDS::RETCODE_PRECONDITION_NOT_MET);
00351   } // if (0 != find_attempt)
00352 
00353   instance->unregistered_ = true;
00354 
00355   if (dup_registered_sample) {
00356     // The registered_sample is shallow copied.
00357     registered_sample.reset(instance->registered_sample_->duplicate());
00358   }
00359 
00360   if (this->writer_->watchdog_.in())
00361     this->writer_->watchdog_->cancel_timer(instance);
00362 
00363   return DDS::RETCODE_OK;
00364 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::WriteDataContainer::unregister_all (  ) 

Unregister all instances managed by this data containers.

Definition at line 1192 of file WriteDataContainer.cpp.

References ACE_TEXT(), ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), condition_, DBG_ENTRY_LVL, dispose(), instances_, LM_ERROR, lock_, OpenDDS::DCPS::TransportClient::remove_all_msgs(), DDS::RETCODE_OK, shutdown_, OpenDDS::DCPS::unbind(), unregister(), waiting_on_release_, and writer_.

01193 {
01194   DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01195   shutdown_ = true;
01196 
01197   {
01198     //The internal list needs protection since this call may result from the
01199     //the delete_datawriter call which does not acquire the lock in advance.
01200     ACE_GUARD(ACE_Recursive_Thread_Mutex,
01201               guard,
01202               this->lock_);
01203     // Tell transport remove all control messages currently
01204     // transport is processing.
01205     (void) this->writer_->remove_all_msgs();
01206 
01207     // Broadcast to wake up all waiting threads.
01208     if (waiting_on_release_) {
01209       condition_.broadcast();
01210     }
01211   }
01212   DDS::ReturnCode_t ret;
01213   Message_Block_Ptr registered_sample;
01214   PublicationInstanceMapType::iterator it = instances_.begin();
01215 
01216   while (it != instances_.end()) {
01217     // Release the instance data.
01218     ret = dispose(it->first, registered_sample, false);
01219 
01220     if (ret != DDS::RETCODE_OK) {
01221       ACE_ERROR((LM_ERROR,
01222                  ACE_TEXT("(%P|%t) ERROR: ")
01223                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01224                  ACE_TEXT("dispose instance %X failed\n"),
01225                  it->first));
01226     }
01227     // Mark the instance unregistered.
01228     ret = unregister(it->first, registered_sample, false);
01229 
01230     if (ret != DDS::RETCODE_OK) {
01231       ACE_ERROR((LM_ERROR,
01232                  ACE_TEXT("(%P|%t) ERROR: ")
01233                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01234                  ACE_TEXT("unregister instance %X failed\n"),
01235                  it->first));
01236     }
01237 
01238     // Get the next iterator before erase the instance handle.
01239     PublicationInstanceMapType::iterator it_next = it;
01240     ++it_next;
01241     // Remove the instance from the instance list.
01242     unbind(instances_, it->first);
01243     it = it_next;
01244   }
01245 
01246   ACE_UNUSED_ARG(registered_sample);
01247 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::WriteDataContainer::wait_ack_of_seq ( const ACE_Time_Value abs_deadline,
const SequenceNumber sequence 
)

Definition at line 1431 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, ACE_OS::gettimeofday(), OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, sequence_acknowledged(), ACE_Condition< ACE_Thread_Mutex >::wait(), wfa_condition_, and wfa_lock_.

01432 {
01433   ACE_Time_Value deadline(abs_deadline);
01434   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01435   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01436 
01437   while (ACE_OS::gettimeofday() < deadline) {
01438 
01439     if (!sequence_acknowledged(sequence)) {
01440       // lock is released while waiting and acquired before returning
01441       // from wait.
01442       int const wait_result = wfa_condition_.wait(&deadline);
01443 
01444       if (wait_result != 0) {
01445         if (errno == ETIME) {
01446           if (DCPS_debug_level >= 2) {
01447             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01448                                   ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01449                                   sequence.getValue()));
01450           }
01451           ret = DDS::RETCODE_TIMEOUT;
01452         } else {
01453           ret = DDS::RETCODE_ERROR;
01454         }
01455       }
01456     } else {
01457       ret = DDS::RETCODE_OK;
01458       break;
01459     }
01460   }
01461 
01462   return ret;
01463 }

Here is the call graph for this function:

void OpenDDS::DCPS::WriteDataContainer::wait_pending (  ) 

Block until pending samples have either been delivered or dropped.

Definition at line 1370 of file WriteDataContainer.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, empty_condition_, ACE_OS::gettimeofday(), LM_DEBUG, LM_INFO, lock_, log_send_state_lists(), pending_data(), TheServiceParticipant, ACE_Condition< ACE_Recursive_Thread_Mutex >::wait(), and ACE_Time_Value::zero.

01371 {
01372   ACE_Time_Value pending_timeout =
01373     TheServiceParticipant->pending_timeout();
01374 
01375   ACE_Time_Value* pTimeout = 0;
01376 
01377   if (pending_timeout != ACE_Time_Value::zero) {
01378     pTimeout = &pending_timeout;
01379     pending_timeout += ACE_OS::gettimeofday();
01380   }
01381 
01382   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01383   const bool report = DCPS_debug_level > 0 && pending_data();
01384   if (report) {
01385     if (pending_timeout == ACE_Time_Value::zero) {
01386       ACE_DEBUG((LM_DEBUG,
01387                  ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending no timeout\n")));
01388     } else {
01389       ACE_DEBUG((LM_DEBUG,
01390                  ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01391                  ACE_TEXT("at %#T\n"),
01392                  &pending_timeout));
01393     }
01394   }
01395   while (true) {
01396 
01397     if (!pending_data())
01398       break;
01399 
01400     if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01401       if (DCPS_debug_level) {
01402         ACE_DEBUG((LM_INFO,
01403                    ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01404                    ACE_TEXT("Timed out waiting for messages to be transported")));
01405         this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01406       }
01407       break;
01408     }
01409   }
01410   if (report) {
01411     ACE_DEBUG((LM_DEBUG,
01412                "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01413   }
01414 }

Here is the call graph for this function:

void OpenDDS::DCPS::WriteDataContainer::wakeup_blocking_writers ( DataSampleElement stale  )  [private]

Called when data has been dropped or delivered and any blocked writers should be notified

Definition at line 1487 of file WriteDataContainer.cpp.

References ACE_Condition< ACE_Recursive_Thread_Mutex >::broadcast(), condition_, and waiting_on_release_.

Referenced by data_delivered(), and data_dropped().

01488 {
01489   if (!stale && waiting_on_release_) {
01490     waiting_on_release_ = false;
01491 
01492     condition_.broadcast();
01493   }
01494 }

Here is the call graph for this function:

Here is the caller graph for this function:


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Definition at line 352 of file WriteDataContainer.h.

friend class DataWriterImpl [friend]

Definition at line 121 of file WriteDataContainer.h.


Member Data Documentation

Definition at line 408 of file WriteDataContainer.h.

Referenced by data_delivered(), and sequence_acknowledged().

Definition at line 487 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

The list of all samples written to this datawriter in writing order.

Definition at line 429 of file WriteDataContainer.h.

Referenced by obtain_buffer(), and release_buffer().

Pointer to the data durability cache.

This a pointer to the data durability cache owned by the Service Participant singleton, which means this cache is also a singleton.

Definition at line 524 of file WriteDataContainer.h.

Referenced by persist_data().

DURABILITY_SERVICE QoS specific to the DataWriter.

Definition at line 527 of file WriteDataContainer.h.

Referenced by persist_data().

Definition at line 450 of file WriteDataContainer.h.

Referenced by obtain_buffer().

PublicationInstanceMapType OpenDDS::DCPS::WriteDataContainer::instances_ [private]

This lock is used to protect the container and the map in the type-specific DataWriter. This lock can be accessible via the datawriter. This lock is made to be globally accessible for performance concern. The lock is acquired as the external call (e.g. FooDataWriterImpl::write) started and the same lock will be used by the transport thread to notify the datawriter the data is delivered. Other internal operations will not lock.

Definition at line 486 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), dispose(), get_instance_handles(), num_all_samples(), num_samples(), reenqueue_all(), remove_oldest_sample(), unregister_all(), and wait_pending().

The maximum time to block on write operation. This comes from DataWriter's QoS HISTORY.max_blocking_time

Definition at line 472 of file WriteDataContainer.h.

Referenced by obtain_buffer().

The maximum number of samples from each instance that can be added to the resend_data_ for durability.

Definition at line 454 of file WriteDataContainer.h.

Referenced by data_delivered(), reenqueue_all(), and remove_excess_durable().

The maximum number of instances allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS

Definition at line 460 of file WriteDataContainer.h.

Referenced by register_instance().

The maximum number of samples allowed or zero to indicate unlimited. It corresponds to the QoS.RESOURCE_LIMITS.max_instances when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS It also covers QoS.RESOURCE_LIMITS.max_samples and max_instances * max_samples_per_instance

Definition at line 468 of file WriteDataContainer.h.

Referenced by obtain_buffer().

The maximum size a container should allow for an instance sample list

Definition at line 448 of file WriteDataContainer.h.

Referenced by obtain_buffer().

The number of chunks that sample_list_element_allocator_ needs initialize.

Definition at line 498 of file WriteDataContainer.h.

Referenced by WriteDataContainer().

List of data that has been released by WriteDataContainer but is still in process of delivery (or dropping) by transport

Definition at line 425 of file WriteDataContainer.h.

Referenced by data_delivered(), data_dropped(), log_send_state_lists(), pending_data(), remove_oldest_sample(), and ~WriteDataContainer().

List of the data reenqueued to support the TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the samples in sent and sending list. This list will be passed to the transport for re-sending.

Definition at line 435 of file WriteDataContainer.h.

Referenced by get_resend_data(), and reenqueue_all().

The cached allocator to allocate DataSampleElement objects.

Definition at line 502 of file WriteDataContainer.h.

Referenced by copy_and_prepend(), obtain_buffer(), obtain_buffer_for_control(), release_buffer(), and WriteDataContainer().

The flag indicates the datawriter will be destroyed.

Definition at line 505 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and ~WriteDataContainer().

Id used to keep track of which send transaction DataWriter is currently creating

Definition at line 415 of file WriteDataContainer.h.

Referenced by get_unsent_data().

char const* const OpenDDS::DCPS::WriteDataContainer::type_name_ [private]

Type name.

Definition at line 514 of file WriteDataContainer.h.

Referenced by persist_data().

The block waiting flag.

Definition at line 475 of file WriteDataContainer.h.

Referenced by obtain_buffer(), unregister_all(), and wakeup_blocking_writers().

Used to block in wait_for_acks().

Definition at line 494 of file WriteDataContainer.h.

Referenced by data_delivered(), and wait_ack_of_seq().

Lock used for wait_for_acks() processing.

Definition at line 491 of file WriteDataContainer.h.

Referenced by data_delivered(), and wait_ack_of_seq().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1