OpenDDS  Snapshot(2023/04/28-20:55)
DataDurabilityCache.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
11 
12 #include "dds/DdsDcpsDomainC.h"
13 #include "dds/DdsDcpsTypeSupportExtC.h"
14 #include "DataDurabilityCache.h"
16 #include "DataSampleElement.h"
17 #include "WriteDataContainer.h"
18 #include "DataWriterImpl.h"
19 #include "Time_Helper.h"
20 #include "debug.h"
21 #include "SafetyProfileStreams.h"
22 #include "Service_Participant.h"
23 #include "RcEventHandler.h"
24 
25 #include "ace/Reactor.h"
26 #include "ace/Message_Block.h"
27 #include "ace/Log_Msg.h"
28 #include "ace/Malloc_T.h"
29 #include "ace/MMAP_Memory_Pool.h"
30 #include "ace/OS_NS_sys_time.h"
31 
32 #include <fstream>
33 #include <algorithm>
34 
35 namespace {
36 
// Removes the directory addressed by `path` underneath `data_dir`, then
// removes each ancestor directory that has no sub-directories left.
// An empty `path` is a no-op.
37 void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
38  const ACE_CString & data_dir)
39 {
40  if (path.empty()) return;
41 
43  Directory::Ptr dir = Directory::create(data_dir.c_str());
44  dir = dir->get_dir(path);
 // Capture the parent before removing `dir`; the upward walk below starts
 // from it.
45  Directory::Ptr parent = dir->parent();
46  dir->remove();
47 
48  // clean up empty directories
 // "Empty" is tested only as "has no sub-directories"; files are not
 // examined by this condition.
 // NOTE(review): how remove() treats a directory that still holds files,
 // and whether the walk can reach the data_dir root itself, is not visible
 // from this file - confirm against the Directory implementation.
49  while (!parent.is_nil() &&
50  (parent->begin_dirs() == parent->end_dirs())) {
51  Directory::Ptr to_delete = parent;
52  parent = parent->parent();
53  to_delete->remove();
54  }
55 }
56 
57 /**
58  * @class Cleanup_Handler
59  *
60  * @brief Event handler that is called when @c service_cleanup_delay
61  * period expires.
 *
 * When the timer fires, the handler frees one data queue (the slot
 * @c index_ of @c sample_list_), removes the corresponding directory
 * below @c data_dir_, and removes its own timer ID from the timer ID
 * list it was given through timer_id().
62  */
63 class Cleanup_Handler : public virtual OpenDDS::DCPS::RcEventHandler {
64 public:
65 
66  typedef
68  typedef
70  typedef ptrdiff_t list_difference_type;
71 
 /// Stores the list/slot to clean, the allocator used to free the queue,
 /// and the filesystem location (@a data_dir plus @a path) to remove.
 /// The timer ID starts at -1 and the timer ID list pointer at null;
 /// both are filled in later by timer_id().
72  Cleanup_Handler(list_type & sample_list,
73  list_difference_type index,
74  ACE_Allocator * allocator,
75  const OPENDDS_VECTOR(OPENDDS_STRING) & path,
76  const ACE_CString & data_dir)
77  : sample_list_(sample_list)
78  , index_(index)
79  , allocator_(allocator)
80  , tid_(-1)
81  , timer_ids_(0)
82  , path_(path)
83  , data_dir_(data_dir)
84  {
85  }
86 
 /// Timer callback. Both arguments are ignored. Always returns 0.
87  virtual int handle_timeout(const ACE_Time_Value& /* current_time */,
88  const void* /* act */)
89  {
91 
93  ACE_DEBUG((LM_DEBUG,
94  ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
95  ACE_TEXT("data durability cache.\n")));
96  }
97 
100  data_queue_type;
101 
102  // Cleanup all data samples corresponding to the cleanup delay.
 // The queue is destroyed in place, its memory is handed back to the
 // allocator, and the slot is nulled.
103  data_queue_type *& queue = this->sample_list_[this->index_];
104  ACE_DES_FREE(queue,
105  this->allocator_->free,
106  data_queue_type);
107  queue = 0;
108 
 // Directory removal is best effort: a std::exception is caught and
 // logged rather than propagated out of the timer callback.
109  try {
110  cleanup_directory(path_, this->data_dir_);
111 
112  } catch (const std::exception& ex) {
114  ACE_ERROR((LM_ERROR,
115  ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
116  ACE_TEXT("couldn't remove directory for PERSISTENT ")
117  ACE_TEXT("data: %C\n"), ex.what()));
118  }
119  }
120 
121  // No longer any need to keep track of the timer ID.
 // NOTE(review): timer_ids_ is null until timer_id() has been called and
 // is dereferenced here without a check; no lock is taken in this
 // method either - confirm timer_id() always runs before the timer can
 // fire and that the list is safe to modify from this thread.
122  this->timer_ids_->remove(this->tid_);
123 
124  return 0;
125  }
126 
 /// Records the timer ID assigned to this handler together with the list
 /// from which handle_timeout() removes that ID.
127  void timer_id(
128  long tid,
129  OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
130  this->tid_ = tid;
131  this->timer_ids_ = timer_ids;
132  }
133 
134 protected:
135 
 /// Protected so the handler cannot be destroyed directly by outside code.
136  virtual ~Cleanup_Handler() {}
137 
138 private:
139 
140  /// List containing samples to be cleaned up when the cleanup timer
141  /// expires.
142  list_type & sample_list_;
143 
144  /// Location in list/array of queue to be deallocated.
145  list_difference_type const index_;
146 
147  /// Allocator to be used when deallocating data queue.
148  ACE_Allocator * const allocator_;
149 
150  /// Timer ID corresponding to this cleanup event handler.
151  long tid_;
152 
153  /// List of timer IDs.
154  /**
155  * If the cleanup timer fires successfully, the timer ID must be
156  * removed from the timer ID list so that a subsequent attempt to
157  * cancel the timer during durability cache destruction does not
158  * occur.
159  */
160  OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
161  timer_ids_;
162 
164 
 /// Root data directory passed to cleanup_directory().
165  ACE_CString data_dir_;
166 };
167 
168 } // namespace
169 
171  : length_(0)
172  , sample_(0)
173  , allocator_(0)
174 {
 // Empty sample: no buffer, no allocator, and a zeroed source timestamp.
175  this->source_timestamp_.sec = 0;
176  this->source_timestamp_.nanosec = 0;
177 }
178 
180  DataSampleElement & element,
181  ACE_Allocator * a)
182  : length_(0)
183  , sample_(0)
184  , allocator_(a)
185 {
188 
189  // Only copy the data provided by the user. The DataSampleHeader
190  // will be reconstructed when the durable data is retrieved by a
191  // DataWriterImpl instance.
192  //
193  // The user's data is stored in the first message block
194  // continuation.
 //
 // NOTE(review): init() dereferences its argument unconditionally, so
 // this assumes element.get_sample() and its cont() are both non-null -
 // confirm callers guarantee that.
195  ACE_Message_Block const * const data = element.get_sample()->cont();
196  init(data);
197 }
198 
200  DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
201  : length_(0)
202  , sample_(0)
203  , source_timestamp_(timestamp)
204  , allocator_(a)
205 {
 // Copy the whole chain starting at `mb` into a buffer obtained from `a`.
206  init(&mb);
207 }
208 
// Copies the message block chain starting at `data` into one contiguous
// buffer obtained from allocator_. On completion sample_ points at the
// buffer and length_ is the chain's total_length().
209 void
211 (const ACE_Message_Block * data)
212 {
 // `data` is dereferenced without a null check.
213  this->length_ = data->total_length();
214 
 // NOTE(review): length_ is assigned before the allocation. ACE_ALLOCATOR
 // presumably returns from this function when malloc() yields null, which
 // would leave length_ non-zero with sample_ still null - verify against
 // the ACE macro definition.
215  ACE_ALLOCATOR(this->sample_,
216  static_cast<char *>(
217  this->allocator_->malloc(this->length_)));
218 
219  char * buf = this->sample_;
220 
 // Append each block's readable bytes (rd_ptr() .. rd_ptr() + length())
 // in chain order.
221  for (ACE_Message_Block const * i = data;
222  i != 0;
223  i = i->cont()) {
224  ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
225  buf += i->length();
226  }
227 }
228 
230  sample_data_type const & rhs)
231  : length_(rhs.length_)
232  , sample_(0)
233  , allocator_(rhs.allocator_)
234 {
237 
 // Deep-copy the buffer using the same allocator as `rhs`.
 // When `rhs` has no allocator nothing is copied: sample_ stays null
 // while length_ still carries rhs.length_.
238  if (this->allocator_) {
239  ACE_ALLOCATOR(this->sample_,
240  static_cast<char *>(
241  this->allocator_->malloc(rhs.length_)));
242  ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
243  }
244 }
245 
247 {
 // The buffer is returned only when an allocator is set; without one
 // sample_ is left untouched.
248  if (this->allocator_)
249  this->allocator_->free(this->sample_);
250 }
251 
254  sample_data_type const & rhs)
255 {
256  // Strongly exception-safe copy assignment.
 // Copy `rhs` into a temporary, then exchange members with it; the
 // temporary's destructor releases the buffer previously held by *this.
257  sample_data_type tmp(rhs);
258  std::swap(this->length_, tmp.length_);
259  std::swap(this->sample_, tmp.sample_);
260  std::swap(this->allocator_, tmp.allocator_);
261 
264 
265  return *this;
266 }
267 
// Exposes the stored sample through output parameters.
//   s                - set to this object's internal buffer (not a copy)
//   len              - set to the buffer length in bytes
//   source_timestamp - receives the stored sec/nanosec values
268 void
270  char const *& s,
271  size_t & len,
272  DDS::Time_t & source_timestamp)
273 {
274  s = this->sample_;
275  len = this->length_;
276  source_timestamp.sec = this->source_timestamp_.sec;
277  source_timestamp.nanosec = this->source_timestamp_.nanosec;
278 }
279 
// Replaces the allocator pointer. The existing buffer is neither freed nor
// reallocated here; the destructor will use the new allocator to free it.
280 void
282  ACE_Allocator * allocator)
283 {
284  this->allocator_ = allocator;
285 }
286 
290  , kind_(kind)
291  , samples_(0)
293  , lock_()
294  , reactor_(0)
295 {
 // init() allocates the sample map and assigns reactor_.
296  init();
297 }
298 
301  ACE_CString & data_dir)
303  , kind_(kind)
304  , data_dir_(data_dir)
305  , samples_(0)
307  , lock_()
308  , reactor_(0)
309 {
 // init() allocates the sample map, reloads samples from data_dir_ when
 // the kind is PERSISTENT, and assigns reactor_.
310  init();
311 }
312 
314 {
 // Shared constructor logic:
 //  1. construct the sample map in memory obtained from the cache allocator;
 //  2. for PERSISTENT durability, rebuild the in-memory queues from the
 //     domain/topic/type/datawriter directory tree under data_dir_;
 //  3. fetch the timer reactor from the service participant.
315  ACE_Allocator * const allocator = this->allocator_.get();
317  this->samples_,
318  static_cast<sample_map_type *>(
319  allocator->malloc(sizeof(sample_map_type))),
320  sample_map_type(allocator));
321 
322  typedef DurabilityQueue<sample_data_type> data_queue_type;
323 
324  if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
325  // Read data from the filesystem and create the in-memory data structures
326  // as if we had called insert() once for each "datawriter" directory.
329  Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
330  OPENDDS_VECTOR(OPENDDS_STRING) path(4); // domain, topic, type, datawriter
331 
332  for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
333  domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
334  path[0] = domain->name();
 // The directory name is the decimal domain ID (insert() writes it with
 // to_dds_string()); atoi() performs no validation.
335  DDS::DomainId_t domain_id;
336  {
337  domain_id = ACE_OS::atoi(path[0].c_str());
338  }
339 
340  for (Directory::DirectoryIterator topic = domain->begin_dirs(),
341  topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
342  path[1] = topic->name();
343 
344  for (Directory::DirectoryIterator type = topic->begin_dirs(),
345  type_end = topic->end_dirs(); type != type_end; ++type) {
346  path[2] = type->name();
347 
 // One sample list per (domain, topic, type). It starts with size 0 and
 // grows by one slot per datawriter directory found below.
348  key_type key(domain_id, path[1].c_str(), path[2].c_str(),
349  allocator);
350  sample_list_type * sample_list = 0;
351  ACE_NEW_MALLOC(sample_list,
352  static_cast<sample_list_type *>(
353  allocator->malloc(sizeof(sample_list_type))),
354  sample_list_type(0, static_cast<data_queue_type *>(0),
355  allocator));
356  this->samples_->bind(key, sample_list, allocator);
357 
358  for (Directory::DirectoryIterator dw = type->begin_dirs(),
359  dw_end = type->end_dirs(); dw != dw_end; ++dw) {
360  path[3] = dw->name();
361 
362  size_t old_len = sample_list->size();
363  sample_list->size(old_len + 1);
364  data_queue_type *& slot = (*sample_list)[old_len];
365 
366  // This variable is called "samples" in the insert() method because
367  // we already have a "samples_" which is the overall data structure.
368  data_queue_type * sample_queue = 0;
369  ACE_NEW_MALLOC(sample_queue,
370  static_cast<data_queue_type *>(
371  allocator->malloc(sizeof(data_queue_type))),
372  data_queue_type(allocator));
373 
374  slot = sample_queue;
 // Remember where this queue lives on disk so it can be removed later.
375  sample_queue->fs_path_ = path;
376 
377  for (Directory::FileIterator file = dw->begin_files(),
378  file_end = dw->end_files(); file != file_end; ++file) {
379  std::ifstream is;
380 
381  if (!file->read(is)) {
382  if (DCPS_debug_level) {
384  ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
385  ACE_TEXT("couldn't open file for PERSISTENT ")
386  ACE_TEXT("data: %C\n"), file->name().c_str()));
387  }
388  continue;
389  }
390 
 // File layout, as written by insert(): "<sec> <nanosec> " followed by
 // the raw sample bytes. noskipws keeps whitespace inside the payload.
392  is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
393  is.get(); // consume separator
394 
 // Read the payload in CHUNK-sized message blocks. The head block lives on
 // the stack; further blocks are heap-allocated and chained via cont().
395  const size_t CHUNK = 4096;
396  ACE_Message_Block mb(CHUNK);
397  ACE_Message_Block * current = &mb;
398 
 // NOTE(review): if the timestamp extraction above fails, the stream has
 // failbit set without eofbit; read() then extracts nothing and neither
 // eof() nor bad() becomes true, so this loop may not terminate for a
 // malformed file - confirm and consider testing fail() as well.
399  while (!is.eof()) {
400  is.read(current->wr_ptr(), current->space());
401 
402  if (is.bad()) break;
403 
404  current->wr_ptr((size_t)is.gcount());
405 
406  if (current->space() == 0) {
407  ACE_Message_Block * old = current;
408  current = new ACE_Message_Block(CHUNK);
409  old->cont(current);
410  }
411  }
412 
 // sample_data_type copies the whole chain into allocator-owned memory,
 // so the temporary blocks can be released right afterwards.
413  sample_queue->enqueue_tail(
414  sample_data_type(timestamp, mb, allocator));
415 
416  if (mb.cont()) mb.cont()->release(); // delete the cont() chain
417  }
418  }
419  }
420  }
421  }
422  }
423 
424  this->reactor_ = TheServiceParticipant->timer();
425 }
426 
428 {
 // Teardown order: cancel outstanding cleanup timers first, then free
 // every queue, every sample list, and finally the map itself.
429  // Cancel timers that haven't expired yet.
430  timer_id_list_type::const_iterator const end(
431  this->cleanup_timer_ids_.end());
432 
433  for (timer_id_list_type::const_iterator i(
434  this->cleanup_timer_ids_.begin());
435  i != end;
436  ++i) {
 // The result of cancel_timer() is deliberately ignored.
437  (void) this->reactor_->cancel_timer(*i);
438  }
439 
440  // Clean up memory that isn't automatically managed.
441  if (this->allocator_.get() != 0) {
442  sample_map_type::iterator const map_end = this->samples_->end();
443 
444  for (sample_map_type::iterator s = this->samples_->begin();
445  s != map_end;
446  ++s) {
447  sample_list_type * const list = (*s).int_id_;
448 
449  size_t const len = list->size();
450 
 // Free each per-datawriter queue held in the list.
451  for (size_t l = 0; l != len; ++l) {
452  ACE_DES_FREE((*list)[l],
453  this->allocator_->free,
455  }
456 
 // Then free the list object itself.
457  ACE_DES_FREE(list,
458  this->allocator_->free,
460  }
461 
462  // Yes, this looks strange but please leave it in place. The third param
463  // to ACE_DES_FREE must be the actual class name since it's used in an
464  // explicit destructor call (~T()). Typedefs are not allowed here. This
465  // is why the two ACE_DES_FREE's above are not the typedefs. Below we use
466  // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
467 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
469 #undef OPENDDS_MAP_TYPE
470  }
471 }
472 
// Copies the samples of one DataWriter into the cache entry for
// (domain_id, topic_name, type_name), writes them to disk when a
// persistence directory could be created, and schedules a cleanup timer
// when qos.service_cleanup_delay is non-zero.
//
// Returns true when there is nothing to cache (empty list or depth 0) or
// when the samples were stored. Returns false when the computed depth is
// negative, the lock cannot be acquired, binding the new list fails,
// enqueueing a sample fails, a sample file cannot be opened for writing,
// or the cleanup timer cannot be scheduled.
473 bool
475  DDS::DomainId_t domain_id,
476  char const * topic_name,
477  char const * type_name,
478  SendStateDataSampleList & the_data,
480 {
481  if (the_data.size() == 0)
482  return true; // Nothing to cache.
483 
484  // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
485  // settings prior to data insertion into the cache.
486  int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
488  : qos.history_depth;
489 
 // Treat "unlimited" as the largest positive int so the comparisons below
 // need no special case.
490  if (depth == DDS::LENGTH_UNLIMITED)
491  depth = 0x7fffffff;
492 
493  // Iterator to first DataSampleElement to be copied.
494  SendStateDataSampleList::iterator element(the_data.begin());
495 
496  if (depth < 0)
497  return false; // Should never occur.
498 
499  else if (depth == 0)
500  return true; // Nothing else to do. Discard all data.
501 
502  else if (the_data.size() > depth) {
503  // N.B. Dropping data samples does not take into account
504  // those samples which are not actually persisted (i.e.
505  // samples with the coherent_sample_ flag set). The spec
506  // does not provide any guidance in this case, therefore
507  // we opt for the simplest solution and assume that there
508  // are no change sets when calculating the number of
509  // samples to drop.
510 
511  // Drop "old" samples. Only keep the "depth" most recent
512  // samples, i.e. those found at the tail end of the
513  // SendStateDataSampleList.
514  ssize_t const advance_amount = the_data.size() - depth;
515  std::advance(element, advance_amount);
516  }
517 
518  // -----------
519 
520  // Copy samples to the domain/topic/type-specific cache.
521 
522  key_type const key(domain_id,
523  topic_name,
524  type_name,
525  this->allocator_.get());
526  SendStateDataSampleList::iterator the_end(the_data.end());
527  sample_list_type * sample_list = 0;
528 
529  typedef DurabilityQueue<sample_data_type> data_queue_type;
530  data_queue_type ** slot = 0;
531  data_queue_type * samples = 0; // sample_list_type::value_type
532 
535  Directory::Ptr dir;
 // Everything in this scope runs with lock_ held; the guard is released at
 // the closing brace, before the cleanup timer is scheduled.
537  {
538  ACE_Allocator * const allocator = this->allocator_.get();
539 
540  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
541 
542  if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
 // Failure to create the directory is not fatal: `dir` is reset and the
 // samples are then kept in memory only.
543  try {
544  dir = Directory::create(this->data_dir_.c_str());
545 
546  path.push_back(to_dds_string(domain_id));
547  path.push_back(topic_name);
548  path.push_back(type_name);
549  dir = dir->get_dir(path);
550  // dir is now the "type" directory, which is shared by all datawriters
551  // of the domain/topic/type. We actually need a new directory per
552  // datawriter and this assumes that insert() is called once per
553  // datawriter, as is currently the case.
554  dir = dir->create_next_dir();
555  path.push_back(dir->name()); // for use by the Cleanup_Handler
556 
557  } catch (const std::exception& ex) {
558  if (DCPS_debug_level > 0) {
560  ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
561  ACE_TEXT("couldn't create directory for PERSISTENT ")
562  ACE_TEXT("data: %C\n"), ex.what()));
563  }
564 
565  dir.reset();
566  }
567  }
568 
569  if (this->samples_->find(key, sample_list, allocator) != 0) {
570  // Create a new list (actually an ACE_Array_Base<>) with the
571  // appropriate allocator passed to its constructor.
573  sample_list,
574  static_cast<sample_list_type *>(
575  allocator->malloc(sizeof(sample_list_type))),
576  sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
577  false);
578 
579  if (this->samples_->bind(key, sample_list, allocator) != 0)
580  return false;
581  }
582 
583  data_queue_type ** const begin = &((*sample_list)[0]);
584  data_queue_type ** const end =
585  begin + sample_list->size();
586 
587  // Find an empty slot in the array. This is a linear search but
588  // that should be fine for the common case, i.e. a small number of
589  // DataWriters that push data into the cache.
590  slot = std::find(begin,
591  end,
592  static_cast<data_queue_type *>(0));
593 
594  if (slot == end) {
595  // No available slots. Grow the array accordingly.
596  size_t const old_len = sample_list->size();
597  sample_list->size(old_len + 1);
598 
 // The element address is recomputed after resizing rather than reusing
 // `begin`, which was taken before the resize.
599  data_queue_type ** new_begin = &((*sample_list)[0]);
600  slot = new_begin + old_len;
601  }
602 
604  samples,
605  static_cast<data_queue_type *>(
606  allocator->malloc(sizeof(data_queue_type))),
607  data_queue_type(allocator),
608  false);
609 
610  // Insert the samples in to the sample list.
611  *slot = samples;
612 
613  if (!dir.is_nil()) {
614  samples->fs_path_ = path;
615  }
616 
617  for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
618  DataSampleElement& elem = *i;
619 
620  // N.B. Do not persist samples with coherent changes.
621  // To verify, we check the DataSampleHeader for the
622  // coherent_change_ flag. The DataSampleHeader will
623  // always be the first message block in the chain.
624  //
625  // It should be noted that persisting coherent changes
626  // is a non-trivial task, and should be handled when
627  // finalizing persistence profile conformance.
629  continue; // skip coherent sample
630  }
631 
632  sample_data_type sample(elem, allocator);
633 
 // NOTE(review): the early returns below leave the already-populated queue
 // installed in *slot and no cleanup timer is scheduled for it - confirm
 // this is acceptable.
634  if (samples->enqueue_tail(sample) != 0)
635  return false;
636 
637  if (!dir.is_nil()) {
 // On-disk format: "<sec> <nanosec> " followed by the raw sample bytes.
 // A std::exception while writing is logged, not propagated.
638  try {
639  File::Ptr f = dir->create_next_file();
640  std::ofstream os;
641 
642  if (!f->write(os)) return false;
643 
645  const char * data;
646  size_t len;
647  sample.get_sample(data, len, timestamp);
648 
649  os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
650  os.write(data, len);
651 
652  } catch (const std::exception& ex) {
653  if (DCPS_debug_level > 0) {
655  ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
656  ACE_TEXT("couldn't write sample for PERSISTENT ")
657  ACE_TEXT("data: %C\n"), ex.what()));
658  }
659  }
660  }
661  }
662  }
663 
664  // -----------
665 
666  // Schedule cleanup timer.
667  //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
668  const TimeDuration cleanup_delay(qos.service_cleanup_delay);
669 
670  if (!cleanup_delay.is_zero()) {
673  ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
674  ACE_TEXT("cleanup for\n")
675  ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
676  ACE_TEXT("== (%d, %C, %C)\n"),
677  domain_id,
678  topic_name,
679  type_name));
680  }
681 
 // The handler is given the slot's index (not its address) within the
 // sample list.
 // NOTE(review): lock_ is not held here while `slot` and `sample_list` are
 // read; `slot` points into the list's storage, which a concurrent
 // insert() could resize - confirm callers serialize insert().
682  Cleanup_Handler * const cleanup =
683  new Cleanup_Handler(*sample_list,
684  slot - &(*sample_list)[0],
685  this->allocator_.get(),
686  path,
687  this->data_dir_);
688  ACE_Event_Handler_var safe_cleanup(cleanup); // Transfer ownership
689  long const tid =
690  this->reactor_->schedule_timer(cleanup,
691  0, // ACT
692  cleanup_delay.value());
693  if (tid == -1) {
 // Scheduling failed: undo the insertion so the queue does not linger in
 // the cache without a cleanup timer.
694  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
695 
696  ACE_DES_FREE(samples,
697  this->allocator_->free,
699  *slot = 0;
700 
701  return false;
702 
703  } else {
704  {
705  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
706  this->cleanup_timer_ids_.push_back(tid);
707  }
708 
 // NOTE(review): the handler learns its timer ID and list only here, after
 // the timer has already been scheduled; handle_timeout() dereferences
 // that list pointer unconditionally - confirm the timer cannot fire
 // before this call completes.
709  cleanup->timer_id(tid,
710  &this->cleanup_timer_ids_);
711  }
712  }
713 
714  return true;
715 }
716 
// Replays the cached samples for (domain_id, topic_name, type_name) through
// `data_writer`: registers one instance from the first cached sample, then
// writes every sample of every queue with its stored source timestamp.
// Each queue is emptied, and its directory removed, once its samples have
// been written. `mb_allocator` / `db_allocator` supply the message and
// data blocks for the written samples; the lifespan argument is unused.
//
// Returns true when no entry exists for the key or when all samples were
// written; false when the lock cannot be acquired, the entry is null, the
// first sample cannot be read, instance registration fails, a message
// block cannot be allocated, or a write does not return RETCODE_OK.
717 bool
719  DDS::DomainId_t domain_id,
720  char const * topic_name,
721  char const * type_name,
722  DataWriterImpl * data_writer,
723  ACE_Allocator * mb_allocator,
724  ACE_Allocator * db_allocator,
725  DDS::LifespanQosPolicy const & /* lifespan */)
726 {
727  key_type const key(domain_id,
728  topic_name,
729  type_name,
730  this->allocator_.get());
731 
 // lock_ is held for the remainder of the function, including the calls
 // into data_writer below.
732  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
733 
734  sample_list_type * p_sample_list = 0;
735 
736  if (this->samples_->find(key,
737  p_sample_list,
738  this->allocator_.get()) == -1)
739  return true; // No durable data for this domain/topic/type.
740 
741  else if (p_sample_list == 0)
742  return false; // Should never happen.
743 
744  sample_list_type & sample_list = *p_sample_list;
745 
746  // We will register an instance, and then write all of the cached
747  // data to the DataWriter using that instance.
748 
749  sample_data_type * registration_data = 0;
750 
 // NOTE(review): sample_list[0] is dereferenced without a check. The list
 // can be empty (init() creates it with size 0 when a type directory has
 // no datawriter directories) and slot 0 can be null (Cleanup_Handler
 // nulls the slot it frees) - confirm neither can occur here.
751  if (sample_list[0]->get(registration_data, 0) == -1)
752  return false;
753 
754  char const * marshaled_sample = 0;
755  size_t marshaled_sample_length = 0;
756  DDS::Time_t registration_timestamp;
757 
758  registration_data->get_sample(marshaled_sample,
759  marshaled_sample_length,
760  registration_timestamp);
761 
762  // Don't use the cached allocator for the registered sample message
763  // block.
764  Message_Block_Ptr registration_sample(
765  new ACE_Message_Block(marshaled_sample_length,
767  0, //cont
768  0, //data
769  0, //alloc_strategy
770  data_writer->get_db_lock()));
771 
772  ACE_OS::memcpy(registration_sample->wr_ptr(),
773  marshaled_sample,
774  marshaled_sample_length);
775 
 // Advance the write pointer past the bytes just copied in.
776  registration_sample->wr_ptr(marshaled_sample_length);
777 
779 
780  /**
781  * @todo Is this going to cause problems for users that set a finite
782  * DDS::ResourceLimitsQosPolicy::max_instances value when
783  * OpenDDS supports that value?
784  */
785  DDS::ReturnCode_t ret =
786  data_writer->register_instance_from_durable_data(handle,
787  move(registration_sample),
788  registration_timestamp);
789 
790  if (ret != DDS::RETCODE_OK)
791  return false;
792 
793  typedef DurabilityQueue<sample_data_type> data_queue_type;
794  size_t const len = sample_list.size();
795 
796  for (size_t i = 0; i != len; ++i) {
 // NOTE(review): `q` is used without a null check, yet slots freed by a
 // Cleanup_Handler are set to 0 - confirm such slots cannot be seen here.
797  data_queue_type * const q = sample_list[i];
798 
799  for (data_queue_type::ITERATOR j = q->begin();
800  !j.done();
801  j.advance()) {
802  sample_data_type * data = 0;
803 
804  if (j.next(data) == 0)
805  return false; // Should never happen.
806 
807  char const * sample = 0; // Sample does not include header.
808  size_t sample_length = 0;
809  DDS::Time_t source_timestamp;
810 
811  data->get_sample(sample, sample_length, source_timestamp);
812 
 // Build a message block sized for this sample using the caller-supplied
 // allocators, then copy the cached bytes into it.
813  ACE_Message_Block * tmp_mb = 0;
814  ACE_NEW_MALLOC_RETURN(tmp_mb,
815  static_cast<ACE_Message_Block*>(
816  mb_allocator->malloc(
817  sizeof(ACE_Message_Block))),
819  sample_length,
821  0, // cont
822  0, // data
823  0, // allocator_strategy
824  data_writer->get_db_lock(), // data block locking_strategy
828  db_allocator,
829  mb_allocator),
830  false);
831  Message_Block_Ptr mb(tmp_mb);
832 
833  ACE_OS::memcpy(mb->wr_ptr(),
834  sample,
835  sample_length);
836  mb->wr_ptr(sample_length);
837 
838  const DDS::ReturnCode_t ret = data_writer->write(move(mb),
839  handle,
840  source_timestamp,
841  0 /* no content filtering */,
842  0 /* no pointer to data */);
843 
 // A failed write returns immediately; queues already processed stay
 // emptied and the current one keeps all of its samples.
844  if (ret != DDS::RETCODE_OK) {
845  return false;
846  }
847  }
848 
849  // Data successfully written. Empty the queue/list.
850  /**
851  * @todo If we don't empty the queue, we'll end up with duplicate
852  * data since the data retrieved from the cache will be
853  * reinserted.
854  */
855  q->reset();
856 
 // Directory removal is best effort; a std::exception is logged and the
 // loop moves on to the next queue.
857  try {
858  cleanup_directory(q->fs_path_, this->data_dir_);
859 
860  } catch (const std::exception& ex) {
861  if (DCPS_debug_level > 0) {
863  ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
864  ACE_TEXT("couldn't remove directory for PERSISTENT ")
865  ACE_TEXT("data: %C\n"), ex.what()));
866  }
867  }
868  }
869  return true;
870 }
871 
872 #endif // OPENDDS_NO_PERSISTENCE_PROFILE
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const char * c_str(void) const
DurabilityArray< DurabilityQueue< sample_data_type > * > sample_list_type
bool insert(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
#define ACE_SYNCH_MUTEX
const DataSampleHeader & get_header() const
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
iterator end()
Return iterator to end of list.
DDS::DurabilityQosPolicyKind kind_
sequence< octet > key
String to_dds_string(unsigned short to_convert)
void * memcpy(void *t, const void *s, size_t len)
ACE_SYNCH_MUTEX lock_
Lock for synchronized access to the underlying map.
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
virtual void free(void *ptr)=0
DataBlockLockPool::DataBlockLock * get_db_lock()
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
#define OPENDDS_MAP_TYPE
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
const ACE_Time_Value & value() const
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
int ssize_t
unique_ptr< ACE_Allocator > const allocator_
Allocator used to allocate memory for sample map and lists.
#define OPENDDS_STRING
Queue class that provides a means to reset the underlying ACE_Allocator.
DOMAINID_TYPE_NATIVE DomainId_t
bool get_data(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &)
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
sample_map_type * samples_
Map of all data samples.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Array class that provides a means to reset the underlying ACE_Allocator.
virtual ACE_Message_Block * release(void)
size_type size(void) const
ACE_Message_Block * cont(void) const
iterator begin()
Return iterator to beginning of list.
ACE_Allocator * allocator_
ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > sample_map_type
DurabilityQosPolicyKind
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
size_t total_length(void) const
char * wr_ptr(void) const
timer_id_list_type cleanup_timer_ids_
Timer ID list.
DDS::ReturnCode_t write(Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
size_t space(void) const
Directory::Ptr get_dir(const OPENDDS_VECTOR(OPENDDS_STRING)&path)
unsigned long nanosec
#define OPENDDS_VECTOR(T)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
SendStateDataSampleList STL-style iterator implementation.
static const ACE_Time_Value zero
void get_sample(char const *&s, size_t &len, DDS::Time_t &source_timestamp)
DDS::ReturnCode_t register_instance_from_durable_data(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
sample_data_type & operator=(sample_data_type const &rhs)
const ReturnCode_t RETCODE_OK
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
const long LENGTH_UNLIMITED
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
ACE_Reactor_Timer_Interface * reactor_
Reactor with which cleanup timers will be registered.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
virtual int handle_timeout(const ACE_Time_Value &current_time, const void *act=0)
#define TheServiceParticipant
DataDurabilityCache(DDS::DurabilityQosPolicyKind kind)
LM_ERROR
HistoryQosPolicyKind history_kind
virtual void * malloc(size_type nbytes)=0
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)