DataDurabilityCache.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00011 
00012 #include "ace/Condition_Recursive_Thread_Mutex.h"
00013 #include "DataDurabilityCache.h"
00014 #include "Service_Participant.h"
00015 #include "SendStateDataSampleList.h"
00016 #include "DataSampleElement.h"
00017 #include "WriteDataContainer.h"
00018 #include "DataWriterImpl.h"
00019 #include "Qos_Helper.h"
00020 #include "debug.h"
00021 #include "SafetyProfileStreams.h"
00022 
00023 #include "tao/ORB_Core.h"
00024 
00025 #include "ace/Reactor.h"
00026 #include "ace/Message_Block.h"
00027 #include "ace/Log_Msg.h"
00028 #include "ace/Malloc_T.h"
00029 #include "ace/MMAP_Memory_Pool.h"
00030 #include "ace/OS_NS_sys_time.h"
00031 
00032 #include <fstream>
00033 #include <algorithm>
00034 
00035 namespace {
00036 
00037 void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00038                        const ACE_CString & data_dir)
00039 {
00040   if (path.empty()) return;
00041 
00042   using OpenDDS::FileSystemStorage::Directory;
00043   Directory::Ptr dir = Directory::create(data_dir.c_str());
00044   dir = dir->get_dir(path);
00045   Directory::Ptr parent = dir->parent();
00046   dir->remove();
00047 
00048   // clean up empty directories
00049   while (!parent.is_nil() &&
00050          (parent->begin_dirs() == parent->end_dirs())) {
00051     Directory::Ptr to_delete = parent;
00052     parent = parent->parent();
00053     to_delete->remove();
00054   }
00055 }
00056 
00057 /**
00058  * @class Cleanup_Handler
00059  *
00060  * @brief Event handler that is called when @c service_cleanup_delay
00061  *        period expires.
00062  */
00063 class Cleanup_Handler : public ACE_Event_Handler {
00064 public:
00065 
00066   typedef
00067   OpenDDS::DCPS::DataDurabilityCache::sample_data_type data_type;
00068   typedef
00069   OpenDDS::DCPS::DataDurabilityCache::sample_list_type list_type;
00070   typedef ptrdiff_t list_difference_type;
00071 
00072   Cleanup_Handler(list_type & sample_list,
00073                   list_difference_type index,
00074                   ACE_Allocator * allocator,
00075                   const OPENDDS_VECTOR(OPENDDS_STRING) & path,
00076                   const ACE_CString & data_dir)
00077   : sample_list_(sample_list)
00078   , index_(index)
00079   , allocator_(allocator)
00080   , tid_(-1)
00081   , timer_ids_(0)
00082   , path_(path)
00083   , data_dir_(data_dir) {
00084     this->reference_counting_policy().value(
00085       ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00086   }
00087 
00088   virtual int handle_timeout(ACE_Time_Value const & /* current_time */,
00089                              void const * /* act */) {
00090     if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00091       ACE_DEBUG((LM_DEBUG,
00092                  ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
00093                  ACE_TEXT("data durability cache.\n")));
00094     }
00095 
00096     typedef OpenDDS::DCPS::DurabilityQueue<
00097     OpenDDS::DCPS::DataDurabilityCache::sample_data_type>
00098     data_queue_type;
00099 
00100     // Cleanup all data samples corresponding to the cleanup delay.
00101     data_queue_type *& queue = this->sample_list_[this->index_];
00102     ACE_DES_FREE(queue,
00103                  this->allocator_->free,
00104                  data_queue_type);
00105     queue = 0;
00106 
00107     try {
00108       cleanup_directory(path_, this->data_dir_);
00109 
00110     } catch (const std::exception& ex) {
00111       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00112         ACE_ERROR((LM_ERROR,
00113                    ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
00114                    ACE_TEXT("couldn't remove directory for PERSISTENT ")
00115                    ACE_TEXT("data: %C\n"), ex.what()));
00116       }
00117     }
00118 
00119     // No longer any need to keep track of the timer ID.
00120     this->timer_ids_->remove(this->tid_);
00121 
00122     return 0;
00123   }
00124 
00125   void timer_id(
00126     long tid,
00127     OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
00128     this->tid_ = tid;
00129     this->timer_ids_ = timer_ids;
00130   }
00131 
00132 protected:
00133 
00134   virtual ~Cleanup_Handler() {}
00135 
00136 private:
00137 
00138   /// List containing samples to be cleaned up when the cleanup timer
00139   /// expires.
00140   list_type & sample_list_;
00141 
00142   /// Location in list/array of queue to be deallocated.
00143   list_difference_type const index_;
00144 
00145   /// Allocator to be used when deallocating data queue.
00146   ACE_Allocator * const allocator_;
00147 
00148   /// Timer ID corresponding to this cleanup event handler.
00149   long tid_;
00150 
00151   /// List of timer IDs.
00152   /**
00153    * If the cleanup timer fires successfully, the timer ID must be
00154    * removed from the timer ID list so that a subsequent attempt to
00155    * cancel the timer during durability cache destruction does not
00156    * occur.
00157    */
00158   OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
00159   timer_ids_;
00160 
00161   OPENDDS_VECTOR(OPENDDS_STRING) path_;
00162 
00163   ACE_CString data_dir_;
00164 };
00165 
00166 } // namespace
00167 
00168 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type()
00169   : length_(0)
00170   , sample_(0)
00171   , allocator_(0)
00172 {
00173   this->source_timestamp_.sec = 0;
00174   this->source_timestamp_.nanosec = 0;
00175 }
00176 
00177 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00178   DataSampleElement & element,
00179   ACE_Allocator * a)
00180   : length_(0)
00181   , sample_(0)
00182   , allocator_(a)
00183 {
00184   this->source_timestamp_.sec     = element.get_header().source_timestamp_sec_;
00185   this->source_timestamp_.nanosec = element.get_header().source_timestamp_nanosec_;
00186 
00187   // Only copy the data provided by the user.  The DataSampleHeader
00188   // will be reconstructed when the durable data is retrieved by a
00189   // DataWriterImpl instance.
00190   //
00191   // The user's data is stored in the first message block
00192   // continuation.
00193   ACE_Message_Block const * const data = element.get_sample()->cont();
00194   init(data);
00195 }
00196 
00197 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00198   DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
00199   : length_(0)
00200   , sample_(0)
00201   , source_timestamp_(timestamp)
00202   , allocator_(a)
00203 {
00204   init(&mb);
00205 }
00206 
00207 void
00208 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init
00209 (const ACE_Message_Block * data)
00210 {
00211   this->length_ = data->total_length();
00212 
00213   ACE_ALLOCATOR(this->sample_,
00214                 static_cast<char *>(
00215                   this->allocator_->malloc(this->length_)));
00216 
00217   char * buf = this->sample_;
00218 
00219   for (ACE_Message_Block const * i = data;
00220        i != 0;
00221        i = i->cont()) {
00222     ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
00223     buf += i->length();
00224   }
00225 }
00226 
00227 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
00228   sample_data_type const & rhs)
00229   : length_(rhs.length_)
00230   , sample_(0)
00231   , allocator_(rhs.allocator_)
00232 {
00233   this->source_timestamp_.sec     = rhs.source_timestamp_.sec;
00234   this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00235 
00236   if (this->allocator_) {
00237     ACE_ALLOCATOR(this->sample_,
00238                   static_cast<char *>(
00239                     this->allocator_->malloc(rhs.length_)));
00240     ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
00241   }
00242 }
00243 
00244 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::~sample_data_type()
00245 {
00246   if (this->allocator_)
00247     this->allocator_->free(this->sample_);
00248 }
00249 
00250 OpenDDS::DCPS::DataDurabilityCache::sample_data_type &
00251 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::operator= (
00252   sample_data_type const & rhs)
00253 {
00254   // Strongly exception-safe copy assignment.
00255   sample_data_type tmp(rhs);
00256   std::swap(this->length_, tmp.length_);
00257   std::swap(this->sample_, tmp.sample_);
00258   std::swap(this->allocator_, tmp.allocator_);
00259 
00260   this->source_timestamp_.sec     = rhs.source_timestamp_.sec;
00261   this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
00262 
00263   return *this;
00264 }
00265 
00266 void
00267 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(
00268   char const *& s,
00269   size_t & len,
00270   DDS::Time_t & source_timestamp)
00271 {
00272   s = this->sample_;
00273   len = this->length_;
00274   source_timestamp.sec     = this->source_timestamp_.sec;
00275   source_timestamp.nanosec = this->source_timestamp_.nanosec;
00276 }
00277 
00278 void
00279 OpenDDS::DCPS::DataDurabilityCache::sample_data_type::set_allocator(
00280   ACE_Allocator * allocator)
00281 {
00282   this->allocator_ = allocator;
00283 }
00284 
00285 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00286   DDS::DurabilityQosPolicyKind kind)
00287   : allocator_(make_allocator(kind))
00288   , kind_(kind)
00289   , samples_(0)
00290   , cleanup_timer_ids_()
00291   , lock_()
00292   , reactor_(0)
00293 {
00294   init();
00295 }
00296 
00297 OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
00298   DDS::DurabilityQosPolicyKind kind,
00299   ACE_CString & data_dir)
00300   : allocator_(make_allocator(kind))
00301   , kind_(kind)
00302   , data_dir_(data_dir)
00303   , samples_(0)
00304   , cleanup_timer_ids_()
00305   , lock_()
00306   , reactor_(0)
00307 {
00308   init();
00309 }
00310 
00311 void OpenDDS::DCPS::DataDurabilityCache::init()
00312 {
00313   ACE_Allocator * const allocator = this->allocator_.get();
00314   ACE_NEW_MALLOC(
00315     this->samples_,
00316     static_cast<sample_map_type *>(
00317       allocator->malloc(sizeof(sample_map_type))),
00318     sample_map_type(allocator));
00319 
00320   typedef DurabilityQueue<sample_data_type> data_queue_type;
00321 
00322   if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00323     // Read data from the filesystem and create the in-memory data structures
00324     // as if we had called insert() once for each "datawriter" directory.
00325     using OpenDDS::FileSystemStorage::Directory;
00326     using OpenDDS::FileSystemStorage::File;
00327     Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
00328     OPENDDS_VECTOR(OPENDDS_STRING) path(4);  // domain, topic, type, datawriter
00329 
00330     for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
00331          domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
00332       path[0] = domain->name();
00333       DDS::DomainId_t domain_id;
00334       {
00335         domain_id = ACE_OS::atoi(path[0].c_str());
00336       }
00337 
00338       for (Directory::DirectoryIterator topic = domain->begin_dirs(),
00339            topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
00340         path[1] = topic->name();
00341 
00342         for (Directory::DirectoryIterator type = topic->begin_dirs(),
00343              type_end = topic->end_dirs(); type != type_end; ++type) {
00344           path[2] = type->name();
00345 
00346           key_type key(domain_id, path[1].c_str(), path[2].c_str(),
00347                        allocator);
00348           sample_list_type * sample_list = 0;
00349           ACE_NEW_MALLOC(sample_list,
00350                          static_cast<sample_list_type *>(
00351                            allocator->malloc(sizeof(sample_list_type))),
00352                          sample_list_type(0, static_cast<data_queue_type *>(0),
00353                                           allocator));
00354           this->samples_->bind(key, sample_list, allocator);
00355 
00356           for (Directory::DirectoryIterator dw = type->begin_dirs(),
00357                dw_end = type->end_dirs(); dw != dw_end; ++dw) {
00358             path[3] = dw->name();
00359 
00360             size_t old_len = sample_list->size();
00361             sample_list->size(old_len + 1);
00362             data_queue_type *& slot = (*sample_list)[old_len];
00363 
00364             // This variable is called "samples" in the insert() method be
00365             // we already have a "samples_" which is the overall data structure.
00366             data_queue_type * sample_queue = 0;
00367             ACE_NEW_MALLOC(sample_queue,
00368                            static_cast<data_queue_type *>(
00369                              allocator->malloc(sizeof(data_queue_type))),
00370                            data_queue_type(allocator));
00371 
00372             slot = sample_queue;
00373             sample_queue->fs_path_ = path;
00374 
00375             for (Directory::FileIterator file = dw->begin_files(),
00376                  file_end = dw->end_files(); file != file_end; ++file) {
00377               std::ifstream is;
00378 
00379               if (!file->read(is)) {
00380                 if (DCPS_debug_level) {
00381                   ACE_ERROR((LM_ERROR,
00382                              ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
00383                              ACE_TEXT("couldn't open file for PERSISTENT ")
00384                              ACE_TEXT("data: %C\n"), file->name().c_str()));
00385                 }
00386                 continue;
00387               }
00388 
00389               DDS::Time_t timestamp;
00390               is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
00391               is.get(); // consume separator
00392 
00393               const size_t CHUNK = 4096;
00394               ACE_Message_Block mb(CHUNK);
00395               ACE_Message_Block * current = &mb;
00396 
00397               while (!is.eof()) {
00398                 is.read(current->wr_ptr(), current->space());
00399 
00400                 if (is.bad()) break;
00401 
00402                 current->wr_ptr((size_t)is.gcount());
00403 
00404                 if (current->space() == 0) {
00405                   ACE_Message_Block * old = current;
00406                   current = new ACE_Message_Block(CHUNK);
00407                   old->cont(current);
00408                 }
00409               }
00410 
00411               sample_queue->enqueue_tail(
00412                 sample_data_type(timestamp, mb, allocator));
00413 
00414               if (mb.cont()) mb.cont()->release();    // delete the cont() chain
00415             }
00416           }
00417         }
00418       }
00419     }
00420   }
00421 
00422   this->reactor_ = TheServiceParticipant->timer();
00423 }
00424 
00425 OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache()
00426 {
00427   // Cancel timers that haven't expired yet.
00428   timer_id_list_type::const_iterator const end(
00429     this->cleanup_timer_ids_.end());
00430 
00431   for (timer_id_list_type::const_iterator i(
00432          this->cleanup_timer_ids_.begin());
00433        i != end;
00434        ++i) {
00435     (void) this->reactor_->cancel_timer(*i);
00436   }
00437 
00438   // Clean up memory that isn't automatically managed.
00439   if (this->allocator_.get() != 0) {
00440     sample_map_type::iterator const map_end = this->samples_->end();
00441 
00442     for (sample_map_type::iterator s = this->samples_->begin();
00443          s != map_end;
00444          ++s) {
00445       sample_list_type * const list = (*s).int_id_;
00446 
00447       size_t const len = list->size();;
00448 
00449       for (size_t l = 0; l != len; ++l) {
00450         ACE_DES_FREE((*list)[l],
00451                      this->allocator_->free,
00452                      DurabilityQueue<sample_data_type>);
00453       }
00454 
00455       ACE_DES_FREE(list,
00456                    this->allocator_->free,
00457                    DurabilityArray<DurabilityQueue<sample_data_type> *>);
00458     }
00459 
00460     // Yes, this looks strange but please leave it in place.  The third param
00461     // to ACE_DES_FREE must be the actual class name since it's used in an
00462     // explicit desturctor call (~T()).  Typedefs are not allowed here.  This
00463     // is why the two ACE_DES_FREE's above are not the typedefs.  Below we use
00464     // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
00465 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
00466     ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
00467 #undef OPENDDS_MAP_TYPE
00468   }
00469 }
00470 
00471 bool
00472 OpenDDS::DCPS::DataDurabilityCache::insert(
00473   DDS::DomainId_t domain_id,
00474   char const * topic_name,
00475   char const * type_name,
00476   SendStateDataSampleList & the_data,
00477   DDS::DurabilityServiceQosPolicy const & qos)
00478 {
00479   if (the_data.size() == 0)
00480     return true;  // Nothing to cache.
00481 
00482   // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
00483   // settings prior to data insertion into the cache.
00484   CORBA::Long const depth =
00485     get_instance_sample_list_depth(
00486       qos.history_kind,
00487       qos.history_depth,
00488       qos.max_samples_per_instance);
00489 
00490   // Iterator to first DataSampleElement to be copied.
00491   SendStateDataSampleList::iterator element(the_data.begin());
00492 
00493   if (depth < 0)
00494     return false; // Should never occur.
00495 
00496   else if (depth == 0)
00497     return true;  // Nothing else to do.  Discard all data.
00498 
00499   else if (the_data.size() > depth) {
00500     // N.B. Dropping data samples does not take into account
00501     // those samples which are not actually persisted (i.e.
00502     // samples with the coherent_sample_ flag set). The spec
00503     // does not provide any guidance in this case, therefore
00504     // we opt for the simplest solution and assume that there
00505     // are no change sets when calculating the number of
00506     // samples to drop.
00507 
00508     // Drop "old" samples.  Only keep the "depth" most recent
00509     // samples, i.e. those found at the tail end of the
00510     // SendStateDataSampleList.
00511     ssize_t const advance_amount = the_data.size() - depth;
00512     std::advance(element, advance_amount);
00513   }
00514 
00515   // -----------
00516 
00517   // Copy samples to the domain/topic/type-specific cache.
00518 
00519   key_type const key(domain_id,
00520                      topic_name,
00521                      type_name,
00522                      this->allocator_.get());
00523   SendStateDataSampleList::iterator the_end(the_data.end());
00524   sample_list_type * sample_list = 0;
00525 
00526   typedef DurabilityQueue<sample_data_type> data_queue_type;
00527   data_queue_type ** slot = 0;
00528   data_queue_type * samples = 0;  // sample_list_type::value_type
00529 
00530   using OpenDDS::FileSystemStorage::Directory;
00531   using OpenDDS::FileSystemStorage::File;
00532   Directory::Ptr dir;
00533   OPENDDS_VECTOR(OPENDDS_STRING) path;
00534   {
00535     ACE_Allocator * const allocator = this->allocator_.get();
00536 
00537     ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00538 
00539     if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
00540       try {
00541         dir = Directory::create(this->data_dir_.c_str());
00542 
00543         path.push_back(to_dds_string(domain_id));
00544         path.push_back(topic_name);
00545         path.push_back(type_name);
00546         dir = dir->get_dir(path);
00547         // dir is now the "type" directory, which is shared by all datawriters
00548         // of the domain/topic/type.  We actually need a new directory per
00549         // datawriter and this assumes that insert() is called once per
00550         // datawriter, as is currently the case.
00551         dir = dir->create_next_dir();
00552         path.push_back(dir->name());   // for use by the Cleanup_Handler
00553 
00554       } catch (const std::exception& ex) {
00555         if (DCPS_debug_level > 0) {
00556           ACE_ERROR((LM_ERROR,
00557                      ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00558                      ACE_TEXT("couldn't create directory for PERSISTENT ")
00559                      ACE_TEXT("data: %C\n"), ex.what()));
00560         }
00561 
00562         dir = 0;
00563       }
00564     }
00565 
00566     if (this->samples_->find(key, sample_list, allocator) != 0) {
00567       // Create a new list (actually an ACE_Array_Base<>) with the
00568       // appropriate allocator passed to its constructor.
00569       ACE_NEW_MALLOC_RETURN(
00570         sample_list,
00571         static_cast<sample_list_type *>(
00572           allocator->malloc(sizeof(sample_list_type))),
00573         sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
00574         false);
00575 
00576       if (this->samples_->bind(key, sample_list, allocator) != 0)
00577         return false;
00578     }
00579 
00580     data_queue_type ** const begin = &((*sample_list)[0]);
00581     data_queue_type ** const end =
00582       begin + sample_list->size();
00583 
00584     // Find an empty slot in the array.  This is a linear search but
00585     // that should be fine for the common case, i.e. a small number of
00586     // DataWriters that push data into the cache.
00587     slot = std::find(begin,
00588                      end,
00589                      static_cast<data_queue_type *>(0));
00590 
00591     if (slot == end) {
00592       // No available slots.  Grow the array accordingly.
00593       size_t const old_len = sample_list->size();
00594       sample_list->size(old_len + 1);
00595 
00596       data_queue_type ** new_begin = &((*sample_list)[0]);
00597       slot = new_begin + old_len;
00598     }
00599 
00600     ACE_NEW_MALLOC_RETURN(
00601       samples,
00602       static_cast<data_queue_type *>(
00603         allocator->malloc(sizeof(data_queue_type))),
00604       data_queue_type(allocator),
00605       false);
00606 
00607     // Insert the samples in to the sample list.
00608     *slot = samples;
00609 
00610     if (!dir.is_nil()) {
00611       samples->fs_path_ = path;
00612     }
00613 
00614     for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
00615       DataSampleElement& elem = *i;
00616 
00617       // N.B. Do not persist samples with coherent changes.
00618       // To verify, we check the DataSampleHeader for the
00619       // coherent_change_ flag. The DataSampleHeader will
00620       // always be the first message block in the chain.
00621       //
00622       // It should be noted that persisting coherent changes
00623       // is a non-trivial task, and should be handled when
00624       // finalizing persistence profile conformance.
00625       if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
00626         continue; // skip coherent sample
00627       }
00628 
00629       sample_data_type sample(elem, allocator);
00630 
00631       if (samples->enqueue_tail(sample) != 0)
00632         return false;
00633 
00634       if (!dir.is_nil()) {
00635         try {
00636           File::Ptr f = dir->create_next_file();
00637           std::ofstream os;
00638 
00639           if (!f->write(os)) return false;
00640 
00641           DDS::Time_t timestamp;
00642           const char * data;
00643           size_t len;
00644           sample.get_sample(data, len, timestamp);
00645 
00646           os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
00647           os.write(data, len);
00648 
00649         } catch (const std::exception& ex) {
00650           if (DCPS_debug_level > 0) {
00651             ACE_ERROR((LM_ERROR,
00652                        ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
00653                        ACE_TEXT("couldn't write sample for PERSISTENT ")
00654                        ACE_TEXT("data: %C\n"), ex.what()));
00655           }
00656         }
00657       }
00658     }
00659   }
00660 
00661   // -----------
00662 
00663   // Schedule cleanup timer.
00664   //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
00665   ACE_Time_Value const cleanup_delay(
00666     duration_to_time_value(qos.service_cleanup_delay));
00667 
00668   if (cleanup_delay > ACE_Time_Value::zero) {
00669     if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
00670       ACE_DEBUG((LM_DEBUG,
00671                  ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
00672                  ACE_TEXT("cleanup for\n")
00673                  ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
00674                  ACE_TEXT("== (%d, %C, %C)\n"),
00675                  domain_id,
00676                  topic_name,
00677                  type_name));
00678     }
00679 
00680     Cleanup_Handler * const cleanup =
00681       new Cleanup_Handler(*sample_list,
00682                           slot - &(*sample_list)[0],
00683                           this->allocator_.get(),
00684                           path,
00685                           this->data_dir_);
00686     ACE_Event_Handler_var safe_cleanup(cleanup);   // Transfer ownership
00687     long const tid =
00688       this->reactor_->schedule_timer(cleanup,
00689                                      0, // ACT
00690                                      cleanup_delay);
00691     if (tid == -1) {
00692       ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00693 
00694       ACE_DES_FREE(samples,
00695                    this->allocator_->free,
00696                    DurabilityQueue<sample_data_type>);
00697       *slot = 0;
00698 
00699       return false;
00700 
00701     } else {
00702       {
00703         ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00704         this->cleanup_timer_ids_.push_back(tid);
00705       }
00706 
00707       cleanup->timer_id(tid,
00708                         &this->cleanup_timer_ids_);
00709     }
00710   }
00711 
00712   return true;
00713 }
00714 
00715 bool
00716 OpenDDS::DCPS::DataDurabilityCache::get_data(
00717   DDS::DomainId_t domain_id,
00718   char const * topic_name,
00719   char const * type_name,
00720   DataWriterImpl * data_writer,
00721   ACE_Allocator * mb_allocator,
00722   ACE_Allocator * db_allocator,
00723   DDS::LifespanQosPolicy const & /* lifespan */)
00724 {
00725   key_type const key(domain_id,
00726                      topic_name,
00727                      type_name,
00728                      this->allocator_.get());
00729 
00730   ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
00731 
00732   sample_list_type * p_sample_list = 0;
00733 
00734   if (this->samples_->find(key,
00735                            p_sample_list,
00736                            this->allocator_.get()) == -1)
00737     return true;  // No durable data for this domain/topic/type.
00738 
00739   else if (p_sample_list == 0)
00740     return false; // Should never happen.
00741 
00742   sample_list_type & sample_list = *p_sample_list;
00743 
00744   // We will register an instance, and then write all of the cached
00745   // data to the DataWriter using that instance.
00746 
00747   sample_data_type * registration_data = 0;
00748 
00749   if (sample_list[0]->get(registration_data, 0) == -1)
00750     return false;
00751 
00752   char const * marshaled_sample = 0;
00753   size_t marshaled_sample_length = 0;
00754   DDS::Time_t registration_timestamp;
00755 
00756   registration_data->get_sample(marshaled_sample,
00757                                 marshaled_sample_length,
00758                                 registration_timestamp);
00759 
00760   // Don't use the cached allocator for the registered sample message
00761   // block.
00762   std::auto_ptr<DataSample> registration_sample(
00763     new ACE_Message_Block(marshaled_sample_length,
00764                           ACE_Message_Block::MB_DATA,
00765                           0, //cont
00766                           0, //data
00767                           0, //alloc_strategy
00768                           data_writer->get_db_lock()));
00769 
00770   ACE_OS::memcpy(registration_sample->wr_ptr(),
00771                  marshaled_sample,
00772                  marshaled_sample_length);
00773 
00774   registration_sample->wr_ptr(marshaled_sample_length);
00775 
00776   DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
00777 
00778   /**
00779    * @todo Is this going to cause problems for users that set a finite
00780    *       DDS::ResourceLimitsQosPolicy::max_instances value when
00781    *       OpenDDS supports that value?
00782    */
00783   DDS::ReturnCode_t ret =
00784     data_writer->register_instance_from_durable_data(handle,
00785                                      registration_sample.get(),
00786                                      registration_timestamp);
00787 
00788   if (ret != DDS::RETCODE_OK)
00789     return false;
00790 
00791   registration_sample.release();
00792 
00793   typedef DurabilityQueue<sample_data_type> data_queue_type;
00794   size_t const len = sample_list.size();
00795 
00796   for (size_t i = 0; i != len; ++i) {
00797     data_queue_type * const q = sample_list[i];
00798 
00799     for (data_queue_type::ITERATOR j = q->begin();
00800          !j.done();
00801          j.advance()) {
00802       sample_data_type * data = 0;
00803 
00804       if (j.next(data) == 0)
00805         return false;  // Should never happen.
00806 
00807       char const * sample = 0;  // Sample does not include header.
00808       size_t sample_length = 0;
00809       DDS::Time_t source_timestamp;
00810 
00811       data->get_sample(sample, sample_length, source_timestamp);
00812 
00813       ACE_Message_Block * mb = 0;
00814       ACE_NEW_MALLOC_RETURN(mb,
00815                             static_cast<ACE_Message_Block*>(
00816                               mb_allocator->malloc(
00817                                 sizeof(ACE_Message_Block))),
00818                             ACE_Message_Block(
00819                               sample_length,
00820                               ACE_Message_Block::MB_DATA,
00821                               0, // cont
00822                               0, // data
00823                               0, // allocator_strategy
00824                               data_writer->get_db_lock(), // data block locking_strategy
00825                               ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00826                               ACE_Time_Value::zero,
00827                               ACE_Time_Value::max_time,
00828                               db_allocator,
00829                               mb_allocator),
00830                             false);
00831 
00832       ACE_OS::memcpy(mb->wr_ptr(),
00833                      sample,
00834                      sample_length);
00835       mb->wr_ptr(sample_length);
00836 
00837       const DDS::ReturnCode_t ret = data_writer->write(mb, handle,
00838         source_timestamp, 0 /* no content filtering */);
00839 
00840       if (ret != DDS::RETCODE_OK) {
00841         ACE_DES_FREE(mb,
00842                      mb_allocator->free,
00843                      ACE_Message_Block);
00844         return false;
00845       }
00846     }
00847 
00848     // Data successfully written.  Empty the queue/list.
00849     /**
00850      * @todo If we don't empty the queue, we'll end up with duplicate
00851      *       data since the data retrieved from the cache will be
00852      *       reinserted.
00853      */
00854     q->reset();
00855 
00856     try {
00857       cleanup_directory(q->fs_path_, this->data_dir_);
00858 
00859     } catch (const std::exception& ex) {
00860       if (DCPS_debug_level > 0) {
00861         ACE_ERROR((LM_ERROR,
00862                    ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
00863                    ACE_TEXT("couldn't remove directory for PERSISTENT ")
00864                    ACE_TEXT("data: %C\n"), ex.what()));
00865       }
00866     }
00867   }
00868   return true;
00869 }
00870 
00871 std::auto_ptr<ACE_Allocator>
00872 OpenDDS::DCPS::DataDurabilityCache::make_allocator(
00873   DDS::DurabilityQosPolicyKind)
00874 {
00875   // The use of other Allocators has been removed but this function
00876   // remains for the time being.
00877   // TODO: clean up all the ACE_Allocator-related code
00878   return std::auto_ptr<ACE_Allocator> (new ACE_New_Allocator);
00879 }
00880 
00881 #endif // OPENDDS_NO_PERSISTENCE_PROFILE

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7