OpenDDS::DCPS::DataDurabilityCache Class Reference

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.

#include <DataDurabilityCache.h>

Classes

class  key_type
 Key type for underlying maps.
class  sample_data_type
 Sample list data type for all samples.

Public Types

typedef DurabilityArray< DurabilityQueue< sample_data_type > * > sample_list_type
typedef ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > sample_map_type

Public Member Functions

typedef OPENDDS_LIST (long) timer_id_list_type
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind)
 DataDurabilityCache (DDS::DurabilityQosPolicyKind kind, ACE_CString &data_dir)
 ~DataDurabilityCache ()
bool insert (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
bool get_data (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &)

Private Member Functions

 DataDurabilityCache (DataDurabilityCache const &)
DataDurabilityCache & operator= (DataDurabilityCache const &)
void init ()

Private Attributes

unique_ptr< ACE_Allocator > const allocator_
 Allocator used to allocate memory for sample map and lists.
DDS::DurabilityQosPolicyKind kind_
ACE_CString data_dir_
sample_map_type * samples_
 Map of all data samples.
timer_id_list_type cleanup_timer_ids_
 Timer ID list.
ACE_SYNCH_MUTEX lock_
 Lock for synchronized access to the underlying map.
ACE_Reactor_Timer_Interface * reactor_
 Reactor with which cleanup timers will be registered.

Detailed Description

Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.

This class implements a cache that outlives DataWriters.
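
As a rough illustration of what a cache that outlives DataWriters implies, the sketch below stores samples in a map keyed by domain, topic and type, independent of any writer object. ToyDurabilityCache, CacheKey and WriterQueue are hypothetical names built on standard containers; they are not the allocator-backed ACE types this class uses.

```cpp
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <tuple>
#include <vector>

// Hypothetical stand-ins; none of these names are OpenDDS API.
typedef std::tuple<int, std::string, std::string> CacheKey;  // domain, topic, type
typedef std::deque<std::string> WriterQueue;                 // one writer's samples
typedef std::vector<WriterQueue> WriterQueues;               // one queue per writer

class ToyDurabilityCache {
public:
  // Store one writer's samples under its domain/topic/type key.
  void insert(int domain, const std::string& topic, const std::string& type,
              const WriterQueue& samples) {
    cache_[CacheKey(domain, topic, type)].push_back(samples);
  }

  // Count the cached samples available for the given key.
  std::size_t pending(int domain, const std::string& topic,
                      const std::string& type) const {
    std::map<CacheKey, WriterQueues>::const_iterator it =
      cache_.find(CacheKey(domain, topic, type));
    if (it == cache_.end()) return 0;
    std::size_t n = 0;
    for (std::size_t i = 0; i < it->second.size(); ++i) n += it->second[i].size();
    return n;
  }

private:
  std::map<CacheKey, WriterQueues> cache_;
};
```

Because the map is owned by the cache rather than by a writer, the samples remain available after the object that produced them is gone.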

Definition at line 69 of file DataDurabilityCache.h.


Member Typedef Documentation

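typedef DurabilityArray< DurabilityQueue< sample_data_type > * > OpenDDS::DCPS::DataDurabilityCache::sample_list_type

Definition at line 184 of file DataDurabilityCache.h.

typedef ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > OpenDDS::DCPS::DataDurabilityCache::sample_map_type

Definition at line 187 of file DataDurabilityCache.h.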
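
The sample list is an array of queue pointers, and insert() reuses a null entry before growing the array. A minimal sketch of that slot search follows, assuming std::vector in place of DurabilityArray; ToyQueue, ToyQueueArray and claim_slot are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

typedef std::vector<int> ToyQueue;             // hypothetical sample queue
typedef std::vector<ToyQueue*> ToyQueueArray;  // null entries are free slots

// Store `queue` in the first free slot, growing the array by one if none
// is free. Returns the index that was used.
std::size_t claim_slot(ToyQueueArray& slots, ToyQueue* queue) {
  ToyQueueArray::iterator slot =
    std::find(slots.begin(), slots.end(), static_cast<ToyQueue*>(0));
  if (slot == slots.end()) {
    slots.push_back(queue);  // no free slot: grow by one
    return slots.size() - 1;
  }
  *slot = queue;             // reuse the free slot
  return static_cast<std::size_t>(slot - slots.begin());
}
```

The search is linear, which is reasonable when only a small number of writers push data for the same key.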


Constructor & Destructor Documentation

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind  ) 

Definition at line 284 of file DataDurabilityCache.cpp.

References init().

  : allocator_(new ACE_New_Allocator)
  , kind_(kind)
  , samples_(0)
  , cleanup_timer_ids_()
  , lock_()
  , reactor_(0)
{
  init();
}

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DDS::DurabilityQosPolicyKind  kind,
ACE_CString &  data_dir 
)

Definition at line 296 of file DataDurabilityCache.cpp.

References init().

  : allocator_(new ACE_New_Allocator)
  , kind_(kind)
  , data_dir_(data_dir)
  , samples_(0)
  , cleanup_timer_ids_()
  , lock_()
  , reactor_(0)
{
  init();
}

OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache (  ) 

Definition at line 424 of file DataDurabilityCache.cpp.

References allocator_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Reactor_Timer_Interface::cancel_timer(), cleanup_timer_ids_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, OPENDDS_MAP_TYPE, reactor_, samples_, and ACE_Array_Base< T >::size().

{
  // Cancel timers that haven't expired yet.
  timer_id_list_type::const_iterator const end(
    this->cleanup_timer_ids_.end());

  for (timer_id_list_type::const_iterator i(
         this->cleanup_timer_ids_.begin());
       i != end;
       ++i) {
    (void) this->reactor_->cancel_timer(*i);
  }

  // Clean up memory that isn't automatically managed.
  if (this->allocator_.get() != 0) {
    sample_map_type::iterator const map_end = this->samples_->end();

    for (sample_map_type::iterator s = this->samples_->begin();
         s != map_end;
         ++s) {
      sample_list_type * const list = (*s).int_id_;

      size_t const len = list->size();

      for (size_t l = 0; l != len; ++l) {
        ACE_DES_FREE((*list)[l],
                     this->allocator_->free,
                     DurabilityQueue<sample_data_type>);
      }

      ACE_DES_FREE(list,
                   this->allocator_->free,
                   DurabilityArray<DurabilityQueue<sample_data_type> *>);
    }

    // Yes, this looks strange but please leave it in place.  The third param
    // to ACE_DES_FREE must be the actual class name since it's used in an
    // explicit destructor call (~T()).  Typedefs are not allowed here.  This
    // is why the two ACE_DES_FREE's above are not the typedefs.  Below we use
    // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
#define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
    ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
#undef OPENDDS_MAP_TYPE
  }
}
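
The comment in the destructor describes hiding a template argument comma from a macro. The sketch below reproduces that preprocessor behaviour in isolation. TOY_DES_FREE is a hypothetical three-parameter macro written for this example and is only assumed to resemble ACE_DES_FREE in shape; ToyMap and destroy_one are likewise hypothetical.

```cpp
#include <cstdlib>
#include <new>

// Hypothetical two-parameter template standing in for the hash map type.
template <typename K, typename V>
struct ToyMap {
  static int destroyed;  // counts destructor runs
  ~ToyMap() { ++destroyed; }
};
template <typename K, typename V> int ToyMap<K, V>::destroyed = 0;

// Hypothetical macro: explicit destructor call, then deallocation.
#define TOY_DES_FREE(POINTER, DEALLOCATOR, CLASS) \
  do { if (POINTER) { (POINTER)->~CLASS(); DEALLOCATOR(POINTER); } } while (0)

// Construct a ToyMap in malloc'ed storage, then destroy and free it.
// Returns the running destructor count, or -1 if allocation failed.
int destroy_one() {
  void* raw = std::malloc(sizeof(ToyMap<int, long>));
  if (raw == 0) return -1;
  ToyMap<int, long>* map = new (raw) ToyMap<int, long>;

  // Passing ToyMap<int, long> directly would be split at the comma and
  // seen as four macro arguments. A wrapper macro is a single argument
  // at the point where the arguments are collected.
#define TOY_MAP_TYPE ToyMap<int, long>
  TOY_DES_FREE(map, std::free, TOY_MAP_TYPE);
#undef TOY_MAP_TYPE

  return ToyMap<int, long>::destroyed;
}
```

The wrapper works because macro arguments are identified before they are expanded, so the comma introduced by TOY_MAP_TYPE never takes part in argument splitting.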

OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache ( DataDurabilityCache const &   )  [private]

Member Function Documentation

bool OpenDDS::DCPS::DataDurabilityCache::get_data ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
DataWriterImpl *  data_writer,
ACE_Allocator *  mb_allocator,
ACE_Allocator *  db_allocator,
DDS::LifespanQosPolicy const &   
)

Write cached data corresponding to given domain, topic and type to DataWriter.

Todo:
Is this going to cause problems for users that set a finite DDS::ResourceLimitsQosPolicy::max_instances value when OpenDDS supports that value?
Todo:
If we don't empty the queue, we'll end up with duplicate data since the data retrieved from the cache will be reinserted.

Definition at line 716 of file DataDurabilityCache.cpp.

References ACE_TEXT(), ACE_Array_Base< T >::allocator_, allocator_, OpenDDS::DCPS::DCPS_debug_level, ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataWriterImpl::get_db_lock(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::HANDLE_NIL, len, LM_ERROR, lock_, ACE_Allocator::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, ACE_OS::memcpy(), OpenDDS::DCPS::move(), OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data(), DDS::RETCODE_OK, samples_, ACE_Array_Base< T >::size(), OpenDDS::DCPS::DataWriterImpl::write(), and ACE_Time_Value::zero.

{
  key_type const key(domain_id,
                     topic_name,
                     type_name,
                     this->allocator_.get());

  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);

  sample_list_type * p_sample_list = 0;

  if (this->samples_->find(key,
                           p_sample_list,
                           this->allocator_.get()) == -1)
    return true;  // No durable data for this domain/topic/type.

  else if (p_sample_list == 0)
    return false; // Should never happen.

  sample_list_type & sample_list = *p_sample_list;

  // We will register an instance, and then write all of the cached
  // data to the DataWriter using that instance.

  sample_data_type * registration_data = 0;

  if (sample_list[0]->get(registration_data, 0) == -1)
    return false;

  char const * marshaled_sample = 0;
  size_t marshaled_sample_length = 0;
  DDS::Time_t registration_timestamp;

  registration_data->get_sample(marshaled_sample,
                                marshaled_sample_length,
                                registration_timestamp);

  // Don't use the cached allocator for the registered sample message
  // block.
  Message_Block_Ptr registration_sample(
    new ACE_Message_Block(marshaled_sample_length,
                          ACE_Message_Block::MB_DATA,
                          0, //cont
                          0, //data
                          0, //alloc_strategy
                          data_writer->get_db_lock()));

  ACE_OS::memcpy(registration_sample->wr_ptr(),
                 marshaled_sample,
                 marshaled_sample_length);

  registration_sample->wr_ptr(marshaled_sample_length);

  DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;

  /**
   * @todo Is this going to cause problems for users that set a finite
   *       DDS::ResourceLimitsQosPolicy::max_instances value when
   *       OpenDDS supports that value?
   */
  DDS::ReturnCode_t ret =
    data_writer->register_instance_from_durable_data(handle,
                                     move(registration_sample),
                                     registration_timestamp);

  if (ret != DDS::RETCODE_OK)
    return false;

  typedef DurabilityQueue<sample_data_type> data_queue_type;
  size_t const len = sample_list.size();

  for (size_t i = 0; i != len; ++i) {
    data_queue_type * const q = sample_list[i];

    for (data_queue_type::ITERATOR j = q->begin();
         !j.done();
         j.advance()) {
      sample_data_type * data = 0;

      if (j.next(data) == 0)
        return false;  // Should never happen.

      char const * sample = 0;  // Sample does not include header.
      size_t sample_length = 0;
      DDS::Time_t source_timestamp;

      data->get_sample(sample, sample_length, source_timestamp);

      ACE_Message_Block * tmp_mb = 0;
      ACE_NEW_MALLOC_RETURN(tmp_mb,
                            static_cast<ACE_Message_Block*>(
                              mb_allocator->malloc(
                                sizeof(ACE_Message_Block))),
                            ACE_Message_Block(
                              sample_length,
                              ACE_Message_Block::MB_DATA,
                              0, // cont
                              0, // data
                              0, // allocator_strategy
                              data_writer->get_db_lock(), // data block locking_strategy
                              ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
                              ACE_Time_Value::zero,
                              ACE_Time_Value::max_time,
                              db_allocator,
                              mb_allocator),
                            false);
      Message_Block_Ptr mb(tmp_mb);

      ACE_OS::memcpy(mb->wr_ptr(),
                     sample,
                     sample_length);
      mb->wr_ptr(sample_length);

      const DDS::ReturnCode_t ret = data_writer->write(move(mb), handle,
        source_timestamp, 0 /* no content filtering */);

      if (ret != DDS::RETCODE_OK) {
        return false;
      }
    }

    // Data successfully written.  Empty the queue/list.
    /**
     * @todo If we don't empty the queue, we'll end up with duplicate
     *       data since the data retrieved from the cache will be
     *       reinserted.
     */
    q->reset();

    try {
      cleanup_directory(q->fs_path_, this->data_dir_);

    } catch (const std::exception& ex) {
      if (DCPS_debug_level > 0) {
        ACE_ERROR((LM_ERROR,
                   ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
                   ACE_TEXT("couldn't remove directory for PERSISTENT ")
                   ACE_TEXT("data: %C\n"), ex.what()));
      }
    }
  }
  return true;
}
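
The replay loop in get_data() writes every cached sample and empties a queue only once all of its samples were written. A simplified sketch of that control flow, with instance registration, locking and message block allocation left out, might look as follows. ToyWriter, ToyQueue and replay are hypothetical names; the failing writer exists only to exercise the early return.

```cpp
#include <cstddef>
#include <deque>
#include <string>
#include <vector>

// Hypothetical writer that rejects samples once `capacity` were accepted.
struct ToyWriter {
  std::size_t capacity;
  std::vector<std::string> written;
  bool write(const std::string& sample) {
    if (written.size() >= capacity) return false;
    written.push_back(sample);
    return true;
  }
};

typedef std::deque<std::string> ToyQueue;

// Write every queued sample to the writer. A queue is cleared only after
// all of its samples were written, so a failed replay leaves the rest cached.
bool replay(std::vector<ToyQueue>& queues, ToyWriter& writer) {
  for (std::size_t i = 0; i != queues.size(); ++i) {
    ToyQueue& q = queues[i];
    for (ToyQueue::const_iterator j = q.begin(); j != q.end(); ++j) {
      if (!writer.write(*j)) return false;  // stop on the first failure
    }
    q.clear();  // written data must not be replayed or reinserted again
  }
  return true;
}
```

Clearing after a successful pass reflects the concern in the second todo: data left in the queue would be duplicated when the writer's samples are later inserted back into the cache.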

void OpenDDS::DCPS::DataDurabilityCache::init ( void   )  [private]

Definition at line 310 of file DataDurabilityCache.cpp.

References ACE_TEXT(), allocator_, ACE_OS::atoi(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< ACE_CHAR_T >::c_str(), ACE_Message_Block::cont(), data_dir_, OpenDDS::DCPS::DCPS_debug_level, file, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), kind_, LM_ERROR, ACE_Allocator::malloc(), DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, ACE_Message_Block::release(), samples_, DDS::Time_t::sec, ACE_Array_Base< T >::size(), ACE_Message_Block::space(), TheServiceParticipant, timestamp(), and ACE_Message_Block::wr_ptr().

Referenced by DataDurabilityCache().

{
  ACE_Allocator * const allocator = this->allocator_.get();
  ACE_NEW_MALLOC(
    this->samples_,
    static_cast<sample_map_type *>(
      allocator->malloc(sizeof(sample_map_type))),
    sample_map_type(allocator));

  typedef DurabilityQueue<sample_data_type> data_queue_type;

  if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
    // Read data from the filesystem and create the in-memory data structures
    // as if we had called insert() once for each "datawriter" directory.
    using OpenDDS::FileSystemStorage::Directory;
    using OpenDDS::FileSystemStorage::File;
    Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
    OPENDDS_VECTOR(OPENDDS_STRING) path(4);  // domain, topic, type, datawriter

    for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
         domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
      path[0] = domain->name();
      DDS::DomainId_t domain_id;
      {
        domain_id = ACE_OS::atoi(path[0].c_str());
      }

      for (Directory::DirectoryIterator topic = domain->begin_dirs(),
           topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
        path[1] = topic->name();

        for (Directory::DirectoryIterator type = topic->begin_dirs(),
             type_end = topic->end_dirs(); type != type_end; ++type) {
          path[2] = type->name();

          key_type key(domain_id, path[1].c_str(), path[2].c_str(),
                       allocator);
          sample_list_type * sample_list = 0;
          ACE_NEW_MALLOC(sample_list,
                         static_cast<sample_list_type *>(
                           allocator->malloc(sizeof(sample_list_type))),
                         sample_list_type(0, static_cast<data_queue_type *>(0),
                                          allocator));
          this->samples_->bind(key, sample_list, allocator);

          for (Directory::DirectoryIterator dw = type->begin_dirs(),
               dw_end = type->end_dirs(); dw != dw_end; ++dw) {
            path[3] = dw->name();

            size_t old_len = sample_list->size();
            sample_list->size(old_len + 1);
            data_queue_type *& slot = (*sample_list)[old_len];

            // This variable is called "samples" in the insert() method because
            // we already have a "samples_" which is the overall data structure.
            data_queue_type * sample_queue = 0;
            ACE_NEW_MALLOC(sample_queue,
                           static_cast<data_queue_type *>(
                             allocator->malloc(sizeof(data_queue_type))),
                           data_queue_type(allocator));

            slot = sample_queue;
            sample_queue->fs_path_ = path;

            for (Directory::FileIterator file = dw->begin_files(),
                 file_end = dw->end_files(); file != file_end; ++file) {
              std::ifstream is;

              if (!file->read(is)) {
                if (DCPS_debug_level) {
                  ACE_ERROR((LM_ERROR,
                             ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
                             ACE_TEXT("couldn't open file for PERSISTENT ")
                             ACE_TEXT("data: %C\n"), file->name().c_str()));
                }
                continue;
              }

              DDS::Time_t timestamp;
              is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
              is.get(); // consume separator

              const size_t CHUNK = 4096;
              ACE_Message_Block mb(CHUNK);
              ACE_Message_Block * current = &mb;

              while (!is.eof()) {
                is.read(current->wr_ptr(), current->space());

                if (is.bad()) break;

                current->wr_ptr((size_t)is.gcount());

                if (current->space() == 0) {
                  ACE_Message_Block * old = current;
                  current = new ACE_Message_Block(CHUNK);
                  old->cont(current);
                }
              }

              sample_queue->enqueue_tail(
                sample_data_type(timestamp, mb, allocator));

              if (mb.cont()) mb.cont()->release();    // delete the cont() chain
            }
          }
        }
      }
    }
  }

  this->reactor_ = TheServiceParticipant->timer();
}
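
Judging from the stream operations in init() and insert(), each persisted sample file appears to hold the timestamp seconds and nanoseconds as text, a single separator character, and then the raw payload. The round trip below is a sketch of that layout using string streams in place of files. ToySample, encode and decode are hypothetical names, and the field types are assumptions rather than the DDS::Time_t definition.

```cpp
#include <cstddef>
#include <ios>
#include <sstream>
#include <string>

// Hypothetical sample record; field types are assumptions for this sketch.
struct ToySample {
  long sec;
  unsigned long nanosec;
  std::string payload;  // raw bytes, may contain spaces or '\0'
};

// Write "<sec> <nanosec> " followed by the unformatted payload bytes.
std::string encode(const ToySample& s) {
  std::ostringstream os;
  os << s.sec << ' ' << s.nanosec << ' ';
  os.write(s.payload.data(), static_cast<std::streamsize>(s.payload.size()));
  return os.str();
}

// Parse the two numbers, skip exactly one separator, then read the rest
// in fixed-size chunks without skipping whitespace.
bool decode(const std::string& bytes, ToySample& out) {
  std::istringstream is(bytes);
  is >> out.sec >> out.nanosec >> std::noskipws;
  if (!is) return false;
  is.get();  // consume the single separator

  out.payload.clear();
  char chunk[16];  // deliberately small so several reads are needed
  for (;;) {
    is.read(chunk, static_cast<std::streamsize>(sizeof chunk));
    const std::streamsize got = is.gcount();
    if (got <= 0) break;
    out.payload.append(chunk, static_cast<std::size_t>(got));
  }
  return true;
}
```

Consuming exactly one separator matters because the payload itself may begin with whitespace, which a formatted read would silently discard.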

bool OpenDDS::DCPS::DataDurabilityCache::insert ( DDS::DomainId_t  domain_id,
char const *  topic_name,
char const *  type_name,
SendStateDataSampleList &  the_data,
DDS::DurabilityServiceQosPolicy const &  qos 
)

Insert the samples corresponding to the given topic instance (uniquely identified by its domain, topic name and type name) into the data durability cache.

Definition at line 471 of file DataDurabilityCache.cpp.

References ACE_TEXT(), allocator_, OpenDDS::DCPS::SendStateDataSampleList::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< ACE_CHAR_T >::c_str(), cleanup(), cleanup_timer_ids_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::find(), ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), OpenDDS::DCPS::DataSampleElement::get_sample(), DDS::DurabilityServiceQosPolicy::history_depth, DDS::DurabilityServiceQosPolicy::history_kind, DDS::KEEP_ALL_HISTORY_QOS, kind_, len, DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, lock_, ACE_Allocator::malloc(), DDS::DurabilityServiceQosPolicy::max_samples_per_instance, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, samples_, ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Time_t::sec, DDS::DurabilityServiceQosPolicy::service_cleanup_delay, OpenDDS::DCPS::SendStateDataSampleList::size(), OpenDDS::DCPS::DataSampleHeader::test_flag(), timestamp(), OpenDDS::DCPS::to_dds_string(), and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::WriteDataContainer::persist_data().

{
  if (the_data.size() == 0)
    return true;  // Nothing to cache.

  // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
  // settings prior to data insertion into the cache.
  int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
    ? qos.max_samples_per_instance
    : qos.history_depth;

  if (depth == DDS::LENGTH_UNLIMITED)
    depth = 0x7fffffff;

  // Iterator to first DataSampleElement to be copied.
  SendStateDataSampleList::iterator element(the_data.begin());

  if (depth < 0)
    return false; // Should never occur.

  else if (depth == 0)
    return true;  // Nothing else to do.  Discard all data.

  else if (the_data.size() > depth) {
    // N.B. Dropping data samples does not take into account
    // those samples which are not actually persisted (i.e.
    // samples with the coherent_sample_ flag set). The spec
    // does not provide any guidance in this case, therefore
    // we opt for the simplest solution and assume that there
    // are no change sets when calculating the number of
    // samples to drop.

    // Drop "old" samples.  Only keep the "depth" most recent
    // samples, i.e. those found at the tail end of the
    // SendStateDataSampleList.
    ssize_t const advance_amount = the_data.size() - depth;
    std::advance(element, advance_amount);
  }

  // -----------

  // Copy samples to the domain/topic/type-specific cache.

  key_type const key(domain_id,
                     topic_name,
                     type_name,
                     this->allocator_.get());
  SendStateDataSampleList::iterator the_end(the_data.end());
  sample_list_type * sample_list = 0;

  typedef DurabilityQueue<sample_data_type> data_queue_type;
  data_queue_type ** slot = 0;
  data_queue_type * samples = 0;  // sample_list_type::value_type

  using OpenDDS::FileSystemStorage::Directory;
  using OpenDDS::FileSystemStorage::File;
  Directory::Ptr dir;
  OPENDDS_VECTOR(OPENDDS_STRING) path;
  {
    ACE_Allocator * const allocator = this->allocator_.get();

    ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);

    if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
      try {
        dir = Directory::create(this->data_dir_.c_str());

        path.push_back(to_dds_string(domain_id));
        path.push_back(topic_name);
        path.push_back(type_name);
        dir = dir->get_dir(path);
        // dir is now the "type" directory, which is shared by all datawriters
        // of the domain/topic/type.  We actually need a new directory per
        // datawriter and this assumes that insert() is called once per
        // datawriter, as is currently the case.
        dir = dir->create_next_dir();
        path.push_back(dir->name());   // for use by the Cleanup_Handler

      } catch (const std::exception& ex) {
        if (DCPS_debug_level > 0) {
          ACE_ERROR((LM_ERROR,
                     ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
                     ACE_TEXT("couldn't create directory for PERSISTENT ")
                     ACE_TEXT("data: %C\n"), ex.what()));
        }

        dir.reset();
      }
    }

    if (this->samples_->find(key, sample_list, allocator) != 0) {
      // Create a new list (actually an ACE_Array_Base<>) with the
      // appropriate allocator passed to its constructor.
      ACE_NEW_MALLOC_RETURN(
        sample_list,
        static_cast<sample_list_type *>(
          allocator->malloc(sizeof(sample_list_type))),
        sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
        false);

      if (this->samples_->bind(key, sample_list, allocator) != 0)
        return false;
    }

    data_queue_type ** const begin = &((*sample_list)[0]);
    data_queue_type ** const end =
      begin + sample_list->size();

    // Find an empty slot in the array.  This is a linear search but
    // that should be fine for the common case, i.e. a small number of
    // DataWriters that push data into the cache.
    slot = std::find(begin,
                     end,
                     static_cast<data_queue_type *>(0));

    if (slot == end) {
      // No available slots.  Grow the array accordingly.
      size_t const old_len = sample_list->size();
      sample_list->size(old_len + 1);

      data_queue_type ** new_begin = &((*sample_list)[0]);
      slot = new_begin + old_len;
    }

    ACE_NEW_MALLOC_RETURN(
      samples,
      static_cast<data_queue_type *>(
        allocator->malloc(sizeof(data_queue_type))),
      data_queue_type(allocator),
      false);

    // Insert the samples into the sample list.
    *slot = samples;

    if (!dir.is_nil()) {
      samples->fs_path_ = path;
    }

    for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
      DataSampleElement& elem = *i;

      // N.B. Do not persist samples with coherent changes.
      // To verify, we check the DataSampleHeader for the
      // coherent_change_ flag. The DataSampleHeader will
      // always be the first message block in the chain.
      //
      // It should be noted that persisting coherent changes
      // is a non-trivial task, and should be handled when
      // finalizing persistence profile conformance.
      if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
        continue; // skip coherent sample
      }

      sample_data_type sample(elem, allocator);

      if (samples->enqueue_tail(sample) != 0)
        return false;

      if (!dir.is_nil()) {
        try {
          File::Ptr f = dir->create_next_file();
          std::ofstream os;

          if (!f->write(os)) return false;

          DDS::Time_t timestamp;
          const char * data;
          size_t len;
          sample.get_sample(data, len, timestamp);

          os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
          os.write(data, len);

        } catch (const std::exception& ex) {
          if (DCPS_debug_level > 0) {
            ACE_ERROR((LM_ERROR,
                       ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
                       ACE_TEXT("couldn't write sample for PERSISTENT ")
                       ACE_TEXT("data: %C\n"), ex.what()));
          }
        }
      }
    }
  }

  // -----------

  // Schedule cleanup timer.
  //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
  ACE_Time_Value const cleanup_delay(
    duration_to_time_value(qos.service_cleanup_delay));

  if (cleanup_delay > ACE_Time_Value::zero) {
    if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
                 ACE_TEXT("cleanup for\n")
                 ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
                 ACE_TEXT("== (%d, %C, %C)\n"),
                 domain_id,
                 topic_name,
                 type_name));
    }

    Cleanup_Handler * const cleanup =
      new Cleanup_Handler(*sample_list,
                          slot - &(*sample_list)[0],
                          this->allocator_.get(),
                          path,
                          this->data_dir_);
    ACE_Event_Handler_var safe_cleanup(cleanup);   // Transfer ownership
    long const tid =
      this->reactor_->schedule_timer(cleanup,
                                     0, // ACT
                                     cleanup_delay);
    if (tid == -1) {
      ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);

      ACE_DES_FREE(samples,
                   this->allocator_->free,
                   DurabilityQueue<sample_data_type>);
      *slot = 0;

      return false;

    } else {
      {
        ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
        this->cleanup_timer_ids_.push_back(tid);
      }

      cleanup->timer_id(tid,
                        &this->cleanup_timer_ids_);
    }
  }

  return true;
}
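
The first step of insert() derives an effective depth from the DURABILITY_SERVICE QoS and then skips all but the most recent samples. A standalone sketch of that arithmetic follows, assuming a std::list in place of SendStateDataSampleList. TOY_LENGTH_UNLIMITED, effective_depth and first_kept are hypothetical names, and the value -1 is an assumption standing in for DDS::LENGTH_UNLIMITED.

```cpp
#include <cstddef>
#include <iterator>
#include <list>

const int TOY_LENGTH_UNLIMITED = -1;  // assumed stand-in for DDS::LENGTH_UNLIMITED

// KEEP_ALL history is bounded by max_samples_per_instance, otherwise by
// history_depth; "unlimited" is mapped to the largest positive int.
int effective_depth(bool keep_all, int max_samples_per_instance,
                    int history_depth) {
  int depth = keep_all ? max_samples_per_instance : history_depth;
  if (depth == TOY_LENGTH_UNLIMITED) depth = 0x7fffffff;
  return depth;
}

// Return an iterator to the first sample that is kept, i.e. the start of
// the `depth` most recent samples at the tail of the list.
// Precondition: depth > 0 (the listing handles depth <= 0 before this step).
template <typename List>
typename List::const_iterator first_kept(const List& samples, int depth) {
  typename List::const_iterator it = samples.begin();
  const std::size_t keep = static_cast<std::size_t>(depth);
  if (samples.size() > keep) {
    std::advance(it, static_cast<std::ptrdiff_t>(samples.size() - keep));
  }
  return it;
}
```

With five samples and a depth of two, the iterator lands on the fourth sample, so only the last two are copied into the cache.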

typedef OPENDDS_LIST (long) OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type

DataDurabilityCache& OpenDDS::DCPS::DataDurabilityCache::operator= ( DataDurabilityCache const &   )  [private]

Member Data Documentation

unique_ptr< ACE_Allocator > const OpenDDS::DCPS::DataDurabilityCache::allocator_ [private]

Allocator used to allocate memory for sample map and lists.

Definition at line 227 of file DataDurabilityCache.h.

Referenced by get_data(), init(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init(), insert(), and ~DataDurabilityCache().

timer_id_list_type OpenDDS::DCPS::DataDurabilityCache::cleanup_timer_ids_ [private]

Timer ID list.

Keep track of cleanup timer IDs in case we need to cancel before they expire.

Definition at line 241 of file DataDurabilityCache.h.

Referenced by insert(), and ~DataDurabilityCache().
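
The bookkeeping described here can be sketched in isolation: every scheduled timer ID is remembered, and whatever is still pending is cancelled when the cache is destroyed. ToyReactor and ToyCache are hypothetical stand-ins; only the names schedule_timer and cancel_timer echo the calls seen in the listings, with simplified signatures.

```cpp
#include <list>
#include <set>

// Hypothetical reactor that hands out timer IDs and tracks active ones.
struct ToyReactor {
  long next_id;
  std::set<long> active;
  ToyReactor() : next_id(0) {}
  long schedule_timer() { active.insert(next_id); return next_id++; }
  int cancel_timer(long id) { return static_cast<int>(active.erase(id)); }
};

// Hypothetical cache that records each timer it schedules.
class ToyCache {
public:
  explicit ToyCache(ToyReactor& reactor) : reactor_(reactor) {}

  // Cancel timers that have not expired yet.
  ~ToyCache() {
    for (std::list<long>::const_iterator i = ids_.begin(); i != ids_.end(); ++i) {
      (void) reactor_.cancel_timer(*i);
    }
  }

  void schedule_cleanup() { ids_.push_back(reactor_.schedule_timer()); }

  // Called when a timer fires: it no longer needs to be cancelled.
  void expired(long id) {
    ids_.remove(id);
    reactor_.active.erase(id);
  }

private:
  ToyReactor& reactor_;
  std::list<long> ids_;
};
```

Without the list, a timer could fire after the cache is gone and touch freed memory, which is presumably why the IDs are tracked.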

ACE_CString OpenDDS::DCPS::DataDurabilityCache::data_dir_ [private]

Definition at line 231 of file DataDurabilityCache.h.

Referenced by init(), and insert().

DDS::DurabilityQosPolicyKind OpenDDS::DCPS::DataDurabilityCache::kind_ [private]

Definition at line 229 of file DataDurabilityCache.h.

Referenced by init(), and insert().

ACE_SYNCH_MUTEX OpenDDS::DCPS::DataDurabilityCache::lock_ [private]

Lock for synchronized access to the underlying map.

Definition at line 244 of file DataDurabilityCache.h.

Referenced by get_data(), and insert().

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataDurabilityCache::reactor_ [private]

Reactor with which cleanup timers will be registered.

Definition at line 247 of file DataDurabilityCache.h.

Referenced by init(), insert(), and ~DataDurabilityCache().

sample_map_type* OpenDDS::DCPS::DataDurabilityCache::samples_ [private]

Map of all data samples.

Definition at line 234 of file DataDurabilityCache.h.

Referenced by get_data(), init(), insert(), and ~DataDurabilityCache().


The documentation for this class was generated from the following files:

DataDurabilityCache.h
DataDurabilityCache.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1