Underlying data cache for both OpenDDS TRANSIENT
and PERSISTENT
DURABILITY
implementations..
More...
#include <DataDurabilityCache.h>
Classes | |
class | key_type |
Key type for underlying maps. More... | |
class | sample_data_type |
Sample list data type for all samples. More... | |
Public Types | |
typedef DurabilityArray < DurabilityQueue < sample_data_type > * > | sample_list_type |
typedef ACE_Hash_Map_With_Allocator < key_type, sample_list_type * > | sample_map_type |
Public Member Functions | |
typedef | OPENDDS_LIST (long) timer_id_list_type |
DataDurabilityCache (DDS::DurabilityQosPolicyKind kind) | |
DataDurabilityCache (DDS::DurabilityQosPolicyKind kind, ACE_CString &data_dir) | |
~DataDurabilityCache () | |
bool | insert (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos) |
bool | get_data (DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &) |
Private Member Functions | |
DataDurabilityCache (DataDurabilityCache const &) | |
DataDurabilityCache & | operator= (DataDurabilityCache const &) |
void | init () |
Private Attributes | |
unique_ptr< ACE_Allocator > const | allocator_ |
Allocator used to allocate memory for sample map and lists. | |
DDS::DurabilityQosPolicyKind | kind_ |
ACE_CString | data_dir_ |
sample_map_type * | samples_ |
Map of all data samples. | |
timer_id_list_type | cleanup_timer_ids_ |
Timer ID list. | |
ACE_SYNCH_MUTEX | lock_ |
Lock for synchronized access to the underlying map. | |
ACE_Reactor_Timer_Interface * | reactor_ |
Reactor with which cleanup timers will be registered. |
Underlying data cache for both OpenDDS TRANSIENT
and PERSISTENT
DURABILITY
implementations..
This class implements a cache that outlives DataWriters
.
Definition at line 69 of file DataDurabilityCache.h.
typedef DurabilityArray< DurabilityQueue<sample_data_type> *> OpenDDS::DCPS::DataDurabilityCache::sample_list_type |
Definition at line 184 of file DataDurabilityCache.h.
typedef ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> OpenDDS::DCPS::DataDurabilityCache::sample_map_type |
Definition at line 187 of file DataDurabilityCache.h.
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DDS::DurabilityQosPolicyKind | kind | ) |
Definition at line 284 of file DataDurabilityCache.cpp.
References init().
00286 : allocator_(new ACE_New_Allocator) 00287 , kind_(kind) 00288 , samples_(0) 00289 , cleanup_timer_ids_() 00290 , lock_() 00291 , reactor_(0) 00292 { 00293 init(); 00294 }
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DDS::DurabilityQosPolicyKind | kind, | |
ACE_CString & | data_dir | |||
) |
Definition at line 296 of file DataDurabilityCache.cpp.
References init().
00299 : allocator_(new ACE_New_Allocator) 00300 , kind_(kind) 00301 , data_dir_(data_dir) 00302 , samples_(0) 00303 , cleanup_timer_ids_() 00304 , lock_() 00305 , reactor_(0) 00306 { 00307 init(); 00308 }
OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache | ( | ) |
Definition at line 424 of file DataDurabilityCache.cpp.
References allocator_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::begin(), ACE_Reactor_Timer_Interface::cancel_timer(), cleanup_timer_ids_, ACE_Hash_Map_Manager_Ex< EXT_ID, INT_ID, HASH_KEY, COMPARE_KEYS, ACE_LOCK >::end(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), len, OPENDDS_MAP_TYPE, reactor_, samples_, and ACE_Array_Base< T >::size().
00425 { 00426 // Cancel timers that haven't expired yet. 00427 timer_id_list_type::const_iterator const end( 00428 this->cleanup_timer_ids_.end()); 00429 00430 for (timer_id_list_type::const_iterator i( 00431 this->cleanup_timer_ids_.begin()); 00432 i != end; 00433 ++i) { 00434 (void) this->reactor_->cancel_timer(*i); 00435 } 00436 00437 // Clean up memory that isn't automatically managed. 00438 if (this->allocator_.get() != 0) { 00439 sample_map_type::iterator const map_end = this->samples_->end(); 00440 00441 for (sample_map_type::iterator s = this->samples_->begin(); 00442 s != map_end; 00443 ++s) { 00444 sample_list_type * const list = (*s).int_id_; 00445 00446 size_t const len = list->size(); 00447 00448 for (size_t l = 0; l != len; ++l) { 00449 ACE_DES_FREE((*list)[l], 00450 this->allocator_->free, 00451 DurabilityQueue<sample_data_type>); 00452 } 00453 00454 ACE_DES_FREE(list, 00455 this->allocator_->free, 00456 DurabilityArray<DurabilityQueue<sample_data_type> *>); 00457 } 00458 00459 // Yes, this looks strange but please leave it in place. The third param 00460 // to ACE_DES_FREE must be the actual class name since it's used in an 00461 // explicit desturctor call (~T()). Typedefs are not allowed here. This 00462 // is why the two ACE_DES_FREE's above are not the typedefs. Below we use 00463 // a macro to hide the internal comma from ACE_DES_FREE's macro expansion. 00464 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> 00465 ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE); 00466 #undef OPENDDS_MAP_TYPE 00467 } 00468 }
OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache | ( | DataDurabilityCache const & | ) | [private] |
bool OpenDDS::DCPS::DataDurabilityCache::get_data | ( | DDS::DomainId_t | domain_id, | |
char const * | topic_name, | |||
char const * | type_name, | |||
DataWriterImpl * | data_writer, | |||
ACE_Allocator * | mb_allocator, | |||
ACE_Allocator * | db_allocator, | |||
DDS::LifespanQosPolicy const & | ||||
) |
Write cached data corresponding to given domain, topic and type to DataWriter
.
Definition at line 716 of file DataDurabilityCache.cpp.
References ACE_TEXT(), ACE_Array_Base< T >::allocator_, allocator_, OpenDDS::DCPS::DCPS_debug_level, ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataWriterImpl::get_db_lock(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), DDS::HANDLE_NIL, len, LM_ERROR, lock_, ACE_Allocator::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, ACE_OS::memcpy(), OpenDDS::DCPS::move(), OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data(), DDS::RETCODE_OK, samples_, ACE_Array_Base< T >::size(), OpenDDS::DCPS::DataWriterImpl::write(), and ACE_Time_Value::zero.
00724 { 00725 key_type const key(domain_id, 00726 topic_name, 00727 type_name, 00728 this->allocator_.get()); 00729 00730 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00731 00732 sample_list_type * p_sample_list = 0; 00733 00734 if (this->samples_->find(key, 00735 p_sample_list, 00736 this->allocator_.get()) == -1) 00737 return true; // No durable data for this domain/topic/type. 00738 00739 else if (p_sample_list == 0) 00740 return false; // Should never happen. 00741 00742 sample_list_type & sample_list = *p_sample_list; 00743 00744 // We will register an instance, and then write all of the cached 00745 // data to the DataWriter using that instance. 00746 00747 sample_data_type * registration_data = 0; 00748 00749 if (sample_list[0]->get(registration_data, 0) == -1) 00750 return false; 00751 00752 char const * marshaled_sample = 0; 00753 size_t marshaled_sample_length = 0; 00754 DDS::Time_t registration_timestamp; 00755 00756 registration_data->get_sample(marshaled_sample, 00757 marshaled_sample_length, 00758 registration_timestamp); 00759 00760 // Don't use the cached allocator for the registered sample message 00761 // block. 00762 Message_Block_Ptr registration_sample( 00763 new ACE_Message_Block(marshaled_sample_length, 00764 ACE_Message_Block::MB_DATA, 00765 0, //cont 00766 0, //data 00767 0, //alloc_strategy 00768 data_writer->get_db_lock())); 00769 00770 ACE_OS::memcpy(registration_sample->wr_ptr(), 00771 marshaled_sample, 00772 marshaled_sample_length); 00773 00774 registration_sample->wr_ptr(marshaled_sample_length); 00775 00776 DDS::InstanceHandle_t handle = DDS::HANDLE_NIL; 00777 00778 /** 00779 * @todo Is this going to cause problems for users that set a finite 00780 * DDS::ResourceLimitsQosPolicy::max_instances value when 00781 * OpenDDS supports that value? 00782 */ 00783 DDS::ReturnCode_t ret = 00784 data_writer->register_instance_from_durable_data(handle, 00785 move(registration_sample), 00786 registration_timestamp); 00787 00788 if (ret != DDS::RETCODE_OK) 00789 return false; 00790 00791 typedef DurabilityQueue<sample_data_type> data_queue_type; 00792 size_t const len = sample_list.size(); 00793 00794 for (size_t i = 0; i != len; ++i) { 00795 data_queue_type * const q = sample_list[i]; 00796 00797 for (data_queue_type::ITERATOR j = q->begin(); 00798 !j.done(); 00799 j.advance()) { 00800 sample_data_type * data = 0; 00801 00802 if (j.next(data) == 0) 00803 return false; // Should never happen. 00804 00805 char const * sample = 0; // Sample does not include header. 00806 size_t sample_length = 0; 00807 DDS::Time_t source_timestamp; 00808 00809 data->get_sample(sample, sample_length, source_timestamp); 00810 00811 ACE_Message_Block * tmp_mb = 0; 00812 ACE_NEW_MALLOC_RETURN(tmp_mb, 00813 static_cast<ACE_Message_Block*>( 00814 mb_allocator->malloc( 00815 sizeof(ACE_Message_Block))), 00816 ACE_Message_Block( 00817 sample_length, 00818 ACE_Message_Block::MB_DATA, 00819 0, // cont 00820 0, // data 00821 0, // allocator_strategy 00822 data_writer->get_db_lock(), // data block locking_strategy 00823 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00824 ACE_Time_Value::zero, 00825 ACE_Time_Value::max_time, 00826 db_allocator, 00827 mb_allocator), 00828 false); 00829 Message_Block_Ptr mb(tmp_mb); 00830 00831 ACE_OS::memcpy(mb->wr_ptr(), 00832 sample, 00833 sample_length); 00834 mb->wr_ptr(sample_length); 00835 00836 const DDS::ReturnCode_t ret = data_writer->write(move(mb), handle, 00837 source_timestamp, 0 /* no content filtering */); 00838 00839 if (ret != DDS::RETCODE_OK) { 00840 return false; 00841 } 00842 } 00843 00844 // Data successfully written. Empty the queue/list. 00845 /** 00846 * @todo If we don't empty the queue, we'll end up with duplicate 00847 * data since the data retrieved from the cache will be 00848 * reinserted. 00849 */ 00850 q->reset(); 00851 00852 try { 00853 cleanup_directory(q->fs_path_, this->data_dir_); 00854 00855 } catch (const std::exception& ex) { 00856 if (DCPS_debug_level > 0) { 00857 ACE_ERROR((LM_ERROR, 00858 ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ") 00859 ACE_TEXT("couldn't remove directory for PERSISTENT ") 00860 ACE_TEXT("data: %C\n"), ex.what())); 00861 } 00862 } 00863 } 00864 return true; 00865 }
void OpenDDS::DCPS::DataDurabilityCache::init | ( | void | ) | [private] |
Definition at line 310 of file DataDurabilityCache.cpp.
References ACE_TEXT(), allocator_, ACE_OS::atoi(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< ACE_CHAR_T >::c_str(), ACE_Message_Block::cont(), data_dir_, OpenDDS::DCPS::DCPS_debug_level, file, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), kind_, LM_ERROR, ACE_Allocator::malloc(), DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, ACE_Message_Block::release(), samples_, DDS::Time_t::sec, ACE_Array_Base< T >::size(), ACE_Message_Block::space(), TheServiceParticipant, timestamp(), and ACE_Message_Block::wr_ptr().
Referenced by DataDurabilityCache().
00311 { 00312 ACE_Allocator * const allocator = this->allocator_.get(); 00313 ACE_NEW_MALLOC( 00314 this->samples_, 00315 static_cast<sample_map_type *>( 00316 allocator->malloc(sizeof(sample_map_type))), 00317 sample_map_type(allocator)); 00318 00319 typedef DurabilityQueue<sample_data_type> data_queue_type; 00320 00321 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) { 00322 // Read data from the filesystem and create the in-memory data structures 00323 // as if we had called insert() once for each "datawriter" directory. 00324 using OpenDDS::FileSystemStorage::Directory; 00325 using OpenDDS::FileSystemStorage::File; 00326 Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str()); 00327 OPENDDS_VECTOR(OPENDDS_STRING) path(4); // domain, topic, type, datawriter 00328 00329 for (Directory::DirectoryIterator domain = root_dir->begin_dirs(), 00330 domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) { 00331 path[0] = domain->name(); 00332 DDS::DomainId_t domain_id; 00333 { 00334 domain_id = ACE_OS::atoi(path[0].c_str()); 00335 } 00336 00337 for (Directory::DirectoryIterator topic = domain->begin_dirs(), 00338 topic_end = domain->end_dirs(); topic != topic_end; ++topic) { 00339 path[1] = topic->name(); 00340 00341 for (Directory::DirectoryIterator type = topic->begin_dirs(), 00342 type_end = topic->end_dirs(); type != type_end; ++type) { 00343 path[2] = type->name(); 00344 00345 key_type key(domain_id, path[1].c_str(), path[2].c_str(), 00346 allocator); 00347 sample_list_type * sample_list = 0; 00348 ACE_NEW_MALLOC(sample_list, 00349 static_cast<sample_list_type *>( 00350 allocator->malloc(sizeof(sample_list_type))), 00351 sample_list_type(0, static_cast<data_queue_type *>(0), 00352 allocator)); 00353 this->samples_->bind(key, sample_list, allocator); 00354 00355 for (Directory::DirectoryIterator dw = type->begin_dirs(), 00356 dw_end = type->end_dirs(); dw != dw_end; ++dw) { 00357 path[3] = dw->name(); 00358 00359 size_t old_len = sample_list->size(); 00360 sample_list->size(old_len + 1); 00361 data_queue_type *& slot = (*sample_list)[old_len]; 00362 00363 // This variable is called "samples" in the insert() method be 00364 // we already have a "samples_" which is the overall data structure. 00365 data_queue_type * sample_queue = 0; 00366 ACE_NEW_MALLOC(sample_queue, 00367 static_cast<data_queue_type *>( 00368 allocator->malloc(sizeof(data_queue_type))), 00369 data_queue_type(allocator)); 00370 00371 slot = sample_queue; 00372 sample_queue->fs_path_ = path; 00373 00374 for (Directory::FileIterator file = dw->begin_files(), 00375 file_end = dw->end_files(); file != file_end; ++file) { 00376 std::ifstream is; 00377 00378 if (!file->read(is)) { 00379 if (DCPS_debug_level) { 00380 ACE_ERROR((LM_ERROR, 00381 ACE_TEXT("(%P|%t) DataDurabilityCache::init ") 00382 ACE_TEXT("couldn't open file for PERSISTENT ") 00383 ACE_TEXT("data: %C\n"), file->name().c_str())); 00384 } 00385 continue; 00386 } 00387 00388 DDS::Time_t timestamp; 00389 is >> timestamp.sec >> timestamp.nanosec >> std::noskipws; 00390 is.get(); // consume separator 00391 00392 const size_t CHUNK = 4096; 00393 ACE_Message_Block mb(CHUNK); 00394 ACE_Message_Block * current = &mb; 00395 00396 while (!is.eof()) { 00397 is.read(current->wr_ptr(), current->space()); 00398 00399 if (is.bad()) break; 00400 00401 current->wr_ptr((size_t)is.gcount()); 00402 00403 if (current->space() == 0) { 00404 ACE_Message_Block * old = current; 00405 current = new ACE_Message_Block(CHUNK); 00406 old->cont(current); 00407 } 00408 } 00409 00410 sample_queue->enqueue_tail( 00411 sample_data_type(timestamp, mb, allocator)); 00412 00413 if (mb.cont()) mb.cont()->release(); // delete the cont() chain 00414 } 00415 } 00416 } 00417 } 00418 } 00419 } 00420 00421 this->reactor_ = TheServiceParticipant->timer(); 00422 }
bool OpenDDS::DCPS::DataDurabilityCache::insert | ( | DDS::DomainId_t | domain_id, | |
char const * | topic_name, | |||
char const * | type_name, | |||
SendStateDataSampleList & | the_data, | |||
DDS::DurabilityServiceQosPolicy const & | qos | |||
) |
Insert the samples corresponding to the given topic instance (uniquely identify by its domain, topic name and type name) into the data durability cache.
Definition at line 471 of file DataDurabilityCache.cpp.
References ACE_TEXT(), allocator_, OpenDDS::DCPS::SendStateDataSampleList::begin(), ACE_Hash_Map_With_Allocator< class, class >::bind(), ACE_String_Base< ACE_CHAR_T >::c_str(), cleanup(), cleanup_timer_ids_, OpenDDS::DCPS::COHERENT_CHANGE_FLAG, data_dir_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::duration_to_time_value(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::find(), ACE_Hash_Map_With_Allocator< class, class >::find(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(), OpenDDS::DCPS::DataSampleElement::get_sample(), DDS::DurabilityServiceQosPolicy::history_depth, DDS::DurabilityServiceQosPolicy::history_kind, DDS::KEEP_ALL_HISTORY_QOS, kind_, len, DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, lock_, ACE_Allocator::malloc(), DDS::DurabilityServiceQosPolicy::max_samples_per_instance, DDS::Time_t::nanosec, OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), DDS::PERSISTENT_DURABILITY_QOS, reactor_, samples_, ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Time_t::sec, DDS::DurabilityServiceQosPolicy::service_cleanup_delay, OpenDDS::DCPS::SendStateDataSampleList::size(), OpenDDS::DCPS::DataSampleHeader::test_flag(), timestamp(), OpenDDS::DCPS::to_dds_string(), and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::WriteDataContainer::persist_data().
00477 { 00478 if (the_data.size() == 0) 00479 return true; // Nothing to cache. 00480 00481 // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related 00482 // settings prior to data insertion into the cache. 00483 int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS 00484 ? qos.max_samples_per_instance 00485 : qos.history_depth; 00486 00487 if (depth == DDS::LENGTH_UNLIMITED) 00488 depth = 0x7fffffff; 00489 00490 // Iterator to first DataSampleElement to be copied. 00491 SendStateDataSampleList::iterator element(the_data.begin()); 00492 00493 if (depth < 0) 00494 return false; // Should never occur. 00495 00496 else if (depth == 0) 00497 return true; // Nothing else to do. Discard all data. 00498 00499 else if (the_data.size() > depth) { 00500 // N.B. Dropping data samples does not take into account 00501 // those samples which are not actually persisted (i.e. 00502 // samples with the coherent_sample_ flag set). The spec 00503 // does not provide any guidance in this case, therefore 00504 // we opt for the simplest solution and assume that there 00505 // are no change sets when calculating the number of 00506 // samples to drop. 00507 00508 // Drop "old" samples. Only keep the "depth" most recent 00509 // samples, i.e. those found at the tail end of the 00510 // SendStateDataSampleList. 00511 ssize_t const advance_amount = the_data.size() - depth; 00512 std::advance(element, advance_amount); 00513 } 00514 00515 // ----------- 00516 00517 // Copy samples to the domain/topic/type-specific cache. 00518 00519 key_type const key(domain_id, 00520 topic_name, 00521 type_name, 00522 this->allocator_.get()); 00523 SendStateDataSampleList::iterator the_end(the_data.end()); 00524 sample_list_type * sample_list = 0; 00525 00526 typedef DurabilityQueue<sample_data_type> data_queue_type; 00527 data_queue_type ** slot = 0; 00528 data_queue_type * samples = 0; // sample_list_type::value_type 00529 00530 using OpenDDS::FileSystemStorage::Directory; 00531 using OpenDDS::FileSystemStorage::File; 00532 Directory::Ptr dir; 00533 OPENDDS_VECTOR(OPENDDS_STRING) path; 00534 { 00535 ACE_Allocator * const allocator = this->allocator_.get(); 00536 00537 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00538 00539 if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) { 00540 try { 00541 dir = Directory::create(this->data_dir_.c_str()); 00542 00543 path.push_back(to_dds_string(domain_id)); 00544 path.push_back(topic_name); 00545 path.push_back(type_name); 00546 dir = dir->get_dir(path); 00547 // dir is now the "type" directory, which is shared by all datawriters 00548 // of the domain/topic/type. We actually need a new directory per 00549 // datawriter and this assumes that insert() is called once per 00550 // datawriter, as is currently the case. 00551 dir = dir->create_next_dir(); 00552 path.push_back(dir->name()); // for use by the Cleanup_Handler 00553 00554 } catch (const std::exception& ex) { 00555 if (DCPS_debug_level > 0) { 00556 ACE_ERROR((LM_ERROR, 00557 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ") 00558 ACE_TEXT("couldn't create directory for PERSISTENT ") 00559 ACE_TEXT("data: %C\n"), ex.what())); 00560 } 00561 00562 dir.reset(); 00563 } 00564 } 00565 00566 if (this->samples_->find(key, sample_list, allocator) != 0) { 00567 // Create a new list (actually an ACE_Array_Base<>) with the 00568 // appropriate allocator passed to its constructor. 00569 ACE_NEW_MALLOC_RETURN( 00570 sample_list, 00571 static_cast<sample_list_type *>( 00572 allocator->malloc(sizeof(sample_list_type))), 00573 sample_list_type(1, static_cast<data_queue_type *>(0), allocator), 00574 false); 00575 00576 if (this->samples_->bind(key, sample_list, allocator) != 0) 00577 return false; 00578 } 00579 00580 data_queue_type ** const begin = &((*sample_list)[0]); 00581 data_queue_type ** const end = 00582 begin + sample_list->size(); 00583 00584 // Find an empty slot in the array. This is a linear search but 00585 // that should be fine for the common case, i.e. a small number of 00586 // DataWriters that push data into the cache. 00587 slot = std::find(begin, 00588 end, 00589 static_cast<data_queue_type *>(0)); 00590 00591 if (slot == end) { 00592 // No available slots. Grow the array accordingly. 00593 size_t const old_len = sample_list->size(); 00594 sample_list->size(old_len + 1); 00595 00596 data_queue_type ** new_begin = &((*sample_list)[0]); 00597 slot = new_begin + old_len; 00598 } 00599 00600 ACE_NEW_MALLOC_RETURN( 00601 samples, 00602 static_cast<data_queue_type *>( 00603 allocator->malloc(sizeof(data_queue_type))), 00604 data_queue_type(allocator), 00605 false); 00606 00607 // Insert the samples in to the sample list. 00608 *slot = samples; 00609 00610 if (!dir.is_nil()) { 00611 samples->fs_path_ = path; 00612 } 00613 00614 for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) { 00615 DataSampleElement& elem = *i; 00616 00617 // N.B. Do not persist samples with coherent changes. 00618 // To verify, we check the DataSampleHeader for the 00619 // coherent_change_ flag. The DataSampleHeader will 00620 // always be the first message block in the chain. 00621 // 00622 // It should be noted that persisting coherent changes 00623 // is a non-trivial task, and should be handled when 00624 // finalizing persistence profile conformance. 00625 if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) { 00626 continue; // skip coherent sample 00627 } 00628 00629 sample_data_type sample(elem, allocator); 00630 00631 if (samples->enqueue_tail(sample) != 0) 00632 return false; 00633 00634 if (!dir.is_nil()) { 00635 try { 00636 File::Ptr f = dir->create_next_file(); 00637 std::ofstream os; 00638 00639 if (!f->write(os)) return false; 00640 00641 DDS::Time_t timestamp; 00642 const char * data; 00643 size_t len; 00644 sample.get_sample(data, len, timestamp); 00645 00646 os << timestamp.sec << ' ' << timestamp.nanosec << ' '; 00647 os.write(data, len); 00648 00649 } catch (const std::exception& ex) { 00650 if (DCPS_debug_level > 0) { 00651 ACE_ERROR((LM_ERROR, 00652 ACE_TEXT("(%P|%t) DataDurabilityCache::insert ") 00653 ACE_TEXT("couldn't write sample for PERSISTENT ") 00654 ACE_TEXT("data: %C\n"), ex.what())); 00655 } 00656 } 00657 } 00658 } 00659 } 00660 00661 // ----------- 00662 00663 // Schedule cleanup timer. 00664 //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent) 00665 ACE_Time_Value const cleanup_delay( 00666 duration_to_time_value(qos.service_cleanup_delay)); 00667 00668 if (cleanup_delay > ACE_Time_Value::zero) { 00669 if (OpenDDS::DCPS::DCPS_debug_level >= 4) { 00670 ACE_DEBUG((LM_DEBUG, 00671 ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ") 00672 ACE_TEXT("cleanup for\n") 00673 ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ") 00674 ACE_TEXT("== (%d, %C, %C)\n"), 00675 domain_id, 00676 topic_name, 00677 type_name)); 00678 } 00679 00680 Cleanup_Handler * const cleanup = 00681 new Cleanup_Handler(*sample_list, 00682 slot - &(*sample_list)[0], 00683 this->allocator_.get(), 00684 path, 00685 this->data_dir_); 00686 ACE_Event_Handler_var safe_cleanup(cleanup); // Transfer ownership 00687 long const tid = 00688 this->reactor_->schedule_timer(cleanup, 00689 0, // ACT 00690 cleanup_delay); 00691 if (tid == -1) { 00692 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00693 00694 ACE_DES_FREE(samples, 00695 this->allocator_->free, 00696 DurabilityQueue<sample_data_type>); 00697 *slot = 0; 00698 00699 return false; 00700 00701 } else { 00702 { 00703 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false); 00704 this->cleanup_timer_ids_.push_back(tid); 00705 } 00706 00707 cleanup->timer_id(tid, 00708 &this->cleanup_timer_ids_); 00709 } 00710 } 00711 00712 return true; 00713 }
typedef OpenDDS::DCPS::DataDurabilityCache::OPENDDS_LIST | ( | long | ) |
DataDurabilityCache& OpenDDS::DCPS::DataDurabilityCache::operator= | ( | DataDurabilityCache const & | ) | [private] |
unique_ptr<ACE_Allocator> const OpenDDS::DCPS::DataDurabilityCache::allocator_ [private] |
Allocator used to allocate memory for sample map and lists.
Definition at line 227 of file DataDurabilityCache.h.
Referenced by get_data(), init(), OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init(), insert(), and ~DataDurabilityCache().
timer_id_list_type OpenDDS::DCPS::DataDurabilityCache::cleanup_timer_ids_ [private] |
Timer ID list.
Keep track of cleanup timer IDs in case we need to cancel before they expire.
Definition at line 241 of file DataDurabilityCache.h.
Referenced by insert(), and ~DataDurabilityCache().
Definition at line 231 of file DataDurabilityCache.h.
Definition at line 229 of file DataDurabilityCache.h.
ACE_SYNCH_MUTEX OpenDDS::DCPS::DataDurabilityCache::lock_ [private] |
Lock for synchronized access to the underlying map.
Definition at line 244 of file DataDurabilityCache.h.
Referenced by get_data(), and insert().
Reactor with which cleanup timers will be registered.
Definition at line 247 of file DataDurabilityCache.h.
Referenced by init(), insert(), and ~DataDurabilityCache().
Map of all data samples.
Definition at line 234 of file DataDurabilityCache.h.
Referenced by get_data(), init(), insert(), and ~DataDurabilityCache().