LCOV - code coverage report
Current view: top level - DCPS - WriteDataContainer.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 714 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 45 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "WriteDataContainer.h"
      11             : 
      12             : #include "DataSampleHeader.h"
      13             : #include "InstanceDataSampleList.h"
      14             : #include "DataWriterImpl.h"
      15             : #include "MessageTracker.h"
      16             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
      17             : #  include "DataDurabilityCache.h"
      18             : #endif
      19             : #include "PublicationInstance.h"
      20             : #include "Util.h"
      21             : #include "Time_Helper.h"
      22             : #include "GuidConverter.h"
      23             : #include "transport/framework/TransportSendElement.h"
      24             : #include "transport/framework/TransportCustomizedElement.h"
      25             : #include "transport/framework/TransportRegistry.h"
      26             : 
      27             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      28             : 
      29             : namespace OpenDDS {
      30             : namespace DCPS {
      31             : 
      32             : /**
      33             :  * @todo Refactor this code and DataReaderImpl::data_expired() to
      34             :  *       a common function.
      35             :  */
      36             : bool
      37           0 : resend_data_expired(const DataSampleElement& element,
      38             :                     const DDS::LifespanQosPolicy& lifespan)
      39             : {
      40           0 :   if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
      41           0 :       || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
      42             :     // Finite lifespan.  Check if data has expired.
      43             : 
      44             :     const DDS::Time_t tmp = {
      45           0 :       element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
      46           0 :       element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
      47           0 :     };
      48           0 :     const SystemTimePoint expiration_time(time_to_time_value(tmp));
      49           0 :     const SystemTimePoint now = SystemTimePoint::now();
      50             : 
      51           0 :     if (now >= expiration_time) {
      52           0 :       if (DCPS_debug_level >= 8) {
      53           0 :         const TimeDuration diff = now - expiration_time;
      54           0 :         ACE_DEBUG((LM_DEBUG,
      55             :                    ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
      56             :                    ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
      57             :                    diff.value().sec(),
      58             :                    diff.value().usec()));
      59           0 :       }
      60             : 
      61           0 :       return true; // Data expired.
      62             :     }
      63           0 :   }
      64             : 
      65           0 :   return false;
      66             : }
      67             : 
      68           0 : WriteDataContainer::WriteDataContainer(
      69             :   DataWriterImpl* writer,
      70             :   CORBA::Long max_samples_per_instance,
      71             :   CORBA::Long history_depth,
      72             :   CORBA::Long max_durable_per_instance,
      73             :   DDS::Duration_t max_blocking_time,
      74             :   size_t n_chunks,
      75             :   DDS::DomainId_t domain_id,
      76             :   const char* topic_name,
      77             :   const char* type_name,
      78             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
      79             :   DataDurabilityCache* durability_cache,
      80             :   const DDS::DurabilityServiceQosPolicy& durability_service,
      81             : #endif
      82             :   CORBA::Long max_instances,
      83             :   CORBA::Long max_total_samples,
      84             :   ACE_Recursive_Thread_Mutex& deadline_status_lock,
      85             :   DDS::OfferedDeadlineMissedStatus& deadline_status,
      86           0 :   CORBA::Long& deadline_last_total_count)
      87           0 :   : cached_cumulative_ack_valid_(false)
      88           0 :   , transaction_id_(0)
      89           0 :   , publication_id_(GUID_UNKNOWN)
      90           0 :   , writer_(writer)
      91           0 :   , max_samples_per_instance_(max_samples_per_instance)
      92           0 :   , history_depth_(history_depth)
      93           0 :   , max_durable_per_instance_(max_durable_per_instance)
      94           0 :   , max_num_instances_(max_instances)
      95           0 :   , max_num_samples_(max_total_samples)
      96           0 :   , max_blocking_time_(max_blocking_time)
      97           0 :   , waiting_on_release_(false)
      98           0 :   , condition_(lock_)
      99           0 :   , empty_condition_(lock_)
     100           0 :   , wfa_condition_(wfa_lock_)
     101           0 :   , n_chunks_(n_chunks)
     102           0 :   , sample_list_element_allocator_(2 * n_chunks_)
     103           0 :   , shutdown_(false)
     104           0 :   , domain_id_(domain_id)
     105           0 :   , topic_name_(topic_name)
     106           0 :   , type_name_(type_name)
     107             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
     108           0 :   , durability_cache_(durability_cache)
     109           0 :   , durability_service_(durability_service)
     110             : #endif
     111           0 :   , deadline_task_(DCPS::make_rch<DCPS::PmfSporadicTask<WriteDataContainer> >(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &WriteDataContainer::process_deadlines))
     112           0 :   , deadline_period_(TimeDuration::max_value)
     113           0 :   , deadline_status_lock_(deadline_status_lock)
     114           0 :   , deadline_status_(deadline_status)
     115           0 :   , deadline_last_total_count_(deadline_last_total_count)
     116             : {
     117           0 :   if (DCPS_debug_level >= 2) {
     118           0 :     ACE_DEBUG((LM_DEBUG,
     119             :                "(%P|%t) WriteDataContainer "
     120             :                "sample_list_element_allocator %x with %d chunks\n",
     121             :                &sample_list_element_allocator_, n_chunks_));
     122             :   }
     123           0 :   acked_sequences_[GUID_UNKNOWN].insert(SequenceNumber::ZERO());
     124           0 : }
     125             : 
     126           0 : WriteDataContainer::~WriteDataContainer()
     127             : {
     128           0 :   deadline_task_->cancel();
     129             : 
     130           0 :   if (this->unsent_data_.size() > 0) {
     131           0 :     ACE_DEBUG((LM_WARNING,
     132             :                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
     133             :                ACE_TEXT("destroyed with %d samples unsent.\n"),
     134             :                this->unsent_data_.size()));
     135             :   }
     136             : 
     137           0 :   if (this->sending_data_.size() > 0) {
     138           0 :     if (TransportRegistry::instance()->released()) {
     139           0 :       for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
     140           0 :         release_buffer(e);
     141             :       }
     142             :     }
     143           0 :     if (sending_data_.size() && DCPS_debug_level) {
     144           0 :       ACE_DEBUG((LM_WARNING,
     145             :                  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
     146             :                  ACE_TEXT("destroyed with %d samples sending.\n"),
     147             :                  this->sending_data_.size()));
     148             :     }
     149             :   }
     150             : 
     151           0 :   if (this->sent_data_.size() > 0) {
     152           0 :     ACE_DEBUG((LM_DEBUG,
     153             :                ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
     154             :                ACE_TEXT("destroyed with %d samples sent.\n"),
     155             :                this->sent_data_.size()));
     156             :   }
     157             : 
     158           0 :   if (this->orphaned_to_transport_.size() > 0) {
     159           0 :     if (DCPS_debug_level > 0) {
     160           0 :       ACE_DEBUG((LM_DEBUG,
     161             :                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
     162             :                  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
     163             :                  this->orphaned_to_transport_.size()));
     164             :     }
     165             :   }
     166             : 
     167           0 :   if (!shutdown_) {
     168           0 :     ACE_ERROR((LM_ERROR,
     169             :                ACE_TEXT("(%P|%t) ERROR: ")
     170             :                ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
     171             :                ACE_TEXT("The container has not been cleaned.\n")));
     172             :   }
     173           0 : }
     174             : 
     175             : void
     176           0 : WriteDataContainer::add_reader_acks(const GUID_t& reader, const SequenceNumber& base)
     177             : {
     178           0 :   ACE_Guard<ACE_Thread_Mutex> guard(wfa_lock_);
     179             : 
     180           0 :   DisjointSequence& ds = acked_sequences_[reader];
     181           0 :   ds.reset();
     182           0 :   if (base == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
     183           0 :     ds.insert(SequenceNumber::ZERO());
     184             :   } else {
     185           0 :     ds.insert(SequenceRange(SequenceNumber(), base));
     186             :   }
     187           0 :   cached_cumulative_ack_valid_ = false;
     188           0 : }
     189             : 
     190             : void
     191           0 : WriteDataContainer::remove_reader_acks(const GUID_t& reader)
     192             : {
     193           0 :   ACE_Guard<ACE_Thread_Mutex> guard(wfa_lock_);
     194             : 
     195           0 :   const SequenceNumber prev_cum_ack = get_cumulative_ack();
     196           0 :   const AckedSequenceMap::iterator it = acked_sequences_.find(reader);
     197           0 :   if (it != acked_sequences_.end()) {
     198           0 :     acked_sequences_.erase(it);
     199           0 :     cached_cumulative_ack_valid_ = false;
     200           0 :     if (prev_cum_ack != get_cumulative_ack()) {
     201           0 :       wfa_condition_.notify_all();
     202             :     }
     203             :   }
     204           0 : }
     205             : 
     206             : SequenceNumber
     207           0 : WriteDataContainer::get_cumulative_ack()
     208             : {
     209           0 :   if (acked_sequences_.empty()) {
     210           0 :     return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
     211             :   }
     212             : 
     213           0 :   if (cached_cumulative_ack_valid_) {
     214           0 :     return cached_cumulative_ack_;
     215             :   }
     216             : 
     217           0 :   SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
     218           0 :   for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
     219           0 :     if (!it->second.empty()) {
     220           0 :       result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.cumulative_ack() : std::min(result, it->second.cumulative_ack());
     221             :     }
     222             :   }
     223           0 :   cached_cumulative_ack_ = result;
     224           0 :   cached_cumulative_ack_valid_ = true;
     225           0 :   return result;
     226             : }
     227             : 
     228             : SequenceNumber
     229           0 : WriteDataContainer::get_last_ack()
     230             : {
     231           0 :   if (acked_sequences_.empty()) {
     232           0 :     return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
     233             :   }
     234             : 
     235           0 :   SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
     236           0 :   for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
     237           0 :     if (!it->second.empty()) {
     238           0 :       result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.last_ack() : std::max(result, it->second.last_ack());
     239             :     }
     240             :   }
     241           0 :   return result;
     242             : }
     243             : 
     244             : void
     245           0 : WriteDataContainer::update_acked(const SequenceNumber& seq, const GUID_t& id)
     246             : {
     247           0 :   bool do_notify = false;
     248           0 :   if (id == GUID_UNKNOWN) {
     249           0 :     for (AckedSequenceMap::iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
     250           0 :       SequenceNumber prev_cum_ack = it->second.cumulative_ack();
     251           0 :       it->second.insert(seq);
     252           0 :       cached_cumulative_ack_valid_ = false;
     253           0 :       if (prev_cum_ack != it->second.cumulative_ack()) {
     254           0 :         do_notify = true;
     255             :       }
     256             :     }
     257             :   } else {
     258           0 :     const AckedSequenceMap::iterator it = acked_sequences_.find(id);
     259           0 :     if (it != acked_sequences_.end()) {
     260           0 :       SequenceNumber prev_cum_ack = it->second.cumulative_ack();
     261           0 :       if (prev_cum_ack < seq) {
     262           0 :         it->second.insert(SequenceRange(prev_cum_ack, seq));
     263           0 :         cached_cumulative_ack_valid_ = false;
     264           0 :         if (prev_cum_ack != it->second.cumulative_ack()) {
     265           0 :           do_notify = true;
     266             :         }
     267             :       }
     268             :     }
     269             :   }
     270           0 :   if (do_notify) {
     271           0 :     wfa_condition_.notify_all();
     272             :   }
     273           0 : }
     274             : 
     275             : DDS::ReturnCode_t
     276           0 : WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
     277             : {
     278             :   // Enqueue to the next_send_sample_ thread of unsent_data_
     279             :   // will link samples with the next_sample/previous_sample and
     280             :   // also next_send_sample_.
     281             :   // This would save time when we actually send the data.
     282             : 
     283           0 :   if (shutdown_) {
     284           0 :     return DDS::RETCODE_ERROR;
     285             :   }
     286             : 
     287           0 :   unsent_data_.enqueue_tail(control_sample);
     288             : 
     289           0 :   return DDS::RETCODE_OK;
     290             : }
     291             : 
     292             : // This method assumes that instance list has space for this sample.
     293             : DDS::ReturnCode_t
     294           0 : WriteDataContainer::enqueue(
     295             :   DataSampleElement* sample,
     296             :   DDS::InstanceHandle_t instance_handle)
     297             : {
     298           0 :   if (shutdown_) {
     299           0 :     return DDS::RETCODE_ERROR;
     300             :   }
     301             : 
     302             :   // Get the PublicationInstance pointer from InstanceHandle_t.
     303             :   PublicationInstance_rch instance =
     304           0 :     get_handle_instance(instance_handle);
     305             :   // Extract the instance queue.
     306           0 :   InstanceDataSampleList& instance_list = instance->samples_;
     307             : 
     308           0 :   extend_deadline(instance);
     309             : 
     310             :   //
     311             :   // Enqueue to the next_send_sample_ thread of unsent_data_
     312             :   // will link samples with the next_sample/previous_sample and
     313             :   // also next_send_sample_.
     314             :   // This would save time when we actually send the data.
     315             : 
     316           0 :   unsent_data_.enqueue_tail(sample);
     317             : 
     318             :   //
     319             :   // Add this sample to the INSTANCE scope list.
     320           0 :   instance_list.enqueue_tail(sample);
     321             : 
     322           0 :   return DDS::RETCODE_OK;
     323           0 : }
     324             : 
     325             : DDS::ReturnCode_t
     326           0 : WriteDataContainer::reenqueue_all(const GUID_t& reader_id,
     327             :                                   const DDS::LifespanQosPolicy& lifespan
     328             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     329             :                                   ,
     330             :                                   const OPENDDS_STRING& filterClassName,
     331             :                                   const FilterEvaluator* eval,
     332             :                                   const DDS::StringSeq& expression_params
     333             : #endif
     334             :                                   )
     335             : {
     336           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     337             :                    guard,
     338             :                    lock_,
     339             :                    DDS::RETCODE_ERROR);
     340             : 
     341           0 :   ssize_t total_size = 0;
     342           0 :   for (PublicationInstanceMapType::iterator it = instances_.begin();
     343           0 :        it != instances_.end(); ++it) {
     344           0 :     const ssize_t durable = std::min(it->second->samples_.size(),
     345           0 :                                      ssize_t(max_durable_per_instance_));
     346           0 :     total_size += durable;
     347           0 :     it->second->durable_samples_remaining_ = durable;
     348             :   }
     349             : 
     350           0 :   copy_and_prepend(resend_data_,
     351           0 :                    sending_data_,
     352             :                    reader_id,
     353             :                    lifespan,
     354             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     355             :                    filterClassName, eval, expression_params,
     356             : #endif
     357             :                    total_size);
     358             : 
     359           0 :   copy_and_prepend(resend_data_,
     360           0 :                    sent_data_,
     361             :                    reader_id,
     362             :                    lifespan,
     363             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     364             :                    filterClassName, eval, expression_params,
     365             : #endif
     366             :                    total_size);
     367             : 
     368             :   {
     369           0 :     ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
     370           0 :     cached_cumulative_ack_valid_ = false;
     371           0 :     DisjointSequence& ds = acked_sequences_[reader_id];
     372           0 :     ds = acked_sequences_[GUID_UNKNOWN];
     373             : 
     374             :     // Remove exactly what will be sent
     375           0 :     SendStateDataSampleList::iterator iter = resend_data_.begin();
     376           0 :     while (iter != resend_data_.end()) {
     377           0 :       ds.erase(iter->get_header().sequence_);
     378           0 :       ++iter;
     379             :     }
     380           0 :   }
     381             : 
     382           0 :   if (DCPS_debug_level > 9 && resend_data_.size()) {
     383           0 :     ACE_DEBUG((LM_DEBUG,
     384             :                ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
     385             :                ACE_TEXT("domain %d topic %C publication %C copying ")
     386             :                ACE_TEXT("sending/sent to resend to %C.\n"),
     387             :                domain_id_,
     388             :                topic_name_,
     389             :                LogGuid(publication_id_).c_str(),
     390             :                LogGuid(reader_id).c_str()));
     391             :   }
     392             : 
     393           0 :   return DDS::RETCODE_OK;
     394           0 : }
     395             : 
     396             : DDS::ReturnCode_t
     397           0 : WriteDataContainer::register_instance(
     398             :   DDS::InstanceHandle_t&      instance_handle,
     399             :   Message_Block_Ptr&          registered_sample)
     400             : {
     401           0 :   PublicationInstance_rch instance;
     402             : 
     403           0 :   if (instance_handle == DDS::HANDLE_NIL) {
     404           0 :     if (max_num_instances_ > 0
     405           0 :         && max_num_instances_ <= (CORBA::Long) instances_.size()) {
     406           0 :       return DDS::RETCODE_OUT_OF_RESOURCES;
     407             :     }
     408             : 
     409             :     // registered the instance for the first time.
     410           0 :     instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
     411             : 
     412           0 :     instance_handle = this->writer_->get_next_handle();
     413             : 
     414           0 :     int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
     415             : 
     416           0 :     if (0 != insert_attempt) {
     417           0 :       ACE_ERROR((LM_ERROR,
     418             :                  ACE_TEXT("(%P|%t) ERROR: ")
     419             :                  ACE_TEXT("WriteDataContainer::register_instance, ")
     420             :                  ACE_TEXT("failed to insert instance handle=%X\n"),
     421             :                  instance.in()));
     422           0 :       return DDS::RETCODE_ERROR;
     423             :     } // if (0 != insert_attempt)
     424             : 
     425           0 :     instance->instance_handle_ = instance_handle;
     426             : 
     427           0 :     extend_deadline(instance);
     428             : 
     429             :   } else {
     430             : 
     431           0 :     int const find_attempt = find(instances_, instance_handle, instance);
     432             : 
     433           0 :     if (0 != find_attempt) {
     434           0 :       ACE_ERROR((LM_ERROR,
     435             :                  ACE_TEXT("(%P|%t) ERROR: ")
     436             :                  ACE_TEXT("WriteDataContainer::register_instance, ")
     437             :                  ACE_TEXT("The provided instance handle=%X is not a valid")
     438             :                  ACE_TEXT("handle.\n"),
     439             :                  instance_handle));
     440             : 
     441           0 :       return DDS::RETCODE_ERROR;
     442             :     } // if (0 != find_attempt)
     443             :   }
     444             : 
     445             :   // The registered_sample is shallow copied.
     446           0 :   registered_sample.reset(instance->registered_sample_->duplicate());
     447             : 
     448           0 :   return DDS::RETCODE_OK;
     449           0 : }
     450             : 
     451             : DDS::ReturnCode_t
     452           0 : WriteDataContainer::unregister(
     453             :   DDS::InstanceHandle_t   instance_handle,
     454             :   Message_Block_Ptr& registered_sample,
     455             :   bool                    dup_registered_sample)
     456             : {
     457           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     458             :                    guard,
     459             :                    lock_,
     460             :                    DDS::RETCODE_ERROR);
     461             : 
     462           0 :   PublicationInstance_rch instance;
     463             :   {
     464           0 :     PublicationInstanceMapType::iterator pos = instances_.find(instance_handle);
     465           0 :     if (pos == instances_.end()) {
     466           0 :       ACE_ERROR_RETURN((LM_ERROR,
     467             :                         ACE_TEXT("(%P|%t) ERROR: ")
     468             :                         ACE_TEXT("WriteDataContainer::unregister, ")
     469             :                         ACE_TEXT("The instance(handle=%X) ")
     470             :                         ACE_TEXT("is not registered yet.\n"),
     471             :                         instance_handle),
     472             :                        DDS::RETCODE_PRECONDITION_NOT_MET);
     473             :     }
     474           0 :     instance = pos->second;
     475           0 :     instances_.erase(pos);
     476             :   }
     477             : 
     478           0 :   return remove_instance(instance, registered_sample, dup_registered_sample);
     479           0 : }
     480             : 
     481             : DDS::ReturnCode_t
     482           0 : WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
     483             :                             Message_Block_Ptr&    registered_sample,
     484             :                             bool                  dup_registered_sample)
     485             : {
     486           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     487             :                    guard,
     488             :                    lock_,
     489             :                    DDS::RETCODE_ERROR);
     490             : 
     491           0 :   PublicationInstance_rch instance;
     492             : 
     493           0 :   int const find_attempt = find(instances_, instance_handle, instance);
     494             : 
     495           0 :   if (0 != find_attempt) {
     496           0 :     ACE_ERROR_RETURN((LM_ERROR,
     497             :                       ACE_TEXT("(%P|%t) ERROR: ")
     498             :                       ACE_TEXT("WriteDataContainer::dispose, ")
     499             :                       ACE_TEXT("The instance(handle=%X) ")
     500             :                       ACE_TEXT("is not registered yet.\n"),
     501             :                       instance_handle),
     502             :                      DDS::RETCODE_PRECONDITION_NOT_MET);
     503             :   }
     504             : 
     505           0 :   return remove_instance(instance, registered_sample, dup_registered_sample);
     506           0 : }
     507             : 
     508             : DDS::ReturnCode_t
     509           0 : WriteDataContainer::remove_instance(PublicationInstance_rch instance,
     510             :                                     Message_Block_Ptr& registered_sample,
     511             :                                     bool dup_registered_sample)
     512             : {
     513           0 :   if (dup_registered_sample) {
     514             :     // The registered_sample is shallow copied.
     515           0 :     registered_sample.reset(instance->registered_sample_->duplicate());
     516             :   }
     517             : 
     518             :   // Note: The DDS specification is unclear as to if samples in the process
     519             :   // of being sent should be removed or not.
     520             :   // The advantage of calling remove_sample() on them is that the
     521             :   // cached allocator memory for them is freed.  The disadvantage
     522             :   // is that the slow reader may see multiple disposes without
     523             :   // any write sample between them and hence not temporarily move into the
     524             :   // Alive state.
     525             :   // We have chosen to NOT remove the sending samples.
     526           0 :   InstanceDataSampleList& instance_list = instance->samples_;
     527             : 
     528           0 :   while (instance_list.size() > 0) {
     529           0 :     bool released = false;
     530           0 :     const DDS::ReturnCode_t ret = remove_oldest_sample(instance_list, released);
     531           0 :     if (ret != DDS::RETCODE_OK) {
     532           0 :       return ret;
     533             :     }
     534             :   }
     535             : 
     536           0 :   cancel_deadline(instance);
     537             : 
     538           0 :   return DDS::RETCODE_OK;
     539             : }
     540             : 
     541             : DDS::ReturnCode_t
     542           0 : WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
     543             :                                 size_t&                 size)
     544             : {
     545           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     546             :                    guard,
     547             :                    lock_,
     548             :                    DDS::RETCODE_ERROR);
     549           0 :   PublicationInstance_rch instance;
     550             : 
     551           0 :   int const find_attempt = find(instances_, handle, instance);
     552             : 
     553           0 :   if (0 != find_attempt) {
     554           0 :     return DDS::RETCODE_ERROR;
     555             : 
     556             :   } else {
     557           0 :     size = instance->samples_.size();
     558           0 :     return DDS::RETCODE_OK;
     559             :   }
     560           0 : }
     561             : 
     562             : size_t
     563           0 : WriteDataContainer::num_all_samples()
     564             : {
     565           0 :   size_t size = 0;
     566             : 
     567           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
     568             :                    guard,
     569             :                    lock_,
     570             :                    0);
     571             : 
     572           0 :   for (PublicationInstanceMapType::iterator iter = instances_.begin();
     573           0 :        iter != instances_.end();
     574           0 :        ++iter)
     575             :   {
     576           0 :     size += iter->second->samples_.size();
     577             :   }
     578             : 
     579           0 :   return size;
     580           0 : }
     581             : 
     582             : ACE_UINT64
     583           0 : WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
     584             : {
     585             :   DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
     586             :   //
     587             :   // The samples in unsent_data are added to the local datawriter
     588             :   // list and enqueued to the sending_data_ signifying they have
     589             :   // been passed to the transport to send in a transaction
     590             :   //
     591           0 :   list = this->unsent_data_;
     592             : 
     593             :   // Increment send counter for this send operation
     594           0 :   ++transaction_id_;
     595             : 
     596             :   // Mark all samples with current send counter
     597           0 :   SendStateDataSampleList::iterator iter = list.begin();
     598           0 :   while (iter != list.end()) {
     599           0 :     iter->set_transaction_id(this->transaction_id_);
     600           0 :     ++iter;
     601             :   }
     602             : 
     603             :   //
     604             :   // The unsent_data_ already linked with the
     605             :   // next_send_sample during enqueue.
     606             :   // Append the unsent_data_ to current sending_data_
     607             :   // list.
     608           0 :   sending_data_.enqueue_tail(list);
     609             : 
     610             :   //
     611             :   // Clear the unsent data list.
     612             :   //
     613           0 :   this->unsent_data_.reset();
     614             : 
     615             :   //
     616             :   // Return the moved list.
     617             :   //
     618           0 :   return transaction_id_;
     619             : }
     620             : 
     621             : SendStateDataSampleList
     622           0 : WriteDataContainer::get_resend_data()
     623             : {
     624             :   DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
     625             : 
     626             :   //
     627             :   // The samples in unsent_data are added to the sending_data
     628             :   // during enqueue.
     629             :   //
     630           0 :   SendStateDataSampleList list = this->resend_data_;
     631             : 
     632             :   //
     633             :   // Clear the unsent data list.
     634             :   //
     635           0 :   this->resend_data_.reset();
     636             :   //
     637             :   // Return the moved list.
     638             :   //
     639           0 :   return list;
     640             : }
     641             : 
     642             : bool
     643           0 : WriteDataContainer::pending_data()
     644             : {
     645           0 :   return this->sending_data_.size() != 0
     646           0 :          || this->orphaned_to_transport_.size() != 0
     647           0 :          || this->unsent_data_.size() != 0;
     648             : }
     649             : 
     650             : void
     651           0 : WriteDataContainer::data_delivered(const DataSampleElement* sample)
     652             : {
     653             :   DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
     654             : 
     655           0 :   if (DCPS_debug_level >= 2) {
     656           0 :     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
     657             :                           ACE_TEXT(" %@\n"), sample));
     658             :   }
     659             : 
     660           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
     661             :             guard,
     662             :             lock_);
     663             : 
     664             :   // Delivered samples _must_ be on sending_data_ list
     665             : 
     666             :   // If it is not found in one of the lists, an invariant
     667             :   // exception is declared.
     668             : 
     669             :   // The element now needs to be removed from the sending_data_
     670             :   // list, and appended to the end of the sent_data_ list here
     671             : 
     672           0 :   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
     673             : 
     674             :   // If sample is on a SendStateDataSampleList it should be on the
     675             :   // sending_data_ list signifying it was given to the transport to
     676             :   // deliver and now the transport is signaling it has been delivered
     677           0 :   if (!sending_data_.dequeue(sample)) {
     678             :     //
     679             :     // Should be on sending_data_.  If it is in sent_data_
     680             :     // or unsent_data there was a problem.
     681             :     //
     682             :     SendStateDataSampleList* send_lists[] = {
     683           0 :       &sent_data_,
     684           0 :       &unsent_data_,
     685           0 :       &orphaned_to_transport_};
     686             :     const SendStateDataSampleList* containing_list =
     687           0 :       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
     688             : 
     689           0 :     if (containing_list == &sent_data_) {
     690           0 :       ACE_ERROR((LM_WARNING,
     691             :                  ACE_TEXT("(%P|%t) WARNING: ")
     692             :                  ACE_TEXT("WriteDataContainer::data_delivered, ")
     693             :                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
     694             :                  ACE_TEXT("WAS IN sent_data_.\n")));
     695           0 :     } else if (containing_list == &unsent_data_) {
     696           0 :       ACE_ERROR((LM_WARNING,
     697             :                  ACE_TEXT("(%P|%t) WARNING: ")
     698             :                  ACE_TEXT("WriteDataContainer::data_delivered, ")
     699             :                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
     700             :                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
     701             :     } else {
     702             : 
     703             :       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
     704             :       //and inform transport of their release.  Transport will call data-delivered on the
     705             :       //elements as it processes the removal but they will already be gone from the send lists.
     706           0 :       if (stale->get_header().message_id_ != SAMPLE_DATA) {
     707             :         //this message was a control message so release it
     708           0 :         if (DCPS_debug_level > 9) {
     709           0 :           ACE_DEBUG((LM_DEBUG,
     710             :                      ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
     711             :                      ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
     712             :                      domain_id_,
     713             :                      topic_name_,
     714             :                      LogGuid(publication_id_).c_str()));
     715             :         }
     716           0 :         writer_->controlTracker.message_delivered();
     717             :       }
     718             : 
     719           0 :       if (containing_list == &orphaned_to_transport_) {
     720           0 :         orphaned_to_transport_.dequeue(sample);
     721           0 :         release_buffer(stale);
     722             : 
     723           0 :       } else if (!containing_list) {
     724             :         // samples that were retrieved from get_resend_data()
     725           0 :         ACE_Guard<ACE_SYNCH_MUTEX> wfa_guard(wfa_lock_);
     726           0 :         const CORBA::ULong num_subs = stale->get_num_subs();
     727           0 :         for (CORBA::ULong i = 0; i < num_subs; ++i) {
     728           0 :           update_acked(stale->get_header().sequence_, stale->get_sub_id(i));
     729             :         }
     730           0 :         wfa_guard.release();
     731           0 :         SendStateDataSampleList::remove(stale);
     732           0 :         release_buffer(stale);
     733           0 :       }
     734             : 
     735           0 :       if (!pending_data()) {
     736           0 :         empty_condition_.notify_all();
     737             :       }
     738             :     }
     739             : 
     740           0 :     return;
     741             :   }
     742           0 :   ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, wfa_lock_);
     743           0 :   SequenceNumber acked_seq = stale->get_header().sequence_;
     744           0 :   SequenceNumber prev_max = get_cumulative_ack();
     745             : 
     746           0 :   if (stale->get_header().message_id_ != SAMPLE_DATA) {
     747             :     //this message was a control message so release it
     748           0 :     if (DCPS_debug_level > 9) {
     749           0 :       ACE_DEBUG((LM_DEBUG,
     750             :                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
     751             :                  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
     752             :                  domain_id_,
     753             :                  topic_name_,
     754             :                  LogGuid(publication_id_).c_str()));
     755             :     }
     756           0 :     release_buffer(stale);
     757           0 :     stale = 0;
     758           0 :     writer_->controlTracker.message_delivered();
     759             :   } else {
     760             : 
     761           0 :     if (max_durable_per_instance_ && !shutdown_ && InstanceDataSampleList::on_some_list(sample)) {
     762           0 :       const_cast<DataSampleElement*>(sample)->get_header().historic_sample_ = true;
     763           0 :       DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
     764           0 :       sent_data_.enqueue_tail(sample);
     765             : 
     766             :     } else {
     767           0 :       if (InstanceDataSampleList::on_some_list(sample)) {
     768           0 :         PublicationInstance_rch inst = sample->get_handle();
     769           0 :         inst->samples_.dequeue(sample);
     770           0 :       }
     771           0 :       release_buffer(stale);
     772           0 :       stale = 0;
     773             :     }
     774             : 
     775           0 :     if (DCPS_debug_level > 9) {
     776           0 :       ACE_DEBUG((LM_DEBUG,
     777             :                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
     778             :                  ACE_TEXT("domain %d topic %C publication %C seq# %q %s.\n"),
     779             :                  domain_id_,
     780             :                  topic_name_,
     781             :                  LogGuid(publication_id_).c_str(),
     782             :                  acked_seq.getValue(),
     783             :                  max_durable_per_instance_
     784             :                  ? ACE_TEXT("stored for durability")
     785             :                  : ACE_TEXT("released")));
     786             :     }
     787             : 
     788           0 :     wakeup_blocking_writers(stale);
     789             :   }
     790           0 :   if (DCPS_debug_level > 9) {
     791           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
     792             :                          ACE_TEXT("Inserting acked_sequence: %q\n"),
     793             :                          acked_seq.getValue()));
     794             :   }
     795             : 
     796           0 :   update_acked(acked_seq);
     797             : 
     798           0 :   if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
     799           0 :       prev_max < get_cumulative_ack()) {
     800             : 
     801           0 :     if (DCPS_debug_level > 9) {
     802           0 :       ACE_DEBUG((LM_DEBUG,
     803             :                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
     804             :                  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
     805             :     }
     806             : 
     807           0 :     wfa_condition_.notify_all();
     808             :   }
     809             : 
     810             :   // Signal if there is no pending data.
     811           0 :   if (!pending_data()) {
     812           0 :     empty_condition_.notify_all();
     813             :   }
     814           0 : }
     815             : 
     816             : void
     817           0 : WriteDataContainer::data_dropped(const DataSampleElement* sample,
     818             :                                  bool dropped_by_transport)
     819             : {
     820             :   DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
     821             : 
     822           0 :   if (DCPS_debug_level >= 2) {
     823           0 :     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
     824             :                           ACE_TEXT(" sample %X dropped_by_transport %d\n"),
     825             :                           sample, dropped_by_transport));
     826             :   }
     827             : 
     828             :   // If the transport initiates the data dropping, we need do same thing
     829             :   // as data_delivered. e.g. remove the sample from the internal list
     830             :   // and the instance list. We do not need acquire the lock here since
     831             :   // the data_delivered acquires the lock.
     832           0 :   if (dropped_by_transport) {
     833           0 :     data_delivered(sample);
     834           0 :     return;
     835             :   }
     836             : 
     837             :   //The data_dropped could be called from the thread initiating sample remove
     838             :   //which already hold the lock. In this case, it's not necessary to acquire
     839             :   //lock here. It also could be called from the transport thread in a delayed
     840             :   //notification, it's necessary to acquire lock here to protect the internal
     841             :   //structures in this class.
     842             : 
     843           0 :   ACE_GUARD (ACE_Recursive_Thread_Mutex,
     844             :     guard,
     845             :     lock_);
     846             : 
     847             :   // The dropped sample should be in the sending_data_ list.
     848             :   // Otherwise an exception will be raised.
     849             :   //
     850             :   // We are now been notified by transport, so we can
     851             :   // keep the sample from the sending_data_ list still in
     852             :   // sample list since we will send it.
     853             : 
     854           0 :   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
     855             : 
     856             :   // If sample is on a SendStateDataSampleList it should be on the
     857             :   // sending_data_ list signifying it was given to the transport to
     858             :   // deliver and now the transport is signaling it has been dropped
     859             : 
     860           0 :   if (sending_data_.dequeue(sample)) {
     861             :     // else: The data_dropped is called as a result of remove_sample()
     862             :     // called from reenqueue_all() which supports the TRANSIENT_LOCAL
     863             :     // qos. The samples that are sending by transport are dropped from
     864             :     // transport and will be moved to the unsent list for resend.
     865           0 :     if (!shutdown_ && InstanceDataSampleList::on_some_list(sample)) {
     866           0 :       unsent_data_.enqueue_tail(sample);
     867             :     } else {
     868           0 :       SendStateDataSampleList::remove(stale);
     869           0 :       release_buffer(stale);
     870           0 :       stale = 0;
     871             :     }
     872             : 
     873             :   } else {
     874             :     //
     875             :     // If it is in sent_data_ or unsent_data there was a problem.
     876             :     //
     877             :     SendStateDataSampleList* send_lists[] = {
     878           0 :       &sent_data_,
     879           0 :       &unsent_data_,
     880           0 :       &orphaned_to_transport_};
     881             :     const SendStateDataSampleList* containing_list =
     882           0 :       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
     883             : 
     884           0 :     if (containing_list == &sent_data_) {
     885           0 :       ACE_ERROR((LM_WARNING,
     886             :                  ACE_TEXT("(%P|%t) WARNING: ")
     887             :                  ACE_TEXT("WriteDataContainer::data_dropped, ")
     888             :                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
     889             :                  ACE_TEXT("WAS IN sent_data_.\n")));
     890           0 :     } else if (containing_list == &unsent_data_) {
     891           0 :       ACE_ERROR((LM_WARNING,
     892             :                  ACE_TEXT("(%P|%t) WARNING: ")
     893             :                  ACE_TEXT("WriteDataContainer::data_dropped, ")
     894             :                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
     895             :                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
     896             :     } else {
     897             : 
     898             :       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
     899             :       //and inform transport of their release.  Transport will call data-dropped on the
     900             :       //elements as it processes the removal but they will already be gone from the send lists.
     901           0 :       if (stale->get_header().message_id_ != SAMPLE_DATA) {
     902             :         //this message was a control message so release it
     903           0 :         if (DCPS_debug_level > 9) {
     904           0 :           ACE_DEBUG((LM_DEBUG,
     905             :                      ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
     906             :                      ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
     907             :                      domain_id_,
     908             :                      topic_name_,
     909             :                      LogGuid(publication_id_).c_str()));
     910             :         }
     911           0 :         writer_->controlTracker.message_dropped();
     912             :       }
     913             : 
     914           0 :       if (containing_list == &orphaned_to_transport_) {
     915           0 :         orphaned_to_transport_.dequeue(sample);
     916           0 :         release_buffer(stale);
     917           0 :         stale = 0;
     918           0 :         if (!pending_data()) {
     919           0 :           empty_condition_.notify_all();
     920             :         }
     921             : 
     922           0 :       } else if (!containing_list) {
     923             :         // samples that were retrieved from get_resend_data()
     924           0 :         SendStateDataSampleList::remove(stale);
     925           0 :         release_buffer(stale);
     926           0 :         stale = 0;
     927             :       }
     928             :     }
     929             : 
     930           0 :     return;
     931             :   }
     932             : 
     933           0 :   wakeup_blocking_writers(stale);
     934             : 
     935           0 :   if (!pending_data()) {
     936           0 :     empty_condition_.notify_all();
     937             :   }
     938           0 : }
     939             : 
     940             : void
     941           0 : WriteDataContainer::remove_excess_durable()
     942             : {
     943           0 :   if (!max_durable_per_instance_)
     944           0 :     return;
     945             : 
     946           0 :   size_t n_released = 0;
     947             : 
     948           0 :   for (PublicationInstanceMapType::iterator iter = instances_.begin();
     949           0 :        iter != instances_.end();
     950           0 :        ++iter) {
     951             : 
     952           0 :     CORBA::Long durable_allowed = max_durable_per_instance_;
     953           0 :     InstanceDataSampleList& instance_list = iter->second->samples_;
     954             : 
     955           0 :     for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
     956           0 :       prev = InstanceDataSampleList::prev(it);
     957             : 
     958           0 :       if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
     959             : 
     960           0 :         if (durable_allowed) {
     961           0 :           --durable_allowed;
     962             :         } else {
     963           0 :           instance_list.dequeue(it);
     964           0 :           sent_data_.dequeue(it);
     965           0 :           release_buffer(it);
     966           0 :           ++n_released;
     967             :         }
     968             :       }
     969             :     }
     970             :   }
     971             : 
     972           0 :   if (n_released && DCPS_debug_level > 9) {
     973           0 :     ACE_DEBUG((LM_DEBUG,
     974             :                ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
     975             :                ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
     976             :                ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
     977             :                LogGuid(publication_id_).c_str(), n_released));
     978             :   }
     979             : }
     980             : 
     981             : 
     982             : DDS::ReturnCode_t
     983           0 : WriteDataContainer::remove_oldest_sample(
     984             :   InstanceDataSampleList& instance_list,
     985             :   bool& released)
     986             : {
     987           0 :   DataSampleElement* stale = 0;
     988             : 
     989             :   //
     990             :   // Remove the oldest sample from the instance list.
     991             :   //
     992           0 :   if (!instance_list.dequeue_head(stale)) {
     993           0 :     ACE_ERROR_RETURN((LM_ERROR,
     994             :                       ACE_TEXT("(%P|%t) ERROR: ")
     995             :                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
     996             :                       ACE_TEXT("dequeue_head_next_sample failed\n")),
     997             :                      DDS::RETCODE_ERROR);
     998             :   }
     999             : 
    1000             :   //
    1001             :   // Remove the stale data from the next_writer_sample_ list.  The
    1002             :   // sending_data_/next_send_sample_ list is not managed within the
    1003             :   // container, it is only used external to the container and does
    1004             :   // not need to be managed internally.
    1005             :   //
    1006             :   // The next_writer_sample_ link is being used in one of the sent_data_,
    1007             :   // sending_data_, or unsent_data lists.  Removal from the doubly
    1008             :   // linked list needs to repair the list only when the stale sample
    1009             :   // is either the head or tail of the list.
    1010             :   //
    1011             : 
    1012             :   //
    1013             :   // Locate the head of the list that the stale data is in.
    1014             :   //
    1015             :   SendStateDataSampleList* send_lists[] = {
    1016           0 :     &sending_data_,
    1017           0 :     &sent_data_,
    1018           0 :     &unsent_data_,
    1019           0 :     &orphaned_to_transport_};
    1020             :   const SendStateDataSampleList* containing_list =
    1021           0 :     SendStateDataSampleList::send_list_containing_element(stale, send_lists);
    1022             : 
    1023             :   //
    1024             :   // Identify the list that the stale data is in.
    1025             :   // The stale data should be in one of the sent_data_, sending_data_
    1026             :   // or unsent_data_. It should not be in released_data_ list since
    1027             :   // this function is the only place a sample is moved from
    1028             :   // sending_data_ to released_data_ list.
    1029             : 
    1030             :   // Remove the element from the internal list.
    1031           0 :   bool result = false;
    1032             : 
    1033           0 :   if (containing_list == &this->sending_data_) {
    1034           0 :     if (DCPS_debug_level > 2) {
    1035           0 :       ACE_ERROR((LM_WARNING,
    1036             :                  ACE_TEXT("(%P|%t) WARNING: ")
    1037             :                  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
    1038             :                  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
    1039             :     }
    1040             : 
    1041             :     // This means transport is still using the sample that needs to
    1042             :     // be released currently so notify transport that sample is being removed.
    1043             : 
    1044           0 :     if (this->writer_->remove_sample(stale)) {
    1045           0 :       if (this->sent_data_.dequeue(stale)) {
    1046           0 :         release_buffer(stale);
    1047             :       }
    1048           0 :       result = true;
    1049             : 
    1050             :     } else {
    1051           0 :       if (this->sending_data_.dequeue(stale)) {
    1052           0 :         this->orphaned_to_transport_.enqueue_tail(stale);
    1053           0 :       } else if (this->sent_data_.dequeue(stale)) {
    1054           0 :         release_buffer(stale);
    1055           0 :         result = true;
    1056             :       }
    1057           0 :       result = true;
    1058             :     }
    1059           0 :     released = true;
    1060             : 
    1061           0 :   } else if (containing_list == &this->sent_data_) {
    1062             :     // No one is using the data sample, so we can release it back to
    1063             :     // its allocator.
    1064             :     //
    1065           0 :     result = this->sent_data_.dequeue(stale) != 0;
    1066           0 :     release_buffer(stale);
    1067           0 :     released = true;
    1068             : 
    1069           0 :     if (DCPS_debug_level > 9) {
    1070           0 :       ACE_DEBUG((LM_DEBUG,
    1071             :                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
    1072             :                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
    1073             :                  this->domain_id_,
    1074             :                  this->topic_name_,
    1075             :                  LogGuid(publication_id_).c_str()));
    1076             :     }
    1077             : 
    1078           0 :   } else if (containing_list == &this->unsent_data_) {
    1079             :     //
    1080             :     // No one is using the data sample, so we can release it back to
    1081             :     // its allocator.
    1082             :     //
    1083           0 :     result = this->unsent_data_.dequeue(stale) != 0;
    1084           0 :     release_buffer(stale);
    1085           0 :     released = true;
    1086             : 
    1087           0 :     if (DCPS_debug_level > 9) {
    1088           0 :       ACE_DEBUG((LM_DEBUG,
    1089             :                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
    1090             :                  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
    1091             :                  this->domain_id_,
    1092             :                  this->topic_name_,
    1093             :                  LogGuid(publication_id_).c_str()));
    1094             :     }
    1095             :   } else {
    1096           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1097             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1098             :                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
    1099             :                       ACE_TEXT("The oldest sample is not in any internal list.\n")),
    1100             :                      DDS::RETCODE_ERROR);
    1101             :   }
    1102             : 
    1103           0 :   if (!pending_data()) {
    1104           0 :     empty_condition_.notify_all();
    1105             :   }
    1106             : 
    1107           0 :   if (!result) {
    1108           0 :     ACE_ERROR_RETURN((LM_ERROR,
    1109             :                       ACE_TEXT("(%P|%t) ERROR: ")
    1110             :                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
    1111             :                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
    1112             :                      DDS::RETCODE_ERROR);
    1113             : 
    1114             :   }
    1115             : 
    1116           0 :   return DDS::RETCODE_OK;
    1117             : }
    1118             : 
    1119             : DDS::ReturnCode_t
    1120           0 : WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
    1121             : {
    1122             :   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
    1123             : 
    1124           0 :   ACE_NEW_MALLOC_RETURN(
    1125             :     element,
    1126             :     static_cast<DataSampleElement*>(
    1127             :       sample_list_element_allocator_.malloc(
    1128             :         sizeof(DataSampleElement))),
    1129             :     DataSampleElement(publication_id_,
    1130             :                       this->writer_,
    1131             :                       PublicationInstance_rch()),
    1132             :     DDS::RETCODE_ERROR);
    1133             : 
    1134           0 :   return DDS::RETCODE_OK;
    1135             : }
    1136             : 
    1137             : DDS::ReturnCode_t
    1138           0 : WriteDataContainer::obtain_buffer(DataSampleElement*& element,
    1139             :                                   DDS::InstanceHandle_t handle)
    1140             : {
    1141             :   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
    1142             : 
    1143           0 :   remove_excess_durable();
    1144             : 
    1145           0 :   PublicationInstance_rch instance = get_handle_instance(handle);
    1146             : 
    1147           0 :   if (!instance) {
    1148           0 :     return DDS::RETCODE_BAD_PARAMETER;
    1149             :   }
    1150             : 
    1151           0 :   ACE_NEW_MALLOC_RETURN(
    1152             :     element,
    1153             :     static_cast<DataSampleElement*>(
    1154             :       sample_list_element_allocator_.malloc(
    1155             :         sizeof(DataSampleElement))),
    1156             :     DataSampleElement(publication_id_,
    1157             :                           this->writer_,
    1158             :                           instance),
    1159             :     DDS::RETCODE_ERROR);
    1160             : 
    1161             :   // Extract the current instance queue.
    1162           0 :   InstanceDataSampleList& instance_list = instance->samples_;
    1163           0 :   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
    1164             : 
    1165           0 :   bool set_timeout = true;
    1166           0 :   MonotonicTimePoint timeout;
    1167             : 
    1168             :   //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
    1169             :   //max_instances and max_instances * depth
    1170           0 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    1171           0 :   while ((instance_list.size() >= max_samples_per_instance_) ||
    1172           0 :          ((this->max_num_samples_ > 0) &&
    1173           0 :          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
    1174             : 
    1175           0 :     if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
    1176           0 :       if (instance_list.size() >= history_depth_) {
    1177           0 :         if (DCPS_debug_level >= 2) {
    1178           0 :           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1179             :                      ACE_TEXT(" instance %d attempting to remove")
    1180             :                      ACE_TEXT(" its oldest sample (reliable)\n"),
    1181             :                      handle));
    1182             :         }
    1183           0 :         bool oldest_released = false;
    1184           0 :         ret = remove_oldest_sample(instance_list, oldest_released);
    1185           0 :         if (oldest_released) {
    1186           0 :           break;
    1187             :         }
    1188             :       }
    1189             :       // Reliable writers can wait
    1190           0 :       if (set_timeout) {
    1191           0 :         timeout = MonotonicTimePoint::now() + TimeDuration(max_blocking_time_);
    1192           0 :         set_timeout = false;
    1193             :       }
    1194           0 :       if (!shutdown_ && MonotonicTimePoint::now() < timeout) {
    1195           0 :         if (DCPS_debug_level >= 2) {
    1196           0 :           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1197             :                                 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
    1198             :                       handle));
    1199             :         }
    1200             : 
    1201           0 :         waiting_on_release_ = true;
    1202           0 :         switch (condition_.wait_until(timeout, thread_status_manager)) {
    1203           0 :         case CvStatus_NoTimeout:
    1204           0 :           remove_excess_durable();
    1205           0 :           break;
    1206             : 
    1207           0 :         case CvStatus_Timeout:
    1208           0 :           if (DCPS_debug_level >= 2) {
    1209           0 :             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1210             :               ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
    1211             :               handle));
    1212             :           }
    1213           0 :           ret = DDS::RETCODE_TIMEOUT;
    1214           0 :           break;
    1215             : 
    1216           0 :         case CvStatus_Error:
    1217           0 :           if (DCPS_debug_level) {
    1218           0 :             ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::obtain_buffer: "
    1219             :               "error in wait_until\n"));
    1220             :           }
    1221           0 :           ret = DDS::RETCODE_ERROR;
    1222           0 :           break;
    1223             :         }
    1224             : 
    1225             :       } else {
    1226             :         //either shutdown has been signaled or max_blocking_time
    1227             :         //has surpassed so treat as timeout
    1228           0 :         ret = DDS::RETCODE_TIMEOUT;
    1229             :       }
    1230             : 
    1231             :     } else {
    1232             :       //BEST EFFORT
    1233           0 :       bool oldest_released = false;
    1234             : 
    1235             :       //try to remove stale samples from this instance
    1236             :       // The remove_oldest_sample() method removes the oldest sample
    1237             :       // from instance list and removes it from the internal lists.
    1238           0 :       if (instance_list.size() > 0) {
    1239           0 :         if (DCPS_debug_level >= 2) {
    1240           0 :           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1241             :                                 ACE_TEXT(" instance %d attempting to remove")
    1242             :                                 ACE_TEXT(" its oldest sample\n"),
    1243             :                                 handle));
    1244             :         }
    1245           0 :         ret = remove_oldest_sample(instance_list, oldest_released);
    1246             :       }
    1247             :       //else try to remove stale samples from other instances which are full
    1248           0 :       if (ret == DDS::RETCODE_OK && !oldest_released) {
    1249           0 :         if (DCPS_debug_level >= 2) {
    1250           0 :           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1251             :                                 ACE_TEXT(" instance %d attempting to remove")
    1252             :                                 ACE_TEXT(" oldest sample from any full instances\n"),
    1253             :                                 handle));
    1254             :         }
    1255           0 :         PublicationInstanceMapType::iterator it = instances_.begin();
    1256             : 
    1257           0 :         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
    1258           0 :           if (it->second->samples_.size() >= max_samples_per_instance_) {
    1259           0 :             ret = remove_oldest_sample(it->second->samples_, oldest_released);
    1260             :           }
    1261           0 :           ++it;
    1262             :         }
    1263             :       }
    1264             :       //else try to remove stale samples from other non-full instances
    1265           0 :       if (ret == DDS::RETCODE_OK && !oldest_released) {
    1266           0 :         if (DCPS_debug_level >= 2) {
    1267           0 :           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1268             :                                 ACE_TEXT(" instance %d attempting to remove")
    1269             :                                 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
    1270             :                                 handle));
    1271             :         }
    1272           0 :         PublicationInstanceMapType::iterator it = instances_.begin();
    1273             : 
    1274           0 :         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
    1275           0 :           if (it->second->samples_.size() > 0) {
    1276           0 :             ret = remove_oldest_sample(it->second->samples_, oldest_released);
    1277             :           }
    1278           0 :           ++it;
    1279             :         }
    1280             :       }
    1281           0 :       if (!oldest_released) {
    1282             :         //This means that no instances have samples to remove and yet
    1283             :         //still hitting resource limits.
    1284           0 :         ACE_ERROR((LM_ERROR,
    1285             :                    ACE_TEXT("(%P|%t) ERROR: ")
    1286             :                    ACE_TEXT("WriteDataContainer::obtain_buffer, ")
    1287             :                    ACE_TEXT("hitting resource limits with no samples to remove\n")));
    1288           0 :         ret = DDS::RETCODE_ERROR;
    1289             :       }
    1290             :     }  //END BEST EFFORT
    1291             : 
    1292           0 :     if (ret != DDS::RETCODE_OK) {
    1293           0 :       if (DCPS_debug_level >= 2) {
    1294           0 :         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
    1295             :                               ACE_TEXT(" instance %d could not obtain buffer for sample")
    1296             :                               ACE_TEXT(" releasing allotted sample and returning\n"),
    1297             :                               handle));
    1298             :       }
    1299           0 :       this->release_buffer(element);
    1300           0 :       return ret;
    1301             :     }
    1302             :   }  //END WHILE
    1303             : 
    1304           0 :   data_holder_.enqueue_tail(element);
    1305             : 
    1306           0 :   return ret;
    1307           0 : }
    1308             : 
    1309             : void
    1310           0 : WriteDataContainer::release_buffer(DataSampleElement* element)
    1311             : {
    1312           0 :   if (element->get_header().message_id_ == SAMPLE_DATA)
    1313           0 :     data_holder_.dequeue(element);
    1314             :   // Release the memory to the allocator.
    1315           0 :   ACE_DES_FREE(element,
    1316             :                sample_list_element_allocator_.free,
    1317             :                DataSampleElement);
    1318           0 : }
    1319             : 
    1320             : void
    1321           0 : WriteDataContainer::unregister_all()
    1322             : {
    1323             :   DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
    1324           0 :   shutdown_ = true;
    1325             : 
    1326             :   //The internal list needs protection since this call may result from the
    1327             :   //the delete_datawriter call which does not acquire the lock in advance.
    1328           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    1329             :             guard,
    1330             :             lock_);
    1331             :   // Tell transport remove all control messages currently
    1332             :   // transport is processing.
    1333           0 :   (void) this->writer_->remove_all_msgs();
    1334             : 
    1335             :   // Broadcast to wake up all waiting threads.
    1336           0 :   if (waiting_on_release_) {
    1337           0 :     condition_.notify_all();
    1338             :   }
    1339             : 
    1340           0 :   Message_Block_Ptr registered_sample;
    1341             : 
    1342           0 :   for (PublicationInstanceMapType::iterator pos = instances_.begin(), limit = instances_.end(); pos != limit;) {
    1343             :     // Release the instance data.
    1344           0 :     if (remove_instance(pos->second, registered_sample, false) != DDS::RETCODE_OK) {
    1345           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::unregister_all, "
    1346             :                  "remove_instance %X failed\n", pos->first));
    1347             :     }
    1348             : 
    1349           0 :     writer_->return_handle(pos->first);
    1350           0 :     instances_.erase(pos++);
    1351             :   }
    1352           0 : }
    1353             : 
    1354             : PublicationInstance_rch
    1355           0 : WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
    1356             : {
    1357           0 :   PublicationInstance_rch instance;
    1358             : 
    1359           0 :   if (0 != find(instances_, handle, instance)) {
    1360           0 :     ACE_DEBUG((LM_DEBUG,
    1361             :                ACE_TEXT("(%P|%t) ")
    1362             :                ACE_TEXT("WriteDataContainer::get_handle_instance, ")
    1363             :                ACE_TEXT("lookup for %d failed\n"), handle));
    1364             :   }
    1365             : 
    1366           0 :   return instance;
    1367           0 : }
    1368             : 
    1369             : void
    1370           0 : WriteDataContainer::copy_and_prepend(SendStateDataSampleList& list,
    1371             :                                      const SendStateDataSampleList& appended,
    1372             :                                      const GUID_t& reader_id,
    1373             :                                      const DDS::LifespanQosPolicy& lifespan,
    1374             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1375             :                                      const OPENDDS_STRING& filterClassName,
    1376             :                                      const FilterEvaluator* eval,
    1377             :                                      const DDS::StringSeq& params,
    1378             : #endif
    1379             :                                      ssize_t& max_resend_samples)
    1380             : {
    1381           0 :   for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
    1382           0 :        cur != appended.rend() && max_resend_samples; ++cur) {
    1383             : 
    1384           0 :     if (resend_data_expired(*cur, lifespan))
    1385           0 :       continue;
    1386             : 
    1387             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    1388           0 :     if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
    1389           0 :       continue;
    1390             : #endif
    1391             : 
    1392           0 :     PublicationInstance_rch inst = cur->get_handle();
    1393             : 
    1394           0 :     if (!inst) {
    1395             :       // *cur is a control message, just skip it
    1396           0 :       continue;
    1397             :     }
    1398             : 
    1399           0 :     if (inst->durable_samples_remaining_ == 0)
    1400           0 :       continue;
    1401           0 :     --inst->durable_samples_remaining_;
    1402             : 
    1403           0 :     DataSampleElement* element = 0;
    1404           0 :     ACE_NEW_MALLOC(element,
    1405             :                    static_cast<DataSampleElement*>(
    1406             :                      sample_list_element_allocator_.malloc(
    1407             :                        sizeof(DataSampleElement))),
    1408             :                    DataSampleElement(*cur));
    1409             : 
    1410           0 :     element->set_num_subs(1);
    1411           0 :     element->set_sub_id(0, reader_id);
    1412             : 
    1413           0 :     if (DCPS_debug_level > 9) {
    1414           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::copy_and_prepend added seq# %q\n",
    1415             :                  cur->get_header().sequence_.getValue()));
    1416             :     }
    1417             : 
    1418           0 :     list.enqueue_head(element);
    1419           0 :     --max_resend_samples;
    1420           0 :   }
    1421             : }
    1422             : 
    1423             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
    1424             : bool
    1425           0 : WriteDataContainer::persist_data()
    1426             : {
    1427           0 :   bool result = true;
    1428             : 
    1429             :   // ------------------------------------------------------------
    1430             :   // Transfer sent data to data DURABILITY cache.
    1431             :   // ------------------------------------------------------------
    1432           0 :   if (this->durability_cache_) {
    1433             :     // A data durability cache is available for TRANSIENT or
    1434             :     // PERSISTENT data durability.  Cache the data samples.
    1435             : 
    1436             :     //
    1437             :     //  We only cache data that is not still in use outside of
    1438             :     //  this instance of WriteDataContainer
    1439             :     //  (only cache samples in sent_data_ meaning transport has delivered).
    1440             :     bool const inserted =
    1441           0 :       this->durability_cache_->insert(this->domain_id_,
    1442           0 :                                       this->topic_name_,
    1443           0 :                                       this->type_name_,
    1444           0 :                                       this->sent_data_,
    1445             :                                       this->durability_service_
    1446             :                                      );
    1447             : 
    1448           0 :     result = inserted;
    1449             : 
    1450           0 :     if (!inserted)
    1451           0 :       ACE_ERROR((LM_ERROR,
    1452             :                  ACE_TEXT("(%P|%t) ERROR: ")
    1453             :                  ACE_TEXT("WriteDataContainer::persist_data, ")
    1454             :                  ACE_TEXT("failed to make data durable for ")
    1455             :                  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
    1456             :                  this->domain_id_,
    1457             :                  this->topic_name_,
    1458             :                  this->type_name_));
    1459             :   }
    1460             : 
    1461           0 :   return result;
    1462             : }
    1463             : #endif
    1464             : 
    1465           0 : void WriteDataContainer::wait_pending(const MonotonicTimePoint& deadline)
    1466             : {
    1467           0 :   const bool no_deadline = deadline.is_zero();
    1468           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
    1469           0 :   const bool report = DCPS_debug_level > 0 && pending_data();
    1470           0 :   if (report) {
    1471           0 :     if (no_deadline) {
    1472           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending no timeout\n")));
    1473             :     } else {
    1474           0 :       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending ")
    1475             :         ACE_TEXT("timeout at %#T\n"),
    1476             :         &deadline.value()));
    1477             :     }
    1478             :   }
    1479             : 
    1480           0 :   bool loop = true;
    1481           0 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    1482           0 :   while (loop && pending_data()) {
    1483           0 :     switch (empty_condition_.wait_until(deadline, thread_status_manager)) {
    1484           0 :     case CvStatus_NoTimeout:
    1485           0 :       break;
    1486             : 
    1487           0 :     case CvStatus_Timeout:
    1488           0 :       if (pending_data()) {
    1489           0 :         if (DCPS_debug_level >= 2) {
    1490           0 :           ACE_DEBUG((LM_INFO, "(%P|%t) WriteDataContainer::wait_pending: "
    1491             :             "Timed out waiting for messages to be transported\n"));
    1492           0 :           log_send_state_lists("WriteDataContainer::wait_pending - wait timedout: ");
    1493             :         }
    1494             :       }
    1495           0 :       loop = false;
    1496           0 :       break;
    1497             : 
    1498           0 :     case CvStatus_Error:
    1499           0 :       if (DCPS_debug_level) {
    1500           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_pending: "
    1501             :           "error in wait_until\n"));
    1502             :       }
    1503           0 :       loop = false;
    1504           0 :       break;
    1505             :     }
    1506             :   }
    1507           0 :   if (report) {
    1508           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending done\n")));
    1509             :   }
    1510           0 : }
    1511             : 
    1512             : void
    1513           0 : WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
    1514             : {
    1515           0 :   ACE_GUARD(ACE_Recursive_Thread_Mutex,
    1516             :             guard,
    1517             :             lock_);
    1518           0 :   PublicationInstanceMapType::iterator it = instances_.begin();
    1519             : 
    1520           0 :   while (it != instances_.end()) {
    1521           0 :     instance_handles.push_back(it->second->instance_handle_);
    1522           0 :     ++it;
    1523             :   }
    1524           0 : }
    1525             : 
    1526             : DDS::ReturnCode_t
    1527           0 : WriteDataContainer::wait_ack_of_seq(const MonotonicTimePoint& deadline,
    1528             :                                     bool deadline_is_infinite,
    1529             :                                     const SequenceNumber& sequence)
    1530             : {
    1531           0 :   ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
    1532           0 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
    1533           0 :   while ((deadline_is_infinite || MonotonicTimePoint::now() < deadline) && !sequence_acknowledged_i(sequence)) {
    1534           0 :     switch (deadline_is_infinite ? wfa_condition_.wait(thread_status_manager) : wfa_condition_.wait_until(deadline, thread_status_manager)) {
    1535           0 :     case CvStatus_NoTimeout:
    1536           0 :       break;
    1537           0 :     case CvStatus_Timeout:
    1538           0 :       if (DCPS_debug_level >= 2) {
    1539           0 :         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
    1540             :                    ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
    1541             :                    sequence.getValue()));
    1542             :         }
    1543           0 :       return DDS::RETCODE_TIMEOUT;
    1544           0 :     case CvStatus_Error:
    1545           0 :       if (DCPS_debug_level) {
    1546           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_ack_of_seq: "
    1547             :                    "error in wait/wait_until\n"));
    1548             :       }
    1549           0 :       return DDS::RETCODE_ERROR;
    1550             :     }
    1551             :   }
    1552             : 
    1553           0 :   return sequence_acknowledged_i(sequence) ? DDS::RETCODE_OK : DDS::RETCODE_TIMEOUT;
    1554           0 : }
    1555             : 
    1556             : bool
    1557           0 : WriteDataContainer::sequence_acknowledged(const SequenceNumber& sequence)
    1558             : {
    1559           0 :   ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
    1560           0 :   return sequence_acknowledged_i(sequence);
    1561           0 : }
    1562             : 
    1563             : bool
    1564           0 : WriteDataContainer::sequence_acknowledged_i(const SequenceNumber& sequence)
    1565             : {
    1566           0 :   if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
    1567             :     //return true here so that wait_for_acknowledgments doesn't block
    1568           0 :     return true;
    1569             :   }
    1570             : 
    1571           0 :   SequenceNumber acked = get_cumulative_ack();
    1572           0 :   if (DCPS_debug_level >= 10) {
    1573           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged_i ")
    1574             :                           ACE_TEXT("- %C cumulative ack is currently: %q\n"), DCPS::LogGuid(publication_id_).c_str(), acked.getValue()));
    1575             :   }
    1576           0 :   if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
    1577           0 :     return false;
    1578             :   }
    1579           0 :   return true;
    1580             : }
    1581             : 
    1582             : void
    1583           0 : WriteDataContainer::wakeup_blocking_writers(DataSampleElement* stale)
    1584             : {
    1585           0 :   if (!stale && waiting_on_release_) {
    1586           0 :     waiting_on_release_ = false;
    1587             : 
    1588           0 :     condition_.notify_all();
    1589             :   }
    1590           0 : }
    1591             : 
    1592             : void
    1593           0 : WriteDataContainer::log_send_state_lists(OPENDDS_STRING description)
    1594             : {
    1595           0 :   ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
    1596             :              description.c_str(),
    1597             :              unsent_data_.size(),
    1598             :              sending_data_.size(),
    1599             :              sent_data_.size(),
    1600             :              orphaned_to_transport_.size(),
    1601             :              num_all_samples(),
    1602             :              instances_.size()));
    1603           0 : }
    1604             : 
    1605             : void
    1606           0 : WriteDataContainer::set_deadline_period(const TimeDuration& deadline_period)
    1607             : {
    1608             :   // Call comes from DataWriterImpl_t which should arleady have the lock_.
    1609             : 
    1610             :   // Deadline for all instances starting from now.
    1611           0 :   const MonotonicTimePoint deadline = MonotonicTimePoint::now() + deadline_period;
    1612             : 
    1613             :   // Reset the deadline timer if the period has changed.
    1614           0 :   if (deadline_period_ != deadline_period) {
    1615           0 :     if (deadline_period_ == TimeDuration::max_value) {
    1616           0 :       OPENDDS_ASSERT(deadline_map_.empty());
    1617             : 
    1618           0 :       for (PublicationInstanceMapType::iterator iter = instances_.begin();
    1619           0 :            iter != instances_.end();
    1620           0 :            ++iter) {
    1621           0 :         iter->second->deadline_ = deadline;
    1622           0 :         deadline_map_.insert(std::make_pair(deadline, iter->second));
    1623             :       }
    1624             : 
    1625           0 :       if (!deadline_map_.empty()) {
    1626           0 :         deadline_task_->schedule(deadline_period);
    1627             :       }
    1628           0 :     } else if (deadline_period == TimeDuration::max_value) {
    1629           0 :       if (!deadline_map_.empty()) {
    1630           0 :         deadline_task_->cancel();
    1631             :       }
    1632             : 
    1633           0 :       deadline_map_.clear();
    1634             :     } else {
    1635           0 :       DeadlineMapType new_map;
    1636           0 :       for (PublicationInstanceMapType::iterator iter = instances_.begin();
    1637           0 :            iter != instances_.end();
    1638           0 :            ++iter) {
    1639           0 :         iter->second->deadline_ = deadline;
    1640           0 :         new_map.insert(std::make_pair(iter->second->deadline_, iter->second));
    1641             :       }
    1642           0 :       std::swap(new_map, deadline_map_);
    1643             : 
    1644           0 :       if (!deadline_map_.empty()) {
    1645           0 :         deadline_task_->cancel();
    1646           0 :         deadline_task_->schedule(deadline_map_.begin()->first - MonotonicTimePoint::now());
    1647             :       }
    1648           0 :     }
    1649             : 
    1650           0 :     deadline_period_ = deadline_period;
    1651             :   }
    1652           0 : }
    1653             : 
    1654             : void
    1655           0 : WriteDataContainer::process_deadlines(const MonotonicTimePoint& now)
    1656             : {
    1657             :   // Lock the DataWriterImpl.
    1658           0 :   ACE_GUARD (ACE_Recursive_Thread_Mutex, dwi_guard, deadline_status_lock_);
    1659             :   // Lock ourselves.
    1660           0 :   ACE_GUARD (ACE_Recursive_Thread_Mutex, wdc_guard, lock_);
    1661             : 
    1662           0 :   if (deadline_map_.empty()) {
    1663           0 :     return;
    1664             :   }
    1665             : 
    1666           0 :   bool notify = false;
    1667             : 
    1668           0 :   for (DeadlineMapType::iterator pos = deadline_map_.begin(), limit = deadline_map_.end();
    1669           0 :        pos != limit && pos->first < now; pos = deadline_map_.begin()) {
    1670             : 
    1671           0 :     PublicationInstance_rch instance = pos->second;
    1672           0 :     deadline_map_.erase(pos);
    1673             : 
    1674           0 :     ++deadline_status_.total_count;
    1675           0 :     deadline_status_.total_count_change = deadline_status_.total_count - deadline_last_total_count_;
    1676           0 :     deadline_status_.last_instance_handle = instance->instance_handle_;
    1677             : 
    1678           0 :     writer_->set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, true);
    1679           0 :     notify = true;
    1680             : 
    1681           0 :     DDS::DataWriterListener_var listener = writer_->listener_for(DDS::OFFERED_DEADLINE_MISSED_STATUS);
    1682             : 
    1683           0 :     if (listener) {
    1684             :       // Copy before releasing the lock.
    1685           0 :       const DDS::OfferedDeadlineMissedStatus status = deadline_status_;
    1686             : 
    1687             :       // Release the lock during the upcall.
    1688           0 :       ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> deadline_reverse_status_lock(deadline_status_lock_);
    1689           0 :       ACE_GUARD(ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>, rev_dwi_guard, deadline_reverse_status_lock);
    1690             : 
    1691             :       // @todo Will this operation ever throw?  If so we may want to
    1692             :       //       catch all exceptions, and act accordingly.
    1693           0 :       listener->on_offered_deadline_missed(writer_, status);
    1694             : 
    1695             :       // We need to update the last total count value to our current total
    1696             :       // so that the next time we will calculate the correct total_count_change;
    1697           0 :       deadline_last_total_count_ = deadline_status_.total_count;
    1698           0 :     }
    1699             : 
    1700           0 :     instance->deadline_ += deadline_period_;
    1701           0 :     deadline_map_.insert(std::make_pair(instance->deadline_, instance));
    1702           0 :   }
    1703             : 
    1704           0 :   if (notify) {
    1705           0 :     writer_->notify_status_condition();
    1706             :   }
    1707             : 
    1708           0 :   deadline_task_->schedule(deadline_map_.begin()->first - now);
    1709           0 : }
    1710             : 
    1711             : void
    1712           0 : WriteDataContainer::extend_deadline(const PublicationInstance_rch& instance)
    1713             : {
    1714             :   // Call comes from DataWriterImpl_t which should arleady have the lock_.
    1715             : 
    1716           0 :   if (deadline_period_ == TimeDuration::max_value) {
    1717           0 :     return;
    1718             :   }
    1719             : 
    1720           0 :   std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
    1721           0 :   while (r.first != r.second && r.first->second != instance) {
    1722           0 :     ++r.first;
    1723             :   }
    1724           0 :   if (r.first != r.second) {
    1725             :     // The instance was in the map.
    1726           0 :     deadline_map_.erase(r.first);
    1727             :   }
    1728           0 :   instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
    1729           0 :   bool schedule = deadline_map_.empty();
    1730           0 :   deadline_map_.insert(std::make_pair(instance->deadline_, instance));
    1731           0 :   if (schedule) {
    1732           0 :     deadline_task_->schedule(deadline_period_);
    1733             :   }
    1734             : }
    1735             : 
    1736             : void
    1737           0 : WriteDataContainer::cancel_deadline(const PublicationInstance_rch& instance)
    1738             : {
    1739             :   // Call comes from DataWriterImpl_t which should arleady have the lock_.
    1740             : 
    1741           0 :   if (deadline_period_ == TimeDuration::max_value) {
    1742           0 :     return;
    1743             :   }
    1744             : 
    1745           0 :   std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
    1746           0 :   while (r.first != r.second && r.first->second != instance) {
    1747           0 :     ++r.first;
    1748             :   }
    1749           0 :   if (r.first != r.second) {
    1750           0 :     deadline_map_.erase(r.first);
    1751           0 :     if (deadline_map_.empty()) {
    1752           0 :       deadline_task_->cancel();
    1753             :     }
    1754             :   }
    1755             : }
    1756             : 
    1757             : } // namespace DCPS
    1758             : } // namespace OpenDDS
    1759             : 
    1760             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16