Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "WriteDataContainer.h"
11 :
12 : #include "DataSampleHeader.h"
13 : #include "InstanceDataSampleList.h"
14 : #include "DataWriterImpl.h"
15 : #include "MessageTracker.h"
16 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
17 : # include "DataDurabilityCache.h"
18 : #endif
19 : #include "PublicationInstance.h"
20 : #include "Util.h"
21 : #include "Time_Helper.h"
22 : #include "GuidConverter.h"
23 : #include "transport/framework/TransportSendElement.h"
24 : #include "transport/framework/TransportCustomizedElement.h"
25 : #include "transport/framework/TransportRegistry.h"
26 :
27 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
28 :
29 : namespace OpenDDS {
30 : namespace DCPS {
31 :
32 : /**
33 : * @todo Refactor this code and DataReaderImpl::data_expired() to
34 : * a common function.
35 : */
36 : bool
37 0 : resend_data_expired(const DataSampleElement& element,
38 : const DDS::LifespanQosPolicy& lifespan)
39 : {
40 0 : if (lifespan.duration.sec != DDS::DURATION_INFINITE_SEC
41 0 : || lifespan.duration.nanosec != DDS::DURATION_INFINITE_NSEC) {
42 : // Finite lifespan. Check if data has expired.
43 :
44 : const DDS::Time_t tmp = {
45 0 : element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
46 0 : element.get_header().source_timestamp_nanosec_ + lifespan.duration.nanosec
47 0 : };
48 0 : const SystemTimePoint expiration_time(time_to_time_value(tmp));
49 0 : const SystemTimePoint now = SystemTimePoint::now();
50 :
51 0 : if (now >= expiration_time) {
52 0 : if (DCPS_debug_level >= 8) {
53 0 : const TimeDuration diff = now - expiration_time;
54 0 : ACE_DEBUG((LM_DEBUG,
55 : ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
56 : ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
57 : diff.value().sec(),
58 : diff.value().usec()));
59 0 : }
60 :
61 0 : return true; // Data expired.
62 : }
63 0 : }
64 :
65 0 : return false;
66 : }
67 :
// Constructor: captures the writer's resource-limit QoS values
// (max samples per instance, history depth, durability limits, instance
// and total-sample caps), wires the three condition variables to their
// mutexes, and sets up the sporadic deadline task.  No samples or
// instances exist yet; acked_sequences_ is seeded below.
WriteDataContainer::WriteDataContainer(
  DataWriterImpl* writer,
  CORBA::Long max_samples_per_instance,
  CORBA::Long history_depth,
  CORBA::Long max_durable_per_instance,
  DDS::Duration_t max_blocking_time,
  size_t n_chunks,
  DDS::DomainId_t domain_id,
  const char* topic_name,
  const char* type_name,
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
  DataDurabilityCache* durability_cache,
  const DDS::DurabilityServiceQosPolicy& durability_service,
#endif
  CORBA::Long max_instances,
  CORBA::Long max_total_samples,
  ACE_Recursive_Thread_Mutex& deadline_status_lock,
  DDS::OfferedDeadlineMissedStatus& deadline_status,
  CORBA::Long& deadline_last_total_count)
  : cached_cumulative_ack_valid_(false)
  , transaction_id_(0)
  , publication_id_(GUID_UNKNOWN)
  , writer_(writer)
  , max_samples_per_instance_(max_samples_per_instance)
  , history_depth_(history_depth)
  , max_durable_per_instance_(max_durable_per_instance)
  , max_num_instances_(max_instances)
  , max_num_samples_(max_total_samples)
  , max_blocking_time_(max_blocking_time)
  , waiting_on_release_(false)
  // All three conditions share state guarded by their respective mutexes:
  // condition_/empty_condition_ use lock_, wfa_condition_ uses wfa_lock_.
  , condition_(lock_)
  , empty_condition_(lock_)
  , wfa_condition_(wfa_lock_)
  , n_chunks_(n_chunks)
  // Allocator sized at twice n_chunks to cover samples plus control messages.
  , sample_list_element_allocator_(2 * n_chunks_)
  , shutdown_(false)
  , domain_id_(domain_id)
  , topic_name_(topic_name)
  , type_name_(type_name)
#ifndef OPENDDS_NO_PERSISTENCE_PROFILE
  , durability_cache_(durability_cache)
  , durability_service_(durability_service)
#endif
  // Sporadic task dispatches process_deadlines() on the service interceptor.
  , deadline_task_(DCPS::make_rch<DCPS::PmfSporadicTask<WriteDataContainer> >(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &WriteDataContainer::process_deadlines))
  , deadline_period_(TimeDuration::max_value)
  , deadline_status_lock_(deadline_status_lock)
  , deadline_status_(deadline_status)
  , deadline_last_total_count_(deadline_last_total_count)
{
  if (DCPS_debug_level >= 2) {
    ACE_DEBUG((LM_DEBUG,
               "(%P|%t) WriteDataContainer "
               "sample_list_element_allocator %x with %d chunks\n",
               &sample_list_element_allocator_, n_chunks_));
  }
  // Seed the ack map so get_cumulative_ack() has a baseline before any
  // reader associates (GUID_UNKNOWN acts as the writer-wide entry).
  acked_sequences_[GUID_UNKNOWN].insert(SequenceNumber::ZERO());
}
125 :
126 0 : WriteDataContainer::~WriteDataContainer()
127 : {
128 0 : deadline_task_->cancel();
129 :
130 0 : if (this->unsent_data_.size() > 0) {
131 0 : ACE_DEBUG((LM_WARNING,
132 : ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
133 : ACE_TEXT("destroyed with %d samples unsent.\n"),
134 : this->unsent_data_.size()));
135 : }
136 :
137 0 : if (this->sending_data_.size() > 0) {
138 0 : if (TransportRegistry::instance()->released()) {
139 0 : for (DataSampleElement* e; sending_data_.dequeue_head(e);) {
140 0 : release_buffer(e);
141 : }
142 : }
143 0 : if (sending_data_.size() && DCPS_debug_level) {
144 0 : ACE_DEBUG((LM_WARNING,
145 : ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
146 : ACE_TEXT("destroyed with %d samples sending.\n"),
147 : this->sending_data_.size()));
148 : }
149 : }
150 :
151 0 : if (this->sent_data_.size() > 0) {
152 0 : ACE_DEBUG((LM_DEBUG,
153 : ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
154 : ACE_TEXT("destroyed with %d samples sent.\n"),
155 : this->sent_data_.size()));
156 : }
157 :
158 0 : if (this->orphaned_to_transport_.size() > 0) {
159 0 : if (DCPS_debug_level > 0) {
160 0 : ACE_DEBUG((LM_DEBUG,
161 : ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
162 : ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
163 : this->orphaned_to_transport_.size()));
164 : }
165 : }
166 :
167 0 : if (!shutdown_) {
168 0 : ACE_ERROR((LM_ERROR,
169 : ACE_TEXT("(%P|%t) ERROR: ")
170 : ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
171 : ACE_TEXT("The container has not been cleaned.\n")));
172 : }
173 0 : }
174 :
175 : void
176 0 : WriteDataContainer::add_reader_acks(const GUID_t& reader, const SequenceNumber& base)
177 : {
178 0 : ACE_Guard<ACE_Thread_Mutex> guard(wfa_lock_);
179 :
180 0 : DisjointSequence& ds = acked_sequences_[reader];
181 0 : ds.reset();
182 0 : if (base == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
183 0 : ds.insert(SequenceNumber::ZERO());
184 : } else {
185 0 : ds.insert(SequenceRange(SequenceNumber(), base));
186 : }
187 0 : cached_cumulative_ack_valid_ = false;
188 0 : }
189 :
190 : void
191 0 : WriteDataContainer::remove_reader_acks(const GUID_t& reader)
192 : {
193 0 : ACE_Guard<ACE_Thread_Mutex> guard(wfa_lock_);
194 :
195 0 : const SequenceNumber prev_cum_ack = get_cumulative_ack();
196 0 : const AckedSequenceMap::iterator it = acked_sequences_.find(reader);
197 0 : if (it != acked_sequences_.end()) {
198 0 : acked_sequences_.erase(it);
199 0 : cached_cumulative_ack_valid_ = false;
200 0 : if (prev_cum_ack != get_cumulative_ack()) {
201 0 : wfa_condition_.notify_all();
202 : }
203 : }
204 0 : }
205 :
206 : SequenceNumber
207 0 : WriteDataContainer::get_cumulative_ack()
208 : {
209 0 : if (acked_sequences_.empty()) {
210 0 : return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
211 : }
212 :
213 0 : if (cached_cumulative_ack_valid_) {
214 0 : return cached_cumulative_ack_;
215 : }
216 :
217 0 : SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
218 0 : for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
219 0 : if (!it->second.empty()) {
220 0 : result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.cumulative_ack() : std::min(result, it->second.cumulative_ack());
221 : }
222 : }
223 0 : cached_cumulative_ack_ = result;
224 0 : cached_cumulative_ack_valid_ = true;
225 0 : return result;
226 : }
227 :
228 : SequenceNumber
229 0 : WriteDataContainer::get_last_ack()
230 : {
231 0 : if (acked_sequences_.empty()) {
232 0 : return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
233 : }
234 :
235 0 : SequenceNumber result = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
236 0 : for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
237 0 : if (!it->second.empty()) {
238 0 : result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.last_ack() : std::max(result, it->second.last_ack());
239 : }
240 : }
241 0 : return result;
242 : }
243 :
// Record acknowledgment of sequence `seq`.  With id == GUID_UNKNOWN the ack
// is applied to every tracked reader; otherwise only that reader's set is
// advanced.  Notifies wfa_condition_ when any reader's cumulative ack moved.
// NOTE(review): this method does not take wfa_lock_ itself — visible call
// sites (data_delivered) hold it already; confirm before adding new callers.
void
WriteDataContainer::update_acked(const SequenceNumber& seq, const GUID_t& id)
{
  bool do_notify = false;
  if (id == GUID_UNKNOWN) {
    // Writer-wide ack: insert seq into every reader's DisjointSequence.
    for (AckedSequenceMap::iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
      SequenceNumber prev_cum_ack = it->second.cumulative_ack();
      it->second.insert(seq);
      cached_cumulative_ack_valid_ = false;
      if (prev_cum_ack != it->second.cumulative_ack()) {
        do_notify = true;
      }
    }
  } else {
    const AckedSequenceMap::iterator it = acked_sequences_.find(id);
    if (it != acked_sequences_.end()) {
      SequenceNumber prev_cum_ack = it->second.cumulative_ack();
      // Only a forward move matters; the inserted range fills any gap
      // between the previous cumulative ack and seq.
      if (prev_cum_ack < seq) {
        it->second.insert(SequenceRange(prev_cum_ack, seq));
        cached_cumulative_ack_valid_ = false;
        if (prev_cum_ack != it->second.cumulative_ack()) {
          do_notify = true;
        }
      }
    }
  }
  if (do_notify) {
    wfa_condition_.notify_all();
  }
}
274 :
275 : DDS::ReturnCode_t
276 0 : WriteDataContainer::enqueue_control(DataSampleElement* control_sample)
277 : {
278 : // Enqueue to the next_send_sample_ thread of unsent_data_
279 : // will link samples with the next_sample/previous_sample and
280 : // also next_send_sample_.
281 : // This would save time when we actually send the data.
282 :
283 0 : if (shutdown_) {
284 0 : return DDS::RETCODE_ERROR;
285 : }
286 :
287 0 : unsent_data_.enqueue_tail(control_sample);
288 :
289 0 : return DDS::RETCODE_OK;
290 : }
291 :
292 : // This method assumes that instance list has space for this sample.
293 : DDS::ReturnCode_t
294 0 : WriteDataContainer::enqueue(
295 : DataSampleElement* sample,
296 : DDS::InstanceHandle_t instance_handle)
297 : {
298 0 : if (shutdown_) {
299 0 : return DDS::RETCODE_ERROR;
300 : }
301 :
302 : // Get the PublicationInstance pointer from InstanceHandle_t.
303 : PublicationInstance_rch instance =
304 0 : get_handle_instance(instance_handle);
305 : // Extract the instance queue.
306 0 : InstanceDataSampleList& instance_list = instance->samples_;
307 :
308 0 : extend_deadline(instance);
309 :
310 : //
311 : // Enqueue to the next_send_sample_ thread of unsent_data_
312 : // will link samples with the next_sample/previous_sample and
313 : // also next_send_sample_.
314 : // This would save time when we actually send the data.
315 :
316 0 : unsent_data_.enqueue_tail(sample);
317 :
318 : //
319 : // Add this sample to the INSTANCE scope list.
320 0 : instance_list.enqueue_tail(sample);
321 :
322 0 : return DDS::RETCODE_OK;
323 0 : }
324 :
// Copy up to max_durable_per_instance_ samples per instance from the
// sending/sent lists onto resend_data_ for delivery to a late-joining
// reader (TRANSIENT_LOCAL durability), and pre-mark that reader's ack set
// so only the samples actually being resent remain un-acked.
DDS::ReturnCode_t
WriteDataContainer::reenqueue_all(const GUID_t& reader_id,
                                  const DDS::LifespanQosPolicy& lifespan
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                                  ,
                                  const OPENDDS_STRING& filterClassName,
                                  const FilterEvaluator* eval,
                                  const DDS::StringSeq& expression_params
#endif
                                  )
{
  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
                   guard,
                   lock_,
                   DDS::RETCODE_ERROR);

  // Budget: each instance contributes at most max_durable_per_instance_
  // samples; durable_samples_remaining_ is consumed by copy_and_prepend.
  ssize_t total_size = 0;
  for (PublicationInstanceMapType::iterator it = instances_.begin();
       it != instances_.end(); ++it) {
    const ssize_t durable = std::min(it->second->samples_.size(),
                                     ssize_t(max_durable_per_instance_));
    total_size += durable;
    it->second->durable_samples_remaining_ = durable;
  }

  // Newer samples (still sending) are prepended first, then older
  // already-sent samples, preserving overall ordering in resend_data_.
  copy_and_prepend(resend_data_,
                   sending_data_,
                   reader_id,
                   lifespan,
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                   filterClassName, eval, expression_params,
#endif
                   total_size);

  copy_and_prepend(resend_data_,
                   sent_data_,
                   reader_id,
                   lifespan,
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                   filterClassName, eval, expression_params,
#endif
                   total_size);

  {
    ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
    cached_cumulative_ack_valid_ = false;
    // Seed the reader's ack set from the writer-wide (GUID_UNKNOWN) entry,
    // then un-ack exactly the sequences that will be resent to it.
    DisjointSequence& ds = acked_sequences_[reader_id];
    ds = acked_sequences_[GUID_UNKNOWN];

    // Remove exactly what will be sent
    SendStateDataSampleList::iterator iter = resend_data_.begin();
    while (iter != resend_data_.end()) {
      ds.erase(iter->get_header().sequence_);
      ++iter;
    }
  }

  if (DCPS_debug_level > 9 && resend_data_.size()) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
               ACE_TEXT("domain %d topic %C publication %C copying ")
               ACE_TEXT("sending/sent to resend to %C.\n"),
               domain_id_,
               topic_name_,
               LogGuid(publication_id_).c_str(),
               LogGuid(reader_id).c_str()));
  }

  return DDS::RETCODE_OK;
}
395 :
396 : DDS::ReturnCode_t
397 0 : WriteDataContainer::register_instance(
398 : DDS::InstanceHandle_t& instance_handle,
399 : Message_Block_Ptr& registered_sample)
400 : {
401 0 : PublicationInstance_rch instance;
402 :
403 0 : if (instance_handle == DDS::HANDLE_NIL) {
404 0 : if (max_num_instances_ > 0
405 0 : && max_num_instances_ <= (CORBA::Long) instances_.size()) {
406 0 : return DDS::RETCODE_OUT_OF_RESOURCES;
407 : }
408 :
409 : // registered the instance for the first time.
410 0 : instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
411 :
412 0 : instance_handle = this->writer_->get_next_handle();
413 :
414 0 : int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
415 :
416 0 : if (0 != insert_attempt) {
417 0 : ACE_ERROR((LM_ERROR,
418 : ACE_TEXT("(%P|%t) ERROR: ")
419 : ACE_TEXT("WriteDataContainer::register_instance, ")
420 : ACE_TEXT("failed to insert instance handle=%X\n"),
421 : instance.in()));
422 0 : return DDS::RETCODE_ERROR;
423 : } // if (0 != insert_attempt)
424 :
425 0 : instance->instance_handle_ = instance_handle;
426 :
427 0 : extend_deadline(instance);
428 :
429 : } else {
430 :
431 0 : int const find_attempt = find(instances_, instance_handle, instance);
432 :
433 0 : if (0 != find_attempt) {
434 0 : ACE_ERROR((LM_ERROR,
435 : ACE_TEXT("(%P|%t) ERROR: ")
436 : ACE_TEXT("WriteDataContainer::register_instance, ")
437 : ACE_TEXT("The provided instance handle=%X is not a valid")
438 : ACE_TEXT("handle.\n"),
439 : instance_handle));
440 :
441 0 : return DDS::RETCODE_ERROR;
442 : } // if (0 != find_attempt)
443 : }
444 :
445 : // The registered_sample is shallow copied.
446 0 : registered_sample.reset(instance->registered_sample_->duplicate());
447 :
448 0 : return DDS::RETCODE_OK;
449 0 : }
450 :
451 : DDS::ReturnCode_t
452 0 : WriteDataContainer::unregister(
453 : DDS::InstanceHandle_t instance_handle,
454 : Message_Block_Ptr& registered_sample,
455 : bool dup_registered_sample)
456 : {
457 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
458 : guard,
459 : lock_,
460 : DDS::RETCODE_ERROR);
461 :
462 0 : PublicationInstance_rch instance;
463 : {
464 0 : PublicationInstanceMapType::iterator pos = instances_.find(instance_handle);
465 0 : if (pos == instances_.end()) {
466 0 : ACE_ERROR_RETURN((LM_ERROR,
467 : ACE_TEXT("(%P|%t) ERROR: ")
468 : ACE_TEXT("WriteDataContainer::unregister, ")
469 : ACE_TEXT("The instance(handle=%X) ")
470 : ACE_TEXT("is not registered yet.\n"),
471 : instance_handle),
472 : DDS::RETCODE_PRECONDITION_NOT_MET);
473 : }
474 0 : instance = pos->second;
475 0 : instances_.erase(pos);
476 : }
477 :
478 0 : return remove_instance(instance, registered_sample, dup_registered_sample);
479 0 : }
480 :
481 : DDS::ReturnCode_t
482 0 : WriteDataContainer::dispose(DDS::InstanceHandle_t instance_handle,
483 : Message_Block_Ptr& registered_sample,
484 : bool dup_registered_sample)
485 : {
486 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
487 : guard,
488 : lock_,
489 : DDS::RETCODE_ERROR);
490 :
491 0 : PublicationInstance_rch instance;
492 :
493 0 : int const find_attempt = find(instances_, instance_handle, instance);
494 :
495 0 : if (0 != find_attempt) {
496 0 : ACE_ERROR_RETURN((LM_ERROR,
497 : ACE_TEXT("(%P|%t) ERROR: ")
498 : ACE_TEXT("WriteDataContainer::dispose, ")
499 : ACE_TEXT("The instance(handle=%X) ")
500 : ACE_TEXT("is not registered yet.\n"),
501 : instance_handle),
502 : DDS::RETCODE_PRECONDITION_NOT_MET);
503 : }
504 :
505 0 : return remove_instance(instance, registered_sample, dup_registered_sample);
506 0 : }
507 :
508 : DDS::ReturnCode_t
509 0 : WriteDataContainer::remove_instance(PublicationInstance_rch instance,
510 : Message_Block_Ptr& registered_sample,
511 : bool dup_registered_sample)
512 : {
513 0 : if (dup_registered_sample) {
514 : // The registered_sample is shallow copied.
515 0 : registered_sample.reset(instance->registered_sample_->duplicate());
516 : }
517 :
518 : // Note: The DDS specification is unclear as to if samples in the process
519 : // of being sent should be removed or not.
520 : // The advantage of calling remove_sample() on them is that the
521 : // cached allocator memory for them is freed. The disadvantage
522 : // is that the slow reader may see multiple disposes without
523 : // any write sample between them and hence not temporarily move into the
524 : // Alive state.
525 : // We have chosen to NOT remove the sending samples.
526 0 : InstanceDataSampleList& instance_list = instance->samples_;
527 :
528 0 : while (instance_list.size() > 0) {
529 0 : bool released = false;
530 0 : const DDS::ReturnCode_t ret = remove_oldest_sample(instance_list, released);
531 0 : if (ret != DDS::RETCODE_OK) {
532 0 : return ret;
533 : }
534 : }
535 :
536 0 : cancel_deadline(instance);
537 :
538 0 : return DDS::RETCODE_OK;
539 : }
540 :
541 : DDS::ReturnCode_t
542 0 : WriteDataContainer::num_samples(DDS::InstanceHandle_t handle,
543 : size_t& size)
544 : {
545 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
546 : guard,
547 : lock_,
548 : DDS::RETCODE_ERROR);
549 0 : PublicationInstance_rch instance;
550 :
551 0 : int const find_attempt = find(instances_, handle, instance);
552 :
553 0 : if (0 != find_attempt) {
554 0 : return DDS::RETCODE_ERROR;
555 :
556 : } else {
557 0 : size = instance->samples_.size();
558 0 : return DDS::RETCODE_OK;
559 : }
560 0 : }
561 :
562 : size_t
563 0 : WriteDataContainer::num_all_samples()
564 : {
565 0 : size_t size = 0;
566 :
567 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
568 : guard,
569 : lock_,
570 : 0);
571 :
572 0 : for (PublicationInstanceMapType::iterator iter = instances_.begin();
573 0 : iter != instances_.end();
574 0 : ++iter)
575 : {
576 0 : size += iter->second->samples_.size();
577 : }
578 :
579 0 : return size;
580 0 : }
581 :
582 : ACE_UINT64
583 0 : WriteDataContainer::get_unsent_data(SendStateDataSampleList& list)
584 : {
585 : DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
586 : //
587 : // The samples in unsent_data are added to the local datawriter
588 : // list and enqueued to the sending_data_ signifying they have
589 : // been passed to the transport to send in a transaction
590 : //
591 0 : list = this->unsent_data_;
592 :
593 : // Increment send counter for this send operation
594 0 : ++transaction_id_;
595 :
596 : // Mark all samples with current send counter
597 0 : SendStateDataSampleList::iterator iter = list.begin();
598 0 : while (iter != list.end()) {
599 0 : iter->set_transaction_id(this->transaction_id_);
600 0 : ++iter;
601 : }
602 :
603 : //
604 : // The unsent_data_ already linked with the
605 : // next_send_sample during enqueue.
606 : // Append the unsent_data_ to current sending_data_
607 : // list.
608 0 : sending_data_.enqueue_tail(list);
609 :
610 : //
611 : // Clear the unsent data list.
612 : //
613 0 : this->unsent_data_.reset();
614 :
615 : //
616 : // Return the moved list.
617 : //
618 0 : return transaction_id_;
619 : }
620 :
621 : SendStateDataSampleList
622 0 : WriteDataContainer::get_resend_data()
623 : {
624 : DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
625 :
626 : //
627 : // The samples in unsent_data are added to the sending_data
628 : // during enqueue.
629 : //
630 0 : SendStateDataSampleList list = this->resend_data_;
631 :
632 : //
633 : // Clear the unsent data list.
634 : //
635 0 : this->resend_data_.reset();
636 : //
637 : // Return the moved list.
638 : //
639 0 : return list;
640 : }
641 :
642 : bool
643 0 : WriteDataContainer::pending_data()
644 : {
645 0 : return this->sending_data_.size() != 0
646 0 : || this->orphaned_to_transport_.size() != 0
647 0 : || this->unsent_data_.size() != 0;
648 : }
649 :
// Transport callback: `sample` has been delivered.  Moves the element off
// sending_data_, either retaining it on sent_data_ (durability) or
// releasing its buffer, records the acknowledgment, and wakes writers
// blocked on resource limits and threads waiting for acks / empty lists.
void
WriteDataContainer::data_delivered(const DataSampleElement* sample)
{
  DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);

  if (DCPS_debug_level >= 2) {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
                ACE_TEXT(" %@\n"), sample));
  }

  ACE_GUARD(ACE_Recursive_Thread_Mutex,
            guard,
            lock_);

  // Delivered samples _must_ be on sending_data_ list

  // If it is not found in one of the lists, an invariant
  // exception is declared.

  // The element now needs to be removed from the sending_data_
  // list, and appended to the end of the sent_data_ list here

  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);

  // If sample is on a SendStateDataSampleList it should be on the
  // sending_data_ list signifying it was given to the transport to
  // deliver and now the transport is signaling it has been delivered
  if (!sending_data_.dequeue(sample)) {
    //
    // Should be on sending_data_. If it is in sent_data_
    // or unsent_data there was a problem.
    //
    SendStateDataSampleList* send_lists[] = {
      &sent_data_,
      &unsent_data_,
      &orphaned_to_transport_};
    const SendStateDataSampleList* containing_list =
      SendStateDataSampleList::send_list_containing_element(stale, send_lists);

    if (containing_list == &sent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_delivered, ")
                 ACE_TEXT("The delivered sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN sent_data_.\n")));
    } else if (containing_list == &unsent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_delivered, ")
                 ACE_TEXT("The delivered sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN unsent_data_ list.\n")));
    } else {

      //No-op: elements may be removed from all WriteDataContainer lists during shutdown
      //and inform transport of their release. Transport will call data-delivered on the
      //elements as it processes the removal but they will already be gone from the send lists.
      if (stale->get_header().message_id_ != SAMPLE_DATA) {
        //this message was a control message so release it
        if (DCPS_debug_level > 9) {
          ACE_DEBUG((LM_DEBUG,
                     ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                     ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
                     domain_id_,
                     topic_name_,
                     LogGuid(publication_id_).c_str()));
        }
        writer_->controlTracker.message_delivered();
      }

      if (containing_list == &orphaned_to_transport_) {
        // Orphaned element: nothing left to track, reclaim the buffer.
        orphaned_to_transport_.dequeue(sample);
        release_buffer(stale);

      } else if (!containing_list) {
        // samples that were retrieved from get_resend_data()
        // Record acks per subscriber under wfa_lock_, then release.
        ACE_Guard<ACE_SYNCH_MUTEX> wfa_guard(wfa_lock_);
        const CORBA::ULong num_subs = stale->get_num_subs();
        for (CORBA::ULong i = 0; i < num_subs; ++i) {
          update_acked(stale->get_header().sequence_, stale->get_sub_id(i));
        }
        wfa_guard.release();
        SendStateDataSampleList::remove(stale);
        release_buffer(stale);
      }

      if (!pending_data()) {
        empty_condition_.notify_all();
      }
    }

    return;
  }
  // Normal path: element came off sending_data_.  Capture sequence and the
  // prior cumulative ack (under wfa_lock_) before the element may be freed.
  ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, wfa_lock_);
  SequenceNumber acked_seq = stale->get_header().sequence_;
  SequenceNumber prev_max = get_cumulative_ack();

  if (stale->get_header().message_id_ != SAMPLE_DATA) {
    //this message was a control message so release it
    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                 ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
                 domain_id_,
                 topic_name_,
                 LogGuid(publication_id_).c_str()));
    }
    release_buffer(stale);
    stale = 0;
    writer_->controlTracker.message_delivered();
  } else {

    // Keep the sample on sent_data_ (marked historic) when durability is in
    // effect and the instance still owns it; otherwise release it now.
    if (max_durable_per_instance_ && !shutdown_ && InstanceDataSampleList::on_some_list(sample)) {
      const_cast<DataSampleElement*>(sample)->get_header().historic_sample_ = true;
      DataSampleHeader::set_flag(HISTORIC_SAMPLE_FLAG, sample->get_sample());
      sent_data_.enqueue_tail(sample);

    } else {
      if (InstanceDataSampleList::on_some_list(sample)) {
        PublicationInstance_rch inst = sample->get_handle();
        inst->samples_.dequeue(sample);
      }
      release_buffer(stale);
      stale = 0;
    }

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
                 ACE_TEXT("domain %d topic %C publication %C seq# %q %s.\n"),
                 domain_id_,
                 topic_name_,
                 LogGuid(publication_id_).c_str(),
                 acked_seq.getValue(),
                 max_durable_per_instance_
                 ? ACE_TEXT("stored for durability")
                 : ACE_TEXT("released")));
    }

    // A slot was freed (or a sample retired); unblock writers waiting on
    // resource limits.  stale may be null here — callee must tolerate that.
    wakeup_blocking_writers(stale);
  }
  if (DCPS_debug_level > 9) {
    ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
               ACE_TEXT("Inserting acked_sequence: %q\n"),
               acked_seq.getValue()));
  }

  // Writer-wide ack (GUID_UNKNOWN): applies to every tracked reader.
  update_acked(acked_seq);

  if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
      prev_max < get_cumulative_ack()) {

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
                 ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
    }

    wfa_condition_.notify_all();
  }

  // Signal if there is no pending data.
  if (!pending_data()) {
    empty_condition_.notify_all();
  }
}
815 :
// Transport callback: `sample` was not delivered.  Transport-initiated
// drops are handled exactly like deliveries (the sample is gone either
// way); locally-initiated drops move in-flight samples back to
// unsent_data_ for resend, or release them when that is no longer possible.
void
WriteDataContainer::data_dropped(const DataSampleElement* sample,
                                 bool dropped_by_transport)
{
  DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);

  if (DCPS_debug_level >= 2) {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
                ACE_TEXT(" sample %X dropped_by_transport %d\n"),
                sample, dropped_by_transport));
  }

  // If the transport initiates the data dropping, we need do same thing
  // as data_delivered. e.g. remove the sample from the internal list
  // and the instance list. We do not need acquire the lock here since
  // the data_delivered acquires the lock.
  if (dropped_by_transport) {
    data_delivered(sample);
    return;
  }

  //The data_dropped could be called from the thread initiating sample remove
  //which already hold the lock. In this case, it's not necessary to acquire
  //lock here. It also could be called from the transport thread in a delayed
  //notification, it's necessary to acquire lock here to protect the internal
  //structures in this class.

  ACE_GUARD (ACE_Recursive_Thread_Mutex,
             guard,
             lock_);

  // The dropped sample should be in the sending_data_ list.
  // Otherwise an exception will be raised.
  //
  // We are now been notified by transport, so we can
  // keep the sample from the sending_data_ list still in
  // sample list since we will send it.

  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);

  // If sample is on a SendStateDataSampleList it should be on the
  // sending_data_ list signifying it was given to the transport to
  // deliver and now the transport is signaling it has been dropped

  if (sending_data_.dequeue(sample)) {
    // else: The data_dropped is called as a result of remove_sample()
    // called from reenqueue_all() which supports the TRANSIENT_LOCAL
    // qos. The samples that are sending by transport are dropped from
    // transport and will be moved to the unsent list for resend.
    if (!shutdown_ && InstanceDataSampleList::on_some_list(sample)) {
      // Instance still owns it: requeue for a future send.
      unsent_data_.enqueue_tail(sample);
    } else {
      // Shutting down or already detached from its instance: release it.
      SendStateDataSampleList::remove(stale);
      release_buffer(stale);
      stale = 0;
    }

  } else {
    //
    // If it is in sent_data_ or unsent_data there was a problem.
    //
    SendStateDataSampleList* send_lists[] = {
      &sent_data_,
      &unsent_data_,
      &orphaned_to_transport_};
    const SendStateDataSampleList* containing_list =
      SendStateDataSampleList::send_list_containing_element(stale, send_lists);

    if (containing_list == &sent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_dropped, ")
                 ACE_TEXT("The dropped sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN sent_data_.\n")));
    } else if (containing_list == &unsent_data_) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::data_dropped, ")
                 ACE_TEXT("The dropped sample is not in sending_data_ and ")
                 ACE_TEXT("WAS IN unsent_data_ list.\n")));
    } else {

      //No-op: elements may be removed from all WriteDataContainer lists during shutdown
      //and inform transport of their release. Transport will call data-dropped on the
      //elements as it processes the removal but they will already be gone from the send lists.
      if (stale->get_header().message_id_ != SAMPLE_DATA) {
        //this message was a control message so release it
        if (DCPS_debug_level > 9) {
          ACE_DEBUG((LM_DEBUG,
                     ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
                     ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
                     domain_id_,
                     topic_name_,
                     LogGuid(publication_id_).c_str()));
        }
        writer_->controlTracker.message_dropped();
      }

      if (containing_list == &orphaned_to_transport_) {
        orphaned_to_transport_.dequeue(sample);
        release_buffer(stale);
        stale = 0;
        if (!pending_data()) {
          empty_condition_.notify_all();
        }

      } else if (!containing_list) {
        // samples that were retrieved from get_resend_data()
        SendStateDataSampleList::remove(stale);
        release_buffer(stale);
        stale = 0;
      }
    }

    return;
  }

  // A slot may have been freed; unblock writers waiting on resource limits.
  // stale may be null here — callee must tolerate that.
  wakeup_blocking_writers(stale);

  if (!pending_data()) {
    empty_condition_.notify_all();
  }
}
939 :
// Trim each instance's retained durable (historic) samples down to
// max_durable_per_instance_, releasing the oldest surplus ones from both
// the instance list and sent_data_.  No-op when the limit is zero.
void
WriteDataContainer::remove_excess_durable()
{
  if (!max_durable_per_instance_)
    return;

  size_t n_released = 0;

  for (PublicationInstanceMapType::iterator iter = instances_.begin();
       iter != instances_.end();
       ++iter) {

    CORBA::Long durable_allowed = max_durable_per_instance_;
    InstanceDataSampleList& instance_list = iter->second->samples_;

    // Walk newest -> oldest.  prev is captured before any dequeue so the
    // traversal stays valid while elements are being removed.
    for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
      prev = InstanceDataSampleList::prev(it);

      if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {

        if (durable_allowed) {
          // Newest historic samples consume the allowance and are kept.
          --durable_allowed;
        } else {
          instance_list.dequeue(it);
          sent_data_.dequeue(it);
          release_buffer(it);
          ++n_released;
        }
      }
    }
  }

  if (n_released && DCPS_debug_level > 9) {
    ACE_DEBUG((LM_DEBUG,
               ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
               ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
               ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
               LogGuid(publication_id_).c_str(), n_released));
  }
}
980 :
981 :
/// Dequeue the oldest sample of @a instance_list and remove it from
/// whichever internal send-state list (sending_data_, sent_data_,
/// unsent_data_, orphaned_to_transport_) currently holds it.
///
/// @param instance_list  the per-instance sample queue to shrink.
/// @param released       set to true when the caller may treat the slot
///                       as freed (even if the element itself was handed
///                       to orphaned_to_transport_ rather than released).
/// @return DDS::RETCODE_OK on success; DDS::RETCODE_ERROR if the list was
///         empty or the sample could not be located/removed.
DDS::ReturnCode_t
WriteDataContainer::remove_oldest_sample(
  InstanceDataSampleList& instance_list,
  bool& released)
{
  DataSampleElement* stale = 0;

  //
  // Remove the oldest sample from the instance list.
  //
  if (!instance_list.dequeue_head(stale)) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("dequeue_head_next_sample failed\n")),
                     DDS::RETCODE_ERROR);
  }

  //
  // Remove the stale data from the next_writer_sample_ list. The
  // sending_data_/next_send_sample_ list is not managed within the
  // container, it is only used external to the container and does
  // not need to be managed internally.
  //
  // The next_writer_sample_ link is being used in one of the sent_data_,
  // sending_data_, or unsent_data lists. Removal from the doubly
  // linked list needs to repair the list only when the stale sample
  // is either the head or tail of the list.
  //

  //
  // Locate the head of the list that the stale data is in.
  //
  SendStateDataSampleList* send_lists[] = {
    &sending_data_,
    &sent_data_,
    &unsent_data_,
    &orphaned_to_transport_};
  const SendStateDataSampleList* containing_list =
    SendStateDataSampleList::send_list_containing_element(stale, send_lists);

  //
  // Identify the list that the stale data is in.
  // The stale data should be in one of the sent_data_, sending_data_
  // or unsent_data_. It should not be in released_data_ list since
  // this function is the only place a sample is moved from
  // sending_data_ to released_data_ list.

  // Remove the element from the internal list.
  bool result = false;

  if (containing_list == &this->sending_data_) {
    if (DCPS_debug_level > 2) {
      ACE_ERROR((LM_WARNING,
                 ACE_TEXT("(%P|%t) WARNING: ")
                 ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                 ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
    }

    // This means transport is still using the sample that needs to
    // be released currently so notify transport that sample is being removed.

    if (this->writer_->remove_sample(stale)) {
      // remove_sample() may have moved the element into sent_data_ as a
      // side effect; dequeue it from there before freeing.
      if (this->sent_data_.dequeue(stale)) {
        release_buffer(stale);
      }
      result = true;

    } else {
      // Transport could not remove it synchronously: park the element on
      // orphaned_to_transport_ so a later data_dropped/delivered callback
      // can finish the release.
      if (this->sending_data_.dequeue(stale)) {
        this->orphaned_to_transport_.enqueue_tail(stale);
      } else if (this->sent_data_.dequeue(stale)) {
        release_buffer(stale);
        // NOTE(review): this assignment is redundant — result is set
        // unconditionally just below.
        result = true;
      }
      result = true;
    }
    released = true;

  } else if (containing_list == &this->sent_data_) {
    // No one is using the data sample, so we can release it back to
    // its allocator.
    //
    result = this->sent_data_.dequeue(stale) != 0;
    release_buffer(stale);
    released = true;

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
                 ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 LogGuid(publication_id_).c_str()));
    }

  } else if (containing_list == &this->unsent_data_) {
    //
    // No one is using the data sample, so we can release it back to
    // its allocator.
    //
    result = this->unsent_data_.dequeue(stale) != 0;
    release_buffer(stale);
    released = true;

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG,
                 ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
                 ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
                 this->domain_id_,
                 this->topic_name_,
                 LogGuid(publication_id_).c_str()));
    }
  } else {
    // The sample was dequeued from the instance list but is in none of
    // the four send-state lists — internal bookkeeping is inconsistent.
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("The oldest sample is not in any internal list.\n")),
                     DDS::RETCODE_ERROR);
  }

  // If nothing remains in flight, wake anyone blocked in wait_pending().
  if (!pending_data()) {
    empty_condition_.notify_all();
  }

  if (!result) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
                      ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
                     DDS::RETCODE_ERROR);

  }

  return DDS::RETCODE_OK;
}
1118 :
1119 : DDS::ReturnCode_t
1120 0 : WriteDataContainer::obtain_buffer_for_control(DataSampleElement*& element)
1121 : {
1122 : DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
1123 :
1124 0 : ACE_NEW_MALLOC_RETURN(
1125 : element,
1126 : static_cast<DataSampleElement*>(
1127 : sample_list_element_allocator_.malloc(
1128 : sizeof(DataSampleElement))),
1129 : DataSampleElement(publication_id_,
1130 : this->writer_,
1131 : PublicationInstance_rch()),
1132 : DDS::RETCODE_ERROR);
1133 :
1134 0 : return DDS::RETCODE_OK;
1135 : }
1136 :
/// Allocate a DataSampleElement for a data sample on instance @a handle,
/// enforcing HISTORY and RESOURCE_LIMITS QoS. Reliable writers block (up
/// to max_blocking_time_) for the transport to free space; best-effort
/// writers evict the oldest sample instead.
///
/// @param element  receives the newly allocated element on success.
/// @param handle   instance the new sample belongs to.
/// @return RETCODE_OK, RETCODE_BAD_PARAMETER (unknown handle),
///         RETCODE_TIMEOUT (reliable wait expired or shutdown),
///         or RETCODE_ERROR.
DDS::ReturnCode_t
WriteDataContainer::obtain_buffer(DataSampleElement*& element,
                                  DDS::InstanceHandle_t handle)
{
  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);

  // Trim durable history first so it doesn't count against the limits.
  remove_excess_durable();

  PublicationInstance_rch instance = get_handle_instance(handle);

  if (!instance) {
    return DDS::RETCODE_BAD_PARAMETER;
  }

  // Allocate up front; released again below if limits can't be satisfied.
  ACE_NEW_MALLOC_RETURN(
    element,
    static_cast<DataSampleElement*>(
      sample_list_element_allocator_.malloc(
        sizeof(DataSampleElement))),
    DataSampleElement(publication_id_,
                      this->writer_,
                      instance),
    DDS::RETCODE_ERROR);

  // Extract the current instance queue.
  InstanceDataSampleList& instance_list = instance->samples_;
  DDS::ReturnCode_t ret = DDS::RETCODE_OK;

  bool set_timeout = true;
  MonotonicTimePoint timeout;

  //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
  //max_instances and max_instances * depth
  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
  // Loop until the limits are satisfied or ret reports a failure.
  while ((instance_list.size() >= max_samples_per_instance_) ||
         ((this->max_num_samples_ > 0) &&
          ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {

    if (this->writer_->qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
      // RELIABLE: may only drop when HISTORY depth is exceeded.
      if (instance_list.size() >= history_depth_) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                     ACE_TEXT(" instance %d attempting to remove")
                     ACE_TEXT(" its oldest sample (reliable)\n"),
                     handle));
        }
        bool oldest_released = false;
        ret = remove_oldest_sample(instance_list, oldest_released);
        if (oldest_released) {
          break;
        }
      }
      // Reliable writers can wait
      // The timeout is computed once, on the first wait iteration.
      if (set_timeout) {
        timeout = MonotonicTimePoint::now() + TimeDuration(max_blocking_time_);
        set_timeout = false;
      }
      if (!shutdown_ && MonotonicTimePoint::now() < timeout) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
                      handle));
        }

        // condition_ is signalled by wakeup_blocking_writers() when space frees up.
        waiting_on_release_ = true;
        switch (condition_.wait_until(timeout, thread_status_manager)) {
        case CvStatus_NoTimeout:
          // Woken up: durable samples may be evictable now; re-check limits.
          remove_excess_durable();
          break;

        case CvStatus_Timeout:
          if (DCPS_debug_level >= 2) {
            ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                       ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
                       handle));
          }
          ret = DDS::RETCODE_TIMEOUT;
          break;

        case CvStatus_Error:
          if (DCPS_debug_level) {
            ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::obtain_buffer: "
                       "error in wait_until\n"));
          }
          ret = DDS::RETCODE_ERROR;
          break;
        }

      } else {
        //either shutdown has been signaled or max_blocking_time
        //has surpassed so treat as timeout
        ret = DDS::RETCODE_TIMEOUT;
      }

    } else {
      //BEST EFFORT
      bool oldest_released = false;

      //try to remove stale samples from this instance
      // The remove_oldest_sample() method removes the oldest sample
      // from instance list and removes it from the internal lists.
      if (instance_list.size() > 0) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" its oldest sample\n"),
                      handle));
        }
        ret = remove_oldest_sample(instance_list, oldest_released);
      }
      //else try to remove stale samples from other instances which are full
      if (ret == DDS::RETCODE_OK && !oldest_released) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" oldest sample from any full instances\n"),
                      handle));
        }
        PublicationInstanceMapType::iterator it = instances_.begin();

        while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
          if (it->second->samples_.size() >= max_samples_per_instance_) {
            ret = remove_oldest_sample(it->second->samples_, oldest_released);
          }
          ++it;
        }
      }
      //else try to remove stale samples from other non-full instances
      if (ret == DDS::RETCODE_OK && !oldest_released) {
        if (DCPS_debug_level >= 2) {
          ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                      ACE_TEXT(" instance %d attempting to remove")
                      ACE_TEXT(" oldest sample from any instance with samples currently\n"),
                      handle));
        }
        PublicationInstanceMapType::iterator it = instances_.begin();

        while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
          if (it->second->samples_.size() > 0) {
            ret = remove_oldest_sample(it->second->samples_, oldest_released);
          }
          ++it;
        }
      }
      if (!oldest_released) {
        //This means that no instances have samples to remove and yet
        //still hitting resource limits.
        ACE_ERROR((LM_ERROR,
                   ACE_TEXT("(%P|%t) ERROR: ")
                   ACE_TEXT("WriteDataContainer::obtain_buffer, ")
                   ACE_TEXT("hitting resource limits with no samples to remove\n")));
        ret = DDS::RETCODE_ERROR;
      }
    } //END BEST EFFORT

    // Any failure above: free the element allocated at the top and bail.
    if (ret != DDS::RETCODE_OK) {
      if (DCPS_debug_level >= 2) {
        ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
                    ACE_TEXT(" instance %d could not obtain buffer for sample")
                    ACE_TEXT(" releasing allotted sample and returning\n"),
                    handle));
      }
      this->release_buffer(element);
      return ret;
    }
  } //END WHILE

  data_holder_.enqueue_tail(element);

  return ret;
}
1308 :
1309 : void
1310 0 : WriteDataContainer::release_buffer(DataSampleElement* element)
1311 : {
1312 0 : if (element->get_header().message_id_ == SAMPLE_DATA)
1313 0 : data_holder_.dequeue(element);
1314 : // Release the memory to the allocator.
1315 0 : ACE_DES_FREE(element,
1316 : sample_list_element_allocator_.free,
1317 : DataSampleElement);
1318 0 : }
1319 :
1320 : void
1321 0 : WriteDataContainer::unregister_all()
1322 : {
1323 : DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
1324 0 : shutdown_ = true;
1325 :
1326 : //The internal list needs protection since this call may result from the
1327 : //the delete_datawriter call which does not acquire the lock in advance.
1328 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
1329 : guard,
1330 : lock_);
1331 : // Tell transport remove all control messages currently
1332 : // transport is processing.
1333 0 : (void) this->writer_->remove_all_msgs();
1334 :
1335 : // Broadcast to wake up all waiting threads.
1336 0 : if (waiting_on_release_) {
1337 0 : condition_.notify_all();
1338 : }
1339 :
1340 0 : Message_Block_Ptr registered_sample;
1341 :
1342 0 : for (PublicationInstanceMapType::iterator pos = instances_.begin(), limit = instances_.end(); pos != limit;) {
1343 : // Release the instance data.
1344 0 : if (remove_instance(pos->second, registered_sample, false) != DDS::RETCODE_OK) {
1345 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::unregister_all, "
1346 : "remove_instance %X failed\n", pos->first));
1347 : }
1348 :
1349 0 : writer_->return_handle(pos->first);
1350 0 : instances_.erase(pos++);
1351 : }
1352 0 : }
1353 :
1354 : PublicationInstance_rch
1355 0 : WriteDataContainer::get_handle_instance(DDS::InstanceHandle_t handle)
1356 : {
1357 0 : PublicationInstance_rch instance;
1358 :
1359 0 : if (0 != find(instances_, handle, instance)) {
1360 0 : ACE_DEBUG((LM_DEBUG,
1361 : ACE_TEXT("(%P|%t) ")
1362 : ACE_TEXT("WriteDataContainer::get_handle_instance, ")
1363 : ACE_TEXT("lookup for %d failed\n"), handle));
1364 : }
1365 :
1366 0 : return instance;
1367 0 : }
1368 :
/// Copy eligible samples from @a appended (newest-to-oldest) and prepend
/// them to @a list, addressed to the single reader @a reader_id. Used to
/// resend durable data to a late-joining reader.
///
/// Skips samples that have exceeded @a lifespan, samples filtered out by
/// the reader's content filter, control messages, and instances whose
/// durable_samples_remaining_ budget is exhausted. Stops once
/// @a max_resend_samples (decremented per copied sample) reaches zero.
void
WriteDataContainer::copy_and_prepend(SendStateDataSampleList& list,
                                     const SendStateDataSampleList& appended,
                                     const GUID_t& reader_id,
                                     const DDS::LifespanQosPolicy& lifespan,
#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
                                     const OPENDDS_STRING& filterClassName,
                                     const FilterEvaluator* eval,
                                     const DDS::StringSeq& params,
#endif
                                     ssize_t& max_resend_samples)
{
  // Iterate in reverse so that enqueue_head() leaves 'list' in the same
  // relative order as 'appended'.
  for (SendStateDataSampleList::const_reverse_iterator cur = appended.rbegin();
       cur != appended.rend() && max_resend_samples; ++cur) {

    // Skip samples whose LIFESPAN has already expired.
    if (resend_data_expired(*cur, lifespan))
      continue;

#ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
    // Skip samples the reader's content filter would reject.
    if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
      continue;
#endif

    PublicationInstance_rch inst = cur->get_handle();

    if (!inst) {
      // *cur is a control message, just skip it
      continue;
    }

    // Respect the per-instance durable resend budget.
    if (inst->durable_samples_remaining_ == 0)
      continue;
    --inst->durable_samples_remaining_;

    // Copy the element; on allocation failure ACE_NEW_MALLOC returns from
    // this function with errno set to ENOMEM.
    DataSampleElement* element = 0;
    ACE_NEW_MALLOC(element,
                   static_cast<DataSampleElement*>(
                     sample_list_element_allocator_.malloc(
                       sizeof(DataSampleElement))),
                   DataSampleElement(*cur));

    // The copy is addressed to exactly one subscriber: the new reader.
    element->set_num_subs(1);
    element->set_sub_id(0, reader_id);

    if (DCPS_debug_level > 9) {
      ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::copy_and_prepend added seq# %q\n",
                 cur->get_header().sequence_.getValue()));
    }

    list.enqueue_head(element);
    --max_resend_samples;
  }
}
1422 :
1423 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1424 : bool
1425 0 : WriteDataContainer::persist_data()
1426 : {
1427 0 : bool result = true;
1428 :
1429 : // ------------------------------------------------------------
1430 : // Transfer sent data to data DURABILITY cache.
1431 : // ------------------------------------------------------------
1432 0 : if (this->durability_cache_) {
1433 : // A data durability cache is available for TRANSIENT or
1434 : // PERSISTENT data durability. Cache the data samples.
1435 :
1436 : //
1437 : // We only cache data that is not still in use outside of
1438 : // this instance of WriteDataContainer
1439 : // (only cache samples in sent_data_ meaning transport has delivered).
1440 : bool const inserted =
1441 0 : this->durability_cache_->insert(this->domain_id_,
1442 0 : this->topic_name_,
1443 0 : this->type_name_,
1444 0 : this->sent_data_,
1445 : this->durability_service_
1446 : );
1447 :
1448 0 : result = inserted;
1449 :
1450 0 : if (!inserted)
1451 0 : ACE_ERROR((LM_ERROR,
1452 : ACE_TEXT("(%P|%t) ERROR: ")
1453 : ACE_TEXT("WriteDataContainer::persist_data, ")
1454 : ACE_TEXT("failed to make data durable for ")
1455 : ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
1456 : this->domain_id_,
1457 : this->topic_name_,
1458 : this->type_name_));
1459 : }
1460 :
1461 0 : return result;
1462 : }
1463 : #endif
1464 :
1465 0 : void WriteDataContainer::wait_pending(const MonotonicTimePoint& deadline)
1466 : {
1467 0 : const bool no_deadline = deadline.is_zero();
1468 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, lock_);
1469 0 : const bool report = DCPS_debug_level > 0 && pending_data();
1470 0 : if (report) {
1471 0 : if (no_deadline) {
1472 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending no timeout\n")));
1473 : } else {
1474 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending ")
1475 : ACE_TEXT("timeout at %#T\n"),
1476 : &deadline.value()));
1477 : }
1478 : }
1479 :
1480 0 : bool loop = true;
1481 0 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1482 0 : while (loop && pending_data()) {
1483 0 : switch (empty_condition_.wait_until(deadline, thread_status_manager)) {
1484 0 : case CvStatus_NoTimeout:
1485 0 : break;
1486 :
1487 0 : case CvStatus_Timeout:
1488 0 : if (pending_data()) {
1489 0 : if (DCPS_debug_level >= 2) {
1490 0 : ACE_DEBUG((LM_INFO, "(%P|%t) WriteDataContainer::wait_pending: "
1491 : "Timed out waiting for messages to be transported\n"));
1492 0 : log_send_state_lists("WriteDataContainer::wait_pending - wait timedout: ");
1493 : }
1494 : }
1495 0 : loop = false;
1496 0 : break;
1497 :
1498 0 : case CvStatus_Error:
1499 0 : if (DCPS_debug_level) {
1500 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_pending: "
1501 : "error in wait_until\n"));
1502 : }
1503 0 : loop = false;
1504 0 : break;
1505 : }
1506 : }
1507 0 : if (report) {
1508 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending done\n")));
1509 : }
1510 0 : }
1511 :
1512 : void
1513 0 : WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
1514 : {
1515 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
1516 : guard,
1517 : lock_);
1518 0 : PublicationInstanceMapType::iterator it = instances_.begin();
1519 :
1520 0 : while (it != instances_.end()) {
1521 0 : instance_handles.push_back(it->second->instance_handle_);
1522 0 : ++it;
1523 : }
1524 0 : }
1525 :
/// Block until @a sequence has been acknowledged by all readers, the
/// @a deadline passes, or an error occurs.
///
/// @param deadline             absolute time to stop waiting (ignored
///                             when @a deadline_is_infinite is true).
/// @param deadline_is_infinite when true, wait without a timeout.
/// @param sequence             sequence number to wait for.
/// @return RETCODE_OK once acknowledged, RETCODE_TIMEOUT on expiry,
///         RETCODE_ERROR on a condition-variable failure.
DDS::ReturnCode_t
WriteDataContainer::wait_ack_of_seq(const MonotonicTimePoint& deadline,
                                    bool deadline_is_infinite,
                                    const SequenceNumber& sequence)
{
  ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
  // wfa_condition_ is re-checked after every wakeup; infinite waits use
  // wait(), finite waits use wait_until() with the absolute deadline.
  while ((deadline_is_infinite || MonotonicTimePoint::now() < deadline) && !sequence_acknowledged_i(sequence)) {
    switch (deadline_is_infinite ? wfa_condition_.wait(thread_status_manager) : wfa_condition_.wait_until(deadline, thread_status_manager)) {
    case CvStatus_NoTimeout:
      break;
    case CvStatus_Timeout:
      if (DCPS_debug_level >= 2) {
        ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
                   ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
                   sequence.getValue()));
      }
      return DDS::RETCODE_TIMEOUT;
    case CvStatus_Error:
      if (DCPS_debug_level) {
        ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_ack_of_seq: "
                   "error in wait/wait_until\n"));
      }
      return DDS::RETCODE_ERROR;
    }
  }

  // Final check: the loop can also exit because the deadline passed.
  return sequence_acknowledged_i(sequence) ? DDS::RETCODE_OK : DDS::RETCODE_TIMEOUT;
}
1555 :
1556 : bool
1557 0 : WriteDataContainer::sequence_acknowledged(const SequenceNumber& sequence)
1558 : {
1559 0 : ACE_Guard<ACE_SYNCH_MUTEX> guard(wfa_lock_);
1560 0 : return sequence_acknowledged_i(sequence);
1561 0 : }
1562 :
1563 : bool
1564 0 : WriteDataContainer::sequence_acknowledged_i(const SequenceNumber& sequence)
1565 : {
1566 0 : if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
1567 : //return true here so that wait_for_acknowledgments doesn't block
1568 0 : return true;
1569 : }
1570 :
1571 0 : SequenceNumber acked = get_cumulative_ack();
1572 0 : if (DCPS_debug_level >= 10) {
1573 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged_i ")
1574 : ACE_TEXT("- %C cumulative ack is currently: %q\n"), DCPS::LogGuid(publication_id_).c_str(), acked.getValue()));
1575 : }
1576 0 : if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
1577 0 : return false;
1578 : }
1579 0 : return true;
1580 : }
1581 :
1582 : void
1583 0 : WriteDataContainer::wakeup_blocking_writers(DataSampleElement* stale)
1584 : {
1585 0 : if (!stale && waiting_on_release_) {
1586 0 : waiting_on_release_ = false;
1587 :
1588 0 : condition_.notify_all();
1589 : }
1590 0 : }
1591 :
/// Diagnostic helper: log the sizes of all internal send-state lists,
/// the total sample count, and the instance count, prefixed with the
/// caller-supplied @a description.
void
WriteDataContainer::log_send_state_lists(OPENDDS_STRING description)
{
  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
             description.c_str(),
             unsent_data_.size(),
             sending_data_.size(),
             sent_data_.size(),
             orphaned_to_transport_.size(),
             num_all_samples(),
             instances_.size()));
}
1604 :
/// Apply a new DEADLINE period: (re)build the deadline_map_ schedule for
/// all instances and (re)arm or cancel the deadline timer accordingly.
/// TimeDuration::max_value represents "deadline disabled".
void
WriteDataContainer::set_deadline_period(const TimeDuration& deadline_period)
{
  // Call comes from DataWriterImpl_t which should already have the lock_.

  // Deadline for all instances starting from now.
  const MonotonicTimePoint deadline = MonotonicTimePoint::now() + deadline_period;

  // Reset the deadline timer if the period has changed.
  if (deadline_period_ != deadline_period) {
    if (deadline_period_ == TimeDuration::max_value) {
      // Transition: disabled -> enabled. The map must be empty; schedule
      // every instance at the new deadline.
      OPENDDS_ASSERT(deadline_map_.empty());

      for (PublicationInstanceMapType::iterator iter = instances_.begin();
           iter != instances_.end();
           ++iter) {
        iter->second->deadline_ = deadline;
        deadline_map_.insert(std::make_pair(deadline, iter->second));
      }

      if (!deadline_map_.empty()) {
        deadline_task_->schedule(deadline_period);
      }
    } else if (deadline_period == TimeDuration::max_value) {
      // Transition: enabled -> disabled. Cancel the timer and drop all
      // scheduled deadlines.
      if (!deadline_map_.empty()) {
        deadline_task_->cancel();
      }

      deadline_map_.clear();
    } else {
      // Transition: enabled -> enabled with a different period. Rebuild
      // the schedule and re-arm the timer for the earliest entry.
      DeadlineMapType new_map;
      for (PublicationInstanceMapType::iterator iter = instances_.begin();
           iter != instances_.end();
           ++iter) {
        iter->second->deadline_ = deadline;
        new_map.insert(std::make_pair(iter->second->deadline_, iter->second));
      }
      std::swap(new_map, deadline_map_);

      if (!deadline_map_.empty()) {
        deadline_task_->cancel();
        deadline_task_->schedule(deadline_map_.begin()->first - MonotonicTimePoint::now());
      }
    }

    deadline_period_ = deadline_period;
  }
}
1653 :
/// Timer callback: for every instance whose deadline is before @a now,
/// update the OFFERED_DEADLINE_MISSED status, invoke the listener (with
/// the status lock released), reschedule the instance one period later,
/// and finally re-arm the timer for the earliest remaining deadline.
void
WriteDataContainer::process_deadlines(const MonotonicTimePoint& now)
{
  // Lock the DataWriterImpl.
  ACE_GUARD (ACE_Recursive_Thread_Mutex, dwi_guard, deadline_status_lock_);
  // Lock ourselves.
  ACE_GUARD (ACE_Recursive_Thread_Mutex, wdc_guard, lock_);

  if (deadline_map_.empty()) {
    return;
  }

  bool notify = false;

  // Always restart from begin(): each iteration erases and re-inserts,
  // and the listener upcall below briefly releases deadline_status_lock_.
  for (DeadlineMapType::iterator pos = deadline_map_.begin(), limit = deadline_map_.end();
       pos != limit && pos->first < now; pos = deadline_map_.begin()) {

    PublicationInstance_rch instance = pos->second;
    deadline_map_.erase(pos);

    ++deadline_status_.total_count;
    deadline_status_.total_count_change = deadline_status_.total_count - deadline_last_total_count_;
    deadline_status_.last_instance_handle = instance->instance_handle_;

    writer_->set_status_changed_flag(DDS::OFFERED_DEADLINE_MISSED_STATUS, true);
    notify = true;

    DDS::DataWriterListener_var listener = writer_->listener_for(DDS::OFFERED_DEADLINE_MISSED_STATUS);

    if (listener) {
      // Copy before releasing the lock.
      const DDS::OfferedDeadlineMissedStatus status = deadline_status_;

      // Release the lock during the upcall.
      ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> deadline_reverse_status_lock(deadline_status_lock_);
      ACE_GUARD(ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>, rev_dwi_guard, deadline_reverse_status_lock);

      // @todo Will this operation ever throw? If so we may want to
      // catch all exceptions, and act accordingly.
      listener->on_offered_deadline_missed(writer_, status);

      // We need to update the last total count value to our current total
      // so that the next time we will calculate the correct total_count_change;
      deadline_last_total_count_ = deadline_status_.total_count;
    }

    // Reschedule the instance for its next deadline; this keeps the map
    // non-empty for the schedule() call below.
    instance->deadline_ += deadline_period_;
    deadline_map_.insert(std::make_pair(instance->deadline_, instance));
  }

  if (notify) {
    writer_->notify_status_condition();
  }

  deadline_task_->schedule(deadline_map_.begin()->first - now);
}
1710 :
1711 : void
1712 0 : WriteDataContainer::extend_deadline(const PublicationInstance_rch& instance)
1713 : {
1714 : // Call comes from DataWriterImpl_t which should arleady have the lock_.
1715 :
1716 0 : if (deadline_period_ == TimeDuration::max_value) {
1717 0 : return;
1718 : }
1719 :
1720 0 : std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1721 0 : while (r.first != r.second && r.first->second != instance) {
1722 0 : ++r.first;
1723 : }
1724 0 : if (r.first != r.second) {
1725 : // The instance was in the map.
1726 0 : deadline_map_.erase(r.first);
1727 : }
1728 0 : instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
1729 0 : bool schedule = deadline_map_.empty();
1730 0 : deadline_map_.insert(std::make_pair(instance->deadline_, instance));
1731 0 : if (schedule) {
1732 0 : deadline_task_->schedule(deadline_period_);
1733 : }
1734 : }
1735 :
1736 : void
1737 0 : WriteDataContainer::cancel_deadline(const PublicationInstance_rch& instance)
1738 : {
1739 : // Call comes from DataWriterImpl_t which should arleady have the lock_.
1740 :
1741 0 : if (deadline_period_ == TimeDuration::max_value) {
1742 0 : return;
1743 : }
1744 :
1745 0 : std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1746 0 : while (r.first != r.second && r.first->second != instance) {
1747 0 : ++r.first;
1748 : }
1749 0 : if (r.first != r.second) {
1750 0 : deadline_map_.erase(r.first);
1751 0 : if (deadline_map_.empty()) {
1752 0 : deadline_task_->cancel();
1753 : }
1754 : }
1755 : }
1756 :
1757 : } // namespace DCPS
1758 : } // namespace OpenDDS
1759 :
1760 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|