00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ace/Condition_Recursive_Thread_Mutex.h"
00010 #include "WriteDataContainer.h"
00011 #include "DataSampleHeader.h"
00012 #include "InstanceDataSampleList.h"
00013 #include "DataWriterImpl.h"
00014 #include "MessageTracker.h"
00015 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00016 #include "DataDurabilityCache.h"
00017 #endif
00018 #include "PublicationInstance.h"
00019 #include "Util.h"
00020 #include "Qos_Helper.h"
00021 #include "GuidConverter.h"
00022 #include "OfferedDeadlineWatchdog.h"
00023 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00024 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00025 #include "dds/DCPS/transport/framework/TransportDebug.h"
00026
00027 #include "tao/debug.h"
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032
00033
00034
00035
00036 bool
00037 resend_data_expired(DataSampleElement const & element,
00038 DDS::LifespanQosPolicy const & lifespan)
00039 {
00040 if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
00041 || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
00042
00043
00044 DDS::Time_t const tmp = {
00045 element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
00046 element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
00047 };
00048
00049 ACE_Time_Value const now(ACE_OS::gettimeofday());
00050 ACE_Time_Value const expiration_time(time_to_time_value(tmp));
00051
00052 if (now >= expiration_time) {
00053 if (DCPS_debug_level >= 8) {
00054 ACE_Time_Value const diff(now - expiration_time);
00055 ACE_DEBUG((LM_DEBUG,
00056 ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
00057 ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
00058 diff.sec(),
00059 diff.usec()));
00060 }
00061
00062 return true;
00063 }
00064 }
00065
00066 return false;
00067 }
00068
00069 WriteDataContainer::WriteDataContainer(
00070 DataWriterImpl* writer,
00071 CORBA::Long depth,
00072 ::DDS::Duration_t max_blocking_time,
00073 size_t n_chunks,
00074 DDS::DomainId_t domain_id,
00075 char const * topic_name,
00076 char const * type_name,
00077 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00078 DataDurabilityCache* durability_cache,
00079 DDS::DurabilityServiceQosPolicy const & durability_service,
00080 #endif
00081 CORBA::Long max_instances,
00082 CORBA::Long max_total_samples)
00083 : transaction_id_(0),
00084 writer_(writer),
00085 depth_(depth),
00086 max_num_instances_(max_instances),
00087 max_num_samples_(max_total_samples),
00088 max_blocking_time_(max_blocking_time),
00089 waiting_on_release_(false),
00090 condition_(lock_),
00091 empty_condition_(lock_),
00092 wfa_condition_(this->wfa_lock_),
00093 n_chunks_(n_chunks),
00094 sample_list_element_allocator_(2 * n_chunks_),
00095 transport_send_element_allocator_(2 * n_chunks_,
00096 sizeof(TransportSendElement)),
00097 transport_customized_element_allocator_(2 * n_chunks_,
00098 sizeof(TransportCustomizedElement)),
00099 shutdown_(false),
00100 domain_id_(domain_id),
00101 topic_name_(topic_name),
00102 type_name_(type_name)
00103 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00104 , durability_cache_(durability_cache)
00105 , durability_service_(durability_service)
00106 #endif
00107 {
00108
00109 if (DCPS_debug_level >= 2) {
00110 ACE_DEBUG((LM_DEBUG,
00111 "(%P|%t) WriteDataContainer "
00112 "sample_list_element_allocator %x with %d chunks\n",
00113 &sample_list_element_allocator_, n_chunks_));
00114 ACE_DEBUG((LM_DEBUG,
00115 "(%P|%t) WriteDataContainer "
00116 "transport_send_element_allocator %x with %d chunks\n",
00117 &transport_send_element_allocator_, n_chunks_));
00118 }
00119 }
00120
00121 WriteDataContainer::~WriteDataContainer()
00122 {
00123 if (this->unsent_data_.size() > 0) {
00124 ACE_DEBUG((LM_WARNING,
00125 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00126 ACE_TEXT("destroyed with %d samples unsent.\n"),
00127 this->unsent_data_.size()));
00128 }
00129
00130 if (this->sending_data_.size() > 0) {
00131 ACE_DEBUG((LM_WARNING,
00132 ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
00133 ACE_TEXT("destroyed with %d samples sending.\n"),
00134 this->sending_data_.size()));
00135 }
00136
00137 if (this->sent_data_.size() > 0) {
00138 if (DCPS_debug_level > 0) {
00139 ACE_DEBUG((LM_DEBUG,
00140 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00141 ACE_TEXT("destroyed with %d samples sent.\n"),
00142 this->sent_data_.size()));
00143 }
00144 }
00145
00146 if (this->orphaned_to_transport_.size() > 0) {
00147 if (DCPS_debug_level > 0) {
00148 ACE_DEBUG((LM_DEBUG,
00149 ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
00150 ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
00151 this->orphaned_to_transport_.size()));
00152 }
00153 }
00154
00155 if (!shutdown_) {
00156 ACE_ERROR((LM_ERROR,
00157 ACE_TEXT("(%P|%t) ERROR: ")
00158 ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
00159 ACE_TEXT("The container has not been cleaned.\n")));
00160 }
00161 }
00162
00163 DDS::ReturnCode_t
00164 WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
00165 {
00166
00167
00168
00169
00170
00171 unsent_data_.enqueue_tail(control_sample);
00172
00173 return DDS::RETCODE_OK;
00174 }
00175
00176
00177 DDS::ReturnCode_t
00178 WriteDataContainer::enqueue(
00179 DataSampleElement* sample,
00180 DDS::InstanceHandle_t instance_handle)
00181 {
00182
00183 PublicationInstance* const instance =
00184 get_handle_instance(instance_handle);
00185
00186 InstanceDataSampleList& instance_list = instance->samples_;
00187
00188 if (this->writer_->watchdog_) {
00189 instance->last_sample_tv_ = instance->cur_sample_tv_;
00190 instance->cur_sample_tv_ = ACE_OS::gettimeofday();
00191 this->writer_->watchdog_->execute(instance, false);
00192 }
00193
00194
00195
00196
00197
00198
00199
00200 unsent_data_.enqueue_tail(sample);
00201
00202
00203
00204 instance_list.enqueue_tail(sample);
00205
00206 return DDS::RETCODE_OK;
00207 }
00208
00209 DDS::ReturnCode_t
00210 WriteDataContainer::reenqueue_all(const RepoId& reader_id,
00211 const DDS::LifespanQosPolicy& lifespan
00212 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00213 ,
00214 const OPENDDS_STRING& filterClassName,
00215 const FilterEvaluator* eval,
00216 const DDS::StringSeq& expression_params
00217 #endif
00218 )
00219 {
00220 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00221 guard,
00222 this->lock_,
00223 DDS::RETCODE_ERROR);
00224
00225
00226 if (sent_data_.size() > 0) {
00227 this->copy_and_append(this->resend_data_,
00228 sent_data_,
00229 reader_id,
00230 lifespan
00231 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00232 , filterClassName, eval, expression_params
00233 #endif
00234 );
00235
00236 if (sending_data_.size() > 0) {
00237 this->copy_and_append(this->resend_data_,
00238 sending_data_,
00239 reader_id,
00240 lifespan
00241 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00242 , filterClassName, eval, expression_params
00243 #endif
00244 );
00245 }
00246
00247 if (DCPS_debug_level > 9) {
00248 GuidConverter converter(publication_id_);
00249 GuidConverter reader(reader_id);
00250 ACE_DEBUG((LM_DEBUG,
00251 ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
00252 ACE_TEXT("domain %d topic %C publication %C copying HISTORY to resend to %C.\n"),
00253 this->domain_id_,
00254 this->topic_name_,
00255 OPENDDS_STRING(converter).c_str(),
00256 OPENDDS_STRING(reader).c_str()));
00257 }
00258 }
00259
00260 return DDS::RETCODE_OK;
00261 }
00262
00263 DDS::ReturnCode_t
00264 WriteDataContainer::register_instance(
00265 DDS::InstanceHandle_t& instance_handle,
00266 DataSample*& registered_sample)
00267 {
00268 PublicationInstance* instance = 0;
00269 auto_ptr<PublicationInstance> safe_instance;
00270
00271 if (instance_handle == DDS::HANDLE_NIL) {
00272 if (max_num_instances_ > 0
00273 && max_num_instances_ <= (CORBA::Long) instances_.size()) {
00274 return DDS::RETCODE_OUT_OF_RESOURCES;
00275 }
00276
00277
00278 ACE_NEW_RETURN(instance,
00279 PublicationInstance(registered_sample),
00280 DDS::RETCODE_ERROR);
00281
00282 ACE_auto_ptr_reset(safe_instance, instance);
00283
00284 instance_handle = this->writer_->get_next_handle();
00285
00286 int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
00287
00288 if (0 != insert_attempt) {
00289 ACE_ERROR((LM_ERROR,
00290 ACE_TEXT("(%P|%t) ERROR: ")
00291 ACE_TEXT("WriteDataContainer::register_instance, ")
00292 ACE_TEXT("failed to insert instance handle=%X\n"),
00293 instance));
00294 return DDS::RETCODE_ERROR;
00295 }
00296
00297 instance->instance_handle_ = instance_handle;
00298
00299 } else {
00300 int const find_attempt = find(instances_, instance_handle, instance);
00301 ACE_auto_ptr_reset(safe_instance, instance);
00302
00303 if (0 != find_attempt) {
00304 ACE_ERROR((LM_ERROR,
00305 ACE_TEXT("(%P|%t) ERROR: ")
00306 ACE_TEXT("WriteDataContainer::register_instance, ")
00307 ACE_TEXT("The provided instance handle=%X is not a valid")
00308 ACE_TEXT("handle.\n"),
00309 instance_handle));
00310
00311 return DDS::RETCODE_ERROR;
00312 }
00313
00314
00315 registered_sample->release();
00316
00317 instance->unregistered_ = false;
00318 }
00319
00320
00321 registered_sample = instance->registered_sample_->duplicate();
00322
00323 if (this->writer_->watchdog_) {
00324 this->writer_->watchdog_->schedule_timer(instance);
00325 }
00326
00327 safe_instance.release();
00328
00329 return DDS::RETCODE_OK;
00330 }
00331
00332 DDS::ReturnCode_t
00333 WriteDataContainer::unregister(
00334 DDS::InstanceHandle_t instance_handle,
00335 DataSample*& registered_sample,
00336 bool dup_registered_sample)
00337 {
00338 PublicationInstance* instance = 0;
00339
00340 int const find_attempt = find(instances_, instance_handle, instance);
00341
00342 if (0 != find_attempt) {
00343 ACE_ERROR_RETURN((LM_ERROR,
00344 ACE_TEXT("(%P|%t) ERROR: ")
00345 ACE_TEXT("WriteDataContainer::unregister, ")
00346 ACE_TEXT("The instance(handle=%X) ")
00347 ACE_TEXT("is not registered yet.\n"),
00348 instance_handle),
00349 DDS::RETCODE_PRECONDITION_NOT_MET);
00350 }
00351
00352 instance->unregistered_ = true;
00353
00354 if (dup_registered_sample) {
00355
00356 registered_sample = instance->registered_sample_->duplicate();
00357 }
00358
00359
00360 this->writer_->unregistered(instance_handle);
00361
00362 if (this->writer_->watchdog_)
00363 this->writer_->watchdog_->cancel_timer(instance);
00364
00365 return DDS::RETCODE_OK;
00366 }
00367
00368 DDS::ReturnCode_t
00369 WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
00370 DataSample*& registered_sample,
00371 bool dup_registered_sample)
00372 {
00373 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00374 guard,
00375 this->lock_,
00376 DDS::RETCODE_ERROR);
00377
00378 PublicationInstance* instance = 0;
00379
00380 int const find_attempt = find(instances_, instance_handle, instance);
00381
00382 if (0 != find_attempt) {
00383 ACE_ERROR_RETURN((LM_ERROR,
00384 ACE_TEXT("(%P|%t) ERROR: ")
00385 ACE_TEXT("WriteDataContainer::dispose, ")
00386 ACE_TEXT("The instance(handle=%X) ")
00387 ACE_TEXT("is not registered yet.\n"),
00388 instance_handle),
00389 DDS::RETCODE_PRECONDITION_NOT_MET);
00390 }
00391
00392 if (dup_registered_sample) {
00393
00394 registered_sample = instance->registered_sample_->duplicate();
00395 }
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406 InstanceDataSampleList& instance_list = instance->samples_;
00407
00408 while (instance_list.size() > 0) {
00409 bool released = false;
00410 DDS::ReturnCode_t ret
00411 = remove_oldest_sample(instance_list, released);
00412
00413 if (ret != DDS::RETCODE_OK) {
00414 return ret;
00415 }
00416 }
00417
00418 if (this->writer_->watchdog_)
00419 this->writer_->watchdog_->cancel_timer(instance);
00420 return DDS::RETCODE_OK;
00421 }
00422
00423 DDS::ReturnCode_t
00424 WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
00425 size_t& size)
00426 {
00427 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00428 guard,
00429 this->lock_,
00430 DDS::RETCODE_ERROR);
00431 PublicationInstance* instance = 0;
00432
00433 int const find_attempt = find(instances_, handle, instance);
00434
00435 if (0 != find_attempt) {
00436 return DDS::RETCODE_ERROR;
00437
00438 } else {
00439 size = instance->samples_.size();
00440 return DDS::RETCODE_OK;
00441 }
00442 }
00443
00444 size_t
00445 WriteDataContainer::num_all_samples()
00446 {
00447 size_t size = 0;
00448
00449 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00450 guard,
00451 this->lock_,
00452 0);
00453
00454 for (PublicationInstanceMapType::iterator iter = instances_.begin();
00455 iter != instances_.end();
00456 ++iter)
00457 {
00458 size += iter->second->samples_.size();
00459 }
00460
00461 return size;
00462 }
00463
00464 ACE_UINT64
00465 WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
00466 {
00467 DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
00468
00469
00470
00471
00472
00473 list = this->unsent_data_;
00474
00475
00476 ++transaction_id_;
00477
00478
00479 SendStateDataSampleList::iterator iter = list.begin();
00480 while (iter != list.end()) {
00481 iter->set_transaction_id(this->transaction_id_);
00482 ++iter;
00483 }
00484
00485
00486
00487
00488
00489
00490 sending_data_.enqueue_tail(list);
00491
00492
00493
00494
00495 this->unsent_data_.reset();
00496
00497
00498
00499
00500 return transaction_id_;
00501 }
00502
00503 SendStateDataSampleList
00504 WriteDataContainer::get_resend_data()
00505 {
00506 DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
00507
00508
00509
00510
00511
00512 SendStateDataSampleList list = this->resend_data_;
00513
00514
00515
00516
00517 this->resend_data_.reset();
00518
00519
00520
00521 return list;
00522 }
00523
00524 bool
00525 WriteDataContainer::pending_data()
00526 {
00527 return this->sending_data_.size() != 0
00528 || this->orphaned_to_transport_.size() != 0
00529 || this->unsent_data_.size() != 0;
00530 }
00531
00532 void
00533 WriteDataContainer::data_delivered(const DataSampleElement* sample)
00534 {
00535 DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
00536
00537 if (DCPS_debug_level >= 2) {
00538 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
00539 ACE_TEXT(" %X \n"), sample));
00540 }
00541
00542 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00543 guard,
00544 this->lock_);
00545
00546
00547
00548
00549
00550
00551
00552
00553
00554 DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00555
00556
00557
00558
00559 if (!sending_data_.dequeue(sample)) {
00560
00561
00562
00563
00564 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00565 send_lists.push_back(&sent_data_);
00566 send_lists.push_back(&unsent_data_);
00567 send_lists.push_back(&orphaned_to_transport_);
00568
00569 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00570
00571 if (containing_list == &this->sent_data_) {
00572 ACE_ERROR((LM_WARNING,
00573 ACE_TEXT("(%P|%t) WARNING: ")
00574 ACE_TEXT("WriteDataContainer::data_delivered, ")
00575 ACE_TEXT("The delivered sample is not in sending_data_ and ")
00576 ACE_TEXT("WAS IN sent_data_.\n")));
00577 } else if (containing_list == &this->unsent_data_) {
00578 ACE_ERROR((LM_WARNING,
00579 ACE_TEXT("(%P|%t) WARNING: ")
00580 ACE_TEXT("WriteDataContainer::data_delivered, ")
00581 ACE_TEXT("The delivered sample is not in sending_data_ and ")
00582 ACE_TEXT("WAS IN unsent_data_ list.\n")));
00583 } else {
00584
00585 if (containing_list == &this->orphaned_to_transport_) {
00586 orphaned_to_transport_.dequeue(sample);
00587 release_buffer(stale);
00588 }
00589
00590
00591
00592 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00593
00594 if (DCPS_debug_level > 9) {
00595 GuidConverter converter(publication_id_);
00596 ACE_DEBUG((LM_DEBUG,
00597 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00598 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00599 this->domain_id_,
00600 this->topic_name_,
00601 OPENDDS_STRING(converter).c_str()));
00602 }
00603 writer_->controlTracker.message_delivered();
00604 }
00605
00606 if (!pending_data())
00607 empty_condition_.broadcast();
00608 }
00609
00610 return;
00611 }
00612 ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, this->wfa_lock_);
00613 SequenceNumber acked_seq = stale->get_header().sequence_;
00614 SequenceNumber prev_max = acked_sequences_.cumulative_ack();
00615
00616 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00617
00618 if (DCPS_debug_level > 9) {
00619 GuidConverter converter(publication_id_);
00620 ACE_DEBUG((LM_DEBUG,
00621 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00622 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
00623 this->domain_id_,
00624 this->topic_name_,
00625 OPENDDS_STRING(converter).c_str()));
00626 }
00627 release_buffer(stale);
00628 writer_->controlTracker.message_delivered();
00629 } else {
00630 if (DCPS_debug_level > 9) {
00631 GuidConverter converter(publication_id_);
00632 ACE_DEBUG((LM_DEBUG,
00633 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00634 ACE_TEXT("domain %d topic %C publication %C pushed to HISTORY.\n"),
00635 this->domain_id_,
00636 this->topic_name_,
00637 OPENDDS_STRING(converter).c_str()));
00638 }
00639
00640 DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
00641 sent_data_.enqueue_tail(sample);
00642
00643 this->wakeup_blocking_writers (stale);
00644 }
00645 if (DCPS_debug_level > 9) {
00646 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
00647 ACE_TEXT("Inserting acked_sequence: %q\n"),
00648 acked_seq.getValue()));
00649 }
00650
00651 acked_sequences_.insert(acked_seq);
00652
00653 if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
00654 prev_max < acked_sequences_.cumulative_ack()) {
00655
00656 if (DCPS_debug_level > 9) {
00657 ACE_DEBUG((LM_DEBUG,
00658 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
00659 ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
00660 }
00661
00662 wfa_condition_.broadcast();
00663 }
00664
00665
00666 if (!pending_data())
00667 empty_condition_.broadcast();
00668 }
00669
00670 void
00671 WriteDataContainer::data_dropped(const DataSampleElement* sample,
00672 bool dropped_by_transport)
00673 {
00674 DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
00675
00676 if (DCPS_debug_level >= 2) {
00677 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
00678 ACE_TEXT(" sample %X dropped_by_transport %d\n"),
00679 sample, dropped_by_transport));
00680 }
00681
00682
00683
00684
00685
00686 if (dropped_by_transport) {
00687 this->data_delivered(sample);
00688 return;
00689 }
00690
00691
00692
00693
00694
00695
00696
00697 ACE_GUARD (ACE_Recursive_Thread_Mutex,
00698 guard,
00699 this->lock_);
00700
00701
00702
00703
00704
00705
00706
00707
00708 DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
00709
00710
00711
00712
00713
00714 if (sending_data_.dequeue(sample)) {
00715
00716
00717
00718
00719 unsent_data_.enqueue_tail(sample);
00720
00721 } else {
00722
00723
00724
00725 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00726 send_lists.push_back(&sent_data_);
00727 send_lists.push_back(&unsent_data_);
00728 send_lists.push_back(&orphaned_to_transport_);
00729
00730 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00731
00732 if (containing_list == &this->sent_data_) {
00733 ACE_ERROR((LM_WARNING,
00734 ACE_TEXT("(%P|%t) WARNING: ")
00735 ACE_TEXT("WriteDataContainer::data_dropped, ")
00736 ACE_TEXT("The dropped sample is not in sending_data_ and ")
00737 ACE_TEXT("WAS IN sent_data_.\n")));
00738 } else if (containing_list == &this->unsent_data_) {
00739 ACE_ERROR((LM_WARNING,
00740 ACE_TEXT("(%P|%t) WARNING: ")
00741 ACE_TEXT("WriteDataContainer::data_dropped, ")
00742 ACE_TEXT("The dropped sample is not in sending_data_ and ")
00743 ACE_TEXT("WAS IN unsent_data_ list.\n")));
00744 } else {
00745
00746
00747
00748 if (stale->get_header().message_id_ != SAMPLE_DATA) {
00749
00750 if (DCPS_debug_level > 9) {
00751 GuidConverter converter(publication_id_);
00752 ACE_DEBUG((LM_DEBUG,
00753 ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
00754 ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
00755 this->domain_id_,
00756 this->topic_name_,
00757 OPENDDS_STRING(converter).c_str()));
00758 }
00759 writer_->controlTracker.message_dropped();
00760 }
00761 if (containing_list == &this->orphaned_to_transport_) {
00762 orphaned_to_transport_.dequeue(sample);
00763 release_buffer(stale);
00764 if (!pending_data())
00765 empty_condition_.broadcast();
00766 }
00767 }
00768
00769 return;
00770 }
00771
00772 this->wakeup_blocking_writers (stale);
00773
00774 if (!pending_data())
00775 empty_condition_.broadcast();
00776 }
00777
00778 DDS::ReturnCode_t
00779 WriteDataContainer::remove_oldest_historical_sample(
00780 InstanceDataSampleList& instance_list,
00781 bool& released)
00782 {
00783 DataSampleElement* stale = 0;
00784
00785 if (instance_list.head() != 0) {
00786 stale = instance_list.head();
00787 } else {
00788
00789 return DDS::RETCODE_OK;
00790 }
00791
00792
00793
00794
00795
00796 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00797 send_lists.push_back(&sent_data_);
00798
00799 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00800
00801
00802
00803
00804 bool result = false;
00805
00806 if (containing_list == &this->sent_data_) {
00807
00808
00809
00810
00811 if (instance_list.dequeue_head(stale) == false) {
00812 ACE_ERROR_RETURN((LM_ERROR,
00813 ACE_TEXT("(%P|%t) ERROR: ")
00814 ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00815 ACE_TEXT("dequeue_head_next_sample failed\n")),
00816 DDS::RETCODE_ERROR);
00817 }
00818
00819
00820 result = this->sent_data_.dequeue(stale) != 0;
00821 release_buffer(stale);
00822 released = true;
00823
00824 if (DCPS_debug_level > 9) {
00825 GuidConverter converter(publication_id_);
00826 ACE_DEBUG((LM_DEBUG,
00827 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_historical_sample: ")
00828 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00829 this->domain_id_,
00830 this->topic_name_,
00831 OPENDDS_STRING(converter).c_str()));
00832 }
00833
00834 } else {
00835
00836 return DDS::RETCODE_OK;
00837 }
00838
00839 if (result == false) {
00840 ACE_ERROR_RETURN((LM_ERROR,
00841 ACE_TEXT("(%P|%t) ERROR: ")
00842 ACE_TEXT("WriteDataContainer::remove_oldest_historical_sample, ")
00843 ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00844 DDS::RETCODE_ERROR);
00845
00846 }
00847
00848 return DDS::RETCODE_OK;
00849 }
00850
00851
00852 DDS::ReturnCode_t
00853 WriteDataContainer::remove_oldest_sample(
00854 InstanceDataSampleList& instance_list,
00855 bool& released)
00856 {
00857 DataSampleElement* stale = 0;
00858
00859
00860
00861
00862 if (instance_list.dequeue_head(stale) == false) {
00863 ACE_ERROR_RETURN((LM_ERROR,
00864 ACE_TEXT("(%P|%t) ERROR: ")
00865 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00866 ACE_TEXT("dequeue_head_next_sample failed\n")),
00867 DDS::RETCODE_ERROR);
00868 }
00869
00870
00871
00872
00873
00874
00875
00876
00877
00878
00879
00880
00881
00882
00883
00884
00885 OPENDDS_VECTOR(SendStateDataSampleList*) send_lists;
00886 send_lists.push_back(&sending_data_);
00887 send_lists.push_back(&sent_data_);
00888 send_lists.push_back(&unsent_data_);
00889 send_lists.push_back(&orphaned_to_transport_);
00890
00891 const SendStateDataSampleList* containing_list = SendStateDataSampleList::send_list_containing_element(stale, send_lists);
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902 bool result = false;
00903
00904 if (containing_list == &this->sending_data_) {
00905 if (DCPS_debug_level > 2) {
00906 ACE_ERROR((LM_WARNING,
00907 ACE_TEXT("(%P|%t) WARNING: ")
00908 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00909 ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
00910 }
00911
00912
00913
00914
00915 if (this->writer_->remove_sample(stale)) {
00916 if (this->sent_data_.dequeue(stale)) {
00917 release_buffer(stale);
00918 result = true;
00919 }
00920
00921 } else {
00922 if (this->sending_data_.dequeue(stale)) {
00923 this->orphaned_to_transport_.enqueue_tail(stale);
00924 } else if (this->sent_data_.dequeue(stale)) {
00925 release_buffer(stale);
00926 result = true;
00927 }
00928 result = true;
00929 }
00930 released = true;
00931
00932 } else if (containing_list == &this->sent_data_) {
00933
00934
00935
00936 result = this->sent_data_.dequeue(stale) != 0;
00937 release_buffer(stale);
00938 released = true;
00939
00940 if (DCPS_debug_level > 9) {
00941 GuidConverter converter(publication_id_);
00942 ACE_DEBUG((LM_DEBUG,
00943 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00944 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
00945 this->domain_id_,
00946 this->topic_name_,
00947 OPENDDS_STRING(converter).c_str()));
00948 }
00949
00950 } else if (containing_list == &this->unsent_data_) {
00951
00952
00953
00954
00955 result = this->unsent_data_.dequeue(stale) != 0;
00956 release_buffer(stale);
00957 released = true;
00958
00959 if (DCPS_debug_level > 9) {
00960 GuidConverter converter(publication_id_);
00961 ACE_DEBUG((LM_DEBUG,
00962 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
00963 ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
00964 this->domain_id_,
00965 this->topic_name_,
00966 OPENDDS_STRING(converter).c_str()));
00967 }
00968 } else {
00969 ACE_ERROR_RETURN((LM_ERROR,
00970 ACE_TEXT("(%P|%t) ERROR: ")
00971 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00972 ACE_TEXT("The oldest sample is not in any internal list.\n")),
00973 DDS::RETCODE_ERROR);
00974 }
00975
00976
00977 {
00978 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00979 guard,
00980 this->lock_,
00981 DDS::RETCODE_ERROR);
00982
00983 if (!pending_data())
00984 empty_condition_.broadcast();
00985 }
00986
00987 if (result == false) {
00988 ACE_ERROR_RETURN((LM_ERROR,
00989 ACE_TEXT("(%P|%t) ERROR: ")
00990 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
00991 ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
00992 DDS::RETCODE_ERROR);
00993
00994 }
00995
00996 return DDS::RETCODE_OK;
00997 }
00998
00999 DDS::ReturnCode_t
01000 WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
01001 {
01002 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
01003
01004 ACE_NEW_MALLOC_RETURN(
01005 element,
01006 static_cast<DataSampleElement*>(
01007 sample_list_element_allocator_.malloc(
01008 sizeof(DataSampleElement))),
01009 DataSampleElement(publication_id_,
01010 this->writer_,
01011 0,
01012 &transport_send_element_allocator_,
01013 &transport_customized_element_allocator_),
01014 DDS::RETCODE_ERROR);
01015
01016 return DDS::RETCODE_OK;
01017 }
01018
01019 DDS::ReturnCode_t
01020 WriteDataContainer::obtain_buffer(DataSampleElement*& element,
01021 DDS::InstanceHandle_t handle)
01022 {
01023 DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
01024
01025 PublicationInstance* instance = get_handle_instance(handle);
01026
01027 ACE_NEW_MALLOC_RETURN(
01028 element,
01029 static_cast<DataSampleElement*>(
01030 sample_list_element_allocator_.malloc(
01031 sizeof(DataSampleElement))),
01032 DataSampleElement(publication_id_,
01033 this->writer_,
01034 instance,
01035 &transport_send_element_allocator_,
01036 &transport_customized_element_allocator_),
01037 DDS::RETCODE_ERROR);
01038
01039
01040 InstanceDataSampleList& instance_list = instance->samples_;
01041 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01042
01043 bool need_to_set_abs_timeout = true;
01044 ACE_Time_Value abs_timeout;
01045
01046
01047
01048
01049
01050 while ((instance_list.size() >= depth_) ||
01051 ((this->max_num_samples_ > 0) &&
01052 ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
01053
01054
01055 if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
01056
01057 bool removed_historical = false;
01058 if (DCPS_debug_level >= 2) {
01059 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01060 ACE_TEXT(" instance %d attempting to remove")
01061 ACE_TEXT(" its oldest historical sample\n"),
01062 handle));
01063 }
01064
01065 ret = this->remove_oldest_historical_sample(instance_list, removed_historical);
01066
01067
01068 if (ret == DDS::RETCODE_OK && !removed_historical) {
01069 if (DCPS_debug_level >= 2) {
01070 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01071 ACE_TEXT(" instance %d attempting to remove")
01072 ACE_TEXT(" oldest historical sample from any full instances\n"),
01073 handle));
01074 }
01075 PublicationInstanceMapType::iterator it = instances_.begin();
01076
01077 while (!removed_historical && it != instances_.end() && ret == DDS::RETCODE_OK) {
01078 if(it->second->samples_.size() >= depth_) {
01079 ret = this->remove_oldest_historical_sample(it->second->samples_, removed_historical);
01080 }
01081 ++it;
01082 }
01083 }
01084
01085 if (ret == DDS::RETCODE_OK && !removed_historical) {
01086
01087 if (need_to_set_abs_timeout) {
01088 abs_timeout = duration_to_absolute_time_value (max_blocking_time_);
01089 need_to_set_abs_timeout = false;
01090 }
01091 if (!shutdown_ && ACE_OS::gettimeofday() < abs_timeout) {
01092 if (DCPS_debug_level >= 2) {
01093 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01094 ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
01095 handle));
01096 }
01097
01098 waiting_on_release_ = true;
01099
01100
01101 int const wait_result = condition_.wait(&abs_timeout);
01102
01103 if (wait_result != 0) {
01104 if (errno == ETIME) {
01105 if (DCPS_debug_level >= 2) {
01106 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01107 ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
01108 handle));
01109 }
01110 ret = DDS::RETCODE_TIMEOUT;
01111 } else {
01112 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: WriteDataContainer::obtain_buffer condition_.wait()")
01113 ACE_TEXT("%p\n")));
01114 ret = DDS::RETCODE_ERROR;
01115 }
01116 }
01117 } else {
01118
01119
01120 ret = DDS::RETCODE_TIMEOUT;
01121 }
01122 }
01123
01124 } else {
01125
01126 bool oldest_released = false;
01127
01128
01129
01130
01131 if (instance_list.size() > 0) {
01132 if (DCPS_debug_level >= 2) {
01133 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01134 ACE_TEXT(" instance %d attempting to remove")
01135 ACE_TEXT(" its oldest sample\n"),
01136 handle));
01137 }
01138 ret = this->remove_oldest_sample(instance_list, oldest_released);
01139 }
01140
01141 if (ret == DDS::RETCODE_OK && !oldest_released) {
01142 if (DCPS_debug_level >= 2) {
01143 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01144 ACE_TEXT(" instance %d attempting to remove")
01145 ACE_TEXT(" oldest sample from any full instances\n"),
01146 handle));
01147 }
01148 PublicationInstanceMapType::iterator it = instances_.begin();
01149
01150 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01151 if(it->second->samples_.size() >= depth_) {
01152 ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01153 }
01154 ++it;
01155 }
01156 }
01157
01158 if (ret == DDS::RETCODE_OK && !oldest_released) {
01159 if (DCPS_debug_level >= 2) {
01160 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01161 ACE_TEXT(" instance %d attempting to remove")
01162 ACE_TEXT(" oldest sample from any instance with samples currently\n"),
01163 handle));
01164 }
01165 PublicationInstanceMapType::iterator it = instances_.begin();
01166
01167 while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
01168 if(it->second->samples_.size() > 0) {
01169 ret = this->remove_oldest_sample(it->second->samples_, oldest_released);
01170 }
01171 ++it;
01172 }
01173 }
01174 if (!oldest_released) {
01175
01176
01177 ACE_ERROR((LM_ERROR,
01178 ACE_TEXT("(%P|%t) ERROR: ")
01179 ACE_TEXT("WriteDataContainer::obtain_buffer, ")
01180 ACE_TEXT("hitting resource limits with no samples to remove\n")));
01181 ret = DDS::RETCODE_ERROR;
01182 }
01183 }
01184
01185 if (ret != DDS::RETCODE_OK) {
01186 if (DCPS_debug_level >= 2) {
01187 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
01188 ACE_TEXT(" instance %d could not obtain buffer for sample")
01189 ACE_TEXT(" releasing allotted sample and returning\n"),
01190 handle));
01191 }
01192 this->release_buffer(element);
01193 return ret;
01194 }
01195 }
01196
01197 data_holder_.enqueue_tail(element);
01198
01199 return ret;
01200 }
01201
01202 void
01203 WriteDataContainer::release_buffer(DataSampleElement* element)
01204 {
01205 if (element->get_header().message_id_ == SAMPLE_DATA)
01206 data_holder_.dequeue(element);
01207
01208 ACE_DES_FREE(element,
01209 sample_list_element_allocator_.free,
01210 DataSampleElement);
01211 }
01212
01213 void
01214 WriteDataContainer::unregister_all()
01215 {
01216 DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
01217 shutdown_ = true;
01218
01219 {
01220
01221
01222 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01223 guard,
01224 this->lock_);
01225
01226
01227 (void) this->writer_->remove_all_msgs();
01228
01229
01230 if (waiting_on_release_) {
01231 condition_.broadcast();
01232 }
01233 }
01234 DDS::ReturnCode_t ret;
01235 DataSample* registered_sample;
01236 PublicationInstanceMapType::iterator it = instances_.begin();
01237
01238 while (it != instances_.end()) {
01239
01240 ret = dispose(it->first, registered_sample, false);
01241
01242 if (ret != DDS::RETCODE_OK) {
01243 ACE_ERROR((LM_ERROR,
01244 ACE_TEXT("(%P|%t) ERROR: ")
01245 ACE_TEXT("WriteDataContainer::unregister_all, ")
01246 ACE_TEXT("dispose instance %X failed\n"),
01247 it->first));
01248 }
01249
01250 ret = unregister(it->first, registered_sample, false);
01251
01252 if (ret != DDS::RETCODE_OK) {
01253 ACE_ERROR((LM_ERROR,
01254 ACE_TEXT("(%P|%t) ERROR: ")
01255 ACE_TEXT("WriteDataContainer::unregister_all, ")
01256 ACE_TEXT("unregister instance %X failed\n"),
01257 it->first));
01258 }
01259
01260 PublicationInstance* instance = it->second;
01261
01262 delete instance;
01263
01264
01265 PublicationInstanceMapType::iterator it_next = it;
01266 ++it_next;
01267
01268 unbind(instances_, it->first);
01269 it = it_next;
01270 }
01271
01272 ACE_UNUSED_ARG(registered_sample);
01273 }
01274
01275 PublicationInstance*
01276 WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
01277 {
01278 PublicationInstance* instance = 0;
01279
01280 if (0 != find(instances_, handle, instance)) {
01281 ACE_ERROR((LM_ERROR,
01282 ACE_TEXT("(%P|%t) ERROR: ")
01283 ACE_TEXT("WriteDataContainer::get_handle_instance, ")
01284 ACE_TEXT("lookup for %d failed\n"),
01285 handle));
01286 }
01287
01288 return instance;
01289 }
01290
01291 void
01292 WriteDataContainer::copy_and_append(SendStateDataSampleList& list,
01293 const SendStateDataSampleList& appended,
01294 const RepoId& reader_id,
01295 const DDS::LifespanQosPolicy& lifespan
01296 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01297 ,
01298 const OPENDDS_STRING& filterClassName,
01299 const FilterEvaluator* eval,
01300 const DDS::StringSeq& params
01301 #endif
01302 )
01303 {
01304 for (SendStateDataSampleList::const_iterator cur = appended.begin();
01305 cur != appended.end(); ++cur) {
01306
01307
01308
01309 if (resend_data_expired(*cur, lifespan))
01310 continue;
01311
01312 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01313 if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
01314 continue;
01315 #endif
01316
01317 DataSampleElement* element = 0;
01318 ACE_NEW_MALLOC(element,
01319 static_cast<DataSampleElement*>(
01320 sample_list_element_allocator_.malloc(
01321 sizeof(DataSampleElement))),
01322 DataSampleElement(*cur));
01323
01324 element->set_num_subs(1);
01325 element->set_sub_id(0, reader_id);
01326
01327 list.enqueue_tail(element);
01328 }
01329 }
01330
01331 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
01332 bool
01333 WriteDataContainer::persist_data()
01334 {
01335 bool result = true;
01336
01337
01338
01339
01340 if (this->durability_cache_) {
01341
01342
01343
01344
01345
01346
01347
01348 bool const inserted =
01349 this->durability_cache_->insert(this->domain_id_,
01350 this->topic_name_,
01351 this->type_name_,
01352 this->sent_data_,
01353 this->durability_service_
01354 );
01355
01356 result = inserted;
01357
01358 if (!inserted)
01359 ACE_ERROR((LM_ERROR,
01360 ACE_TEXT("(%P|%t) ERROR: ")
01361 ACE_TEXT("WriteDataContainer::persist_data, ")
01362 ACE_TEXT("failed to make data durable for ")
01363 ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
01364 this->domain_id_,
01365 this->topic_name_,
01366 this->type_name_));
01367 }
01368
01369 return result;
01370 }
01371 #endif
01372
01373 void WriteDataContainer::reschedule_deadline()
01374 {
01375 for (PublicationInstanceMapType::iterator iter = instances_.begin();
01376 iter != instances_.end();
01377 ++iter) {
01378 if (iter->second->deadline_timer_id_ != -1) {
01379 if (this->writer_->watchdog_->reset_timer_interval(iter->second->deadline_timer_id_) == -1) {
01380 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) WriteDataContainer::reschedule_deadline %p\n")
01381 ACE_TEXT("reset_timer_interval")));
01382 }
01383 }
01384 }
01385 }
01386
01387 void
01388 WriteDataContainer::wait_pending()
01389 {
01390 ACE_Time_Value pending_timeout =
01391 TheServiceParticipant->pending_timeout();
01392
01393 ACE_Time_Value* pTimeout = 0;
01394
01395 if (pending_timeout != ACE_Time_Value::zero) {
01396 pTimeout = &pending_timeout;
01397 pending_timeout += ACE_OS::gettimeofday();
01398 }
01399
01400 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01401 const bool report = DCPS_debug_level > 0 && pending_data();
01402 if (report) {
01403 ACE_TCHAR date_time[50];
01404 ACE_TCHAR* const time =
01405 MessageTracker::timestamp(pending_timeout,
01406 date_time,
01407 50);
01408 ACE_DEBUG((LM_DEBUG,
01409 ACE_TEXT("%T (%P|%t) WriteDataContainer::wait_pending timeout ")
01410 ACE_TEXT("at %s\n"),
01411 (pending_timeout == ACE_Time_Value::zero ?
01412 ACE_TEXT("(no timeout)") : time)));
01413 }
01414 while (true) {
01415
01416 if (!pending_data())
01417 break;
01418
01419 if (empty_condition_.wait(pTimeout) == -1 && pending_data()) {
01420 if (DCPS_debug_level) {
01421 ACE_DEBUG((LM_INFO,
01422 ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending %p\n"),
01423 ACE_TEXT("Timed out waiting for messages to be transported")));
01424 this->log_send_state_lists("WriteDataContainer::wait_pending - wait failed: ");
01425 }
01426 break;
01427 }
01428 }
01429 if (report) {
01430 ACE_DEBUG((LM_DEBUG,
01431 "%T (%P|%t) WriteDataContainer::wait_pending done\n"));
01432 }
01433 }
01434
01435 void
01436 WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
01437 {
01438 ACE_GUARD(ACE_Recursive_Thread_Mutex,
01439 guard,
01440 this->lock_);
01441 PublicationInstanceMapType::iterator it = instances_.begin();
01442
01443 while (it != instances_.end()) {
01444 instance_handles.push_back(it->second->instance_handle_);
01445 ++it;
01446 }
01447 }
01448
01449 DDS::ReturnCode_t
01450 WriteDataContainer::wait_ack_of_seq(const ACE_Time_Value& abs_deadline, const SequenceNumber& sequence)
01451 {
01452 ACE_Time_Value deadline(abs_deadline);
01453 DDS::ReturnCode_t ret = DDS::RETCODE_OK;
01454 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->wfa_lock_, DDS::RETCODE_ERROR);
01455
01456 while (ACE_OS::gettimeofday() < deadline) {
01457
01458 if (!sequence_acknowledged(sequence)) {
01459
01460
01461 int const wait_result = wfa_condition_.wait(&deadline);
01462
01463 if (wait_result != 0) {
01464 if (errno == ETIME) {
01465 if (DCPS_debug_level >= 2) {
01466 ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
01467 ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
01468 sequence.getValue()));
01469 }
01470 ret = DDS::RETCODE_TIMEOUT;
01471 } else {
01472 ret = DDS::RETCODE_ERROR;
01473 }
01474 }
01475 } else {
01476 ret = DDS::RETCODE_OK;
01477 break;
01478 }
01479 }
01480
01481 return ret;
01482 }
01483
01484 bool
01485 WriteDataContainer::sequence_acknowledged(const SequenceNumber sequence)
01486 {
01487 if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
01488
01489 return true;
01490 }
01491
01492 SequenceNumber acked = acked_sequences_.cumulative_ack();
01493 if (DCPS_debug_level >= 10) {
01494 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged ")
01495 ACE_TEXT("- cumulative ack is currently: %q\n"), acked.getValue()));
01496 }
01497 if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
01498
01499
01500 return false;
01501 }
01502 return true;
01503 }
01504
01505 void
01506 WriteDataContainer::wakeup_blocking_writers (DataSampleElement* stale)
01507 {
01508 if (stale && waiting_on_release_) {
01509 waiting_on_release_ = false;
01510
01511 condition_.broadcast();
01512 }
01513 }
01514
01515 void
01516 WriteDataContainer::log_send_state_lists (OPENDDS_STRING description)
01517 {
01518 ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
01519 description.c_str(),
01520 unsent_data_.size(),
01521 sending_data_.size(),
01522 sent_data_.size(),
01523 orphaned_to_transport_.size(),
01524 num_all_samples(),
01525 instances_.size()));
01526 }
01527
01528 }
01529 }