OpenDDS::DCPS::DataDurabilityCache Class Reference

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.. More...

#include <DataDurabilityCache.h>

Collaboration diagram for OpenDDS::DCPS::DataDurabilityCache:

Collaboration graph
[legend]
List of all members.

Public Types

typedef DurabilityArray< DurabilityQueue<
sample_data_type > * > 
sample_list_type
typedef ACE_Hash_Map_With_Allocator<
key_type, sample_list_type * > 
sample_map_type

Public Member Functions

typedef OPENDDS_LIST (long) timer_id_list_type
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind)
 Constructors.
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind, ACE_CString &data_dir)
 ~DataDurabilityCache ()
 Destructor.
bool insert (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
bool get_data (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &)

Private Member Functions

 DataDurabilityCache (DataDurabilityCache const &)
DataDurabilityCacheoperator= (DataDurabilityCache const &)
void init ()

Static Private Member Functions

static std::auto_ptr< ACE_Allocatormake_allocator (DDS::DurabilityQosPolicyKind kind)

Private Attributes

std::auto_ptr< ACE_Allocator
> const 
allocator_
 Allocator used to allocate memory for sample map and lists.
DDS::DurabilityQosPolicyKind kind_
ACE_CString data_dir_
sample_map_typesamples_
 Map of all data samples.
timer_id_list_type cleanup_timer_ids_
 Timer ID list.
ACE_SYNCH_MUTEX lock_
 Lock for synchronized access to the underlying map.
ACE_Reactor_Timer_Interface * reactor_
 Reactor with which cleanup timers will be registered.

Classes

class  key_type
 Key type for underlying maps. More...
class  sample_data_type
 Sample list data type for all samples. More...

Detailed Description

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations..

This class implements a cache that outlives DataWriters.

Definition at line 65 of file DataDurabilityCache.h.


Member Typedef Documentation

typedef DurabilityArray< DurabilityQueue<sample_data_type> *> OpenDDS::DCPS::DataDurabilityCache::sample_list_type

Definition at line 180 of file DataDurabilityCache.h.

typedef ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> OpenDDS::DCPS::DataDurabilityCache::sample_map_type

Definition at line 183 of file DataDurabilityCache.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind  ) 

Constructors.

Definition at line 285 of file DataDurabilityCache.cpp.

References init().

00287   : allocator_(make_allocator(kind))
00288   , kind_(kind)
00289   , samples_(0)
00290   , cleanup_timer_ids_()
00291   , lock_()
00292   , reactor_(0)
00293 {
00294   init();
00295 }

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind,
ACE_CString &  data_dir 
)

Definition at line 297 of file DataDurabilityCache.cpp.

References init().

00300   : allocator_(make_allocator(kind))
00301   , kind_(kind)
00302   , data_dir_(data_dir)
00303   , samples_(0)
00304   , cleanup_timer_ids_()
00305   , lock_()
00306   , reactor_(0)
00307 {
00308   init();
00309 }

OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache (  ) 

Destructor.

Definition at line 425 of file DataDurabilityCache.cpp.

References OPENDDS_MAP_TYPE, and reactor_.

00426 {
00427   // Cancel timers that haven't expired yet.
00428   timer_id_list_type::const_iterator const end(
00429     this->cleanup_timer_ids_.end());
00430 
00431   for (timer_id_list_type::const_iterator i(
00432          this->cleanup_timer_ids_.begin());
00433        i != end;
00434        ++i) {
00435     (void) this->reactor_->cancel_timer(*i);
00436   }
00437 
00438   // Clean up memory that isn't automatically managed.
00439   if (this->allocator_.get() != 0) {
00440     sample_map_type::iterator const map_end = this->samples_->end();
00441 
00442     for (sample_map_type::iterator s = this->samples_->begin();
00443          s != map_end;
00444          ++s) {
00445       sample_list_type * const list = (*s).int_id_;
00446 
00447       size_t const len = list->size();;
00448 
00449       for (size_t l = 0; l != len; ++l) {
00450         ACE_DES_FREE((*list)[l],
00451                      this->allocator_->free,
00452                      DurabilityQueue<sample_data_type>);
00453       }
00454 
00455       ACE_DES_FREE(list,
00456                    this->allocator_->free,
00457                    DurabilityArray<DurabilityQueue<sample_data_type> *>);
00458     }
00459 
00460     // Yes, this looks strange but please leave it in place.  The third param
00461     // to ACE_DES_FREE must be the actual class name since it's used in an
00462     // explicit desturctor call (~T()).  Typedefs are not allowed here.  This
00463     // is why the two ACE_DES_FREE's above are not the typedefs.  Below we use
00464     // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
00465 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
00466     ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
00467 #undef OPENDDS_MAP_TYPE
00468   }
00469 }

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DataDurabilityCache const &   )  [private]


Member Function Documentation

bool OpenDDS::DCPS::DataDurabilityCache::get_data ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataWriterImpl data_writer,
ACE_Allocator mb_allocator,
ACE_Allocator db_allocator,
DDS::LifespanQosPolicy const &   
)

Write cached data corresponding to given domain, topic and type to DataWriter.

Todo:
Is this going to cause problems for users that set a finite DDS::ResourceLimitsQosPolicy::max_instances value when OpenDDS supports that value?

Todo:
If we don't empty the queue, we'll end up with duplicate data since the data retrieved from the cache will be reinserted.

Definition at line 716 of file DataDurabilityCache.cpp.

References cleanup_directory(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataWriterImpl::get_db_lock(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::HANDLE_NIL, OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data(), DDS::RETCODE_OK, and OpenDDS::DCPS::DataWriterImpl::write().

00724 {
00725   key_type const key(domain_id,
00726                      topic_name,
00727                      type_name,
00728                      this->allocator_.get());
00729 
00730   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00731 
00732   sample_list_type * p_sample_list = 0;
00733 
00734   if (this->samples_->find(key,
00735                            p_sample_list,
00736                            this->allocator_.get()) == -1)
00737     return true;  // No durable data for this domain/topic/type.
00738 
00739   else if (p_sample_list == 0)
00740     return false; // Should never happen.
00741 
00742   sample_list_type & sample_list = *p_sample_list;
00743 
00744   // We will register an instance, and then write all of the cached
00745   // data to the DataWriter using that instance.
00746 
00747   sample_data_type * registration_data = 0;
00748 
00749   if (sample_list[0]->get(registration_data, 0) == -1)
00750     return false;
00751 
00752   char const * marshaled_sample = 0;
00753   size_t marshaled_sample_length = 0;
00754   DDS::Time_t registration_timestamp;
00755 
00756   registration_data->get_sample(marshaled_sample,
00757                                 marshaled_sample_length,
00758                                 registration_timestamp);
00759 
00760   // Don't use the cached allocator for the registered sample message
00761   // block.
00762   std::auto_ptr<DataSample> registration_sample(
00763     new ACE_Message_Block(marshaled_sample_length,
00764                           ACE_Message_Block::MB_DATA,
00765                           0, //cont
00766                           0, //data
00767                           0, //alloc_strategy
00768                           data_writer->get_db_lock()));
00769 
00770   ACE_OS::memcpy(registration_sample->wr_ptr(),
00771                  marshaled_sample,
00772                  marshaled_sample_length);
00773 
00774   registration_sample->wr_ptr(marshaled_sample_length);
00775 
00776   DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
00777 
00778   /**
00779    * @todo Is this going to cause problems for users that set a finite
00780    *       DDS::ResourceLimitsQosPolicy::max_instances value when
00781    *       OpenDDS supports that value?
00782    */
00783   DDS::ReturnCode_t ret =
00784     data_writer->register_instance_from_durable_data(handle,
00785                                      registration_sample.get(),
00786                                      registration_timestamp);
00787 
00788   if (ret != DDS::RETCODE_OK)
00789     return false;
00790 
00791   registration_sample.release();
00792 
00793   typedef DurabilityQueue<sample_data_type> data_queue_type;
00794   size_t const len = sample_list.size();
00795 
00796   for (size_t i = 0; i != len; ++i) {
00797     data_queue_type * const q = sample_list[i];
00798 
00799     for (data_queue_type::ITERATOR j = q->begin();
00800          !j.done();
00801          j.advance()) {
00802       sample_data_type * data = 0;
00803 
00804       if (j.next(data) == 0)
00805         return false;  // Should never happen.
00806 
00807       char const * sample = 0;  // Sample does not include header.
00808       size_t sample_length = 0;
00809       DDS::Time_t source_timestamp;
00810 
00811       data->get_sample(sample, sample_length, source_timestamp);
00812 
00813       ACE_Message_Block * mb = 0;
00814       ACE_NEW_MALLOC_RETURN(mb,
00815                             static_cast<ACE_Message_Block*>(
00816                               mb_allocator->malloc(
00817                                 sizeof(ACE_Message_Block))),
00818                             ACE_Message_Block(
00819                               sample_length,
00820                               ACE_Message_Block::MB_DATA,
00821                               0, // cont
00822                               0, // data
00823                               0, // allocator_strategy
00824                               data_writer->get_db_lock(), // data block locking_strategy
00825                               ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00826                               ACE_Time_Value::zero,
00827                               ACE_Time_Value::max_time,
00828                               db_allocator,
00829                               mb_allocator),
00830                             false);
00831 
00832       ACE_OS::memcpy(mb->wr_ptr(),
00833                      sample,
00834                      sample_length);
00835       mb->wr_ptr(sample_length);
00836 
00837       const DDS::ReturnCode_t ret = data_writer->write(mb, handle,
00838         source_timestamp, 0 /* no content filtering */);
00839 
00840       if (ret != DDS::RETCODE_OK) {
00841         ACE_DES_FREE(mb,
00842                      mb_allocator->free,
00843                      ACE_Message_Block);
00844         return false;
00845       }
00846     }
00847 
00848     // Data successfully written.  Empty the queue/list.
00849     /**
00850      * @todo If we don't empty the queue, we'll end up with duplicate
00851      *       data since the data retrieved from the cache will be
00852      *       reinserted.
00853      */
00854     q->reset();
00855 
00856     try {
00857       cleanup_directory(q->fs_path_, this->data_dir_);
00858 
00859     } catch (const std::exception& ex) {
00860       if (DCPS_debug_level > 0) {
00861         ACE_ERROR((LM_ERROR,
00862                    ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
00863                    ACE_TEXT("couldn't remove directory for PERSISTENT ")
00864                    ACE_TEXT("data: %C\n"), ex.what()));
00865       }
00866     }
00867   }
00868   return true;
00869 }

void OpenDDS::DCPS::DataDurabilityCache::init (  )  [private]

Definition at line 311 of file DataDurabilityCache.cpp.

References allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, samples_, DDS::Time_t::sec, and TheServiceParticipant.

Referenced by DataDurabilityCache().

00312 {
00313   ACE_Allocator * const allocator = this->allocator_.get();
00314   ACE_NEW_MALLOC(
00315     this->samples_,
00316     static_cast<sample_map_type *>(
00317       allocator->malloc(sizeof(sample_map_type))),
00318     sample_map_type(allocator));
00319 
00320   typedef DurabilityQueue<sample_data_type> data_queue_type;
00321 
00322   if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00323     // Read data from the filesystem and create the in-memory data structures
00324     // as if we had called insert() once for each "datawriter" directory.
00325     using OpenDDS::FileSystemStorage::Directory;
00326     using OpenDDS::FileSystemStorage::File;
00327     Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
00328     OPENDDS_VECTOR(OPENDDS_STRING) path(4);  // domain, topic, type, datawriter
00329 
00330     for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
00331          domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
00332       path[0] = domain->name();
00333       DDS::DomainId_t domain_id;
00334       {
00335         domain_id = ACE_OS::atoi(path[0].c_str());
00336       }
00337 
00338       for (Directory::DirectoryIterator topic = domain->begin_dirs(),
00339            topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
00340         path[1] = topic->name();
00341 
00342         for (Directory::DirectoryIterator type = topic->begin_dirs(),
00343              type_end = topic->end_dirs(); type != type_end; ++type) {
00344           path[2] = type->name();
00345 
00346           key_type key(domain_id, path[1].c_str(), path[2].c_str(),
00347                        allocator);
00348           sample_list_type * sample_list = 0;
00349           ACE_NEW_MALLOC(sample_list,
00350                          static_cast<sample_list_type *>(
00351                            allocator->malloc(sizeof(sample_list_type))),
00352                          sample_list_type(0, static_cast<data_queue_type *>(0),
00353                                           allocator));
00354           this->samples_->bind(key, sample_list, allocator);
00355 
00356           for (Directory::DirectoryIterator dw = type->begin_dirs(),
00357                dw_end = type->end_dirs(); dw != dw_end; ++dw) {
00358             path[3] = dw->name();
00359 
00360             size_t old_len = sample_list->size();
00361             sample_list->size(old_len + 1);
00362             data_queue_type *& slot = (*sample_list)[old_len];
00363 
00364             // This variable is called "samples" in the insert() method be
00365             // we already have a "samples_" which is the overall data structure.
00366             data_queue_type * sample_queue = 0;
00367             ACE_NEW_MALLOC(sample_queue,
00368                            static_cast<data_queue_type *>(
00369                              allocator->malloc(sizeof(data_queue_type))),
00370                            data_queue_type(allocator));
00371 
00372             slot = sample_queue;
00373             sample_queue->fs_path_ = path;
00374 
00375             for (Directory::FileIterator file = dw->begin_files(),
00376                  file_end = dw->end_files(); file != file_end; ++file) {
00377               std::ifstream is;
00378 
00379               if (!file->read(is)) {
00380                 if (DCPS_debug_level) {
00381                   ACE_ERROR((LM_ERROR,
00382                              ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
00383                              ACE_TEXT("couldn't open file for PERSISTENT ")
00384                              ACE_TEXT("data: %C\n"), file->name().c_str()));
00385                 }
00386                 continue;
00387               }
00388 
00389               DDS::Time_t timestamp;
00390               is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
00391               is.get(); // consume separator
00392 
00393               const size_t CHUNK = 4096;
00394               ACE_Message_Block mb(CHUNK);
00395               ACE_Message_Block * current = &mb;
00396 
00397               while (!is.eof()) {
00398                 is.read(current->wr_ptr(), current->space());
00399 
00400                 if (is.bad()) break;
00401 
00402                 current->wr_ptr((size_t)is.gcount());
00403 
00404                 if (current->space() == 0) {
00405                   ACE_Message_Block * old = current;
00406                   current = new ACE_Message_Block(CHUNK);
00407                   old->cont(current);
00408                 }
00409               }
00410 
00411               sample_queue->enqueue_tail(
00412                 sample_data_type(timestamp, mb, allocator));
00413 
00414               if (mb.cont()) mb.cont()->release();    // delete the cont() chain
00415             }
00416           }
00417         }
00418       }
00419     }
00420   }
00421 
00422   this->reactor_ = TheServiceParticipant->timer();
00423 }

bool OpenDDS::DCPS::DataDurabilityCache::insert ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
SendStateDataSampleList the_data,
DDS::DurabilityServiceQosPolicy const &  qos 
)

Insert the samples corresponding to the given topic instance (uniquely identify by its domain, topic name and type name) into the data durability cache.

Definition at line 472 of file DataDurabilityCache.cpp.

References allocator_, OpenDDS::DCPS::SendStateDataSampleList::begin(), cleanup_timer_ids_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::SendStateDataSampleList::end(), Util::find(), OpenDDS::DCPS::get_instance_sample_list_depth(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::DurabilityServiceQosPolicy::history_depth, DDS::DurabilityServiceQosPolicy::history_kind, DDS::DurabilityServiceQosPolicy::max_samples_per_instance, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, DDS::Time_t::sec, DDS::DurabilityServiceQosPolicy::service_cleanup_delay, OpenDDS::DCPS::SendStateDataSampleList::size(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and OpenDDS::DCPS::to_dds_string().

Referenced by OpenDDS::DCPS::WriteDataContainer::persist_data().

00478 {
00479   if (the_data.size() == 0)
00480     return true;  // Nothing to cache.
00481 
00482   // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
00483   // settings prior to data insertion into the cache.
00484   CORBA::Long const depth =
00485     get_instance_sample_list_depth(
00486       qos.history_kind,
00487       qos.history_depth,
00488       qos.max_samples_per_instance);
00489 
00490   // Iterator to first DataSampleElement to be copied.
00491   SendStateDataSampleList::iterator element(the_data.begin());
00492 
00493   if (depth < 0)
00494     return false; // Should never occur.
00495 
00496   else if (depth == 0)
00497     return true;  // Nothing else to do.  Discard all data.
00498 
00499   else if (the_data.size() > depth) {
00500     // N.B. Dropping data samples does not take into account
00501     // those samples which are not actually persisted (i.e.
00502     // samples with the coherent_sample_ flag set). The spec
00503     // does not provide any guidance in this case, therefore
00504     // we opt for the simplest solution and assume that there
00505     // are no change sets when calculating the number of
00506     // samples to drop.
00507 
00508     // Drop "old" samples.  Only keep the "depth" most recent
00509     // samples, i.e. those found at the tail end of the
00510     // SendStateDataSampleList.
00511     ssize_t const advance_amount = the_data.size() - depth;
00512     std::advance(element, advance_amount);
00513   }
00514 
00515   // -----------
00516 
00517   // Copy samples to the domain/topic/type-specific cache.
00518 
00519   key_type const key(domain_id,
00520                      topic_name,
00521                      type_name,
00522                      this->allocator_.get());
00523   SendStateDataSampleList::iterator the_end(the_data.end());
00524   sample_list_type * sample_list = 0;
00525 
00526   typedef DurabilityQueue<sample_data_type> data_queue_type;
00527   data_queue_type ** slot = 0;
00528   data_queue_type * samples = 0;  // sample_list_type::value_type
00529 
00530   using OpenDDS::FileSystemStorage::Directory;
00531   using OpenDDS::FileSystemStorage::File;
00532   Directory::Ptr dir;
00533   OPENDDS_VECTOR(OPENDDS_STRING) path;
00534   {
00535     ACE_Allocator * const allocator = this->allocator_.get();
00536 
00537     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00538 
00539     if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00540       try {
00541         dir = Directory::create(this->data_dir_.c_str());
00542 
00543         path.push_back(to_dds_string(domain_id));
00544         path.push_back(topic_name);
00545         path.push_back(type_name);
00546         dir = dir->get_dir(path);
00547         // dir is now the "type" directory, which is shared by all datawriters
00548         // of the domain/topic/type.  We actually need a new directory per
00549         // datawriter and this assumes that insert() is called once per
00550         // datawriter, as is currently the case.
00551         dir = dir->create_next_dir();
00552         path.push_back(dir->name());   // for use by the Cleanup_Handler
00553 
00554       } catch (const std::exception& ex) {
00555         if (DCPS_debug_level > 0) {
00556           ACE_ERROR((LM_ERROR,
00557                      ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00558                      ACE_TEXT("couldn't create directory for PERSISTENT ")
00559                      ACE_TEXT("data: %C\n"), ex.what()));
00560         }
00561 
00562         dir = 0;
00563       }
00564     }
00565 
00566     if (this->samples_->find(key, sample_list, allocator) != 0) {
00567       // Create a new list (actually an ACE_Array_Base<>) with the
00568       // appropriate allocator passed to its constructor.
00569       ACE_NEW_MALLOC_RETURN(
00570         sample_list,
00571         static_cast<sample_list_type *>(
00572           allocator->malloc(sizeof(sample_list_type))),
00573         sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
00574         false);
00575 
00576       if (this->samples_->bind(key, sample_list, allocator) != 0)
00577         return false;
00578     }
00579 
00580     data_queue_type ** const begin = &((*sample_list)[0]);
00581     data_queue_type ** const end =
00582       begin + sample_list->size();
00583 
00584     // Find an empty slot in the array.  This is a linear search but
00585     // that should be fine for the common case, i.e. a small number of
00586     // DataWriters that push data into the cache.
00587     slot = std::find(begin,
00588                      end,
00589                      static_cast<data_queue_type *>(0));
00590 
00591     if (slot == end) {
00592       // No available slots.  Grow the array accordingly.
00593       size_t const old_len = sample_list->size();
00594       sample_list->size(old_len + 1);
00595 
00596       data_queue_type ** new_begin = &((*sample_list)[0]);
00597       slot = new_begin + old_len;
00598     }
00599 
00600     ACE_NEW_MALLOC_RETURN(
00601       samples,
00602       static_cast<data_queue_type *>(
00603         allocator->malloc(sizeof(data_queue_type))),
00604       data_queue_type(allocator),
00605       false);
00606 
00607     // Insert the samples in to the sample list.
00608     *slot = samples;
00609 
00610     if (!dir.is_nil()) {
00611       samples->fs_path_ = path;
00612     }
00613 
00614     for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
00615       DataSampleElement& elem = *i;
00616 
00617       // N.B. Do not persist samples with coherent changes.
00618       // To verify, we check the DataSampleHeader for the
00619       // coherent_change_ flag. The DataSampleHeader will
00620       // always be the first message block in the chain.
00621       //
00622       // It should be noted that persisting coherent changes
00623       // is a non-trivial task, and should be handled when
00624       // finalizing persistence profile conformance.
00625       if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
00626         continue; // skip coherent sample
00627       }
00628 
00629       sample_data_type sample(elem, allocator);
00630 
00631       if (samples->enqueue_tail(sample) != 0)
00632         return false;
00633 
00634       if (!dir.is_nil()) {
00635         try {
00636           File::Ptr f = dir->create_next_file();
00637           std::ofstream os;
00638 
00639           if (!f->write(os)) return false;
00640 
00641           DDS::Time_t timestamp;
00642           const char * data;
00643           size_t len;
00644           sample.get_sample(data, len, timestamp);
00645 
00646           os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
00647           os.write(data, len);
00648 
00649         } catch (const std::exception& ex) {
00650           if (DCPS_debug_level > 0) {
00651             ACE_ERROR((LM_ERROR,
00652                        ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00653                        ACE_TEXT("couldn't write sample for PERSISTENT ")
00654                        ACE_TEXT("data: %C\n"), ex.what()));
00655           }
00656         }
00657       }
00658     }
00659   }
00660 
00661   // -----------
00662 
00663   // Schedule cleanup timer.
00664   //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
00665   ACE_Time_Value const cleanup_delay(
00666     duration_to_time_value(qos.service_cleanup_delay));
00667 
00668   if (cleanup_delay > ACE_Time_Value::zero) {
00669     if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00670       ACE_DEBUG((LM_DEBUG,
00671                  ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
00672                  ACE_TEXT("cleanup for\n")
00673                  ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
00674                  ACE_TEXT("== (%d, %C, %C)\n"),
00675                  domain_id,
00676                  topic_name,
00677                  type_name));
00678     }
00679 
00680     Cleanup_Handler * const cleanup =
00681       new Cleanup_Handler(*sample_list,
00682                           slot - &(*sample_list)[0],
00683                           this->allocator_.get(),
00684                           path,
00685                           this->data_dir_);
00686     ACE_Event_Handler_var safe_cleanup(cleanup);   // Transfer ownership
00687     long const tid =
00688       this->reactor_->schedule_timer(cleanup,
00689                                      0, // ACT
00690                                      cleanup_delay);
00691     if (tid == -1) {
00692       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00693 
00694       ACE_DES_FREE(samples,
00695                    this->allocator_->free,
00696                    DurabilityQueue<sample_data_type>);
00697       *slot = 0;
00698 
00699       return false;
00700 
00701     } else {
00702       {
00703         ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00704         this->cleanup_timer_ids_.push_back(tid);
00705       }
00706 
00707       cleanup->timer_id(tid,
00708                         &this->cleanup_timer_ids_);
00709     }
00710   }
00711 
00712   return true;
00713 }

std::auto_ptr< ACE_Allocator > OpenDDS::DCPS::DataDurabilityCache::make_allocator ( DDS::DurabilityQosPolicyKind  kind  )  [static, private]

Make allocator suitable to support specified kind of DURABILITY.

Definition at line 872 of file DataDurabilityCache.cpp.

00874 {
00875   // The use of other Allocators has been removed but this function
00876   // remains for the time being.
00877   // TODO: clean up all the ACE_Allocator-related code
00878   return std::auto_ptr<ACE_Allocator> (new ACE_New_Allocator);
00879 }

typedef OpenDDS::DCPS::DataDurabilityCache::OPENDDS_LIST ( long   ) 

DataDurabilityCache& OpenDDS::DCPS::DataDurabilityCache::operator= ( DataDurabilityCache const &   )  [private]


Member Data Documentation

std::auto_ptr<ACE_Allocator> const OpenDDS::DCPS::DataDurabilityCache::allocator_ [private]

Allocator used to allocate memory for sample map and lists.

Definition at line 230 of file DataDurabilityCache.h.

Referenced by init(), and insert().

timer_id_list_type OpenDDS::DCPS::DataDurabilityCache::cleanup_timer_ids_ [private]

Timer ID list.

Keep track of cleanup timer IDs in case we need to cancel before they expire.

Definition at line 244 of file DataDurabilityCache.h.

Referenced by insert().

ACE_CString OpenDDS::DCPS::DataDurabilityCache::data_dir_ [private]

Definition at line 234 of file DataDurabilityCache.h.

Referenced by insert().

DDS::DurabilityQosPolicyKind OpenDDS::DCPS::DataDurabilityCache::kind_ [private]

Definition at line 232 of file DataDurabilityCache.h.

ACE_SYNCH_MUTEX OpenDDS::DCPS::DataDurabilityCache::lock_ [private]

Lock for synchronized access to the underlying map.

Definition at line 247 of file DataDurabilityCache.h.

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataDurabilityCache::reactor_ [private]

Reactor with which cleanup timers will be registered.

Definition at line 250 of file DataDurabilityCache.h.

Referenced by init(), insert(), and ~DataDurabilityCache().

sample_map_type* OpenDDS::DCPS::DataDurabilityCache::samples_ [private]

Map of all data samples.

Definition at line 237 of file DataDurabilityCache.h.

Referenced by init().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:11 2016 for OpenDDS by  doxygen 1.4.7