WriteDataContainer.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "ace/Condition_Recursive_Thread_Mutex.h"
00010 #include "WriteDataContainer.h"
00011 #include "DataSampleHeader.h"
00012 #include "InstanceDataSampleList.h"
00013 #include "DataWriterImpl.h"
00014 #include "MessageTracker.h"
00015 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00016 #include "DataDurabilityCache.h"
00017 #endif
00018 #include "PublicationInstance.h"
00019 #include "Util.h"
00020 #include "Time_Helper.h"
00021 #include "GuidConverter.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00024 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00025 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00026 
00027 #include "tao/debug.h"
00028 
00029 #include "ace/Auto_Ptr.h"
00030 
00031 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00032 
00033 namespace OpenDDS {
00034 namespace DCPS {
00035 
00036 /**
00037  * @todo Refactor this code and DataReaderImpl::data_expired() to
00038  *       a common function.
00039  */
00040 bool
00041 resend_data_expired(DataSampleElement const & element,
00042                     DDS::LifespanQosPolicy const & lifespan)
00043 {
00044   if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
00045       || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
00046     // Finite lifespan.  Check if data has expired.
00047 
00048     DDS::Time_t const tmp = {
00049       element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
00050       element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
00051     };
00052 
00053     ACE_Time_Value const now(ACE_OS::gettimeofday());
00054     ACE_Time_Value const expiration_time(time_to_time_value(tmp));
00055 
00056     if (now >= expiration_time) {
00057       if (DCPS_debug_level >= 8) {
00058         ACE_Time_Value const diff(now - expiration_time);
00059         ACE_DEBUG((LM_DEBUG,
00060                    ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
00061                    ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
00062                    diff.sec(),
00063                    diff.usec()));
00064       }
00065 
00066       return true;  // Data expired.
00067     }
00068   }
00069 
00070   return false;
00071 }
00072 
00073 WriteDataContainer::WriteDataContainer(
00074   DataWriterImpl* writer,
00075   CORBA::Long max_samples_per_instance,
00076   CORBA::Long history_depth,
00077   CORBA::Long max_durable_per_instance,
00078   DDS::Duration_t max_blocking_time,
00079   size_t         n_chunks,
00080   DDS::DomainId_t domain_id,
00081   char const * topic_name,
00082   char const * type_name,
00083 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00084   DataDurabilityCache* durability_cache,
00085   DDS::DurabilityServiceQosPolicy const & durability_service,
00086 #endif
00087   CORBA::Long     max_instances,
00088   CORBA::Long     max_total_samples)
00089   : transaction_id_(0),
00090     publication_id_(GUID_UNKNOWN),
00091     writer_(writer),
00092     max_samples_per_instance_(max_samples_per_instance),
00093     history_depth_(history_depth),
00094     max_durable_per_instance_(max_durable_per_instance),
00095     max_num_instances_(max_instances),
00096     max_num_samples_(max_total_samples),
00097     max_blocking_time_(max_blocking_time),
00098     waiting_on_release_(false),
00099     condition_(lock_),
00100     empty_condition_(lock_),
00101     wfa_condition_(this->wfa_lock_),
00102     n_chunks_(n_chunks),
00103     sample_list_element_allocator_(2 * n_chunks_),
00104     shutdown_(false),
00105     domain_id_(domain_id),
00106     topic_name_(topic_name),
00107     type_name_(type_name)
00108 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00109   , durability_cache_(durability_cache)
00110   , durability_service_(durability_service)
00111 #endif
00112 {
00113 
00114   if (DCPS_debug_level >= 2) {
00115     ACE_DEBUG((LM_DEBUG,
00116                "(%P|%t) WriteDataContainer "
00117                "sample_list_element_allocator %x with %d chunks\n",
00118                &sample_list_element_allocator_, n_chunks_));
00119   }
00120 }
00121 
00122 WriteDataContainer::~WriteDataContainer()
00123 {
00124   if (this->unsent_data_.size() > 0) {
00125     ACE_DEBUG((LM_WARNING,
00126                ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00127                ACE_TEXT("destroyed with %d samples unsent.\n"),
00128                this->unsent_data_.size()));
00129   }
00130 
00131   if (this->sending_data_.size() > 0) {
00132     if (TransportRegistry::instance()->released()) {
00133       for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
00134         release_buffer(e);
00135       }
00136     }
00137     if (sending_data_.size() && DCPS_debug_level) {
00138       ACE_DEBUG((LM_WARNING,
00139                  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00140                  ACE_TEXT("destroyed with %d samples sending.\n"),
00141                  this->sending_data_.size()));
00142     }
00143   }
00144 
00145   if (this->sent_data_.size() > 0) {
00146     ACE_DEBUG((LM_DEBUG,
00147                ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00148                ACE_TEXT("destroyed with %d samples sent.\n"),
00149                this->sent_data_.size()));
00150   }
00151 
00152   if (this->orphaned_to_transport_.size() > 0) {
00153     if (DCPS_debug_level > 0) {
00154       ACE_DEBUG((LM_DEBUG,
00155                  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00156                  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00157                  this->orphaned_to_transport_.size()));
00158     }
00159   }
00160 
00161   if (!shutdown_) {
00162     ACE_ERROR((LM_ERROR,
00163                ACE_TEXT("(%P|%t) ERROR: ")
00164                ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00165                ACE_TEXT("The container has not been cleaned.\n")));
00166   }
00167 }
00168 
00169 DDS::ReturnCode_t
00170 WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
00171 {
00172   // Enqueue to the next_send_sample_ thread of unsent_data_
00173   // will link samples with the next_sample/previous_sample and
00174   // also next_send_sample_.
00175   // This would save time when we actually send the data.
00176 
00177   unsent_data_.enqueue_tail(control_sample);
00178 
00179   return DDS::RETCODE_OK;
00180 }
00181 
00182 // This method assumes that instance list has space for this sample.
00183 DDS::ReturnCode_t
00184 WriteDataContainer::enqueue(
00185   DataSampleElement* sample,
00186   DDS::InstanceHandle_t instance_handle)
00187 {
00188   // Get the PublicationInstance pointer from InstanceHandle_t.
00189   PublicationInstance_rch instance =
00190     get_handle_instance(instance_handle);
00191   // Extract the instance queue.
00192   InstanceDataSampleList& instance_list = instance->samples_;
00193 
00194   if (this->writer_->watchdog_.in()) {
00195     instance->last_sample_tv_ = instance->cur_sample_tv_;
00196     instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00197     this->writer_->watchdog_->execute(*this->writer_, instance, false);
00198   }
00199 
00200   //
00201   // Enqueue to the next_send_sample_ thread of unsent_data_
00202   // will link samples with the next_sample/previous_sample and
00203   // also next_send_sample_.
00204   // This would save time when we actually send the data.
00205 
00206   unsent_data_.enqueue_tail(sample);
00207 
00208   //
00209   // Add this sample to the INSTANCE scope list.
00210   instance_list.enqueue_tail(sample);
00211 
00212   return DDS::RETCODE_OK;
00213 }
00214 
00215 DDS::ReturnCode_t
00216 WriteDataContainer::reenqueue_all(const RepoId& reader_id,
00217                                   const DDS::LifespanQosPolicy& lifespan
00218 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00219                                   ,
00220                                   const OPENDDS_STRING& filterClassName,
00221                                   const FilterEvaluator* eval,
00222                                   const DDS::StringSeq& expression_params
00223 #endif
00224                                   )
00225 {
00226   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00227                    guard,
00228                    lock_,
00229                    DDS::RETCODE_ERROR);
00230 
00231   ssize_t total_size = 0;
00232   for (PublicationInstanceMapType::iterator it = instances_.begin();
00233        it != instances_.end(); ++it) {
00234     const ssize_t durable = std::min(it->second->samples_.size(),
00235                                      ssize_t(max_durable_per_instance_));
00236     total_size += durable;
00237     it->second->durable_samples_remaining_ = durable;
00238   }
00239 
00240   copy_and_prepend(resend_data_,
00241                    sending_data_,
00242                    reader_id,
00243                    lifespan,
00244 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00245                    filterClassName, eval, expression_params,
00246 #endif
00247                    total_size);
00248 
00249   copy_and_prepend(resend_data_,
00250                    sent_data_,
00251                    reader_id,
00252                    lifespan,
00253 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00254                    filterClassName, eval, expression_params,
00255 #endif
00256                    total_size);
00257 
00258   if (DCPS_debug_level > 9 && resend_data_.size()) {
00259     GuidConverter converter(publication_id_);
00260     GuidConverter reader(reader_id);
00261     ACE_DEBUG((LM_DEBUG,
00262                ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00263                ACE_TEXT("domain %d topic %C publication %C copying ")
00264                ACE_TEXT("sending/sent to resend to %C.\n"),
00265                domain_id_,
00266                topic_name_,
00267                OPENDDS_STRING(converter).c_str(),
00268                OPENDDS_STRING(reader).c_str()));
00269   }
00270 
00271   return DDS::RETCODE_OK;
00272 }
00273 
00274 DDS::ReturnCode_t
00275 WriteDataContainer::register_instance(
00276   DDS::InstanceHandle_t&      instance_handle,
00277   Message_Block_Ptr&          registered_sample)
00278 {
00279   PublicationInstance_rch instance;
00280 
00281   if (instance_handle == DDS::HANDLE_NIL) {
00282     if (max_num_instances_ > 0
00283         && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00284       return DDS::RETCODE_OUT_OF_RESOURCES;
00285     }
00286 
00287     // registered the instance for the first time.
00288     instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
00289 
00290     instance_handle = this->writer_->get_next_handle();
00291 
00292     int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00293 
00294     if (0 != insert_attempt) {
00295       ACE_ERROR((LM_ERROR,
00296                  ACE_TEXT("(%P|%t) ERROR: ")
00297                  ACE_TEXT("WriteDataContainer::register_instance, ")
00298                  ACE_TEXT("failed to insert instance handle=%X\n"),
00299                  instance.in()));
00300       return DDS::RETCODE_ERROR;
00301     } // if (0 != insert_attempt)
00302 
00303     instance->instance_handle_ = instance_handle;
00304 
00305   } else {
00306 
00307     int const find_attempt = find(instances_, instance_handle, instance);
00308 
00309     if (0 != find_attempt) {
00310       ACE_ERROR((LM_ERROR,
00311                  ACE_TEXT("(%P|%t) ERROR: ")
00312                  ACE_TEXT("WriteDataContainer::register_instance, ")
00313                  ACE_TEXT("The provided instance handle=%X is not a valid")
00314                  ACE_TEXT("handle.\n"),
00315                  instance_handle));
00316 
00317       return DDS::RETCODE_ERROR;
00318     } // if (0 != find_attempt)
00319 
00320     instance->unregistered_ = false;
00321   }
00322 
00323   // The registered_sample is shallow copied.
00324   registered_sample.reset(instance->registered_sample_->duplicate());
00325 
00326   if (this->writer_->watchdog_.in()) {
00327     this->writer_->watchdog_->schedule_timer(instance);
00328   }
00329 
00330   return DDS::RETCODE_OK;
00331 }
00332 
00333 DDS::ReturnCode_t
00334 WriteDataContainer::unregister(
00335   DDS::InstanceHandle_t   instance_handle,
00336   Message_Block_Ptr& registered_sample,
00337   bool                    dup_registered_sample)
00338 {
00339   PublicationInstance_rch instance;
00340 
00341   int const find_attempt = find(instances_, instance_handle, instance);
00342 
00343   if (0 != find_attempt) {
00344     ACE_ERROR_RETURN((LM_ERROR,
00345                       ACE_TEXT("(%P|%t) ERROR: ")
00346                       ACE_TEXT("WriteDataContainer::unregister, ")
00347                       ACE_TEXT("The instance(handle=%X) ")
00348                       ACE_TEXT("is not registered yet.\n"),
00349                       instance_handle),
00350                      DDS::RETCODE_PRECONDITION_NOT_MET);
00351   } // if (0 != find_attempt)
00352 
00353   instance->unregistered_ = true;
00354 
00355   if (dup_registered_sample) {
00356     // The registered_sample is shallow copied.
00357     registered_sample.reset(instance->registered_sample_->duplicate());
00358   }
00359 
00360   if (this->writer_->watchdog_.in())
00361     this->writer_->watchdog_->cancel_timer(instance);
00362 
00363   return DDS::RETCODE_OK;
00364 }
00365 
00366 DDS::ReturnCode_t
00367 WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
00368                             Message_Block_Ptr&    registered_sample,
00369                             bool                  dup_registered_sample)
00370 {
00371   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00372                    guard,
00373                    this->lock_,
00374                    DDS::RETCODE_ERROR);
00375 
00376   PublicationInstance_rch instance;
00377 
00378   int const find_attempt = find(instances_, instance_handle, instance);
00379 
00380   if (0 != find_attempt) {
00381     ACE_ERROR_RETURN((LM_ERROR,
00382                       ACE_TEXT("(%P|%t) ERROR: ")
00383                       ACE_TEXT("WriteDataContainer::dispose, ")
00384                       ACE_TEXT("The instance(handle=%X) ")
00385                       ACE_TEXT("is not registered yet.\n"),
00386                       instance_handle),
00387                      DDS::RETCODE_PRECONDITION_NOT_MET);
00388   }
00389 
00390   if (dup_registered_sample) {
00391     // The registered_sample is shallow copied.
00392     registered_sample.reset(instance->registered_sample_->duplicate());
00393   }
00394 
00395   // Note: The DDS specification is unclear as to if samples in the process
00396   // of being sent should be removed or not.
00397   // The advantage of calling remove_sample() on them is that the
00398   // cached allocator memory for them is freed.  The disadvantage
00399   // is that the slow reader may see multiple disposes without
00400   // any write sample between them and hence not temporarily move into the
00401   // Alive state.
00402   // We have chosen to NOT remove the sending samples.
00403 
00404   InstanceDataSampleList& instance_list = instance->samples_;
00405 
00406   while (instance_list.size() > 0) {
00407     bool released = false;
00408     DDS::ReturnCode_t ret
00409     = remove_oldest_sample(instance_list, released);
00410 
00411     if (ret != DDS::RETCODE_OK) {
00412       return ret;
00413     }
00414   }
00415 
00416   if (this->writer_->watchdog_.in())
00417     this->writer_->watchdog_->cancel_timer(instance);
00418   return DDS::RETCODE_OK;
00419 }
00420 
00421 DDS::ReturnCode_t
00422 WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
00423                                 size_t&                 size)
00424 {
00425   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00426                    guard,
00427                    this->lock_,
00428                    DDS::RETCODE_ERROR);
00429   PublicationInstance_rch instance;
00430 
00431   int const find_attempt = find(instances_, handle, instance);
00432 
00433   if (0 != find_attempt) {
00434     return DDS::RETCODE_ERROR;
00435 
00436   } else {
00437     size = instance->samples_.size();
00438     return DDS::RETCODE_OK;
00439   }
00440 }
00441 
00442 size_t
00443 WriteDataContainer::num_all_samples()
00444 {
00445   size_t size = 0;
00446 
00447   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00448                    guard,
00449                    this->lock_,
00450                    0);
00451 
00452   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00453        iter != instances_.end();
00454        ++iter)
00455   {
00456     size += iter->second->samples_.size();
00457   }
00458 
00459   return size;
00460 }
00461 
00462 ACE_UINT64
00463 WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
00464 {
00465   DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00466   //
00467   // The samples in unsent_data are added to the local datawriter
00468   // list and enqueued to the sending_data_ signifying they have
00469   // been passed to the transport to send in a transaction
00470   //
00471   list = this->unsent_data_;
00472 
00473   // Increment send counter for this send operation
00474   ++transaction_id_;
00475 
00476   // Mark all samples with current send counter
00477   SendStateDataSampleList::iterator iter = list.begin();
00478   while (iter != list.end()) {
00479     iter->set_transaction_id(this->transaction_id_);
00480     ++iter;
00481   }
00482 
00483   //
00484   // The unsent_data_ already linked with the
00485   // next_send_sample during enqueue.
00486   // Append the unsent_data_ to current sending_data_
00487   // list.
00488   sending_data_.enqueue_tail(list);
00489 
00490   //
00491   // Clear the unsent data list.
00492   //
00493   this->unsent_data_.reset();
00494 
00495   //
00496   // Return the moved list.
00497   //
00498   return transaction_id_;
00499 }
00500 
00501 SendStateDataSampleList
00502 WriteDataContainer::get_resend_data()
00503 {
00504   DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00505 
00506   //
00507   // The samples in unsent_data are added to the sending_data
00508   // during enqueue.
00509   //
00510   SendStateDataSampleList list = this->resend_data_;
00511 
00512   //
00513   // Clear the unsent data list.
00514   //
00515   this->resend_data_.reset();
00516   //
00517   // Return the moved list.
00518   //
00519   return list;
00520 }
00521 
00522 bool
00523 WriteDataContainer::pending_data()
00524 {
00525   return this->sending_data_.size() != 0
00526          || this->orphaned_to_transport_.size() != 0
00527          || this->unsent_data_.size() != 0;
00528 }
00529 
00530 void
00531 WriteDataContainer::data_delivered(const DataSampleElement* sample)
00532 {
00533   DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00534 
00535   if (DCPS_debug_level >= 2) {
00536     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00537                           ACE_TEXT(" %@\n"), sample));
00538   }
00539 
00540   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00541             guard,
00542             this->lock_);
00543 
00544   // Delivered samples _must_ be on sending_data_ list
00545 
00546   // If it is not found in one of the lists, an invariant
00547   // exception is declared.
00548 
00549   // The element now needs to be removed from the sending_data_
00550   // list, and appended to the end of the sent_data_ list here
00551 
00552   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00553 
00554   // If sample is on a SendStateDataSampleList it should be on the
00555   // sending_data_ list signifying it was given to the transport to
00556   // deliver and now the transport is signaling it has been delivered
00557   if (!sending_data_.dequeue(sample)) {
00558     //
00559     // Should be on sending_data_.  If it is in sent_data_
00560     // or unsent_data there was a problem.
00561     //
00562     SendStateDataSampleList* send_lists[] = {
00563       &sent_data_,
00564       &unsent_data_,
00565       &orphaned_to_transport_};
00566     const SendStateDataSampleList* containing_list =
00567       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00568 
00569     if (containing_list == &this->sent_data_) {
00570       ACE_ERROR((LM_WARNING,
00571                  ACE_TEXT("(%P|%t) WARNING: ")
00572                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00573                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00574                  ACE_TEXT("WAS IN sent_data_.\n")));
00575     } else if (containing_list == &this->unsent_data_) {
00576       ACE_ERROR((LM_WARNING,
00577                  ACE_TEXT("(%P|%t) WARNING: ")
00578                  ACE_TEXT("WriteDataContainer::data_delivered, ")
00579                  ACE_TEXT("The delivered sample is not in sending_data_ and ")
00580                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00581     } else {
00582 
00583       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00584       //and inform transport of their release.  Transport will call data-delivered on the
00585       //elements as it processes the removal but they will already be gone from the send lists.
00586       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00587         //this message was a control message so release it
00588         if (DCPS_debug_level > 9) {
00589           GuidConverter converter(publication_id_);
00590           ACE_DEBUG((LM_DEBUG,
00591                      ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00592                      ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00593                      this->domain_id_,
00594                      this->topic_name_,
00595                      OPENDDS_STRING(converter).c_str()));
00596         }
00597         writer_->controlTracker.message_delivered();
00598       }
00599 
00600       if (containing_list == &this->orphaned_to_transport_) {
00601         orphaned_to_transport_.dequeue(sample);
00602         release_buffer(stale);
00603 
00604       } else if (!containing_list) {
00605         // samples that were retrieved from get_resend_data()
00606         SendStateDataSampleList::remove(stale);
00607         release_buffer(stale);
00608       }
00609 
00610       if (!pending_data())
00611         empty_condition_.broadcast();
00612     }
00613 
00614     return;
00615   }
00616   ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00617   SequenceNumber acked_seq = stale->get_header().sequence_;
00618   SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00619 
00620   if (stale->get_header().message_id_ != SAMPLE_DATA) {
00621     //this message was a control message so release it
00622     if (DCPS_debug_level > 9) {
00623       GuidConverter converter(publication_id_);
00624       ACE_DEBUG((LM_DEBUG,
00625                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00626                  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00627                  this->domain_id_,
00628                  this->topic_name_,
00629                  OPENDDS_STRING(converter).c_str()));
00630     }
00631     release_buffer(stale);
00632     stale = 0;
00633     writer_->controlTracker.message_delivered();
00634   } else {
00635 
00636     if (max_durable_per_instance_) {
00637       DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00638       sent_data_.enqueue_tail(sample);
00639 
00640     } else {
00641       if (InstanceDataSampleList::on_some_list(sample)) {
00642         PublicationInstance_rch inst = sample->get_handle();
00643         inst->samples_.dequeue(sample);
00644       }
00645       release_buffer(stale);
00646       stale = 0;
00647     }
00648 
00649     if (DCPS_debug_level > 9) {
00650       GuidConverter converter(publication_id_);
00651       ACE_DEBUG((LM_DEBUG,
00652                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00653                  ACE_TEXT("domain %d topic %C publication %C %s.\n"),
00654                  this->domain_id_,
00655                  this->topic_name_,
00656                  OPENDDS_STRING(converter).c_str(),
00657                  max_durable_per_instance_
00658                  ? ACE_TEXT("stored for durability")
00659                  : ACE_TEXT("released")));
00660     }
00661 
00662     this->wakeup_blocking_writers (stale);
00663   }
00664   if (DCPS_debug_level > 9) {
00665     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00666                          ACE_TEXT("Inserting acked_sequence: %q\n"),
00667                          acked_seq.getValue()));
00668   }
00669 
00670   acked_sequences_.insert(acked_seq);
00671 
00672   if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00673       prev_max < acked_sequences_.cumulative_ack()) {
00674 
00675     if (DCPS_debug_level > 9) {
00676       ACE_DEBUG((LM_DEBUG,
00677                  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00678                  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00679     }
00680 
00681     wfa_condition_.broadcast();
00682   }
00683 
00684   // Signal if there is no pending data.
00685   if (!pending_data())
00686     empty_condition_.broadcast();
00687 }
00688 
00689 void
00690 WriteDataContainer::data_dropped(const DataSampleElement* sample,
00691                                  bool dropped_by_transport)
00692 {
00693   DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00694 
00695   if (DCPS_debug_level >= 2) {
00696     ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00697                           ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00698                           sample, dropped_by_transport));
00699   }
00700 
00701   // If the transport initiates the data dropping, we need do same thing
00702   // as data_delivered. e.g. remove the sample from the internal list
00703   // and the instance list. We do not need acquire the lock here since
00704   // the data_delivered acquires the lock.
00705   if (dropped_by_transport) {
00706     this->data_delivered(sample);
00707     return;
00708   }
00709 
00710   //The data_dropped could be called from the thread initiating sample remove
00711   //which already hold the lock. In this case, it's not necessary to acquire
00712   //lock here. It also could be called from the transport thread in a delayed
00713   //notification, it's necessary to acquire lock here to protect the internal
00714   //structures in this class.
00715 
00716   ACE_GUARD (ACE_Recursive_Thread_Mutex,
00717     guard,
00718     this->lock_);
00719 
00720   // The dropped sample should be in the sending_data_ list.
00721   // Otherwise an exception will be raised.
00722   //
00723   // We are now been notified by transport, so we can
00724   // keep the sample from the sending_data_ list still in
00725   // sample list since we will send it.
00726 
00727   DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00728 
00729   // If sample is on a SendStateDataSampleList it should be on the
00730   // sending_data_ list signifying it was given to the transport to
00731   // deliver and now the transport is signaling it has been dropped
00732 
00733   if (sending_data_.dequeue(sample)) {
00734     // else: The data_dropped is called as a result of remove_sample()
00735     // called from reenqueue_all() which supports the TRANSIENT_LOCAL
00736     // qos. The samples that are sending by transport are dropped from
00737     // transport and will be moved to the unsent list for resend.
00738     unsent_data_.enqueue_tail(sample);
00739 
00740   } else {
00741     //
00742     // If it is in sent_data_ or unsent_data there was a problem.
00743     //
00744     SendStateDataSampleList* send_lists[] = {
00745       &sent_data_,
00746       &unsent_data_,
00747       &orphaned_to_transport_};
00748     const SendStateDataSampleList* containing_list =
00749       SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00750 
00751     if (containing_list == &this->sent_data_) {
00752       ACE_ERROR((LM_WARNING,
00753                  ACE_TEXT("(%P|%t) WARNING: ")
00754                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00755                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00756                  ACE_TEXT("WAS IN sent_data_.\n")));
00757     } else if (containing_list == &this->unsent_data_) {
00758       ACE_ERROR((LM_WARNING,
00759                  ACE_TEXT("(%P|%t) WARNING: ")
00760                  ACE_TEXT("WriteDataContainer::data_dropped, ")
00761                  ACE_TEXT("The dropped sample is not in sending_data_ and ")
00762                  ACE_TEXT("WAS IN unsent_data_ list.\n")));
00763     } else {
00764 
00765       //No-op: elements may be removed from all WriteDataContainer lists during shutdown
00766       //and inform transport of their release.  Transport will call data-dropped on the
00767       //elements as it processes the removal but they will already be gone from the send lists.
00768       if (stale->get_header().message_id_ != SAMPLE_DATA) {
00769         //this message was a control message so release it
00770         if (DCPS_debug_level > 9) {
00771           GuidConverter converter(publication_id_);
00772           ACE_DEBUG((LM_DEBUG,
00773                      ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00774                      ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00775                      this->domain_id_,
00776                      this->topic_name_,
00777                      OPENDDS_STRING(converter).c_str()));
00778         }
00779         writer_->controlTracker.message_dropped();
00780       }
00781 
00782       if (containing_list == &this->orphaned_to_transport_) {
00783         orphaned_to_transport_.dequeue(sample);
00784         release_buffer(stale);
00785         if (!pending_data())
00786           empty_condition_.broadcast();
00787 
00788       } else if (!containing_list) {
00789         // samples that were retrieved from get_resend_data()
00790         SendStateDataSampleList::remove(stale);
00791         release_buffer(stale);
00792       }
00793     }
00794 
00795     return;
00796   }
00797 
00798   this->wakeup_blocking_writers (stale);
00799 
00800   if (!pending_data())
00801     empty_condition_.broadcast();
00802 }
00803 
00804 void
00805 WriteDataContainer::remove_excess_durable()
00806 {
00807   if (!max_durable_per_instance_)
00808     return;
00809 
00810   size_t n_released = 0;
00811 
00812   for (PublicationInstanceMapType::iterator iter = instances_.begin();
00813        iter != instances_.end();
00814        ++iter) {
00815 
00816     CORBA::Long durable_allowed = max_durable_per_instance_;
00817     InstanceDataSampleList& instance_list = iter->second->samples_;
00818 
00819     for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
00820       prev = InstanceDataSampleList::prev(it);
00821 
00822       if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
00823 
00824         if (durable_allowed) {
00825           --durable_allowed;
00826         } else {
00827           instance_list.dequeue(it);
00828           sent_data_.dequeue(it);
00829           release_buffer(it);
00830           ++n_released;
00831         }
00832       }
00833     }
00834   }
00835 
00836   if (n_released && DCPS_debug_level > 9) {
00837     const GuidConverter converter(publication_id_);
00838     ACE_DEBUG((LM_DEBUG,
00839                ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
00840                ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
00841                ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
00842                OPENDDS_STRING(converter).c_str(), n_released));
00843   }
00844 }
00845 
00846 
00847 DDS::ReturnCode_t
00848 WriteDataContainer::remove_oldest_sample(
00849   InstanceDataSampleList& instance_list,
00850   bool& released)
00851 {
00852   DataSampleElement* stale = 0;
00853 
00854   //
00855   // Remove the oldest sample from the instance list.
00856   //
00857   if (instance_list.dequeue_head(stale) == false) {
00858     ACE_ERROR_RETURN((LM_ERROR,
00859                       ACE_TEXT("(%P|%t) ERROR: ")
00860                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00861                       ACE_TEXT("dequeue_head_next_sample failed\n")),
00862                      DDS::RETCODE_ERROR);
00863   }
00864 
00865   //
00866   // Remove the stale data from the next_writer_sample_ list.  The
00867   // sending_data_/next_send_sample_ list is not managed within the
00868   // container, it is only used external to the container and does
00869   // not need to be managed internally.
00870   //
00871   // The next_writer_sample_ link is being used in one of the sent_data_,
00872   // sending_data_, or unsent_data lists.  Removal from the doubly
00873   // linked list needs to repair the list only when the stale sample
00874   // is either the head or tail of the list.
00875   //
00876 
00877   //
00878   // Locate the head of the list that the stale data is in.
00879   //
00880   SendStateDataSampleList* send_lists[] = {
00881     &sending_data_,
00882     &sent_data_,
00883     &unsent_data_,
00884     &orphaned_to_transport_};
00885   const SendStateDataSampleList* containing_list =
00886     SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00887 
00888   //
00889   // Identify the list that the stale data is in.
00890   // The stale data should be in one of the sent_data_, sending_data_
00891   // or unsent_data_. It should not be in released_data_ list since
00892   // this function is the only place a sample is moved from
00893   // sending_data_ to released_data_ list.
00894 
00895   // Remove the element from the internal list.
00896   bool result = false;
00897 
00898   if (containing_list == &this->sending_data_) {
00899     if (DCPS_debug_level > 2) {
00900       ACE_ERROR((LM_WARNING,
00901                  ACE_TEXT("(%P|%t) WARNING: ")
00902                  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00903                  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00904     }
00905 
00906     // This means transport is still using the sample that needs to
00907     // be released currently so notify transport that sample is being removed.
00908 
00909     if (this->writer_->remove_sample(stale)) {
00910       if (this->sent_data_.dequeue(stale)) {
00911         release_buffer(stale);
00912         result = true;
00913       }
00914 
00915     } else {
00916       if (this->sending_data_.dequeue(stale)) {
00917         this->orphaned_to_transport_.enqueue_tail(stale);
00918       } else if (this->sent_data_.dequeue(stale)) {
00919         release_buffer(stale);
00920         result = true;
00921       }
00922       result = true;
00923     }
00924     released = true;
00925 
00926   } else if (containing_list == &this->sent_data_) {
00927     // No one is using the data sample, so we can release it back to
00928     // its allocator.
00929     //
00930     result = this->sent_data_.dequeue(stale) != 0;
00931     release_buffer(stale);
00932     released = true;
00933 
00934     if (DCPS_debug_level > 9) {
00935       GuidConverter converter(publication_id_);
00936       ACE_DEBUG((LM_DEBUG,
00937                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00938                  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00939                  this->domain_id_,
00940                  this->topic_name_,
00941                  OPENDDS_STRING(converter).c_str()));
00942     }
00943 
00944   } else if (containing_list == &this->unsent_data_) {
00945     //
00946     // No one is using the data sample, so we can release it back to
00947     // its allocator.
00948     //
00949     result = this->unsent_data_.dequeue(stale) != 0;
00950     release_buffer(stale);
00951     released = true;
00952 
00953     if (DCPS_debug_level > 9) {
00954       GuidConverter converter(publication_id_);
00955       ACE_DEBUG((LM_DEBUG,
00956                  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00957                  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00958                  this->domain_id_,
00959                  this->topic_name_,
00960                  OPENDDS_STRING(converter).c_str()));
00961     }
00962   } else {
00963     ACE_ERROR_RETURN((LM_ERROR,
00964                       ACE_TEXT("(%P|%t) ERROR: ")
00965                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00966                       ACE_TEXT("The oldest sample is not in any internal list.\n")),
00967                      DDS::RETCODE_ERROR);
00968   }
00969 
00970   // Signal if there is no pending data.
00971   {
00972     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00973                      guard,
00974                      this->lock_,
00975                      DDS::RETCODE_ERROR);
00976 
00977     if (!pending_data())
00978       empty_condition_.broadcast();
00979   }
00980 
00981   if (result == false) {
00982     ACE_ERROR_RETURN((LM_ERROR,
00983                       ACE_TEXT("(%P|%t) ERROR: ")
00984                       ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00985                       ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00986                      DDS::RETCODE_ERROR);
00987 
00988   }
00989 
00990   return DDS::RETCODE_OK;
00991 }
00992 
00993 DDS::ReturnCode_t
00994 WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
00995 {
00996   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
00997 
00998   ACE_NEW_MALLOC_RETURN(
00999     element,
01000     static_cast<DataSampleElement*>(
01001       sample_list_element_allocator_.malloc(
01002         sizeof(DataSampleElement))),
01003     DataSampleElement(publication_id_,
01004                       this->writer_,
01005                       PublicationInstance_rch()),
01006     DDS::RETCODE_ERROR);
01007 
01008   return DDS::RETCODE_OK;
01009 }
01010 
01011 DDS::ReturnCode_t
01012 WriteDataContainer::obtain_buffer(DataSampleElement*& element,
01013                                   DDS::InstanceHandle_t handle)
01014 {
01015   DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01016 
01017   remove_excess_durable();
01018 
01019   PublicationInstance_rch instance = get_handle_instance(handle);
01020 
01021   if (!instance) {
01022     return DDS::RETCODE_BAD_PARAMETER;
01023   }
01024 
01025   ACE_NEW_MALLOC_RETURN(
01026     element,
01027     static_cast<DataSampleElement*>(
01028       sample_list_element_allocator_.malloc(
01029         sizeof(DataSampleElement))),
01030     DataSampleElement(publication_id_,
01031                           this->writer_,
01032                           instance),
01033     DDS::RETCODE_ERROR);
01034 
01035   // Extract the current instance queue.
01036   InstanceDataSampleList& instance_list = instance->samples_;
01037   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01038 
01039   bool need_to_set_abs_timeout = true;
01040   ACE_Time_Value abs_timeout;
01041 
01042   //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
01043   //max_instances and max_instances * depth
01044   while ((instance_list.size() >= max_samples_per_instance_) ||
01045          ((this->max_num_samples_ > 0) &&
01046          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01047 
01048     if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01049       if (instance_list.size() >= history_depth_) {
01050         if (DCPS_debug_level >= 2) {
01051           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01052                      ACE_TEXT(" instance %d attempting to remove")
01053                      ACE_TEXT(" its oldest sample (reliable)\n"),
01054                      handle));
01055         }
01056         bool oldest_released = false;
01057         ret = remove_oldest_sample(instance_list, oldest_released);
01058         if (oldest_released) {
01059           break;
01060         }
01061       }
01062       // Reliable writers can wait
01063       if (need_to_set_abs_timeout) {
01064         abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01065         need_to_set_abs_timeout = false;
01066       }
01067       if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01068         if (DCPS_debug_level >= 2) {
01069           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01070                                 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01071                       handle));
01072         }
01073 
01074         waiting_on_release_ = true;
01075         int const wait_result = condition_.wait(&abs_timeout);
01076 
01077         if (wait_result == 0) {
01078           remove_excess_durable();
01079 
01080         } else {
01081           if (errno == ETIME) {
01082             if (DCPS_debug_level >= 2) {
01083               ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01084                                     ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01085                           handle));
01086             }
01087             ret = DDS::RETCODE_TIMEOUT;
01088 
01089           } else {
01090             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01091                                   ACE_TEXT("%p\n")));
01092             ret = DDS::RETCODE_ERROR;
01093           }
01094         }
01095 
01096       } else {
01097         //either shutdown has been signaled or max_blocking_time
01098         //has surpassed so treat as timeout
01099         ret = DDS::RETCODE_TIMEOUT;
01100       }
01101 
01102     } else {
01103       //BEST EFFORT
01104       bool oldest_released = false;
01105 
01106       //try to remove stale samples from this instance
01107       // The remove_oldest_sample() method removes the oldest sample
01108       // from instance list and removes it from the internal lists.
01109       if (instance_list.size() > 0) {
01110         if (DCPS_debug_level >= 2) {
01111           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01112                                 ACE_TEXT(" instance %d attempting to remove")
01113                                 ACE_TEXT(" its oldest sample\n"),
01114                                 handle));
01115         }
01116         ret = remove_oldest_sample(instance_list, oldest_released);
01117       }
01118       //else try to remove stale samples from other instances which are full
01119       if (ret == DDS::RETCODE_OK && !oldest_released) {
01120         if (DCPS_debug_level >= 2) {
01121           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01122                                 ACE_TEXT(" instance %d attempting to remove")
01123                                 ACE_TEXT(" oldest sample from any full instances\n"),
01124                                 handle));
01125         }
01126         PublicationInstanceMapType::iterator it = instances_.begin();
01127 
01128         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01129           if (it->second->samples_.size() >= max_samples_per_instance_) {
01130             ret = remove_oldest_sample(it->second->samples_, oldest_released);
01131           }
01132           ++it;
01133         }
01134       }
01135       //else try to remove stale samples from other non-full instances
01136       if (ret == DDS::RETCODE_OK && !oldest_released) {
01137         if (DCPS_debug_level >= 2) {
01138           ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01139                                 ACE_TEXT(" instance %d attempting to remove")
01140                                 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01141                                 handle));
01142         }
01143         PublicationInstanceMapType::iterator it = instances_.begin();
01144 
01145         while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01146           if (it->second->samples_.size() > 0) {
01147             ret = remove_oldest_sample(it->second->samples_, oldest_released);
01148           }
01149           ++it;
01150         }
01151       }
01152       if (!oldest_released) {
01153         //This means that no instances have samples to remove and yet
01154         //still hitting resource limits.
01155         ACE_ERROR((LM_ERROR,
01156                    ACE_TEXT("(%P|%t) ERROR: ")
01157                    ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01158                    ACE_TEXT("hitting resource limits with no samples to remove\n")));
01159         ret = DDS::RETCODE_ERROR;
01160       }
01161     }  //END BEST EFFORT
01162 
01163     if (ret != DDS::RETCODE_OK) {
01164       if (DCPS_debug_level >= 2) {
01165         ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01166                               ACE_TEXT(" instance %d could not obtain buffer for sample")
01167                               ACE_TEXT(" releasing allotted sample and returning\n"),
01168                               handle));
01169       }
01170       this->release_buffer(element);
01171       return ret;
01172     }
01173   }  //END WHILE
01174 
01175   data_holder_.enqueue_tail(element);
01176 
01177   return ret;
01178 }
01179 
01180 void
01181 WriteDataContainer::release_buffer(DataSampleElement* element)
01182 {
01183   if (element->get_header().message_id_ == SAMPLE_DATA)
01184     data_holder_.dequeue(element);
01185   // Release the memory to the allocator.
01186   ACE_DES_FREE(element,
01187                sample_list_element_allocator_.free,
01188                DataSampleElement);
01189 }
01190 
01191 void
01192 WriteDataContainer::unregister_all()
01193 {
01194   DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01195   shutdown_ = true;
01196 
01197   {
01198     //The internal list needs protection since this call may result from the
01199     //the delete_datawriter call which does not acquire the lock in advance.
01200     ACE_GUARD(ACE_Recursive_Thread_Mutex,
01201               guard,
01202               this->lock_);
01203     // Tell transport remove all control messages currently
01204     // transport is processing.
01205     (void) this->writer_->remove_all_msgs();
01206 
01207     // Broadcast to wake up all waiting threads.
01208     if (waiting_on_release_) {
01209       condition_.broadcast();
01210     }
01211   }
01212   DDS::ReturnCode_t ret;
01213   Message_Block_Ptr registered_sample;
01214   PublicationInstanceMapType::iterator it = instances_.begin();
01215 
01216   while (it != instances_.end()) {
01217     // Release the instance data.
01218     ret = dispose(it->first, registered_sample, false);
01219 
01220     if (ret != DDS::RETCODE_OK) {
01221       ACE_ERROR((LM_ERROR,
01222                  ACE_TEXT("(%P|%t) ERROR: ")
01223                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01224                  ACE_TEXT("dispose instance %X failed\n"),
01225                  it->first));
01226     }
01227     // Mark the instance unregistered.
01228     ret = unregister(it->first, registered_sample, false);
01229 
01230     if (ret != DDS::RETCODE_OK) {
01231       ACE_ERROR((LM_ERROR,
01232                  ACE_TEXT("(%P|%t) ERROR: ")
01233                  ACE_TEXT("WriteDataContainer::unregister_all, ")
01234                  ACE_TEXT("unregister instance %X failed\n"),
01235                  it->first));
01236     }
01237 
01238     // Get the next iterator before erase the instance handle.
01239     PublicationInstanceMapType::iterator it_next = it;
01240     ++it_next;
01241     // Remove the instance from the instance list.
01242     unbind(instances_, it->first);
01243     it = it_next;
01244   }
01245 
01246   ACE_UNUSED_ARG(registered_sample);
01247 }
01248 
01249 PublicationInstance_rch
01250 WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
01251 {
01252   PublicationInstance_rch instance;
01253 
01254   if (0 != find(instances_, handle, instance)) {
01255     ACE_DEBUG((LM_DEBUG,
01256                ACE_TEXT("(%P|%t) ")
01257                ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01258                ACE_TEXT("lookup for %d failed\n"), handle));
01259   }
01260 
01261   return instance;
01262 }
01263 
01264 void
01265 WriteDataContainer::copy_and_prepend(SendStateDataSampleList& list,
01266                                      const SendStateDataSampleList& appended,
01267                                      const RepoId& reader_id,
01268                                      const DDS::LifespanQosPolicy& lifespan,
01269 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01270                                      const OPENDDS_STRING& filterClassName,
01271                                      const FilterEvaluator* eval,
01272                                      const DDS::StringSeq& params,
01273 #endif
01274                                      ssize_t& max_resend_samples)
01275 {
01276   for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
01277        cur != appended.rend() && max_resend_samples; ++cur) {
01278 
01279     if (resend_data_expired(*cur, lifespan))
01280       continue;
01281 
01282 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01283     if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01284       continue;
01285 #endif
01286 
01287     PublicationInstance_rch inst = cur->get_handle();
01288 
01289     if (!inst) {
01290       // *cur is a control message, just skip it
01291       continue;
01292     }
01293 
01294     if (inst->durable_samples_remaining_ == 0)
01295       continue;
01296     --inst->durable_samples_remaining_;
01297 
01298     DataSampleElement* element = 0;
01299     ACE_NEW_MALLOC(element,
01300                    static_cast<DataSampleElement*>(
01301                      sample_list_element_allocator_.malloc(
01302                        sizeof(DataSampleElement))),
01303                    DataSampleElement(*cur));
01304 
01305     element->set_num_subs(1);
01306     element->set_sub_id(0, reader_id);
01307 
01308     list.enqueue_head(element);
01309     --max_resend_samples;
01310   }
01311 }
01312 
01313 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01314 bool
01315 WriteDataContainer::persist_data()
01316 {
01317   bool result = true;
01318 
01319   // ------------------------------------------------------------
01320   // Transfer sent data to data DURABILITY cache.
01321   // ------------------------------------------------------------
01322   if (this->durability_cache_) {
01323     // A data durability cache is available for TRANSIENT or
01324     // PERSISTENT data durability.  Cache the data samples.
01325 
01326     //
01327     //  We only cache data that is not still in use outside of
01328     //  this instance of WriteDataContainer
01329     //  (only cache samples in sent_data_ meaning transport has delivered).
01330     bool const inserted =
01331       this->durability_cache_->insert(this->domain_id_,
01332                                       this->topic_name_,
01333                                       this->type_name_,
01334                                       this->sent_data_,
01335                                       this->durability_service_
01336                                      );
01337 
01338     result = inserted;
01339 
01340     if (!inserted)
01341       ACE_ERROR((LM_ERROR,
01342                  ACE_TEXT("(%P|%t) ERROR: ")
01343                  ACE_TEXT("WriteDataContainer::persist_data, ")
01344                  ACE_TEXT("failed to make data durable for ")
01345                  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01346                  this->domain_id_,
01347                  this->topic_name_,
01348                  this->type_name_));
01349   }
01350 
01351   return result;
01352 }
01353 #endif
01354 
01355 void WriteDataContainer::reschedule_deadline()
01356 {
01357   for (PublicationInstanceMapType::iterator iter = instances_.begin();
01358        iter != instances_.end();
01359        ++iter) {
01360     if (iter->second->deadline_timer_id_ != -1) {
01361       if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01362         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01363                    ACE_TEXT("reset_timer_interval")));
01364       }
01365     }
01366   }
01367 }
01368 
01369 void
01370 WriteDataContainer::wait_pending()
01371 {
01372   ACE_Time_Value pending_timeout =
01373     TheServiceParticipant->pending_timeout();
01374 
01375   ACE_Time_Value* pTimeout = 0;
01376 
01377   if (pending_timeout != ACE_Time_Value::zero) {
01378     pTimeout = &pending_timeout;
01379     pending_timeout += ACE_OS::gettimeofday();
01380   }
01381 
01382   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01383   const bool report = DCPS_debug_level > 0 && pending_data();
01384   if (report) {
01385     if (pending_timeout == ACE_Time_Value::zero) {
01386       ACE_DEBUG((LM_DEBUG,
01387                  ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending no timeout\n")));
01388     } else {
01389       ACE_DEBUG((LM_DEBUG,
01390                  ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01391                  ACE_TEXT("at %#T\n"),
01392                  &pending_timeout));
01393     }
01394   }
01395   while (true) {
01396 
01397     if (!pending_data())
01398       break;
01399 
01400     if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01401       if (DCPS_debug_level) {
01402         ACE_DEBUG((LM_INFO,
01403                    ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01404                    ACE_TEXT("Timed out waiting for messages to be transported")));
01405         this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01406       }
01407       break;
01408     }
01409   }
01410   if (report) {
01411     ACE_DEBUG((LM_DEBUG,
01412                "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01413   }
01414 }
01415 
01416 void
01417 WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
01418 {
01419   ACE_GUARD(ACE_Recursive_Thread_Mutex,
01420             guard,
01421             this->lock_);
01422   PublicationInstanceMapType::iterator it = instances_.begin();
01423 
01424   while (it != instances_.end()) {
01425     instance_handles.push_back(it->second->instance_handle_);
01426     ++it;
01427   }
01428 }
01429 
01430 DDS::ReturnCode_t
01431 WriteDataContainer::wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence)
01432 {
01433   ACE_Time_Value deadline(abs_deadline);
01434   DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01435   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01436 
01437   while (ACE_OS::gettimeofday() < deadline) {
01438 
01439     if (!sequence_acknowledged(sequence)) {
01440       // lock is released while waiting and acquired before returning
01441       // from wait.
01442       int const wait_result = wfa_condition_.wait(&deadline);
01443 
01444       if (wait_result != 0) {
01445         if (errno == ETIME) {
01446           if (DCPS_debug_level >= 2) {
01447             ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01448                                   ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01449                                   sequence.getValue()));
01450           }
01451           ret = DDS::RETCODE_TIMEOUT;
01452         } else {
01453           ret = DDS::RETCODE_ERROR;
01454         }
01455       }
01456     } else {
01457       ret = DDS::RETCODE_OK;
01458       break;
01459     }
01460   }
01461 
01462   return ret;
01463 }
01464 
01465 bool
01466 WriteDataContainer::sequence_acknowledged(const SequenceNumber sequence)
01467 {
01468   if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01469     //return true here so that wait_for_acknowledgements doesn't block
01470     return true;
01471   }
01472 
01473   SequenceNumber acked = acked_sequences_.cumulative_ack();
01474   if (DCPS_debug_level >= 10) {
01475     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01476                           ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01477   }
01478   if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01479     //if acked_sequences_ is empty or its cumulative_ack is lower than
01480     //the requests sequence, return false
01481     return false;
01482   }
01483   return true;
01484 }
01485 
01486 void
01487 WriteDataContainer::wakeup_blocking_writers (DataSampleElement* stale)
01488 {
01489   if (!stale && waiting_on_release_) {
01490     waiting_on_release_ = false;
01491 
01492     condition_.broadcast();
01493   }
01494 }
01495 
01496 void
01497 WriteDataContainer::log_send_state_lists (OPENDDS_STRING description)
01498 {
01499   ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01500              description.c_str(),
01501              unsent_data_.size(),
01502              sending_data_.size(),
01503              sent_data_.size(),
01504              orphaned_to_transport_.size(),
01505              num_all_samples(),
01506              instances_.size()));
01507 }
01508 
01509 } // namespace DCPS
01510 } // namespace OpenDDS
01511 
01512 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1