OpenDDS  Snapshot(2023/04/28-20:55)
WriteDataContainer.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "WriteDataContainer.h"
11 
12 #include "DataSampleHeader.h"
13 #include "InstanceDataSampleList.h"
14 #include "DataWriterImpl.h"
15 #include "MessageTracker.h"
16 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
17 # include "DataDurabilityCache.h"
18 #endif
19 #include "PublicationInstance.h"
20 #include "Util.h"
21 #include "Time_Helper.h"
22 #include "GuidConverter.h"
26 
28 
29 namespace OpenDDS {
30 namespace DCPS {
31 
32 /**
33  * @todo Refactor this code and DataReaderImpl::data_expired() to
34  * a common function.
35  */
36 bool
38  const DDS::LifespanQosPolicy& lifespan)
39 {
42  // Finite lifespan. Check if data has expired.
43 
44  const DDS::Time_t tmp = {
45  element.get_header().source_timestamp_sec_ + lifespan.duration.sec,
47  };
48  const SystemTimePoint expiration_time(time_to_time_value(tmp));
50 
51  if (now >= expiration_time) {
52  if (DCPS_debug_level >= 8) {
53  const TimeDuration diff = now - expiration_time;
55  ACE_TEXT("OpenDDS (%P|%t) Data to be sent ")
56  ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
57  diff.value().sec(),
58  diff.value().usec()));
59  }
60 
61  return true; // Data expired.
62  }
63  }
64 
65  return false;
66 }
67 
69  DataWriterImpl* writer,
70  CORBA::Long max_samples_per_instance,
71  CORBA::Long history_depth,
72  CORBA::Long max_durable_per_instance,
73  DDS::Duration_t max_blocking_time,
74  size_t n_chunks,
75  DDS::DomainId_t domain_id,
76  const char* topic_name,
77  const char* type_name,
78 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
79  DataDurabilityCache* durability_cache,
80  const DDS::DurabilityServiceQosPolicy& durability_service,
81 #endif
82  CORBA::Long max_instances,
83  CORBA::Long max_total_samples,
84  ACE_Recursive_Thread_Mutex& deadline_status_lock,
85  DDS::OfferedDeadlineMissedStatus& deadline_status,
86  CORBA::Long& deadline_last_total_count)
87  : cached_cumulative_ack_valid_(false)
88  , transaction_id_(0)
89  , publication_id_(GUID_UNKNOWN)
90  , writer_(writer)
91  , max_samples_per_instance_(max_samples_per_instance)
92  , history_depth_(history_depth)
93  , max_durable_per_instance_(max_durable_per_instance)
94  , max_num_instances_(max_instances)
95  , max_num_samples_(max_total_samples)
96  , max_blocking_time_(max_blocking_time)
97  , waiting_on_release_(false)
98  , condition_(lock_)
99  , empty_condition_(lock_)
100  , wfa_condition_(wfa_lock_)
101  , n_chunks_(n_chunks)
102  , sample_list_element_allocator_(2 * n_chunks_)
103  , shutdown_(false)
104  , domain_id_(domain_id)
105  , topic_name_(topic_name)
106  , type_name_(type_name)
107 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
108  , durability_cache_(durability_cache)
109  , durability_service_(durability_service)
110 #endif
111  , deadline_task_(DCPS::make_rch<DCPS::PmfSporadicTask<WriteDataContainer> >(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &WriteDataContainer::process_deadlines))
112  , deadline_period_(TimeDuration::max_value)
113  , deadline_status_lock_(deadline_status_lock)
114  , deadline_status_(deadline_status)
115  , deadline_last_total_count_(deadline_last_total_count)
116 {
117  if (DCPS_debug_level >= 2) {
119  "(%P|%t) WriteDataContainer "
120  "sample_list_element_allocator %x with %d chunks\n",
122  }
124 }
125 
127 {
128  deadline_task_->cancel();
129 
130  if (this->unsent_data_.size() > 0) {
132  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
133  ACE_TEXT("destroyed with %d samples unsent.\n"),
134  this->unsent_data_.size()));
135  }
136 
137  if (this->sending_data_.size() > 0) {
140  release_buffer(e);
141  }
142  }
145  ACE_TEXT("(%P|%t) WARNING: WriteDataContainer::~WriteDataContainer() - ")
146  ACE_TEXT("destroyed with %d samples sending.\n"),
147  this->sending_data_.size()));
148  }
149  }
150 
151  if (this->sent_data_.size() > 0) {
153  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
154  ACE_TEXT("destroyed with %d samples sent.\n"),
155  this->sent_data_.size()));
156  }
157 
158  if (this->orphaned_to_transport_.size() > 0) {
159  if (DCPS_debug_level > 0) {
161  ACE_TEXT("(%P|%t) WriteDataContainer::~WriteDataContainer() - ")
162  ACE_TEXT("destroyed with %d samples orphaned_to_transport.\n"),
163  this->orphaned_to_transport_.size()));
164  }
165  }
166 
167  if (!shutdown_) {
169  ACE_TEXT("(%P|%t) ERROR: ")
170  ACE_TEXT("WriteDataContainer::~WriteDataContainer, ")
171  ACE_TEXT("The container has not been cleaned.\n")));
172  }
173 }
174 
175 void
177 {
179 
180  DisjointSequence& ds = acked_sequences_[reader];
181  ds.reset();
184  } else {
185  ds.insert(SequenceRange(SequenceNumber(), base));
186  }
188 }
189 
190 void
192 {
194 
195  const SequenceNumber prev_cum_ack = get_cumulative_ack();
196  const AckedSequenceMap::iterator it = acked_sequences_.find(reader);
197  if (it != acked_sequences_.end()) {
198  acked_sequences_.erase(it);
200  if (prev_cum_ack != get_cumulative_ack()) {
202  }
203  }
204 }
205 
208 {
209  if (acked_sequences_.empty()) {
211  }
212 
214  return cached_cumulative_ack_;
215  }
216 
218  for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
219  if (!it->second.empty()) {
220  result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.cumulative_ack() : std::min(result, it->second.cumulative_ack());
221  }
222  }
223  cached_cumulative_ack_ = result;
225  return result;
226 }
227 
230 {
231  if (acked_sequences_.empty()) {
233  }
234 
236  for (AckedSequenceMap::const_iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
237  if (!it->second.empty()) {
238  result = result == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ? it->second.last_ack() : std::max(result, it->second.last_ack());
239  }
240  }
241  return result;
242 }
243 
244 void
246 {
247  bool do_notify = false;
248  if (id == GUID_UNKNOWN) {
249  for (AckedSequenceMap::iterator it = acked_sequences_.begin(); it != acked_sequences_.end(); ++it) {
250  SequenceNumber prev_cum_ack = it->second.cumulative_ack();
251  it->second.insert(seq);
253  if (prev_cum_ack != it->second.cumulative_ack()) {
254  do_notify = true;
255  }
256  }
257  } else {
258  const AckedSequenceMap::iterator it = acked_sequences_.find(id);
259  if (it != acked_sequences_.end()) {
260  SequenceNumber prev_cum_ack = it->second.cumulative_ack();
261  if (prev_cum_ack < seq) {
262  it->second.insert(SequenceRange(prev_cum_ack, seq));
264  if (prev_cum_ack != it->second.cumulative_ack()) {
265  do_notify = true;
266  }
267  }
268  }
269  }
270  if (do_notify) {
272  }
273 }
274 
277 {
278  // Enqueue to the next_send_sample_ thread of unsent_data_
279  // will link samples with the next_sample/previous_sample and
280  // also next_send_sample_.
281  // This would save time when we actually send the data.
282 
283  if (shutdown_) {
284  return DDS::RETCODE_ERROR;
285  }
286 
287  unsent_data_.enqueue_tail(control_sample);
288 
289  return DDS::RETCODE_OK;
290 }
291 
292 // This method assumes that instance list has space for this sample.
295  DataSampleElement* sample,
296  DDS::InstanceHandle_t instance_handle)
297 {
298  if (shutdown_) {
299  return DDS::RETCODE_ERROR;
300  }
301 
302  // Get the PublicationInstance pointer from InstanceHandle_t.
303  PublicationInstance_rch instance =
304  get_handle_instance(instance_handle);
305  // Extract the instance queue.
306  InstanceDataSampleList& instance_list = instance->samples_;
307 
308  extend_deadline(instance);
309 
310  //
311  // Enqueue to the next_send_sample_ thread of unsent_data_
312  // will link samples with the next_sample/previous_sample and
313  // also next_send_sample_.
314  // This would save time when we actually send the data.
315 
316  unsent_data_.enqueue_tail(sample);
317 
318  //
319  // Add this sample to the INSTANCE scope list.
320  instance_list.enqueue_tail(sample);
321 
322  return DDS::RETCODE_OK;
323 }
324 
327  const DDS::LifespanQosPolicy& lifespan
328 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
329  ,
330  const OPENDDS_STRING& filterClassName,
331  const FilterEvaluator* eval,
332  const DDS::StringSeq& expression_params
333 #endif
334  )
335 {
337  guard,
338  lock_,
340 
341  ssize_t total_size = 0;
342  for (PublicationInstanceMapType::iterator it = instances_.begin();
343  it != instances_.end(); ++it) {
344  const ssize_t durable = std::min(it->second->samples_.size(),
346  total_size += durable;
347  it->second->durable_samples_remaining_ = durable;
348  }
349 
352  reader_id,
353  lifespan,
354 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
355  filterClassName, eval, expression_params,
356 #endif
357  total_size);
358 
360  sent_data_,
361  reader_id,
362  lifespan,
363 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
364  filterClassName, eval, expression_params,
365 #endif
366  total_size);
367 
368  {
371  DisjointSequence& ds = acked_sequences_[reader_id];
373 
374  // Remove exactly what will be sent
376  while (iter != resend_data_.end()) {
377  ds.erase(iter->get_header().sequence_);
378  ++iter;
379  }
380  }
381 
382  if (DCPS_debug_level > 9 && resend_data_.size()) {
384  ACE_TEXT("(%P|%t) WriteDataContainer::reenqueue_all: ")
385  ACE_TEXT("domain %d topic %C publication %C copying ")
386  ACE_TEXT("sending/sent to resend to %C.\n"),
387  domain_id_,
388  topic_name_,
389  LogGuid(publication_id_).c_str(),
390  LogGuid(reader_id).c_str()));
391  }
392 
393  return DDS::RETCODE_OK;
394 }
395 
398  DDS::InstanceHandle_t& instance_handle,
399  Message_Block_Ptr& registered_sample)
400 {
401  PublicationInstance_rch instance;
402 
403  if (instance_handle == DDS::HANDLE_NIL) {
404  if (max_num_instances_ > 0
405  && max_num_instances_ <= (CORBA::Long) instances_.size()) {
407  }
408 
409  // registered the instance for the first time.
410  instance.reset(new PublicationInstance(move(registered_sample)), keep_count());
411 
412  instance_handle = this->writer_->get_next_handle();
413 
414  int const insert_attempt = OpenDDS::DCPS::bind(instances_, instance_handle, instance);
415 
416  if (0 != insert_attempt) {
418  ACE_TEXT("(%P|%t) ERROR: ")
419  ACE_TEXT("WriteDataContainer::register_instance, ")
420  ACE_TEXT("failed to insert instance handle=%X\n"),
421  instance.in()));
422  return DDS::RETCODE_ERROR;
423  } // if (0 != insert_attempt)
424 
425  instance->instance_handle_ = instance_handle;
426 
427  extend_deadline(instance);
428 
429  } else {
430 
431  int const find_attempt = find(instances_, instance_handle, instance);
432 
433  if (0 != find_attempt) {
435  ACE_TEXT("(%P|%t) ERROR: ")
436  ACE_TEXT("WriteDataContainer::register_instance, ")
437  ACE_TEXT("The provided instance handle=%X is not a valid")
438  ACE_TEXT("handle.\n"),
439  instance_handle));
440 
441  return DDS::RETCODE_ERROR;
442  } // if (0 != find_attempt)
443  }
444 
445  // The registered_sample is shallow copied.
446  registered_sample.reset(instance->registered_sample_->duplicate());
447 
448  return DDS::RETCODE_OK;
449 }
450 
453  DDS::InstanceHandle_t instance_handle,
454  Message_Block_Ptr& registered_sample,
455  bool dup_registered_sample)
456 {
458  guard,
459  lock_,
461 
462  PublicationInstance_rch instance;
463  {
464  PublicationInstanceMapType::iterator pos = instances_.find(instance_handle);
465  if (pos == instances_.end()) {
467  ACE_TEXT("(%P|%t) ERROR: ")
468  ACE_TEXT("WriteDataContainer::unregister, ")
469  ACE_TEXT("The instance(handle=%X) ")
470  ACE_TEXT("is not registered yet.\n"),
471  instance_handle),
473  }
474  instance = pos->second;
475  instances_.erase(pos);
476  }
477 
478  return remove_instance(instance, registered_sample, dup_registered_sample);
479 }
480 
483  Message_Block_Ptr& registered_sample,
484  bool dup_registered_sample)
485 {
487  guard,
488  lock_,
490 
491  PublicationInstance_rch instance;
492 
493  int const find_attempt = find(instances_, instance_handle, instance);
494 
495  if (0 != find_attempt) {
497  ACE_TEXT("(%P|%t) ERROR: ")
498  ACE_TEXT("WriteDataContainer::dispose, ")
499  ACE_TEXT("The instance(handle=%X) ")
500  ACE_TEXT("is not registered yet.\n"),
501  instance_handle),
503  }
504 
505  return remove_instance(instance, registered_sample, dup_registered_sample);
506 }
507 
510  Message_Block_Ptr& registered_sample,
511  bool dup_registered_sample)
512 {
513  if (dup_registered_sample) {
514  // The registered_sample is shallow copied.
515  registered_sample.reset(instance->registered_sample_->duplicate());
516  }
517 
518  // Note: The DDS specification is unclear as to if samples in the process
519  // of being sent should be removed or not.
520  // The advantage of calling remove_sample() on them is that the
521  // cached allocator memory for them is freed. The disadvantage
522  // is that the slow reader may see multiple disposes without
523  // any write sample between them and hence not temporarily move into the
524  // Alive state.
525  // We have chosen to NOT remove the sending samples.
526  InstanceDataSampleList& instance_list = instance->samples_;
527 
528  while (instance_list.size() > 0) {
529  bool released = false;
530  const DDS::ReturnCode_t ret = remove_oldest_sample(instance_list, released);
531  if (ret != DDS::RETCODE_OK) {
532  return ret;
533  }
534  }
535 
536  cancel_deadline(instance);
537 
538  return DDS::RETCODE_OK;
539 }
540 
543  size_t& size)
544 {
546  guard,
547  lock_,
549  PublicationInstance_rch instance;
550 
551  int const find_attempt = find(instances_, handle, instance);
552 
553  if (0 != find_attempt) {
554  return DDS::RETCODE_ERROR;
555 
556  } else {
557  size = instance->samples_.size();
558  return DDS::RETCODE_OK;
559  }
560 }
561 
562 size_t
564 {
565  size_t size = 0;
566 
568  guard,
569  lock_,
570  0);
571 
572  for (PublicationInstanceMapType::iterator iter = instances_.begin();
573  iter != instances_.end();
574  ++iter)
575  {
576  size += iter->second->samples_.size();
577  }
578 
579  return size;
580 }
581 
584 {
585  DBG_ENTRY_LVL("WriteDataContainer","get_unsent_data",6);
586  //
587  // The samples in unsent_data are added to the local datawriter
588  // list and enqueued to the sending_data_ signifying they have
589  // been passed to the transport to send in a transaction
590  //
591  list = this->unsent_data_;
592 
593  // Increment send counter for this send operation
594  ++transaction_id_;
595 
596  // Mark all samples with current send counter
598  while (iter != list.end()) {
599  iter->set_transaction_id(this->transaction_id_);
600  ++iter;
601  }
602 
603  //
604  // The unsent_data_ already linked with the
605  // next_send_sample during enqueue.
606  // Append the unsent_data_ to current sending_data_
607  // list.
609 
610  //
611  // Clear the unsent data list.
612  //
613  this->unsent_data_.reset();
614 
615  //
616  // Return the moved list.
617  //
618  return transaction_id_;
619 }
620 
623 {
624  DBG_ENTRY_LVL("WriteDataContainer","get_resend_data",6);
625 
626  //
627  // The samples in unsent_data are added to the sending_data
628  // during enqueue.
629  //
631 
632  //
633  // Clear the unsent data list.
634  //
635  this->resend_data_.reset();
636  //
637  // Return the moved list.
638  //
639  return list;
640 }
641 
642 bool
644 {
645  return this->sending_data_.size() != 0
646  || this->orphaned_to_transport_.size() != 0
647  || this->unsent_data_.size() != 0;
648 }
649 
650 void
652 {
653  DBG_ENTRY_LVL("WriteDataContainer","data_delivered",6);
654 
655  if (DCPS_debug_level >= 2) {
656  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered")
657  ACE_TEXT(" %@\n"), sample));
658  }
659 
661  guard,
662  lock_);
663 
664  // Delivered samples _must_ be on sending_data_ list
665 
666  // If it is not found in one of the lists, an invariant
667  // exception is declared.
668 
669  // The element now needs to be removed from the sending_data_
670  // list, and appended to the end of the sent_data_ list here
671 
672  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
673 
674  // If sample is on a SendStateDataSampleList it should be on the
675  // sending_data_ list signifying it was given to the transport to
676  // deliver and now the transport is signaling it has been delivered
677  if (!sending_data_.dequeue(sample)) {
678  //
679  // Should be on sending_data_. If it is in sent_data_
680  // or unsent_data there was a problem.
681  //
682  SendStateDataSampleList* send_lists[] = {
683  &sent_data_,
684  &unsent_data_,
686  const SendStateDataSampleList* containing_list =
688 
689  if (containing_list == &sent_data_) {
691  ACE_TEXT("(%P|%t) WARNING: ")
692  ACE_TEXT("WriteDataContainer::data_delivered, ")
693  ACE_TEXT("The delivered sample is not in sending_data_ and ")
694  ACE_TEXT("WAS IN sent_data_.\n")));
695  } else if (containing_list == &unsent_data_) {
697  ACE_TEXT("(%P|%t) WARNING: ")
698  ACE_TEXT("WriteDataContainer::data_delivered, ")
699  ACE_TEXT("The delivered sample is not in sending_data_ and ")
700  ACE_TEXT("WAS IN unsent_data_ list.\n")));
701  } else {
702 
703  //No-op: elements may be removed from all WriteDataContainer lists during shutdown
704  //and inform transport of their release. Transport will call data-delivered on the
705  //elements as it processes the removal but they will already be gone from the send lists.
706  if (stale->get_header().message_id_ != SAMPLE_DATA) {
707  //this message was a control message so release it
708  if (DCPS_debug_level > 9) {
710  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
711  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
712  domain_id_,
713  topic_name_,
714  LogGuid(publication_id_).c_str()));
715  }
717  }
718 
719  if (containing_list == &orphaned_to_transport_) {
720  orphaned_to_transport_.dequeue(sample);
721  release_buffer(stale);
722 
723  } else if (!containing_list) {
724  // samples that were retrieved from get_resend_data()
726  const CORBA::ULong num_subs = stale->get_num_subs();
727  for (CORBA::ULong i = 0; i < num_subs; ++i) {
728  update_acked(stale->get_header().sequence_, stale->get_sub_id(i));
729  }
730  wfa_guard.release();
732  release_buffer(stale);
733  }
734 
735  if (!pending_data()) {
737  }
738  }
739 
740  return;
741  }
742  ACE_GUARD(ACE_SYNCH_MUTEX, wfa_guard, wfa_lock_);
743  SequenceNumber acked_seq = stale->get_header().sequence_;
744  SequenceNumber prev_max = get_cumulative_ack();
745 
746  if (stale->get_header().message_id_ != SAMPLE_DATA) {
747  //this message was a control message so release it
748  if (DCPS_debug_level > 9) {
750  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
751  ACE_TEXT("domain %d topic %C publication %C control message delivered.\n"),
752  domain_id_,
753  topic_name_,
754  LogGuid(publication_id_).c_str()));
755  }
756  release_buffer(stale);
757  stale = 0;
759  } else {
760 
762  const_cast<DataSampleElement*>(sample)->get_header().historic_sample_ = true;
764  sent_data_.enqueue_tail(sample);
765 
766  } else {
768  PublicationInstance_rch inst = sample->get_handle();
769  inst->samples_.dequeue(sample);
770  }
771  release_buffer(stale);
772  stale = 0;
773  }
774 
775  if (DCPS_debug_level > 9) {
777  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
778  ACE_TEXT("domain %d topic %C publication %C seq# %q %s.\n"),
779  domain_id_,
780  topic_name_,
781  LogGuid(publication_id_).c_str(),
782  acked_seq.getValue(),
784  ? ACE_TEXT("stored for durability")
785  : ACE_TEXT("released")));
786  }
787 
789  }
790  if (DCPS_debug_level > 9) {
791  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered: ")
792  ACE_TEXT("Inserting acked_sequence: %q\n"),
793  acked_seq.getValue()));
794  }
795 
796  update_acked(acked_seq);
797 
798  if (prev_max == SequenceNumber::SEQUENCENUMBER_UNKNOWN() ||
799  prev_max < get_cumulative_ack()) {
800 
801  if (DCPS_debug_level > 9) {
803  ACE_TEXT("(%P|%t) WriteDataContainer::data_delivered - ")
804  ACE_TEXT("broadcasting wait_for_acknowledgments update.\n")));
805  }
806 
808  }
809 
810  // Signal if there is no pending data.
811  if (!pending_data()) {
813  }
814 }
815 
816 void
818  bool dropped_by_transport)
819 {
820  DBG_ENTRY_LVL("WriteDataContainer","data_dropped",6);
821 
822  if (DCPS_debug_level >= 2) {
823  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped")
824  ACE_TEXT(" sample %X dropped_by_transport %d\n"),
825  sample, dropped_by_transport));
826  }
827 
828  // If the transport initiates the data dropping, we need do same thing
829  // as data_delivered. e.g. remove the sample from the internal list
830  // and the instance list. We do not need acquire the lock here since
831  // the data_delivered acquires the lock.
832  if (dropped_by_transport) {
833  data_delivered(sample);
834  return;
835  }
836 
837  //The data_dropped could be called from the thread initiating sample remove
838  //which already hold the lock. In this case, it's not necessary to acquire
839  //lock here. It also could be called from the transport thread in a delayed
840  //notification, it's necessary to acquire lock here to protect the internal
841  //structures in this class.
842 
844  guard,
845  lock_);
846 
847  // The dropped sample should be in the sending_data_ list.
848  // Otherwise an exception will be raised.
849  //
850  // We are now been notified by transport, so we can
851  // keep the sample from the sending_data_ list still in
852  // sample list since we will send it.
853 
854  DataSampleElement* stale = const_cast<DataSampleElement*>(sample);
855 
856  // If sample is on a SendStateDataSampleList it should be on the
857  // sending_data_ list signifying it was given to the transport to
858  // deliver and now the transport is signaling it has been dropped
859 
860  if (sending_data_.dequeue(sample)) {
861  // else: The data_dropped is called as a result of remove_sample()
862  // called from reenqueue_all() which supports the TRANSIENT_LOCAL
863  // qos. The samples that are sending by transport are dropped from
864  // transport and will be moved to the unsent list for resend.
866  unsent_data_.enqueue_tail(sample);
867  } else {
869  release_buffer(stale);
870  stale = 0;
871  }
872 
873  } else {
874  //
875  // If it is in sent_data_ or unsent_data there was a problem.
876  //
877  SendStateDataSampleList* send_lists[] = {
878  &sent_data_,
879  &unsent_data_,
881  const SendStateDataSampleList* containing_list =
883 
884  if (containing_list == &sent_data_) {
886  ACE_TEXT("(%P|%t) WARNING: ")
887  ACE_TEXT("WriteDataContainer::data_dropped, ")
888  ACE_TEXT("The dropped sample is not in sending_data_ and ")
889  ACE_TEXT("WAS IN sent_data_.\n")));
890  } else if (containing_list == &unsent_data_) {
892  ACE_TEXT("(%P|%t) WARNING: ")
893  ACE_TEXT("WriteDataContainer::data_dropped, ")
894  ACE_TEXT("The dropped sample is not in sending_data_ and ")
895  ACE_TEXT("WAS IN unsent_data_ list.\n")));
896  } else {
897 
898  //No-op: elements may be removed from all WriteDataContainer lists during shutdown
899  //and inform transport of their release. Transport will call data-dropped on the
900  //elements as it processes the removal but they will already be gone from the send lists.
901  if (stale->get_header().message_id_ != SAMPLE_DATA) {
902  //this message was a control message so release it
903  if (DCPS_debug_level > 9) {
905  ACE_TEXT("(%P|%t) WriteDataContainer::data_dropped: ")
906  ACE_TEXT("domain %d topic %C publication %C control message dropped.\n"),
907  domain_id_,
908  topic_name_,
909  LogGuid(publication_id_).c_str()));
910  }
912  }
913 
914  if (containing_list == &orphaned_to_transport_) {
915  orphaned_to_transport_.dequeue(sample);
916  release_buffer(stale);
917  stale = 0;
918  if (!pending_data()) {
920  }
921 
922  } else if (!containing_list) {
923  // samples that were retrieved from get_resend_data()
925  release_buffer(stale);
926  stale = 0;
927  }
928  }
929 
930  return;
931  }
932 
934 
935  if (!pending_data()) {
937  }
938 }
939 
940 void
942 {
944  return;
945 
946  size_t n_released = 0;
947 
948  for (PublicationInstanceMapType::iterator iter = instances_.begin();
949  iter != instances_.end();
950  ++iter) {
951 
952  CORBA::Long durable_allowed = max_durable_per_instance_;
953  InstanceDataSampleList& instance_list = iter->second->samples_;
954 
955  for (DataSampleElement* it = instance_list.tail(), *prev; it; it = prev) {
956  prev = InstanceDataSampleList::prev(it);
957 
958  if (DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, it->get_sample())) {
959 
960  if (durable_allowed) {
961  --durable_allowed;
962  } else {
963  instance_list.dequeue(it);
964  sent_data_.dequeue(it);
965  release_buffer(it);
966  ++n_released;
967  }
968  }
969  }
970  }
971 
972  if (n_released && DCPS_debug_level > 9) {
974  ACE_TEXT("(%P|%t) WriteDataContainer::remove_excess_durable: ")
975  ACE_TEXT("domain %d topic %C publication %C %B samples removed ")
976  ACE_TEXT("from durable data.\n"), domain_id_, topic_name_,
977  LogGuid(publication_id_).c_str(), n_released));
978  }
979 }
980 
981 
984  InstanceDataSampleList& instance_list,
985  bool& released)
986 {
987  DataSampleElement* stale = 0;
988 
989  //
990  // Remove the oldest sample from the instance list.
991  //
992  if (!instance_list.dequeue_head(stale)) {
994  ACE_TEXT("(%P|%t) ERROR: ")
995  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
996  ACE_TEXT("dequeue_head_next_sample failed\n")),
998  }
999 
1000  //
1001  // Remove the stale data from the next_writer_sample_ list. The
1002  // sending_data_/next_send_sample_ list is not managed within the
1003  // container, it is only used external to the container and does
1004  // not need to be managed internally.
1005  //
1006  // The next_writer_sample_ link is being used in one of the sent_data_,
1007  // sending_data_, or unsent_data lists. Removal from the doubly
1008  // linked list needs to repair the list only when the stale sample
1009  // is either the head or tail of the list.
1010  //
1011 
1012  //
1013  // Locate the head of the list that the stale data is in.
1014  //
1015  SendStateDataSampleList* send_lists[] = {
1016  &sending_data_,
1017  &sent_data_,
1018  &unsent_data_,
1020  const SendStateDataSampleList* containing_list =
1022 
1023  //
1024  // Identify the list that the stale data is in.
1025  // The stale data should be in one of the sent_data_, sending_data_
1026  // or unsent_data_. It should not be in released_data_ list since
1027  // this function is the only place a sample is moved from
1028  // sending_data_ to released_data_ list.
1029 
1030  // Remove the element from the internal list.
1031  bool result = false;
1032 
1033  if (containing_list == &this->sending_data_) {
1034  if (DCPS_debug_level > 2) {
1036  ACE_TEXT("(%P|%t) WARNING: ")
1037  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1038  ACE_TEXT("removing from sending_data_ so must notify transport to remove sample\n")));
1039  }
1040 
1041  // This means transport is still using the sample that needs to
1042  // be released currently so notify transport that sample is being removed.
1043 
1044  if (this->writer_->remove_sample(stale)) {
1045  if (this->sent_data_.dequeue(stale)) {
1046  release_buffer(stale);
1047  }
1048  result = true;
1049 
1050  } else {
1051  if (this->sending_data_.dequeue(stale)) {
1052  this->orphaned_to_transport_.enqueue_tail(stale);
1053  } else if (this->sent_data_.dequeue(stale)) {
1054  release_buffer(stale);
1055  result = true;
1056  }
1057  result = true;
1058  }
1059  released = true;
1060 
1061  } else if (containing_list == &this->sent_data_) {
1062  // No one is using the data sample, so we can release it back to
1063  // its allocator.
1064  //
1065  result = this->sent_data_.dequeue(stale) != 0;
1066  release_buffer(stale);
1067  released = true;
1068 
1069  if (DCPS_debug_level > 9) {
1071  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1072  ACE_TEXT("domain %d topic %C publication %C sample removed from HISTORY.\n"),
1073  this->domain_id_,
1074  this->topic_name_,
1075  LogGuid(publication_id_).c_str()));
1076  }
1077 
1078  } else if (containing_list == &this->unsent_data_) {
1079  //
1080  // No one is using the data sample, so we can release it back to
1081  // its allocator.
1082  //
1083  result = this->unsent_data_.dequeue(stale) != 0;
1084  release_buffer(stale);
1085  released = true;
1086 
1087  if (DCPS_debug_level > 9) {
1089  ACE_TEXT("(%P|%t) WriteDataContainer::remove_oldest_sample: ")
1090  ACE_TEXT("domain %d topic %C publication %C sample removed from unsent.\n"),
1091  this->domain_id_,
1092  this->topic_name_,
1093  LogGuid(publication_id_).c_str()));
1094  }
1095  } else {
1097  ACE_TEXT("(%P|%t) ERROR: ")
1098  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1099  ACE_TEXT("The oldest sample is not in any internal list.\n")),
1101  }
1102 
1103  if (!pending_data()) {
1105  }
1106 
1107  if (!result) {
1109  ACE_TEXT("(%P|%t) ERROR: ")
1110  ACE_TEXT("WriteDataContainer::remove_oldest_sample, ")
1111  ACE_TEXT("dequeue_next_send_sample from internal list failed.\n")),
1113 
1114  }
1115 
1116  return DDS::RETCODE_OK;
1117 }
1118 
1121 {
1122  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer_for_control", 6);
1123 
1125  element,
1126  static_cast<DataSampleElement*>(
1128  sizeof(DataSampleElement))),
1130  this->writer_,
1133 
1134  return DDS::RETCODE_OK;
1135 }
1136 
1139  DDS::InstanceHandle_t handle)
1140 {
1141  DBG_ENTRY_LVL("WriteDataContainer","obtain_buffer", 6);
1142 
1144 
1145  PublicationInstance_rch instance = get_handle_instance(handle);
1146 
1147  if (!instance) {
1149  }
1150 
1152  element,
1153  static_cast<DataSampleElement*>(
1155  sizeof(DataSampleElement))),
1157  this->writer_,
1158  instance),
1160 
1161  // Extract the current instance queue.
1162  InstanceDataSampleList& instance_list = instance->samples_;
1164 
1165  bool set_timeout = true;
1166  MonotonicTimePoint timeout;
1167 
1168  //max_num_samples_ covers ResourceLimitsQosPolicy max_samples and
1169  //max_instances and max_instances * depth
1170  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1171  while ((instance_list.size() >= max_samples_per_instance_) ||
1172  ((this->max_num_samples_ > 0) &&
1173  ((CORBA::Long) this->num_all_samples () >= this->max_num_samples_))) {
1174 
1176  if (instance_list.size() >= history_depth_) {
1177  if (DCPS_debug_level >= 2) {
1178  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1179  ACE_TEXT(" instance %d attempting to remove")
1180  ACE_TEXT(" its oldest sample (reliable)\n"),
1181  handle));
1182  }
1183  bool oldest_released = false;
1184  ret = remove_oldest_sample(instance_list, oldest_released);
1185  if (oldest_released) {
1186  break;
1187  }
1188  }
1189  // Reliable writers can wait
1190  if (set_timeout) {
1192  set_timeout = false;
1193  }
1194  if (!shutdown_ && MonotonicTimePoint::now() < timeout) {
1195  if (DCPS_debug_level >= 2) {
1196  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1197  ACE_TEXT(" instance %d waiting for samples to be released by transport\n"),
1198  handle));
1199  }
1200 
1201  waiting_on_release_ = true;
1202  switch (condition_.wait_until(timeout, thread_status_manager)) {
1203  case CvStatus_NoTimeout:
1205  break;
1206 
1207  case CvStatus_Timeout:
1208  if (DCPS_debug_level >= 2) {
1209  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1210  ACE_TEXT(" instance %d timed out waiting for samples to be released by transport\n"),
1211  handle));
1212  }
1213  ret = DDS::RETCODE_TIMEOUT;
1214  break;
1215 
1216  case CvStatus_Error:
1217  if (DCPS_debug_level) {
1218  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::obtain_buffer: "
1219  "error in wait_until\n"));
1220  }
1221  ret = DDS::RETCODE_ERROR;
1222  break;
1223  }
1224 
1225  } else {
1226  //either shutdown has been signaled or max_blocking_time
1227  //has surpassed so treat as timeout
1228  ret = DDS::RETCODE_TIMEOUT;
1229  }
1230 
1231  } else {
1232  //BEST EFFORT
1233  bool oldest_released = false;
1234 
1235  //try to remove stale samples from this instance
1236  // The remove_oldest_sample() method removes the oldest sample
1237  // from instance list and removes it from the internal lists.
1238  if (instance_list.size() > 0) {
1239  if (DCPS_debug_level >= 2) {
1240  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1241  ACE_TEXT(" instance %d attempting to remove")
1242  ACE_TEXT(" its oldest sample\n"),
1243  handle));
1244  }
1245  ret = remove_oldest_sample(instance_list, oldest_released);
1246  }
1247  //else try to remove stale samples from other instances which are full
1248  if (ret == DDS::RETCODE_OK && !oldest_released) {
1249  if (DCPS_debug_level >= 2) {
1250  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1251  ACE_TEXT(" instance %d attempting to remove")
1252  ACE_TEXT(" oldest sample from any full instances\n"),
1253  handle));
1254  }
1255  PublicationInstanceMapType::iterator it = instances_.begin();
1256 
1257  while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
1258  if (it->second->samples_.size() >= max_samples_per_instance_) {
1259  ret = remove_oldest_sample(it->second->samples_, oldest_released);
1260  }
1261  ++it;
1262  }
1263  }
1264  //else try to remove stale samples from other non-full instances
1265  if (ret == DDS::RETCODE_OK && !oldest_released) {
1266  if (DCPS_debug_level >= 2) {
1267  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1268  ACE_TEXT(" instance %d attempting to remove")
1269  ACE_TEXT(" oldest sample from any instance with samples currently\n"),
1270  handle));
1271  }
1272  PublicationInstanceMapType::iterator it = instances_.begin();
1273 
1274  while (!oldest_released && it != instances_.end() && ret == DDS::RETCODE_OK) {
1275  if (it->second->samples_.size() > 0) {
1276  ret = remove_oldest_sample(it->second->samples_, oldest_released);
1277  }
1278  ++it;
1279  }
1280  }
1281  if (!oldest_released) {
1282  //This means that no instances have samples to remove and yet
1283  //still hitting resource limits.
1285  ACE_TEXT("(%P|%t) ERROR: ")
1286  ACE_TEXT("WriteDataContainer::obtain_buffer, ")
1287  ACE_TEXT("hitting resource limits with no samples to remove\n")));
1288  ret = DDS::RETCODE_ERROR;
1289  }
1290  } //END BEST EFFORT
1291 
1292  if (ret != DDS::RETCODE_OK) {
1293  if (DCPS_debug_level >= 2) {
1294  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::obtain_buffer")
1295  ACE_TEXT(" instance %d could not obtain buffer for sample")
1296  ACE_TEXT(" releasing allotted sample and returning\n"),
1297  handle));
1298  }
1299  this->release_buffer(element);
1300  return ret;
1301  }
1302  } //END WHILE
1303 
1304  data_holder_.enqueue_tail(element);
1305 
1306  return ret;
1307 }
1308 
1309 void
1311 {
1312  if (element->get_header().message_id_ == SAMPLE_DATA)
1313  data_holder_.dequeue(element);
1314  // Release the memory to the allocator.
1315  ACE_DES_FREE(element,
1318 }
1319 
1320 void
1322 {
1323  DBG_ENTRY_LVL("WriteDataContainer","unregister_all",6);
1324  shutdown_ = true;
1325 
1326  //The internal list needs protection since this call may result from the
1327  //the delete_datawriter call which does not acquire the lock in advance.
1329  guard,
1330  lock_);
1331  // Tell transport remove all control messages currently
1332  // transport is processing.
1333  (void) this->writer_->remove_all_msgs();
1334 
1335  // Broadcast to wake up all waiting threads.
1336  if (waiting_on_release_) {
1338  }
1339 
1340  Message_Block_Ptr registered_sample;
1341 
1342  for (PublicationInstanceMapType::iterator pos = instances_.begin(), limit = instances_.end(); pos != limit;) {
1343  // Release the instance data.
1344  if (remove_instance(pos->second, registered_sample, false) != DDS::RETCODE_OK) {
1345  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::unregister_all, "
1346  "remove_instance %X failed\n", pos->first));
1347  }
1348 
1349  writer_->return_handle(pos->first);
1350  instances_.erase(pos++);
1351  }
1352 }
1353 
1356 {
1357  PublicationInstance_rch instance;
1358 
1359  if (0 != find(instances_, handle, instance)) {
1361  ACE_TEXT("(%P|%t) ")
1362  ACE_TEXT("WriteDataContainer::get_handle_instance, ")
1363  ACE_TEXT("lookup for %d failed\n"), handle));
1364  }
1365 
1366  return instance;
1367 }
1368 
1369 void
1371  const SendStateDataSampleList& appended,
1372  const GUID_t& reader_id,
1373  const DDS::LifespanQosPolicy& lifespan,
1374 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1375  const OPENDDS_STRING& filterClassName,
1376  const FilterEvaluator* eval,
1377  const DDS::StringSeq& params,
1378 #endif
1379  ssize_t& max_resend_samples)
1380 {
1382  cur != appended.rend() && max_resend_samples; ++cur) {
1383 
1384  if (resend_data_expired(*cur, lifespan))
1385  continue;
1386 
1387 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1388  if (eval && writer_->filter_out(*cur, filterClassName, *eval, params))
1389  continue;
1390 #endif
1391 
1392  PublicationInstance_rch inst = cur->get_handle();
1393 
1394  if (!inst) {
1395  // *cur is a control message, just skip it
1396  continue;
1397  }
1398 
1399  if (inst->durable_samples_remaining_ == 0)
1400  continue;
1402 
1403  DataSampleElement* element = 0;
1404  ACE_NEW_MALLOC(element,
1405  static_cast<DataSampleElement*>(
1407  sizeof(DataSampleElement))),
1408  DataSampleElement(*cur));
1409 
1410  element->set_num_subs(1);
1411  element->set_sub_id(0, reader_id);
1412 
1413  if (DCPS_debug_level > 9) {
1414  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::copy_and_prepend added seq# %q\n",
1415  cur->get_header().sequence_.getValue()));
1416  }
1417 
1418  list.enqueue_head(element);
1419  --max_resend_samples;
1420  }
1421 }
1422 
1423 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1424 bool
1426 {
1427  bool result = true;
1428 
1429  // ------------------------------------------------------------
1430  // Transfer sent data to data DURABILITY cache.
1431  // ------------------------------------------------------------
1432  if (this->durability_cache_) {
1433  // A data durability cache is available for TRANSIENT or
1434  // PERSISTENT data durability. Cache the data samples.
1435 
1436  //
1437  // We only cache data that is not still in use outside of
1438  // this instance of WriteDataContainer
1439  // (only cache samples in sent_data_ meaning transport has delivered).
1440  bool const inserted =
1441  this->durability_cache_->insert(this->domain_id_,
1442  this->topic_name_,
1443  this->type_name_,
1444  this->sent_data_,
1445  this->durability_service_
1446  );
1447 
1448  result = inserted;
1449 
1450  if (!inserted)
1452  ACE_TEXT("(%P|%t) ERROR: ")
1453  ACE_TEXT("WriteDataContainer::persist_data, ")
1454  ACE_TEXT("failed to make data durable for ")
1455  ACE_TEXT("(domain, topic, type) = (%d, %C, %C)\n"),
1456  this->domain_id_,
1457  this->topic_name_,
1458  this->type_name_));
1459  }
1460 
1461  return result;
1462 }
1463 #endif
1464 
1466 {
1467  const bool no_deadline = deadline.is_zero();
1469  const bool report = DCPS_debug_level > 0 && pending_data();
1470  if (report) {
1471  if (no_deadline) {
1472  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending no timeout\n")));
1473  } else {
1474  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending ")
1475  ACE_TEXT("timeout at %#T\n"),
1476  &deadline.value()));
1477  }
1478  }
1479 
1480  bool loop = true;
1481  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1482  while (loop && pending_data()) {
1483  switch (empty_condition_.wait_until(deadline, thread_status_manager)) {
1484  case CvStatus_NoTimeout:
1485  break;
1486 
1487  case CvStatus_Timeout:
1488  if (pending_data()) {
1489  if (DCPS_debug_level >= 2) {
1490  ACE_DEBUG((LM_INFO, "(%P|%t) WriteDataContainer::wait_pending: "
1491  "Timed out waiting for messages to be transported\n"));
1492  log_send_state_lists("WriteDataContainer::wait_pending - wait timedout: ");
1493  }
1494  }
1495  loop = false;
1496  break;
1497 
1498  case CvStatus_Error:
1499  if (DCPS_debug_level) {
1500  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_pending: "
1501  "error in wait_until\n"));
1502  }
1503  loop = false;
1504  break;
1505  }
1506  }
1507  if (report) {
1508  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_pending done\n")));
1509  }
1510 }
1511 
1512 void
1513 WriteDataContainer::get_instance_handles(InstanceHandleVec& instance_handles)
1514 {
1516  guard,
1517  lock_);
1518  PublicationInstanceMapType::iterator it = instances_.begin();
1519 
1520  while (it != instances_.end()) {
1521  instance_handles.push_back(it->second->instance_handle_);
1522  ++it;
1523  }
1524 }
1525 
1528  bool deadline_is_infinite,
1529  const SequenceNumber& sequence)
1530 {
1532  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
1533  while ((deadline_is_infinite || MonotonicTimePoint::now() < deadline) && !sequence_acknowledged_i(sequence)) {
1534  switch (deadline_is_infinite ? wfa_condition_.wait(thread_status_manager) : wfa_condition_.wait_until(deadline, thread_status_manager)) {
1535  case CvStatus_NoTimeout:
1536  break;
1537  case CvStatus_Timeout:
1538  if (DCPS_debug_level >= 2) {
1539  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::wait_ack_of_seq")
1540  ACE_TEXT(" timed out waiting for sequence %q to be acked\n"),
1541  sequence.getValue()));
1542  }
1543  return DDS::RETCODE_TIMEOUT;
1544  case CvStatus_Error:
1545  if (DCPS_debug_level) {
1546  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WriteDataContainer::wait_ack_of_seq: "
1547  "error in wait/wait_until\n"));
1548  }
1549  return DDS::RETCODE_ERROR;
1550  }
1551  }
1552 
1554 }
1555 
1556 bool
1558 {
1560  return sequence_acknowledged_i(sequence);
1561 }
1562 
1563 bool
1565 {
1566  if (sequence == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
1567  //return true here so that wait_for_acknowledgments doesn't block
1568  return true;
1569  }
1570 
1572  if (DCPS_debug_level >= 10) {
1573  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) WriteDataContainer::sequence_acknowledged_i ")
1574  ACE_TEXT("- %C cumulative ack is currently: %q\n"), DCPS::LogGuid(publication_id_).c_str(), acked.getValue()));
1575  }
1576  if (acked == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || acked < sequence){
1577  return false;
1578  }
1579  return true;
1580 }
1581 
1582 void
1584 {
1585  if (!stale && waiting_on_release_) {
1586  waiting_on_release_ = false;
1587 
1589  }
1590 }
1591 
1592 void
1594 {
1595  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriteDataContainer::log_send_state_lists: %C -- unsent(%d), sending(%d), sent(%d), orphaned_to_transport(%d), num_all_samples(%d), num_instances(%d)\n",
1596  description.c_str(),
1597  unsent_data_.size(),
1598  sending_data_.size(),
1599  sent_data_.size(),
1601  num_all_samples(),
1602  instances_.size()));
1603 }
1604 
1605 void
1607 {
1608  // Call comes from DataWriterImpl_t which should arleady have the lock_.
1609 
1610  // Deadline for all instances starting from now.
1611  const MonotonicTimePoint deadline = MonotonicTimePoint::now() + deadline_period;
1612 
1613  // Reset the deadline timer if the period has changed.
1614  if (deadline_period_ != deadline_period) {
1616  OPENDDS_ASSERT(deadline_map_.empty());
1617 
1618  for (PublicationInstanceMapType::iterator iter = instances_.begin();
1619  iter != instances_.end();
1620  ++iter) {
1621  iter->second->deadline_ = deadline;
1622  deadline_map_.insert(std::make_pair(deadline, iter->second));
1623  }
1624 
1625  if (!deadline_map_.empty()) {
1626  deadline_task_->schedule(deadline_period);
1627  }
1628  } else if (deadline_period == TimeDuration::max_value) {
1629  if (!deadline_map_.empty()) {
1630  deadline_task_->cancel();
1631  }
1632 
1633  deadline_map_.clear();
1634  } else {
1635  DeadlineMapType new_map;
1636  for (PublicationInstanceMapType::iterator iter = instances_.begin();
1637  iter != instances_.end();
1638  ++iter) {
1639  iter->second->deadline_ = deadline;
1640  new_map.insert(std::make_pair(iter->second->deadline_, iter->second));
1641  }
1642  std::swap(new_map, deadline_map_);
1643 
1644  if (!deadline_map_.empty()) {
1645  deadline_task_->cancel();
1646  deadline_task_->schedule(deadline_map_.begin()->first - MonotonicTimePoint::now());
1647  }
1648  }
1649 
1650  deadline_period_ = deadline_period;
1651  }
1652 }
1653 
1654 void
1656 {
1657  // Lock the DataWriterImpl.
1659  // Lock ourselves.
1661 
1662  if (deadline_map_.empty()) {
1663  return;
1664  }
1665 
1666  bool notify = false;
1667 
1668  for (DeadlineMapType::iterator pos = deadline_map_.begin(), limit = deadline_map_.end();
1669  pos != limit && pos->first < now; pos = deadline_map_.begin()) {
1670 
1671  PublicationInstance_rch instance = pos->second;
1672  deadline_map_.erase(pos);
1673 
1677 
1679  notify = true;
1680 
1681  DDS::DataWriterListener_var listener = writer_->listener_for(DDS::OFFERED_DEADLINE_MISSED_STATUS);
1682 
1683  if (listener) {
1684  // Copy before releasing the lock.
1686 
1687  // Release the lock during the upcall.
1689  ACE_GUARD(ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex>, rev_dwi_guard, deadline_reverse_status_lock);
1690 
1691  // @todo Will this operation ever throw? If so we may want to
1692  // catch all exceptions, and act accordingly.
1693  listener->on_offered_deadline_missed(writer_, status);
1694 
1695  // We need to update the last total count value to our current total
1696  // so that the next time we will calculate the correct total_count_change;
1698  }
1699 
1700  instance->deadline_ += deadline_period_;
1701  deadline_map_.insert(std::make_pair(instance->deadline_, instance));
1702  }
1703 
1704  if (notify) {
1706  }
1707 
1708  deadline_task_->schedule(deadline_map_.begin()->first - now);
1709 }
1710 
1711 void
1713 {
1714  // Call comes from DataWriterImpl_t which should arleady have the lock_.
1715 
1717  return;
1718  }
1719 
1720  std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1721  while (r.first != r.second && r.first->second != instance) {
1722  ++r.first;
1723  }
1724  if (r.first != r.second) {
1725  // The instance was in the map.
1726  deadline_map_.erase(r.first);
1727  }
1729  bool schedule = deadline_map_.empty();
1730  deadline_map_.insert(std::make_pair(instance->deadline_, instance));
1731  if (schedule) {
1732  deadline_task_->schedule(deadline_period_);
1733  }
1734 }
1735 
1736 void
1738 {
1739  // Call comes from DataWriterImpl_t which should arleady have the lock_.
1740 
1742  return;
1743  }
1744 
1745  std::pair<DeadlineMapType::iterator, DeadlineMapType::iterator> r = deadline_map_.equal_range(instance->deadline_);
1746  while (r.first != r.second && r.first->second != instance) {
1747  ++r.first;
1748  }
1749  if (r.first != r.second) {
1750  deadline_map_.erase(r.first);
1751  if (deadline_map_.empty()) {
1752  deadline_task_->cancel();
1753  }
1754  }
1755 }
1756 
1757 } // namespace DCPS
1758 } // namespace OpenDDS
1759 
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
DataSampleElementAllocator sample_list_element_allocator_
RcHandle< PublicationInstance > PublicationInstance_rch
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
void remove_reader_acks(const GUID_t &reader)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool insert(DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, SendStateDataSampleList &the_data, DDS::DurabilityServiceQosPolicy const &qos)
#define ACE_SYNCH_MUTEX
void copy_and_prepend(SendStateDataSampleList &list, const SendStateDataSampleList &appended, const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params, ssize_t &max_resend_samples)
const DataSampleHeader & get_header() const
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
void erase(SequenceNumber value)
bool dequeue(const DataSampleElement *stale)
ReliabilityQosPolicy reliability
bool sequence_acknowledged_i(const SequenceNumber &sequence)
iterator end()
Return iterator to end of list.
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
void set_sub_id(CORBA::ULong index, OpenDDS::DCPS::GUID_t id)
DDS::ReturnCode_t remove_oldest_sample(InstanceDataSampleList &instance_list, bool &released)
void log_send_state_lists(OPENDDS_STRING description)
LM_INFO
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
DDS::ReturnCode_t wait_ack_of_seq(const MonotonicTimePoint &abs_deadline, bool deadline_is_infinite, const SequenceNumber &sequence)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static const SendStateDataSampleList * send_list_containing_element(const DataSampleElement *element, SendStateDataSampleList **begin, SendStateDataSampleList **end)
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
GUID_t publication_id_
The publication Id from repo.
bool sequence_acknowledged(const SequenceNumber &sequence)
DDS::ReturnCode_t enqueue(DataSampleElement *sample, DDS::InstanceHandle_t instance)
DDS::ReturnCode_t enqueue_control(DataSampleElement *control_sample)
void cancel_deadline(const PublicationInstance_rch &instance)
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
bool dequeue(const DataSampleElement *stale)
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations..
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
void update_acked(const SequenceNumber &seq, const GUID_t &id=GUID_UNKNOWN)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
void wait_pending(const MonotonicTimePoint &deadline)
const ACE_Time_Value & value() const
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
SendStateDataSampleList orphaned_to_transport_
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Recursive_Thread_Mutex & deadline_status_lock_
Lock for synchronization of status_ member.
DDS::ReturnCode_t register_instance(DDS::InstanceHandle_t &instance_handle, Message_Block_Ptr &registered_sample)
int ssize_t
int release(void)
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
ACE_Guard< ACE_Thread_Mutex > lock_
void set_transaction_id(ACE_UINT64 transaction_id)
static bool on_some_list(const DataSampleElement *iter)
DDS::InstanceHandle_t get_next_handle()
void release_buffer(DataSampleElement *element)
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_STRING
MonotonicTimePoint deadline_
Deadline for Deadline QoS.
DDS::ReturnCode_t unregister(DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
DDS::ReturnCode_t obtain_buffer(DataSampleElement *&element, DDS::InstanceHandle_t handle)
DOMAINID_TYPE_NATIVE DomainId_t
bool remove_sample(const DataSampleElement *sample)
void enqueue_tail(const DataSampleElement *element)
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
void add_reader_acks(const GUID_t &reader, const SequenceNumber &base)
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
DDS::DurabilityServiceQosPolicy const & durability_service_
DURABILITY_SERVICE QoS specific to the DataWriter.
OpenDDS::DCPS::GUID_t get_sub_id(CORBA::ULong index) const
PublicationInstance_rch get_handle() const
bool dequeue_head(DataSampleElement *&stale)
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
SendStateDataSampleList sending_data_
List of data that is currently being sent.
void data_dropped(const DataSampleElement *element, bool dropped_by_transport)
const ReturnCode_t RETCODE_TIMEOUT
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ssize_t durable_samples_remaining_
Only used by WriteDataContainer::reenqueue_all() while WDC is locked.
void free(void *ptr)
Return a chunk of memory back to free list cache.
void return_handle(DDS::InstanceHandle_t handle)
ACE_INLINE OpenDDS_Dcps_Export ACE_Time_Value time_to_time_value(const DDS::Time_t &t)
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
iterator begin()
Return iterator to beginning of list.
virtual ACE_Message_Block * duplicate(void) const
time_t sec(void) const
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
static SequenceNumber ZERO()
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
LM_WARNING
bool dequeue_head(DataSampleElement *&stale)
WriteDataContainer(DataWriterImpl *writer, CORBA::Long max_samples_per_instance, CORBA::Long history_depth, CORBA::Long max_durable_per_instance, DDS::Duration_t max_blocking_time, size_t n_chunks, DDS::DomainId_t domain_id, char const *topic_name, char const *type_name, DataDurabilityCache *durability_cache, DDS::DurabilityServiceQosPolicy const &durability_service, CORBA::Long max_instances, CORBA::Long max_total_samples, ACE_Recursive_Thread_Mutex &deadline_status_lock, DDS::OfferedDeadlineMissedStatus &deadline_status, CORBA::Long &deadline_last_total_count)
void set_num_subs(CORBA::ULong num_subs)
bool dequeue(const DataSampleElement *stale)
void data_delivered(const DataSampleElement *sample)
bool notify_all()
Unblock all of the threads waiting on this condition.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex lock_
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
bool waiting_on_release_
The block waiting flag.
The wait has returned because of a timeout.
unsigned long long ACE_UINT64
std::pair< SequenceNumber, SequenceNumber > SequenceRange
DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
static void remove(DataSampleElement *stale)
DDS::OfferedDeadlineMissedStatus & deadline_status_
Reference to the missed requested deadline status structure.
bool resend_data_expired(const DataSampleElement &element, const DDS::LifespanQosPolicy &lifespan)
suseconds_t usec(void) const
SendStateDataSampleList resend_data_
static void set_flag(DataSampleHeaderFlag flag, ACE_Message_Block *buffer)
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
void wakeup_blocking_writers(DataSampleElement *stale)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void enqueue_head(const DataSampleElement *element)
DataWriterImpl * writer_
The writer that owns this container.
void enqueue_tail(const DataSampleElement *element)
InstanceDataSampleList samples_
History of the instance samples.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void get_instance_handles(InstanceHandleVec &instance_handles)
SendStateDataSampleList STL-style iterator implementation.
CORBA::Long & deadline_last_total_count_
Last total_count when status was last checked.
Sequence number abstraction. Only allows positive 64 bit values.
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static TransportRegistry * instance()
Return a singleton instance of this class.
void set_deadline_period(const TimeDuration &deadline_period)
SendStateDataSampleList sent_data_
List of data that has already been sent.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle, Message_Block_Ptr &registered_sample, bool dup_registered_sample=true)
void process_deadlines(const MonotonicTimePoint &now)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
DDS::DomainId_t const domain_id_
Domain ID.
DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance, Message_Block_Ptr &registered_sample, bool dup_registered_sample)
void enqueue_tail(const DataSampleElement *element)
#define ACE_ERROR_RETURN(X, Y)
The wait has returned because it was woken up.
DDS::ReturnCode_t reenqueue_all(const GUID_t &reader_id, const DDS::LifespanQosPolicy &lifespan, const OPENDDS_STRING &filterClassName, const FilterEvaluator *eval, const DDS::StringSeq &params)
std::reverse_iterator< const_iterator > const_reverse_iterator
Message_Block_Ptr registered_sample_
The sample data for registration.
DataDurabilityCache *const durability_cache_
Pointer to the data durability cache.
#define TheServiceParticipant
void extend_deadline(const PublicationInstance_rch &instance)
DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle, size_t &size)
LM_ERROR
const ACE_Time_Value_T< AceClock > & value() const
Definition: TimePoint_T.inl:49
char const *const topic_name_
Topic name.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
A container for instances sample data.
static DataSampleElement * prev(const DataSampleElement *iter)
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
ConditionVariableType empty_condition_
char const *const type_name_
Type name.
Struct that has information about an instance and the instance sample list.
DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement *&element)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
SendStateDataSampleList get_resend_data()
static const TimeDuration max_value
Definition: TimeDuration.h:32
const ReturnCode_t RETCODE_BAD_PARAMETER
bool shutdown_
The flag indicates the datawriter will be destroyed.
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.