10 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 12 #include "dds/DdsDcpsDomainC.h" 13 #include "dds/DdsDcpsTypeSupportExtC.h" 40 if (path.empty())
return;
43 Directory::Ptr dir = Directory::create(data_dir.
c_str());
45 Directory::Ptr parent = dir->parent();
49 while (!parent.is_nil() &&
50 (parent->begin_dirs() == parent->end_dirs())) {
51 Directory::Ptr to_delete = parent;
52 parent = parent->parent();
70 typedef ptrdiff_t list_difference_type;
72 Cleanup_Handler(list_type & sample_list,
73 list_difference_type index,
77 : sample_list_(sample_list)
79 , allocator_(allocator)
94 ACE_TEXT(
"(%P|%t) OpenDDS - Cleaning up ")
95 ACE_TEXT(
"data durability cache.\n")));
103 data_queue_type *& queue = this->sample_list_[this->index_];
105 this->allocator_->free,
110 cleanup_directory(path_, this->data_dir_);
112 }
catch (
const std::exception& ex) {
115 ACE_TEXT(
"(%P|%t) Cleanup_Handler::handle_timout ")
116 ACE_TEXT(
"couldn't remove directory for PERSISTENT ")
117 ACE_TEXT(
"data: %C\n"), ex.what()));
122 this->timer_ids_->remove(this->tid_);
129 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
131 this->timer_ids_ = timer_ids;
136 virtual ~Cleanup_Handler() {}
142 list_type & sample_list_;
145 list_difference_type
const index_;
160 OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
318 static_cast<sample_map_type *>(
329 Directory::Ptr root_dir = Directory::create(this->
data_dir_.
c_str());
332 for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
333 domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
334 path[0] = domain->name();
340 for (Directory::DirectoryIterator topic = domain->begin_dirs(),
341 topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
342 path[1] = topic->name();
344 for (Directory::DirectoryIterator type = topic->begin_dirs(),
345 type_end = topic->end_dirs(); type != type_end; ++type) {
346 path[2] = type->name();
348 key_type key(domain_id, path[1].c_str(), path[2].c_str(),
352 static_cast<sample_list_type *>(
358 for (Directory::DirectoryIterator dw = type->begin_dirs(),
359 dw_end = type->end_dirs(); dw != dw_end; ++dw) {
360 path[3] = dw->name();
362 size_t old_len = sample_list->
size();
363 sample_list->
size(old_len + 1);
364 data_queue_type *& slot = (*sample_list)[old_len];
368 data_queue_type * sample_queue = 0;
370 static_cast<data_queue_type *>(
371 allocator->
malloc(
sizeof(data_queue_type))),
372 data_queue_type(allocator));
375 sample_queue->fs_path_ = path;
377 for (Directory::FileIterator file = dw->begin_files(),
378 file_end = dw->end_files(); file != file_end; ++file) {
381 if (!file->read(is)) {
384 ACE_TEXT(
"(%P|%t) DataDurabilityCache::init ")
385 ACE_TEXT(
"couldn't open file for PERSISTENT ")
386 ACE_TEXT(
"data: %C\n"), file->name().c_str()));
392 is >> timestamp.
sec >> timestamp.
nanosec >> std::noskipws;
395 const size_t CHUNK = 4096;
404 current->
wr_ptr((
size_t)is.gcount());
406 if (current->
space() == 0) {
413 sample_queue->enqueue_tail(
430 timer_id_list_type::const_iterator
const end(
433 for (timer_id_list_type::const_iterator i(
449 size_t const len = list->
size();
451 for (
size_t l = 0; l != len; ++l) {
467 #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *> 469 #undef OPENDDS_MAP_TYPE 476 char const * topic_name,
477 char const * type_name,
481 if (the_data.
size() == 0)
502 else if (the_data.
size() > depth) {
514 ssize_t const advance_amount = the_data.
size() - depth;
515 std::advance(element, advance_amount);
530 data_queue_type ** slot = 0;
531 data_queue_type * samples = 0;
547 path.push_back(topic_name);
548 path.push_back(type_name);
549 dir = dir->get_dir(path);
554 dir = dir->create_next_dir();
555 path.push_back(dir->name());
557 }
catch (
const std::exception& ex) {
560 ACE_TEXT(
"(%P|%t) DataDurabilityCache::insert ")
561 ACE_TEXT(
"couldn't create directory for PERSISTENT ")
562 ACE_TEXT(
"data: %C\n"), ex.what()));
574 static_cast<sample_list_type *>(
583 data_queue_type **
const begin = &((*sample_list)[0]);
584 data_queue_type **
const end =
585 begin + sample_list->size();
590 slot = std::find(begin,
592 static_cast<data_queue_type *>(0));
596 size_t const old_len = sample_list->size();
597 sample_list->size(old_len + 1);
599 data_queue_type ** new_begin = &((*sample_list)[0]);
600 slot = new_begin + old_len;
605 static_cast<data_queue_type *>(
606 allocator->
malloc(
sizeof(data_queue_type))),
607 data_queue_type(allocator),
614 samples->fs_path_ = path;
634 if (samples->enqueue_tail(sample) != 0)
639 File::Ptr f = dir->create_next_file();
642 if (!f->write(os))
return false;
649 os << timestamp.
sec <<
' ' << timestamp.
nanosec <<
' ';
652 }
catch (
const std::exception& ex) {
655 ACE_TEXT(
"(%P|%t) DataDurabilityCache::insert ")
656 ACE_TEXT(
"couldn't write sample for PERSISTENT ")
657 ACE_TEXT(
"data: %C\n"), ex.what()));
670 if (!cleanup_delay.
is_zero()) {
673 ACE_TEXT(
"OpenDDS (%P|%t) Scheduling durable data ")
675 ACE_TEXT(
"OpenDDS (%P|%t) (domain_id, topic, type) ")
682 Cleanup_Handler *
const cleanup =
683 new Cleanup_Handler(*sample_list,
684 slot - &(*sample_list)[0],
692 cleanup_delay.
value());
709 cleanup->timer_id(tid,
720 char const * topic_name,
721 char const * type_name,
741 else if (p_sample_list == 0)
751 if (sample_list[0]->
get(registration_data, 0) == -1)
754 char const * marshaled_sample = 0;
755 size_t marshaled_sample_length = 0;
758 registration_data->
get_sample(marshaled_sample,
759 marshaled_sample_length,
760 registration_timestamp);
774 marshaled_sample_length);
776 registration_sample->
wr_ptr(marshaled_sample_length);
787 move(registration_sample),
788 registration_timestamp);
794 size_t const len = sample_list.
size();
796 for (
size_t i = 0; i != len; ++i) {
797 data_queue_type *
const q = sample_list[i];
799 for (data_queue_type::ITERATOR j = q->begin();
804 if (j.next(data) == 0)
807 char const * sample = 0;
808 size_t sample_length = 0;
811 data->
get_sample(sample, sample_length, source_timestamp);
815 static_cast<ACE_Message_Block*>(
836 mb->
wr_ptr(sample_length);
858 cleanup_directory(q->fs_path_, this->data_dir_);
860 }
catch (
const std::exception& ex) {
863 ACE_TEXT(
"(%P|%t) DataDurabilityCache::get_data ")
864 ACE_TEXT(
"couldn't remove directory for PERSISTENT ")
865 ACE_TEXT(
"data: %C\n"), ex.what()));
872 #endif // OPENDDS_NO_PERSISTENCE_PROFILE
void swap(MessageBlock &lhs, MessageBlock &rhs)
const char * c_str(void) const
DurabilityArray< DurabilityQueue< sample_data_type > * > sample_list_type
bool insert(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
const DataSampleHeader & get_header() const
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
iterator end()
Return iterator to end of list.
DDS::DurabilityQosPolicyKind kind_
String to_dds_string(unsigned short to_convert)
void * memcpy(void *t, const void *s, size_t len)
ACE_SYNCH_MUTEX lock_
Lock for synchronized access to the underlying map.
ACE_UINT32 source_timestamp_nanosec_
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
virtual void free(void *ptr)=0
DataBlockLockPool::DataBlockLock * get_db_lock()
int bind(const EXT_ID &, const INT_ID &, ACE_Allocator *alloc)
int find(const EXT_ID &, INT_ID &, ACE_Allocator *alloc)
const ACE_Time_Value & value() const
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
T::rv_reference move(T &p)
void set_allocator(ACE_Allocator *allocator)
unique_ptr< ACE_Allocator > const allocator_
Allocator used to allocate memory for sample map and lists.
Duration_t service_cleanup_delay
Sample list data type for all samples.
ACE_Allocator * allocator_
Queue class that provides a means to reset the underlying ACE_Allocator.
DOMAINID_TYPE_NATIVE DomainId_t
bool get_data(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataWriterImpl *data_writer, ACE_Allocator *mb_allocator, ACE_Allocator *db_allocator, DDS::LifespanQosPolicy const &)
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
DataSample * get_sample() const
sample_map_type * samples_
Map of all data samples.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
long max_samples_per_instance
Array class that provides a means to reset the underlying ACE_Allocator.
virtual ACE_Message_Block * release(void)
size_type size(void) const
DDS::Time_t source_timestamp_
ACE_Message_Block * cont(void) const
iterator begin()
Return iterator to beginning of list.
ACE_Allocator * allocator_
ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > sample_map_type
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
size_t total_length(void) const
char * wr_ptr(void) const
timer_id_list_type cleanup_timer_ids_
Timer ID list.
DDS::ReturnCode_t write(Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
#define ACE_ALLOCATOR(POINTER, ALLOCATOR)
HANDLE_TYPE_NATIVE InstanceHandle_t
ACE_INT32 source_timestamp_sec_
Directory::Ptr get_dir(const OPENDDS_VECTOR(OPENDDS_STRING)&path)
#define OPENDDS_VECTOR(T)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
SendStateDataSampleList STL-style iterator implementation.
static const ACE_Time_Value zero
void get_sample(char const *&s, size_t &len, DDS::Time_t &source_timestamp)
DDS::ReturnCode_t register_instance_from_durable_data(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
sample_data_type & operator=(sample_data_type const &rhs)
const ReturnCode_t RETCODE_OK
void init(const ACE_Message_Block *data)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
const long LENGTH_UNLIMITED
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
ACE_Reactor_Timer_Interface * reactor_
Reactor with which cleanup timers will be registered.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
virtual int handle_timeout(const ACE_Time_Value &current_time, const void *act=0)
#define TheServiceParticipant
DataDurabilityCache(DDS::DurabilityQosPolicyKind kind)
Key type for underlying maps.
HistoryQosPolicyKind history_kind
virtual void * malloc(size_type nbytes)=0
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)