00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00011
00012 #include "dds/DdsDcpsDomainC.h"
00013 #include "dds/DdsDcpsTypeSupportExtC.h"
00014 #include "DataDurabilityCache.h"
00015 #include "SendStateDataSampleList.h"
00016 #include "DataSampleElement.h"
00017 #include "WriteDataContainer.h"
00018 #include "DataWriterImpl.h"
00019 #include "Time_Helper.h"
00020 #include "debug.h"
00021 #include "SafetyProfileStreams.h"
00022 #include "Service_Participant.h"
00023 #include "RcEventHandler.h"
00024
00025 #include "ace/Reactor.h"
00026 #include "ace/Message_Block.h"
00027 #include "ace/Log_Msg.h"
00028 #include "ace/Malloc_T.h"
00029 #include "ace/MMAP_Memory_Pool.h"
00030 #include "ace/OS_NS_sys_time.h"
00031
00032 #include <fstream>
00033 #include <algorithm>
00034
00035 namespace {
00036
00037 void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00038 const ACE_CString & data_dir)
00039 {
00040 if (path.empty()) return;
00041
00042 using OpenDDS::FileSystemStorage::Directory;
00043 Directory::Ptr dir = Directory::create(data_dir.c_str());
00044 dir = dir->get_dir(path);
00045 Directory::Ptr parent = dir->parent();
00046 dir->remove();
00047
00048
00049 while (!parent.is_nil() &&
00050 (parent->begin_dirs() == parent->end_dirs())) {
00051 Directory::Ptr to_delete = parent;
00052 parent = parent->parent();
00053 to_delete->remove();
00054 }
00055 }
00056
00057
00058
00059
00060
00061
00062
00063 class Cleanup_Handler : public OpenDDS::DCPS::RcEventHandler {
00064 public:
00065
00066 typedef
00067 OpenDDS::DCPS::DataDurabilityCache::sample_data_type data_type;
00068 typedef
00069 OpenDDS::DCPS::DataDurabilityCache::sample_list_type list_type;
00070 typedef ptrdiff_t list_difference_type;
00071
00072 Cleanup_Handler(list_type & sample_list,
00073 list_difference_type index,
00074 ACE_Allocator * allocator,
00075 const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00076 const ACE_CString & data_dir)
00077 : sample_list_(sample_list)
00078 , index_(index)
00079 , allocator_(allocator)
00080 , tid_(-1)
00081 , timer_ids_(0)
00082 , path_(path)
00083 , data_dir_(data_dir)
00084 {
00085 }
00086
00087 virtual int handle_timeout(ACE_Time_Value const & ,
00088 void const * ) {
00089 if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00090 ACE_DEBUG((LM_DEBUG,
00091 ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
00092 ACE_TEXT("data durability cache.\n")));
00093 }
00094
00095 typedef OpenDDS::DCPS::DurabilityQueue<
00096 OpenDDS::DCPS::DataDurabilityCache::sample_data_type>
00097 data_queue_type;
00098
00099
00100 data_queue_type *& queue = this->sample_list_[this->index_];
00101 ACE_DES_FREE(queue,
00102 this->allocator_->free,
00103 data_queue_type);
00104 queue = 0;
00105
00106 try {
00107 cleanup_directory(path_, this->data_dir_);
00108
00109 } catch (const std::exception& ex) {
00110 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00111 ACE_ERROR((LM_ERROR,
00112 ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
00113 ACE_TEXT("couldn't remove directory for PERSISTENT ")
00114 ACE_TEXT("data: %C\n"), ex.what()));
00115 }
00116 }
00117
00118
00119 this->timer_ids_->remove(this->tid_);
00120
00121 return 0;
00122 }
00123
00124 void timer_id(
00125 long tid,
00126 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
00127 this->tid_ = tid;
00128 this->timer_ids_ = timer_ids;
00129 }
00130
00131 protected:
00132
00133 virtual ~Cleanup_Handler() {}
00134
00135 private:
00136
00137
00138
00139 list_type & sample_list_;
00140
00141
00142 list_difference_type const index_;
00143
00144
00145 ACE_Allocator * const allocator_;
00146
00147
00148 long tid_;
00149
00150
00151
00152
00153
00154
00155
00156
00157 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
00158 timer_ids_;
00159
00160 OPENDDS_VECTOR(OPENDDS_STRING) path_;
00161
00162 ACE_CString data_dir_;
00163 };
00164
00165 }
00166
00167 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type()
00168 : length_(0)
00169 , sample_(0)
00170 , allocator_(0)
00171 {
00172 this->source_timestamp_.sec = 0;
00173 this->source_timestamp_.nanosec = 0;
00174 }
00175
00176 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00177 DataSampleElement & element,
00178 ACE_Allocator * a)
00179 : length_(0)
00180 , sample_(0)
00181 , allocator_(a)
00182 {
00183 this->source_timestamp_.sec = element.get_header().source_timestamp_sec_;
00184 this->source_timestamp_.nanosec = element.get_header().source_timestamp_nanosec_;
00185
00186
00187
00188
00189
00190
00191
00192 ACE_Message_Block const * const data = element.get_sample()->cont();
00193 init(data);
00194 }
00195
00196 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00197 DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
00198 : length_(0)
00199 , sample_(0)
00200 , source_timestamp_(timestamp)
00201 , allocator_(a)
00202 {
00203 init(&mb);
00204 }
00205
00206 void
00207 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init
00208 (const ACE_Message_Block * data)
00209 {
00210 this->length_ = data->total_length();
00211
00212 ACE_ALLOCATOR(this->sample_,
00213 static_cast<char *>(
00214 this->allocator_->malloc(this->length_)));
00215
00216 char * buf = this->sample_;
00217
00218 for (ACE_Message_Block const * i = data;
00219 i != 0;
00220 i = i->cont()) {
00221 ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
00222 buf += i->length();
00223 }
00224 }
00225
00226 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00227 sample_data_type const & rhs)
00228 : length_(rhs.length_)
00229 , sample_(0)
00230 , allocator_(rhs.allocator_)
00231 {
00232 this->source_timestamp_.sec = rhs.source_timestamp_.sec;
00233 this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00234
00235 if (this->allocator_) {
00236 ACE_ALLOCATOR(this->sample_,
00237 static_cast<char *>(
00238 this->allocator_->malloc(rhs.length_)));
00239 ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
00240 }
00241 }
00242
00243 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::~sample_data_type()
00244 {
00245 if (this->allocator_)
00246 this->allocator_->free(this->sample_);
00247 }
00248
00249 OpenDDS::DCPS::DataDurabilityCache::sample_data_type &
00250 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::operator= (
00251 sample_data_type const & rhs)
00252 {
00253
00254 sample_data_type tmp(rhs);
00255 std::swap(this->length_, tmp.length_);
00256 std::swap(this->sample_, tmp.sample_);
00257 std::swap(this->allocator_, tmp.allocator_);
00258
00259 this->source_timestamp_.sec = rhs.source_timestamp_.sec;
00260 this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00261
00262 return *this;
00263 }
00264
00265 void
00266 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(
00267 char const *& s,
00268 size_t & len,
00269 DDS::Time_t & source_timestamp)
00270 {
00271 s = this->sample_;
00272 len = this->length_;
00273 source_timestamp.sec = this->source_timestamp_.sec;
00274 source_timestamp.nanosec = this->source_timestamp_.nanosec;
00275 }
00276
00277 void
00278 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::set_allocator(
00279 ACE_Allocator * allocator)
00280 {
00281 this->allocator_ = allocator;
00282 }
00283
00284 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00285 DDS::DurabilityQosPolicyKind kind)
00286 : allocator_(new ACE_New_Allocator)
00287 , kind_(kind)
00288 , samples_(0)
00289 , cleanup_timer_ids_()
00290 , lock_()
00291 , reactor_(0)
00292 {
00293 init();
00294 }
00295
00296 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00297 DDS::DurabilityQosPolicyKind kind,
00298 ACE_CString & data_dir)
00299 : allocator_(new ACE_New_Allocator)
00300 , kind_(kind)
00301 , data_dir_(data_dir)
00302 , samples_(0)
00303 , cleanup_timer_ids_()
00304 , lock_()
00305 , reactor_(0)
00306 {
00307 init();
00308 }
00309
00310 void OpenDDS::DCPS::DataDurabilityCache::init()
00311 {
00312 ACE_Allocator * const allocator = this->allocator_.get();
00313 ACE_NEW_MALLOC(
00314 this->samples_,
00315 static_cast<sample_map_type *>(
00316 allocator->malloc(sizeof(sample_map_type))),
00317 sample_map_type(allocator));
00318
00319 typedef DurabilityQueue<sample_data_type> data_queue_type;
00320
00321 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00322
00323
00324 using OpenDDS::FileSystemStorage::Directory;
00325 using OpenDDS::FileSystemStorage::File;
00326 Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
00327 OPENDDS_VECTOR(OPENDDS_STRING) path(4);
00328
00329 for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
00330 domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
00331 path[0] = domain->name();
00332 DDS::DomainId_t domain_id;
00333 {
00334 domain_id = ACE_OS::atoi(path[0].c_str());
00335 }
00336
00337 for (Directory::DirectoryIterator topic = domain->begin_dirs(),
00338 topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
00339 path[1] = topic->name();
00340
00341 for (Directory::DirectoryIterator type = topic->begin_dirs(),
00342 type_end = topic->end_dirs(); type != type_end; ++type) {
00343 path[2] = type->name();
00344
00345 key_type key(domain_id, path[1].c_str(), path[2].c_str(),
00346 allocator);
00347 sample_list_type * sample_list = 0;
00348 ACE_NEW_MALLOC(sample_list,
00349 static_cast<sample_list_type *>(
00350 allocator->malloc(sizeof(sample_list_type))),
00351 sample_list_type(0, static_cast<data_queue_type *>(0),
00352 allocator));
00353 this->samples_->bind(key, sample_list, allocator);
00354
00355 for (Directory::DirectoryIterator dw = type->begin_dirs(),
00356 dw_end = type->end_dirs(); dw != dw_end; ++dw) {
00357 path[3] = dw->name();
00358
00359 size_t old_len = sample_list->size();
00360 sample_list->size(old_len + 1);
00361 data_queue_type *& slot = (*sample_list)[old_len];
00362
00363
00364
00365 data_queue_type * sample_queue = 0;
00366 ACE_NEW_MALLOC(sample_queue,
00367 static_cast<data_queue_type *>(
00368 allocator->malloc(sizeof(data_queue_type))),
00369 data_queue_type(allocator));
00370
00371 slot = sample_queue;
00372 sample_queue->fs_path_ = path;
00373
00374 for (Directory::FileIterator file = dw->begin_files(),
00375 file_end = dw->end_files(); file != file_end; ++file) {
00376 std::ifstream is;
00377
00378 if (!file->read(is)) {
00379 if (DCPS_debug_level) {
00380 ACE_ERROR((LM_ERROR,
00381 ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
00382 ACE_TEXT("couldn't open file for PERSISTENT ")
00383 ACE_TEXT("data: %C\n"), file->name().c_str()));
00384 }
00385 continue;
00386 }
00387
00388 DDS::Time_t timestamp;
00389 is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
00390 is.get();
00391
00392 const size_t CHUNK = 4096;
00393 ACE_Message_Block mb(CHUNK);
00394 ACE_Message_Block * current = &mb;
00395
00396 while (!is.eof()) {
00397 is.read(current->wr_ptr(), current->space());
00398
00399 if (is.bad()) break;
00400
00401 current->wr_ptr((size_t)is.gcount());
00402
00403 if (current->space() == 0) {
00404 ACE_Message_Block * old = current;
00405 current = new ACE_Message_Block(CHUNK);
00406 old->cont(current);
00407 }
00408 }
00409
00410 sample_queue->enqueue_tail(
00411 sample_data_type(timestamp, mb, allocator));
00412
00413 if (mb.cont()) mb.cont()->release();
00414 }
00415 }
00416 }
00417 }
00418 }
00419 }
00420
00421 this->reactor_ = TheServiceParticipant->timer();
00422 }
00423
00424 OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache()
00425 {
00426
00427 timer_id_list_type::const_iterator const end(
00428 this->cleanup_timer_ids_.end());
00429
00430 for (timer_id_list_type::const_iterator i(
00431 this->cleanup_timer_ids_.begin());
00432 i != end;
00433 ++i) {
00434 (void) this->reactor_->cancel_timer(*i);
00435 }
00436
00437
00438 if (this->allocator_.get() != 0) {
00439 sample_map_type::iterator const map_end = this->samples_->end();
00440
00441 for (sample_map_type::iterator s = this->samples_->begin();
00442 s != map_end;
00443 ++s) {
00444 sample_list_type * const list = (*s).int_id_;
00445
00446 size_t const len = list->size();
00447
00448 for (size_t l = 0; l != len; ++l) {
00449 ACE_DES_FREE((*list)[l],
00450 this->allocator_->free,
00451 DurabilityQueue<sample_data_type>);
00452 }
00453
00454 ACE_DES_FREE(list,
00455 this->allocator_->free,
00456 DurabilityArray<DurabilityQueue<sample_data_type> *>);
00457 }
00458
00459
00460
00461
00462
00463
00464 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
00465 ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
00466 #undef OPENDDS_MAP_TYPE
00467 }
00468 }
00469
00470 bool
00471 OpenDDS::DCPS::DataDurabilityCache::insert(
00472 DDS::DomainId_t domain_id,
00473 char const * topic_name,
00474 char const * type_name,
00475 SendStateDataSampleList & the_data,
00476 DDS::DurabilityServiceQosPolicy const & qos)
00477 {
00478 if (the_data.size() == 0)
00479 return true;
00480
00481
00482
00483 int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
00484 ? qos.max_samples_per_instance
00485 : qos.history_depth;
00486
00487 if (depth == DDS::LENGTH_UNLIMITED)
00488 depth = 0x7fffffff;
00489
00490
00491 SendStateDataSampleList::iterator element(the_data.begin());
00492
00493 if (depth < 0)
00494 return false;
00495
00496 else if (depth == 0)
00497 return true;
00498
00499 else if (the_data.size() > depth) {
00500
00501
00502
00503
00504
00505
00506
00507
00508
00509
00510
00511 ssize_t const advance_amount = the_data.size() - depth;
00512 std::advance(element, advance_amount);
00513 }
00514
00515
00516
00517
00518
00519 key_type const key(domain_id,
00520 topic_name,
00521 type_name,
00522 this->allocator_.get());
00523 SendStateDataSampleList::iterator the_end(the_data.end());
00524 sample_list_type * sample_list = 0;
00525
00526 typedef DurabilityQueue<sample_data_type> data_queue_type;
00527 data_queue_type ** slot = 0;
00528 data_queue_type * samples = 0;
00529
00530 using OpenDDS::FileSystemStorage::Directory;
00531 using OpenDDS::FileSystemStorage::File;
00532 Directory::Ptr dir;
00533 OPENDDS_VECTOR(OPENDDS_STRING) path;
00534 {
00535 ACE_Allocator * const allocator = this->allocator_.get();
00536
00537 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00538
00539 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00540 try {
00541 dir = Directory::create(this->data_dir_.c_str());
00542
00543 path.push_back(to_dds_string(domain_id));
00544 path.push_back(topic_name);
00545 path.push_back(type_name);
00546 dir = dir->get_dir(path);
00547
00548
00549
00550
00551 dir = dir->create_next_dir();
00552 path.push_back(dir->name());
00553
00554 } catch (const std::exception& ex) {
00555 if (DCPS_debug_level > 0) {
00556 ACE_ERROR((LM_ERROR,
00557 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00558 ACE_TEXT("couldn't create directory for PERSISTENT ")
00559 ACE_TEXT("data: %C\n"), ex.what()));
00560 }
00561
00562 dir.reset();
00563 }
00564 }
00565
00566 if (this->samples_->find(key, sample_list, allocator) != 0) {
00567
00568
00569 ACE_NEW_MALLOC_RETURN(
00570 sample_list,
00571 static_cast<sample_list_type *>(
00572 allocator->malloc(sizeof(sample_list_type))),
00573 sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
00574 false);
00575
00576 if (this->samples_->bind(key, sample_list, allocator) != 0)
00577 return false;
00578 }
00579
00580 data_queue_type ** const begin = &((*sample_list)[0]);
00581 data_queue_type ** const end =
00582 begin + sample_list->size();
00583
00584
00585
00586
00587 slot = std::find(begin,
00588 end,
00589 static_cast<data_queue_type *>(0));
00590
00591 if (slot == end) {
00592
00593 size_t const old_len = sample_list->size();
00594 sample_list->size(old_len + 1);
00595
00596 data_queue_type ** new_begin = &((*sample_list)[0]);
00597 slot = new_begin + old_len;
00598 }
00599
00600 ACE_NEW_MALLOC_RETURN(
00601 samples,
00602 static_cast<data_queue_type *>(
00603 allocator->malloc(sizeof(data_queue_type))),
00604 data_queue_type(allocator),
00605 false);
00606
00607
00608 *slot = samples;
00609
00610 if (!dir.is_nil()) {
00611 samples->fs_path_ = path;
00612 }
00613
00614 for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
00615 DataSampleElement& elem = *i;
00616
00617
00618
00619
00620
00621
00622
00623
00624
00625 if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
00626 continue;
00627 }
00628
00629 sample_data_type sample(elem, allocator);
00630
00631 if (samples->enqueue_tail(sample) != 0)
00632 return false;
00633
00634 if (!dir.is_nil()) {
00635 try {
00636 File::Ptr f = dir->create_next_file();
00637 std::ofstream os;
00638
00639 if (!f->write(os)) return false;
00640
00641 DDS::Time_t timestamp;
00642 const char * data;
00643 size_t len;
00644 sample.get_sample(data, len, timestamp);
00645
00646 os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
00647 os.write(data, len);
00648
00649 } catch (const std::exception& ex) {
00650 if (DCPS_debug_level > 0) {
00651 ACE_ERROR((LM_ERROR,
00652 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00653 ACE_TEXT("couldn't write sample for PERSISTENT ")
00654 ACE_TEXT("data: %C\n"), ex.what()));
00655 }
00656 }
00657 }
00658 }
00659 }
00660
00661
00662
00663
00664
00665 ACE_Time_Value const cleanup_delay(
00666 duration_to_time_value(qos.service_cleanup_delay));
00667
00668 if (cleanup_delay > ACE_Time_Value::zero) {
00669 if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00670 ACE_DEBUG((LM_DEBUG,
00671 ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
00672 ACE_TEXT("cleanup for\n")
00673 ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
00674 ACE_TEXT("== (%d, %C, %C)\n"),
00675 domain_id,
00676 topic_name,
00677 type_name));
00678 }
00679
00680 Cleanup_Handler * const cleanup =
00681 new Cleanup_Handler(*sample_list,
00682 slot - &(*sample_list)[0],
00683 this->allocator_.get(),
00684 path,
00685 this->data_dir_);
00686 ACE_Event_Handler_var safe_cleanup(cleanup);
00687 long const tid =
00688 this->reactor_->schedule_timer(cleanup,
00689 0,
00690 cleanup_delay);
00691 if (tid == -1) {
00692 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00693
00694 ACE_DES_FREE(samples,
00695 this->allocator_->free,
00696 DurabilityQueue<sample_data_type>);
00697 *slot = 0;
00698
00699 return false;
00700
00701 } else {
00702 {
00703 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00704 this->cleanup_timer_ids_.push_back(tid);
00705 }
00706
00707 cleanup->timer_id(tid,
00708 &this->cleanup_timer_ids_);
00709 }
00710 }
00711
00712 return true;
00713 }
00714
00715 bool
00716 OpenDDS::DCPS::DataDurabilityCache::get_data(
00717 DDS::DomainId_t domain_id,
00718 char const * topic_name,
00719 char const * type_name,
00720 DataWriterImpl * data_writer,
00721 ACE_Allocator * mb_allocator,
00722 ACE_Allocator * db_allocator,
00723 DDS::LifespanQosPolicy const & )
00724 {
00725 key_type const key(domain_id,
00726 topic_name,
00727 type_name,
00728 this->allocator_.get());
00729
00730 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00731
00732 sample_list_type * p_sample_list = 0;
00733
00734 if (this->samples_->find(key,
00735 p_sample_list,
00736 this->allocator_.get()) == -1)
00737 return true;
00738
00739 else if (p_sample_list == 0)
00740 return false;
00741
00742 sample_list_type & sample_list = *p_sample_list;
00743
00744
00745
00746
00747 sample_data_type * registration_data = 0;
00748
00749 if (sample_list[0]->get(registration_data, 0) == -1)
00750 return false;
00751
00752 char const * marshaled_sample = 0;
00753 size_t marshaled_sample_length = 0;
00754 DDS::Time_t registration_timestamp;
00755
00756 registration_data->get_sample(marshaled_sample,
00757 marshaled_sample_length,
00758 registration_timestamp);
00759
00760
00761
00762 Message_Block_Ptr registration_sample(
00763 new ACE_Message_Block(marshaled_sample_length,
00764 ACE_Message_Block::MB_DATA,
00765 0,
00766 0,
00767 0,
00768 data_writer->get_db_lock()));
00769
00770 ACE_OS::memcpy(registration_sample->wr_ptr(),
00771 marshaled_sample,
00772 marshaled_sample_length);
00773
00774 registration_sample->wr_ptr(marshaled_sample_length);
00775
00776 DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
00777
00778
00779
00780
00781
00782
00783 DDS::ReturnCode_t ret =
00784 data_writer->register_instance_from_durable_data(handle,
00785 move(registration_sample),
00786 registration_timestamp);
00787
00788 if (ret != DDS::RETCODE_OK)
00789 return false;
00790
00791 typedef DurabilityQueue<sample_data_type> data_queue_type;
00792 size_t const len = sample_list.size();
00793
00794 for (size_t i = 0; i != len; ++i) {
00795 data_queue_type * const q = sample_list[i];
00796
00797 for (data_queue_type::ITERATOR j = q->begin();
00798 !j.done();
00799 j.advance()) {
00800 sample_data_type * data = 0;
00801
00802 if (j.next(data) == 0)
00803 return false;
00804
00805 char const * sample = 0;
00806 size_t sample_length = 0;
00807 DDS::Time_t source_timestamp;
00808
00809 data->get_sample(sample, sample_length, source_timestamp);
00810
00811 ACE_Message_Block * tmp_mb = 0;
00812 ACE_NEW_MALLOC_RETURN(tmp_mb,
00813 static_cast<ACE_Message_Block*>(
00814 mb_allocator->malloc(
00815 sizeof(ACE_Message_Block))),
00816 ACE_Message_Block(
00817 sample_length,
00818 ACE_Message_Block::MB_DATA,
00819 0,
00820 0,
00821 0,
00822 data_writer->get_db_lock(),
00823 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00824 ACE_Time_Value::zero,
00825 ACE_Time_Value::max_time,
00826 db_allocator,
00827 mb_allocator),
00828 false);
00829 Message_Block_Ptr mb(tmp_mb);
00830
00831 ACE_OS::memcpy(mb->wr_ptr(),
00832 sample,
00833 sample_length);
00834 mb->wr_ptr(sample_length);
00835
00836 const DDS::ReturnCode_t ret = data_writer->write(move(mb), handle,
00837 source_timestamp, 0 );
00838
00839 if (ret != DDS::RETCODE_OK) {
00840 return false;
00841 }
00842 }
00843
00844
00845
00846
00847
00848
00849
00850 q->reset();
00851
00852 try {
00853 cleanup_directory(q->fs_path_, this->data_dir_);
00854
00855 } catch (const std::exception& ex) {
00856 if (DCPS_debug_level > 0) {
00857 ACE_ERROR((LM_ERROR,
00858 ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
00859 ACE_TEXT("couldn't remove directory for PERSISTENT ")
00860 ACE_TEXT("data: %C\n"), ex.what()));
00861 }
00862 }
00863 }
00864 return true;
00865 }
00866
00867 #endif // OPENDDS_NO_PERSISTENCE_PROFILE