00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ace/Condition_Recursive_Thread_Mutex.h"
00010 #include "WriteDataContainer.h"
00011 #include "DataSampleHeader.h"
00012 #include "InstanceDataSampleList.h"
00013 #include "DataWriterImpl.h"
00014 #include "MessageTracker.h"
00015 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00016 #include "DataDurabilityCache.h"
00017 #endif
00018 #include "PublicationInstance.h"
00019 #include "Util.h"
00020 #include "Time_Helper.h"
00021 #include "GuidConverter.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00024 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00025 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00026
00027 #include "tao/debug.h"
00028
00029 #include "ace/Auto_Ptr.h"
00030
00031 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00032
00033 namespace OpenDDS {
00034 namespace DCPS {
00035
00036
00037
00038
00039
00040 bool
00041 resend_data_expired(DataSampleElement const & element,
00042 DDS::LifespanQosPolicy const & lifespan)
00043 {
00044 if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
00045 || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
00046
00047
00048 DDS::Time_t const tmp = {
00049 element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
00050 element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
00051 };
00052
00053 ACE_Time_Value const now(ACE_OS::gettimeofday());
00054 ACE_Time_Value const expiration_time(time_to_time_value(tmp));
00055
00056 if (now >= expiration_time) {
00057 if (DCPS_debug_level >= 8) {
00058 ACE_Time_Value const diff(now - expiration_time);
00059 ACE_DEBUG((LM_DEBUG,
00060 ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
00061 ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
00062 diff.sec(),
00063 diff.usec()));
00064 }
00065
00066 return true;
00067 }
00068 }
00069
00070 return false;
00071 }
00072
00073 WriteDataContainer::WriteDataContainer(
00074 DataWriterImpl* writer,
00075 CORBA::Long max_samples_per_instance,
00076 CORBA::Long history_depth,
00077 CORBA::Long max_durable_per_instance,
00078 DDS::Duration_t max_blocking_time,
00079 size_t n_chunks,
00080 DDS::DomainId_t domain_id,
00081 char const * topic_name,
00082 char const * type_name,
00083 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00084 DataDurabilityCache* durability_cache,
00085 DDS::DurabilityServiceQosPolicy const & durability_service,
00086 #endif
00087 CORBA::Long max_instances,
00088 CORBA::Long max_total_samples)
00089 : transaction_id_(0),
00090 publication_id_(GUID_UNKNOWN),
00091 writer_(writer),
00092 max_samples_per_instance_(max_samples_per_instance),
00093 history_depth_(history_depth),
00094 max_durable_per_instance_(max_durable_per_instance),
00095 max_num_instances_(max_instances),
00096 max_num_samples_(max_total_samples),
00097 max_blocking_time_(max_blocking_time),
00098 waiting_on_release_(false),
00099 condition_(lock_),
00100 empty_condition_(lock_),
00101 wfa_condition_(this->wfa_lock_),
00102 n_chunks_(n_chunks),
00103 sample_list_element_allocator_(2 * n_chunks_),
00104 shutdown_(false),
00105 domain_id_(domain_id),
00106 topic_name_(topic_name),
00107 type_name_(type_name)
00108 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00109 , durability_cache_(durability_cache)
00110 , durability_service_(durability_service)
00111 #endif
00112 {
00113
00114 if (DCPS_debug_level >= 2) {
00115 ACE_DEBUG((LM_DEBUG,
00116 "(%P|%t) WriteDataContainer "
00117 "sample_list_element_allocator %x with %d chunks\n",
00118 &sample_list_element_allocator_, n_chunks_));
00119 }
00120 }
00121
00122 WriteDataContainer::~WriteDataContainer()
00123 {
00124 if (this->unsent_data_.size() > 0) {
00125 ACE_DEBUG((LM_WARNING,
00126 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00127 ACE_TEXT("destroyed with %d samples unsent.\n"),
00128 this->unsent_data_.size()));
00129 }
00130
00131 if (this->sending_data_.size() > 0) {
00132 if (TransportRegistry::instance()->released()) {
00133 for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
00134 release_buffer(e);
00135 }
00136 }
00137 if (sending_data_.size() && DCPS_debug_level) {
00138 ACE_DEBUG((LM_WARNING,
00139 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00140 ACE_TEXT("destroyed with %d samples sending.\n"),
00141 this->sending_data_.size()));
00142 }
00143 }
00144
00145 if (this->sent_data_.size() > 0) {
00146 ACE_DEBUG((LM_DEBUG,
00147 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00148 ACE_TEXT("destroyed with %d samples sent.\n"),
00149 this->sent_data_.size()));
00150 }
00151
00152 if (this->orphaned_to_transport_.size() > 0) {
00153 if (DCPS_debug_level > 0) {
00154 ACE_DEBUG((LM_DEBUG,
00155 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00156 ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00157 this->orphaned_to_transport_.size()));
00158 }
00159 }
00160
00161 if (!shutdown_) {
00162 ACE_ERROR((LM_ERROR,
00163 ACE_TEXT("(%P|%t) ERROR: ")
00164 ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00165 ACE_TEXT("The container has not been cleaned.\n")));
00166 }
00167 }
00168
00169 DDS::ReturnCode_t
00170 WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
00171 {
00172
00173
00174
00175
00176
00177 unsent_data_.enqueue_tail(control_sample);
00178
00179 return DDS::RETCODE_OK;
00180 }
00181
00182
00183 DDS::ReturnCode_t
00184 WriteDataContainer::enqueue(
00185 DataSampleElement* sample,
00186 DDS::InstanceHandle_t instance_handle)
00187 {
00188
00189 PublicationInstance_rch instance =
00190 get_handle_instance(instance_handle);
00191
00192 InstanceDataSampleList& instance_list = instance->samples_;
00193
00194 if (this->writer_->watchdog_.in()) {
00195 instance->last_sample_tv_ = instance->cur_sample_tv_;
00196 instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00197 this->writer_->watchdog_->execute(*this->writer_, instance, false);
00198 }
00199
00200
00201
00202
00203
00204
00205
00206 unsent_data_.enqueue_tail(sample);
00207
00208
00209
00210 instance_list.enqueue_tail(sample);
00211
00212 return DDS::RETCODE_OK;
00213 }
00214
00215 DDS::ReturnCode_t
00216 WriteDataContainer::reenqueue_all(const RepoId& reader_id,
00217 const DDS::LifespanQosPolicy& lifespan
00218 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00219 ,
00220 const OPENDDS_STRING& filterClassName,
00221 const FilterEvaluator* eval,
00222 const DDS::StringSeq& expression_params
00223 #endif
00224 )
00225 {
00226 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00227 guard,
00228 lock_,
00229 DDS::RETCODE_ERROR);
00230
00231 ssize_t total_size = 0;
00232 for (PublicationInstanceMapType::iterator it = instances_.begin();
00233 it != instances_.end(); ++it) {
00234 const ssize_t durable = std::min(it->second->samples_.size(),
00235 ssize_t(max_durable_per_instance_));
00236 total_size += durable;
00237 it->second->durable_samples_remaining_ = durable;
00238 }
00239
00240 copy_and_prepend(resend_data_,
00241 sending_data_,
00242 reader_id,
00243 lifespan,
00244 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00245 filterClassName, eval, expression_params,
00246 #endif
00247 total_size);
00248
00249 copy_and_prepend(resend_data_,
00250 sent_data_,
00251 reader_id,
00252 lifespan,
00253 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00254 filterClassName, eval, expression_params,
00255 #endif
00256 total_size);
00257
00258 if (DCPS_debug_level > 9 && resend_data_.size()) {
00259 GuidConverter converter(publication_id_);
00260 GuidConverter reader(reader_id);
00261 ACE_DEBUG((LM_DEBUG,
00262 ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00263 ACE_TEXT("domain %d topic %C publication %C copying ")
00264 ACE_TEXT("sending/sent to resend to %C.\n"),
00265 domain_id_,
00266 topic_name_,
00267 OPENDDS_STRING(converter).c_str(),
00268 OPENDDS_STRING(reader).c_str()));
00269 }
00270
00271 return DDS::RETCODE_OK;
00272 }
00273
00274 DDS::ReturnCode_t
00275 WriteDataContainer::register_instance(
00276 DDS::InstanceHandle_t& instance_handle,
00277 Message_Block_Ptr& registered_sample)
00278 {
00279 PublicationInstance_rch instance;
00280
00281 if (instance_handle == DDS::HANDLE_NIL) {
00282 if (max_num_instances_ > 0
00283 && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00284 return DDS::RETCODE_OUT_OF_RESOURCES;
00285 }
00286
00287
00288 instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
00289
00290 instance_handle = this->writer_->get_next_handle();
00291
00292 int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00293
00294 if (0 != insert_attempt) {
00295 ACE_ERROR((LM_ERROR,
00296 ACE_TEXT("(%P|%t) ERROR: ")
00297 ACE_TEXT("WriteDataContainer::register_instance, ")
00298 ACE_TEXT("failed to insert instance handle=%X\n"),
00299 instance.in()));
00300 return DDS::RETCODE_ERROR;
00301 }
00302
00303 instance->instance_handle_ = instance_handle;
00304
00305 } else {
00306
00307 int const find_attempt = find(instances_, instance_handle, instance);
00308
00309 if (0 != find_attempt) {
00310 ACE_ERROR((LM_ERROR,
00311 ACE_TEXT("(%P|%t) ERROR: ")
00312 ACE_TEXT("WriteDataContainer::register_instance, ")
00313 ACE_TEXT("The provided instance handle=%X is not a valid")
00314 ACE_TEXT("handle.\n"),
00315 instance_handle));
00316
00317 return DDS::RETCODE_ERROR;
00318 }
00319
00320 instance->unregistered_ = false;
00321 }
00322
00323
00324 registered_sample.reset(instance->registered_sample_->duplicate());
00325
00326 if (this->writer_->watchdog_.in()) {
00327 this->writer_->watchdog_->schedule_timer(instance);
00328 }
00329
00330 return DDS::RETCODE_OK;
00331 }
00332
00333 DDS::ReturnCode_t
00334 WriteDataContainer::unregister(
00335 DDS::InstanceHandle_t instance_handle,
00336 Message_Block_Ptr& registered_sample,
00337 bool dup_registered_sample)
00338 {
00339 PublicationInstance_rch instance;
00340
00341 int const find_attempt = find(instances_, instance_handle, instance);
00342
00343 if (0 != find_attempt) {
00344 ACE_ERROR_RETURN((LM_ERROR,
00345 ACE_TEXT("(%P|%t) ERROR: ")
00346 ACE_TEXT("WriteDataContainer::unregister, ")
00347 ACE_TEXT("The instance(handle=%X) ")
00348 ACE_TEXT("is not registered yet.\n"),
00349 instance_handle),
00350 DDS::RETCODE_PRECONDITION_NOT_MET);
00351 }
00352
00353 instance->unregistered_ = true;
00354
00355 if (dup_registered_sample) {
00356
00357 registered_sample.reset(instance->registered_sample_->duplicate());
00358 }
00359
00360 if (this->writer_->watchdog_.in())
00361 this->writer_->watchdog_->cancel_timer(instance);
00362
00363 return DDS::RETCODE_OK;
00364 }
00365
00366 DDS::ReturnCode_t
00367 WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
00368 Message_Block_Ptr& registered_sample,
00369 bool dup_registered_sample)
00370 {
00371 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00372 guard,
00373 this->lock_,
00374 DDS::RETCODE_ERROR);
00375
00376 PublicationInstance_rch instance;
00377
00378 int const find_attempt = find(instances_, instance_handle, instance);
00379
00380 if (0 != find_attempt) {
00381 ACE_ERROR_RETURN((LM_ERROR,
00382 ACE_TEXT("(%P|%t) ERROR: ")
00383 ACE_TEXT("WriteDataContainer::dispose, ")
00384 ACE_TEXT("The instance(handle=%X) ")
00385 ACE_TEXT("is not registered yet.\n"),
00386 instance_handle),
00387 DDS::RETCODE_PRECONDITION_NOT_MET);
00388 }
00389
00390 if (dup_registered_sample) {
00391
00392 registered_sample.reset(instance->registered_sample_->duplicate());
00393 }
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404 InstanceDataSampleList& instance_list = instance->samples_;
00405
00406 while (instance_list.size() > 0) {
00407 bool released = false;
00408 DDS::ReturnCode_t ret
00409 = remove_oldest_sample(instance_list, released);
00410
00411 if (ret != DDS::RETCODE_OK) {
00412 return ret;
00413 }
00414 }
00415
00416 if (this->writer_->watchdog_.in())
00417 this->writer_->watchdog_->cancel_timer(instance);
00418 return DDS::RETCODE_OK;
00419 }
00420
00421 DDS::ReturnCode_t
00422 WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
00423 size_t& size)
00424 {
00425 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00426 guard,
00427 this->lock_,
00428 DDS::RETCODE_ERROR);
00429 PublicationInstance_rch instance;
00430
00431 int const find_attempt = find(instances_, handle, instance);
00432
00433 if (0 != find_attempt) {
00434 return DDS::RETCODE_ERROR;
00435
00436 } else {
00437 size = instance->samples_.size();
00438 return DDS::RETCODE_OK;
00439 }
00440 }
00441
00442 size_t
00443 WriteDataContainer::num_all_samples()
00444 {
00445 size_t size = 0;
00446
00447 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00448 guard,
00449 this->lock_,
00450 0);
00451
00452 for (PublicationInstanceMapType::iterator iter = instances_.begin();
00453 iter != instances_.end();
00454 ++iter)
00455 {
00456 size += iter->second->samples_.size();
00457 }
00458
00459 return size;
00460 }
00461
00462 ACE_UINT64
00463 WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
00464 {
00465 DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00466
00467
00468
00469
00470
00471 list = this->unsent_data_;
00472
00473
00474 ++transaction_id_;
00475
00476
00477 SendStateDataSampleList::iterator iter = list.begin();
00478 while (iter != list.end()) {
00479 iter->set_transaction_id(this->transaction_id_);
00480 ++iter;
00481 }
00482
00483
00484
00485
00486
00487
00488 sending_data_.enqueue_tail(list);
00489
00490
00491
00492
00493 this->unsent_data_.reset();
00494
00495
00496
00497
00498 return transaction_id_;
00499 }
00500
00501 SendStateDataSampleList
00502 WriteDataContainer::get_resend_data()
00503 {
00504 DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00505
00506
00507
00508
00509
00510 SendStateDataSampleList list = this->resend_data_;
00511
00512
00513
00514
00515 this->resend_data_.reset();
00516
00517
00518
00519 return list;
00520 }
00521
00522 bool
00523 WriteDataContainer::pending_data()
00524 {
00525 return this->sending_data_.size() != 0
00526 || this->orphaned_to_transport_.size() != 0
00527 || this->unsent_data_.size() != 0;
00528 }
00529
00530 void
00531 WriteDataContainer::data_delivered(const DataSampleElement* sample)
00532 {
00533 DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00534
00535 if (DCPS_debug_level >= 2) {
00536 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00537 ACE_TEXT(" %@\n"), sample));
00538 }
00539
00540 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00541 guard,
00542 this->lock_);
00543
00544
00545
00546
00547
00548
00549
00550
00551
00552 DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00553
00554
00555
00556
00557 if (!sending_data_.dequeue(sample)) {
00558
00559
00560
00561
00562 SendStateDataSampleList* send_lists[] = {
00563 &sent_data_,
00564 &unsent_data_,
00565 &orphaned_to_transport_};
00566 const SendStateDataSampleList* containing_list =
00567 SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00568
00569 if (containing_list == &this->sent_data_) {
00570 ACE_ERROR((LM_WARNING,
00571 ACE_TEXT("(%P|%t) WARNING: ")
00572 ACE_TEXT("WriteDataContainer::data_delivered, ")
00573 ACE_TEXT("The delivered sample is not in sending_data_ and ")
00574 ACE_TEXT("WAS IN sent_data_.\n")));
00575 } else if (containing_list == &this->unsent_data_) {
00576 ACE_ERROR((LM_WARNING,
00577 ACE_TEXT("(%P|%t) WARNING: ")
00578 ACE_TEXT("WriteDataContainer::data_delivered, ")
00579 ACE_TEXT("The delivered sample is not in sending_data_ and ")
00580 ACE_TEXT("WAS IN unsent_data_ list.\n")));
00581 } else {
00582
00583
00584
00585
00586 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00587
00588 if (DCPS_debug_level > 9) {
00589 GuidConverter converter(publication_id_);
00590 ACE_DEBUG((LM_DEBUG,
00591 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00592 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00593 this->domain_id_,
00594 this->topic_name_,
00595 OPENDDS_STRING(converter).c_str()));
00596 }
00597 writer_->controlTracker.message_delivered();
00598 }
00599
00600 if (containing_list == &this->orphaned_to_transport_) {
00601 orphaned_to_transport_.dequeue(sample);
00602 release_buffer(stale);
00603
00604 } else if (!containing_list) {
00605
00606 SendStateDataSampleList::remove(stale);
00607 release_buffer(stale);
00608 }
00609
00610 if (!pending_data())
00611 empty_condition_.broadcast();
00612 }
00613
00614 return;
00615 }
00616 ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00617 SequenceNumber acked_seq = stale->get_header().sequence_;
00618 SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00619
00620 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00621
00622 if (DCPS_debug_level > 9) {
00623 GuidConverter converter(publication_id_);
00624 ACE_DEBUG((LM_DEBUG,
00625 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00626 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00627 this->domain_id_,
00628 this->topic_name_,
00629 OPENDDS_STRING(converter).c_str()));
00630 }
00631 release_buffer(stale);
00632 stale = 0;
00633 writer_->controlTracker.message_delivered();
00634 } else {
00635
00636 if (max_durable_per_instance_) {
00637 DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00638 sent_data_.enqueue_tail(sample);
00639
00640 } else {
00641 if (InstanceDataSampleList::on_some_list(sample)) {
00642 PublicationInstance_rch inst = sample->get_handle();
00643 inst->samples_.dequeue(sample);
00644 }
00645 release_buffer(stale);
00646 stale = 0;
00647 }
00648
00649 if (DCPS_debug_level > 9) {
00650 GuidConverter converter(publication_id_);
00651 ACE_DEBUG((LM_DEBUG,
00652 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00653 ACE_TEXT("domain %d topic %C publication %C %s.\n"),
00654 this->domain_id_,
00655 this->topic_name_,
00656 OPENDDS_STRING(converter).c_str(),
00657 max_durable_per_instance_
00658 ? ACE_TEXT("stored for durability")
00659 : ACE_TEXT("released")));
00660 }
00661
00662 this->wakeup_blocking_writers (stale);
00663 }
00664 if (DCPS_debug_level > 9) {
00665 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00666 ACE_TEXT("Inserting acked_sequence: %q\n"),
00667 acked_seq.getValue()));
00668 }
00669
00670 acked_sequences_.insert(acked_seq);
00671
00672 if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00673 prev_max < acked_sequences_.cumulative_ack()) {
00674
00675 if (DCPS_debug_level > 9) {
00676 ACE_DEBUG((LM_DEBUG,
00677 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00678 ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00679 }
00680
00681 wfa_condition_.broadcast();
00682 }
00683
00684
00685 if (!pending_data())
00686 empty_condition_.broadcast();
00687 }
00688
00689 void
00690 WriteDataContainer::data_dropped(const DataSampleElement* sample,
00691 bool dropped_by_transport)
00692 {
00693 DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00694
00695 if (DCPS_debug_level >= 2) {
00696 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00697 ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00698 sample, dropped_by_transport));
00699 }
00700
00701
00702
00703
00704
00705 if (dropped_by_transport) {
00706 this->data_delivered(sample);
00707 return;
00708 }
00709
00710
00711
00712
00713
00714
00715
00716 ACE_GUARD (ACE_Recursive_Thread_Mutex,
00717 guard,
00718 this->lock_);
00719
00720
00721
00722
00723
00724
00725
00726
00727 DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00728
00729
00730
00731
00732
00733 if (sending_data_.dequeue(sample)) {
00734
00735
00736
00737
00738 unsent_data_.enqueue_tail(sample);
00739
00740 } else {
00741
00742
00743
00744 SendStateDataSampleList* send_lists[] = {
00745 &sent_data_,
00746 &unsent_data_,
00747 &orphaned_to_transport_};
00748 const SendStateDataSampleList* containing_list =
00749 SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00750
00751 if (containing_list == &this->sent_data_) {
00752 ACE_ERROR((LM_WARNING,
00753 ACE_TEXT("(%P|%t) WARNING: ")
00754 ACE_TEXT("WriteDataContainer::data_dropped, ")
00755 ACE_TEXT("The dropped sample is not in sending_data_ and ")
00756 ACE_TEXT("WAS IN sent_data_.\n")));
00757 } else if (containing_list == &this->unsent_data_) {
00758 ACE_ERROR((LM_WARNING,
00759 ACE_TEXT("(%P|%t) WARNING: ")
00760 ACE_TEXT("WriteDataContainer::data_dropped, ")
00761 ACE_TEXT("The dropped sample is not in sending_data_ and ")
00762 ACE_TEXT("WAS IN unsent_data_ list.\n")));
00763 } else {
00764
00765
00766
00767
00768 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00769
00770 if (DCPS_debug_level > 9) {
00771 GuidConverter converter(publication_id_);
00772 ACE_DEBUG((LM_DEBUG,
00773 ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00774 ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00775 this->domain_id_,
00776 this->topic_name_,
00777 OPENDDS_STRING(converter).c_str()));
00778 }
00779 writer_->controlTracker.message_dropped();
00780 }
00781
00782 if (containing_list == &this->orphaned_to_transport_) {
00783 orphaned_to_transport_.dequeue(sample);
00784 release_buffer(stale);
00785 if (!pending_data())
00786 empty_condition_.broadcast();
00787
00788 } else if (!containing_list) {
00789
00790 SendStateDataSampleList::remove(stale);
00791 release_buffer(stale);
00792 }
00793 }
00794
00795 return;
00796 }
00797
00798 this->wakeup_blocking_writers (stale);
00799
00800 if (!pending_data())
00801 empty_condition_.broadcast();
00802 }
00803
00804 void
00805 WriteDataContainer::remove_excess_durable()
00806 {
00807 if (!max_durable_per_instance_)
00808 return;
00809
00810 size_t n_released = 0;
00811
00812 for (PublicationInstanceMapType::iterator iter = instances_.begin();
00813 iter != instances_.end();
00814 ++iter) {
00815
00816 CORBA::Long durable_allowed = max_durable_per_instance_;
00817 InstanceDataSampleList& instance_list = iter->second->samples_;
00818
00819 for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
00820 prev = InstanceDataSampleList::prev(it);
00821
00822 if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
00823
00824 if (durable_allowed) {
00825 --durable_allowed;
00826 } else {
00827 instance_list.dequeue(it);
00828 sent_data_.dequeue(it);
00829 release_buffer(it);
00830 ++n_released;
00831 }
00832 }
00833 }
00834 }
00835
00836 if (n_released && DCPS_debug_level > 9) {
00837 const GuidConverter converter(publication_id_);
00838 ACE_DEBUG((LM_DEBUG,
00839 ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
00840 ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
00841 ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
00842 OPENDDS_STRING(converter).c_str(), n_released));
00843 }
00844 }
00845
00846
00847 DDS::ReturnCode_t
00848 WriteDataContainer::remove_oldest_sample(
00849 InstanceDataSampleList& instance_list,
00850 bool& released)
00851 {
00852 DataSampleElement* stale = 0;
00853
00854
00855
00856
00857 if (instance_list.dequeue_head(stale) == false) {
00858 ACE_ERROR_RETURN((LM_ERROR,
00859 ACE_TEXT("(%P|%t) ERROR: ")
00860 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00861 ACE_TEXT("dequeue_head_next_sample failed\n")),
00862 DDS::RETCODE_ERROR);
00863 }
00864
00865
00866
00867
00868
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880 SendStateDataSampleList* send_lists[] = {
00881 &sending_data_,
00882 &sent_data_,
00883 &unsent_data_,
00884 &orphaned_to_transport_};
00885 const SendStateDataSampleList* containing_list =
00886 SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00887
00888
00889
00890
00891
00892
00893
00894
00895
00896 bool result = false;
00897
00898 if (containing_list == &this->sending_data_) {
00899 if (DCPS_debug_level > 2) {
00900 ACE_ERROR((LM_WARNING,
00901 ACE_TEXT("(%P|%t) WARNING: ")
00902 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00903 ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00904 }
00905
00906
00907
00908
00909 if (this->writer_->remove_sample(stale)) {
00910 if (this->sent_data_.dequeue(stale)) {
00911 release_buffer(stale);
00912 result = true;
00913 }
00914
00915 } else {
00916 if (this->sending_data_.dequeue(stale)) {
00917 this->orphaned_to_transport_.enqueue_tail(stale);
00918 } else if (this->sent_data_.dequeue(stale)) {
00919 release_buffer(stale);
00920 result = true;
00921 }
00922 result = true;
00923 }
00924 released = true;
00925
00926 } else if (containing_list == &this->sent_data_) {
00927
00928
00929
00930 result = this->sent_data_.dequeue(stale) != 0;
00931 release_buffer(stale);
00932 released = true;
00933
00934 if (DCPS_debug_level > 9) {
00935 GuidConverter converter(publication_id_);
00936 ACE_DEBUG((LM_DEBUG,
00937 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00938 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00939 this->domain_id_,
00940 this->topic_name_,
00941 OPENDDS_STRING(converter).c_str()));
00942 }
00943
00944 } else if (containing_list == &this->unsent_data_) {
00945
00946
00947
00948
00949 result = this->unsent_data_.dequeue(stale) != 0;
00950 release_buffer(stale);
00951 released = true;
00952
00953 if (DCPS_debug_level > 9) {
00954 GuidConverter converter(publication_id_);
00955 ACE_DEBUG((LM_DEBUG,
00956 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00957 ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00958 this->domain_id_,
00959 this->topic_name_,
00960 OPENDDS_STRING(converter).c_str()));
00961 }
00962 } else {
00963 ACE_ERROR_RETURN((LM_ERROR,
00964 ACE_TEXT("(%P|%t) ERROR: ")
00965 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00966 ACE_TEXT("The oldest sample is not in any internal list.\n")),
00967 DDS::RETCODE_ERROR);
00968 }
00969
00970
00971 {
00972 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00973 guard,
00974 this->lock_,
00975 DDS::RETCODE_ERROR);
00976
00977 if (!pending_data())
00978 empty_condition_.broadcast();
00979 }
00980
00981 if (result == false) {
00982 ACE_ERROR_RETURN((LM_ERROR,
00983 ACE_TEXT("(%P|%t) ERROR: ")
00984 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00985 ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00986 DDS::RETCODE_ERROR);
00987
00988 }
00989
00990 return DDS::RETCODE_OK;
00991 }
00992
00993 DDS::ReturnCode_t
00994 WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
00995 {
00996 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
00997
00998 ACE_NEW_MALLOC_RETURN(
00999 element,
01000 static_cast<DataSampleElement*>(
01001 sample_list_element_allocator_.malloc(
01002 sizeof(DataSampleElement))),
01003 DataSampleElement(publication_id_,
01004 this->writer_,
01005 PublicationInstance_rch()),
01006 DDS::RETCODE_ERROR);
01007
01008 return DDS::RETCODE_OK;
01009 }
01010
01011 DDS::ReturnCode_t
01012 WriteDataContainer::obtain_buffer(DataSampleElement*& element,
01013 DDS::InstanceHandle_t handle)
01014 {
01015 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01016
01017 remove_excess_durable();
01018
01019 PublicationInstance_rch instance = get_handle_instance(handle);
01020
01021 if (!instance) {
01022 return DDS::RETCODE_BAD_PARAMETER;
01023 }
01024
01025 ACE_NEW_MALLOC_RETURN(
01026 element,
01027 static_cast<DataSampleElement*>(
01028 sample_list_element_allocator_.malloc(
01029 sizeof(DataSampleElement))),
01030 DataSampleElement(publication_id_,
01031 this->writer_,
01032 instance),
01033 DDS::RETCODE_ERROR);
01034
01035
01036 InstanceDataSampleList& instance_list = instance->samples_;
01037 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01038
01039 bool need_to_set_abs_timeout = true;
01040 ACE_Time_Value abs_timeout;
01041
01042
01043
01044 while ((instance_list.size() >= max_samples_per_instance_) ||
01045 ((this->max_num_samples_ > 0) &&
01046 ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01047
01048 if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01049 if (instance_list.size() >= history_depth_) {
01050 if (DCPS_debug_level >= 2) {
01051 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01052 ACE_TEXT(" instance %d attempting to remove")
01053 ACE_TEXT(" its oldest sample (reliable)\n"),
01054 handle));
01055 }
01056 bool oldest_released = false;
01057 ret = remove_oldest_sample(instance_list, oldest_released);
01058 if (oldest_released) {
01059 break;
01060 }
01061 }
01062
01063 if (need_to_set_abs_timeout) {
01064 abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01065 need_to_set_abs_timeout = false;
01066 }
01067 if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01068 if (DCPS_debug_level >= 2) {
01069 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01070 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01071 handle));
01072 }
01073
01074 waiting_on_release_ = true;
01075 int const wait_result = condition_.wait(&abs_timeout);
01076
01077 if (wait_result == 0) {
01078 remove_excess_durable();
01079
01080 } else {
01081 if (errno == ETIME) {
01082 if (DCPS_debug_level >= 2) {
01083 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01084 ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01085 handle));
01086 }
01087 ret = DDS::RETCODE_TIMEOUT;
01088
01089 } else {
01090 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01091 ACE_TEXT("%p\n")));
01092 ret = DDS::RETCODE_ERROR;
01093 }
01094 }
01095
01096 } else {
01097
01098
01099 ret = DDS::RETCODE_TIMEOUT;
01100 }
01101
01102 } else {
01103
01104 bool oldest_released = false;
01105
01106
01107
01108
01109 if (instance_list.size() > 0) {
01110 if (DCPS_debug_level >= 2) {
01111 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01112 ACE_TEXT(" instance %d attempting to remove")
01113 ACE_TEXT(" its oldest sample\n"),
01114 handle));
01115 }
01116 ret = remove_oldest_sample(instance_list, oldest_released);
01117 }
01118
01119 if (ret == DDS::RETCODE_OK && !oldest_released) {
01120 if (DCPS_debug_level >= 2) {
01121 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01122 ACE_TEXT(" instance %d attempting to remove")
01123 ACE_TEXT(" oldest sample from any full instances\n"),
01124 handle));
01125 }
01126 PublicationInstanceMapType::iterator it = instances_.begin();
01127
01128 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01129 if (it->second->samples_.size() >= max_samples_per_instance_) {
01130 ret = remove_oldest_sample(it->second->samples_, oldest_released);
01131 }
01132 ++it;
01133 }
01134 }
01135
01136 if (ret == DDS::RETCODE_OK && !oldest_released) {
01137 if (DCPS_debug_level >= 2) {
01138 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01139 ACE_TEXT(" instance %d attempting to remove")
01140 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01141 handle));
01142 }
01143 PublicationInstanceMapType::iterator it = instances_.begin();
01144
01145 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01146 if (it->second->samples_.size() > 0) {
01147 ret = remove_oldest_sample(it->second->samples_, oldest_released);
01148 }
01149 ++it;
01150 }
01151 }
01152 if (!oldest_released) {
01153
01154
01155 ACE_ERROR((LM_ERROR,
01156 ACE_TEXT("(%P|%t) ERROR: ")
01157 ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01158 ACE_TEXT("hitting resource limits with no samples to remove\n")));
01159 ret = DDS::RETCODE_ERROR;
01160 }
01161 }
01162
01163 if (ret != DDS::RETCODE_OK) {
01164 if (DCPS_debug_level >= 2) {
01165 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01166 ACE_TEXT(" instance %d could not obtain buffer for sample")
01167 ACE_TEXT(" releasing allotted sample and returning\n"),
01168 handle));
01169 }
01170 this->release_buffer(element);
01171 return ret;
01172 }
01173 }
01174
01175 data_holder_.enqueue_tail(element);
01176
01177 return ret;
01178 }
01179
01180 void
01181 WriteDataContainer::release_buffer(DataSampleElement* element)
01182 {
01183 if (element->get_header().message_id_ == SAMPLE_DATA)
01184 data_holder_.dequeue(element);
01185
01186 ACE_DES_FREE(element,
01187 sample_list_element_allocator_.free,
01188 DataSampleElement);
01189 }
01190
01191 void
01192 WriteDataContainer::unregister_all()
01193 {
01194 DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01195 shutdown_ = true;
01196
01197 {
01198
01199
01200 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01201 guard,
01202 this->lock_);
01203
01204
01205 (void) this->writer_->remove_all_msgs();
01206
01207
01208 if (waiting_on_release_) {
01209 condition_.broadcast();
01210 }
01211 }
01212 DDS::ReturnCode_t ret;
01213 Message_Block_Ptr registered_sample;
01214 PublicationInstanceMapType::iterator it = instances_.begin();
01215
01216 while (it != instances_.end()) {
01217
01218 ret = dispose(it->first, registered_sample, false);
01219
01220 if (ret != DDS::RETCODE_OK) {
01221 ACE_ERROR((LM_ERROR,
01222 ACE_TEXT("(%P|%t) ERROR: ")
01223 ACE_TEXT("WriteDataContainer::unregister_all, ")
01224 ACE_TEXT("dispose instance %X failed\n"),
01225 it->first));
01226 }
01227
01228 ret = unregister(it->first, registered_sample, false);
01229
01230 if (ret != DDS::RETCODE_OK) {
01231 ACE_ERROR((LM_ERROR,
01232 ACE_TEXT("(%P|%t) ERROR: ")
01233 ACE_TEXT("WriteDataContainer::unregister_all, ")
01234 ACE_TEXT("unregister instance %X failed\n"),
01235 it->first));
01236 }
01237
01238
01239 PublicationInstanceMapType::iterator it_next = it;
01240 ++it_next;
01241
01242 unbind(instances_, it->first);
01243 it = it_next;
01244 }
01245
01246 ACE_UNUSED_ARG(registered_sample);
01247 }
01248
01249 PublicationInstance_rch
01250 WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
01251 {
01252 PublicationInstance_rch instance;
01253
01254 if (0 != find(instances_, handle, instance)) {
01255 ACE_DEBUG((LM_DEBUG,
01256 ACE_TEXT("(%P|%t) ")
01257 ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01258 ACE_TEXT("lookup for %d failed\n"), handle));
01259 }
01260
01261 return instance;
01262 }
01263
01264 void
01265 WriteDataContainer::copy_and_prepend(SendStateDataSampleList& list,
01266 const SendStateDataSampleList& appended,
01267 const RepoId& reader_id,
01268 const DDS::LifespanQosPolicy& lifespan,
01269 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01270 const OPENDDS_STRING& filterClassName,
01271 const FilterEvaluator* eval,
01272 const DDS::StringSeq& params,
01273 #endif
01274 ssize_t& max_resend_samples)
01275 {
01276 for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
01277 cur != appended.rend() && max_resend_samples; ++cur) {
01278
01279 if (resend_data_expired(*cur, lifespan))
01280 continue;
01281
01282 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01283 if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01284 continue;
01285 #endif
01286
01287 PublicationInstance_rch inst = cur->get_handle();
01288
01289 if (!inst) {
01290
01291 continue;
01292 }
01293
01294 if (inst->durable_samples_remaining_ == 0)
01295 continue;
01296 --inst->durable_samples_remaining_;
01297
01298 DataSampleElement* element = 0;
01299 ACE_NEW_MALLOC(element,
01300 static_cast<DataSampleElement*>(
01301 sample_list_element_allocator_.malloc(
01302 sizeof(DataSampleElement))),
01303 DataSampleElement(*cur));
01304
01305 element->set_num_subs(1);
01306 element->set_sub_id(0, reader_id);
01307
01308 list.enqueue_head(element);
01309 --max_resend_samples;
01310 }
01311 }
01312
01313 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01314 bool
01315 WriteDataContainer::persist_data()
01316 {
01317 bool result = true;
01318
01319
01320
01321
01322 if (this->durability_cache_) {
01323
01324
01325
01326
01327
01328
01329
01330 bool const inserted =
01331 this->durability_cache_->insert(this->domain_id_,
01332 this->topic_name_,
01333 this->type_name_,
01334 this->sent_data_,
01335 this->durability_service_
01336 );
01337
01338 result = inserted;
01339
01340 if (!inserted)
01341 ACE_ERROR((LM_ERROR,
01342 ACE_TEXT("(%P|%t) ERROR: ")
01343 ACE_TEXT("WriteDataContainer::persist_data, ")
01344 ACE_TEXT("failed to make data durable for ")
01345 ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01346 this->domain_id_,
01347 this->topic_name_,
01348 this->type_name_));
01349 }
01350
01351 return result;
01352 }
01353 #endif
01354
01355 void WriteDataContainer::reschedule_deadline()
01356 {
01357 for (PublicationInstanceMapType::iterator iter = instances_.begin();
01358 iter != instances_.end();
01359 ++iter) {
01360 if (iter->second->deadline_timer_id_ != -1) {
01361 if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01362 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01363 ACE_TEXT("reset_timer_interval")));
01364 }
01365 }
01366 }
01367 }
01368
01369 void
01370 WriteDataContainer::wait_pending()
01371 {
01372 ACE_Time_Value pending_timeout =
01373 TheServiceParticipant->pending_timeout();
01374
01375 ACE_Time_Value* pTimeout = 0;
01376
01377 if (pending_timeout != ACE_Time_Value::zero) {
01378 pTimeout = &pending_timeout;
01379 pending_timeout += ACE_OS::gettimeofday();
01380 }
01381
01382 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01383 const bool report = DCPS_debug_level > 0 && pending_data();
01384 if (report) {
01385 if (pending_timeout == ACE_Time_Value::zero) {
01386 ACE_DEBUG((LM_DEBUG,
01387 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending no timeout\n")));
01388 } else {
01389 ACE_DEBUG((LM_DEBUG,
01390 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01391 ACE_TEXT("at %#T\n"),
01392 &pending_timeout));
01393 }
01394 }
01395 while (true) {
01396
01397 if (!pending_data())
01398 break;
01399
01400 if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01401 if (DCPS_debug_level) {
01402 ACE_DEBUG((LM_INFO,
01403 ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01404 ACE_TEXT("Timed out waiting for messages to be transported")));
01405 this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01406 }
01407 break;
01408 }
01409 }
01410 if (report) {
01411 ACE_DEBUG((LM_DEBUG,
01412 "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01413 }
01414 }
01415
01416 void
01417 WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
01418 {
01419 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01420 guard,
01421 this->lock_);
01422 PublicationInstanceMapType::iterator it = instances_.begin();
01423
01424 while (it != instances_.end()) {
01425 instance_handles.push_back(it->second->instance_handle_);
01426 ++it;
01427 }
01428 }
01429
01430 DDS::ReturnCode_t
01431 WriteDataContainer::wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence)
01432 {
01433 ACE_Time_Value deadline(abs_deadline);
01434 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01435 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01436
01437 while (ACE_OS::gettimeofday() < deadline) {
01438
01439 if (!sequence_acknowledged(sequence)) {
01440
01441
01442 int const wait_result = wfa_condition_.wait(&deadline);
01443
01444 if (wait_result != 0) {
01445 if (errno == ETIME) {
01446 if (DCPS_debug_level >= 2) {
01447 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01448 ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01449 sequence.getValue()));
01450 }
01451 ret = DDS::RETCODE_TIMEOUT;
01452 } else {
01453 ret = DDS::RETCODE_ERROR;
01454 }
01455 }
01456 } else {
01457 ret = DDS::RETCODE_OK;
01458 break;
01459 }
01460 }
01461
01462 return ret;
01463 }
01464
01465 bool
01466 WriteDataContainer::sequence_acknowledged(const SequenceNumber sequence)
01467 {
01468 if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01469
01470 return true;
01471 }
01472
01473 SequenceNumber acked = acked_sequences_.cumulative_ack();
01474 if (DCPS_debug_level >= 10) {
01475 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01476 ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01477 }
01478 if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01479
01480
01481 return false;
01482 }
01483 return true;
01484 }
01485
01486 void
01487 WriteDataContainer::wakeup_blocking_writers (DataSampleElement* stale)
01488 {
01489 if (!stale && waiting_on_release_) {
01490 waiting_on_release_ = false;
01491
01492 condition_.broadcast();
01493 }
01494 }
01495
01496 void
01497 WriteDataContainer::log_send_state_lists (OPENDDS_STRING description)
01498 {
01499 ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01500 description.c_str(),
01501 unsent_data_.size(),
01502 sending_data_.size(),
01503 sent_data_.size(),
01504 orphaned_to_transport_.size(),
01505 num_all_samples(),
01506 instances_.size()));
01507 }
01508
01509 }
01510 }
01511
01512 OPENDDS_END_VERSIONED_NAMESPACE_DECL