WriteDataContainer.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ace/Condition_Recursive_Thread_Mutex.h"
00010 #include "WriteDataContainer.h"
00011 #include "DataSampleHeader.h"
00012 #include "InstanceDataSampleList.h"
00013 #include "DataWriterImpl.h"
00014 #include "MessageTracker.h"
00015 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00016 #include "DataDurabilityCache.h"
00017 #endif
00018 #include "PublicationInstance.h"
00019 #include "Util.h"
00020 #include "Qos_Helper.h"
00021 #include "GuidConverter.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00024 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00025 #include "dds/DCPS/transport/framework/TransportDebug.h"
00026 
00027 #include "tao/debug.h"
00028 
00029 namespace OpenDDS {
00030 namespace DCPS {
00031 
00032 /**
00033  * @todo Refactor this code and DataReaderImpl::data_expired() to
00034  *       a common function.
00035  */
00036 bool
00037 resend_data_expired(DataSampleElement const & element,
00038                     DDS::LifespanQosPolicy const & lifespan)
00039 {
00040   if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
00041       || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
00042     // Finite lifespan.  Check if data has expired.
00043 
00044     DDS::Time_t const tmp = {
00045       element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
00046       element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
00047     };
00048 
00049     ACE_Time_Value const now(ACE_OS::gettimeofday());
00050     ACE_Time_Value const expiration_time(time_to_time_value(tmp));
00051 
00052     if (now >= expiration_time) {
00053       if (DCPS_debug_level >= 8) {
00054         ACE_Time_Value const diff(now - expiration_time);
00055         ACE_DEBUG((LM_DEBUG,
00056                    ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
00057                    ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
00058                    diff.sec(),
00059                    diff.usec()));
00060       }
00061 
00062       return true;  // Data expired.
00063     }
00064   }
00065 
00066   return false;
00067 }
00068 
00069 WriteDataContainer::WriteDataContainer(
00070   DataWriterImpl* writer,
00071   CORBA::Long    depth,
00072   ::DDS::Duration_t max_blocking_time,
00073   size_t         n_chunks,
00074   DDS::DomainId_t domain_id,
00075   char const * topic_name,
00076   char const * type_name,
00077 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00078   DataDurabilityCache* durability_cache,
00079   DDS::DurabilityServiceQosPolicy const & durability_service,
00080 #endif
00081   CORBA::Long     max_instances,
00082   CORBA::Long     max_total_samples)
00083   : transaction_id_(0),
00084     writer_(writer),
00085     depth_(depth),
00086     max_num_instances_(max_instances),
00087     max_num_samples_(max_total_samples),
00088     max_blocking_time_(max_blocking_time),
00089     waiting_on_release_(false),
00090     condition_(lock_),
00091     empty_condition_(lock_),
00092     wfa_condition_(this->wfa_lock_),
00093     n_chunks_(n_chunks),
00094     sample_list_element_allocator_(2 * n_chunks_),
00095     transport_send_element_allocator_(2 * n_chunks_,
00096                                       sizeof(TransportSendElement)),
00097     transport_customized_element_allocator_(2 * n_chunks_,
00098                                             sizeof(TransportCustomizedElement)),
00099     shutdown_(false),
00100     domain_id_(domain_id),
00101     topic_name_(topic_name),
00102     type_name_(type_name)
00103 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00104   , durability_cache_(durability_cache)
00105   , durability_service_(durability_service)
00106 #endif
00107 {
00108 
00109   if (DCPS_debug_level >= 2) {
00110     ACE_DEBUG((LM_DEBUG,
00111                "(%P|%t) WriteDataContainer "
00112                "sample_list_element_allocator %x with %d chunks\n",
00113                &sample_list_element_allocator_, n_chunks_));
00114     ACE_DEBUG((LM_DEBUG,
00115                "(%P|%t) WriteDataContainer "
00116                "transport_send_element_allocator %x with %d chunks\n",
00117                &transport_send_element_allocator_, n_chunks_));
00118   }
00119 }
00120 
00121 WriteDataContainer::~WriteDataContainer()
00122 {
00123   if (this->unsent_data_.size() > 0) {
00124     ACE_DEBUG((LM_WARNING,
00125                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00126                ACE_TEXT("destroyed with %d samples unsent.\n"),
00127                this->unsent_data_.size()));
00128   }
00129 
00130   if (this->sending_data_.size() > 0) {
00131     ACE_DEBUG((LM_WARNING,
00132                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00133                ACE_TEXT("destroyed with %d samples sending.\n"),
00134                this->sending_data_.size()));
00135   }
00136 
00137   if (this->sent_data_.size() > 0) {
00138     if (DCPS_debug_level > 0) {
00139       ACE_DEBUG((LM_DEBUG,
00140                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00141                  ACE_TEXT("destroyed with %d samples sent.\n"),
00142                  this->sent_data_.size()));
00143     }
00144   }
00145 
00146   if (this->orphaned_to_transport_.size() > 0) {
00147     if (DCPS_debug_level > 0) {
00148       ACE_DEBUG((LM_DEBUG,
00149                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00150                  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00151                  this->orphaned_to_transport_.size()));
00152     }
00153   }
00154 
00155   if (!shutdown_) {
00156     ACE_ERROR((LM_ERROR,
00157                ACE_TEXT("(%P|%t) ERROR: ")
00158                ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00159                ACE_TEXT("The container has not been cleaned.\n")));
00160   }
00161 }
00162 
00163 DDS::ReturnCode_t
00164 WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
00165 {
00166   // Enqueue to the next_send_sample_ thread of unsent_data_
00167   // will link samples with the next_sample/previous_sample and
00168   // also next_send_sample_.
00169   // This would save time when we actually send the data.
00170 
00171   unsent_data_.enqueue_tail(control_sample);
00172 
00173   return DDS::RETCODE_OK;
00174 }
00175 
00176 // This method assumes that instance list has space for this sample.
00177 DDS::ReturnCode_t
00178 WriteDataContainer::enqueue(
00179   DataSampleElement* sample,
00180   DDS::InstanceHandle_t instance_handle)
00181 {
00182   // Get the PublicationInstance pointer from InstanceHandle_t.
00183   PublicationInstance* const instance =
00184     get_handle_instance(instance_handle);
00185   // Extract the instance queue.
00186   InstanceDataSampleList& instance_list = instance->samples_;
00187 
00188   if (this->writer_->watchdog_) {
00189     instance->last_sample_tv_ = instance->cur_sample_tv_;
00190     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00191     this->writer_->watchdog_->execute(instance, false);
00192   }
00193 
00194   //
00195   // Enqueue to the next_send_sample_ thread of unsent_data_
00196   // will link samples with the next_sample/previous_sample and
00197   // also next_send_sample_.
00198   // This would save time when we actually send the data.
00199 
00200   unsent_data_.enqueue_tail(sample);
00201 
00202   //
00203   // Add this sample to the INSTANCE scope list.
00204   instance_list.enqueue_tail(sample);
00205 
00206   return DDS::RETCODE_OK;
00207 }
00208 
00209 DDS::ReturnCode_t
00210 WriteDataContainer::reenqueue_all(const RepoId& reader_id,
00211                                   const DDS::LifespanQosPolicy& lifespan
00212 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00213                                   ,
00214                                   const OPENDDS_STRING& filterClassName,
00215                                   const FilterEvaluator* eval,
00216                                   const DDS::StringSeq& expression_params
00217 #endif
00218                                   )
00219 {
00220   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00221                    guard,
00222                    this->lock_,
00223                    DDS::RETCODE_ERROR);
00224 
00225   // Make a copy of sending_data_ and sent_data_;
00226   if (sent_data_.size() > 0) {
00227     this->copy_and_append(this->resend_data_,
00228                           sent_data_,
00229                           reader_id,
00230                           lifespan
00231 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00232                           , filterClassName, eval, expression_params
00233 #endif
00234                           );
00235 
00236   if (sending_data_.size() > 0) {
00237     this->copy_and_append(this->resend_data_,
00238                           sending_data_,
00239                           reader_id,
00240                           lifespan
00241 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00242                           , filterClassName, eval, expression_params
00243 #endif
00244                           );
00245   }
00246 
00247     if (DCPS_debug_level > 9) {
00248       GuidConverter converter(publication_id_);
00249       GuidConverter reader(reader_id);
00250       ACE_DEBUG((LM_DEBUG,
00251                  ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00252                  ACE_TEXT("domain %d topic %C publication %C copying HISTORY to resend to %C.\n"),
00253                  this->domain_id_,
00254                  this->topic_name_,
00255                  OPENDDS_STRING(converter).c_str(),
00256                  OPENDDS_STRING(reader).c_str()));
00257     }
00258   }
00259 
00260   return DDS::RETCODE_OK;
00261 }
00262 
00263 DDS::ReturnCode_t
00264 WriteDataContainer::register_instance(
00265   DDS::InstanceHandle_t&      instance_handle,
00266   DataSample*&                  registered_sample)
00267 {
00268   PublicationInstance* instance = 0;
00269   auto_ptr<PublicationInstance> safe_instance;
00270 
00271   if (instance_handle == DDS::HANDLE_NIL) {
00272     if (max_num_instances_ > 0
00273         && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00274       return DDS::RETCODE_OUT_OF_RESOURCES;
00275     }
00276 
00277     // registered the instance for the first time.
00278     ACE_NEW_RETURN(instance,
00279                    PublicationInstance(registered_sample),
00280                    DDS::RETCODE_ERROR);
00281 
00282     ACE_auto_ptr_reset(safe_instance, instance);
00283 
00284     instance_handle = this->writer_->get_next_handle();
00285 
00286     int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00287 
00288     if (0 != insert_attempt) {
00289       ACE_ERROR((LM_ERROR,
00290                  ACE_TEXT("(%P|%t) ERROR: ")
00291                  ACE_TEXT("WriteDataContainer::register_instance, ")
00292                  ACE_TEXT("failed to insert instance handle=%X\n"),
00293                  instance));
00294       return DDS::RETCODE_ERROR;
00295     } // if (0 != insert_attempt)
00296 
00297     instance->instance_handle_ = instance_handle;
00298 
00299   } else {
00300     int const find_attempt = find(instances_, instance_handle, instance);
00301     ACE_auto_ptr_reset(safe_instance, instance);
00302 
00303     if (0 != find_attempt) {
00304       ACE_ERROR((LM_ERROR,
00305                  ACE_TEXT("(%P|%t) ERROR: ")
00306                  ACE_TEXT("WriteDataContainer::register_instance, ")
00307                  ACE_TEXT("The provided instance handle=%X is not a valid")
00308                  ACE_TEXT("handle.\n"),
00309                  instance_handle));
00310 
00311       return DDS::RETCODE_ERROR;
00312     } // if (0 != find_attempt)
00313 
00314     // don't need this - the PublicationInstances already has a sample.
00315     registered_sample->release();
00316 
00317     instance->unregistered_ = false;
00318   }
00319 
00320   // The registered_sample is shallow copied.
00321   registered_sample = instance->registered_sample_->duplicate();
00322 
00323   if (this->writer_->watchdog_) {
00324     this->writer_->watchdog_->schedule_timer(instance);
00325   }
00326 
00327   safe_instance.release();  // Safe to relinquish ownership.
00328 
00329   return DDS::RETCODE_OK;
00330 }
00331 
00332 DDS::ReturnCode_t
00333 WriteDataContainer::unregister(
00334   DDS::InstanceHandle_t instance_handle,
00335   DataSample*&            registered_sample,
00336   bool                    dup_registered_sample)
00337 {
00338   PublicationInstance* instance = 0;
00339 
00340   int const find_attempt = find(instances_, instance_handle, instance);
00341 
00342   if (0 != find_attempt) {
00343     ACE_ERROR_RETURN((LM_ERROR,
00344                       ACE_TEXT("(%P|%t) ERROR: ")
00345                       ACE_TEXT("WriteDataContainer::unregister, ")
00346                       ACE_TEXT("The instance(handle=%X) ")
00347                       ACE_TEXT("is not registered yet.\n"),
00348                       instance_handle),
00349                      DDS::RETCODE_PRECONDITION_NOT_MET);
00350   } // if (0 != find_attempt)
00351 
00352   instance->unregistered_ = true;
00353 
00354   if (dup_registered_sample) {
00355     // The registered_sample is shallow copied.
00356     registered_sample = instance->registered_sample_->duplicate();
00357   }
00358 
00359   // Unregister the instance with typed DataWriter.
00360   this->writer_->unregistered(instance_handle);
00361 
00362   if (this->writer_->watchdog_)
00363     this->writer_->watchdog_->cancel_timer(instance);
00364 
00365   return DDS::RETCODE_OK;
00366 }
00367 
00368 DDS::ReturnCode_t
00369 WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
00370                             DataSample*&            registered_sample,
00371                             bool                    dup_registered_sample)
00372 {
00373   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00374                    guard,
00375                    this->lock_,
00376                    DDS::RETCODE_ERROR);
00377 
00378   PublicationInstance* instance = 0;
00379 
00380   int const find_attempt = find(instances_, instance_handle, instance);
00381 
00382   if (0 != find_attempt) {
00383     ACE_ERROR_RETURN((LM_ERROR,
00384                       ACE_TEXT("(%P|%t) ERROR: ")
00385                       ACE_TEXT("WriteDataContainer::dispose, ")
00386                       ACE_TEXT("The instance(handle=%X) ")
00387                       ACE_TEXT("is not registered yet.\n"),
00388                       instance_handle),
00389                      DDS::RETCODE_PRECONDITION_NOT_MET);
00390   }
00391 
00392   if (dup_registered_sample) {
00393     // The registered_sample is shallow copied.
00394     registered_sample = instance->registered_sample_->duplicate();
00395   }
00396 
00397   // Note: The DDS specification is unclear as to if samples in the process
00398   // of being sent should be removed or not.
00399   // The advantage of calling remove_sample() on them is that the
00400   // cached allocator memory for them is freed.  The disadvantage
00401   // is that the slow reader may see multiple disposes without
00402   // any write sample between them and hence not temporarily move into the
00403   // Alive state.
00404   // We have chosen to NOT remove the sending samples.
00405 
00406   InstanceDataSampleList& instance_list = instance->samples_;
00407 
00408   while (instance_list.size() > 0) {
00409     bool released = false;
00410     DDS::ReturnCode_t ret
00411     = remove_oldest_sample(instance_list, released);
00412 
00413     if (ret != DDS::RETCODE_OK) {
00414       return ret;
00415     }
00416   }
00417 
00418   if (this->writer_->watchdog_)
00419     this->writer_->watchdog_->cancel_timer(instance);
00420   return DDS::RETCODE_OK;
00421 }
00422 
00423 DDS::ReturnCode_t
00424 WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
00425                                 size_t&                 size)
00426 {
00427   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00428                    guard,
00429                    this->lock_,
00430                    DDS::RETCODE_ERROR);
00431   PublicationInstance* instance = 0;
00432 
00433   int const find_attempt = find(instances_, handle, instance);
00434 
00435   if (0 != find_attempt) {
00436     return DDS::RETCODE_ERROR;
00437 
00438   } else {
00439     size = instance->samples_.size();
00440     return DDS::RETCODE_OK;
00441   }
00442 }
00443 
00444 size_t
00445 WriteDataContainer::num_all_samples()
00446 {
00447   size_t size = 0;
00448 
00449   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00450                    guard,
00451                    this->lock_,
00452                    0);
00453 
00454   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00455        iter != instances_.end();
00456        ++iter)
00457   {
00458     size += iter->second->samples_.size();
00459   }
00460 
00461   return size;
00462 }
00463 
00464 ACE_UINT64
00465 WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
00466 {
00467   DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00468   //
00469   // The samples in unsent_data are added to the local datawriter
00470   // list and enqueued to the sending_data_ signifying they have
00471   // been passed to the transport to send in a transaction
00472   //
00473   list = this->unsent_data_;
00474 
00475   // Increment send counter for this send operation
00476   ++transaction_id_;
00477 
00478   // Mark all samples with current send counter
00479   SendStateDataSampleList::iterator iter = list.begin();
00480   while (iter != list.end()) {
00481     iter->set_transaction_id(this->transaction_id_);
00482     ++iter;
00483   }
00484 
00485   //
00486   // The unsent_data_ already linked with the
00487   // next_send_sample during enqueue.
00488   // Append the unsent_data_ to current sending_data_
00489   // list.
00490   sending_data_.enqueue_tail(list);
00491 
00492   //
00493   // Clear the unsent data list.
00494   //
00495   this->unsent_data_.reset();
00496 
00497   //
00498   // Return the moved list.
00499   //
00500   return transaction_id_;
00501 }
00502 
00503 SendStateDataSampleList
00504 WriteDataContainer::get_resend_data()
00505 {
00506   DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00507 
00508   //
00509   // The samples in unsent_data are added to the sending_data
00510   // during enqueue.
00511   //
00512   SendStateDataSampleList list = this->resend_data_;
00513 
00514   //
00515   // Clear the unsent data list.
00516   //
00517   this->resend_data_.reset();
00518   //
00519   // Return the moved list.
00520   //
00521   return list;
00522 }
00523 
00524 bool
00525 WriteDataContainer::pending_data()
00526 {
00527   return this->sending_data_.size() != 0
00528          || this->orphaned_to_transport_.size() != 0
00529          || this->unsent_data_.size() != 0;
00530 }
00531 
00532 void
00533 WriteDataContainer::data_delivered(const DataSampleElement* sample)
00534 {
00535   DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00536 
00537   if (DCPS_debug_level >= 2) {
00538     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00539                           ACE_TEXT(" %X \n"), sample));
00540   }
00541 
00542   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00543             guard,
00544             this->lock_);
00545 
00546   // Delivered samples _must_ be on sending_data_ list
00547 
00548   // If it is not found in one of the lists, an invariant
00549   // exception is declared.
00550 
00551   // The element now needs to be removed from the sending_data_
00552   // list, and appended to the end of the sent_data_ list here
00553 
00554   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00555 
00556   // If sample is on a SendStateDataSampleList it should be on the
00557   // sending_data_ list signifying it was given to the transport to
00558   // deliver and now the transport is signaling it has been delivered
00559   if (!sending_data_.dequeue(sample)) {
00560     //
00561     // Should be on sending_data_.  If it is in sent_data_
00562     // or unsent_data there was a problem.
00563     //
00564     OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00565     send_lists.push_back(&sent_data_);
00566     send_lists.push_back(&unsent_data_);
00567     send_lists.push_back(&orphaned_to_transport_);
00568 
00569     const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00570 
00571     if (containing_list == &this->sent_data_) {
00572       ACE_ERROR((LM_WARNING,
00573                  ACE_TEXT("(%P|%t) WARNING: ")
00574                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00575                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00576                  ACE_TEXT("WAS IN sent_data_.\n")));
00577     } else if (containing_list == &this->unsent_data_) {
00578       ACE_ERROR((LM_WARNING,
00579                  ACE_TEXT("(%P|%t) WARNING: ")
00580                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00581                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00582                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00583     } else {
00584 
00585       if (containing_list == &this->orphaned_to_transport_) {
00586         orphaned_to_transport_.dequeue(sample);
00587         release_buffer(stale);
00588       }
00589       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00590       //and inform transport of their release.  Transport will call data-delivered on the
00591       //elements as it processes the removal but they will already be gone from the send lists.
00592       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00593         //this message was a control message so release it
00594         if (DCPS_debug_level > 9) {
00595           GuidConverter converter(publication_id_);
00596           ACE_DEBUG((LM_DEBUG,
00597                      ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00598                      ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00599                      this->domain_id_,
00600                      this->topic_name_,
00601                      OPENDDS_STRING(converter).c_str()));
00602         }
00603         writer_->controlTracker.message_delivered();
00604       }
00605 
00606       if (!pending_data())
00607         empty_condition_.broadcast();
00608     }
00609 
00610     return;
00611   }
00612   ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00613   SequenceNumber acked_seq = stale->get_header().sequence_;
00614   SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00615 
00616   if (stale->get_header().message_id_ != SAMPLE_DATA) {
00617     //this message was a control message so release it
00618     if (DCPS_debug_level > 9) {
00619       GuidConverter converter(publication_id_);
00620       ACE_DEBUG((LM_DEBUG,
00621                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00622                  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00623                  this->domain_id_,
00624                  this->topic_name_,
00625                  OPENDDS_STRING(converter).c_str()));
00626     }
00627     release_buffer(stale);
00628     writer_->controlTracker.message_delivered();
00629   } else {
00630     if (DCPS_debug_level > 9) {
00631       GuidConverter converter(publication_id_);
00632       ACE_DEBUG((LM_DEBUG,
00633                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00634                  ACE_TEXT("domain %d topic %C publication %C pushed to HISTORY.\n"),
00635                  this->domain_id_,
00636                  this->topic_name_,
00637                  OPENDDS_STRING(converter).c_str()));
00638     }
00639 
00640     DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00641     sent_data_.enqueue_tail(sample);
00642 
00643     this->wakeup_blocking_writers (stale);
00644   }
00645   if (DCPS_debug_level > 9) {
00646     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00647                          ACE_TEXT("Inserting acked_sequence: %q\n"),
00648                          acked_seq.getValue()));
00649   }
00650 
00651   acked_sequences_.insert(acked_seq);
00652 
00653   if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00654       prev_max < acked_sequences_.cumulative_ack()) {
00655 
00656     if (DCPS_debug_level > 9) {
00657       ACE_DEBUG((LM_DEBUG,
00658                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00659                  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00660     }
00661 
00662     wfa_condition_.broadcast();
00663   }
00664 
00665   // Signal if there is no pending data.
00666   if (!pending_data())
00667     empty_condition_.broadcast();
00668 }
00669 
00670 void
00671 WriteDataContainer::data_dropped(const DataSampleElement* sample,
00672                                  bool dropped_by_transport)
00673 {
00674   DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00675 
00676   if (DCPS_debug_level >= 2) {
00677     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00678                           ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00679                           sample, dropped_by_transport));
00680   }
00681 
00682   // If the transport initiates the data dropping, we need do same thing
00683   // as data_delivered. e.g. remove the sample from the internal list
00684   // and the instance list. We do not need acquire the lock here since
00685   // the data_delivered acquires the lock.
00686   if (dropped_by_transport) {
00687     this->data_delivered(sample);
00688     return;
00689   }
00690 
00691   //The data_dropped could be called from the thread initiating sample remove
00692   //which already hold the lock. In this case, it's not necessary to acquire
00693   //lock here. It also could be called from the transport thread in a delayed
00694   //notification, it's necessary to acquire lock here to protect the internal
00695   //structures in this class.
00696 
00697   ACE_GUARD (ACE_Recursive_Thread_Mutex,
00698     guard,
00699     this->lock_);
00700 
00701   // The dropped sample should be in the sending_data_ list.
00702   // Otherwise an exception will be raised.
00703   //
00704   // We are now been notified by transport, so we can
00705   // keep the sample from the sending_data_ list still in
00706   // sample list since we will send it.
00707 
00708   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00709 
00710   // If sample is on a SendStateDataSampleList it should be on the
00711   // sending_data_ list signifying it was given to the transport to
00712   // deliver and now the transport is signaling it has been dropped
00713 
00714   if (sending_data_.dequeue(sample)) {
00715     // else: The data_dropped is called as a result of remove_sample()
00716     // called from reenqueue_all() which supports the TRANSIENT_LOCAL
00717     // qos. The samples that are sending by transport are dropped from
00718     // transport and will be moved to the unsent list for resend.
00719     unsent_data_.enqueue_tail(sample);
00720 
00721   } else {
00722     //
00723     // If it is in sent_data_ or unsent_data there was a problem.
00724     //
00725     OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00726     send_lists.push_back(&sent_data_);
00727     send_lists.push_back(&unsent_data_);
00728     send_lists.push_back(&orphaned_to_transport_);
00729 
00730     const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00731 
00732     if (containing_list == &this->sent_data_) {
00733       ACE_ERROR((LM_WARNING,
00734                  ACE_TEXT("(%P|%t) WARNING: ")
00735                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00736                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00737                  ACE_TEXT("WAS IN sent_data_.\n")));
00738     } else if (containing_list == &this->unsent_data_) {
00739       ACE_ERROR((LM_WARNING,
00740                  ACE_TEXT("(%P|%t) WARNING: ")
00741                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00742                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00743                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00744     } else {
00745       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00746       //and inform transport of their release.  Transport will call data-dropped on the
00747       //elements as it processes the removal but they will already be gone from the send lists.
00748       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00749         //this message was a control message so release it
00750         if (DCPS_debug_level > 9) {
00751           GuidConverter converter(publication_id_);
00752           ACE_DEBUG((LM_DEBUG,
00753                      ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00754                      ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00755                      this->domain_id_,
00756                      this->topic_name_,
00757                      OPENDDS_STRING(converter).c_str()));
00758         }
00759         writer_->controlTracker.message_dropped();
00760       }
00761       if (containing_list == &this->orphaned_to_transport_) {
00762         orphaned_to_transport_.dequeue(sample);
00763         release_buffer(stale);
00764         if (!pending_data())
00765           empty_condition_.broadcast();
00766       }
00767     }
00768 
00769     return;
00770   }
00771 
00772   this->wakeup_blocking_writers (stale);
00773 
00774   if (!pending_data())
00775     empty_condition_.broadcast();
00776 }
00777 
00778 DDS::ReturnCode_t
00779 WriteDataContainer::remove_oldest_historical_sample(
00780   InstanceDataSampleList& instance_list,
00781   bool& released)
00782 {
00783   DataSampleElement* stale = 0;
00784 
00785   if (instance_list.head() != 0) {
00786     stale = instance_list.head();
00787   } else {
00788     //it is fine for the instance_list to be empty
00789     return DDS::RETCODE_OK;
00790   }
00791 
00792   // Only interested in trying to remove historical samples.
00793   // If the sample is not on the sent_data_ list, simply can't
00794   // be removed  -- not an error
00795 
00796   OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00797   send_lists.push_back(&sent_data_);
00798 
00799   const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00800 
00801   // Identify if the stale data is in the sent_data_ (i.e. HISTORY)
00802   // and can therefore be released
00803 
00804   bool result = false;
00805 
00806   if (containing_list == &this->sent_data_) {
00807     // No one is using the historical data sample, so we can release it back to
00808     // its allocator.
00809     // First remove the oldest sample from the instance list.
00810     //
00811     if (instance_list.dequeue_head(stale) == false) {
00812       ACE_ERROR_RETURN((LM_ERROR,
00813                         ACE_TEXT("(%P|%t) ERROR: ")
00814                         ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00815                         ACE_TEXT("dequeue_head_next_sample failed\n")),
00816                        DDS::RETCODE_ERROR);
00817     }
00818 
00819     // Now attempt to remove the sample from the internal list and release its buffer
00820     result = this->sent_data_.dequeue(stale) != 0;
00821     release_buffer(stale);
00822     released = true;
00823 
00824     if (DCPS_debug_level > 9) {
00825       GuidConverter converter(publication_id_);
00826       ACE_DEBUG((LM_DEBUG,
00827                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_historical_sample: ")
00828                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00829                  this->domain_id_,
00830                  this->topic_name_,
00831                  OPENDDS_STRING(converter).c_str()));
00832     }
00833 
00834   } else {
00835     //Sample is not a historical sample
00836     return DDS::RETCODE_OK;
00837   }
00838 
00839   if (result == false) {
00840     ACE_ERROR_RETURN((LM_ERROR,
00841                       ACE_TEXT("(%P|%t) ERROR: ")
00842                       ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00843                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00844                      DDS::RETCODE_ERROR);
00845 
00846   }
00847 
00848   return DDS::RETCODE_OK;
00849 }
00850 
00851 
00852 DDS::ReturnCode_t
00853 WriteDataContainer::remove_oldest_sample(
00854   InstanceDataSampleList& instance_list,
00855   bool& released)
00856 {
00857   DataSampleElement* stale = 0;
00858 
00859   //
00860   // Remove the oldest sample from the instance list.
00861   //
00862   if (instance_list.dequeue_head(stale) == false) {
00863     ACE_ERROR_RETURN((LM_ERROR,
00864                       ACE_TEXT("(%P|%t) ERROR: ")
00865                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00866                       ACE_TEXT("dequeue_head_next_sample failed\n")),
00867                      DDS::RETCODE_ERROR);
00868   }
00869 
00870   //
00871   // Remove the stale data from the next_writer_sample_ list.  The
00872   // sending_data_/next_send_sample_ list is not managed within the
00873   // container, it is only used external to the container and does
00874   // not need to be managed internally.
00875   //
00876   // The next_writer_sample_ link is being used in one of the sent_data_,
00877   // sending_data_, or unsent_data lists.  Removal from the doubly
00878   // linked list needs to repair the list only when the stale sample
00879   // is either the head or tail of the list.
00880   //
00881 
00882   //
00883   // Locate the head of the list that the stale data is in.
00884   //
00885   OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00886   send_lists.push_back(&sending_data_);
00887   send_lists.push_back(&sent_data_);
00888   send_lists.push_back(&unsent_data_);
00889   send_lists.push_back(&orphaned_to_transport_);
00890 
00891   const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00892 
00893 
00894   //
00895   // Identify the list that the stale data is in.
00896   // The stale data should be in one of the sent_data_, sending_data_
00897   // or unsent_data_. It should not be in released_data_ list since
00898   // this function is the only place a sample is moved from
00899   // sending_data_ to released_data_ list.
00900 
00901   // Remove the element from the internal list.
00902   bool result = false;
00903 
00904   if (containing_list == &this->sending_data_) {
00905     if (DCPS_debug_level > 2) {
00906       ACE_ERROR((LM_WARNING,
00907                  ACE_TEXT("(%P|%t) WARNING: ")
00908                  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00909                  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00910     }
00911 
00912     // This means transport is still using the sample that needs to
00913     // be released currently so notify transport that sample is being removed.
00914 
00915     if (this->writer_->remove_sample(stale)) {
00916       if (this->sent_data_.dequeue(stale)) {
00917         release_buffer(stale);
00918         result = true;
00919       }
00920 
00921     } else {
00922       if (this->sending_data_.dequeue(stale)) {
00923         this->orphaned_to_transport_.enqueue_tail(stale);
00924       } else if (this->sent_data_.dequeue(stale)) {
00925         release_buffer(stale);
00926         result = true;
00927       }
00928       result = true;
00929     }
00930     released = true;
00931 
00932   } else if (containing_list == &this->sent_data_) {
00933     // No one is using the data sample, so we can release it back to
00934     // its allocator.
00935     //
00936     result = this->sent_data_.dequeue(stale) != 0;
00937     release_buffer(stale);
00938     released = true;
00939 
00940     if (DCPS_debug_level > 9) {
00941       GuidConverter converter(publication_id_);
00942       ACE_DEBUG((LM_DEBUG,
00943                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00944                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00945                  this->domain_id_,
00946                  this->topic_name_,
00947                  OPENDDS_STRING(converter).c_str()));
00948     }
00949 
00950   } else if (containing_list == &this->unsent_data_) {
00951     //
00952     // No one is using the data sample, so we can release it back to
00953     // its allocator.
00954     //
00955     result = this->unsent_data_.dequeue(stale) != 0;
00956     release_buffer(stale);
00957     released = true;
00958 
00959     if (DCPS_debug_level > 9) {
00960       GuidConverter converter(publication_id_);
00961       ACE_DEBUG((LM_DEBUG,
00962                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00963                  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00964                  this->domain_id_,
00965                  this->topic_name_,
00966                  OPENDDS_STRING(converter).c_str()));
00967     }
00968   } else {
00969     ACE_ERROR_RETURN((LM_ERROR,
00970                       ACE_TEXT("(%P|%t) ERROR: ")
00971                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00972                       ACE_TEXT("The oldest sample is not in any internal list.\n")),
00973                      DDS::RETCODE_ERROR);
00974   }
00975 
00976   // Signal if there is no pending data.
00977   {
00978     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00979                      guard,
00980                      this->lock_,
00981                      DDS::RETCODE_ERROR);
00982 
00983     if (!pending_data())
00984       empty_condition_.broadcast();
00985   }
00986 
00987   if (result == false) {
00988     ACE_ERROR_RETURN((LM_ERROR,
00989                       ACE_TEXT("(%P|%t) ERROR: ")
00990                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00991                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00992                      DDS::RETCODE_ERROR);
00993 
00994   }
00995 
00996   return DDS::RETCODE_OK;
00997 }
00998 
00999 DDS::ReturnCode_t
01000 WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
01001 {
01002   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
01003 
01004   ACE_NEW_MALLOC_RETURN(
01005     element,
01006     static_cast<DataSampleElement*>(
01007       sample_list_element_allocator_.malloc(
01008         sizeof(DataSampleElement))),
01009     DataSampleElement(publication_id_,
01010                           this->writer_,
01011                           0,
01012                           &transport_send_element_allocator_,
01013                           &transport_customized_element_allocator_),
01014     DDS::RETCODE_ERROR);
01015 
01016   return DDS::RETCODE_OK;
01017 }
01018 
01019 DDS::ReturnCode_t
01020 WriteDataContainer::obtain_buffer(DataSampleElement*& element,
01021                                   DDS::InstanceHandle_t handle)
01022 {
01023   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01024 
01025   PublicationInstance* instance = get_handle_instance(handle);
01026 
01027   ACE_NEW_MALLOC_RETURN(
01028     element,
01029     static_cast<DataSampleElement*>(
01030       sample_list_element_allocator_.malloc(
01031         sizeof(DataSampleElement))),
01032     DataSampleElement(publication_id_,
01033                           this->writer_,
01034                           instance,
01035                           &transport_send_element_allocator_,
01036                           &transport_customized_element_allocator_),
01037     DDS::RETCODE_ERROR);
01038 
01039   // Extract the current instance queue.
01040   InstanceDataSampleList& instance_list = instance->samples_;
01041   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01042 
01043   bool need_to_set_abs_timeout = true;
01044   ACE_Time_Value abs_timeout;
01045 
01046   //depth_ covers both HistoryQosPolicy kind and depth as well as
01047   //ResourceLimitsQosPolicy max_samples_per_instance
01048   //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
01049   //max_instances and max_instances * depth
01050   while ((instance_list.size() >= depth_) ||
01051          ((this->max_num_samples_ > 0) &&
01052          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01053 
01054     //Need to either remove stale samples or wait for space to become available
01055     if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01056       //Remove historical samples already sent
01057       bool removed_historical = false;
01058       if (DCPS_debug_level >= 2) {
01059         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01060                               ACE_TEXT(" instance %d attempting to remove")
01061                               ACE_TEXT(" its oldest historical sample\n"),
01062                               handle));
01063       }
01064 
01065       ret = this->remove_oldest_historical_sample(instance_list, removed_historical);
01066 
01067       //else try to remove historical samples from other instances
01068       if (ret == DDS::RETCODE_OK && !removed_historical) {
01069         if (DCPS_debug_level >= 2) {
01070           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01071                                 ACE_TEXT(" instance %d attempting to remove")
01072                                 ACE_TEXT(" oldest historical sample from any full instances\n"),
01073                                 handle));
01074         }
01075         PublicationInstanceMapType::iterator it = instances_.begin();
01076 
01077         while (!removed_historical && it != instances_.end() && ret == DDS::RETCODE_OK) {
01078           if(it->second->samples_.size() >= depth_) {
01079             ret = this->remove_oldest_historical_sample(it->second->samples_, removed_historical);
01080           }
01081           ++it;
01082         }
01083       }
01084 
01085       if (ret == DDS::RETCODE_OK && !removed_historical) {
01086         //Reliable writers can wait
01087         if (need_to_set_abs_timeout) {
01088           abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01089           need_to_set_abs_timeout = false;
01090         }
01091         if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01092           if (DCPS_debug_level >= 2) {
01093             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01094                                   ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01095                                   handle));
01096           }
01097 
01098           waiting_on_release_ = true;
01099           // lock is released while waiting and acquired before returning
01100           // from wait.
01101           int const wait_result = condition_.wait(&abs_timeout);
01102 
01103           if (wait_result != 0) {
01104             if (errno == ETIME) {
01105               if (DCPS_debug_level >= 2) {
01106                 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01107                                       ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01108                                       handle));
01109               }
01110               ret = DDS::RETCODE_TIMEOUT;
01111             } else {
01112               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01113                                     ACE_TEXT("%p\n")));
01114               ret = DDS::RETCODE_ERROR;
01115             }
01116           }
01117         } else {
01118           //either shutdown has been signaled or max_blocking_time
01119           //has surpassed so treat as timeout
01120           ret = DDS::RETCODE_TIMEOUT;
01121         }
01122       }
01123 
01124     } else {
01125       //BEST EFFORT
01126       bool oldest_released = false;
01127 
01128       //try to remove stale samples from this instance
01129       // The remove_oldest_sample() method removes the oldest sample
01130       // from instance list and removes it from the internal lists.
01131       if (instance_list.size() > 0) {
01132         if (DCPS_debug_level >= 2) {
01133           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01134                                 ACE_TEXT(" instance %d attempting to remove")
01135                                 ACE_TEXT(" its oldest sample\n"),
01136                                 handle));
01137         }
01138         ret = this->remove_oldest_sample(instance_list, oldest_released);
01139       }
01140       //else try to remove stale samples from other instances which are full
01141       if (ret == DDS::RETCODE_OK && !oldest_released) {
01142         if (DCPS_debug_level >= 2) {
01143           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01144                                 ACE_TEXT(" instance %d attempting to remove")
01145                                 ACE_TEXT(" oldest sample from any full instances\n"),
01146                                 handle));
01147         }
01148         PublicationInstanceMapType::iterator it = instances_.begin();
01149 
01150         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01151           if(it->second->samples_.size() >= depth_) {
01152             ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01153           }
01154           ++it;
01155         }
01156       }
01157       //else try to remove stale samples from other non-full instances
01158       if (ret == DDS::RETCODE_OK && !oldest_released) {
01159         if (DCPS_debug_level >= 2) {
01160           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01161                                 ACE_TEXT(" instance %d attempting to remove")
01162                                 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01163                                 handle));
01164         }
01165         PublicationInstanceMapType::iterator it = instances_.begin();
01166 
01167         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01168           if(it->second->samples_.size() > 0) {
01169             ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01170           }
01171           ++it;
01172         }
01173       }
01174       if (!oldest_released) {
01175         //This means that no instances have samples to remove and yet
01176         //still hitting resource limits.
01177         ACE_ERROR((LM_ERROR,
01178                    ACE_TEXT("(%P|%t) ERROR: ")
01179                    ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01180                    ACE_TEXT("hitting resource limits with no samples to remove\n")));
01181         ret = DDS::RETCODE_ERROR;
01182       }
01183     }  //END BEST EFFORT
01184 
01185     if (ret != DDS::RETCODE_OK) {
01186       if (DCPS_debug_level >= 2) {
01187         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01188                               ACE_TEXT(" instance %d could not obtain buffer for sample")
01189                               ACE_TEXT(" releasing allotted sample and returning\n"),
01190                               handle));
01191       }
01192       this->release_buffer(element);
01193       return ret;
01194     }
01195   }  //END WHILE
01196 
01197   data_holder_.enqueue_tail(element);
01198 
01199   return ret;
01200 }
01201 
01202 void
01203 WriteDataContainer::release_buffer(DataSampleElement* element)
01204 {
01205   if (element->get_header().message_id_ == SAMPLE_DATA)
01206     data_holder_.dequeue(element);
01207   // Release the memory to the allocator.
01208   ACE_DES_FREE(element,
01209                sample_list_element_allocator_.free,
01210                DataSampleElement);
01211 }
01212 
01213 void
01214 WriteDataContainer::unregister_all()
01215 {
01216   DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01217   shutdown_ = true;
01218 
01219   {
01220     //The internal list needs protection since this call may result from the
01221     //the delete_datawriter call which does not acquire the lock in advance.
01222     ACE_GUARD(ACE_Recursive_Thread_Mutex,
01223               guard,
01224               this->lock_);
01225     // Tell transport remove all control messages currently
01226     // transport is processing.
01227     (void) this->writer_->remove_all_msgs();
01228 
01229     // Broadcast to wake up all waiting threads.
01230     if (waiting_on_release_) {
01231       condition_.broadcast();
01232     }
01233   }
01234   DDS::ReturnCode_t ret;
01235   DataSample* registered_sample;
01236   PublicationInstanceMapType::iterator it = instances_.begin();
01237 
01238   while (it != instances_.end()) {
01239     // Release the instance data.
01240     ret = dispose(it->first, registered_sample, false);
01241 
01242     if (ret != DDS::RETCODE_OK) {
01243       ACE_ERROR((LM_ERROR,
01244                  ACE_TEXT("(%P|%t) ERROR: ")
01245                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01246                  ACE_TEXT("dispose instance %X failed\n"),
01247                  it->first));
01248     }
01249     // Mark the instance unregistered.
01250     ret = unregister(it->first, registered_sample, false);
01251 
01252     if (ret != DDS::RETCODE_OK) {
01253       ACE_ERROR((LM_ERROR,
01254                  ACE_TEXT("(%P|%t) ERROR: ")
01255                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01256                  ACE_TEXT("unregister instance %X failed\n"),
01257                  it->first));
01258     }
01259 
01260     PublicationInstance* instance = it->second;
01261 
01262     delete instance;
01263 
01264     // Get the next iterator before erase the instance handle.
01265     PublicationInstanceMapType::iterator it_next = it;
01266     ++it_next;
01267     // Remove the instance from the instance list.
01268     unbind(instances_, it->first);
01269     it = it_next;
01270   }
01271 
01272   ACE_UNUSED_ARG(registered_sample);
01273 }
01274 
01275 PublicationInstance*
01276 WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
01277 {
01278   PublicationInstance* instance = 0;
01279 
01280   if (0 != find(instances_, handle, instance)) {
01281     ACE_ERROR((LM_ERROR,
01282                ACE_TEXT("(%P|%t) ERROR: ")
01283                ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01284                ACE_TEXT("lookup for %d failed\n"),
01285                handle));
01286   }
01287 
01288   return instance;
01289 }
01290 
01291 void
01292 WriteDataContainer::copy_and_append(SendStateDataSampleList& list,
01293                                     const SendStateDataSampleList& appended,
01294                                     const RepoId& reader_id,
01295                                     const DDS::LifespanQosPolicy& lifespan
01296 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01297                                     ,
01298                                     const OPENDDS_STRING& filterClassName,
01299                                     const FilterEvaluator* eval,
01300                                     const DDS::StringSeq& params
01301 #endif
01302                                     )
01303 {
01304   for (SendStateDataSampleList::const_iterator cur = appended.begin();
01305        cur != appended.end(); ++cur) {
01306 
01307     // Do not copy and append data that has exceeded the configured
01308     // lifespan.
01309     if (resend_data_expired(*cur, lifespan))
01310       continue;
01311 
01312 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01313     if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01314       continue;
01315 #endif
01316 
01317     DataSampleElement* element = 0;
01318     ACE_NEW_MALLOC(element,
01319                     static_cast<DataSampleElement*>(
01320                       sample_list_element_allocator_.malloc(
01321                         sizeof(DataSampleElement))),
01322                     DataSampleElement(*cur));
01323 
01324     element->set_num_subs(1);
01325     element->set_sub_id(0, reader_id);
01326 
01327     list.enqueue_tail(element);
01328   }
01329 }
01330 
01331 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01332 bool
01333 WriteDataContainer::persist_data()
01334 {
01335   bool result = true;
01336 
01337   // ------------------------------------------------------------
01338   // Transfer sent data to data DURABILITY cache.
01339   // ------------------------------------------------------------
01340   if (this->durability_cache_) {
01341     // A data durability cache is available for TRANSIENT or
01342     // PERSISTENT data durability.  Cache the data samples.
01343 
01344     //
01345     //  We only cache data that is not still in use outside of
01346     //  this instance of WriteDataContainer
01347     //  (only cache samples in sent_data_ meaning transport has delivered).
01348     bool const inserted =
01349       this->durability_cache_->insert(this->domain_id_,
01350                                       this->topic_name_,
01351                                       this->type_name_,
01352                                       this->sent_data_,
01353                                       this->durability_service_
01354                                      );
01355 
01356     result = inserted;
01357 
01358     if (!inserted)
01359       ACE_ERROR((LM_ERROR,
01360                  ACE_TEXT("(%P|%t) ERROR: ")
01361                  ACE_TEXT("WriteDataContainer::persist_data, ")
01362                  ACE_TEXT("failed to make data durable for ")
01363                  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01364                  this->domain_id_,
01365                  this->topic_name_,
01366                  this->type_name_));
01367   }
01368 
01369   return result;
01370 }
01371 #endif
01372 
01373 void WriteDataContainer::reschedule_deadline()
01374 {
01375   for (PublicationInstanceMapType::iterator iter = instances_.begin();
01376        iter != instances_.end();
01377        ++iter) {
01378     if (iter->second->deadline_timer_id_ != -1) {
01379       if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01380         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01381                    ACE_TEXT("reset_timer_interval")));
01382       }
01383     }
01384   }
01385 }
01386 
01387 void
01388 WriteDataContainer::wait_pending()
01389 {
01390   ACE_Time_Value pending_timeout =
01391     TheServiceParticipant->pending_timeout();
01392 
01393   ACE_Time_Value* pTimeout = 0;
01394 
01395   if (pending_timeout != ACE_Time_Value::zero) {
01396     pTimeout = &pending_timeout;
01397     pending_timeout += ACE_OS::gettimeofday();
01398   }
01399 
01400   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01401   const bool report = DCPS_debug_level > 0 && pending_data();
01402   if (report) {
01403     ACE_TCHAR date_time[50];
01404     ACE_TCHAR* const time =
01405       MessageTracker::timestamp(pending_timeout,
01406                                 date_time,
01407                                 50);
01408     ACE_DEBUG((LM_DEBUG,
01409                ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01410                ACE_TEXT("at %s\n"),
01411                (pending_timeout == ACE_Time_Value::zero ?
01412                   ACE_TEXT("(no timeout)") : time)));
01413   }
01414   while (true) {
01415 
01416     if (!pending_data())
01417       break;
01418 
01419     if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01420       if (DCPS_debug_level) {
01421         ACE_DEBUG((LM_INFO,
01422                    ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01423                    ACE_TEXT("Timed out waiting for messages to be transported")));
01424         this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01425       }
01426       break;
01427     }
01428   }
01429   if (report) {
01430     ACE_DEBUG((LM_DEBUG,
01431                "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01432   }
01433 }
01434 
01435 void
01436 WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
01437 {
01438   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01439             guard,
01440             this->lock_);
01441   PublicationInstanceMapType::iterator it = instances_.begin();
01442 
01443   while (it != instances_.end()) {
01444     instance_handles.push_back(it->second->instance_handle_);
01445     ++it;
01446   }
01447 }
01448 
01449 DDS::ReturnCode_t
01450 WriteDataContainer::wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence)
01451 {
01452   ACE_Time_Value deadline(abs_deadline);
01453   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01454   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01455 
01456   while (ACE_OS::gettimeofday() < deadline) {
01457 
01458     if (!sequence_acknowledged(sequence)) {
01459       // lock is released while waiting and acquired before returning
01460       // from wait.
01461       int const wait_result = wfa_condition_.wait(&deadline);
01462 
01463       if (wait_result != 0) {
01464         if (errno == ETIME) {
01465           if (DCPS_debug_level >= 2) {
01466             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01467                                   ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01468                                   sequence.getValue()));
01469           }
01470           ret = DDS::RETCODE_TIMEOUT;
01471         } else {
01472           ret = DDS::RETCODE_ERROR;
01473         }
01474       }
01475     } else {
01476       ret = DDS::RETCODE_OK;
01477       break;
01478     }
01479   }
01480 
01481   return ret;
01482 }
01483 
01484 bool
01485 WriteDataContainer::sequence_acknowledged(const SequenceNumber sequence)
01486 {
01487   if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01488     //return true here so that wait_for_acknowledgements doesn't block
01489     return true;
01490   }
01491 
01492   SequenceNumber acked = acked_sequences_.cumulative_ack();
01493   if (DCPS_debug_level >= 10) {
01494     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01495                           ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01496   }
01497   if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01498     //if acked_sequences_ is empty or its cumulative_ack is lower than
01499     //the requests sequence, return false
01500     return false;
01501   }
01502   return true;
01503 }
01504 
01505 void
01506 WriteDataContainer::wakeup_blocking_writers (DataSampleElement* stale)
01507 {
01508   if (stale && waiting_on_release_) {
01509     waiting_on_release_ = false;
01510 
01511     condition_.broadcast();
01512   }
01513 }
01514 
01515 void
01516 WriteDataContainer::log_send_state_lists (OPENDDS_STRING description)
01517 {
01518   ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01519              description.c_str(),
01520              unsent_data_.size(),
01521              sending_data_.size(),
01522              sent_data_.size(),
01523              orphaned_to_transport_.size(),
01524              num_all_samples(),
01525              instances_.size()));
01526 }
01527 
01528 } // namespace DCPS
01529 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7