TRANSIENT
and PERSISTENT
DURABILITY
implementations..
More...
#include <DataDurabilityCache.h>
Collaboration diagram for OpenDDS::DCPS::DataDurabilityCache:
Public Types | |
typedef DurabilityArray< DurabilityQueue< sample_data_type > * > | sample_list_type |
typedef ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > | sample_map_type |
Public Member Functions | |
typedef | OPENDDS_LIST (long) timer_id_list_type |
DataDurabilityCache (DDS::DurabilityQosPolicyKind kind) | |
Constructors. | |
DataDurabilityCache (DDS::DurabilityQosPolicyKind kind, ACE_CString &data_dir) | |
~DataDurabilityCache () | |
Destructor. | |
bool | insert (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos) |
bool | get_data (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &) |
Private Member Functions | |
DataDurabilityCache (DataDurabilityCache const &) | |
DataDurabilityCache & | operator= (DataDurabilityCache const &) |
void | init () |
Static Private Member Functions | |
static std::auto_ptr< ACE_Allocator > | make_allocator (DDS::DurabilityQosPolicyKind kind) |
Private Attributes | |
std::auto_ptr< ACE_Allocator > const | allocator_ |
Allocator used to allocate memory for sample map and lists. | |
DDS::DurabilityQosPolicyKind | kind_ |
ACE_CString | data_dir_ |
sample_map_type * | samples_ |
Map of all data samples. | |
timer_id_list_type | cleanup_timer_ids_ |
Timer ID list. | |
ACE_SYNCH_MUTEX | lock_ |
Lock for synchronized access to the underlying map. | |
ACE_Reactor_Timer_Interface * | reactor_ |
Reactor with which cleanup timers will be registered. | |
Classes | |
class | key_type |
Key type for underlying maps. More... | |
class | sample_data_type |
Sample list data type for all samples. More... |
TRANSIENT
and PERSISTENT
DURABILITY
implementations..
This class implements a cache that outlives DataWriters
.
Definition at line 65 of file DataDurabilityCache.h.
typedef DurabilityArray< DurabilityQueue<sample_data_type> *> OpenDDS::DCPS::DataDurabilityCache::sample_list_type |
Definition at line 180 of file DataDurabilityCache.h.
typedef ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> OpenDDS::DCPS::DataDurabilityCache::sample_map_type |
Definition at line 183 of file DataDurabilityCache.h.
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DDS::DurabilityQosPolicyKind | kind | ) |
Constructors.
Definition at line 285 of file DataDurabilityCache.cpp.
References init().
00287 : allocator_(make_allocator(kind)) 00288 , kind_(kind) 00289 , samples_(0) 00290 , cleanup_timer_ids_() 00291 , lock_() 00292 , reactor_(0) 00293 { 00294 init(); 00295 }
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DDS::DurabilityQosPolicyKind | kind, | |
ACE_CString & | data_dir | |||
) |
Definition at line 297 of file DataDurabilityCache.cpp.
References init().
00300 : allocator_(make_allocator(kind)) 00301 , kind_(kind) 00302 , data_dir_(data_dir) 00303 , samples_(0) 00304 , cleanup_timer_ids_() 00305 , lock_() 00306 , reactor_(0) 00307 { 00308 init(); 00309 }
OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache | ( | ) |
Destructor.
Definition at line 425 of file DataDurabilityCache.cpp.
References OPENDDS_MAP_TYPE, and reactor_.
00426 { 00427 // Cancel timers that haven't expired yet. 00428 timer_id_list_type::const_iterator const end( 00429 this->cleanup_timer_ids_.end()); 00430 00431 for (timer_id_list_type::const_iterator i( 00432 this->cleanup_timer_ids_.begin()); 00433 i != end; 00434 ++i) { 00435 (void) this->reactor_->cancel_timer(*i); 00436 } 00437 00438 // Clean up memory that isn't automatically managed. 00439 if (this->allocator_.get() != 0) { 00440 sample_map_type::iterator const map_end = this->samples_->end(); 00441 00442 for (sample_map_type::iterator s = this->samples_->begin(); 00443 s != map_end; 00444 ++s) { 00445 sample_list_type * const list = (*s).int_id_; 00446 00447 size_t const len = list->size();; 00448 00449 for (size_t l = 0; l != len; ++l) { 00450 ACE_DES_FREE((*list)[l], 00451 this->allocator_->free, 00452 DurabilityQueue<sample_data_type>); 00453 } 00454 00455 ACE_DES_FREE(list, 00456 this->allocator_->free, 00457 DurabilityArray<DurabilityQueue<sample_data_type> *>); 00458 } 00459 00460 // Yes, this looks strange but please leave it in place. The third param 00461 // to ACE_DES_FREE must be the actual class name since it's used in an 00462 // explicit desturctor call (~T()). Typedefs are not allowed here. This 00463 // is why the two ACE_DES_FREE's above are not the typedefs. Below we use 00464 // a macro to hide the internal comma from ACE_DES_FREE's macro expansion. 00465 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> 00466 ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE); 00467 #undef OPENDDS_MAP_TYPE 00468 } 00469 }
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DataDurabilityCache const & | ) | [private] |
bool OpenDDS::DCPS::DataDurabilityCache::get_data | ( | DDS::DomainId_t | domain_id, | |
char const * | topic_name, | |||
char const * | type_name, | |||
DataWriterImpl * | data_writer, | |||
ACE_Allocator * | mb_allocator, | |||
ACE_Allocator * | db_allocator, | |||
DDS::LifespanQosPolicy const & | ||||
) |
Write cached data corresponding to given domain, topic and type to DataWriter
.
Definition at line 716 of file DataDurabilityCache.cpp.
References cleanup_directory(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataWriterImpl::get_db_lock(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::HANDLE_NIL, OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data(), DDS::RETCODE_OK, and OpenDDS::DCPS::DataWriterImpl::write().
00724 { 00725 key_type const key(domain_id, 00726 topic_name, 00727 type_name, 00728 this->allocator_.get()); 00729 00730 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00731 00732 sample_list_type * p_sample_list = 0; 00733 00734 if (this->samples_->find(key, 00735 p_sample_list, 00736 this->allocator_.get()) == -1) 00737 return true; // No durable data for this domain/topic/type. 00738 00739 else if (p_sample_list == 0) 00740 return false; // Should never happen. 00741 00742 sample_list_type & sample_list = *p_sample_list; 00743 00744 // We will register an instance, and then write all of the cached 00745 // data to the DataWriter using that instance. 00746 00747 sample_data_type * registration_data = 0; 00748 00749 if (sample_list[0]->get(registration_data, 0) == -1) 00750 return false; 00751 00752 char const * marshaled_sample = 0; 00753 size_t marshaled_sample_length = 0; 00754 DDS::Time_t registration_timestamp; 00755 00756 registration_data->get_sample(marshaled_sample, 00757 marshaled_sample_length, 00758 registration_timestamp); 00759 00760 // Don't use the cached allocator for the registered sample message 00761 // block. 00762 std::auto_ptr<DataSample> registration_sample( 00763 new ACE_Message_Block(marshaled_sample_length, 00764 ACE_Message_Block::MB_DATA, 00765 0, //cont 00766 0, //data 00767 0, //alloc_strategy 00768 data_writer->get_db_lock())); 00769 00770 ACE_OS::memcpy(registration_sample->wr_ptr(), 00771 marshaled_sample, 00772 marshaled_sample_length); 00773 00774 registration_sample->wr_ptr(marshaled_sample_length); 00775 00776 DDS::InstanceHandle_t handle = DDS::HANDLE_NIL; 00777 00778 /** 00779 * @todo Is this going to cause problems for users that set a finite 00780 * DDS::ResourceLimitsQosPolicy::max_instances value when 00781 * OpenDDS supports that value? 00782 */ 00783 DDS::ReturnCode_t ret = 00784 data_writer->register_instance_from_durable_data(handle, 00785 registration_sample.get(), 00786 registration_timestamp); 00787 00788 if (ret != DDS::RETCODE_OK) 00789 return false; 00790 00791 registration_sample.release(); 00792 00793 typedef DurabilityQueue<sample_data_type> data_queue_type; 00794 size_t const len = sample_list.size(); 00795 00796 for (size_t i = 0; i != len; ++i) { 00797 data_queue_type * const q = sample_list[i]; 00798 00799 for (data_queue_type::ITERATOR j = q->begin(); 00800 !j.done(); 00801 j.advance()) { 00802 sample_data_type * data = 0; 00803 00804 if (j.next(data) == 0) 00805 return false; // Should never happen. 00806 00807 char const * sample = 0; // Sample does not include header. 00808 size_t sample_length = 0; 00809 DDS::Time_t source_timestamp; 00810 00811 data->get_sample(sample, sample_length, source_timestamp); 00812 00813 ACE_Message_Block * mb = 0; 00814 ACE_NEW_MALLOC_RETURN(mb, 00815 static_cast<ACE_Message_Block*>( 00816 mb_allocator->malloc( 00817 sizeof(ACE_Message_Block))), 00818 ACE_Message_Block( 00819 sample_length, 00820 ACE_Message_Block::MB_DATA, 00821 0, // cont 00822 0, // data 00823 0, // allocator_strategy 00824 data_writer->get_db_lock(), // data block locking_strategy 00825 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00826 ACE_Time_Value::zero, 00827 ACE_Time_Value::max_time, 00828 db_allocator, 00829 mb_allocator), 00830 false); 00831 00832 ACE_OS::memcpy(mb->wr_ptr(), 00833 sample, 00834 sample_length); 00835 mb->wr_ptr(sample_length); 00836 00837 const DDS::ReturnCode_t ret = data_writer->write(mb, handle, 00838 source_timestamp, 0 /* no content filtering */); 00839 00840 if (ret != DDS::RETCODE_OK) { 00841 ACE_DES_FREE(mb, 00842 mb_allocator->free, 00843 ACE_Message_Block); 00844 return false; 00845 } 00846 } 00847 00848 // Data successfully written. Empty the queue/list. 00849 /** 00850 * @todo If we don't empty the queue, we'll end up with duplicate 00851 * data since the data retrieved from the cache will be 00852 * reinserted. 00853 */ 00854 q->reset(); 00855 00856 try { 00857 cleanup_directory(q->fs_path_, this->data_dir_); 00858 00859 } catch (const std::exception& ex) { 00860 if (DCPS_debug_level > 0) { 00861 ACE_ERROR((LM_ERROR, 00862 ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ") 00863 ACE_TEXT("couldn't remove directory for PERSISTENT ") 00864 ACE_TEXT("data: %C\n"), ex.what())); 00865 } 00866 } 00867 } 00868 return true; 00869 }
void OpenDDS::DCPS::DataDurabilityCache::init | ( | ) | [private] |
Definition at line 311 of file DataDurabilityCache.cpp.
References allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, samples_, DDS::Time_t::sec, and TheServiceParticipant.
Referenced by DataDurabilityCache().
00312 { 00313 ACE_Allocator * const allocator = this->allocator_.get(); 00314 ACE_NEW_MALLOC( 00315 this->samples_, 00316 static_cast<sample_map_type *>( 00317 allocator->malloc(sizeof(sample_map_type))), 00318 sample_map_type(allocator)); 00319 00320 typedef DurabilityQueue<sample_data_type> data_queue_type; 00321 00322 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) { 00323 // Read data from the filesystem and create the in-memory data structures 00324 // as if we had called insert() once for each "datawriter" directory. 00325 using OpenDDS::FileSystemStorage::Directory; 00326 using OpenDDS::FileSystemStorage::File; 00327 Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str()); 00328 OPENDDS_VECTOR(OPENDDS_STRING) path(4); // domain, topic, type, datawriter 00329 00330 for (Directory::DirectoryIterator domain = root_dir->begin_dirs(), 00331 domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) { 00332 path[0] = domain->name(); 00333 DDS::DomainId_t domain_id; 00334 { 00335 domain_id = ACE_OS::atoi(path[0].c_str()); 00336 } 00337 00338 for (Directory::DirectoryIterator topic = domain->begin_dirs(), 00339 topic_end = domain->end_dirs(); topic != topic_end; ++topic) { 00340 path[1] = topic->name(); 00341 00342 for (Directory::DirectoryIterator type = topic->begin_dirs(), 00343 type_end = topic->end_dirs(); type != type_end; ++type) { 00344 path[2] = type->name(); 00345 00346 key_type key(domain_id, path[1].c_str(), path[2].c_str(), 00347 allocator); 00348 sample_list_type * sample_list = 0; 00349 ACE_NEW_MALLOC(sample_list, 00350 static_cast<sample_list_type *>( 00351 allocator->malloc(sizeof(sample_list_type))), 00352 sample_list_type(0, static_cast<data_queue_type *>(0), 00353 allocator)); 00354 this->samples_->bind(key, sample_list, allocator); 00355 00356 for (Directory::DirectoryIterator dw = type->begin_dirs(), 00357 dw_end = type->end_dirs(); dw != dw_end; ++dw) { 00358 path[3] = dw->name(); 00359 00360 size_t old_len = sample_list->size(); 00361 sample_list->size(old_len + 1); 00362 data_queue_type *& slot = (*sample_list)[old_len]; 00363 00364 // This variable is called "samples" in the insert() method be 00365 // we already have a "samples_" which is the overall data structure. 00366 data_queue_type * sample_queue = 0; 00367 ACE_NEW_MALLOC(sample_queue, 00368 static_cast<data_queue_type *>( 00369 allocator->malloc(sizeof(data_queue_type))), 00370 data_queue_type(allocator)); 00371 00372 slot = sample_queue; 00373 sample_queue->fs_path_ = path; 00374 00375 for (Directory::FileIterator file = dw->begin_files(), 00376 file_end = dw->end_files(); file != file_end; ++file) { 00377 std::ifstream is; 00378 00379 if (!file->read(is)) { 00380 if (DCPS_debug_level) { 00381 ACE_ERROR((LM_ERROR, 00382 ACE_TEXT("(%P|%t) DataDurabilityCache::init ") 00383 ACE_TEXT("couldn't open file for PERSISTENT ") 00384 ACE_TEXT("data: %C\n"), file->name().c_str())); 00385 } 00386 continue; 00387 } 00388 00389 DDS::Time_t timestamp; 00390 is >> timestamp.sec >> timestamp.nanosec >> std::noskipws; 00391 is.get(); // consume separator 00392 00393 const size_t CHUNK = 4096; 00394 ACE_Message_Block mb(CHUNK); 00395 ACE_Message_Block * current = &mb; 00396 00397 while (!is.eof()) { 00398 is.read(current->wr_ptr(), current->space()); 00399 00400 if (is.bad()) break; 00401 00402 current->wr_ptr((size_t)is.gcount()); 00403 00404 if (current->space() == 0) { 00405 ACE_Message_Block * old = current; 00406 current = new ACE_Message_Block(CHUNK); 00407 old->cont(current); 00408 } 00409 } 00410 00411 sample_queue->enqueue_tail( 00412 sample_data_type(timestamp, mb, allocator)); 00413 00414 if (mb.cont()) mb.cont()->release(); // delete the cont() chain 00415 } 00416 } 00417 } 00418 } 00419 } 00420 } 00421 00422 this->reactor_ = TheServiceParticipant->timer(); 00423 }
bool OpenDDS::DCPS::DataDurabilityCache::insert | ( | DDS::DomainId_t | domain_id, | |
char const * | topic_name, | |||
char const * | type_name, | |||
SendStateDataSampleList & | the_data, | |||
DDS::DurabilityServiceQosPolicy const & | qos | |||
) |
Insert the samples corresponding to the given topic instance (uniquely identify by its domain, topic name and type name) into the data durability cache.
Definition at line 472 of file DataDurabilityCache.cpp.
References allocator_, OpenDDS::DCPS::SendStateDataSampleList::begin(), cleanup_timer_ids_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::SendStateDataSampleList::end(), Util::find(), OpenDDS::DCPS::get_instance_sample_list_depth(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::DurabilityServiceQosPolicy::history_depth, DDS::DurabilityServiceQosPolicy::history_kind, DDS::DurabilityServiceQosPolicy::max_samples_per_instance, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, DDS::Time_t::sec, DDS::DurabilityServiceQosPolicy::service_cleanup_delay, OpenDDS::DCPS::SendStateDataSampleList::size(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and OpenDDS::DCPS::to_dds_string().
Referenced by OpenDDS::DCPS::WriteDataContainer::persist_data().
00478 { 00479 if (the_data.size() == 0) 00480 return true; // Nothing to cache. 00481 00482 // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related 00483 // settings prior to data insertion into the cache. 00484 CORBA::Long const depth = 00485 get_instance_sample_list_depth( 00486 qos.history_kind, 00487 qos.history_depth, 00488 qos.max_samples_per_instance); 00489 00490 // Iterator to first DataSampleElement to be copied. 00491 SendStateDataSampleList::iterator element(the_data.begin()); 00492 00493 if (depth < 0) 00494 return false; // Should never occur. 00495 00496 else if (depth == 0) 00497 return true; // Nothing else to do. Discard all data. 00498 00499 else if (the_data.size() > depth) { 00500 // N.B. Dropping data samples does not take into account 00501 // those samples which are not actually persisted (i.e. 00502 // samples with the coherent_sample_ flag set). The spec 00503 // does not provide any guidance in this case, therefore 00504 // we opt for the simplest solution and assume that there 00505 // are no change sets when calculating the number of 00506 // samples to drop. 00507 00508 // Drop "old" samples. Only keep the "depth" most recent 00509 // samples, i.e. those found at the tail end of the 00510 // SendStateDataSampleList. 00511 ssize_t const advance_amount = the_data.size() - depth; 00512 std::advance(element, advance_amount); 00513 } 00514 00515 // ----------- 00516 00517 // Copy samples to the domain/topic/type-specific cache. 00518 00519 key_type const key(domain_id, 00520 topic_name, 00521 type_name, 00522 this->allocator_.get()); 00523 SendStateDataSampleList::iterator the_end(the_data.end()); 00524 sample_list_type * sample_list = 0; 00525 00526 typedef DurabilityQueue<sample_data_type> data_queue_type; 00527 data_queue_type ** slot = 0; 00528 data_queue_type * samples = 0; // sample_list_type::value_type 00529 00530 using OpenDDS::FileSystemStorage::Directory; 00531 using OpenDDS::FileSystemStorage::File; 00532 Directory::Ptr dir; 00533 OPENDDS_VECTOR(OPENDDS_STRING) path; 00534 { 00535 ACE_Allocator * const allocator = this->allocator_.get(); 00536 00537 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00538 00539 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) { 00540 try { 00541 dir = Directory::create(this->data_dir_.c_str()); 00542 00543 path.push_back(to_dds_string(domain_id)); 00544 path.push_back(topic_name); 00545 path.push_back(type_name); 00546 dir = dir->get_dir(path); 00547 // dir is now the "type" directory, which is shared by all datawriters 00548 // of the domain/topic/type. We actually need a new directory per 00549 // datawriter and this assumes that insert() is called once per 00550 // datawriter, as is currently the case. 00551 dir = dir->create_next_dir(); 00552 path.push_back(dir->name()); // for use by the Cleanup_Handler 00553 00554 } catch (const std::exception& ex) { 00555 if (DCPS_debug_level > 0) { 00556 ACE_ERROR((LM_ERROR, 00557 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ") 00558 ACE_TEXT("couldn't create directory for PERSISTENT ") 00559 ACE_TEXT("data: %C\n"), ex.what())); 00560 } 00561 00562 dir = 0; 00563 } 00564 } 00565 00566 if (this->samples_->find(key, sample_list, allocator) != 0) { 00567 // Create a new list (actually an ACE_Array_Base<>) with the 00568 // appropriate allocator passed to its constructor. 00569 ACE_NEW_MALLOC_RETURN( 00570 sample_list, 00571 static_cast<sample_list_type *>( 00572 allocator->malloc(sizeof(sample_list_type))), 00573 sample_list_type(1, static_cast<data_queue_type *>(0), allocator), 00574 false); 00575 00576 if (this->samples_->bind(key, sample_list, allocator) != 0) 00577 return false; 00578 } 00579 00580 data_queue_type ** const begin = &((*sample_list)[0]); 00581 data_queue_type ** const end = 00582 begin + sample_list->size(); 00583 00584 // Find an empty slot in the array. This is a linear search but 00585 // that should be fine for the common case, i.e. a small number of 00586 // DataWriters that push data into the cache. 00587 slot = std::find(begin, 00588 end, 00589 static_cast<data_queue_type *>(0)); 00590 00591 if (slot == end) { 00592 // No available slots. Grow the array accordingly. 00593 size_t const old_len = sample_list->size(); 00594 sample_list->size(old_len + 1); 00595 00596 data_queue_type ** new_begin = &((*sample_list)[0]); 00597 slot = new_begin + old_len; 00598 } 00599 00600 ACE_NEW_MALLOC_RETURN( 00601 samples, 00602 static_cast<data_queue_type *>( 00603 allocator->malloc(sizeof(data_queue_type))), 00604 data_queue_type(allocator), 00605 false); 00606 00607 // Insert the samples in to the sample list. 00608 *slot = samples; 00609 00610 if (!dir.is_nil()) { 00611 samples->fs_path_ = path; 00612 } 00613 00614 for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) { 00615 DataSampleElement& elem = *i; 00616 00617 // N.B. Do not persist samples with coherent changes. 00618 // To verify, we check the DataSampleHeader for the 00619 // coherent_change_ flag. The DataSampleHeader will 00620 // always be the first message block in the chain. 00621 // 00622 // It should be noted that persisting coherent changes 00623 // is a non-trivial task, and should be handled when 00624 // finalizing persistence profile conformance. 00625 if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) { 00626 continue; // skip coherent sample 00627 } 00628 00629 sample_data_type sample(elem, allocator); 00630 00631 if (samples->enqueue_tail(sample) != 0) 00632 return false; 00633 00634 if (!dir.is_nil()) { 00635 try { 00636 File::Ptr f = dir->create_next_file(); 00637 std::ofstream os; 00638 00639 if (!f->write(os)) return false; 00640 00641 DDS::Time_t timestamp; 00642 const char * data; 00643 size_t len; 00644 sample.get_sample(data, len, timestamp); 00645 00646 os << timestamp.sec << ' ' << timestamp.nanosec << ' '; 00647 os.write(data, len); 00648 00649 } catch (const std::exception& ex) { 00650 if (DCPS_debug_level > 0) { 00651 ACE_ERROR((LM_ERROR, 00652 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ") 00653 ACE_TEXT("couldn't write sample for PERSISTENT ") 00654 ACE_TEXT("data: %C\n"), ex.what())); 00655 } 00656 } 00657 } 00658 } 00659 } 00660 00661 // ----------- 00662 00663 // Schedule cleanup timer. 00664 //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent) 00665 ACE_Time_Value const cleanup_delay( 00666 duration_to_time_value(qos.service_cleanup_delay)); 00667 00668 if (cleanup_delay > ACE_Time_Value::zero) { 00669 if (OpenDDS::DCPS::DCPS_debug_level >= 4) { 00670 ACE_DEBUG((LM_DEBUG, 00671 ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ") 00672 ACE_TEXT("cleanup for\n") 00673 ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ") 00674 ACE_TEXT("== (%d, %C, %C)\n"), 00675 domain_id, 00676 topic_name, 00677 type_name)); 00678 } 00679 00680 Cleanup_Handler * const cleanup = 00681 new Cleanup_Handler(*sample_list, 00682 slot - &(*sample_list)[0], 00683 this->allocator_.get(), 00684 path, 00685 this->data_dir_); 00686 ACE_Event_Handler_var safe_cleanup(cleanup); // Transfer ownership 00687 long const tid = 00688 this->reactor_->schedule_timer(cleanup, 00689 0, // ACT 00690 cleanup_delay); 00691 if (tid == -1) { 00692 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00693 00694 ACE_DES_FREE(samples, 00695 this->allocator_->free, 00696 DurabilityQueue<sample_data_type>); 00697 *slot = 0; 00698 00699 return false; 00700 00701 } else { 00702 { 00703 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00704 this->cleanup_timer_ids_.push_back(tid); 00705 } 00706 00707 cleanup->timer_id(tid, 00708 &this->cleanup_timer_ids_); 00709 } 00710 } 00711 00712 return true; 00713 }
std::auto_ptr< ACE_Allocator > OpenDDS::DCPS::DataDurabilityCache::make_allocator | ( | DDS::DurabilityQosPolicyKind | kind | ) | [static, private] |
Make allocator suitable to support specified kind of DURABILITY
.
Definition at line 872 of file DataDurabilityCache.cpp.
00874 { 00875 // The use of other Allocators has been removed but this function 00876 // remains for the time being. 00877 // TODO: clean up all the ACE_Allocator-related code 00878 return std::auto_ptr<ACE_Allocator> (new ACE_New_Allocator); 00879 }
typedef OpenDDS::DCPS::DataDurabilityCache::OPENDDS_LIST | ( | long | ) |
DataDurabilityCache& OpenDDS::DCPS::DataDurabilityCache::operator= | ( | DataDurabilityCache const & | ) | [private] |
std::auto_ptr<ACE_Allocator> const OpenDDS::DCPS::DataDurabilityCache::allocator_ [private] |
Allocator used to allocate memory for sample map and lists.
Definition at line 230 of file DataDurabilityCache.h.
timer_id_list_type OpenDDS::DCPS::DataDurabilityCache::cleanup_timer_ids_ [private] |
Timer ID list.
Keep track of cleanup timer IDs in case we need to cancel before they expire.
Definition at line 244 of file DataDurabilityCache.h.
Referenced by insert().
ACE_CString OpenDDS::DCPS::DataDurabilityCache::data_dir_ [private] |
Definition at line 232 of file DataDurabilityCache.h.
ACE_SYNCH_MUTEX OpenDDS::DCPS::DataDurabilityCache::lock_ [private] |
Lock for synchronized access to the underlying map.
Definition at line 247 of file DataDurabilityCache.h.
ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataDurabilityCache::reactor_ [private] |
Reactor with which cleanup timers will be registered.
Definition at line 250 of file DataDurabilityCache.h.
Referenced by init(), insert(), and ~DataDurabilityCache().
Map of all data samples.
Definition at line 237 of file DataDurabilityCache.h.
Referenced by init().