OpenDDS  Snapshot(2023/04/07-19:43)
OpenDDS::DCPS::DataDurabilityCache Class Reference

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.

#include <DataDurabilityCache.h>

Classes

class  key_type
 Key type for underlying maps.
 
class  sample_data_type
 Sample list data type for all samples.
 

Public Types

typedef DurabilityArray< DurabilityQueue< sample_data_type > * > sample_list_type
 
typedef ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > sample_map_type
 

Public Member Functions

typedef OPENDDS_LIST (long) timer_id_list_type
 
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind)
 
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind, ACE_CString &data_dir)
 
 ~DataDurabilityCache ()
 
bool insert (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
 
bool get_data (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &)
 

Private Member Functions

 DataDurabilityCache (DataDurabilityCache const &)
 
DataDurabilityCache & operator= (DataDurabilityCache const &)
 
void init ()
 

Private Attributes

unique_ptr< ACE_Allocator > const allocator_
 Allocator used to allocate memory for sample map and lists.
 
DDS::DurabilityQosPolicyKind kind_
 
ACE_CString data_dir_
 
sample_map_type * samples_
 Map of all data samples.
 
timer_id_list_type cleanup_timer_ids_
 Timer ID list.
 
ACE_SYNCH_MUTEX lock_
 Lock for synchronized access to the underlying map.
 
ACE_Reactor_Timer_Interface * reactor_
 Reactor with which cleanup timers will be registered.
 

Detailed Description

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.

This class implements a cache that outlives DataWriters.

Definition at line 69 of file DataDurabilityCache.h.
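
The shape of the cache described on this page (one map keyed by domain, topic and type, whose values are arrays of per-DataWriter sample queues) can be pictured with standard containers. This is only a sketch: `DurableCacheSketch`, `Key`, `SampleQueue` and `SampleList` are hypothetical names, samples are modeled as strings, and the real class uses ACE containers with a custom allocator instead.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-ins for key_type, DurabilityQueue and DurabilityArray.
using Key = std::tuple<int, std::string, std::string>;  // domain id, topic, type
using SampleQueue = std::deque<std::string>;            // one queue per DataWriter
using SampleList = std::vector<SampleQueue>;            // all queues for one key

class DurableCacheSketch {
public:
  // Store one writer's samples under (domain, topic, type).
  // Returns the index of the queue that was added.
  std::size_t insert(int domain, const std::string& topic,
                     const std::string& type, const SampleQueue& samples) {
    SampleList& list = samples_[Key(domain, topic, type)];
    list.push_back(samples);
    return list.size() - 1;
  }

  // Count the cached samples for a key; 0 when nothing is cached.
  std::size_t cached(int domain, const std::string& topic,
                     const std::string& type) const {
    std::map<Key, SampleList>::const_iterator it =
      samples_.find(Key(domain, topic, type));
    if (it == samples_.end()) return 0;
    std::size_t n = 0;
    for (const SampleQueue& q : it->second) n += q.size();
    return n;
  }

private:
  std::map<Key, SampleList> samples_;  // outlives any individual writer
};
```

Because the map is owned by the cache object rather than by a writer, samples inserted by one writer remain available after that writer is gone.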

Member Typedef Documentation

◆ sample_list_type

Definition at line 184 of file DataDurabilityCache.h.

◆ sample_map_type

Definition at line 187 of file DataDurabilityCache.h.

Constructor & Destructor Documentation

◆ DataDurabilityCache() [1/3]

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind)

Definition at line 287 of file DataDurabilityCache.cpp.

References init().

290  , kind_(kind)
291  , samples_(0)
293  , lock_()
294  , reactor_(0)
295 {
296  init();
297 }

◆ DataDurabilityCache() [2/3]

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind,
ACE_CString & data_dir 
)

Definition at line 299 of file DataDurabilityCache.cpp.

References init().

303  , kind_(kind)
304  , data_dir_(data_dir)
305  , samples_(0)
307  , lock_()
308  , reactor_(0)
309 {
310  init();
311 }

◆ ~DataDurabilityCache()

OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache ( )

Definition at line 427 of file DataDurabilityCache.cpp.

References ACE_DES_FREE, allocator_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_Null_Mutex >::begin(), ACE_Reactor_Timer_Interface::cancel_timer(), cleanup_timer_ids_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, ACE_Hash< EXT_ID >, ACE_Equal_To< EXT_ID >, ACE_Null_Mutex >::end(), ACE_Allocator::free(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OPENDDS_MAP_TYPE, reactor_, samples_, and ACE_Array_Base< T >::size().

428 {
429  // Cancel timers that haven't expired yet.
430  timer_id_list_type::const_iterator const end(
431  this->cleanup_timer_ids_.end());
432 
433  for (timer_id_list_type::const_iterator i(
434  this->cleanup_timer_ids_.begin());
435  i != end;
436  ++i) {
437  (void) this->reactor_->cancel_timer(*i);
438  }
439 
440  // Clean up memory that isn't automatically managed.
441  if (this->allocator_.get() != 0) {
442  sample_map_type::iterator const map_end = this->samples_->end();
443 
444  for (sample_map_type::iterator s = this->samples_->begin();
445  s != map_end;
446  ++s) {
447  sample_list_type * const list = (*s).int_id_;
448 
449  size_t const len = list->size();
450 
451  for (size_t l = 0; l != len; ++l) {
452  ACE_DES_FREE((*list)[l],
453  this->allocator_->free,
454  DurabilityQueue<sample_data_type>);
455  }
456 
457  ACE_DES_FREE(list,
458  this->allocator_->free,
459  DurabilityArray<DurabilityQueue<sample_data_type> *>);
460  }
461 
462  // Yes, this looks strange but please leave it in place. The third param
463  // to ACE_DES_FREE must be the actual class name since it's used in an
464  // explicit destructor call (~T()). Typedefs are not allowed here. This
465  // is why the two ACE_DES_FREE's above are not the typedefs. Below we use
466  // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
467 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
468  ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
469 #undef OPENDDS_MAP_TYPE
470  }
471 }
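
The comment in the destructor about hiding "the internal comma" refers to a general preprocessor rule: a comma inside a template argument list splits a macro argument in two. The following is a minimal sketch of that effect and of the workaround, assuming a hypothetical `DES_FREE` macro that behaves like the documented `ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)` (explicit destructor call, then deallocation). `Pair_Map`, `MAP_TYPE` and `destroy_demo` are illustrative names only.

```cpp
#include <cassert>
#include <cstdlib>
#include <new>

// Hypothetical stand-in for ACE_DES_FREE: destroy the object, then free it.
#define DES_FREE(POINTER, DEALLOCATOR, CLASS) \
  do { if (POINTER) { (POINTER)->~CLASS(); DEALLOCATOR(POINTER); } } while (0)

static int live_objects = 0;

// A two-parameter template, so its name contains a comma when spelled out.
template <typename K, typename V>
struct Pair_Map {
  Pair_Map() { ++live_objects; }
  ~Pair_Map() { --live_objects; }
};

// Returns the number of live objects after construction and destruction.
int destroy_demo() {
  void* raw = std::malloc(sizeof(Pair_Map<int, long>));
  if (raw == nullptr) return -1;
  Pair_Map<int, long>* m = new (raw) Pair_Map<int, long>();

  // DES_FREE(m, std::free, Pair_Map<int, long>);  // 4 macro arguments: error
#define MAP_TYPE Pair_Map<int, long>
  DES_FREE(m, std::free, MAP_TYPE);  // one argument; expanded after splitting
#undef MAP_TYPE
  return live_objects;
}
```

The helper macro works because the preprocessor counts arguments before it expands them, so the comma only appears once the argument boundaries are already fixed.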

◆ DataDurabilityCache() [3/3]

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DataDurabilityCache const &  )
private

Member Function Documentation

◆ get_data()

bool OpenDDS::DCPS::DataDurabilityCache::get_data ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataWriterImpl * data_writer,
ACE_Allocator * mb_allocator,
ACE_Allocator * db_allocator,
DDS::LifespanQosPolicy const &   
)

Write cached data corresponding to given domain, topic and type to DataWriter.

Todo:
Is this going to cause problems for users that set a finite DDS::ResourceLimitsQosPolicy::max_instances value when OpenDDS supports that value?
Todo:
If we don't empty the queue, we'll end up with duplicate data since the data retrieved from the cache will be reinserted.

Definition at line 718 of file DataDurabilityCache.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_MALLOC_RETURN, ACE_SYNCH_MUTEX, ACE_TEXT(), ACE_Array_Base< T >::allocator_, allocator_, OpenDDS::DCPS::DCPS_debug_level, ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataWriterImpl::get_db_lock(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::HANDLE_NIL, LM_ERROR, lock_, ACE_Allocator::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, ACE_OS::memcpy(), OpenDDS::DCPS::move(), OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data(), DDS::RETCODE_OK, samples_, ACE_Array_Base< T >::size(), ACE_Message_Block::wr_ptr(), OpenDDS::DCPS::DataWriterImpl::write(), and ACE_Time_Value::zero.

726 {
727  key_type const key(domain_id,
728  topic_name,
729  type_name,
730  this->allocator_.get());
731 
732  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
733 
734  sample_list_type * p_sample_list = 0;
735 
736  if (this->samples_->find(key,
737  p_sample_list,
738  this->allocator_.get()) == -1)
739  return true; // No durable data for this domain/topic/type.
740 
741  else if (p_sample_list == 0)
742  return false; // Should never happen.
743 
744  sample_list_type & sample_list = *p_sample_list;
745 
746  // We will register an instance, and then write all of the cached
747  // data to the DataWriter using that instance.
748 
749  sample_data_type * registration_data = 0;
750 
751  if (sample_list[0]->get(registration_data, 0) == -1)
752  return false;
753 
754  char const * marshaled_sample = 0;
755  size_t marshaled_sample_length = 0;
756  DDS::Time_t registration_timestamp;
757 
758  registration_data->get_sample(marshaled_sample,
759  marshaled_sample_length,
760  registration_timestamp);
761 
762  // Don't use the cached allocator for the registered sample message
763  // block.
764  Message_Block_Ptr registration_sample(
765  new ACE_Message_Block(marshaled_sample_length,
766  ACE_Message_Block::MB_DATA,
767  0, //cont
768  0, //data
769  0, //alloc_strategy
770  data_writer->get_db_lock()));
771 
772  ACE_OS::memcpy(registration_sample->wr_ptr(),
773  marshaled_sample,
774  marshaled_sample_length);
775 
776  registration_sample->wr_ptr(marshaled_sample_length);
777 
778  DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
779 
780  /**
781  * @todo Is this going to cause problems for users that set a finite
782  * DDS::ResourceLimitsQosPolicy::max_instances value when
783  * OpenDDS supports that value?
784  */
785  DDS::ReturnCode_t ret =
786  data_writer->register_instance_from_durable_data(handle,
787  move(registration_sample),
788  registration_timestamp);
789 
790  if (ret != DDS::RETCODE_OK)
791  return false;
792 
793  typedef DurabilityQueue<sample_data_type> data_queue_type;
794  size_t const len = sample_list.size();
795 
796  for (size_t i = 0; i != len; ++i) {
797  data_queue_type * const q = sample_list[i];
798 
799  for (data_queue_type::ITERATOR j = q->begin();
800  !j.done();
801  j.advance()) {
802  sample_data_type * data = 0;
803 
804  if (j.next(data) == 0)
805  return false; // Should never happen.
806 
807  char const * sample = 0; // Sample does not include header.
808  size_t sample_length = 0;
809  DDS::Time_t source_timestamp;
810 
811  data->get_sample(sample, sample_length, source_timestamp);
812 
813  ACE_Message_Block * tmp_mb = 0;
814  ACE_NEW_MALLOC_RETURN(tmp_mb,
815  static_cast<ACE_Message_Block*>(
816  mb_allocator->malloc(
817  sizeof(ACE_Message_Block))),
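818  ACE_Message_Block(
819  sample_length,
820  ACE_Message_Block::MB_DATA,
821  0, // cont
822  0, // data
823  0, // allocator_strategy
824  data_writer->get_db_lock(), // data block locking_strategy
825  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
826  ACE_Time_Value::zero,
827  ACE_Time_Value::max_time,
828  db_allocator,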
829  mb_allocator),
830  false);
831  Message_Block_Ptr mb(tmp_mb);
832 
833  ACE_OS::memcpy(mb->wr_ptr(),
834  sample,
835  sample_length);
836  mb->wr_ptr(sample_length);
837 
838  const DDS::ReturnCode_t ret = data_writer->write(move(mb),
839  handle,
840  source_timestamp,
841  0 /* no content filtering */,
842  0 /* no pointer to data */);
843 
844  if (ret != DDS::RETCODE_OK) {
845  return false;
846  }
847  }
848 
849  // Data successfully written. Empty the queue/list.
850  /**
851  * @todo If we don't empty the queue, we'll end up with duplicate
852  * data since the data retrieved from the cache will be
853  * reinserted.
854  */
855  q->reset();
856 
857  try {
858  cleanup_directory(q->fs_path_, this->data_dir_);
859 
860  } catch (const std::exception& ex) {
861  if (DCPS_debug_level > 0) {
862  ACE_ERROR((LM_ERROR,
863  ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
864  ACE_TEXT("couldn't remove directory for PERSISTENT ")
865  ACE_TEXT("data: %C\n"), ex.what()));
866  }
867  }
868  }
869  return true;
870 }
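
The second Todo above explains why each queue is reset after its samples are written: without that step, a later replay would hand the same data to a writer again. A simplified sketch of the replay loop follows, assuming a hypothetical `WriterSketch` in place of DataWriterImpl and strings in place of marshaled samples. It omits instance registration, locking and the filesystem cleanup that the real function performs.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical writer that records what it is asked to write.
struct WriterSketch {
  std::vector<std::string> written;
  bool write(const std::string& sample) {
    written.push_back(sample);
    return true;
  }
};

using SampleQueue = std::deque<std::string>;

// Replay every cached queue into the writer. Each queue is emptied once its
// samples have been written, so a later replay cannot produce duplicates.
bool replay(std::vector<SampleQueue>& sample_list, WriterSketch& writer) {
  for (SampleQueue& q : sample_list) {
    for (const std::string& sample : q) {
      if (!writer.write(sample)) return false;  // stop on the first failure
    }
    q.clear();  // plays the role of q->reset() in the listing
  }
  return true;
}
```

Calling `replay` twice on the same list writes each sample once, because the second call finds only empty queues.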

◆ init()

void OpenDDS::DCPS::DataDurabilityCache::init ( void  )
private

Definition at line 313 of file DataDurabilityCache.cpp.

References ACE_ERROR, ACE_NEW_MALLOC, ACE_TEXT(), allocator_, ACE_OS::atoi(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< char >::c_str(), ACE_Message_Block::cont(), data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), kind_, LM_ERROR, ACE_Allocator::malloc(), DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, ACE_Message_Block::release(), samples_, DDS::Time_t::sec, ACE_Array_Base< T >::size(), ACE_Message_Block::space(), TheServiceParticipant, timestamp(), and ACE_Message_Block::wr_ptr().

Referenced by DataDurabilityCache().

314 {
315  ACE_Allocator * const allocator = this->allocator_.get();
316  ACE_NEW_MALLOC(
317  this->samples_,
318  static_cast<sample_map_type *>(
319  allocator->malloc(sizeof(sample_map_type))),
320  sample_map_type(allocator));
321 
322  typedef DurabilityQueue<sample_data_type> data_queue_type;
323 
324  if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
325  // Read data from the filesystem and create the in-memory data structures
326  // as if we had called insert() once for each "datawriter" directory.
329  Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
330  OPENDDS_VECTOR(OPENDDS_STRING) path(4); // domain, topic, type, datawriter
331 
332  for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
333  domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
334  path[0] = domain->name();
335  DDS::DomainId_t domain_id;
336  {
337  domain_id = ACE_OS::atoi(path[0].c_str());
338  }
339 
340  for (Directory::DirectoryIterator topic = domain->begin_dirs(),
341  topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
342  path[1] = topic->name();
343 
344  for (Directory::DirectoryIterator type = topic->begin_dirs(),
345  type_end = topic->end_dirs(); type != type_end; ++type) {
346  path[2] = type->name();
347 
348  key_type key(domain_id, path[1].c_str(), path[2].c_str(),
349  allocator);
350  sample_list_type * sample_list = 0;
351  ACE_NEW_MALLOC(sample_list,
352  static_cast<sample_list_type *>(
353  allocator->malloc(sizeof(sample_list_type))),
354  sample_list_type(0, static_cast<data_queue_type *>(0),
355  allocator));
356  this->samples_->bind(key, sample_list, allocator);
357 
358  for (Directory::DirectoryIterator dw = type->begin_dirs(),
359  dw_end = type->end_dirs(); dw != dw_end; ++dw) {
360  path[3] = dw->name();
361 
362  size_t old_len = sample_list->size();
363  sample_list->size(old_len + 1);
364  data_queue_type *& slot = (*sample_list)[old_len];
365 
366  // This variable is called "samples" in the insert() method because
367  // we already have a "samples_" which is the overall data structure.
368  data_queue_type * sample_queue = 0;
369  ACE_NEW_MALLOC(sample_queue,
370  static_cast<data_queue_type *>(
371  allocator->malloc(sizeof(data_queue_type))),
372  data_queue_type(allocator));
373 
374  slot = sample_queue;
375  sample_queue->fs_path_ = path;
376 
377  for (Directory::FileIterator file = dw->begin_files(),
378  file_end = dw->end_files(); file != file_end; ++file) {
379  std::ifstream is;
380 
381  if (!file->read(is)) {
382  if (DCPS_debug_level) {
383  ACE_ERROR((LM_ERROR,
384  ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
385  ACE_TEXT("couldn't open file for PERSISTENT ")
386  ACE_TEXT("data: %C\n"), file->name().c_str()));
387  }
388  continue;
389  }
390 
391  DDS::Time_t timestamp;
392  is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
393  is.get(); // consume separator
394 
395  const size_t CHUNK = 4096;
396  ACE_Message_Block mb(CHUNK);
397  ACE_Message_Block * current = &mb;
398 
399  while (!is.eof()) {
400  is.read(current->wr_ptr(), current->space());
401 
402  if (is.bad()) break;
403 
404  current->wr_ptr((size_t)is.gcount());
405 
406  if (current->space() == 0) {
407  ACE_Message_Block * old = current;
408  current = new ACE_Message_Block(CHUNK);
409  old->cont(current);
410  }
411  }
412 
413  sample_queue->enqueue_tail(
414  sample_data_type(timestamp, mb, allocator));
415 
416  if (mb.cont()) mb.cont()->release(); // delete the cont() chain
417  }
418  }
419  }
420  }
421  }
422  }
423 
424  this->reactor_ = TheServiceParticipant->timer();
425 }
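
The per-sample file layout that init() reads back is the one insert() writes: the timestamp seconds, a space, the nanoseconds, a single separator space, and then the raw sample bytes. A standalone round-trip sketch of that layout, using standard streams only, might look like this. `StoredSample`, `write_sample` and `read_sample` are hypothetical names, and the fixed-width integer types are an assumption standing in for the fields of DDS::Time_t.

```cpp
#include <cassert>
#include <cstdint>
#include <istream>
#include <iterator>
#include <ostream>
#include <sstream>
#include <string>

struct StoredSample {
  std::int32_t sec;       // assumed width for the seconds field
  std::uint32_t nanosec;  // assumed width for the nanoseconds field
  std::string payload;    // raw bytes; may contain spaces
};

// Write "<sec> <nanosec> " followed by the unformatted payload bytes.
void write_sample(std::ostream& os, const StoredSample& s) {
  os << s.sec << ' ' << s.nanosec << ' ';
  os.write(s.payload.data(), static_cast<std::streamsize>(s.payload.size()));
}

// Read the two timestamp fields, consume exactly one separator character,
// then treat everything that remains as payload.
bool read_sample(std::istream& is, StoredSample& s) {
  is >> s.sec >> s.nanosec >> std::noskipws;
  if (!is) return false;
  is.get();  // consume separator
  s.payload.assign(std::istreambuf_iterator<char>(is),
                   std::istreambuf_iterator<char>());
  return true;
}
```

Consuming exactly one character after the nanoseconds matters: a payload that itself begins with whitespace would otherwise lose its leading bytes.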

◆ insert()

bool OpenDDS::DCPS::DataDurabilityCache::insert ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
SendStateDataSampleList & the_data,
DDS::DurabilityServiceQosPolicy const &  qos 
)

Insert the samples corresponding to the given topic instance (uniquely identified by its domain, topic name and type name) into the data durability cache.

Definition at line 474 of file DataDurabilityCache.cpp.

References ACE_DEBUG, ACE_DES_FREE, ACE_ERROR, ACE_GUARD_RETURN, ACE_NEW_MALLOC_RETURN, ACE_SYNCH_MUTEX, ACE_TEXT(), allocator_, OpenDDS::DCPS::SendStateDataSampleList::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< char >::c_str(), cleanup_timer_ids_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SendStateDataSampleList::end(), ACE_Hash_Map_With_Allocator< class, class >::find(), ACE_Allocator::free(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::DurabilityServiceQosPolicy::history_depth, DDS::DurabilityServiceQosPolicy::history_kind, OpenDDS::DCPS::TimeDuration::is_zero(), DDS::KEEP_ALL_HISTORY_QOS, kind_, DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, lock_, ACE_Allocator::malloc(), DDS::DurabilityServiceQosPolicy::max_samples_per_instance, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, samples_, ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Time_t::sec, DDS::DurabilityServiceQosPolicy::service_cleanup_delay, OpenDDS::DCPS::SendStateDataSampleList::size(), OpenDDS::DCPS::DataSampleHeader::test_flag(), timestamp(), OpenDDS::DCPS::to_dds_string(), and OpenDDS::DCPS::TimeDuration::value().

Referenced by OpenDDS::DCPS::WriteDataContainer::persist_data().
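
The first step of insert() decides how many of the oldest samples to drop, based on the DURABILITY_SERVICE history settings. That rule can be isolated as a small function. This is a sketch, not the library code: `samples_to_skip` and the local `HistoryKind` enum are hypothetical, and the constant mirrors `DDS::LENGTH_UNLIMITED`, which the DDS specification defines as -1.

```cpp
#include <cassert>
#include <cstddef>

enum HistoryKind { KEEP_LAST, KEEP_ALL };  // hypothetical mirror of the DDS kinds
const int LENGTH_UNLIMITED = -1;           // value defined by the DDS specification

// Returns how many of the oldest samples to skip before caching,
// or -1 when the configured depth is invalid.
long samples_to_skip(HistoryKind kind, int history_depth,
                     int max_samples_per_instance, std::size_t available) {
  int depth = (kind == KEEP_ALL) ? max_samples_per_instance : history_depth;

  if (depth == LENGTH_UNLIMITED) depth = 0x7fffffff;

  if (depth < 0) return -1;                             // should never occur
  if (depth == 0) return static_cast<long>(available);  // discard all data
  if (available > static_cast<std::size_t>(depth)) {
    // Keep only the `depth` most recent samples at the tail of the list.
    return static_cast<long>(available - static_cast<std::size_t>(depth));
  }
  return 0;  // everything fits
}
```

For example, with KEEP_LAST and a depth of 3, ten pending samples yield a skip count of 7, so only the three newest are cached.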

480 {
481  if (the_data.size() == 0)
482  return true; // Nothing to cache.
483 
484  // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
485  // settings prior to data insertion into the cache.
486  int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
487  ? qos.max_samples_per_instance
488  : qos.history_depth;
489 
490  if (depth == DDS::LENGTH_UNLIMITED)
491  depth = 0x7fffffff;
492 
493  // Iterator to first DataSampleElement to be copied.
494  SendStateDataSampleList::iterator element(the_data.begin());
495 
496  if (depth < 0)
497  return false; // Should never occur.
498 
499  else if (depth == 0)
500  return true; // Nothing else to do. Discard all data.
501 
502  else if (the_data.size() > depth) {
503  // N.B. Dropping data samples does not take into account
504  // those samples which are not actually persisted (i.e.
505  // samples with the coherent_sample_ flag set). The spec
506  // does not provide any guidance in this case, therefore
507  // we opt for the simplest solution and assume that there
508  // are no change sets when calculating the number of
509  // samples to drop.
510 
511  // Drop "old" samples. Only keep the "depth" most recent
512  // samples, i.e. those found at the tail end of the
513  // SendStateDataSampleList.
514  ssize_t const advance_amount = the_data.size() - depth;
515  std::advance(element, advance_amount);
516  }
517 
518  // -----------
519 
520  // Copy samples to the domain/topic/type-specific cache.
521 
522  key_type const key(domain_id,
523  topic_name,
524  type_name,
525  this->allocator_.get());
526  SendStateDataSampleList::iterator the_end(the_data.end());
527  sample_list_type * sample_list = 0;
528 
529  typedef DurabilityQueue<sample_data_type> data_queue_type;
530  data_queue_type ** slot = 0;
531  data_queue_type * samples = 0; // sample_list_type::value_type
532 
535  Directory::Ptr dir;
536  OPENDDS_VECTOR(OPENDDS_STRING) path;
537  {
538  ACE_Allocator * const allocator = this->allocator_.get();
539 
540  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
541 
542  if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
543  try {
544  dir = Directory::create(this->data_dir_.c_str());
545 
546  path.push_back(to_dds_string(domain_id));
547  path.push_back(topic_name);
548  path.push_back(type_name);
549  dir = dir->get_dir(path);
550  // dir is now the "type" directory, which is shared by all datawriters
551  // of the domain/topic/type. We actually need a new directory per
552  // datawriter and this assumes that insert() is called once per
553  // datawriter, as is currently the case.
554  dir = dir->create_next_dir();
555  path.push_back(dir->name()); // for use by the Cleanup_Handler
556 
557  } catch (const std::exception& ex) {
558  if (DCPS_debug_level > 0) {
559  ACE_ERROR((LM_ERROR,
560  ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
561  ACE_TEXT("couldn't create directory for PERSISTENT ")
562  ACE_TEXT("data: %C\n"), ex.what()));
563  }
564 
565  dir.reset();
566  }
567  }
568 
569  if (this->samples_->find(key, sample_list, allocator) != 0) {
570  // Create a new list (actually an ACE_Array_Base<>) with the
571  // appropriate allocator passed to its constructor.
572  ACE_NEW_MALLOC_RETURN(
573  sample_list,
574  static_cast<sample_list_type *>(
575  allocator->malloc(sizeof(sample_list_type))),
576  sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
577  false);
578 
579  if (this->samples_->bind(key, sample_list, allocator) != 0)
580  return false;
581  }
582 
583  data_queue_type ** const begin = &((*sample_list)[0]);
584  data_queue_type ** const end =
585  begin + sample_list->size();
586 
587  // Find an empty slot in the array. This is a linear search but
588  // that should be fine for the common case, i.e. a small number of
589  // DataWriters that push data into the cache.
590  slot = std::find(begin,
591  end,
592  static_cast<data_queue_type *>(0));
593 
594  if (slot == end) {
595  // No available slots. Grow the array accordingly.
596  size_t const old_len = sample_list->size();
597  sample_list->size(old_len + 1);
598 
599  data_queue_type ** new_begin = &((*sample_list)[0]);
600  slot = new_begin + old_len;
601  }
602 
603  ACE_NEW_MALLOC_RETURN(
604  samples,
605  static_cast<data_queue_type *>(
606  allocator->malloc(sizeof(data_queue_type))),
607  data_queue_type(allocator),
608  false);
609 
610  // Insert the samples into the sample list.
611  *slot = samples;
612 
613  if (!dir.is_nil()) {
614  samples->fs_path_ = path;
615  }
616 
617  for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
618  DataSampleElement& elem = *i;
619 
620  // N.B. Do not persist samples with coherent changes.
621  // To verify, we check the DataSampleHeader for the
622  // coherent_change_ flag. The DataSampleHeader will
623  // always be the first message block in the chain.
624  //
625  // It should be noted that persisting coherent changes
626  // is a non-trivial task, and should be handled when
627  // finalizing persistence profile conformance.
628  if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
629  continue; // skip coherent sample
630  }
631 
632  sample_data_type sample(elem, allocator);
633 
634  if (samples->enqueue_tail(sample) != 0)
635  return false;
636 
637  if (!dir.is_nil()) {
638  try {
639  File::Ptr f = dir->create_next_file();
640  std::ofstream os;
641 
642  if (!f->write(os)) return false;
643 
644  DDS::Time_t timestamp;
645  const char * data;
646  size_t len;
647  sample.get_sample(data, len, timestamp);
648 
649  os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
650  os.write(data, len);
651 
652  } catch (const std::exception& ex) {
653  if (DCPS_debug_level > 0) {
654  ACE_ERROR((LM_ERROR,
655  ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
656  ACE_TEXT("couldn't write sample for PERSISTENT ")
657  ACE_TEXT("data: %C\n"), ex.what()));
658  }
659  }
660  }
661  }
662  }
663 
664  // -----------
665 
666  // Schedule cleanup timer.
667  //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
668  const TimeDuration cleanup_delay(qos.service_cleanup_delay);
669 
670  if (!cleanup_delay.is_zero()) {
671  if (DCPS_debug_level > 0) {
672  ACE_DEBUG((LM_DEBUG,
673  ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
674  ACE_TEXT("cleanup for\n")
675  ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
676  ACE_TEXT("== (%d, %C, %C)\n"),
677  domain_id,
678  topic_name,
679  type_name));
680  }
681 
682  Cleanup_Handler * const cleanup =
683  new Cleanup_Handler(*sample_list,
684  slot - &(*sample_list)[0],
685  this->allocator_.get(),
686  path,
687  this->data_dir_);
688  ACE_Event_Handler_var safe_cleanup(cleanup); // Transfer ownership
689  long const tid =
690  this->reactor_->schedule_timer(cleanup,
691  0, // ACT
692  cleanup_delay.value());
693  if (tid == -1) {
694  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
695 
696  ACE_DES_FREE(samples,
697  this->allocator_->free,
698  DurabilityQueue<sample_data_type>);
699  *slot = 0;
700 
701  return false;
702 
703  } else {
704  {
705  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
706  this->cleanup_timer_ids_.push_back(tid);
707  }
708 
709  cleanup->timer_id(tid,
710  &this->cleanup_timer_ids_);
711  }
712  }
713 
714  return true;
715 }
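
The slot search in the middle of insert() reuses the first null entry in the per-key array and grows the array by one only when no entry is free. A minimal sketch of that policy with a std::vector follows. `QueueSketch` and `claim_slot` are hypothetical names, and the real code works on a DurabilityArray of queue pointers under lock_.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

struct QueueSketch {};  // hypothetical stand-in for DurabilityQueue<sample_data_type>

// Return the index of the first null slot, growing the array by one
// element when every existing slot is occupied.
std::size_t claim_slot(std::vector<QueueSketch*>& slots) {
  std::vector<QueueSketch*>::iterator it =
    std::find(slots.begin(), slots.end(), static_cast<QueueSketch*>(nullptr));

  if (it == slots.end()) {
    slots.push_back(nullptr);  // no free slot: append one
    return slots.size() - 1;
  }
  return static_cast<std::size_t>(it - slots.begin());
}
```

The sketch returns an index rather than a pointer into the array, since growing a vector can invalidate pointers to its elements. The listing handles the same concern by recomputing the slot address from the array's new first element after resizing.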

◆ OPENDDS_LIST()

typedef OpenDDS::DCPS::DataDurabilityCache::OPENDDS_LIST ( long  )

◆ operator=()

DataDurabilityCache& OpenDDS::DCPS::DataDurabilityCache::operator= ( DataDurabilityCache const &  )
private

Member Data Documentation

◆ allocator_

unique_ptr<ACE_Allocator> const OpenDDS::DCPS::DataDurabilityCache::allocator_
private

Allocator used to allocate memory for sample map and lists.

Definition at line 227 of file DataDurabilityCache.h.

Referenced by get_data(), init(), insert(), and ~DataDurabilityCache().

◆ cleanup_timer_ids_

timer_id_list_type OpenDDS::DCPS::DataDurabilityCache::cleanup_timer_ids_
private

Timer ID list.

Keep track of cleanup timer IDs in case we need to cancel before they expire.

Definition at line 241 of file DataDurabilityCache.h.

Referenced by insert(), and ~DataDurabilityCache().
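
The reason for tracking timer IDs can be shown with a small owner class that records each scheduled timer and cancels whatever is still pending when it is destroyed. This is only an illustration: `ReactorSketch` and `TimerOwnerSketch` are hypothetical, and the fake reactor merely mimics the schedule/cancel pair of calls shown in the listings on this page.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <set>

// Hypothetical reactor stand-in that only remembers which timers are active.
class ReactorSketch {
public:
  ReactorSketch() : next_id_(1) {}
  long schedule_timer() {
    long const id = next_id_++;
    active_.insert(id);
    return id;
  }
  int cancel_timer(long id) { return static_cast<int>(active_.erase(id)); }
  std::size_t active() const { return active_.size(); }

private:
  long next_id_;
  std::set<long> active_;
};

// Owner that keeps the IDs of the timers it scheduled so that it can
// cancel the ones that have not expired by the time it is destroyed.
class TimerOwnerSketch {
public:
  explicit TimerOwnerSketch(ReactorSketch& reactor) : reactor_(reactor) {}

  ~TimerOwnerSketch() {
    for (long id : timer_ids_) (void) reactor_.cancel_timer(id);
  }

  bool schedule() {
    long const id = reactor_.schedule_timer();
    if (id == -1) return false;  // scheduling failed
    timer_ids_.push_back(id);
    return true;
  }

private:
  ReactorSketch& reactor_;
  std::list<long> timer_ids_;
};
```

Without the ID list, a timer could fire after its owner is gone and act on memory that has already been released.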

◆ data_dir_

ACE_CString OpenDDS::DCPS::DataDurabilityCache::data_dir_
private

Definition at line 231 of file DataDurabilityCache.h.

Referenced by init(), and insert().

◆ kind_

DDS::DurabilityQosPolicyKind OpenDDS::DCPS::DataDurabilityCache::kind_
private

Definition at line 229 of file DataDurabilityCache.h.

Referenced by init(), and insert().

◆ lock_

ACE_SYNCH_MUTEX OpenDDS::DCPS::DataDurabilityCache::lock_
private

Lock for synchronized access to the underlying map.

Definition at line 244 of file DataDurabilityCache.h.

Referenced by get_data(), and insert().

◆ reactor_

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataDurabilityCache::reactor_
private

Reactor with which cleanup timers will be registered.

Definition at line 247 of file DataDurabilityCache.h.

Referenced by init(), insert(), and ~DataDurabilityCache().

◆ samples_

sample_map_type* OpenDDS::DCPS::DataDurabilityCache::samples_
private

Map of all data samples.

Definition at line 234 of file DataDurabilityCache.h.

Referenced by get_data(), init(), insert(), and ~DataDurabilityCache().


The documentation for this class was generated from the following files: