Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
11 :
12 : #include "dds/DdsDcpsDomainC.h"
13 : #include "dds/DdsDcpsTypeSupportExtC.h"
14 : #include "DataDurabilityCache.h"
15 : #include "SendStateDataSampleList.h"
16 : #include "DataSampleElement.h"
17 : #include "WriteDataContainer.h"
18 : #include "DataWriterImpl.h"
19 : #include "Time_Helper.h"
20 : #include "debug.h"
21 : #include "SafetyProfileStreams.h"
22 : #include "Service_Participant.h"
23 : #include "RcEventHandler.h"
24 :
25 : #include "ace/Reactor.h"
26 : #include "ace/Message_Block.h"
27 : #include "ace/Log_Msg.h"
28 : #include "ace/Malloc_T.h"
29 : #include "ace/MMAP_Memory_Pool.h"
30 : #include "ace/OS_NS_sys_time.h"
31 :
32 : #include <fstream>
33 : #include <algorithm>
34 :
35 : namespace {
36 :
37 0 : void cleanup_directory(const OPENDDS_VECTOR(OPENDDS_STRING) & path,
38 : const ACE_CString & data_dir)
39 : {
40 0 : if (path.empty()) return;
41 :
42 : using OpenDDS::FileSystemStorage::Directory;
43 0 : Directory::Ptr dir = Directory::create(data_dir.c_str());
44 0 : dir = dir->get_dir(path);
45 0 : Directory::Ptr parent = dir->parent();
46 0 : dir->remove();
47 :
48 : // clean up empty directories
49 0 : while (!parent.is_nil() &&
50 0 : (parent->begin_dirs() == parent->end_dirs())) {
51 0 : Directory::Ptr to_delete = parent;
52 0 : parent = parent->parent();
53 0 : to_delete->remove();
54 0 : }
55 0 : }
56 :
57 : /**
58 : * @class Cleanup_Handler
59 : *
60 : * @brief Event handler that is called when @c service_cleanup_delay
61 : * period expires.
62 : */
63 : class Cleanup_Handler : public virtual OpenDDS::DCPS::RcEventHandler {
64 : public:
65 :
66 : typedef
67 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type data_type;
68 : typedef
69 : OpenDDS::DCPS::DataDurabilityCache::sample_list_type list_type;
70 : typedef ptrdiff_t list_difference_type;
71 :
72 0 : Cleanup_Handler(list_type & sample_list,
73 : list_difference_type index,
74 : ACE_Allocator * allocator,
75 : const OPENDDS_VECTOR(OPENDDS_STRING) & path,
76 : const ACE_CString & data_dir)
77 0 : : sample_list_(sample_list)
78 0 : , index_(index)
79 0 : , allocator_(allocator)
80 0 : , tid_(-1)
81 0 : , timer_ids_(0)
82 0 : , path_(path)
83 0 : , data_dir_(data_dir)
84 : {
85 0 : }
86 :
87 0 : virtual int handle_timeout(const ACE_Time_Value& /* current_time */,
88 : const void* /* act */)
89 : {
90 0 : OpenDDS::DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
91 :
92 0 : if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
93 0 : ACE_DEBUG((LM_DEBUG,
94 : ACE_TEXT("(%P|%t) OpenDDS - Cleaning up ")
95 : ACE_TEXT("data durability cache.\n")));
96 : }
97 :
98 : typedef OpenDDS::DCPS::DurabilityQueue<
99 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type>
100 : data_queue_type;
101 :
102 : // Cleanup all data samples corresponding to the cleanup delay.
103 0 : data_queue_type *& queue = this->sample_list_[this->index_];
104 0 : ACE_DES_FREE(queue,
105 : this->allocator_->free,
106 : data_queue_type);
107 0 : queue = 0;
108 :
109 : try {
110 0 : cleanup_directory(path_, this->data_dir_);
111 :
112 0 : } catch (const std::exception& ex) {
113 0 : if (OpenDDS::DCPS::DCPS_debug_level > 0) {
114 0 : ACE_ERROR((LM_ERROR,
115 : ACE_TEXT("(%P|%t) Cleanup_Handler::handle_timout ")
116 : ACE_TEXT("couldn't remove directory for PERSISTENT ")
117 : ACE_TEXT("data: %C\n"), ex.what()));
118 : }
119 0 : }
120 :
121 : // No longer any need to keep track of the timer ID.
122 0 : this->timer_ids_->remove(this->tid_);
123 :
124 0 : return 0;
125 0 : }
126 :
127 0 : void timer_id(
128 : long tid,
129 : OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type * timer_ids) {
130 0 : this->tid_ = tid;
131 0 : this->timer_ids_ = timer_ids;
132 0 : }
133 :
134 : protected:
135 :
136 0 : virtual ~Cleanup_Handler() {}
137 :
138 : private:
139 :
140 : /// List containing samples to be cleaned up when the cleanup timer
141 : /// expires.
142 : list_type & sample_list_;
143 :
144 : /// Location in list/array of queue to be deallocated.
145 : list_difference_type const index_;
146 :
147 : /// Allocator to be used when deallocating data queue.
148 : ACE_Allocator * const allocator_;
149 :
150 : /// Timer ID corresponding to this cleanup event handler.
151 : long tid_;
152 :
153 : /// List of timer IDs.
154 : /**
155 : * If the cleanup timer fires successfully, the timer ID must be
156 : * removed from the timer ID list so that a subsequent attempt to
157 : * cancel the timer during durability cache destruction does not
158 : * occur.
159 : */
160 : OpenDDS::DCPS::DataDurabilityCache::timer_id_list_type *
161 : timer_ids_;
162 :
163 : OPENDDS_VECTOR(OPENDDS_STRING) path_;
164 :
165 : ACE_CString data_dir_;
166 : };
167 :
168 : } // namespace
169 :
170 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type()
171 0 : : length_(0)
172 0 : , sample_(0)
173 0 : , allocator_(0)
174 : {
175 0 : this->source_timestamp_.sec = 0;
176 0 : this->source_timestamp_.nanosec = 0;
177 0 : }
178 :
179 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
180 : DataSampleElement & element,
181 0 : ACE_Allocator * a)
182 0 : : length_(0)
183 0 : , sample_(0)
184 0 : , allocator_(a)
185 : {
186 0 : this->source_timestamp_.sec = element.get_header().source_timestamp_sec_;
187 0 : this->source_timestamp_.nanosec = element.get_header().source_timestamp_nanosec_;
188 :
189 : // Only copy the data provided by the user. The DataSampleHeader
190 : // will be reconstructed when the durable data is retrieved by a
191 : // DataWriterImpl instance.
192 : //
193 : // The user's data is stored in the first message block
194 : // continuation.
195 0 : ACE_Message_Block const * const data = element.get_sample()->cont();
196 0 : init(data);
197 0 : }
198 :
199 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
200 0 : DDS::Time_t timestamp, const ACE_Message_Block & mb, ACE_Allocator * a)
201 0 : : length_(0)
202 0 : , sample_(0)
203 0 : , source_timestamp_(timestamp)
204 0 : , allocator_(a)
205 : {
206 0 : init(&mb);
207 0 : }
208 :
209 : void
210 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::init
211 : (const ACE_Message_Block * data)
212 : {
213 0 : this->length_ = data->total_length();
214 :
215 0 : ACE_ALLOCATOR(this->sample_,
216 : static_cast<char *>(
217 : this->allocator_->malloc(this->length_)));
218 :
219 0 : char * buf = this->sample_;
220 :
221 0 : for (ACE_Message_Block const * i = data;
222 0 : i != 0;
223 0 : i = i->cont()) {
224 0 : ACE_OS::memcpy(buf, i->rd_ptr(), i->length());
225 0 : buf += i->length();
226 : }
227 : }
228 :
229 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::sample_data_type(
230 0 : sample_data_type const & rhs)
231 0 : : length_(rhs.length_)
232 0 : , sample_(0)
233 0 : , allocator_(rhs.allocator_)
234 : {
235 0 : this->source_timestamp_.sec = rhs.source_timestamp_.sec;
236 0 : this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
237 :
238 0 : if (this->allocator_) {
239 0 : ACE_ALLOCATOR(this->sample_,
240 : static_cast<char *>(
241 : this->allocator_->malloc(rhs.length_)));
242 0 : ACE_OS::memcpy(this->sample_, rhs.sample_, rhs.length_);
243 : }
244 : }
245 :
246 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::~sample_data_type()
247 : {
248 0 : if (this->allocator_)
249 0 : this->allocator_->free(this->sample_);
250 0 : }
251 :
252 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type &
253 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::operator= (
254 : sample_data_type const & rhs)
255 : {
256 : // Strongly exception-safe copy assignment.
257 0 : sample_data_type tmp(rhs);
258 0 : std::swap(this->length_, tmp.length_);
259 0 : std::swap(this->sample_, tmp.sample_);
260 0 : std::swap(this->allocator_, tmp.allocator_);
261 :
262 0 : this->source_timestamp_.sec = rhs.source_timestamp_.sec;
263 0 : this->source_timestamp_.nanosec = rhs.source_timestamp_.nanosec;
264 :
265 0 : return *this;
266 0 : }
267 :
268 : void
269 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::get_sample(
270 : char const *& s,
271 : size_t & len,
272 : DDS::Time_t & source_timestamp)
273 : {
274 0 : s = this->sample_;
275 0 : len = this->length_;
276 0 : source_timestamp.sec = this->source_timestamp_.sec;
277 0 : source_timestamp.nanosec = this->source_timestamp_.nanosec;
278 0 : }
279 :
280 : void
281 0 : OpenDDS::DCPS::DataDurabilityCache::sample_data_type::set_allocator(
282 : ACE_Allocator * allocator)
283 : {
284 0 : this->allocator_ = allocator;
285 0 : }
286 :
287 0 : OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
288 0 : DDS::DurabilityQosPolicyKind kind)
289 0 : : allocator_(new ACE_New_Allocator)
290 0 : , kind_(kind)
291 0 : , samples_(0)
292 0 : , cleanup_timer_ids_()
293 0 : , lock_()
294 0 : , reactor_(0)
295 : {
296 0 : init();
297 0 : }
298 :
299 0 : OpenDDS::DCPS::DataDurabilityCache::DataDurabilityCache(
300 : DDS::DurabilityQosPolicyKind kind,
301 0 : ACE_CString & data_dir)
302 0 : : allocator_(new ACE_New_Allocator)
303 0 : , kind_(kind)
304 0 : , data_dir_(data_dir)
305 0 : , samples_(0)
306 0 : , cleanup_timer_ids_()
307 0 : , lock_()
308 0 : , reactor_(0)
309 : {
310 0 : init();
311 0 : }
312 :
313 0 : void OpenDDS::DCPS::DataDurabilityCache::init()
314 : {
315 0 : ACE_Allocator * const allocator = this->allocator_.get();
316 0 : ACE_NEW_MALLOC(
317 : this->samples_,
318 : static_cast<sample_map_type *>(
319 : allocator->malloc(sizeof(sample_map_type))),
320 : sample_map_type(allocator));
321 :
322 : typedef DurabilityQueue<sample_data_type> data_queue_type;
323 :
324 0 : if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
325 : // Read data from the filesystem and create the in-memory data structures
326 : // as if we had called insert() once for each "datawriter" directory.
327 : using OpenDDS::FileSystemStorage::Directory;
328 : using OpenDDS::FileSystemStorage::File;
329 0 : Directory::Ptr root_dir = Directory::create(this->data_dir_.c_str());
330 0 : OPENDDS_VECTOR(OPENDDS_STRING) path(4); // domain, topic, type, datawriter
331 :
332 0 : for (Directory::DirectoryIterator domain = root_dir->begin_dirs(),
333 0 : domain_end = root_dir->end_dirs(); domain != domain_end; ++domain) {
334 0 : path[0] = domain->name();
335 : DDS::DomainId_t domain_id;
336 : {
337 0 : domain_id = ACE_OS::atoi(path[0].c_str());
338 : }
339 :
340 0 : for (Directory::DirectoryIterator topic = domain->begin_dirs(),
341 0 : topic_end = domain->end_dirs(); topic != topic_end; ++topic) {
342 0 : path[1] = topic->name();
343 :
344 0 : for (Directory::DirectoryIterator type = topic->begin_dirs(),
345 0 : type_end = topic->end_dirs(); type != type_end; ++type) {
346 0 : path[2] = type->name();
347 :
348 0 : key_type key(domain_id, path[1].c_str(), path[2].c_str(),
349 0 : allocator);
350 0 : sample_list_type * sample_list = 0;
351 0 : ACE_NEW_MALLOC(sample_list,
352 : static_cast<sample_list_type *>(
353 : allocator->malloc(sizeof(sample_list_type))),
354 : sample_list_type(0, static_cast<data_queue_type *>(0),
355 : allocator));
356 0 : this->samples_->bind(key, sample_list, allocator);
357 :
358 0 : for (Directory::DirectoryIterator dw = type->begin_dirs(),
359 0 : dw_end = type->end_dirs(); dw != dw_end; ++dw) {
360 0 : path[3] = dw->name();
361 :
362 0 : size_t old_len = sample_list->size();
363 0 : sample_list->size(old_len + 1);
364 0 : data_queue_type *& slot = (*sample_list)[old_len];
365 :
366 : // This variable is called "samples" in the insert() method be
367 : // we already have a "samples_" which is the overall data structure.
368 0 : data_queue_type * sample_queue = 0;
369 0 : ACE_NEW_MALLOC(sample_queue,
370 : static_cast<data_queue_type *>(
371 : allocator->malloc(sizeof(data_queue_type))),
372 : data_queue_type(allocator));
373 :
374 0 : slot = sample_queue;
375 0 : sample_queue->fs_path_ = path;
376 :
377 0 : for (Directory::FileIterator file = dw->begin_files(),
378 0 : file_end = dw->end_files(); file != file_end; ++file) {
379 0 : std::ifstream is;
380 :
381 0 : if (!file->read(is)) {
382 0 : if (DCPS_debug_level) {
383 0 : ACE_ERROR((LM_ERROR,
384 : ACE_TEXT("(%P|%t) DataDurabilityCache::init ")
385 : ACE_TEXT("couldn't open file for PERSISTENT ")
386 : ACE_TEXT("data: %C\n"), file->name().c_str()));
387 : }
388 0 : continue;
389 0 : }
390 :
391 : DDS::Time_t timestamp;
392 0 : is >> timestamp.sec >> timestamp.nanosec >> std::noskipws;
393 0 : is.get(); // consume separator
394 :
395 0 : const size_t CHUNK = 4096;
396 0 : ACE_Message_Block mb(CHUNK);
397 0 : ACE_Message_Block * current = &mb;
398 :
399 0 : while (!is.eof()) {
400 0 : is.read(current->wr_ptr(), current->space());
401 :
402 0 : if (is.bad()) break;
403 :
404 0 : current->wr_ptr((size_t)is.gcount());
405 :
406 0 : if (current->space() == 0) {
407 0 : ACE_Message_Block * old = current;
408 0 : current = new ACE_Message_Block(CHUNK);
409 0 : old->cont(current);
410 : }
411 : }
412 :
413 0 : sample_queue->enqueue_tail(
414 0 : sample_data_type(timestamp, mb, allocator));
415 :
416 0 : if (mb.cont()) mb.cont()->release(); // delete the cont() chain
417 0 : }
418 0 : }
419 0 : }
420 0 : }
421 0 : }
422 0 : }
423 :
424 0 : this->reactor_ = TheServiceParticipant->timer();
425 : }
426 :
427 0 : OpenDDS::DCPS::DataDurabilityCache::~DataDurabilityCache()
428 : {
429 : // Cancel timers that haven't expired yet.
430 : timer_id_list_type::const_iterator const end(
431 0 : this->cleanup_timer_ids_.end());
432 :
433 0 : for (timer_id_list_type::const_iterator i(
434 0 : this->cleanup_timer_ids_.begin());
435 0 : i != end;
436 0 : ++i) {
437 0 : (void) this->reactor_->cancel_timer(*i);
438 : }
439 :
440 : // Clean up memory that isn't automatically managed.
441 0 : if (this->allocator_.get() != 0) {
442 0 : sample_map_type::iterator const map_end = this->samples_->end();
443 :
444 0 : for (sample_map_type::iterator s = this->samples_->begin();
445 0 : s != map_end;
446 0 : ++s) {
447 0 : sample_list_type * const list = (*s).int_id_;
448 :
449 0 : size_t const len = list->size();
450 :
451 0 : for (size_t l = 0; l != len; ++l) {
452 0 : ACE_DES_FREE((*list)[l],
453 : this->allocator_->free,
454 : DurabilityQueue<sample_data_type>);
455 : }
456 :
457 0 : ACE_DES_FREE(list,
458 : this->allocator_->free,
459 : DurabilityArray<DurabilityQueue<sample_data_type> *>);
460 : }
461 :
462 : // Yes, this looks strange but please leave it in place. The third param
463 : // to ACE_DES_FREE must be the actual class name since it's used in an
464 : // explicit desturctor call (~T()). Typedefs are not allowed here. This
465 : // is why the two ACE_DES_FREE's above are not the typedefs. Below we use
466 : // a macro to hide the internal comma from ACE_DES_FREE's macro expansion.
467 : #define OPENDDS_MAP_TYPE ACE_Hash_Map_With_Allocator<key_type, sample_list_type *>
468 0 : ACE_DES_FREE(this->samples_, this->allocator_->free, OPENDDS_MAP_TYPE);
469 : #undef OPENDDS_MAP_TYPE
470 : }
471 0 : }
472 :
473 : bool
474 0 : OpenDDS::DCPS::DataDurabilityCache::insert(
475 : DDS::DomainId_t domain_id,
476 : char const * topic_name,
477 : char const * type_name,
478 : SendStateDataSampleList & the_data,
479 : DDS::DurabilityServiceQosPolicy const & qos)
480 : {
481 0 : if (the_data.size() == 0)
482 0 : return true; // Nothing to cache.
483 :
484 : // Apply DURABILITY_SERVICE QoS HISTORY and RESOURCE_LIMITS related
485 : // settings prior to data insertion into the cache.
486 0 : int depth = qos.history_kind == DDS::KEEP_ALL_HISTORY_QOS
487 0 : ? qos.max_samples_per_instance
488 : : qos.history_depth;
489 :
490 0 : if (depth == DDS::LENGTH_UNLIMITED)
491 0 : depth = 0x7fffffff;
492 :
493 : // Iterator to first DataSampleElement to be copied.
494 0 : SendStateDataSampleList::iterator element(the_data.begin());
495 :
496 0 : if (depth < 0)
497 0 : return false; // Should never occur.
498 :
499 0 : else if (depth == 0)
500 0 : return true; // Nothing else to do. Discard all data.
501 :
502 0 : else if (the_data.size() > depth) {
503 : // N.B. Dropping data samples does not take into account
504 : // those samples which are not actually persisted (i.e.
505 : // samples with the coherent_sample_ flag set). The spec
506 : // does not provide any guidance in this case, therefore
507 : // we opt for the simplest solution and assume that there
508 : // are no change sets when calculating the number of
509 : // samples to drop.
510 :
511 : // Drop "old" samples. Only keep the "depth" most recent
512 : // samples, i.e. those found at the tail end of the
513 : // SendStateDataSampleList.
514 0 : ssize_t const advance_amount = the_data.size() - depth;
515 0 : std::advance(element, advance_amount);
516 : }
517 :
518 : // -----------
519 :
520 : // Copy samples to the domain/topic/type-specific cache.
521 :
522 : key_type const key(domain_id,
523 : topic_name,
524 : type_name,
525 0 : this->allocator_.get());
526 0 : SendStateDataSampleList::iterator the_end(the_data.end());
527 0 : sample_list_type * sample_list = 0;
528 :
529 : typedef DurabilityQueue<sample_data_type> data_queue_type;
530 0 : data_queue_type ** slot = 0;
531 0 : data_queue_type * samples = 0; // sample_list_type::value_type
532 :
533 : using OpenDDS::FileSystemStorage::Directory;
534 : using OpenDDS::FileSystemStorage::File;
535 0 : Directory::Ptr dir;
536 0 : OPENDDS_VECTOR(OPENDDS_STRING) path;
537 : {
538 0 : ACE_Allocator * const allocator = this->allocator_.get();
539 :
540 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
541 :
542 0 : if (this->kind_ == DDS::PERSISTENT_DURABILITY_QOS) {
543 : try {
544 0 : dir = Directory::create(this->data_dir_.c_str());
545 :
546 0 : path.push_back(to_dds_string(domain_id));
547 0 : path.push_back(topic_name);
548 0 : path.push_back(type_name);
549 0 : dir = dir->get_dir(path);
550 : // dir is now the "type" directory, which is shared by all datawriters
551 : // of the domain/topic/type. We actually need a new directory per
552 : // datawriter and this assumes that insert() is called once per
553 : // datawriter, as is currently the case.
554 0 : dir = dir->create_next_dir();
555 0 : path.push_back(dir->name()); // for use by the Cleanup_Handler
556 :
557 0 : } catch (const std::exception& ex) {
558 0 : if (DCPS_debug_level > 0) {
559 0 : ACE_ERROR((LM_ERROR,
560 : ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
561 : ACE_TEXT("couldn't create directory for PERSISTENT ")
562 : ACE_TEXT("data: %C\n"), ex.what()));
563 : }
564 :
565 0 : dir.reset();
566 0 : }
567 : }
568 :
569 0 : if (this->samples_->find(key, sample_list, allocator) != 0) {
570 : // Create a new list (actually an ACE_Array_Base<>) with the
571 : // appropriate allocator passed to its constructor.
572 0 : ACE_NEW_MALLOC_RETURN(
573 : sample_list,
574 : static_cast<sample_list_type *>(
575 : allocator->malloc(sizeof(sample_list_type))),
576 : sample_list_type(1, static_cast<data_queue_type *>(0), allocator),
577 : false);
578 :
579 0 : if (this->samples_->bind(key, sample_list, allocator) != 0)
580 0 : return false;
581 : }
582 :
583 0 : data_queue_type ** const begin = &((*sample_list)[0]);
584 : data_queue_type ** const end =
585 0 : begin + sample_list->size();
586 :
587 : // Find an empty slot in the array. This is a linear search but
588 : // that should be fine for the common case, i.e. a small number of
589 : // DataWriters that push data into the cache.
590 0 : slot = std::find(begin,
591 : end,
592 : static_cast<data_queue_type *>(0));
593 :
594 0 : if (slot == end) {
595 : // No available slots. Grow the array accordingly.
596 0 : size_t const old_len = sample_list->size();
597 0 : sample_list->size(old_len + 1);
598 :
599 0 : data_queue_type ** new_begin = &((*sample_list)[0]);
600 0 : slot = new_begin + old_len;
601 : }
602 :
603 0 : ACE_NEW_MALLOC_RETURN(
604 : samples,
605 : static_cast<data_queue_type *>(
606 : allocator->malloc(sizeof(data_queue_type))),
607 : data_queue_type(allocator),
608 : false);
609 :
610 : // Insert the samples in to the sample list.
611 0 : *slot = samples;
612 :
613 0 : if (!dir.is_nil()) {
614 0 : samples->fs_path_ = path;
615 : }
616 :
617 0 : for (SendStateDataSampleList::iterator i(element); i != the_end; ++i) {
618 0 : DataSampleElement& elem = *i;
619 :
620 : // N.B. Do not persist samples with coherent changes.
621 : // To verify, we check the DataSampleHeader for the
622 : // coherent_change_ flag. The DataSampleHeader will
623 : // always be the first message block in the chain.
624 : //
625 : // It should be noted that persisting coherent changes
626 : // is a non-trivial task, and should be handled when
627 : // finalizing persistence profile conformance.
628 0 : if (DataSampleHeader::test_flag(COHERENT_CHANGE_FLAG, elem.get_sample())) {
629 0 : continue; // skip coherent sample
630 : }
631 :
632 0 : sample_data_type sample(elem, allocator);
633 :
634 0 : if (samples->enqueue_tail(sample) != 0)
635 0 : return false;
636 :
637 0 : if (!dir.is_nil()) {
638 : try {
639 0 : File::Ptr f = dir->create_next_file();
640 0 : std::ofstream os;
641 :
642 0 : if (!f->write(os)) return false;
643 :
644 : DDS::Time_t timestamp;
645 : const char * data;
646 : size_t len;
647 0 : sample.get_sample(data, len, timestamp);
648 :
649 0 : os << timestamp.sec << ' ' << timestamp.nanosec << ' ';
650 0 : os.write(data, len);
651 :
652 0 : } catch (const std::exception& ex) {
653 0 : if (DCPS_debug_level > 0) {
654 0 : ACE_ERROR((LM_ERROR,
655 : ACE_TEXT("(%P|%t) DataDurabilityCache::insert ")
656 : ACE_TEXT("couldn't write sample for PERSISTENT ")
657 : ACE_TEXT("data: %C\n"), ex.what()));
658 : }
659 0 : }
660 : }
661 0 : }
662 0 : }
663 :
664 : // -----------
665 :
666 : // Schedule cleanup timer.
667 : //FUTURE: The cleanup delay needs to be persisted (if QoS is persistent)
668 0 : const TimeDuration cleanup_delay(qos.service_cleanup_delay);
669 :
670 0 : if (!cleanup_delay.is_zero()) {
671 0 : if (OpenDDS::DCPS::DCPS_debug_level >= 4) {
672 0 : ACE_DEBUG((LM_DEBUG,
673 : ACE_TEXT("OpenDDS (%P|%t) Scheduling durable data ")
674 : ACE_TEXT("cleanup for\n")
675 : ACE_TEXT("OpenDDS (%P|%t) (domain_id, topic, type) ")
676 : ACE_TEXT("== (%d, %C, %C)\n"),
677 : domain_id,
678 : topic_name,
679 : type_name));
680 : }
681 :
682 : Cleanup_Handler * const cleanup =
683 : new Cleanup_Handler(*sample_list,
684 0 : slot - &(*sample_list)[0],
685 0 : this->allocator_.get(),
686 : path,
687 0 : this->data_dir_);
688 0 : ACE_Event_Handler_var safe_cleanup(cleanup); // Transfer ownership
689 : long const tid =
690 0 : this->reactor_->schedule_timer(cleanup,
691 : 0, // ACT
692 0 : cleanup_delay.value());
693 0 : if (tid == -1) {
694 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
695 :
696 0 : ACE_DES_FREE(samples,
697 : this->allocator_->free,
698 : DurabilityQueue<sample_data_type>);
699 0 : *slot = 0;
700 :
701 0 : return false;
702 :
703 0 : } else {
704 : {
705 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
706 0 : this->cleanup_timer_ids_.push_back(tid);
707 0 : }
708 :
709 0 : cleanup->timer_id(tid,
710 : &this->cleanup_timer_ids_);
711 : }
712 0 : }
713 :
714 0 : return true;
715 0 : }
716 :
717 : bool
718 0 : OpenDDS::DCPS::DataDurabilityCache::get_data(
719 : DDS::DomainId_t domain_id,
720 : char const * topic_name,
721 : char const * type_name,
722 : DataWriterImpl * data_writer,
723 : ACE_Allocator * mb_allocator,
724 : ACE_Allocator * db_allocator,
725 : DDS::LifespanQosPolicy const & /* lifespan */)
726 : {
727 : key_type const key(domain_id,
728 : topic_name,
729 : type_name,
730 0 : this->allocator_.get());
731 :
732 0 : ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->lock_, false);
733 :
734 0 : sample_list_type * p_sample_list = 0;
735 :
736 0 : if (this->samples_->find(key,
737 : p_sample_list,
738 0 : this->allocator_.get()) == -1)
739 0 : return true; // No durable data for this domain/topic/type.
740 :
741 0 : else if (p_sample_list == 0)
742 0 : return false; // Should never happen.
743 :
744 0 : sample_list_type & sample_list = *p_sample_list;
745 :
746 : // We will register an instance, and then write all of the cached
747 : // data to the DataWriter using that instance.
748 :
749 0 : sample_data_type * registration_data = 0;
750 :
751 0 : if (sample_list[0]->get(registration_data, 0) == -1)
752 0 : return false;
753 :
754 0 : char const * marshaled_sample = 0;
755 0 : size_t marshaled_sample_length = 0;
756 : DDS::Time_t registration_timestamp;
757 :
758 0 : registration_data->get_sample(marshaled_sample,
759 : marshaled_sample_length,
760 : registration_timestamp);
761 :
762 : // Don't use the cached allocator for the registered sample message
763 : // block.
764 : Message_Block_Ptr registration_sample(
765 : new ACE_Message_Block(marshaled_sample_length,
766 : ACE_Message_Block::MB_DATA,
767 : 0, //cont
768 : 0, //data
769 : 0, //alloc_strategy
770 0 : data_writer->get_db_lock()));
771 :
772 0 : ACE_OS::memcpy(registration_sample->wr_ptr(),
773 : marshaled_sample,
774 : marshaled_sample_length);
775 :
776 0 : registration_sample->wr_ptr(marshaled_sample_length);
777 :
778 0 : DDS::InstanceHandle_t handle = DDS::HANDLE_NIL;
779 :
780 : /**
781 : * @todo Is this going to cause problems for users that set a finite
782 : * DDS::ResourceLimitsQosPolicy::max_instances value when
783 : * OpenDDS supports that value?
784 : */
785 : DDS::ReturnCode_t ret =
786 0 : data_writer->register_instance_from_durable_data(handle,
787 0 : move(registration_sample),
788 : registration_timestamp);
789 :
790 0 : if (ret != DDS::RETCODE_OK)
791 0 : return false;
792 :
793 : typedef DurabilityQueue<sample_data_type> data_queue_type;
794 0 : size_t const len = sample_list.size();
795 :
796 0 : for (size_t i = 0; i != len; ++i) {
797 0 : data_queue_type * const q = sample_list[i];
798 :
799 0 : for (data_queue_type::ITERATOR j = q->begin();
800 0 : !j.done();
801 0 : j.advance()) {
802 0 : sample_data_type * data = 0;
803 :
804 0 : if (j.next(data) == 0)
805 0 : return false; // Should never happen.
806 :
807 0 : char const * sample = 0; // Sample does not include header.
808 0 : size_t sample_length = 0;
809 : DDS::Time_t source_timestamp;
810 :
811 0 : data->get_sample(sample, sample_length, source_timestamp);
812 :
813 0 : ACE_Message_Block * tmp_mb = 0;
814 0 : ACE_NEW_MALLOC_RETURN(tmp_mb,
815 : static_cast<ACE_Message_Block*>(
816 : mb_allocator->malloc(
817 : sizeof(ACE_Message_Block))),
818 : ACE_Message_Block(
819 : sample_length,
820 : ACE_Message_Block::MB_DATA,
821 : 0, // cont
822 : 0, // data
823 : 0, // allocator_strategy
824 : data_writer->get_db_lock(), // data block locking_strategy
825 : ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
826 : ACE_Time_Value::zero,
827 : ACE_Time_Value::max_time,
828 : db_allocator,
829 : mb_allocator),
830 : false);
831 0 : Message_Block_Ptr mb(tmp_mb);
832 :
833 0 : ACE_OS::memcpy(mb->wr_ptr(),
834 : sample,
835 : sample_length);
836 0 : mb->wr_ptr(sample_length);
837 :
838 0 : const DDS::ReturnCode_t ret = data_writer->write(move(mb),
839 : handle,
840 : source_timestamp,
841 : 0 /* no content filtering */,
842 : 0 /* no pointer to data */);
843 :
844 0 : if (ret != DDS::RETCODE_OK) {
845 0 : return false;
846 : }
847 0 : }
848 :
849 : // Data successfully written. Empty the queue/list.
850 : /**
851 : * @todo If we don't empty the queue, we'll end up with duplicate
852 : * data since the data retrieved from the cache will be
853 : * reinserted.
854 : */
855 0 : q->reset();
856 :
857 : try {
858 0 : cleanup_directory(q->fs_path_, this->data_dir_);
859 :
860 0 : } catch (const std::exception& ex) {
861 0 : if (DCPS_debug_level > 0) {
862 0 : ACE_ERROR((LM_ERROR,
863 : ACE_TEXT("(%P|%t) DataDurabilityCache::get_data ")
864 : ACE_TEXT("couldn't remove directory for PERSISTENT ")
865 : ACE_TEXT("data: %C\n"), ex.what()));
866 : }
867 0 : }
868 : }
869 0 : return true;
870 0 : }
871 :
872 : #endif // OPENDDS_NO_PERSISTENCE_PROFILE
|