DataDurabilityCache.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00011 
00012 #include "dds/DdsDcpsDomainC.h"
00013 #include "dds/DdsDcpsTypeSupportExtC.h"
00014 #include "DataDurabilityCache.h"
00015 #include "SendStateDataSampleList.h"
00016 #include "DataSampleElement.h"
00017 #include "WriteDataContainer.h"
00018 #include "DataWriterImpl.h"
00019 #include "Time_Helper.h"
00020 #include "debug.h"
00021 #include "SafetyProfileStreams.h"
00022 #include "Service_Participant.h"
00023 #include "RcEventHandler.h"
00024 
00025 #include "ace/Reactor.h"
00026 #include "ace/Message_Block.h"
00027 #include "ace/Log_Msg.h"
00028 #include "ace/Malloc_T.h"
00029 #include "ace/MMAP_Memory_Pool.h"
00030 #include "ace/OS_NS_sys_time.h"
00031 
00032 #include <fstream>
00033 #include <algorithm>
00034 
00035 namespace {
00036 
00037 void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00038                        const ACE_CString & data_dir)
00039 {
00040   if (path.empty()) return;
00041 
00042   using OpenDDS::FileSystemStorage::Directory;
00043   Directory::Ptr dir = Directory::create(data_dir.c_str());
00044   dir = dir->get_dir(path);
00045   Directory::Ptr parent = dir->parent();
00046   dir->remove();
00047 
00048   // clean up empty directories
00049   while (!parent.is_nil() &&
00050          (parent->begin_dirs() == parent->end_dirs())) {
00051     Directory::Ptr to_delete = parent;
00052     parent = parent->parent();
00053     to_delete->remove();
00054   }
00055 }
00056 
00057 /**
00058  * @class Cleanup_Handler
00059  *
00060  * @brief Event handler that is called when @c service_cleanup_delay
00061  *        period expires.
00062  */
00063 class Cleanup_Handler : public OpenDDS::DCPS::RcEventHandler {
00064 public:
00065 
00066   typedef
00067   OpenDDS::DCPS::DataDurabilityCache::sample_data_type data_type;
00068   typedef
00069   OpenDDS::DCPS::DataDurabilityCache::sample_list_type list_type;
00070   typedef ptrdiff_t list_difference_type;
00071 
00072   Cleanup_Handler(list_type & sample_list,
00073                   list_difference_type index,
00074                   ACE_Allocator * allocator,
00075                   const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00076                   const ACE_CString & data_dir)
00077   : sample_list_(sample_list)
00078   , index_(index)
00079   , allocator_(allocator)
00080   , tid_(-1)
00081   , timer_ids_(0)
00082   , path_(path)
00083   , data_dir_(data_dir)
00084   {
00085   }
00086 
00087   virtual int handle_timeout(ACE_Time_Value const & /* current_time */,
00088                              void const * /* act */) {
00089     if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00090       ACE_DEBUG((LM_DEBUG,
00091                  ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
00092                  ACE_TEXT("data durability cache.\n")));
00093     }
00094 
00095     typedef OpenDDS::DCPS::DurabilityQueue<
00096     OpenDDS::DCPS::DataDurabilityCache::sample_data_type>
00097     data_queue_type;
00098 
00099     // Cleanup all data samples corresponding to the cleanup delay.
00100     data_queue_type *& queue = this->sample_list_[this->index_];
00101     ACE_DES_FREE(queue,
00102                  this->allocator_->free,
00103                  data_queue_type);
00104     queue = 0;
00105 
00106     try {
00107       cleanup_directory(path_, this->data_dir_);
00108 
00109     } catch (const std::exception& ex) {
00110       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00111         ACE_ERROR((LM_ERROR,
00112                    ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
00113                    ACE_TEXT("couldn't remove directory for PERSISTENT ")
00114                    ACE_TEXT("data: %C\n"), ex.what()));
00115       }
00116     }
00117 
00118     // No longer any need to keep track of the timer ID.
00119     this->timer_ids_->remove(this->tid_);
00120 
00121     return 0;
00122   }
00123 
00124   void timer_id(
00125     long tid,
00126     OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
00127     this->tid_ = tid;
00128     this->timer_ids_ = timer_ids;
00129   }
00130 
00131 protected:
00132 
00133   virtual ~Cleanup_Handler() {}
00134 
00135 private:
00136 
00137   /// List containing samples to be cleaned up when the cleanup timer
00138   /// expires.
00139   list_type & sample_list_;
00140 
00141   /// Location in list/array of queue to be deallocated.
00142   list_difference_type const index_;
00143 
00144   /// Allocator to be used when deallocating data queue.
00145   ACE_Allocator * const allocator_;
00146 
00147   /// Timer ID corresponding to this cleanup event handler.
00148   long tid_;
00149 
00150   /// List of timer IDs.
00151   /**
00152    * If the cleanup timer fires successfully, the timer ID must be
00153    * removed from the timer ID list so that a subsequent attempt to
00154    * cancel the timer during durability cache destruction does not
00155    * occur.
00156    */
00157   OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
00158   timer_ids_;
00159 
00160   OPENDDS_VECTOR(OPENDDS_STRING) path_;
00161 
00162   ACE_CString data_dir_;
00163 };
00164 
00165 } // namespace
00166 
00167 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type()
00168   : length_(0)
00169   , sample_(0)
00170   , allocator_(0)
00171 {
00172   this->source_timestamp_.sec = 0;
00173   this->source_timestamp_.nanosec = 0;
00174 }
00175 
00176 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00177   DataSampleElement & element,
00178   ACE_Allocator * a)
00179   : length_(0)
00180   , sample_(0)
00181   , allocator_(a)
00182 {
00183   this->source_timestamp_.sec     = element.get_header().source_timestamp_sec_;
00184   this->source_timestamp_.nanosec = element.get_header().source_timestamp_nanosec_;
00185 
00186   // Only copy the data provided by the user.  The DataSampleHeader
00187   // will be reconstructed when the durable data is retrieved by a
00188   // DataWriterImpl instance.
00189   //
00190   // The user's data is stored in the first message block
00191   // continuation.
00192   ACE_Message_Block const * const data = element.get_sample()->cont();
00193   init(data);
00194 }
00195 
00196 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00197   DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
00198   : length_(0)
00199   , sample_(0)
00200   , source_timestamp_(timestamp)
00201   , allocator_(a)
00202 {
00203   init(&mb);
00204 }
00205 
00206 void
00207 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init
00208 (const ACE_Message_Block * data)
00209 {
00210   this->length_ = data->total_length();
00211 
00212   ACE_ALLOCATOR(this->sample_,
00213                 static_cast<char *>(
00214                   this->allocator_->malloc(this->length_)));
00215 
00216   char * buf = this->sample_;
00217 
00218   for (ACE_Message_Block const * i = data;
00219        i != 0;
00220        i = i->cont()) {
00221     ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
00222     buf += i->length();
00223   }
00224 }
00225 
00226 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00227   sample_data_type const & rhs)
00228   : length_(rhs.length_)
00229   , sample_(0)
00230   , allocator_(rhs.allocator_)
00231 {
00232   this->source_timestamp_.sec     = rhs.source_timestamp_.sec;
00233   this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00234 
00235   if (this->allocator_) {
00236     ACE_ALLOCATOR(this->sample_,
00237                   static_cast<char *>(
00238                     this->allocator_->malloc(rhs.length_)));
00239     ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
00240   }
00241 }
00242 
00243 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::~sample_data_type()
00244 {
00245   if (this->allocator_)
00246     this->allocator_->free(this->sample_);
00247 }
00248 
00249 OpenDDS::DCPS::DataDurabilityCache::sample_data_type &
00250 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::operator= (
00251   sample_data_type const & rhs)
00252 {
00253   // Strongly exception-safe copy assignment.
00254   sample_data_type tmp(rhs);
00255   std::swap(this->length_, tmp.length_);
00256   std::swap(this->sample_, tmp.sample_);
00257   std::swap(this->allocator_, tmp.allocator_);
00258 
00259   this->source_timestamp_.sec     = rhs.source_timestamp_.sec;
00260   this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00261 
00262   return *this;
00263 }
00264 
00265 void
00266 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(
00267   char const *& s,
00268   size_t & len,
00269   DDS::Time_t & source_timestamp)
00270 {
00271   s = this->sample_;
00272   len = this->length_;
00273   source_timestamp.sec     = this->source_timestamp_.sec;
00274   source_timestamp.nanosec = this->source_timestamp_.nanosec;
00275 }
00276 
00277 void
00278 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::set_allocator(
00279   ACE_Allocator * allocator)
00280 {
00281   this->allocator_ = allocator;
00282 }
00283 
00284 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00285   DDS::DurabilityQosPolicyKind kind)
00286   : allocator_(new ACE_New_Allocator)
00287   , kind_(kind)
00288   , samples_(0)
00289   , cleanup_timer_ids_()
00290   , lock_()
00291   , reactor_(0)
00292 {
00293   init();
00294 }
00295 
00296 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00297   DDS::DurabilityQosPolicyKind kind,
00298   ACE_CString & data_dir)
00299   : allocator_(new ACE_New_Allocator)
00300   , kind_(kind)
00301   , data_dir_(data_dir)
00302   , samples_(0)
00303   , cleanup_timer_ids_()
00304   , lock_()
00305   , reactor_(0)
00306 {
00307   init();
00308 }
00309 
00310 void OpenDDS::DCPS::DataDurabilityCache::init()
00311 {
00312   ACE_Allocator * const allocator = this->allocator_.get();
00313   ACE_NEW_MALLOC(
00314     this->samples_,
00315     static_cast<sample_map_type *>(
00316       allocator->malloc(sizeof(sample_map_type))),
00317     sample_map_type(allocator));
00318 
00319   typedef DurabilityQueue<sample_data_type> data_queue_type;
00320 
00321   if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00322     // Read data from the filesystem and create the in-memory data structures
00323     // as if we had called insert() once for each "datawriter" directory.
00324     using OpenDDS::FileSystemStorage::Directory;
00325     using OpenDDS::FileSystemStorage::File;
00326     Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
00327     OPENDDS_VECTOR(OPENDDS_STRING) path(4);  // domain, topic, type, datawriter
00328 
00329     for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
00330          domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
00331       path[0] = domain->name();
00332       DDS::DomainId_t domain_id;
00333       {
00334         domain_id = ACE_OS::atoi(path[0].c_str());
00335       }
00336 
00337       for (Directory::DirectoryIterator topic = domain->begin_dirs(),
00338            topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
00339         path[1] = topic->name();
00340 
00341         for (Directory::DirectoryIterator type = topic->begin_dirs(),
00342              type_end = topic->end_dirs(); type != type_end; ++type) {
00343           path[2] = type->name();
00344 
00345           key_type key(domain_id, path[1].c_str(), path[2].c_str(),
00346                        allocator);
00347           sample_list_type * sample_list = 0;
00348           ACE_NEW_MALLOC(sample_list,
00349                          static_cast<sample_list_type *>(
00350                            allocator->malloc(sizeof(sample_list_type))),
00351                          sample_list_type(0, static_cast<data_queue_type *>(0),
00352                                           allocator));
00353           this->samples_->bind(key, sample_list, allocator);
00354 
00355           for (Directory::DirectoryIterator dw = type->begin_dirs(),
00356                dw_end = type->end_dirs(); dw != dw_end; ++dw) {
00357             path[3] = dw->name();
00358 
00359             size_t old_len = sample_list->size();
00360             sample_list->size(old_len + 1);
00361             data_queue_type *& slot = (*sample_list)[old_len];
00362 
00363             // This variable is called "samples" in the insert() method be
00364             // we already have a "samples_" which is the overall data structure.
00365             data_queue_type * sample_queue = 0;
00366             ACE_NEW_MALLOC(sample_queue,
00367                            static_cast<data_queue_type *>(
00368                              allocator->malloc(sizeof(data_queue_type))),
00369                            data_queue_type(allocator));
00370 
00371             slot = sample_queue;
00372             sample_queue->fs_path_ = path;
00373 
00374             for (Directory::FileIterator file = dw->begin_files(),
00375                  file_end = dw->end_files(); file != file_end; ++file) {
00376               std::ifstream is;
00377 
00378               if (!file->read(is)) {
00379                 if (DCPS_debug_level) {
00380                   ACE_ERROR((LM_ERROR,
00381                              ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
00382                              ACE_TEXT("couldn't open file for PERSISTENT ")
00383                              ACE_TEXT("data: %C\n"), file->name().c_str()));
00384                 }
00385                 continue;
00386               }
00387 
00388               DDS::Time_t timestamp;
00389               is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
00390               is.get(); // consume separator
00391 
00392               const size_t CHUNK = 4096;
00393               ACE_Message_Block mb(CHUNK);
00394               ACE_Message_Block * current = &mb;
00395 
00396               while (!is.eof()) {
00397                 is.read(current->wr_ptr(), current->space());
00398 
00399                 if (is.bad()) break;
00400 
00401                 current->wr_ptr((size_t)is.gcount());
00402 
00403                 if (current->space() == 0) {
00404                   ACE_Message_Block * old = current;
00405                   current = new ACE_Message_Block(CHUNK);
00406                   old->cont(current);
00407                 }
00408               }
00409 
00410               sample_queue->enqueue_tail(
00411                 sample_data_type(timestamp, mb, allocator));
00412 
00413               if (mb.cont()) mb.cont()->release();    // delete the cont() chain
00414             }
00415           }
00416         }
00417       }
00418     }
00419   }
00420 
00421   this->reactor_ = TheServiceParticipant->timer();
00422 }
00423 
00424 OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache()
00425 {
00426   // Cancel timers that haven't expired yet.
00427   timer_id_list_type::const_iterator const end(
00428     this->cleanup_timer_ids_.end());
00429 
00430   for (timer_id_list_type::const_iterator i(
00431          this->cleanup_timer_ids_.begin());
00432        i != end;
00433        ++i) {
00434     (void) this->reactor_->cancel_timer(*i);
00435   }
00436 
00437   // Clean up memory that isn't automatically managed.
00438   if (this->allocator_.get() != 0) {
00439     sample_map_type::iterator const map_end = this->samples_->end();
00440 
00441     for (sample_map_type::iterator s = this->samples_->begin();
00442          s != map_end;
00443          ++s) {
00444       sample_list_type * const list = (*s).int_id_;
00445 
00446       size_t const len = list->size();
00447 
00448       for (size_t l = 0; l != len; ++l) {
00449         ACE_DES_FREE((*list)[l],
00450                      this->allocator_->free,
00451                      DurabilityQueue<sample_data_type>);
00452       }
00453 
00454       ACE_DES_FREE(list,
00455                    this->allocator_->free,
00456                    DurabilityArray<DurabilityQueue<sample_data_type> *>);
00457     }
00458 
00459     // Yes, this looks strange but please leave it in place.  The third param
00460     // to ACE_DES_FREE must be the actual class name since it's used in an
00461     // explicit desturctor call (~T()).  Typedefs are not allowed here.  This
00462     // is why the two ACE_DES_FREE's above are not the typedefs.  Below we use
00463     // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
00464 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
00465     ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
00466 #undef OPENDDS_MAP_TYPE
00467   }
00468 }
00469 
00470 bool
00471 OpenDDS::DCPS::DataDurabilityCache::insert(
00472   DDS::DomainId_t domain_id,
00473   char const * topic_name,
00474   char const * type_name,
00475   SendStateDataSampleList & the_data,
00476   DDS::DurabilityServiceQosPolicy const & qos)
00477 {
00478   if (the_data.size() == 0)
00479     return true;  // Nothing to cache.
00480 
00481   // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
00482   // settings prior to data insertion into the cache.
00483   int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
00484     ? qos.max_samples_per_instance
00485     : qos.history_depth;
00486 
00487   if (depth == DDS::LENGTH_UNLIMITED)
00488     depth = 0x7fffffff;
00489 
00490   // Iterator to first DataSampleElement to be copied.
00491   SendStateDataSampleList::iterator element(the_data.begin());
00492 
00493   if (depth < 0)
00494     return false; // Should never occur.
00495 
00496   else if (depth == 0)
00497     return true;  // Nothing else to do.  Discard all data.
00498 
00499   else if (the_data.size() > depth) {
00500     // N.B. Dropping data samples does not take into account
00501     // those samples which are not actually persisted (i.e.
00502     // samples with the coherent_sample_ flag set). The spec
00503     // does not provide any guidance in this case, therefore
00504     // we opt for the simplest solution and assume that there
00505     // are no change sets when calculating the number of
00506     // samples to drop.
00507 
00508     // Drop "old" samples.  Only keep the "depth" most recent
00509     // samples, i.e. those found at the tail end of the
00510     // SendStateDataSampleList.
00511     ssize_t const advance_amount = the_data.size() - depth;
00512     std::advance(element, advance_amount);
00513   }
00514 
00515   // -----------
00516 
00517   // Copy samples to the domain/topic/type-specific cache.
00518 
00519   key_type const key(domain_id,
00520                      topic_name,
00521                      type_name,
00522                      this->allocator_.get());
00523   SendStateDataSampleList::iterator the_end(the_data.end());
00524   sample_list_type * sample_list = 0;
00525 
00526   typedef DurabilityQueue<sample_data_type> data_queue_type;
00527   data_queue_type ** slot = 0;
00528   data_queue_type * samples = 0;  // sample_list_type::value_type
00529 
00530   using OpenDDS::FileSystemStorage::Directory;
00531   using OpenDDS::FileSystemStorage::File;
00532   Directory::Ptr dir;
00533   OPENDDS_VECTOR(OPENDDS_STRING) path;
00534   {
00535     ACE_Allocator * const allocator = this->allocator_.get();
00536 
00537     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00538 
00539     if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00540       try {
00541         dir = Directory::create(this->data_dir_.c_str());
00542 
00543         path.push_back(to_dds_string(domain_id));
00544         path.push_back(topic_name);
00545         path.push_back(type_name);
00546         dir = dir->get_dir(path);
00547         // dir is now the "type" directory, which is shared by all datawriters
00548         // of the domain/topic/type.  We actually need a new directory per
00549         // datawriter and this assumes that insert() is called once per
00550         // datawriter, as is currently the case.
00551         dir = dir->create_next_dir();
00552         path.push_back(dir->name());   // for use by the Cleanup_Handler
00553 
00554       } catch (const std::exception& ex) {
00555         if (DCPS_debug_level > 0) {
00556           ACE_ERROR((LM_ERROR,
00557                      ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00558                      ACE_TEXT("couldn't create directory for PERSISTENT ")
00559                      ACE_TEXT("data: %C\n"), ex.what()));
00560         }
00561 
00562         dir.reset();
00563       }
00564     }
00565 
00566     if (this->samples_->find(key, sample_list, allocator) != 0) {
00567       // Create a new list (actually an ACE_Array_Base<>) with the
00568       // appropriate allocator passed to its constructor.
00569       ACE_NEW_MALLOC_RETURN(
00570         sample_list,
00571         static_cast<sample_list_type *>(
00572           allocator->malloc(sizeof(sample_list_type))),
00573         sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
00574         false);
00575 
00576       if (this->samples_->bind(key, sample_list, allocator) != 0)
00577         return false;
00578     }
00579 
00580     data_queue_type ** const begin = &((*sample_list)[0]);
00581     data_queue_type ** const end =
00582       begin + sample_list->size();
00583 
00584     // Find an empty slot in the array.  This is a linear search but
00585     // that should be fine for the common case, i.e. a small number of
00586     // DataWriters that push data into the cache.
00587     slot = std::find(begin,
00588                      end,
00589                      static_cast<data_queue_type *>(0));
00590 
00591     if (slot == end) {
00592       // No available slots.  Grow the array accordingly.
00593       size_t const old_len = sample_list->size();
00594       sample_list->size(old_len + 1);
00595 
00596       data_queue_type ** new_begin = &((*sample_list)[0]);
00597       slot = new_begin + old_len;
00598     }
00599 
00600     ACE_NEW_MALLOC_RETURN(
00601       samples,
00602       static_cast<data_queue_type *>(
00603         allocator->malloc(sizeof(data_queue_type))),
00604       data_queue_type(allocator),
00605       false);
00606 
00607     // Insert the samples in to the sample list.
00608     *slot = samples;
00609 
00610     if (!dir.is_nil()) {
00611       samples->fs_path_ = path;
00612     }
00613 
00614     for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
00615       DataSampleElement& elem = *i;
00616 
00617       // N.B. Do not persist samples with coherent changes.
00618       // To verify, we check the DataSampleHeader for the
00619       // coherent_change_ flag. The DataSampleHeader will
00620       // always be the first message block in the chain.
00621       //
00622       // It should be noted that persisting coherent changes
00623       // is a non-trivial task, and should be handled when
00624       // finalizing persistence profile conformance.
00625       if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
00626         continue; // skip coherent sample
00627       }
00628 
00629       sample_data_type sample(elem, allocator);
00630 
00631       if (samples->enqueue_tail(sample) != 0)
00632         return false;
00633 
00634       if (!dir.is_nil()) {
00635         try {
00636           File::Ptr f = dir->create_next_file();
00637           std::ofstream os;
00638 
00639           if (!f->write(os)) return false;
00640 
00641           DDS::Time_t timestamp;
00642           const char * data;
00643           size_t len;
00644           sample.get_sample(data, len, timestamp);
00645 
00646           os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
00647           os.write(data, len);
00648 
00649         } catch (const std::exception& ex) {
00650           if (DCPS_debug_level > 0) {
00651             ACE_ERROR((LM_ERROR,
00652                        ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00653                        ACE_TEXT("couldn't write sample for PERSISTENT ")
00654                        ACE_TEXT("data: %C\n"), ex.what()));
00655           }
00656         }
00657       }
00658     }
00659   }
00660 
00661   // -----------
00662 
00663   // Schedule cleanup timer.
00664   //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
00665   ACE_Time_Value const cleanup_delay(
00666     duration_to_time_value(qos.service_cleanup_delay));
00667 
00668   if (cleanup_delay > ACE_Time_Value::zero) {
00669     if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00670       ACE_DEBUG((LM_DEBUG,
00671                  ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
00672                  ACE_TEXT("cleanup for\n")
00673                  ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
00674                  ACE_TEXT("== (%d, %C, %C)\n"),
00675                  domain_id,
00676                  topic_name,
00677                  type_name));
00678     }
00679 
00680     Cleanup_Handler * const cleanup =
00681       new Cleanup_Handler(*sample_list,
00682                           slot - &(*sample_list)[0],
00683                           this->allocator_.get(),
00684                           path,
00685                           this->data_dir_);
00686     ACE_Event_Handler_var safe_cleanup(cleanup);   // Transfer ownership
00687     long const tid =
00688       this->reactor_->schedule_timer(cleanup,
00689                                      0, // ACT
00690                                      cleanup_delay);
00691     if (tid == -1) {
00692       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00693 
00694       ACE_DES_FREE(samples,
00695                    this->allocator_->free,
00696                    DurabilityQueue<sample_data_type>);
00697       *slot = 0;
00698 
00699       return false;
00700 
00701     } else {
00702       {
00703         ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00704         this->cleanup_timer_ids_.push_back(tid);
00705       }
00706 
00707       cleanup->timer_id(tid,
00708                         &this->cleanup_timer_ids_);
00709     }
00710   }
00711 
00712   return true;
00713 }
00714 
00715 bool
00716 OpenDDS::DCPS::DataDurabilityCache::get_data(
00717   DDS::DomainId_t domain_id,
00718   char const * topic_name,
00719   char const * type_name,
00720   DataWriterImpl * data_writer,
00721   ACE_Allocator * mb_allocator,
00722   ACE_Allocator * db_allocator,
00723   DDS::LifespanQosPolicy const & /* lifespan */)
00724 {
00725   key_type const key(domain_id,
00726                      topic_name,
00727                      type_name,
00728                      this->allocator_.get());
00729 
00730   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00731 
00732   sample_list_type * p_sample_list = 0;
00733 
00734   if (this->samples_->find(key,
00735                            p_sample_list,
00736                            this->allocator_.get()) == -1)
00737     return true;  // No durable data for this domain/topic/type.
00738 
00739   else if (p_sample_list == 0)
00740     return false; // Should never happen.
00741 
00742   sample_list_type & sample_list = *p_sample_list;
00743 
00744   // We will register an instance, and then write all of the cached
00745   // data to the DataWriter using that instance.
00746 
00747   sample_data_type * registration_data = 0;
00748 
00749   if (sample_list[0]->get(registration_data, 0) == -1)
00750     return false;
00751 
00752   char const * marshaled_sample = 0;
00753   size_t marshaled_sample_length = 0;
00754   DDS::Time_t registration_timestamp;
00755 
00756   registration_data->get_sample(marshaled_sample,
00757                                 marshaled_sample_length,
00758                                 registration_timestamp);
00759 
00760   // Don't use the cached allocator for the registered sample message
00761   // block.
00762   Message_Block_Ptr registration_sample(
00763     new ACE_Message_Block(marshaled_sample_length,
00764                           ACE_Message_Block::MB_DATA,
00765                           0, //cont
00766                           0, //data
00767                           0, //alloc_strategy
00768                           data_writer->get_db_lock()));
00769 
00770   ACE_OS::memcpy(registration_sample->wr_ptr(),
00771                  marshaled_sample,
00772                  marshaled_sample_length);
00773 
00774   registration_sample->wr_ptr(marshaled_sample_length);
00775 
00776   DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
00777 
00778   /**
00779    * @todo Is this going to cause problems for users that set a finite
00780    *       DDS::ResourceLimitsQosPolicy::max_instances value when
00781    *       OpenDDS supports that value?
00782    */
00783   DDS::ReturnCode_t ret =
00784     data_writer->register_instance_from_durable_data(handle,
00785                                      move(registration_sample),
00786                                      registration_timestamp);
00787 
00788   if (ret != DDS::RETCODE_OK)
00789     return false;
00790 
00791   typedef DurabilityQueue<sample_data_type> data_queue_type;
00792   size_t const len = sample_list.size();
00793 
00794   for (size_t i = 0; i != len; ++i) {
00795     data_queue_type * const q = sample_list[i];
00796 
00797     for (data_queue_type::ITERATOR j = q->begin();
00798          !j.done();
00799          j.advance()) {
00800       sample_data_type * data = 0;
00801 
00802       if (j.next(data) == 0)
00803         return false;  // Should never happen.
00804 
00805       char const * sample = 0;  // Sample does not include header.
00806       size_t sample_length = 0;
00807       DDS::Time_t source_timestamp;
00808 
00809       data->get_sample(sample, sample_length, source_timestamp);
00810 
00811       ACE_Message_Block * tmp_mb = 0;
00812       ACE_NEW_MALLOC_RETURN(tmp_mb,
00813                             static_cast<ACE_Message_Block*>(
00814                               mb_allocator->malloc(
00815                                 sizeof(ACE_Message_Block))),
00816                             ACE_Message_Block(
00817                               sample_length,
00818                               ACE_Message_Block::MB_DATA,
00819                               0, // cont
00820                               0, // data
00821                               0, // allocator_strategy
00822                               data_writer->get_db_lock(), // data block locking_strategy
00823                               ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00824                               ACE_Time_Value::zero,
00825                               ACE_Time_Value::max_time,
00826                               db_allocator,
00827                               mb_allocator),
00828                             false);
00829       Message_Block_Ptr mb(tmp_mb);
00830 
00831       ACE_OS::memcpy(mb->wr_ptr(),
00832                      sample,
00833                      sample_length);
00834       mb->wr_ptr(sample_length);
00835 
00836       const DDS::ReturnCode_t ret = data_writer->write(move(mb), handle,
00837         source_timestamp, 0 /* no content filtering */);
00838 
00839       if (ret != DDS::RETCODE_OK) {
00840         return false;
00841       }
00842     }
00843 
00844     // Data successfully written.  Empty the queue/list.
00845     /**
00846      * @todo If we don't empty the queue, we'll end up with duplicate
00847      *       data since the data retrieved from the cache will be
00848      *       reinserted.
00849      */
00850     q->reset();
00851 
00852     try {
00853       cleanup_directory(q->fs_path_, this->data_dir_);
00854 
00855     } catch (const std::exception& ex) {
00856       if (DCPS_debug_level > 0) {
00857         ACE_ERROR((LM_ERROR,
00858                    ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
00859                    ACE_TEXT("couldn't remove directory for PERSISTENT ")
00860                    ACE_TEXT("data: %C\n"), ex.what()));
00861       }
00862     }
00863   }
00864   return true;
00865 }
00866 
00867 #endif // OPENDDS_NO_PERSISTENCE_PROFILE
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1