00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00011
00012 #include "ace/Condition_Recursive_Thread_Mutex.h"
00013 #include "DataDurabilityCache.h"
00014 #include "Service_Participant.h"
00015 #include "SendStateDataSampleList.h"
00016 #include "DataSampleElement.h"
00017 #include "WriteDataContainer.h"
00018 #include "DataWriterImpl.h"
00019 #include "Qos_Helper.h"
00020 #include "debug.h"
00021 #include "SafetyProfileStreams.h"
00022
00023 #include "tao/ORB_Core.h"
00024
00025 #include "ace/Reactor.h"
00026 #include "ace/Message_Block.h"
00027 #include "ace/Log_Msg.h"
00028 #include "ace/Malloc_T.h"
00029 #include "ace/MMAP_Memory_Pool.h"
00030 #include "ace/OS_NS_sys_time.h"
00031
00032 #include <fstream>
00033 #include <algorithm>
00034
00035 namespace {
00036
00037 void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00038 const ACE_CString & data_dir)
00039 {
00040 if (path.empty()) return;
00041
00042 using OpenDDS::FileSystemStorage::Directory;
00043 Directory::Ptr dir = Directory::create(data_dir.c_str());
00044 dir = dir->get_dir(path);
00045 Directory::Ptr parent = dir->parent();
00046 dir->remove();
00047
00048
00049 while (!parent.is_nil() &&
00050 (parent->begin_dirs() == parent->end_dirs())) {
00051 Directory::Ptr to_delete = parent;
00052 parent = parent->parent();
00053 to_delete->remove();
00054 }
00055 }
00056
00057
00058
00059
00060
00061
00062
00063 class Cleanup_Handler : public ACE_Event_Handler {
00064 public:
00065
00066 typedef
00067 OpenDDS::DCPS::DataDurabilityCache::sample_data_type data_type;
00068 typedef
00069 OpenDDS::DCPS::DataDurabilityCache::sample_list_type list_type;
00070 typedef ptrdiff_t list_difference_type;
00071
00072 Cleanup_Handler(list_type & sample_list,
00073 list_difference_type index,
00074 ACE_Allocator * allocator,
00075 const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00076 const ACE_CString & data_dir)
00077 : sample_list_(sample_list)
00078 , index_(index)
00079 , allocator_(allocator)
00080 , tid_(-1)
00081 , timer_ids_(0)
00082 , path_(path)
00083 , data_dir_(data_dir) {
00084 this->reference_counting_policy().value(
00085 ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00086 }
00087
00088 virtual int handle_timeout(ACE_Time_Value const & ,
00089 void const * ) {
00090 if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00091 ACE_DEBUG((LM_DEBUG,
00092 ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
00093 ACE_TEXT("data durability cache.\n")));
00094 }
00095
00096 typedef OpenDDS::DCPS::DurabilityQueue<
00097 OpenDDS::DCPS::DataDurabilityCache::sample_data_type>
00098 data_queue_type;
00099
00100
00101 data_queue_type *& queue = this->sample_list_[this->index_];
00102 ACE_DES_FREE(queue,
00103 this->allocator_->free,
00104 data_queue_type);
00105 queue = 0;
00106
00107 try {
00108 cleanup_directory(path_, this->data_dir_);
00109
00110 } catch (const std::exception& ex) {
00111 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00112 ACE_ERROR((LM_ERROR,
00113 ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
00114 ACE_TEXT("couldn't remove directory for PERSISTENT ")
00115 ACE_TEXT("data: %C\n"), ex.what()));
00116 }
00117 }
00118
00119
00120 this->timer_ids_->remove(this->tid_);
00121
00122 return 0;
00123 }
00124
00125 void timer_id(
00126 long tid,
00127 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
00128 this->tid_ = tid;
00129 this->timer_ids_ = timer_ids;
00130 }
00131
00132 protected:
00133
00134 virtual ~Cleanup_Handler() {}
00135
00136 private:
00137
00138
00139
00140 list_type & sample_list_;
00141
00142
00143 list_difference_type const index_;
00144
00145
00146 ACE_Allocator * const allocator_;
00147
00148
00149 long tid_;
00150
00151
00152
00153
00154
00155
00156
00157
00158 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
00159 timer_ids_;
00160
00161 OPENDDS_VECTOR(OPENDDS_STRING) path_;
00162
00163 ACE_CString data_dir_;
00164 };
00165
00166 }
00167
00168 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type()
00169 : length_(0)
00170 , sample_(0)
00171 , allocator_(0)
00172 {
00173 this->source_timestamp_.sec = 0;
00174 this->source_timestamp_.nanosec = 0;
00175 }
00176
00177 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00178 DataSampleElement & element,
00179 ACE_Allocator * a)
00180 : length_(0)
00181 , sample_(0)
00182 , allocator_(a)
00183 {
00184 this->source_timestamp_.sec = element.get_header().source_timestamp_sec_;
00185 this->source_timestamp_.nanosec = element.get_header().source_timestamp_nanosec_;
00186
00187
00188
00189
00190
00191
00192
00193 ACE_Message_Block const * const data = element.get_sample()->cont();
00194 init(data);
00195 }
00196
00197 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00198 DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
00199 : length_(0)
00200 , sample_(0)
00201 , source_timestamp_(timestamp)
00202 , allocator_(a)
00203 {
00204 init(&mb);
00205 }
00206
00207 void
00208 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init
00209 (const ACE_Message_Block * data)
00210 {
00211 this->length_ = data->total_length();
00212
00213 ACE_ALLOCATOR(this->sample_,
00214 static_cast<char *>(
00215 this->allocator_->malloc(this->length_)));
00216
00217 char * buf = this->sample_;
00218
00219 for (ACE_Message_Block const * i = data;
00220 i != 0;
00221 i = i->cont()) {
00222 ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
00223 buf += i->length();
00224 }
00225 }
00226
00227 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00228 sample_data_type const & rhs)
00229 : length_(rhs.length_)
00230 , sample_(0)
00231 , allocator_(rhs.allocator_)
00232 {
00233 this->source_timestamp_.sec = rhs.source_timestamp_.sec;
00234 this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00235
00236 if (this->allocator_) {
00237 ACE_ALLOCATOR(this->sample_,
00238 static_cast<char *>(
00239 this->allocator_->malloc(rhs.length_)));
00240 ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
00241 }
00242 }
00243
00244 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::~sample_data_type()
00245 {
00246 if (this->allocator_)
00247 this->allocator_->free(this->sample_);
00248 }
00249
00250 OpenDDS::DCPS::DataDurabilityCache::sample_data_type &
00251 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::operator= (
00252 sample_data_type const & rhs)
00253 {
00254
00255 sample_data_type tmp(rhs);
00256 std::swap(this->length_, tmp.length_);
00257 std::swap(this->sample_, tmp.sample_);
00258 std::swap(this->allocator_, tmp.allocator_);
00259
00260 this->source_timestamp_.sec = rhs.source_timestamp_.sec;
00261 this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00262
00263 return *this;
00264 }
00265
00266 void
00267 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(
00268 char const *& s,
00269 size_t & len,
00270 DDS::Time_t & source_timestamp)
00271 {
00272 s = this->sample_;
00273 len = this->length_;
00274 source_timestamp.sec = this->source_timestamp_.sec;
00275 source_timestamp.nanosec = this->source_timestamp_.nanosec;
00276 }
00277
00278 void
00279 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::set_allocator(
00280 ACE_Allocator * allocator)
00281 {
00282 this->allocator_ = allocator;
00283 }
00284
00285 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00286 DDS::DurabilityQosPolicyKind kind)
00287 : allocator_(make_allocator(kind))
00288 , kind_(kind)
00289 , samples_(0)
00290 , cleanup_timer_ids_()
00291 , lock_()
00292 , reactor_(0)
00293 {
00294 init();
00295 }
00296
00297 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00298 DDS::DurabilityQosPolicyKind kind,
00299 ACE_CString & data_dir)
00300 : allocator_(make_allocator(kind))
00301 , kind_(kind)
00302 , data_dir_(data_dir)
00303 , samples_(0)
00304 , cleanup_timer_ids_()
00305 , lock_()
00306 , reactor_(0)
00307 {
00308 init();
00309 }
00310
00311 void OpenDDS::DCPS::DataDurabilityCache::init()
00312 {
00313 ACE_Allocator * const allocator = this->allocator_.get();
00314 ACE_NEW_MALLOC(
00315 this->samples_,
00316 static_cast<sample_map_type *>(
00317 allocator->malloc(sizeof(sample_map_type))),
00318 sample_map_type(allocator));
00319
00320 typedef DurabilityQueue<sample_data_type> data_queue_type;
00321
00322 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00323
00324
00325 using OpenDDS::FileSystemStorage::Directory;
00326 using OpenDDS::FileSystemStorage::File;
00327 Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
00328 OPENDDS_VECTOR(OPENDDS_STRING) path(4);
00329
00330 for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
00331 domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
00332 path[0] = domain->name();
00333 DDS::DomainId_t domain_id;
00334 {
00335 domain_id = ACE_OS::atoi(path[0].c_str());
00336 }
00337
00338 for (Directory::DirectoryIterator topic = domain->begin_dirs(),
00339 topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
00340 path[1] = topic->name();
00341
00342 for (Directory::DirectoryIterator type = topic->begin_dirs(),
00343 type_end = topic->end_dirs(); type != type_end; ++type) {
00344 path[2] = type->name();
00345
00346 key_type key(domain_id, path[1].c_str(), path[2].c_str(),
00347 allocator);
00348 sample_list_type * sample_list = 0;
00349 ACE_NEW_MALLOC(sample_list,
00350 static_cast<sample_list_type *>(
00351 allocator->malloc(sizeof(sample_list_type))),
00352 sample_list_type(0, static_cast<data_queue_type *>(0),
00353 allocator));
00354 this->samples_->bind(key, sample_list, allocator);
00355
00356 for (Directory::DirectoryIterator dw = type->begin_dirs(),
00357 dw_end = type->end_dirs(); dw != dw_end; ++dw) {
00358 path[3] = dw->name();
00359
00360 size_t old_len = sample_list->size();
00361 sample_list->size(old_len + 1);
00362 data_queue_type *& slot = (*sample_list)[old_len];
00363
00364
00365
00366 data_queue_type * sample_queue = 0;
00367 ACE_NEW_MALLOC(sample_queue,
00368 static_cast<data_queue_type *>(
00369 allocator->malloc(sizeof(data_queue_type))),
00370 data_queue_type(allocator));
00371
00372 slot = sample_queue;
00373 sample_queue->fs_path_ = path;
00374
00375 for (Directory::FileIterator file = dw->begin_files(),
00376 file_end = dw->end_files(); file != file_end; ++file) {
00377 std::ifstream is;
00378
00379 if (!file->read(is)) {
00380 if (DCPS_debug_level) {
00381 ACE_ERROR((LM_ERROR,
00382 ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
00383 ACE_TEXT("couldn't open file for PERSISTENT ")
00384 ACE_TEXT("data: %C\n"), file->name().c_str()));
00385 }
00386 continue;
00387 }
00388
00389 DDS::Time_t timestamp;
00390 is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
00391 is.get();
00392
00393 const size_t CHUNK = 4096;
00394 ACE_Message_Block mb(CHUNK);
00395 ACE_Message_Block * current = &mb;
00396
00397 while (!is.eof()) {
00398 is.read(current->wr_ptr(), current->space());
00399
00400 if (is.bad()) break;
00401
00402 current->wr_ptr((size_t)is.gcount());
00403
00404 if (current->space() == 0) {
00405 ACE_Message_Block * old = current;
00406 current = new ACE_Message_Block(CHUNK);
00407 old->cont(current);
00408 }
00409 }
00410
00411 sample_queue->enqueue_tail(
00412 sample_data_type(timestamp, mb, allocator));
00413
00414 if (mb.cont()) mb.cont()->release();
00415 }
00416 }
00417 }
00418 }
00419 }
00420 }
00421
00422 this->reactor_ = TheServiceParticipant->timer();
00423 }
00424
00425 OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache()
00426 {
00427
00428 timer_id_list_type::const_iterator const end(
00429 this->cleanup_timer_ids_.end());
00430
00431 for (timer_id_list_type::const_iterator i(
00432 this->cleanup_timer_ids_.begin());
00433 i != end;
00434 ++i) {
00435 (void) this->reactor_->cancel_timer(*i);
00436 }
00437
00438
00439 if (this->allocator_.get() != 0) {
00440 sample_map_type::iterator const map_end = this->samples_->end();
00441
00442 for (sample_map_type::iterator s = this->samples_->begin();
00443 s != map_end;
00444 ++s) {
00445 sample_list_type * const list = (*s).int_id_;
00446
00447 size_t const len = list->size();;
00448
00449 for (size_t l = 0; l != len; ++l) {
00450 ACE_DES_FREE((*list)[l],
00451 this->allocator_->free,
00452 DurabilityQueue<sample_data_type>);
00453 }
00454
00455 ACE_DES_FREE(list,
00456 this->allocator_->free,
00457 DurabilityArray<DurabilityQueue<sample_data_type> *>);
00458 }
00459
00460
00461
00462
00463
00464
00465 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
00466 ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
00467 #undef OPENDDS_MAP_TYPE
00468 }
00469 }
00470
00471 bool
00472 OpenDDS::DCPS::DataDurabilityCache::insert(
00473 DDS::DomainId_t domain_id,
00474 char const * topic_name,
00475 char const * type_name,
00476 SendStateDataSampleList & the_data,
00477 DDS::DurabilityServiceQosPolicy const & qos)
00478 {
00479 if (the_data.size() == 0)
00480 return true;
00481
00482
00483
00484 CORBA::Long const depth =
00485 get_instance_sample_list_depth(
00486 qos.history_kind,
00487 qos.history_depth,
00488 qos.max_samples_per_instance);
00489
00490
00491 SendStateDataSampleList::iterator element(the_data.begin());
00492
00493 if (depth < 0)
00494 return false;
00495
00496 else if (depth == 0)
00497 return true;
00498
00499 else if (the_data.size() > depth) {
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511 ssize_t const advance_amount = the_data.size() - depth;
00512 std::advance(element, advance_amount);
00513 }
00514
00515
00516
00517
00518
00519 key_type const key(domain_id,
00520 topic_name,
00521 type_name,
00522 this->allocator_.get());
00523 SendStateDataSampleList::iterator the_end(the_data.end());
00524 sample_list_type * sample_list = 0;
00525
00526 typedef DurabilityQueue<sample_data_type> data_queue_type;
00527 data_queue_type ** slot = 0;
00528 data_queue_type * samples = 0;
00529
00530 using OpenDDS::FileSystemStorage::Directory;
00531 using OpenDDS::FileSystemStorage::File;
00532 Directory::Ptr dir;
00533 OPENDDS_VECTOR(OPENDDS_STRING) path;
00534 {
00535 ACE_Allocator * const allocator = this->allocator_.get();
00536
00537 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00538
00539 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00540 try {
00541 dir = Directory::create(this->data_dir_.c_str());
00542
00543 path.push_back(to_dds_string(domain_id));
00544 path.push_back(topic_name);
00545 path.push_back(type_name);
00546 dir = dir->get_dir(path);
00547
00548
00549
00550
00551 dir = dir->create_next_dir();
00552 path.push_back(dir->name());
00553
00554 } catch (const std::exception& ex) {
00555 if (DCPS_debug_level > 0) {
00556 ACE_ERROR((LM_ERROR,
00557 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00558 ACE_TEXT("couldn't create directory for PERSISTENT ")
00559 ACE_TEXT("data: %C\n"), ex.what()));
00560 }
00561
00562 dir = 0;
00563 }
00564 }
00565
00566 if (this->samples_->find(key, sample_list, allocator) != 0) {
00567
00568
00569 ACE_NEW_MALLOC_RETURN(
00570 sample_list,
00571 static_cast<sample_list_type *>(
00572 allocator->malloc(sizeof(sample_list_type))),
00573 sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
00574 false);
00575
00576 if (this->samples_->bind(key, sample_list, allocator) != 0)
00577 return false;
00578 }
00579
00580 data_queue_type ** const begin = &((*sample_list)[0]);
00581 data_queue_type ** const end =
00582 begin + sample_list->size();
00583
00584
00585
00586
00587 slot = std::find(begin,
00588 end,
00589 static_cast<data_queue_type *>(0));
00590
00591 if (slot == end) {
00592
00593 size_t const old_len = sample_list->size();
00594 sample_list->size(old_len + 1);
00595
00596 data_queue_type ** new_begin = &((*sample_list)[0]);
00597 slot = new_begin + old_len;
00598 }
00599
00600 ACE_NEW_MALLOC_RETURN(
00601 samples,
00602 static_cast<data_queue_type *>(
00603 allocator->malloc(sizeof(data_queue_type))),
00604 data_queue_type(allocator),
00605 false);
00606
00607
00608 *slot = samples;
00609
00610 if (!dir.is_nil()) {
00611 samples->fs_path_ = path;
00612 }
00613
00614 for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
00615 DataSampleElement& elem = *i;
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625 if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
00626 continue;
00627 }
00628
00629 sample_data_type sample(elem, allocator);
00630
00631 if (samples->enqueue_tail(sample) != 0)
00632 return false;
00633
00634 if (!dir.is_nil()) {
00635 try {
00636 File::Ptr f = dir->create_next_file();
00637 std::ofstream os;
00638
00639 if (!f->write(os)) return false;
00640
00641 DDS::Time_t timestamp;
00642 const char * data;
00643 size_t len;
00644 sample.get_sample(data, len, timestamp);
00645
00646 os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
00647 os.write(data, len);
00648
00649 } catch (const std::exception& ex) {
00650 if (DCPS_debug_level > 0) {
00651 ACE_ERROR((LM_ERROR,
00652 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00653 ACE_TEXT("couldn't write sample for PERSISTENT ")
00654 ACE_TEXT("data: %C\n"), ex.what()));
00655 }
00656 }
00657 }
00658 }
00659 }
00660
00661
00662
00663
00664
00665 ACE_Time_Value const cleanup_delay(
00666 duration_to_time_value(qos.service_cleanup_delay));
00667
00668 if (cleanup_delay > ACE_Time_Value::zero) {
00669 if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00670 ACE_DEBUG((LM_DEBUG,
00671 ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
00672 ACE_TEXT("cleanup for\n")
00673 ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
00674 ACE_TEXT("== (%d, %C, %C)\n"),
00675 domain_id,
00676 topic_name,
00677 type_name));
00678 }
00679
00680 Cleanup_Handler * const cleanup =
00681 new Cleanup_Handler(*sample_list,
00682 slot - &(*sample_list)[0],
00683 this->allocator_.get(),
00684 path,
00685 this->data_dir_);
00686 ACE_Event_Handler_var safe_cleanup(cleanup);
00687 long const tid =
00688 this->reactor_->schedule_timer(cleanup,
00689 0,
00690 cleanup_delay);
00691 if (tid == -1) {
00692 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00693
00694 ACE_DES_FREE(samples,
00695 this->allocator_->free,
00696 DurabilityQueue<sample_data_type>);
00697 *slot = 0;
00698
00699 return false;
00700
00701 } else {
00702 {
00703 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00704 this->cleanup_timer_ids_.push_back(tid);
00705 }
00706
00707 cleanup->timer_id(tid,
00708 &this->cleanup_timer_ids_);
00709 }
00710 }
00711
00712 return true;
00713 }
00714
00715 bool
00716 OpenDDS::DCPS::DataDurabilityCache::get_data(
00717 DDS::DomainId_t domain_id,
00718 char const * topic_name,
00719 char const * type_name,
00720 DataWriterImpl * data_writer,
00721 ACE_Allocator * mb_allocator,
00722 ACE_Allocator * db_allocator,
00723 DDS::LifespanQosPolicy const & )
00724 {
00725 key_type const key(domain_id,
00726 topic_name,
00727 type_name,
00728 this->allocator_.get());
00729
00730 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00731
00732 sample_list_type * p_sample_list = 0;
00733
00734 if (this->samples_->find(key,
00735 p_sample_list,
00736 this->allocator_.get()) == -1)
00737 return true;
00738
00739 else if (p_sample_list == 0)
00740 return false;
00741
00742 sample_list_type & sample_list = *p_sample_list;
00743
00744
00745
00746
00747 sample_data_type * registration_data = 0;
00748
00749 if (sample_list[0]->get(registration_data, 0) == -1)
00750 return false;
00751
00752 char const * marshaled_sample = 0;
00753 size_t marshaled_sample_length = 0;
00754 DDS::Time_t registration_timestamp;
00755
00756 registration_data->get_sample(marshaled_sample,
00757 marshaled_sample_length,
00758 registration_timestamp);
00759
00760
00761
00762 std::auto_ptr<DataSample> registration_sample(
00763 new ACE_Message_Block(marshaled_sample_length,
00764 ACE_Message_Block::MB_DATA,
00765 0,
00766 0,
00767 0,
00768 data_writer->get_db_lock()));
00769
00770 ACE_OS::memcpy(registration_sample->wr_ptr(),
00771 marshaled_sample,
00772 marshaled_sample_length);
00773
00774 registration_sample->wr_ptr(marshaled_sample_length);
00775
00776 DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
00777
00778
00779
00780
00781
00782
00783 DDS::ReturnCode_t ret =
00784 data_writer->register_instance_from_durable_data(handle,
00785 registration_sample.get(),
00786 registration_timestamp);
00787
00788 if (ret != DDS::RETCODE_OK)
00789 return false;
00790
00791 registration_sample.release();
00792
00793 typedef DurabilityQueue<sample_data_type> data_queue_type;
00794 size_t const len = sample_list.size();
00795
00796 for (size_t i = 0; i != len; ++i) {
00797 data_queue_type * const q = sample_list[i];
00798
00799 for (data_queue_type::ITERATOR j = q->begin();
00800 !j.done();
00801 j.advance()) {
00802 sample_data_type * data = 0;
00803
00804 if (j.next(data) == 0)
00805 return false;
00806
00807 char const * sample = 0;
00808 size_t sample_length = 0;
00809 DDS::Time_t source_timestamp;
00810
00811 data->get_sample(sample, sample_length, source_timestamp);
00812
00813 ACE_Message_Block * mb = 0;
00814 ACE_NEW_MALLOC_RETURN(mb,
00815 static_cast<ACE_Message_Block*>(
00816 mb_allocator->malloc(
00817 sizeof(ACE_Message_Block))),
00818 ACE_Message_Block(
00819 sample_length,
00820 ACE_Message_Block::MB_DATA,
00821 0,
00822 0,
00823 0,
00824 data_writer->get_db_lock(),
00825 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00826 ACE_Time_Value::zero,
00827 ACE_Time_Value::max_time,
00828 db_allocator,
00829 mb_allocator),
00830 false);
00831
00832 ACE_OS::memcpy(mb->wr_ptr(),
00833 sample,
00834 sample_length);
00835 mb->wr_ptr(sample_length);
00836
00837 const DDS::ReturnCode_t ret = data_writer->write(mb, handle,
00838 source_timestamp, 0 );
00839
00840 if (ret != DDS::RETCODE_OK) {
00841 ACE_DES_FREE(mb,
00842 mb_allocator->free,
00843 ACE_Message_Block);
00844 return false;
00845 }
00846 }
00847
00848
00849
00850
00851
00852
00853
00854 q->reset();
00855
00856 try {
00857 cleanup_directory(q->fs_path_, this->data_dir_);
00858
00859 } catch (const std::exception& ex) {
00860 if (DCPS_debug_level > 0) {
00861 ACE_ERROR((LM_ERROR,
00862 ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
00863 ACE_TEXT("couldn't remove directory for PERSISTENT ")
00864 ACE_TEXT("data: %C\n"), ex.what()));
00865 }
00866 }
00867 }
00868 return true;
00869 }
00870
00871 std::auto_ptr<ACE_Allocator>
00872 OpenDDS::DCPS::DataDurabilityCache::make_allocator(
00873 DDS::DurabilityQosPolicyKind)
00874 {
00875
00876
00877
00878 return std::auto_ptr<ACE_Allocator> (new ACE_New_Allocator);
00879 }
00880
00881 #endif // OPENDDS_NO_PERSISTENCE_PROFILE