OpenDDS  Snapshot(2023/04/28-20:55)
DataWriterImpl.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 
8 #include "DataWriterImpl.h"
9 
11 #include "DomainParticipantImpl.h"
12 #include "PublisherImpl.h"
13 #include "Service_Participant.h"
14 #include "GuidConverter.h"
15 #include "TopicImpl.h"
16 #include "PublicationInstance.h"
17 #include "Serializer.h"
18 #include "Transient_Kludge.h"
19 #include "DataDurabilityCache.h"
20 #include "MonitorFactory.h"
22 #include "DataSampleElement.h"
23 #include "Util.h"
24 #include "DCPS_Utils.h"
25 #include "XTypes/TypeObject.h"
26 #include "TypeSupportImpl.h"
27 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
28 # include "CoherentChangeControl.h"
29 #endif
30 #include "AssociationData.h"
34 #ifndef DDS_HAS_MINIMUM_BIT
35 # include "BuiltInTopicUtils.h"
36 #endif
37 
38 #ifndef DDS_HAS_MINIMUM_BIT
39 # include <dds/DdsDcpsCoreTypeSupportC.h>
40 #endif // !defined (DDS_HAS_MINIMUM_BIT)
41 #include <dds/DdsDcpsCoreC.h>
42 #include <dds/DdsDcpsGuidTypeSupportImpl.h>
43 
44 #include <ace/Reactor.h>
45 #include <ace/Auto_Ptr.h>
46 
47 #include <stdexcept>
48 
50 
51 namespace OpenDDS {
52 namespace DCPS {
53 
54 //TBD - add check for enabled in most methods.
55 // currently this is not needed because auto_enable_created_entities
56 // cannot be false.
57 
// Constructor: member-initializer list followed by the constructor body.
// NOTE(review): the signature line and several body statements are not
// present in this listing, so only the visible initializers are described.
// Chunk counts and the initial QoS are read from TheServiceParticipant;
// ids start as GUID_UNKNOWN and pointers/counters start at zero.
59  : data_dropped_count_(0)
60  , data_delivered_count_(0)
61  , controlTracker("DataWriterImpl")
62  , n_chunks_(TheServiceParticipant->n_chunks())
63  , association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier())
64  , qos_(TheServiceParticipant->initial_DataWriterQos())
65  , skip_serialize_(false)
66  , db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks()))
67  , topic_id_(GUID_UNKNOWN)
68  , topic_servant_(0)
69  , type_support_(0)
70  , listener_mask_(DEFAULT_STATUS_MASK)
71  , domain_id_(0)
72  , publication_id_(GUID_UNKNOWN)
73  , sequence_number_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
74  , coherent_(false)
75  , coherent_samples_(0)
76  , liveliness_lost_(false)
77  , reactor_(0)
78  , liveliness_check_interval_(TimeDuration::max_value)
79  , last_deadline_missed_total_count_(0)
80  , is_bit_(false)
81  , min_suspended_transaction_id_(0)
82  , max_suspended_transaction_id_(0)
83  , liveliness_asserted_(false)
84  , liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
85 {
88 
92 
97 
103 
     // Create the (one-shot and periodic) monitors through the service
     // participant's monitor factory.
104  monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this));
105  periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this));
106 }
107 
108 // This method is called when there are no longer any references to
109 // the servant.
// NOTE(review): the destructor's signature line and the line that declares
// `participant` are not present in this listing.
111 {
112  DBG_ENTRY_LVL("DataWriterImpl", "~DataWriterImpl", 6);
113 #ifndef OPENDDS_SAFETY_PROFILE
     // Outside the safety profile: if the participant and its type lookup
     // service are both available, remove this writer's publication_id_
     // from the service's dynamic map.
115  if (participant) {
116  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
117  if (type_lookup_service) {
118  type_lookup_service->remove_guid_from_dynamic_map(publication_id_);
119  }
120  }
121 #endif
122 }
123 
124 // This method is called when delete_datawriter is called.
// NOTE(review): the signature line and the statement that clears the
// listener are not present in this listing.
125 void
127 {
128  // As a first step, set our listener to nil, which will prevent us from
129  // calling back onto the listener at the moment the related DDS entity has
130  // been deleted.
     // Drop the references to the topic servant and the type support.
132  topic_servant_ = 0;
133  type_support_ = 0;
134 }
135 
// Initializes the writer from its topic, QoS, listener and parent entities.
// Caches the topic's name, id and type name, stores the QoS in both qos_
// and passed_qos_, installs the listener/mask, records the participant and
// publisher, and picks up the service participant's timer reactor.
// NOTE(review): the line carrying the function name is not present in this
// listing; the name is taken from the DBG_ENTRY_LVL call below.
136 void
138  TopicImpl* topic_servant,
139  const DDS::DataWriterQos& qos,
140  DDS::DataWriterListener_ptr a_listener,
141  const DDS::StatusMask& mask,
142  WeakRcHandle<DomainParticipantImpl> participant_servant,
143  PublisherImpl* publisher_servant)
144 {
145  DBG_ENTRY_LVL("DataWriterImpl", "init", 6);
146  topic_servant_ = topic_servant;
     // dynamic_cast yields a null pointer if the topic's type support is not
     // a TypeSupportImpl; no check is made here.
147  type_support_ = dynamic_cast<TypeSupportImpl*>(topic_servant->get_type_support());
148  topic_name_ = topic_servant_->get_name();
149  topic_id_ = topic_servant_->get_id();
150  type_name_ = topic_servant_->get_type_name();
151 
152 #if !defined (DDS_HAS_MINIMUM_BIT)
154 #endif // !defined (DDS_HAS_MINIMUM_BIT)
155 
156  qos_ = qos;
157  passed_qos_ = qos;
158 
159  set_listener(a_listener, mask);
160 
161  // Only store the participant pointer, since it is our "grand"
162  // parent, we will exist as long as it does.
163  participant_servant_ = participant_servant;
164 
     // NOTE(review): the locked handle is dereferenced without a null check.
165  RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
166  domain_id_ = participant->get_domain_id();
167 
168  // Only store the publisher pointer, since it is our parent, we will
169  // exist as long as it does.
170  publisher_servant_ = *publisher_servant;
171 
172  this->reactor_ = TheServiceParticipant->timer();
173 }
174 
// Returns the result of get_entity_instance_handle() for publication_id_
// and the (possibly null) locked participant.
// NOTE(review): the signature lines are not present in this listing.
177 {
178  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
179  return get_entity_instance_handle(publication_id_, participant);
180 }
181 
// Asks the participant to assign a new handle; returns DDS::HANDLE_NIL when
// the participant is no longer available.
// NOTE(review): the signature lines are not present in this listing.
184 {
185  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
186  if (participant) {
187  return participant->assign_handle();
188  }
189  return DDS::HANDLE_NIL;
190 }
191 
// Hands `handle` back to the participant via return_handle(); does nothing
// when the participant is no longer available.
// NOTE(review): the signature lines are not present in this listing.
193 {
194  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
195  if (participant) {
196  participant->return_handle(handle);
197  }
198 }
199 
// Returns the participant's built-in subscriber proxy, or an empty
// RcHandle<BitSubscriber> when the participant is no longer available.
// NOTE(review): the signature lines are not present in this listing.
202 {
203  RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
204  if (participant_servant) {
205  return participant_servant->get_builtin_subscriber_proxy();
206  }
207 
208  return RcHandle<BitSubscriber>();
209 }
210 
// Starts associating this writer with a remote reader: ignores the request
// if the writer is deleted, records reader info under reader_info_lock_,
// fills an AssociationData from `reader`, and calls associate().
// NOTE(review): several lines (the function-name line, parts of the
// reader_info_ insertion, log-macro openers, the observer lookup) are not
// present in this listing.
211 void
213  const ReaderAssociation& reader,
214  bool active)
215 {
216  DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
217 
218  if (DCPS_debug_level) {
219  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
220  ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
221  LogGuid(yourId).c_str(),
222  LogGuid(reader.readerId).c_str()));
223  }
224 
     // A deleted writer must not gain new associations.
225  if (get_deleted()) {
226  if (DCPS_debug_level)
227  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
228  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
229 
230  return;
231  }
232 
233  check_and_set_repo_id(yourId);
234 
235  {
     // The filter expression is only kept when publisher-side content
     // filtering is enabled; otherwise an empty string is stored.
236  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
237  reader_info_.insert(std::make_pair(reader.readerId,
239  TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
242  }
243 
244  if (DCPS_debug_level > 4) {
246  ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
247  ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
248  LogGuid(get_guid()).c_str(),
250  }
251 
252  AssociationData data;
253  data.remote_id_ = reader.readerId;
254  data.remote_data_ = reader.readerTransInfo;
255  data.discovery_locator_ = reader.readerDiscInfo;
258  data.remote_reliable_ =
260  data.remote_durable_ =
262 
     // On success notify the observer (if any); on failure only log.
263  if (associate(data, active)) {
265  if (observer) {
266  observer->on_associated(this, data.remote_id_);
267  }
268  } else {
269  //FUTURE: inform inforepo and try again as passive peer
270  if (DCPS_debug_level) {
272  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
273  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
274  }
275  }
276 }
277 
// Callback invoked once the transport has finished (or failed) associating
// with `remote_id`.
//   flags     - bit set; ASSOC_OK and ASSOC_ACTIVE are tested here.
//   remote_id - GUID of the remote reader.
// Without ASSOC_OK the failure is logged and the function returns. With
// ASSOC_ACTIVE the association is completed via association_complete_i();
// otherwise an error is logged.
// NOTE(review): the log-macro opener lines and one statement after the
// failure check are not present in this listing.
278 void
279 DataWriterImpl::transport_assoc_done(int flags, const GUID_t& remote_id)
280 {
281  DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
282 
283  if (!(flags & ASSOC_OK)) {
284  if (DCPS_debug_level) {
286  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
287  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
288  LogGuid(remote_id).c_str()));
289  }
290 
291  return;
292  }
293 
295 
296  if (DCPS_debug_level) {
298  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
299  ACE_TEXT("writer %C succeeded in associating with reader %C\n"),
300  LogGuid(publication_id_).c_str(),
301  LogGuid(remote_id).c_str()));
302  }
303 
304  if (flags & ASSOC_ACTIVE) {
305 
306  // Have we already received an association_complete() callback?
307  if (DCPS_debug_level) {
309  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
310  ACE_TEXT("writer %C reader %C calling association_complete_i\n"),
311  LogGuid(publication_id_).c_str(),
312  LogGuid(remote_id).c_str()));
313  }
314  association_complete_i(remote_id);
315 
316  } else {
317  // In the current implementation, DataWriter is always active, so this
318  // code will not be applicable.
319  if (DCPS_debug_level) {
321  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
322  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
323  LogGuid(publication_id_).c_str()));
324  }
325  }
326 }
327 
// Builds the per-reader bookkeeping record.
//   filterClassName - filter class name stored in filter_class_name_.
//   filter          - filter expression; must be non-null because it is
//                     dereferenced (*filter) in the content-filter build.
//   params          - expression parameters stored in expression_params_.
//   durable         - stored in durable_.
// NOTE(review): a `participant` parameter is used below but its declaration
// line is not present in this listing.
// When content-filtered topics are compiled out, only expected_sequence_ and
// durable_ are initialized and the other arguments are marked unused.
328 DataWriterImpl::ReaderInfo::ReaderInfo(const char* filterClassName,
329  const char* filter,
330  const DDS::StringSeq& params,
332  bool durable)
333 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
334  : participant_(participant)
335  , filter_class_name_(filterClassName)
336  , filter_(filter)
337  , expression_params_(params)
338  , expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
339  , durable_(durable)
340 {
     // Obtain a filter evaluator only for a live participant and a
     // non-empty filter expression.
341  RcHandle<DomainParticipantImpl> part = participant_.lock();
342  if (part && *filter) {
343  eval_ = part->get_filter_eval(filter);
344  }
345 }
346 #else
347  : expected_sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
348  , durable_(durable)
349 {
350  ACE_UNUSED_ARG(filterClassName);
351  ACE_UNUSED_ARG(filter);
352  ACE_UNUSED_ARG(params);
353  ACE_UNUSED_ARG(participant);
354 }
355 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
356 
// Releases the filter evaluator: resets eval_ and, for a live participant
// and a non-empty filter, calls deref_filter_eval() with the filter text.
// NOTE(review): the signature line is not present in this listing;
// presumably this is the ReaderInfo destructor.
358 {
359 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
360  eval_ = RcHandle<FilterEvaluator>();
361  RcHandle<DomainParticipantImpl> participant = participant_.lock();
362  if (participant && !filter_.empty()) {
363  participant->deref_filter_eval(filter_.c_str());
364  }
365 
366 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
367 }
368 
// Completes the association with reader `remote_id`:
//   1. inserts the reader into readers_,
//   2. copies the reader's durability/filter settings out of reader_info_,
//   3. reports to the monitor,
//   4. for non-BIT writers, assigns an instance handle, binds it in
//      id_to_handle_map_ and notifies the listener of the match,
//   5. for durable readers, re-enqueues historic samples and sends them
//      followed by an END_HISTORIC_SAMPLES control message.
// NOTE(review): many lines (the signature, guard/log-macro openers, status
// updates, several declarations such as `eval`, `list`, `encoding` and
// `header`) are not present in this listing.
369 void
371 {
372  DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
373 
374  bool reader_durable = false;
375 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
376  OPENDDS_STRING filterClassName;
378  DDS::StringSeq expression_params;
379 #endif
380  {
382 
383  if (DCPS_debug_level >= 1) {
385  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
386  ACE_TEXT("bit %d local %C remote %C\n"),
387  is_bit_,
388  LogGuid(this->publication_id_).c_str(),
389  LogGuid(remote_id).c_str()));
390  }
391 
392  if (insert(readers_, remote_id) == -1) {
394  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
395  ACE_TEXT("insert %C from pending failed.\n"),
396  LogGuid(remote_id).c_str()));
397  }
398  }
399  {
     // Snapshot the reader's settings under reader_info_lock_ so they can be
     // used after the lock is released.
400  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
401  RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
402 
403  if (it != reader_info_.end()) {
404  reader_durable = it->second.durable_;
405 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
406  filterClassName = it->second.filter_class_name_;
407  eval = it->second.eval_;
408  expression_params = it->second.expression_params_;
409 #endif
410  }
411  }
412 
413  if (this->monitor_) {
414  this->monitor_->report();
415  }
416 
417  if (!is_bit_) {
418 
419  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
420 
421  if (!participant)
422  return;
423 
424  data_container_->add_reader_acks(remote_id, get_max_sn());
425 
426  const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
427 
428  {
429  // protect publication_match_status_ and status changed flags.
431 
     // A failed bind aborts the association after logging a warning.
432  if (DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
434  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
435  ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
436  LogGuid(remote_id).c_str(),
437  handle));
438  return;
439 
440  } else if (DCPS_debug_level > 4) {
442  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
443  ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
444  LogGuid(remote_id).c_str(),
445  handle));
446  }
447 
454  }
455 
456  DDS::DataWriterListener_var listener =
458 
459  if (!CORBA::is_nil(listener.in())) {
460 
461  listener->on_publication_matched(this, publication_match_status_);
462 
463  // TBD - why does the spec say to change this but not
464  // change the ChangeFlagStatus after a listener call?
467  }
468 
470  } else {
471  data_container_->add_reader_acks(remote_id, get_max_sn());
472  }
473 
474  // Support DURABILITY QoS
475  if (reader_durable) {
476  // Tell the WriteDataContainer to resend all sending/sent
477  // samples.
478  this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
479 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
480  , filterClassName, eval.in(), expression_params
481 #endif
482  );
483 
484  // Acquire the data writer container lock to avoid deadlock. The
485  // thread calling association_complete() has to acquire lock in the
486  // same order as the write()/register() operation.
487 
488  // Since the thread calling association_complete() is the ORB
489  // thread, it may have some performance penalty. If the
490  // performance is an issue, we may need a new thread to handle the
491  // data_available() calls.
493  guard,
494  this->get_lock());
495 
497  {
498  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
499  // Update the reader's expected sequence
     // NOTE(review): the result of find() is dereferenced without comparing
     // it to end(); this relies on the reader still being in reader_info_.
500  SequenceNumber& seq =
501  reader_info_.find(remote_id)->second.expected_sequence_;
502 
     // Mark every re-enqueued sample as historic and track the highest
     // sequence number seen.
503  for (SendStateDataSampleList::iterator list_el = list.begin();
504  list_el != list.end(); ++list_el) {
505  list_el->get_header().historic_sample_ = true;
506 
507  if (list_el->get_header().sequence_ > seq) {
508  seq = list_el->get_header().sequence_;
509  }
510  }
511  }
512 
513  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
514  if (!publisher || publisher->is_suspended()) {
516 
517  } else {
518  if (DCPS_debug_level >= 4) {
519  ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
520  }
521 
     // The control message payload is the serialized remote reader id.
523  size_t size = 0;
524  serialized_size(encoding, size, remote_id);
525  Message_Block_Ptr data(
527  get_db_lock()));
528  Serializer ser(data.get(), encoding);
529  ser << remote_id;
530 
532  Message_Block_Ptr end_historic_samples(
534  END_HISTORIC_SAMPLES, header, move(data),
535  SystemTimePoint::now().to_dds_time()));
536 
     // The writer lock is released before sending.
538  guard.release();
541  SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
542  if (ret == SEND_CONTROL_ERROR) {
543  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
544  ACE_TEXT("DataWriterImpl::association_complete_i: ")
545  ACE_TEXT("send_w_control failed.\n")));
547  }
548  }
549  }
550 }
551 
// Removes the associations with the given readers.
//   readers     - ids of the readers to drop (no-op when empty).
//   notify_lost - when true and handles were found, notify_publication_lost()
//                 is called with those handles.
// Steps: notify the observer, stop pending associations, remove each reader
// from readers_/reader_info_/ack tracking, update the publication-matched
// status for non-BIT writers, disassociate at the transport level, and
// finally return the instance handles to the participant.
// NOTE(review): many lines (the function-name line, the observer lookup,
// lock guards, log-macro openers and parts of the status update) are not
// present in this listing.
552 void
554  CORBA::Boolean notify_lost)
555 {
556  if (readers.length() == 0) {
557  return;
558  }
559 
561  if (observer) {
562  for (CORBA::ULong i = 0; i < readers.length(); ++i) {
563  observer->on_disassociated(this, readers[i]);
564  }
565  }
566 
567  if (DCPS_debug_level >= 1) {
569  ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
570  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
571  is_bit_,
572  LogGuid(publication_id_).c_str(),
573  LogGuid(readers[0]).c_str(),
574  readers.length()));
575  }
576 
577  // stop pending associations for these reader ids
578  this->stop_associating(readers.get_buffer(), readers.length());
579 
580  ReaderIdSeq fully_associated_readers;
581  CORBA::ULong fully_associated_len = 0;
582  ReaderIdSeq rds;
583  CORBA::ULong rds_len = 0;
584  DDS::InstanceHandleSeq handles;
585 
587  {
588  // Ensure the same acquisition order as in wait_for_acknowledgments().
590  // Remove the readers from the fully associated reader list.
591  // If the supplied reader is not in the cached reader list then it is
592  // already removed. We just need to remove the readers in the list that
593  // have not been removed.
594 
595  CORBA::ULong len = readers.length();
596 
597  for (CORBA::ULong i = 0; i < len; ++i) {
598  //Remove the readers from fully associated reader list. If it's not
599  //in there, the association_complete() is not called yet and remove it
600  //from pending list.
601 
     // Readers that were fully associated are collected twice: once for
     // handle lookup and once for the transport-level disassociate below.
602  if (remove(readers_, readers[i]) == 0) {
603  ++ fully_associated_len;
604  fully_associated_readers.length(fully_associated_len);
605  fully_associated_readers [fully_associated_len - 1] = readers[i];
606 
607  ++ rds_len;
608  rds.length(rds_len);
609  rds [rds_len - 1] = readers[i];
610  }
611 
612  data_container_->remove_reader_acks(readers[i]);
613 
614  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
615  reader_info_.erase(readers[i]);
616  //else reader is already removed which indicates remove_association()
617  //is called multiple times.
618  }
619 
620  if (fully_associated_len > 0 && !is_bit_) {
621  // The reader should be in the id_to_handle map at this time
622  this->lookup_instance_handles(fully_associated_readers, handles);
623 
624  for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
625  id_to_handle_map_.erase(fully_associated_readers[i]);
626  }
627  }
628 
629  // Mirror the PUBLICATION_MATCHED_STATUS processing from
630  // association_complete() here.
631  if (!this->is_bit_) {
632 
633  // Derive the change in the number of subscriptions reading this writer.
634  int matchedSubscriptions =
635  static_cast<int>(this->id_to_handle_map_.size());
637  matchedSubscriptions - this->publication_match_status_.current_count;
638 
639  // Only process status if the number of subscriptions has changed.
641  this->publication_match_status_.current_count = matchedSubscriptions;
642 
643  /// Section 7.1.4.1: total_count will not decrement.
644 
645  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
647  handles[fully_associated_len - 1];
648 
650 
651  DDS::DataWriterListener_var listener =
653 
654  if (!CORBA::is_nil(listener.in())) {
655  listener->on_publication_matched(this, this->publication_match_status_);
656 
657  // Listener consumes the change.
660  }
661 
662  this->notify_status_condition();
663  }
664  }
665  }
666 
667  for (CORBA::ULong i = 0; i < rds.length(); ++i) {
668  this->disassociate(rds[i]);
669  }
670 
671  // If this remove_association is invoked when the InfoRepo
672  // detects a lost reader then make a callback to notify
673  // subscription lost.
674  if (notify_lost && handles.length() > 0) {
675  this->notify_publication_lost(handles);
676  }
677 
     // NOTE(review): `participant` is dereferenced without a null check; if
     // the weak handle has expired and handles is non-empty this would
     // dereference a null handle.
678  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
679  for (unsigned int i = 0; i < handles.length(); ++i) {
680  participant->return_handle(handles[i]);
681  }
682 }
683 
// Re-sends historic samples to reader `remote_id` when that reader is
// recorded as durable in reader_info_: re-enqueues the samples, marks them
// historic, and sends them followed by an END_HISTORIC_SAMPLES control
// message. The durable-reader handling matches the one in
// association_complete_i.
// NOTE(review): the signature line, the guard opener and several
// declarations (`eval`, `list`, `encoding`, `header`) are not present in
// this listing.
685 {
686  DBG_ENTRY_LVL("DataWriterImpl", "replay_durable_data_for", 6);
687 
688  bool reader_durable = false;
689 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
690  OPENDDS_STRING filterClassName;
692  DDS::StringSeq expression_params;
693 #endif
694 
695  {
     // Snapshot the reader's settings under reader_info_lock_.
696  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
697  RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
698 
699  if (it != reader_info_.end()) {
700  reader_durable = it->second.durable_;
701 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
702  filterClassName = it->second.filter_class_name_;
703  eval = it->second.eval_;
704  expression_params = it->second.expression_params_;
705 #endif
706  }
707  }
708 
709  // Support DURABILITY QoS
710  if (reader_durable) {
711  // Tell the WriteDataContainer to resend all sending/sent
712  // samples.
713  this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
714 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
715  , filterClassName, eval.in(), expression_params
716 #endif
717  );
718 
719  // Acquire the data writer container lock to avoid deadlock. The
720  // thread calling association_complete() has to acquire lock in the
721  // same order as the write()/register() operation.
722 
723  // Since the thread calling association_complete() is the ORB
724  // thread, it may have some performance penalty. If the
725  // performance is an issue, we may need a new thread to handle the
726  // data_available() calls.
728  guard,
729  this->get_lock());
730 
732  {
733  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
734  // Update the reader's expected sequence
     // NOTE(review): find() is dereferenced without an end() check.
735  SequenceNumber& seq =
736  reader_info_.find(remote_id)->second.expected_sequence_;
737 
738  for (SendStateDataSampleList::iterator list_el = list.begin();
739  list_el != list.end(); ++list_el) {
740  list_el->get_header().historic_sample_ = true;
741 
742  if (list_el->get_header().sequence_ > seq) {
743  seq = list_el->get_header().sequence_;
744  }
745  }
746  }
747 
748  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
749  if (!publisher || publisher->is_suspended()) {
751 
752  } else {
753  if (DCPS_debug_level >= 4) {
754  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) DataWriterImpl::replay_durable_data_for: Sending historic samples\n")));
755  }
756 
     // The control message payload is the serialized remote reader id.
758  size_t size = 0;
759  serialized_size(encoding, size, remote_id);
760  Message_Block_Ptr data(
762  get_db_lock()));
763  Serializer ser(data.get(), encoding);
764  ser << remote_id;
765 
767  Message_Block_Ptr end_historic_samples(create_control_message(END_HISTORIC_SAMPLES, header, move(data),
768  SystemTimePoint::now().to_dds_time()));
769 
     // The writer lock is released before sending.
771  guard.release();
772  const SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
773  if (ret == SEND_CONTROL_ERROR) {
774  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
775  ACE_TEXT("DataWriterImpl::replay_durable_data_for: ")
776  ACE_TEXT("send_w_control failed.\n")));
778  }
779  }
780  }
781 }
782 
// Drops every association of this writer: stops pending associations,
// copies the ids in readers_ into a sequence, removes them through
// remove_associations() without lost-notification, then stops the transport.
// NOTE(review): the signature line, the guard inside the first scope and
// the log-macro opener in the catch block are not present in this listing.
784 {
785  DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
786  // stop pending associations
787  this->stop_associating();
788 
789  ReaderIdSeq readers;
790  CORBA::ULong size;
791  {
793 
     // Snapshot readers_ so that it is not iterated while being modified
     // by remove_associations().
794  size = static_cast<CORBA::ULong>(readers_.size());
795  readers.length(size);
796 
797  RepoIdSet::iterator itEnd = readers_.end();
798  int i = 0;
799 
800  for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
801  readers[i ++] = *it;
802  }
803  }
804 
805  try {
806  if (0 < size) {
807  CORBA::Boolean dont_notify_lost = false;
808 
809  this->remove_associations(readers, dont_notify_lost);
810  }
811 
812  } catch (const CORBA::Exception&) {
     // Best effort: a CORBA exception is logged and otherwise ignored.
814  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
815  ACE_TEXT("caught exception from remove_associations.\n")));
816  }
817 
818  transport_stop();
819 }
820 
// Forwards all arguments unchanged to TransportClient::register_for_reader().
// NOTE(review): the line with the function name and the `participant`
// parameter is not present in this listing.
821 void
823  const GUID_t& writerid,
824  const GUID_t& readerid,
825  const TransportLocatorSeq& locators,
826  DiscoveryListener* listener)
827 {
828  TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
829 }
830 
// Forwards all arguments unchanged to
// TransportClient::unregister_for_reader().
// NOTE(review): the line with the function name and the `participant`
// parameter is not present in this listing.
831 void
833  const GUID_t& writerid,
834  const GUID_t& readerid)
835 {
836  TransportClient::unregister_for_reader(participant, writerid, readerid);
837 }
838 
// Passes new transport locators for `readerId` on to
// TransportClient::update_locators(), but only if the reader is currently
// present in reader_info_; unknown readers are ignored.
// NOTE(review): the line with the function name and the `readerId`
// parameter is not present in this listing.
839 void
841  const TransportLocatorSeq& locators)
842 {
843  {
     // The lock is held only for the lookup, not for the transport call.
844  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_);
845  RepoIdToReaderInfoMap::const_iterator iter = reader_info_.find(readerId);
846  if (iter == reader_info_.end()) {
847  return;
848  }
849  }
850  TransportClient::update_locators(readerId, locators);
851 }
852 
// Updates the offered-incompatible-QoS status and, when a listener is set,
// calls its on_offered_incompatible_qos() with
// offered_incompatible_qos_status_.
// NOTE(review): most statements of this function (the signature, the
// listener lookup, the status bookkeeping and the final notification) are
// not present in this listing; presumably this is update_incompatible_qos.
853 void
855 {
856  DDS::DataWriterListener_var listener =
858 
860 
861 #if 0
862 
864  // This test should make the method idempotent.
865  return;
866  }
867 
868 #endif
869 
871 
872  // copy status and increment change
875  status.count_since_last_send;
878 
879  if (!CORBA::is_nil(listener.in())) {
880  listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
881 
882  // TBD - Why does the spec say to change this but not change the
883  // ChangeFlagStatus after a listener call?
885  }
886 
888 }
889 
// Replaces the stored filter-expression parameters of reader `readerId`
// with `params`. When content-filtered topics are compiled out, both
// arguments are unused. An unknown reader is only logged (debug level > 4
// and publisher-side content filtering enabled).
// NOTE(review): the line with the function name and the `readerId`
// parameter, plus two body lines, are not present in this listing.
890 void
892  const DDS::StringSeq& params)
893 {
894 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
895  ACE_UNUSED_ARG(readerId);
896  ACE_UNUSED_ARG(params);
897 #else
899  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
900  RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
901 
902  if (iter != reader_info_.end()) {
903  iter->second.expression_params_ = params;
904 
905  } else if (DCPS_debug_level > 4 &&
906  TheServiceParticipant->publisher_content_filter()) {
908  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
909  ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
910  LogGuid(this->publication_id_).c_str(), LogGuid(readerId).c_str()));
911  }
912 
913 #endif
914 }
915 
// Applies a new DataWriter QoS. The QoS must be valid and consistent.
// An unchanged QoS returns DDS::RETCODE_OK immediately. For an enabled
// writer the change must be permitted by Qos_Helper::changeable() and is
// published through discovery (update_publication_qos) before being
// stored. On success qos_/passed_qos_ are updated and the observer, if any,
// is told via on_qos_changed().
// NOTE(review): the signature, the error-return statements and the
// observer lookup are not present in this listing, so the exact error
// codes cannot be stated here.
917 {
923 
924  DDS::DataWriterQos new_qos = qos;
926  if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
927  if (qos_ == new_qos)
928  return DDS::RETCODE_OK;
929 
930  if (enabled_) {
931  if (!Qos_Helper::changeable(qos_, new_qos)) {
933  }
934 
935  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
936  DDS::PublisherQos publisherQos;
937  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
938 
     // status stays false when the publisher is gone, which is then
     // treated like a failed discovery update.
939  bool status = false;
940  if (publisher) {
941  publisher->get_qos(publisherQos);
942  status
943  = disco->update_publication_qos(domain_id_,
944  dp_id_,
945  this->publication_id_,
946  new_qos,
947  publisherQos);
948  }
949  if (!status) {
951  ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
952  ACE_TEXT("qos not updated.\n")),
954  }
955 
     // The deadline period of the data container is refreshed from the
     // caller-supplied qos before the new QoS is stored.
956  if (!(qos_ == new_qos)) {
957  data_container_->set_deadline_period(TimeDuration(qos.deadline.period));
958  qos_ = new_qos;
959  }
960  }
961 
962  qos_ = new_qos;
963  passed_qos_ = qos;
964 
966  if (observer) {
967  observer->on_qos_changed(this);
968  }
969 
970  return DDS::RETCODE_OK;
971 
972  } else {
974  }
975 }
976 
// Copies passed_qos_ (the QoS as last supplied by the caller) into `qos`
// and returns DDS::RETCODE_OK.
// NOTE(review): the signature lines are not present in this listing.
979 {
980  qos = passed_qos_;
981  return DDS::RETCODE_OK;
982 }
983 
// Installs the listener and its status mask.
//   a_listener - listener reference; may be nil. A duplicate is stored.
//   mask       - status mask stored in listener_mask_.
// Always returns DDS::RETCODE_OK.
// NOTE(review): the return-type line and the first body line are not
// present in this listing.
985 DataWriterImpl::set_listener(DDS::DataWriterListener_ptr a_listener,
986  DDS::StatusMask mask)
987 {
989  listener_mask_ = mask;
990  //note: OK to duplicate a nil object ref
991  listener_ = DDS::DataWriterListener::_duplicate(a_listener);
992  return DDS::RETCODE_OK;
993 }
994 
// Returns a duplicate of the stored listener reference.
// NOTE(review): the function-name line and the first body line are not
// present in this listing.
995 DDS::DataWriterListener_ptr
997 {
999  return DDS::DataWriterListener::_duplicate(listener_.in());
1000 }
1001 
// Returns the stored listener narrowed to the OpenDDS DataWriterListener
// interface (result of DataWriterListener::_narrow).
// NOTE(review): the function-name line and the first body line are not
// present in this listing.
1002 DataWriterListener_ptr
1004 {
1006  return DataWriterListener::_narrow(listener_.in());
1007 }
1008 
// Returns a duplicate reference to the topic held in topic_servant_.
// NOTE(review): the function-name line is not present in this listing.
1009 DDS::Topic_ptr
1011 {
1012  return DDS::Topic::_duplicate(topic_servant_.get());
1013 }
1014 
// Returns true when at least one reader is present in readers_.
// NOTE(review): the function-name line is not present in this listing.
1015 bool
1017 {
1018  // N.B. It may be worthwhile to investigate a more efficient
1019  // heuristic for determining if a writer should send SAMPLE_ACK
1020  // control samples. Perhaps based on a sequence number delta?
1021  return this->readers_.size() != 0;
1022 }
1023 
// Builds an AckToken from `max_wait` and the writer's current maximum
// sequence number (get_max_sn()).
// NOTE(review): the signature lines and the log-macro opener are not
// present in this listing.
1026 {
1027  const SequenceNumber sn = get_max_sn();
1028  if (DCPS_debug_level > 0) {
1030  ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
1031  ACE_TEXT("for sequence %q\n"),
1032  sn.getValue()));
1033  }
1034  return AckToken(max_wait, sn);
1035 }
1036 
1037 
1038 
// Sends a REQUEST_ACK control sample: obtains a control buffer from the
// data container, wraps a REQUEST_ACK control message in it and enqueues
// it. Returns the container's error code if obtaining the buffer or
// enqueueing fails, otherwise DDS::RETCODE_OK.
// NOTE(review): the signature, the guard opener, the log-macro openers and
// the control-message construction call are not present in this listing.
1041 {
1043  guard,
1044  get_lock(),
1046 
1047 
1048  DataSampleElement* element = 0;
1049  DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
1050 
1051  if (ret != DDS::RETCODE_OK) {
1053  ACE_TEXT("(%P|%t) ERROR: ")
1054  ACE_TEXT("DataWriterImpl::send_request_ack: ")
1055  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1056  ret),
1057  ret);
1058  }
1059 
      // The control message carries no payload: blk is left empty.
1060  Message_Block_Ptr blk;
1061  // Add header with the registration sample data.
1062  Message_Block_Ptr sample(
1064  REQUEST_ACK,
1065  element->get_header(),
1066  move(blk),
1068 
1069  element->set_sample(move(sample));
1070 
1071  ret = this->data_container_->enqueue_control(element);
1072 
1073  if (ret != DDS::RETCODE_OK) {
      // Give the buffer back to the container before reporting the failure.
1074  data_container_->release_buffer(element);
1076  ACE_TEXT("(%P|%t) ERROR: ")
1077  ACE_TEXT("DataWriterImpl::send_request_ack: ")
1078  ACE_TEXT("enqueue_control failed.\n")),
1079  ret);
1080  }
1081 
1082 
1084 
1085  return DDS::RETCODE_OK;
1086 }
1087 
// Waits until the samples written so far are acknowledged: creates an ack
// token for `max_wait` and delegates to wait_for_specific_ack().
// NOTE(review): the signature, the condition guarding the early
// RETCODE_OK return and the statement that produces `ret` are not present
// in this listing.
1090 {
1092  return DDS::RETCODE_OK;
1093 
1095 
1096  if (ret != DDS::RETCODE_OK)
1097  return ret;
1098 
1099  DataWriterImpl::AckToken token = create_ack_token(max_wait);
1100  if (DCPS_debug_level) {
1101  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
1102  ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
1103  token.sequence_.getValue()));
1104  }
1105  return wait_for_specific_ack(token);
1106 }
1107 
// Delegates to the data container's wait_ack_of_seq() with the token's
// deadline, its infinite-deadline flag and its sequence number.
// NOTE(review): the signature lines are not present in this listing.
1110 {
1111  return this->data_container_->wait_ack_of_seq(token.deadline(), token.deadline_is_infinite(), token.sequence_);
1112 }
1113 
// Returns the owning publisher by locking the weak handle and handing the
// reference to the caller via _retn().
// NOTE(review): the function-name line is not present in this listing.
1114 DDS::Publisher_ptr
1116 {
1117  return publisher_servant_.lock()._retn();
1118 }
1119 
// Copies liveliness_lost_status_ into `status` and returns
// DDS::RETCODE_OK.
// NOTE(review): the function-name line, the guard opener/closer and some
// statements around the copy are not present in this listing.
1122  DDS::LivelinessLostStatus & status)
1123 {
1125  guard,
1126  this->lock_,
1129  status = liveliness_lost_status_;
1131  return DDS::RETCODE_OK;
1132 }
1133 
// Status accessor that runs under lock_ and returns DDS::RETCODE_OK.
// NOTE(review): the signature and nearly all statements are not present in
// this listing; presumably this is get_offered_deadline_missed_status.
1137 {
1139  guard,
1140  this->lock_,
1142 
1144 
1148 
1149  // Update for next status check.
1152 
1154 
1156 
1157  return DDS::RETCODE_OK;
1158 }
1159 
// Status accessor that runs under lock_ and returns DDS::RETCODE_OK.
// NOTE(review): the signature and the statements that copy the status are
// not present in this listing; presumably this is
// get_offered_incompatible_qos_status.
1163 {
1165  guard,
1166  this->lock_,
1171  return DDS::RETCODE_OK;
1172 }
1173 
// Copies publication_match_status_ into `status` under lock_ and returns
// DDS::RETCODE_OK.
// NOTE(review): the signature, the guard opener/closer and some statements
// around the copy are not present in this listing.
1177 {
1179  guard,
1180  this->lock_,
1183  status = publication_match_status_;
1186  return DDS::RETCODE_OK;
1187 }
1188 
// Asserts liveliness according to qos_.liveliness.kind. One case does
// nothing, one delegates to the participant's assert_liveliness(), and one
// may return DDS::RETCODE_ERROR; all other paths return DDS::RETCODE_OK.
// NOTE(review): the signature, the `case` labels and the condition that
// leads to RETCODE_ERROR are not present in this listing.
1191 {
1192  switch (this->qos_.liveliness.kind) {
1194  // Do nothing.
1195  break;
1197  {
1198  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1199  if (participant) {
1200  return participant->assert_liveliness();
1201  }
1202  }
1203  break;
1206  return DDS::RETCODE_ERROR;
1207  }
1208  break;
1209  }
1210 
1211  return DDS::RETCODE_OK;
1212 }
1213 
// Called by the participant; sets liveliness_asserted_ so the liveliness
// timer can send a message if necessary. Always returns DDS::RETCODE_OK.
// NOTE(review): the signature and the condition opening the inner block
// are not present in this listing.
1216 {
1217  // This operation is called by participant.
1219  // Set a flag indicating that we should send a liveliness message on the timer if necessary.
1220  liveliness_asserted_ = true;
1221  }
1222 
1223  return DDS::RETCODE_OK;
1224 }
1225 
// Returns TimeDuration::max_value when the writer's liveliness kind
// differs from `kind`.
// NOTE(review): the signature and the value returned for a matching kind
// are not present in this listing.
1228 {
1229  if (this->qos_.liveliness.kind == kind) {
1231  } else {
1232  return TimeDuration::max_value;
1233  }
1234 }
1235 
// Returns whether last_liveliness_activity_time_ is later than `tv` when
// the (missing) condition holds, and false otherwise.
// NOTE(review): the function-name line and the `if` condition are not
// present in this listing.
1236 bool
1238 {
1241  return last_liveliness_activity_time_ > tv;
1242  } else {
1243  return false;
1244  }
1245 }
1246 
// Fills `subscription_handles` with the instance handle of every entry in
// id_to_handle_map_ and returns DDS::RETCODE_OK. A disabled entity is
// rejected with a logged error.
// NOTE(review): the function-name line, the error return code for the
// disabled case and the guard opener are not present in this listing.
1249  DDS::InstanceHandleSeq & subscription_handles)
1250 {
1251  if (!enabled_) {
1253  ACE_TEXT("(%P|%t) ERROR: ")
1254  ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
1255  ACE_TEXT(" Entity is not enabled.\n")),
1257  }
1258 
1260  guard,
1261  this->lock_,
1263 
1264  // Copy out the handles for the current set of subscriptions.
1265  int index = 0;
1266  subscription_handles.length(
1267  static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
1268 
1269  for (RepoIdToHandleMap::iterator
1270  current = this->id_to_handle_map_.begin();
1271  current != this->id_to_handle_map_.end();
1272  ++current, ++index) {
1273  subscription_handles[index] = current->second;
1274  }
1275 
1276  return DDS::RETCODE_OK;
1277 }
1278 
// Only compiled when built-in topics are available (DDS_HAS_MINIMUM_BIT
// undefined). Looks up the built-in topic data for `subscription_handle`
// through the participant and, on success, copies the first entry into
// `subscription_data`. A disabled entity is rejected with a logged error.
// NOTE(review): the function-name line, the error return code for the
// disabled case, the declaration of `ret` and one lookup argument are not
// present in this listing.
1279 #if !defined (DDS_HAS_MINIMUM_BIT)
1282  DDS::SubscriptionBuiltinTopicData & subscription_data,
1283  DDS::InstanceHandle_t subscription_handle)
1284 {
1285  if (!enabled_) {
1287  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
1288  ACE_TEXT("get_matched_subscription_data: ")
1289  ACE_TEXT("Entity is not enabled.\n")),
1291  }
1292  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1293 
1295  DDS::SubscriptionBuiltinTopicDataSeq data;
1296 
1297  if (participant) {
1298  ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
1299  participant.in(),
1301  subscription_handle,
1302  data);
1303  }
1304 
      // data[0] is read only when the lookup reported RETCODE_OK.
1305  if (ret == DDS::RETCODE_OK) {
1306  subscription_data = data[0];
1307  }
1308 
1309  return ret;
1310 }
1311 #endif // !defined (DDS_HAS_MINIMUM_BIT)
1312 
1315 {
  // Enables this DataWriter. Visible steps, in order:
  //  1. return OK immediately if already enabled;
  //  2. check that the publisher (factory) and the topic are enabled;
  //  3. derive container limits from the resource_limits/history/durability QoS;
  //  4. create the WriteDataContainer and report the allocators when debugging;
  //  5. compute the liveliness check interval and schedule a timer;
  //  6. call disco->pre_writer(), set_enabled() and enable_transport();
  //  7. validate the data representation and set up serialization;
  //  8. register the publication with discovery (add_publication);
  //  9. tell the publisher (writer_enabled), load durable data, notify observer.
  // Every failure after set_enabled() marks data_container_->shutdown_ and
  // returns an error code.
  // NOTE(review): several statements (returns in the precondition checks, the
  // declarations of `participant` and `guard`, some conditions) are not
  // visible here.
1316  // According to the spec:
1317  // - Calling enable on an already enabled Entity returns OK and has no
1318  // effect.
1319  // - Calling enable on an Entity whose factory is not enabled will fail
1320  // and return PRECONDITION_NOT_MET.
1321 
1322  if (this->is_enabled()) {
1323  return DDS::RETCODE_OK;
1324  }
1325 
1326  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
  // Factory missing or not enabled; the statement in this branch is not visible.
1327  if (!publisher || !publisher->is_enabled()) {
1329  }
1330 
  // Topic not enabled; the statement in this branch is not visible either.
1331  if (!topic_servant_->is_enabled()) {
1333  }
1334 
  // Cache the participant id for later use (e.g. add_publication below).
1336  if (participant) {
1337  dp_id_ = participant->get_id();
1338  }
1339 
1340  // Note: do configuration based on QoS in enable() because
1341  // before enable is called the QoS can be changed -- even
1342  // for Changeable=NO
1343 
1344  // Configure WriteDataContainer constructor parameters from qos.
1345 
1346  const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
1347 
1348  CORBA::Long const max_samples_per_instance =
1351 
  // 0 is the starting value for both limits; the conditions that decide when
  // they are overridden are only partly visible.
1352  CORBA::Long max_instances = 0, max_total_samples = 0;
1353 
1356 
1360  (qos_.resource_limits.max_instances * max_samples_per_instance))) {
  // max_samples is only applied for reliable writers.
1361  max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
1362  }
1363  }
1364 
1366  max_instances = qos_.resource_limits.max_instances;
1367 
1368  const CORBA::Long history_depth =
1371 
  // Volatile durability keeps no durable samples per instance.
1372  const CORBA::Long max_durable_per_instance =
1373  qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
1374 
1375 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1376  // Get data durability cache if DataWriter QoS requires durable
1377  // samples. Publisher servant retains ownership of the cache.
1378  DataDurabilityCache* const durability_cache =
1379  TheServiceParticipant->get_data_durability_cache(qos_.durability);
1380 #endif
1381 
1382  //Note: the QoS used to set n_chunks_ is Changeable=No so
1383  // it is OK that we cannot change the size of our allocators.
1385  new WriteDataContainer(
1386  this,
1387  max_samples_per_instance,
1388  history_depth,
1389  max_durable_per_instance,
1391  n_chunks_,
1392  domain_id_,
1393  topic_name_,
1394  get_type_name(),
1395 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1396  durability_cache,
1398 #endif
1399  max_instances,
1400  max_total_samples,
1401  lock_,
1404  keep_count());
1405 
1406  // +1 because we might allocate one before releasing another
1407  // TBD - see if this +1 can be removed.
1411 
  // Diagnostic dump of the three allocators (message block, data block, header).
1412  if (DCPS_debug_level >= 2) {
1414  "(%P|%t) DataWriterImpl::enable-mb"
1415  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1416  mb_allocator_.get(),
1417  n_chunks_));
1418 
1420  "(%P|%t) DataWriterImpl::enable-db"
1421  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1422  db_allocator_.get(),
1423  n_chunks_));
1424 
1426  "(%P|%t) DataWriterImpl::enable-header"
1427  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1428  header_allocator_.get(),
1429  n_chunks_));
1430  }
1431 
  // Check interval = lease_duration scaled by liveliness_factor percent,
  // clamped from below by TimeDuration(0, 1).
1434  // Must be at least 1 microsecond.
1435  liveliness_check_interval_ = std::max(
1436  TimeDuration(qos_.liveliness.lease_duration) * (TheServiceParticipant->liveliness_factor() / 100.0),
1437  TimeDuration(0, 1));
1438 
  // A scheduling failure is logged only; enable() carries on.
1440  0,
1442  liveliness_check_interval_.value()) == -1) {
1444  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
1445  ACE_TEXT("schedule_timer")));
1446 
1447  }
1448  }
1449 
1450  if (!participant) {
1451  return DDS::RETCODE_ERROR;
1452  }
1453 
1454  participant->add_adjust_liveliness_timers(this);
1455 
1456  data_container_->set_deadline_period(TimeDuration(qos_.deadline.period));
1457 
1458  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
1459  disco->pre_writer(this);
1460 
  // The entity is flagged enabled before the transport is brought up; the
  // error paths below therefore shut the container down rather than clearing
  // the flag.
1461  this->set_enabled();
1462 
1463  try {
1464  this->enable_transport(reliable,
1466 
1467  } catch (const Transport::Exception&) {
1469  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
1470  ACE_TEXT("Transport Exception.\n")));
1471  data_container_->shutdown_ = true;
1472  return DDS::RETCODE_ERROR;
1473  }
1474 
1475  // Must be done after transport enabled.
1477  if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
1478  data_container_->shutdown_ = true;
1479  return DDS::RETCODE_ERROR;
1480  }
1481 
1482  // Done after enable_transport so we know its swap_bytes.
1483  const DDS::ReturnCode_t setup_serialization_result = setup_serialization();
1484  if (setup_serialization_result != DDS::RETCODE_OK) {
1485  data_container_->shutdown_ = true;
1486  return setup_serialization_result;
1487  }
1488 
  // Gather everything discovery needs to announce this publication.
1489  const TransportLocatorSeq& trans_conf_info = connection_info();
1490  DDS::PublisherQos pub_qos;
1491  publisher->get_qos(pub_qos);
1492 
1493  XTypes::TypeInformation type_info;
1494  type_support_->to_type_info(type_info);
1495 
1496  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1497  type_support_->add_types(type_lookup_service);
1498 
1499  const GUID_t publication_id =
1500  disco->add_publication(this->domain_id_,
1501  this->dp_id_,
1502  this->topic_servant_->get_id(),
1503  rchandle_from(this),
1504  this->qos_,
1505  trans_conf_info,
1506  pub_qos,
1507  type_info);
1508 
1510  publication_id_ = publication_id;
1511 
  // GUID_UNKNOWN from add_publication is treated as a failed registration.
1512  if (publication_id_ == GUID_UNKNOWN) {
1513  if (DCPS_debug_level >= 1) {
1514  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::enable: "
1515  "add_publication failed\n"));
1516  }
1517  data_container_->shutdown_ = true;
1518  return DDS::RETCODE_ERROR;
1519  }
1520 
1521 #if defined(OPENDDS_SECURITY)
1522  security_config_ = participant->get_security_config();
1523  participant_permissions_handle_ = participant->permissions_handle();
1525 #endif
1526 
1527  if (DCPS_debug_level >= 2) {
1528  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::enable: "
1529  "got GUID %C, publishing to topic name \"%C\" type \"%C\"\n",
1530  LogGuid(publication_id_).c_str(),
1531  topic_servant_->topic_name(), topic_servant_->type_name()));
1532  }
1533 
1534  this->data_container_->publication_id_ = this->publication_id_;
1535 
  // The guard is released before calling out to the publisher.
1536  guard.release();
1537 
1538  const DDS::ReturnCode_t writer_enabled_result =
1539  publisher->writer_enabled(topic_name_.in(), this);
1540 
1541  if (this->monitor_) {
1542  this->monitor_->report();
1543  }
1544 
1545 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1546 
1547  // Move cached data from the durability cache to the unsent data
1548  // queue.
1549  if (durability_cache != 0) {
1550 
  // Failure to fetch durable data is logged but does not fail enable().
1551  if (!durability_cache->get_data(this->domain_id_,
1552  this->topic_name_,
1553  get_type_name(),
1554  this,
1555  this->mb_allocator_.get(),
1556  this->db_allocator_.get(),
1557  this->qos_.lifespan)) {
1559  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
1560  ACE_TEXT("unable to retrieve durable data\n")));
1561  }
1562  }
1563 
1564 #endif
1565 
  // Observers are told about the enable only if the publisher accepted it.
1566  if (writer_enabled_result == DDS::RETCODE_OK) {
1567  const Observer_rch observer = get_observer(Observer::e_ENABLED);
1568  if (observer) {
1569  observer->on_enabled(this);
1570  }
1571  }
1572 
1573  return writer_enabled_result;
1574 }
1575 
// Collects the currently unsent data via get_unsent_data() and passes it to
// send() together with the transaction id that call returned.
// NOTE(review): the signature, the declaration of `list` and the statement on
// listing line 1585 are not visible here.
1576 void
1578 {
1579  DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
1580 
1582 
1583  ACE_UINT64 transaction_id = this->get_unsent_data(list);
1584 
1586 
1587  //need to release guard to call down to transport
1588  guard.release();
1589 
1590  this->send(list, transaction_id);
1591 }
1592 
1595  Message_Block_Ptr data,
1596  const DDS::Time_t& source_timestamp)
1597 {
  // Registers an instance with the data container and enqueues a control
  // sample built from `data` and `source_timestamp`.
  // Returns the first non-OK code from register_instance(),
  // obtain_buffer_for_control() or enqueue_control(); otherwise the (OK)
  // result of enqueue_control().
  // NOTE(review): the first parameter, the logging-macro heads and the call
  // that builds `sample` are not visible here.
1598  DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
1599 
1600  if (!enabled_) {
1602  ACE_TEXT("(%P|%t) ERROR: ")
1603  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1604  ACE_TEXT("Entity is not enabled.\n")),
1606  }
1607 
1608  DDS::ReturnCode_t ret = data_container_->register_instance(handle, data);
1609  if (ret != DDS::RETCODE_OK) {
1611  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
1612  ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1613  retcode_to_string(ret)),
1614  ret);
1615  }
1616 
1617  if (this->monitor_) {
1618  this->monitor_->report();
1619  }
1620 
  // The control sample travels in a DataSampleElement owned by the container.
1621  DataSampleElement* element = 0;
1622  ret = this->data_container_->obtain_buffer_for_control(element);
1623  if (ret != DDS::RETCODE_OK) {
1625  ACE_TEXT("(%P|%t) ERROR: ")
1626  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1627  ACE_TEXT("obtain_buffer_for_control failed, returned <%C>.\n"),
1628  retcode_to_string(ret)),
1629  ret);
1630  }
1631 
1632  // Add header with the registration sample data.
1633  Message_Block_Ptr sample(
1636  element->get_header(),
1637  move(data),
1638  source_timestamp));
1639 
1640  element->set_sample(move(sample));
1641 
1642  ret = this->data_container_->enqueue_control(element);
1643 
  // Hand the element back to the container if it could not be queued.
1644  if (ret != DDS::RETCODE_OK) {
1645  data_container_->release_buffer(element);
1647  ACE_TEXT("(%P|%t) ERROR: ")
1648  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1649  ACE_TEXT("enqueue_control failed, returned <%C>\n"),
1650  retcode_to_string(ret)),
1651  ret);
1652  }
1653 
1654  return ret;
1655 }
1656 
1659  DDS::InstanceHandle_t& handle,
1660  Message_Block_Ptr data,
1661  const DDS::Time_t& source_timestamp)
1662 {
  // Wrapper around register_instance_i() that takes a guard on get_lock()
  // first. Returns whatever register_instance_i() returns, logging an error
  // when that is not RETCODE_OK.
  // NOTE(review): the guard-macro head and the statement on listing line 1679
  // are not visible here.
1663  DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
1664 
1666  guard,
1667  get_lock(),
1669 
1670  const DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
1671  if (ret != DDS::RETCODE_OK) {
1673  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
1674  ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1675  retcode_to_string(ret)),
1676  ret);
1677  }
1678 
1680 
1681  return ret;
1682 }
1683 
1686  const DDS::Time_t& source_timestamp)
1687 {
  // Unregisters the instance identified by `handle` from the data container
  // and enqueues a control sample stamped with `source_timestamp`.
  // One branch delegates to dispose_and_unregister() instead (see the spec
  // note below). Returns RETCODE_OK, or the first non-OK code from the
  // container calls.
  // NOTE(review): the first parameter, the condition guarding the
  // dispose_and_unregister() branch, the declaration of `ret` and the call
  // that builds `sample` are not visible here.
1688  DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
1689 
1690  if (!enabled_) {
1692  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
1693  ACE_TEXT("Entity is not enabled.\n")),
1695  }
1696 
1697  // According to spec 1.2, autodispose_unregistered_instances true causes
1698  // dispose on the instance prior to calling unregister operation.
1700  return this->dispose_and_unregister(handle, source_timestamp);
1701  }
1702 
1705  Message_Block_Ptr unregistered_sample_data;
1706  ret = this->data_container_->unregister(handle, unregistered_sample_data);
1707 
1708  if (ret != DDS::RETCODE_OK) {
1710  ACE_TEXT("(%P|%t) ERROR: ")
1711  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1712  ACE_TEXT("unregister with container failed.\n")),
1713  ret);
1714  }
1715 
1716  DataSampleElement* element = 0;
1717  ret = this->data_container_->obtain_buffer_for_control(element);
1718 
1719  if (ret != DDS::RETCODE_OK) {
1721  ACE_TEXT("(%P|%t) ERROR: ")
1722  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1723  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1724  ret),
1725  ret);
1726  }
1727 
1729  element->get_header(),
1730  move(unregistered_sample_data),
1731  source_timestamp));
1732  element->set_sample(move(sample));
1733 
1734  ret = this->data_container_->enqueue_control(element);
1735 
  // Hand the element back to the container if it could not be queued.
1736  if (ret != DDS::RETCODE_OK) {
1737  data_container_->release_buffer(element);
1739  ACE_TEXT("(%P|%t) ERROR: ")
1740  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1741  ACE_TEXT("enqueue_control failed.\n")),
1742  ret);
1743  }
1744 
1746  return DDS::RETCODE_OK;
1747 }
1748 
1751  const DDS::Time_t& source_timestamp)
1752 {
  // Disposes and then unregisters the instance identified by `handle` in the
  // data container, and enqueues a single control sample for both operations.
  // Returns RETCODE_OK, or the first non-OK code from the container calls.
  // NOTE(review): the first parameter, the declaration of `ret` and the call
  // that builds `sample` are not visible here.
1753  DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
1754 
1757 
1758  Message_Block_Ptr data_sample;
1759  ret = this->data_container_->dispose(handle, data_sample);
1760 
1761  if (ret != DDS::RETCODE_OK) {
1763  ACE_TEXT("(%P|%t) ERROR: ")
1764  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1765  ACE_TEXT("dispose on container failed.\n")),
1766  ret);
1767  }
1768 
  // Same `data_sample` is passed again, with an extra `false` argument whose
  // meaning is defined by WriteDataContainer::unregister (not shown here).
1769  ret = this->data_container_->unregister(handle, data_sample, false);
1770 
1771  if (ret != DDS::RETCODE_OK) {
1773  ACE_TEXT("(%P|%t) ERROR: ")
1774  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1775  ACE_TEXT("unregister with container failed.\n")),
1776  ret);
1777  }
1778 
1779  DataSampleElement* element = 0;
1780  ret = this->data_container_->obtain_buffer_for_control(element);
1781 
1782  if (ret != DDS::RETCODE_OK) {
1784  ACE_TEXT("(%P|%t) ERROR: ")
1785  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1786  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1787  ret),
1788  ret);
1789  }
1790 
1792  element->get_header(),
1793  move(data_sample),
1794  source_timestamp));
1795  element->set_sample(move(sample));
1796 
1797  ret = this->data_container_->enqueue_control(element);
1798 
  // Hand the element back to the container if it could not be queued.
1799  if (ret != DDS::RETCODE_OK) {
1800  data_container_->release_buffer(element);
1802  ACE_TEXT("(%P|%t) ERROR: ")
1803  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1804  ACE_TEXT("enqueue_control failed.\n")),
1805  ret);
1806  }
1807 
1809  return DDS::RETCODE_OK;
1810 }
1811 
// Calls unregister_instance_i() on the first entry of
// data_container_->instances_ repeatedly until that map is empty, passing
// `source_timestamp` each time.
// NOTE(review): the loop only terminates if every call removes the instance
// from instances_; the result of unregister_instance_i() is not checked here.
// The signature and the statement on listing line 1815 are not visible.
1812 void
1814 {
1816 
1817  while (!this->data_container_->instances_.empty()) {
1818  this->unregister_instance_i(this->data_container_->instances_.begin()->first, source_timestamp);
1819  }
1820 }
1821 
1824  DDS::InstanceHandle_t handle,
1825  const DDS::Time_t& source_timestamp,
1827  const void* real_data)
1828 {
  // Writes one data sample for the instance `handle`:
  //  - obtains a DataSampleElement from the container (a timeout is returned
  //    silently, other failures are logged);
  //  - wraps `data` in a sample message via create_sample_data_message();
  //  - enqueues the element, updates reader sequence tracking and the
  //    coherent sample count;
  //  - either queues the unsent list on available_data_list_ (publisher gone
  //    or suspended) or releases the guards and sends it;
  //  - finally reports the sample to an observer when `real_data` and a value
  //    dispatcher are available.
  // Returns RETCODE_OK on success, otherwise the failing step's code.
  // NOTE(review): the parameters `data` and `filter_out`, the guard-macro
  // heads and the declarations of `list` and `observer` are not visible here.
1829  DBG_ENTRY_LVL("DataWriterImpl","write",6);
1830 
1832 
1833  // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
1834  GUIDSeq_var filter_out_var(filter_out);
1835 
1836  if (!enabled_) {
1838  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
1839  ACE_TEXT("Entity is not enabled.\n")),
1841  }
1842 
1844  dc_guard,
1845  get_lock(),
1847 
1848  DataSampleElement* element = 0;
1849  DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
1850 
1851  if (ret == DDS::RETCODE_TIMEOUT) {
1852  return ret; // silent for timeout
1853 
1854  } else if (ret != DDS::RETCODE_OK) {
1856  ACE_TEXT("(%P|%t) ERROR: ")
1857  ACE_TEXT("DataWriterImpl::write: ")
1858  ACE_TEXT("obtain_buffer returned %d.\n"),
1859  ret),
1860  ret);
1861  }
1862 
  // The sample is attached to the element before `ret` is checked, so the
  // element carries it even on the failure path that releases the buffer.
1863  Message_Block_Ptr temp;
1864  ret = create_sample_data_message(move(data),
1865  handle,
1866  element->get_header(),
1867  temp,
1868  source_timestamp,
1869  (filter_out != 0));
1870  element->set_sample(move(temp));
1871 
1872  if (ret != DDS::RETCODE_OK) {
1873  data_container_->release_buffer(element);
1874  return ret;
1875  }
1876 
1877  element->set_filter_out(filter_out_var._retn()); // ownership passed to element
1878 
1879  ret = this->data_container_->enqueue(element, handle);
1880 
1881  if (ret != DDS::RETCODE_OK) {
1882  data_container_->release_buffer(element);
1884  ACE_TEXT("(%P|%t) ERROR: ")
1885  ACE_TEXT("DataWriterImpl::write: ")
1886  ACE_TEXT("enqueue failed.\n")),
1887  ret);
1888  }
1890 
1891  track_sequence_number(filter_out);
1892 
1893  if (this->coherent_) {
1894  ++this->coherent_samples_;
1895  }
1897 
1898  ACE_UINT64 transaction_id = this->get_unsent_data(list);
1899 
1900  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
1901  if (!publisher || publisher->is_suspended()) {
1902  if (min_suspended_transaction_id_ == 0) {
1903  //provides transaction id for lower bound of suspended transactions
1904  //or transaction id for single suspended write transaction
1905  min_suspended_transaction_id_ = transaction_id;
1906  } else {
1907  //when multiple write transactions have suspended, provides the upper bound
1908  //for suspended transactions.
1909  max_suspended_transaction_id_ = transaction_id;
1910  }
1911  this->available_data_list_.enqueue_tail(list);
1912 
1913  } else {
  // Both guards are dropped before calling into send().
1914  dc_guard.release();
1915  guard.release();
1916  this->send(list, transaction_id);
1917  }
1918 
1919  const ValueDispatcher* vd = get_value_dispatcher();
1921  if (observer && real_data && vd) {
1922  Observer::Sample s(handle, element->get_header().instance_state(), source_timestamp, element->get_header().sequence_, real_data, *vd);
1923  observer->on_sample_sent(this, s);
1924  }
1925 
1926  return DDS::RETCODE_OK;
1927 }
1928 
// Records get_max_sn() as the expected sequence number of the readers in
// reader_info_, while holding reader_info_lock_.
// With content-filtered topics enabled, readers whose GUID appears in
// `filter_out` are left untouched; without that feature every reader is
// updated and `filter_out` is unused.
// NOTE(review): the signature line is not visible here; `filter_out` is
// dereferenced as a pointer to a GUID sequence and may be null.
1929 void
1931 {
1932  const SequenceNumber sn = get_max_sn();
1933  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
1934 
1935 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1936  // Track individual expected sequence numbers in ReaderInfo
1937  RepoIdSet excluded;
1938 
  // Build the exclusion set only when there is something to exclude from.
1939  if (filter_out && !reader_info_.empty()) {
1940  const GUID_t* buf = filter_out->get_buffer();
1941  excluded.insert(buf, buf + filter_out->length());
1942  }
1943 
1944  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1945  end = reader_info_.end(); iter != end; ++iter) {
1946  // If not excluding this reader, update expected sequence
1947  if (excluded.count(iter->first) == 0) {
1948  iter->second.expected_sequence_ = sn;
1949  }
1950  }
1951 
1952 #else
1953  ACE_UNUSED_ARG(filter_out);
1954  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1955  end = reader_info_.end(); iter != end; ++iter) {
1956  iter->second.expected_sequence_ = sn;
1957  }
1958 
1959 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
1960 
1961 }
1962 
// Flushes the data queued while the publisher was suspended (see the
// suspended-transaction bookkeeping in write()) and then resets
// available_data_list_.
// NOTE(review): the signature and the statements that perform the sends and
// reset the transaction ids are not visible here; the comments below are the
// only description of them in view.
1963 void
1965 {
1966  //this serves to get TransportClient's max_transaction_id_seen_
1967  //to the correct value for this list of transactions
1968  if (max_suspended_transaction_id_ != 0) {
1971  }
1972 
1973  //this serves to actually have the send proceed in
1974  //sending the samples to the datalinks by passing it
1975  //the min_suspended_transaction_id_ which should be the
1976  //TransportClient's expected_transaction_id_
1979  this->available_data_list_.reset();
1980 }
1981 
1984  const DDS::Time_t & source_timestamp)
1985 {
  // Disposes the instance identified by `handle` in the data container and
  // enqueues a control sample stamped with `source_timestamp`.
  // Returns RETCODE_OK, or the first non-OK code from the container calls.
  // NOTE(review): the first parameter, the guard, the declaration of `ret`
  // and the call that builds `sample` are not visible here.
1986  DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
1987 
1988  if (!enabled_) {
1990  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
1991  ACE_TEXT("Entity is not enabled.\n")),
1993  }
1994 
1996 
1998 
1999  Message_Block_Ptr registered_sample_data;
2000  ret = this->data_container_->dispose(handle, registered_sample_data);
2001 
2002  if (ret != DDS::RETCODE_OK) {
2004  ACE_TEXT("(%P|%t) ERROR: ")
2005  ACE_TEXT("DataWriterImpl::dispose: ")
2006  ACE_TEXT("dispose failed.\n")),
2007  ret);
2008  }
2009 
2010  DataSampleElement* element = 0;
2011  ret = this->data_container_->obtain_buffer_for_control(element);
2012 
2013  if (ret != DDS::RETCODE_OK) {
2015  ACE_TEXT("(%P|%t) ERROR: ")
2016  ACE_TEXT("DataWriterImpl::dispose: ")
2017  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
2018  ret),
2019  ret);
2020  }
2021 
2023  element->get_header(),
2024  move(registered_sample_data),
2025  source_timestamp));
2026  element->set_sample(move(sample));
2027 
2028  ret = this->data_container_->enqueue_control(element);
2029 
  // Hand the element back to the container if it could not be queued.
2030  if (ret != DDS::RETCODE_OK) {
2031  data_container_->release_buffer(element);
2033  ACE_TEXT("(%P|%t) ERROR: ")
2034  ACE_TEXT("DataWriterImpl::dispose: ")
2035  ACE_TEXT("enqueue_control failed.\n")),
2036  ret);
2037  }
2038 
2040 
2041  return DDS::RETCODE_OK;
2042 }
2043 
2046  size_t& size)
2047 {
  // Pure delegation: forwards `handle` and the out-parameter `size` to
  // data_container_->num_samples() and returns its result unchanged.
2048  return data_container_->num_samples(handle, size);
2049 }
2050 
// Pure delegation to data_container_->unregister_all().
// NOTE(review): the signature line is not visible here.
2051 void
2053 {
2054  data_container_->unregister_all();
2055 }
2056 
// Returns dp_id_, which enable() fills from participant->get_id().
// NOTE(review): the signature line is not visible here.
2057 GUID_t
2059 {
2060  return dp_id_;
2061 }
2062 
// Returns the C string held by type_name_. The pointer comes from
// type_name_.in(), so it is borrowed from the member, not copied.
// NOTE(review): the signature line is not visible here.
2063 char const *
2065 {
2066  return type_name_.in();
2067 }
2068 
2071  DataSampleHeader& header_data,
2072  Message_Block_Ptr data,
2073  const DDS::Time_t& source_timestamp)
2074 {
  // Fills `header_data` for a control message of kind `message_id`, allocates
  // a message block from mb_allocator_, serializes the header into it and
  // returns the block. `data` (if it has a non-zero length) becomes the
  // block's continuation.
  // Returns 0 when the publisher is gone; the trailing 0 arguments of
  // ACE_NEW_MALLOC_RETURN and ACE_GUARD_RETURN suggest 0 is also returned
  // when the allocation or the reader-info guard fails -- TODO confirm.
  // NOTE(review): the first parameter, the byte-order expression, the guard
  // declaration and several constructor arguments are not visible here.
2075  header_data.message_id_ = message_id;
2076  header_data.byte_order_ =
2078  header_data.coherent_change_ = false;
2079 
2080  if (data) {
2081  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
2082  }
2083 
2085  header_data.sequence_repair_ = false; // set below
2086  header_data.source_timestamp_sec_ = source_timestamp.sec;
2087  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2088  header_data.publication_id_ = publication_id_;
2089 
2090  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2091  if (!publisher) {
2092  return 0;
2093  }
2094 
2095  header_data.publisher_id_ = publisher->publisher_id_;
2096 
  // Only the message kinds listed below consume a new sequence number; for
  // all others `sequence` keeps the current sequence_number_.
2098  SequenceNumber sequence = sequence_number_;
2099  if (message_id == INSTANCE_REGISTRATION
2100  || message_id == DISPOSE_INSTANCE
2101  || message_id == UNREGISTER_INSTANCE
2102  || message_id == DISPOSE_UNREGISTER_INSTANCE
2103  || message_id == REQUEST_ACK) {
2104 
2105  header_data.sequence_repair_ = need_sequence_repair();
2106  header_data.sequence_ = get_next_sn_i();
2107  header_data.key_fields_only_ = true;
2108  sequence = sequence_number_;
2109  }
2110  guard.release();
2111 
2112  ACE_Message_Block* message = 0;
2113  ACE_NEW_MALLOC_RETURN(message,
2114  static_cast<ACE_Message_Block*>(
2115  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2119  header_data.message_length_ ? data.release() : 0, //cont
2120  0, //data
2121  0, //allocator_strategy
2122  get_db_lock(), //locking_strategy
2126  db_allocator_.get(),
2127  mb_allocator_.get()),
2128  0);
2129 
2130  *message << header_data;
2131 
2132  // If we incremented sequence number for this control message
2133  if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
2134  ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
2135  // Update the expected sequence number for all readers
2136  RepoIdToReaderInfoMap::iterator reader;
2137 
2138  for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
2139  reader->second.expected_sequence_ = sequence;
2140  }
2141  }
2142  if (DCPS_debug_level >= 4) {
2144  ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
2145  ACE_TEXT("from publication %C sending control sample: %C .\n"),
2146  LogGuid(publication_id_).c_str(),
2147  to_string(header_data).c_str()));
2148  }
2149  return message;
2150 }
2151 
2154  DDS::InstanceHandle_t instance_handle,
2155  DataSampleHeader& header_data,
2156  Message_Block_Ptr& message,
2157  const DDS::Time_t& source_timestamp,
2158  bool content_filter)
2159 {
  // Builds the SAMPLE_DATA message for one write: fills `header_data`,
  // allocates a message block whose continuation is `data`, serializes the
  // header into it and stores the block in `message`.
  // Returns RETCODE_OK on success and RETCODE_ERROR when the publisher is
  // gone. An unknown `instance_handle` is logged.
  // NOTE(review): the first parameter (`data`), the return value for an
  // unknown handle, the guard inside the inner scope, the lifespan condition
  // and several constructor arguments are not visible here.
2160  PublicationInstance_rch instance =
2161  data_container_->get_handle_instance(instance_handle);
2162 
2163  if (0 == instance) {
2165  ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
2166  ACE_TEXT("failed to find instance for handle %d\n"),
2167  instance_handle),
2169  }
2170 
2171  header_data.message_id_ = SAMPLE_DATA;
2172  header_data.byte_order_ =
2174  header_data.coherent_change_ = this->coherent_;
2175 
2176  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2177 
2178  if (!publisher) {
2179  return DDS::RETCODE_ERROR;
2180  }
2181 
2182 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2183  header_data.group_coherent_ =
2184  publisher->qos_.presentation.access_scope
2186 #endif
2187  header_data.content_filter_ = content_filter;
2188  header_data.cdr_encapsulation_ = this->cdr_encapsulation();
2189  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
  // Every data sample consumes a new sequence number.
2190  {
2192  header_data.sequence_repair_ = need_sequence_repair();
2193  header_data.sequence_ = get_next_sn_i();
2194  }
2195  header_data.source_timestamp_sec_ = source_timestamp.sec;
2196  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2197 
2200  header_data.lifespan_duration_ = true;
2203  }
2204 
2205  header_data.publication_id_ = publication_id_;
2206  header_data.publisher_id_ = publisher->publisher_id_;
2207 
2208  ACE_Message_Block* tmp_message;
2209  ACE_NEW_MALLOC_RETURN(tmp_message,
2210  static_cast<ACE_Message_Block*>(
2211  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2214  data.release(), //cont
2215  0, //data
2216  header_allocator_.get(), //alloc_strategy
2217  get_db_lock(), //locking_strategy
2221  db_allocator_.get(),
2222  mb_allocator_.get()),
  // `message` takes over the freshly allocated block.
2224  message.reset(tmp_message);
2225  *message << header_data;
2226  if (DCPS_debug_level >= 4) {
2228  ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
2229  ACE_TEXT("from publication %C sending data sample: %C .\n"),
2230  LogGuid(publication_id_).c_str(),
2231  to_string(header_data).c_str()));
2232  }
2233  return DDS::RETCODE_OK;
2234 }
2235 
// Delivery callback for a data sample. A sample whose publication id differs
// from publication_id_ is logged as an error and ignored; otherwise the
// notification is forwarded to data_container_->data_delivered().
// NOTE(review): the signature, the logging-macro head and the statistics
// statement on listing line 2251 are not visible here.
2236 void
2238 {
2239  DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
2240 
2241  if (!(sample->get_pub_id() == this->publication_id_)) {
2243  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
2244  ACE_TEXT("The publication id %C from delivered element ")
2245  ACE_TEXT("does not match the datawriter's id %C\n"),
2246  LogGuid(sample->get_pub_id()).c_str(),
2248  return;
2249  }
2250  //provided for statistics tracking in tests
2252 
2253  this->data_container_->data_delivered(sample);
2254 }
2255 
// Delivery callback for a control message.
// NOTE(review): apart from the debug-entry trace, the body statement
// (listing line 2260) and the signature are not visible here.
2256 void
2258 {
2259  DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
2261 }
2262 
2265 {
  // Returns the result of locking the weak publisher reference; elsewhere in
  // this file callers treat an empty result as "publisher gone".
  // NOTE(review): the signature (name and return type) is not visible here.
2266  return this->publisher_servant_.lock();
2267 }
2268 
2269 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
// Decides whether the sample in `elt` should be filtered out (true) or kept
// (false) for a content filter.
//  - No type support: logs an error (at LogLevel::Error or above) and keeps it.
//  - Filter classes other than "DDSSQL"/"OPENDDSSQL": keeps it.
//  - Sample without valid data while the filter uses non-key fields: filters it.
//  - Otherwise returns the negation of evaluator.eval(); a std::runtime_error
//    from eval() keeps the sample.
// NOTE(review): the line naming the function and its first parameter (`elt`)
// is not visible here.
2270 bool
2272  const OPENDDS_STRING& filterClassName,
2273  const FilterEvaluator& evaluator,
2274  const DDS::StringSeq& expression_params) const
2275 {
2276  if (!type_support_) {
2277  if (log_level >= LogLevel::Error) {
2278  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::filter_out: Could not cast type support, not filtering\n"));
2279  }
2280  return false;
2281  }
2282 
2283  if (filterClassName == "DDSSQL" ||
2284  filterClassName == "OPENDDSSQL") {
2285  if (!elt.get_header().valid_data() && evaluator.has_non_key_fields(*type_support_)) {
2286  return true;
2287  }
2288  try {
  // The evaluator runs on the continuation of the sample block, i.e. the
  // part that follows the header block.
2289  return !evaluator.eval(elt.get_sample()->cont(), encoding_mode_.encoding(),
2290  *type_support_, expression_params);
2291  } catch (const std::runtime_error&) {
2292  // if the eval fails, the throws will do the logging
2293  // return false here so that the sample is not filtered
2294  return false;
2295  }
2296  } else {
2297  return false;
2298  }
2299 }
2300 #endif
2301 
// Always returns true: as the comment in the body states, the DataWriter
// places no QoS-based restriction on which transports may be used.
// NOTE(review): the signature line is not visible here.
2302 bool
2304 {
2305  // DataWriter does not impose any constraints on which transports
2306  // may be used based on QoS.
2307  return true;
2308 }
2309 
2310 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2311 
// Returns the coherent_ flag, read under a guard on get_lock(). The trailing
// `false` in the guard arguments suggests false is returned when the guard
// cannot be acquired -- TODO confirm against the guard macro.
// NOTE(review): the signature and the guard-macro head are not visible here.
2312 bool
2314 {
2316  guard,
2317  get_lock(),
2318  false);
2319 
2320  return this->coherent_;
2321 }
2322 
// Sets the coherent_ flag under a guard on get_lock(). write() counts samples
// in coherent_samples_ while the flag is set; end_coherent_changes() clears it.
// NOTE(review): the signature and the guard-macro head are not visible here.
2323 void
2325 {
2327  guard,
2328  get_lock());
2329 
2330  this->coherent_ = true;
2331 }
2332 
// Ends a coherent change set: serializes a CoherentChangeControl message,
// wraps it in an END_COHERENT_CHANGES control message, clears coherent_ and
// coherent_samples_, then sends the control message after releasing the guard.
// For group-coherent presentation the publisher id and `group_samples` are
// included in the message. A send failure is logged only.
// NOTE(review): the guard-macro head, the statements on listing lines
// 2342-2343 and the declaration of `header` are not visible here.
2333 void
2334 DataWriterImpl::end_coherent_changes(const GroupCoherentSamples& group_samples)
2335 {
2336  // PublisherImpl::pi_lock_ should be held.
2338  guard,
2339  get_lock());
2340 
2341  CoherentChangeControl end_msg;
2344 
2345  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2346 
2347  if (publisher) {
2348  end_msg.group_coherent_
2349  = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
2350  }
2351 
2352  if (publisher && end_msg.group_coherent_) {
2353  end_msg.publisher_id_ = publisher->publisher_id_;
2354  end_msg.group_coherent_samples_ = group_samples;
2355  }
2356 
  // The payload block is sized from the message's maximum serialized size.
2357  Message_Block_Ptr data(
2358  new ACE_Message_Block(
2359  end_msg.get_max_serialized_size(),
2361  0, // cont
2362  0, // data
2363  0, // alloc_strategy
2364  get_db_lock()));
2365 
2366  Serializer serializer(data.get(), Encoding::KIND_UNALIGNED_CDR,
2367  this->swap_bytes());
2368 
2369  serializer << end_msg;
2370 
2372  Message_Block_Ptr control(
2374  END_COHERENT_CHANGES, header, move(data),
2375  SystemTimePoint::now().to_dds_time()));
2376 
  // The coherent state is reset before the send, regardless of its outcome.
2377  this->coherent_ = false;
2378  this->coherent_samples_ = 0;
2379 
2380  guard.release();
2381  if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
2383  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
2384  ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
2385  }
2386 }
2387 
2388 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
2389 
// Drop callback for a data sample: forwards `element` and
// `dropped_by_transport` to data_container_->data_dropped().
// NOTE(review): the line naming the function and its first parameter, and the
// statistics statement on listing line 2397, are not visible here.
2390 void
2392  bool dropped_by_transport)
2393 {
2394  DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
2395 
2396  //provided for statistics tracking in tests
2398 
2399  this->data_container_->data_dropped(element, dropped_by_transport);
2400 }
2401 
// Drop callback for a control message; the `dropped_by_transport` flag is
// deliberately unnamed and therefore unused.
// NOTE(review): apart from the debug-entry trace, the body statement
// (listing line 2407) and the line naming the function are not visible here.
2402 void
2404  bool /* dropped_by_transport */)
2405 {
2406  DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
2408 }
2409 
// Selects the listener to notify for status `kind`: this writer's own
// listener when it is set and listener_mask_ includes `kind`, otherwise
// whatever the publisher's listener_for(kind) returns. Returns 0 when the
// publisher is gone.
// The writer's own listener is returned via _duplicate().
// NOTE(review): the signature and the declaration of the guard `g` are not
// visible here.
2410 DDS::DataWriterListener_ptr
2412 {
2413  // per 2.1.4.3.1 Listener Access to Plain Communication Status
2414  // use this entity's factory if the listener mask is not enabled
2415  // for this kind.
2416  RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
2417  if (!publisher)
2418  return 0;
2419 
2421  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
  // The guard is released before calling into the publisher.
2422  g.release();
2423  return publisher->listener_for(kind);
2424 
2425  } else {
2426  return DDS::DataWriterListener::_duplicate(listener_.in());
2427  }
2428 }
2429 
// Liveliness timer callback. Always returns 0.
//  - If at least liveliness_check_interval_ has elapsed, a liveliness message
//    may be sent depending on qos_.liveliness.kind; a failed send counts as
//    lost liveliness.
//  - Otherwise the timer is cancelled and rescheduled for the remaining time,
//    and the function returns early.
//  - After a check, liveliness also counts as lost when the time since the
//    last activity reaches the lease duration. The listener's
//    on_liveliness_lost() is invoked only on the transition from "not lost"
//    to "lost".
// NOTE(review): the signature, the guard, the initial computation of
// `elapsed`, the `case` labels, the schedule_timer call head and the status
// update statements are not visible here.
2430 int
2432  const void* /* arg */)
2433 {
2434  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2435 
2436  const MonotonicTimePoint now(tv);
2437  bool liveliness_lost = false;
2438 
2440 
2442 
2443  // Do we need to send a liveliness message?
2444  if (elapsed >= liveliness_check_interval_) {
2445  switch (this->qos_.liveliness.kind) {
2447  if (!send_liveliness(now)) {
2448  liveliness_lost = true;
2449  }
2450  break;
2451 
  // This kind only sends when liveliness was asserted since the last check.
2453  if (liveliness_asserted_) {
2454  if (!send_liveliness(now)) {
2455  liveliness_lost = true;
2456  }
2457  }
2458  break;
2459 
2461  // Do nothing.
2462  break;
2463  }
2464  }
2465  else {
2466  // Reschedule.
2467  if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
2469  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2470  ACE_TEXT("cancel_timer")));
2471  }
2473  (liveliness_check_interval_ - elapsed).value(),
2475  {
2477  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2478  ACE_TEXT("schedule_timer")));
2479  }
2480  return 0;
2481  }
2482 
  // The assertion flag is consumed by this check; `elapsed` is recomputed
  // because send_liveliness() may have changed the last activity time.
2483  liveliness_asserted_ = false;
2484  elapsed = now - last_liveliness_activity_time_;
2485 
2486  // Have we lost liveliness?
2487  if (elapsed >= TimeDuration(qos_.liveliness.lease_duration)) {
2488  liveliness_lost = true;
2489  }
2490 
2491  if (!this->liveliness_lost_ && liveliness_lost) {
2494 
2495  DDS::DataWriterListener_var listener =
2497 
2498  if (!CORBA::is_nil(listener.in())) {
2499  {
2502  listener->on_liveliness_lost(this, this->liveliness_lost_status_);
2503  }
2505  }
2506  }
2507 
2508  this->liveliness_lost_ = liveliness_lost;
2509  return 0;
2510 }
2511 
// Sends a DATAWRITER_LIVELINESS control message with an empty payload when
// the visible condition holds (its second half: discovery for this domain
// does not support liveliness). Returns false if send_control() reports
// SEND_CONTROL_ERROR, true otherwise.
// NOTE(review): the signature, the first half of the condition, the
// declaration of `header` and the statement on listing line 2531 are not
// visible here.
2512 bool
2514 {
2516  !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
2518  Message_Block_Ptr empty;
2519  Message_Block_Ptr liveliness_msg(
2521  DATAWRITER_LIVELINESS, header, move(empty),
2522  SystemTimePoint::now().to_dds_time()));
2523 
2524  if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
2526  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
2527  ACE_TEXT("send_control failed.\n")),
2528  false);
2529  }
2530  }
2532  return true;
2533 }
2534 
// Prepares the writer for deletion, in this order: notify the observer
// (on_deleted), mark the entity deleted, stop associating, persist durable
// data (when the persistence profile is compiled in), and finally unregister
// all instances with the current system time.
// NOTE(review): the signature and the statement on listing line 2545 are not
// visible here.
2535 void
2537 {
2538  const Observer_rch observer = get_observer(Observer::e_DELETED);
2539  if (observer) {
2540  observer->on_deleted(this);
2541  }
2542 
2543  this->set_deleted(true);
2544  this->stop_associating();
2546 
2547 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2548  // Trigger data to be persisted, i.e. made durable, if so
2549  // configured. This needs be called before unregister_instances
2550  // because unregister_instances may cause instance dispose.
  // A persistence failure is only reported at debug level 2 or higher.
2551  if (!persist_data() && DCPS_debug_level >= 2) {
2552  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::prepare_to_delete: ")
2553  ACE_TEXT("failed to make data durable.\n")));
2554  }
2555 #endif
2556 
2557  // Unregister all registered instances prior to deletion.
2558  unregister_instances(SystemTimePoint::now().to_dds_time());
2559 }
2560 
2563 {
  // Looks up the publication instance for `handle` in the data container.
  // Returns a default-constructed PublicationInstance_rch when the container
  // has not been created.
  // NOTE(review): the signature is not visible here.
2564 
2565  if (0 != data_container_) {
2566  return data_container_->get_handle_instance(handle);
2567  }
2568 
2569  return PublicationInstance_rch();
2570 }
2571 
2572 void
2574 {
2575  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
2576 
2577  if (!is_bit_) {
2578  // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
2579  // is given to this DataWriter then narrow() fails.
2580  DataWriterListener_var the_listener = get_ext_listener();
2581 
2582  if (!CORBA::is_nil(the_listener.in())) {
2584  // Since this callback may come after remove_association which
2585  // removes the reader from id_to_handle map, we can ignore this
2586  // error.
2587  this->lookup_instance_handles(subids,
2588  status.subscription_handles);
2589  the_listener->on_publication_disconnected(this, status);
2590  }
2591  }
2592 }
2593 
// NOTE(review): the signature (listing line 2595) and line 2606 (presumably the
// declaration of `status`) are absent from this listing. The trace string below
// names the function notify_publication_reconnected.
//
// Does nothing when is_bit_ is set. Otherwise, if get_ext_listener() yields a
// non-nil listener, resolves `subids` into status.subscription_handles and
// invokes on_publication_reconnected() on that listener.
2594 void
2596 {
2597  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
2598 
2599  if (!is_bit_) {
2600  // Narrow to DDS::DCPS::DataWriterListener. If a
2601  // DDS::DataWriterListener is given to this DataWriter then
2602  // narrow() fails.
2603  DataWriterListener_var the_listener = get_ext_listener();
2604 
2605  if (!CORBA::is_nil(the_listener.in())) {
2607 
2608  // If it's reconnected then the reader should be in id_to_handle
2609  this->lookup_instance_handles(subids, status.subscription_handles);
2610 
2611  the_listener->on_publication_reconnected(this, status);
2612  }
2613  }
2614 }
2615 
// NOTE(review): the signature (listing line 2617) is absent from this listing.
// The trace string names the function notify_publication_lost; this variant
// reads a `subids` argument (reader ids).
//
// Does nothing when is_bit_ is set. Otherwise, if get_ext_listener() yields a
// non-nil listener, fills a PublicationLostStatus with the handles looked up
// for `subids` and invokes on_publication_lost() on that listener.
2616 void
2618 {
2619  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2620 
2621  if (!is_bit_) {
2622  // Narrow to DDS::DCPS::DataWriterListener. If a
2623  // DDS::DataWriterListener is given to this DataWriter then
2624  // narrow() fails.
2625  DataWriterListener_var the_listener = get_ext_listener();
2626 
2627  if (!CORBA::is_nil(the_listener.in())) {
2628  PublicationLostStatus status;
2629 
2630  // Since this callback may come after remove_association which removes
2631  // the reader from id_to_handle map, we can ignore this error.
2632  this->lookup_instance_handles(subids,
2633  status.subscription_handles);
2634  the_listener->on_publication_lost(this, status);
2635  }
2636  }
2637 }
2638 
// NOTE(review): the signature (listing line 2640) is absent from this listing.
// The trace string names the function notify_publication_lost; this variant
// reads a `handles` sequence instead of reader ids.
//
// Does nothing when is_bit_ is set. Otherwise, if get_ext_listener() yields a
// non-nil listener, copies `handles` element by element into
// status.subscription_handles and invokes on_publication_lost().
2639 void
2641 {
2642  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2643 
2644  if (!is_bit_) {
2645  // Narrow to DDS::DCPS::DataWriterListener. If a
2646  // DDS::DataWriterListener is given to this DataWriter then
2647  // narrow() fails.
2648  DataWriterListener_var the_listener = get_ext_listener();
2649 
2650  if (!CORBA::is_nil(the_listener.in())) {
2651  PublicationLostStatus status;
2652 
// Size the destination first, then copy each handle by index.
2653  CORBA::ULong len = handles.length();
2654  status.subscription_handles.length(len);
2655 
2656  for (CORBA::ULong i = 0; i < len; ++ i) {
2657  status.subscription_handles[i] = handles[i];
2658  }
2659 
2660  the_listener->on_publication_lost(this, status);
2661  }
2662  }
2663 }
2664 
2665 
// NOTE(review): listing lines 2667 (function name and first parameter) and 2685
// (opening of the debug log call) are absent from this listing. The log text
// identifies the function as DataWriterImpl::lookup_instance_handles.
//
// Fills `hdls` with one handle per entry of `ids`, obtained from the
// participant's lookup_handle(). Returns without touching `hdls` when the
// participant can no longer be locked from participant_servant_.
2666 void
2668  DDS::InstanceHandleSeq & hdls)
2669 {
2670  CORBA::ULong const num_rds = ids.length();
2671  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2672 
2673  if (!participant)
2674  return;
2675 
// Only at debug levels above 9: build a comma-separated list of the reader ids
// for the log message.
2676  if (DCPS_debug_level > 9) {
2677  OPENDDS_STRING separator;
2678  OPENDDS_STRING buffer;
2679 
2680  for (CORBA::ULong i = 0; i < num_rds; ++i) {
2681  buffer += separator + LogGuid(ids[i]).conv_;
2682  separator = ", ";
2683  }
2684 
2686  ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
2687  ACE_TEXT("searching for handles for reader Ids: %C.\n"),
2688  buffer.c_str()));
2689  }
2690 
// Output sequence is resized to match the input before being filled.
2691  hdls.length(num_rds);
2692 
2693  for (CORBA::ULong i = 0; i < num_rds; ++i) {
2694  hdls[i] = participant->lookup_handle(ids[i]);
2695  }
2696 }
2697 
// NOTE(review): the signature (listing line 2700) is absent from this listing;
// presumably this is the persist_data() helper invoked under the same macro
// elsewhere in this file -- confirm.
//
// Compiled only when OPENDDS_NO_PERSISTENCE_PROFILE is not defined. Forwards to
// data_container_->persist_data() and returns its result; data_container_ is
// dereferenced without a null check.
2698 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2699 bool
2701 {
2702  return this->data_container_->persist_data();
2703 }
2704 #endif
2705 
// NOTE(review): the signature (listing line 2706) is absent from this listing.
// The string passed to wait_messages_pending() names it
// DataWriterImpl::wait_pending.
//
// Unless the TransportRegistry reports released(), waits on the data container
// and then on controlTracker, both with wait_pending_deadline_.
2707 {
2708  if (!TransportRegistry::instance()->released()) {
2709  data_container_->wait_pending(wait_pending_deadline_);
2710  controlTracker.wait_messages_pending("DataWriterImpl::wait_pending", wait_pending_deadline_);
2711  }
2712 }
2713 
2714 void
2715 DataWriterImpl::get_instance_handles(InstanceHandleVec& instance_handles)
2716 {
2717  this->data_container_->get_instance_handles(instance_handles);
2718 }
2719 
// NOTE(review): the signature (listing line 2721) and line 2723 are absent from
// this listing; the cross-reference index lists
// `void get_readers(RepoIdSet &readers)`, which matches this body -- confirm.
//
// Copies the readers_ member into the caller-supplied `readers`.
2720 void
2722 {
2724  readers = this->readers_;
2725 }
2726 
// NOTE(review): the signature (listing line 2728) is absent from this listing;
// the cross-reference index lists
// `retrieve_inline_qos_data(TransportSendListener::InlineQosData &qos_data) const`,
// which matches the `qos_data` fields written here -- confirm.
//
// Populates qos_data with the publisher QoS (only if the publisher can still be
// locked), this writer's QoS, and the topic name.
2727 void
2729 {
2730  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2731  if (publisher) {
2732  publisher->get_qos(qos_data.pub_qos);
2733  }
2734  qos_data.dw_qos = this->qos_;
2735  qos_data.topic_name = this->topic_name_.in();
2736 }
2737 
// NOTE(review): the signature (listing line 2739) and the declaration of
// `participant` (line 2741) are absent from this listing, so the function name
// and how `participant` is obtained cannot be confirmed from here.
//
// Compiled only with OPENDDS_SECURITY. Returns the participant's
// crypto_handle(), or DDS::HANDLE_NIL when `participant` is null.
2738 #if defined(OPENDDS_SECURITY)
2740 {
2742  return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
2743 }
2744 #endif
2745 
// NOTE(review): the signature (listing line 2747) is absent from this listing;
// presumably this is the locking wrapper around need_sequence_repair_i() --
// confirm the name.
//
// Acquires reader_info_lock_ (returning false if the guard cannot be acquired)
// and then returns the result of need_sequence_repair_i().
2746 bool
2748 {
2749  ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
2750  return need_sequence_repair_i();
2751 }
2752 
// NOTE(review): the signature (listing line 2754) is absent from this listing;
// presumably this is need_sequence_repair_i() -- confirm.
//
// Returns true as soon as any entry in reader_info_ has an expected_sequence_
// different from sequence_number_; returns false if none differ (including
// when reader_info_ is empty). No lock is taken in the visible body.
2753 bool
2755 {
2756  for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
2757  end = reader_info_.end(); it != end; ++it) {
2758  if (it->second.expected_sequence_ != sequence_number_) {
2759  return true;
2760  }
2761  }
2762 
2763  return false;
2764 }
2765 
// NOTE(review): listing lines 2766-2767 (return type, name and first
// parameter), 2770 and 2775 are absent from this listing. Judging by the call
// it wraps, this is presumably DataWriterImpl::send_control -- confirm.
//
// Forwards `header` and `msg` to TransportClient::send_control() and returns
// its status. Some action is taken when the status is not SEND_CONTROL_OK, but
// that statement (line 2775) is not visible here.
2768  Message_Block_Ptr msg)
2769 {
2771 
2772  SendControlStatus status = TransportClient::send_control(header, move(msg));
2773 
2774  if (status != SEND_CONTROL_OK) {
2776  }
2777 
2778  return status;
2779 }
2780 
// NOTE(review): only the braces of this definition survive in the listing; its
// signature (lines 2781-2782) and its single body line (2784) are absent, so
// neither its name nor its behavior can be determined from here.
2783 {
2785 }
2786 
// NOTE(review): the signature (listing line 2787) is absent from this listing,
// so the function name and the exact type of `deadline` are not visible.
//
// Stores `deadline` in wait_pending_deadline_, the member that wait_pending()
// passes to its wait calls.
2788 {
2789  wait_pending_deadline_ = deadline;
2790 }
2791 
2792 int LivenessTimer::handle_timeout(const ACE_Time_Value& tv, const void* arg)
2793 {
2794  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2795 
2796  DataWriterImpl_rch writer = this->writer_.lock();
2797  if (writer) {
2798  writer->handle_timeout(tv, arg);
2799  } else {
2800  this->reactor()->cancel_timer(this);
2801  }
2802  return 0;
2803 }
2804 
// NOTE(review): the signature (listing line 2805) and lines 2807 and 2810
// (presumably including the declaration of `guard`) are absent from this
// listing, so the function name and which lock `guard` holds are not visible.
//
// Publishes this writer's current transport locators to discovery: copies
// dp_id_, publication_id_ and domain_id_ into locals, releases `guard`, and
// then calls update_publication_locators() on the domain's Discovery object.
2806 {
2808  const TransportLocatorSeq& trans_conf_info = connection_info();
2809 
2811  const GUID_t dp_id_copy = dp_id_;
2812  const GUID_t publication_id_copy = publication_id_;
2813  const int domain_id = domain_id_;
// The copies above are taken before the guard is released; the discovery call
// below then runs without it.
2814  guard.release();
2815 
2816  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id);
2817  disco->update_publication_locators(domain_id,
2818  dp_id_copy,
2819  publication_id_copy,
2820  trans_conf_info);
2821 }
2822 
2824 {
2825  if (qos_.representation.value.length() > 0 &&
2827  // If the QoS explicitly sets XCDR, XCDR2, or XML, force encapsulation
2828  cdr_encapsulation(true);
2829  }
2830 
2831  if (cdr_encapsulation()) {
2832  Encoding::Kind encoding_kind;
2833  // There should only be one data representation in a DataWriter, so
2834  // simply use qos_.representation.value[0].
2835  if (repr_to_encoding_kind(qos_.representation.value[0], encoding_kind)) {
2836  encoding_mode_ = EncodingMode(type_support_, encoding_kind, swap_bytes());
2837  if (encoding_kind == Encoding::KIND_XCDR1 &&
2839  if (log_level >= LogLevel::Notice) {
2840  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2841  "Encountered unsupported combination of XCDR1 encoding and mutable extensibility "
2842  "for writer of type %C\n",
2843  type_support_->name()));
2844  }
2845  return DDS::RETCODE_ERROR;
2846  } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR) {
2847  if (log_level >= LogLevel::Notice) {
2848  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2849  "Unaligned CDR is not supported by transport types that require encapsulation\n"));
2850  }
2851  return DDS::RETCODE_ERROR;
2852  }
2853  } else if (log_level >= LogLevel::Warning) {
2854  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::setup_serialization: "
2855  "Encountered unsupported or unknown data representation: %C ",
2856  "for writer of type %C\n",
2858  type_support_->name()));
2859  }
2860  } else {
2861  // Pick unaligned CDR as it is the implicit representation for non-encapsulated
2863  }
2864  if (!encoding_mode_.valid()) {
2865  if (log_level >= LogLevel::Notice) {
2866  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2867  "Could not find a valid data representation\n"));
2868  }
2869  return DDS::RETCODE_ERROR;
2870  }
2871 
2872  if (DCPS_debug_level >= 2) {
2873  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriterImpl::setup_serialization: "
2874  "Setup successfully with %C data representation.\n",
2876  }
2877 
2878  // Set up allocator with reserved space for data if it is bounded
2879  const SerializedSizeBound buffer_size_bound = encoding_mode_.buffer_size_bound();
2880  if (buffer_size_bound) {
2881  const size_t chunk_size = buffer_size_bound.get();
2882  data_allocator_.reset(new DataAllocator(n_chunks_, chunk_size));
2883  if (DCPS_debug_level >= 2) {
2884  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2885  "using data allocator at %x with %B %B byte chunks\n",
2886  data_allocator_.get(),
2887  n_chunks_,
2888  chunk_size));
2889  }
2890  } else if (DCPS_debug_level >= 2) {
2891  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2892  "sample size is unbounded, not using data allocator, "
2893  "always allocating from heap\n"));
2894  }
2895  return DDS::RETCODE_OK;
2896 }
2897 
// NOTE(review): the signature (listing line 2898) and lines 2900 and 2903 are
// absent from this listing; the cross-reference index lists
// `DDS::ReturnCode_t get_key_value(Sample_rch &sample, DDS::InstanceHandle_t handle)`,
// which matches this body -- confirm.
//
// Looks `handle` up in instance_handles_to_values_. On a hit, assigns a mutable
// copy of the stored sample to `sample` and returns DDS::RETCODE_OK. The
// statement executed on a miss (line 2903) is not visible here.
2899 {
2901  const InstanceHandlesToValues::iterator it = instance_handles_to_values_.find(handle);
2902  if (it == instance_handles_to_values_.end()) {
2904  }
2905  sample = it->second->copy(Sample::Mutable);
2906  return DDS::RETCODE_OK;
2907 }
2908 
// NOTE(review): the signature (listing line 2909) and line 2911 are absent from
// this listing; the cross-reference index lists
// `DDS::InstanceHandle_t lookup_instance(const Sample &sample)`, which matches
// this body -- confirm.
//
// Returns the handle mapped to `sample` via find_instance(), or
// DDS::HANDLE_NIL when the sample is not in instance_values_to_handles_.
2910 {
2912  const InstanceValuesToHandles::iterator it = find_instance(sample);
2913  return it == instance_values_to_handles_.end() ? DDS::HANDLE_NIL : it->second;
2914 }
2915 
// NOTE(review): the first signature line (listing line 2916: return type and
// function name) is absent from this listing. The log text identifies the
// function as DataWriterImpl::register_instance_w_timestamp.
//
// Returns the handle produced by get_or_create_instance_handle(). A failure
// there is logged at Notice level but not otherwise reported; the caller just
// receives whatever handle value the helper left behind.
2917  const Sample& sample, const DDS::Time_t& timestamp)
2918 {
2919  DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
2920  const DDS::ReturnCode_t ret = get_or_create_instance_handle(registered_handle, sample, timestamp);
2921  if (ret != DDS::RETCODE_OK && log_level >= LogLevel::Notice) {
2922  ACE_ERROR((LM_NOTICE, ACE_TEXT("(%P|%t) NOTICE: DataWriterImpl::register_instance_w_timestamp: ")
2923  ACE_TEXT("register failed: %C\n"),
2924  retcode_to_string(ret)));
2925  }
2926  return registered_handle;
2927 }
2928 
// NOTE(review): listing lines 2929 (return type and function name) and 2934
// (start of the statement that defines `rc`) are absent from this listing. The
// method-name string passed below is "unregister_instance_w_timestamp".
//
// Runs a precondition check whose callee is not visible (its arguments include
// `/* remove = */ true`); if that check does not return DDS::RETCODE_OK, its
// code is returned. Otherwise returns the result of unregister_instance_i().
2930  const Sample& sample,
2931  DDS::InstanceHandle_t instance_handle,
2932  const DDS::Time_t& timestamp)
2933 {
2935  "unregister_instance_w_timestamp", sample, instance_handle, /* remove = */ true);
2936  if (rc != DDS::RETCODE_OK) {
2937  return rc;
2938  }
2939  return unregister_instance_i(instance_handle, timestamp);
2940 }
2941 
// NOTE(review): listing lines 2942 (return type and function name), 2949, 2951,
// 2954, 2958 and 2962 are absent from this listing. The log text identifies the
// function as DataWriterImpl::dispose_w_timestamp.
//
// With security and the dynamic data adapter enabled, first asks access control
// whether this writer may dispose the instance; a refusal is logged at Notice
// level (the statement that follows the log, line 2958, is not visible). Then
// runs a precondition check whose callee is not visible and, if it returns
// DDS::RETCODE_OK, returns the result of dispose().
2943  const Sample& sample,
2944  DDS::InstanceHandle_t instance_handle,
2945  const DDS::Time_t& source_timestamp)
2946 {
2947 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
2948  DDS::DynamicData_var dynamic_data = sample.get_dynamic_data(dynamic_type_);
// The check is skipped when there is no dynamic data or no security
// configuration; one further clause of this condition (line 2951) is not
// visible here.
2950  if (dynamic_data && security_config_ &&
2952  !security_config_->get_access_control()->check_local_datawriter_dispose_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
2953  if (log_level >= LogLevel::Notice) {
2955  "(%P|%t) NOTICE: DataWriterImpl::dispose_w_timestamp: unable to dispose instance SecurityException[%d.%d]: %C\n",
2956  ex.code, ex.minor_code, ex.message.in()));
2957  }
2959  }
2960 #endif
2961 
2963  "dispose_w_timestamp", sample, instance_handle);
2964  if (rc != DDS::RETCODE_OK) {
2965  return rc;
2966  }
2967  return dispose(instance_handle, source_timestamp);
2968 }
2969 
// NOTE(review): the signature (listing line 2970) and lines 2973, 2981, 2983,
// 2993, 2995 and 3000-3002 are absent from this listing; among them are the
// declaration of `encoding` and the constructor names inside the two
// allocation macros. The log text identifies the function as
// DataWriterImpl::serialize_sample; the cross-reference index lists
// `ACE_Message_Block * serialize_sample(const Sample &sample)`.
//
// Allocates a message block sized by encoding_mode_.buffer_size(sample) and
// writes the sample into it. Returns the raw block pointer on success
// (ownership is released from `mb`), or 0 on any allocation or serialization
// failure.
2971 {
2972  const bool encapsulated = cdr_encapsulation();
2974  Message_Block_Ptr mb;
2975  ACE_Message_Block* tmp_mb;
2976 
2977  // Don't use the cached allocator for the registered sample message
2978  // block.
2979  if (sample.key_only() && !skip_serialize_) {
2980  ACE_NEW_RETURN(tmp_mb,
2982  encoding_mode_.buffer_size(sample),
2984  0, // cont
2985  0, // data
2986  0, // alloc_strategy
2987  get_db_lock()),
2988  0);
2989  } else {
// All other samples: the message block itself comes from mb_allocator_ and its
// data buffer from data_allocator_.
2990  ACE_NEW_MALLOC_RETURN(tmp_mb,
2991  static_cast<ACE_Message_Block*>(
2992  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2994  encoding_mode_.buffer_size(sample),
2996  0, // cont
2997  0, // data
2998  data_allocator_.get(), // allocator_strategy
2999  get_db_lock(), // data block locking_strategy
3003  db_allocator_.get(),
3004  mb_allocator_.get()),
3005  0);
3006  }
// From here on `mb` owns the block, so the early `return 0` paths below do not
// need explicit cleanup.
3007  mb.reset(tmp_mb);
3008 
3009  if (skip_serialize_) {
// With skip_serialize_ set, the sample writes itself into the block directly
// and no Serializer or encapsulation header is used.
3010  if (!sample.to_message_block(*mb)) {
3011  if (log_level >= LogLevel::Error) {
3012  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3013  "to_message_block failed\n"));
3014  }
3015  return 0;
3016  }
3017  } else {
3018  Serializer serializer(mb.get(), encoding);
3019  if (encapsulated) {
// Encapsulated payloads start with an encapsulation header derived from the
// encoding and the type's base extensibility.
3020  EncapsulationHeader encap;
3021  if (!encap.from_encoding(encoding, type_support_->base_extensibility())) {
3022  // from_encoding logged the error
3023  return 0;
3024  }
3025  if (!(serializer << encap)) {
3026  if (log_level >= LogLevel::Error) {
3027  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3028  "failed to serialize data encapsulation header\n"));
3029  }
3030  return 0;
3031  }
3032  }
3033  if (!sample.serialize(serializer)) {
3034  if (log_level >= LogLevel::Error) {
3035  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3036  "failed to serialize sample data\n"));
3037  }
3038  return 0;
3039  }
// Runs after the payload has been written; what the options encode is not
// visible in this listing.
3040  if (encapsulated && !EncapsulationHeader::set_encapsulation_options(mb)) {
3041  if (log_level >= LogLevel::Error) {
3042  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3043  "set_encapsulation_options failed\n"));
3044  }
3045  return 0;
3046  }
3047  }
3048 
3049  return mb.release();
3050 }
3051 
// NOTE(review): the signature (listing line 3052) is absent from this listing;
// the cross-reference index lists
// `bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch &sample)`,
// which matches this body -- confirm.
//
// Records the handle <-> key-sample association in both lookup maps. Returns
// false if either map already holds the key; when the second insert fails, the
// first insert is undone so the two maps stay consistent. `sample` is asserted
// to be key-only.
3053 {
3054  OPENDDS_ASSERT(sample->key_only());
3055  if (!instance_handles_to_values_.insert(
3056  InstanceHandlesToValues::value_type(handle, sample)).second) {
3057  return false;
3058  }
3059  if (!instance_values_to_handles_.insert(
3060  InstanceValuesToHandles::value_type(sample, handle)).second) {
3061  instance_handles_to_values_.erase(handle);
3062  return false;
3063  }
3064  return true;
3065 }
3066 
// NOTE(review): the line carrying the function name and parameters (listing
// line 3068) is absent from this listing; the cross-reference index lists
// `InstanceValuesToHandles::iterator find_instance(const Sample &sample)`.
//
// Looks `sample` up in instance_values_to_handles_ and returns the resulting
// iterator (end() when absent).
3067 DataWriterImpl::InstanceValuesToHandles::iterator
3069 {
// The map is searched with a Sample_rch, so the caller's object is wrapped in a
// temporary handle built with keep_count(); _retn() is then called on that
// handle before returning -- presumably so the temporary leaves the caller's
// reference count untouched (confirm against RcHandle semantics).
3070  Sample_rch dummy_rch(const_cast<Sample*>(&sample), keep_count());
3071  InstanceValuesToHandles::iterator pos = instance_values_to_handles_.find(dummy_rch);
3072  dummy_rch._retn();
3073  return pos;
3074 }
3075 
// NOTE(review): listing lines 3076 (return type and function name), 3081, 3085
// (presumably the declaration of `copy`), 3088, 3090, 3093, 3097 and 3130 are
// absent from this listing. The log text identifies the function as
// DataWriterImpl::get_or_create_instance_handle.
//
// Sets `handle` to the instance handle for `sample`, registering a new instance
// when none is known or when the known handle has no live publication
// instance. Returns DDS::RETCODE_OK on success; on the visible failure paths
// after registration starts, `handle` is reset to DDS::HANDLE_NIL.
3077  DDS::InstanceHandle_t& handle,
3078  const Sample& sample,
3079  const DDS::Time_t& source_timestamp)
3080 {
3082 
3083  handle = lookup_instance(sample);
3084  if (handle == DDS::HANDLE_NIL || !get_handle_instance(handle)) {
3086 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
3087  DDS::DynamicData_var dynamic_data = copy->get_dynamic_data(dynamic_type_);
// Access-control check for registering the instance; a refusal is logged at
// Notice level (the statement after the log, line 3097, is not visible).
3089  if (dynamic_data && security_config_ &&
3091  !security_config_->get_access_control()->check_local_datawriter_register_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
3092  if (log_level >= LogLevel::Notice) {
3094  "(%P|%t) NOTICE: DataWriterImpl::get_or_create_instance_handle: unable to register instance SecurityException[%d.%d]: %C\n",
3095  ex.code, ex.minor_code, ex.message.in()));
3096  }
3098  }
3099 #endif
3100 
3101  // don't use fast allocator for registration.
3102  const TypeSupportImpl* const ts = get_type_support();
3103  Message_Block_Ptr serialized(serialize_sample(*copy));
3104  if (!serialized) {
3105  if (log_level >= LogLevel::Notice) {
// The leading %C is fed ts->name(), so the message is prefixed with the type
// name.
3106  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3107  "failed to serialize sample\n", ts->name()));
3108  }
3109  return DDS::RETCODE_ERROR;
3110  }
3111 
3112  // tell DataWriterLocal and Publisher about the instance.
3113  const DDS::ReturnCode_t ret = register_instance_i(handle, move(serialized), source_timestamp);
3114  // note: the WriteDataContainer/PublicationInstance maintains ownership
3115  // of the marshalled sample.
3116  if (ret != DDS::RETCODE_OK) {
3117  handle = DDS::HANDLE_NIL;
3118  return ret;
3119  }
3120 
// Remember the handle <-> key association locally so later lookups find it.
3121  if (!insert_instance(handle, copy)) {
3122  handle = DDS::HANDLE_NIL;
3123  if (log_level >= LogLevel::Notice) {
3124  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3125  "insert instance failed\n", ts->name()));
3126  }
3127  return DDS::RETCODE_ERROR;
3128  }
3129 
3131  }
3132 
3133  return DDS::RETCODE_OK;
3134 }
3135 
// NOTE(review): listing lines 3136 (return type and function name), 3144 and
// 3157 are absent from this listing; the cross-reference index lists
// `instance_must_exist(const char *method_name, const Sample &sample,
// DDS::InstanceHandle_t &instance_handle, bool remove=false)`, which matches
// these parameters -- confirm.
//
// Verifies that `sample` (asserted key-only) refers to a registered instance.
//  - method_name: caller name used only in the log message.
//  - instance_handle: in/out; on success it is overwritten with the registered
//    handle for `sample`.
//  - remove: when true, the instance is erased from both lookup maps.
// Returns DDS::RETCODE_ERROR (logged at Notice level) when the sample is not
// registered, DDS::RETCODE_OK on success.
3137  const char* const method_name,
3138  const Sample& sample,
3139  DDS::InstanceHandle_t& instance_handle,
3140  bool remove)
3141 {
3142  OPENDDS_ASSERT(sample.key_only());
3143 
3145 
3146  const InstanceValuesToHandles::iterator pos = find_instance(sample);
3147  if (pos == instance_values_to_handles_.end()) {
3148  if (log_level >= LogLevel::Notice) {
3149  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::%C: "
3150  "The instance sample is not registered\n",
3151  method_name));
3152  }
3153  return DDS::RETCODE_ERROR;
3154  }
3155 
// A caller-supplied handle that disagrees with the registered one is handled
// here; the statement itself (line 3157) is not visible in this listing.
3156  if (instance_handle != DDS::HANDLE_NIL && instance_handle != pos->second) {
3158  }
3159 
3160  instance_handle = pos->second;
3161 
3162  if (remove) {
3163  instance_values_to_handles_.erase(pos);
3164  instance_handles_to_values_.erase(instance_handle);
3165  }
3166 
3167  return DDS::RETCODE_OK;
3168 }
3169 
// NOTE(review): listing lines 3170 (return type and function name) and 3199 are
// absent from this listing. The log text identifies the function as
// DataWriterImpl::write_w_timestamp.
//
// Writes `sample`, registering its instance first when `handle` is
// DDS::HANDLE_NIL. A registration failure is logged at Notice level and its
// return code is returned. Otherwise returns the result of write_sample().
3171  const Sample& sample,
3172  DDS::InstanceHandle_t handle,
3173  const DDS::Time_t& source_timestamp)
3174 {
3175  // This operation assumes the provided handle is valid. The handle provided
3176  // will not be verified.
3177 
3178  if (handle == DDS::HANDLE_NIL) {
3179  DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
3180  const DDS::ReturnCode_t ret =
3181  get_or_create_instance_handle(registered_handle, sample, source_timestamp);
3182  if (ret != DDS::RETCODE_OK) {
3183  if (log_level >= LogLevel::Notice) {
3184  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::write_w_timestamp: "
3185  "register failed: %C\n",
3186  get_type_support()->name(),
3187  retcode_to_string(ret)));
3188  }
3189  return ret;
3190  }
3191 
3192  handle = registered_handle;
3193  }
3194 
3195  // list of reader GUID_ts that should not get data
3196  GUIDSeq_var filter_out;
3197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
3198  if (TheServiceParticipant->publisher_content_filter()) {
// For every reader that has a filter evaluator: the sequence is allocated on
// the first such reader (even if no reader ends up rejecting the sample), and
// readers whose filter rejects the sample are appended to it.
3200  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
3201  end = reader_info_.end(); iter != end; ++iter) {
3202  const ReaderInfo& ri = iter->second;
3203  if (!ri.eval_.is_nil()) {
3204  if (!filter_out.ptr()) {
3205  filter_out = new OpenDDS::DCPS::GUIDSeq;
3206  }
3207  if (!sample.eval(*ri.eval_, ri.expression_params_)) {
3208  push_back(filter_out.inout(), iter->first);
3209  }
3210  }
3211  }
3212  }
3213 #endif
3214 
// _retn() passes the raw sequence pointer on to write_sample().
3215  return write_sample(sample, handle, source_timestamp, filter_out._retn());
3216 }
3217 
// NOTE(review): listing lines 3218 (return type and function name) and 3222
// (the last parameter, used below as `filter_out`) are absent from this
// listing. The log text identifies the function as DataWriterImpl::write_sample.
//
// Serializes `sample` and passes the resulting message block, together with the
// sample's native data pointer, to write(). Returns DDS::RETCODE_ERROR (logged
// at Notice level) when serialization fails; otherwise returns write()'s
// result.
3219  const Sample& sample,
3220  DDS::InstanceHandle_t handle,
3221  const DDS::Time_t& source_timestamp,
3223 {
3224  Message_Block_Ptr serialized(serialize_sample(sample));
3225  if (!serialized) {
3226  if (log_level >= LogLevel::Notice) {
3227  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::write_sample: "
3228  "failed to serialize sample\n"));
3229  }
3230  return DDS::RETCODE_ERROR;
3231  }
3232 
3233  return write(move(serialized), handle, source_timestamp, filter_out, sample.native_data());
3234 }
3235 
3236 } // namespace DCPS
3237 } // namespace OpenDDS
3238 
void set_sample(Message_Block_Ptr sample)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
void control_dropped(const Message_Block_Ptr &sample, bool dropped_by_transport)
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
RcHandle< PublicationInstance > PublicationInstance_rch
DDS::ReturnCode_t get_key_value(Sample_rch &sample, DDS::InstanceHandle_t handle)
#define ACE_DEBUG(X)
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
DDS::ReturnCode_t write_sample(const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
HistoryQosPolicy history
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
ReaderInfo(const char *filter_class_name, const char *filter, const DDS::StringSeq &params, WeakRcHandle< DomainParticipantImpl > participant, bool durable)
SerializedSizeBound buffer_size_bound() const
size_t n_chunks_
The number of chunks for the cached allocator.
#define ACE_ERROR(X)
const DDS::StatusMask NO_STATUS_MASK
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual Extensibility max_extensibility() const =0
const StatusKind LIVELINESS_LOST_STATUS
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
virtual DDS::ReturnCode_t set_listener(DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
virtual void remove_associations(const ReaderIdSeq &readers, bool callback)
const LogLevel::Value value
Definition: debug.cpp:61
DataWriterListener_ptr get_ext_listener()
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const DataSampleHeader & get_header() const
CORBA::String_var topic_name_
The name of associated topic.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
static const ACE_Time_Value max_time
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
void enable_transport(bool reliable, bool durable)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData &qos_data) const
ReliabilityQosPolicy reliability
virtual DDS::ReturnCode_t wait_for_acknowledgments(const DDS::Duration_t &max_wait)
iterator end()
Return iterator to end of list.
if(!(yy_init))
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
SendStateDataSampleList get_resend_data()
virtual void on_deleted(DDS::DataWriter_ptr)
Definition: Observer.h:79
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
LM_INFO
void set_filter_out(GUIDSeq *filter_out)
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
DurabilityQosPolicy durability
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
TransportLocatorSeq remote_data_
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
HistoryQosPolicyKind kind
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
bool should_ack() const
Does this writer have samples to be acknowledged?
SendStateDataSampleList available_data_list_
DDS::Security::PermissionsHandle participant_permissions_handle_
DDS::ReturnCode_t unregister_instance_i(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
bool send_liveliness(const MonotonicTimePoint &now)
Send the liveliness message.
void notify_publication_disconnected(const ReaderIdSeq &subids)
virtual void on_disassociated(DDS::DataWriter_ptr, const GUID_t &)
Definition: Observer.h:87
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
Definition: DCPS_Utils.cpp:473
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
virtual void update_subscription_params(const GUID_t &readerId, const DDS::StringSeq &params)
DDS::InstanceHandle_t lookup_instance(const Sample &sample)
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
MonotonicTime_t participant_discovered_at_
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
const char * c_str() const
void notify_publication_reconnected(const ReaderIdSeq &subids)
DataBlockLockPool::DataBlockLock * get_db_lock()
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations..
virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(DDS::OfferedIncompatibleQosStatus &status)
void data_delivered(const DataSampleElement *sample)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
const StatusKind OFFERED_INCOMPATIBLE_QOS_STATUS
void wait_pending()
Wait for pending data and control messages to drain.
SequenceNumber get_max_sn() const
TransportLocator discovery_locator_
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
ACE_CDR::ULong remote_transport_context_
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
DeadlineQosPolicy deadline
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const ACE_Time_Value & value() const
virtual DDS::ReturnCode_t get_offered_deadline_missed_status(DDS::OfferedDeadlineMissedStatus &status)
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
void association_complete_i(const GUID_t &remote_id)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
sequence< TransportLocator > TransportLocatorSeq
MonotonicTimePoint deadline() const
const StatusKind PUBLICATION_MATCHED_STATUS
bool topicIsBIT(const char *name, const char *type)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
MonotonicTimePoint wait_pending_deadline_
DataRepresentationQosPolicy representation
ACE_Reactor_Timer_Interface * reactor_
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
void unregister_instances(const DDS::Time_t &source_timestamp)
bool key_only() const
Definition: Sample.h:75
#define ACE_CDR_BYTE_ORDER
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
int release(void)
ACE_Message_Block * serialize_sample(const Sample &sample)
void disassociate(const GUID_t &peerId)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const ValueDispatcher * get_value_dispatcher() const
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
DDS::InstanceHandle_t get_next_handle()
DDS::InstanceStateKind instance_state() const
void data_dropped(const DataSampleElement *element, bool dropped_by_transport)
const DDS::StatusMask DEFAULT_STATUS_MASK
bool valid_data() const
Returns true if the sample has a complete serialized payload.
AckToken create_ack_token(DDS::Duration_t max_wait) const
Create an AckToken for ack operations.
RepoIdToReaderInfoMap reader_info_
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
MessageId
One byte message id (<256)
bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch &sample)
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
Security::SecurityConfig_rch security_config_
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
#define OPENDDS_STRING
virtual DDS::InstanceHandle_t get_instance_handle()
RcHandle< LivenessTimer > liveness_timer_
sequence< GUID_t > ReaderIdSeq
static bool set_encapsulation_options(Message_Block_Ptr &mb)
Definition: Serializer.cpp:251
DurabilityServiceQosPolicy durability_service
DDS::ReturnCode_t instance_must_exist(const char *method_name, const Sample &sample, DDS::InstanceHandle_t &instance_handle, bool remove=false)
virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos &qos)
virtual int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Handle the assert liveliness timeout.
void enqueue_tail(const DataSampleElement *element)
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
virtual const char * name() const =0
bool coherent_changes_pending()
Are coherent changes pending?
void begin_coherent_changes()
Starts a coherent change set; should only be called once.
DDS::ReturnCode_t wait_for_specific_ack(const AckToken &token)
virtual DDS::DynamicData_var get_dynamic_data(DDS::DynamicType_ptr type) const =0
virtual DDS::Topic_ptr get_topic()
InstanceValuesToHandles::iterator find_instance(const Sample &sample)
void get_readers(RepoIdSet &readers)
void end_coherent_changes(const GroupCoherentSamples &group_samples)
Ends a coherent change set; should only be called once.
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
DurabilityQosPolicyKind kind
DDS::DataWriterQos passed_qos_
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
DurabilityQosPolicy durability
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
virtual bool to_message_block(ACE_Message_Block &mb) const =0
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
WeakRcHandle< DomainParticipantImpl > participant_servant_
virtual DDS::ReturnCode_t assert_liveliness()
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
void add_types(const XTypes::TypeLookupService_rch &tls) const
bool from_encoding(const Encoding &encoding, Extensibility extensibility)
Definition: Serializer.cpp:113
virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos &qos)
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
ACE_CDR::Boolean Boolean
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
const ReturnCode_t RETCODE_TIMEOUT
virtual DDS::DynamicType_ptr get_type() const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
void return_handle(DDS::InstanceHandle_t handle)
DDS::ReturnCode_t dispose_w_timestamp(const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &source_timestamp)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
GUID_t topic_id_
The associated topic repository id.
virtual void replay_durable_data_for(const GUID_t &remote_sub_id)
LM_NOTICE
InstanceHandlesToValues instance_handles_to_values_
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
Atomic< int > data_dropped_count_
Statistics counter.
ACE_Message_Block * cont(void) const
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ResourceLimitsQosPolicy resource_limits
iterator begin()
Return iterator to beginning of list.
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
DDS::InstanceHandle_t register_instance_w_timestamp(const Sample &sample, const DDS::Time_t &timestamp)
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
virtual Sample_rch copy(Mutability mutability, Extent extent) const =0
unique_ptr< DataAllocator > data_allocator_
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
size_t total_length(void) const
DDS::ReturnCode_t setup_serialization()
virtual RcHandle< EntityImpl > parent() const
LM_WARNING
void get_instance_handles(InstanceHandleVec &instance_handles)
DDS::DomainId_t domain_id() const
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
const DDS::DataRepresentationId_t UNALIGNED_CDR_DATA_REPRESENTATION
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
OPENDDS_STRING conv_
DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle, size_t &size)
const char *const name
Definition: debug.cpp:60
DDS::ReturnCode_t write(Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
char const * get_type_name() const
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
virtual bool eval(FilterEvaluator &evaluator, const DDS::StringSeq &params) const =0
ACE_TEXT("TCP_Factory")
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long StatusMask
virtual void on_enabled(DDS::DataWriter_ptr)
Definition: Observer.h:77
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
const ReturnCode_t RETCODE_NOT_ENABLED
size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unsigned long long ACE_UINT64
unsigned long nanosec
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
DDS::PublicationMatchedStatus publication_match_status_
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
void notify_publication_lost(const ReaderIdSeq &subids)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual DDS::DataWriterListener_ptr get_listener()
ReliabilityQosPolicy reliability
virtual bool check_transport_qos(const TransportInst &inst)
OpenDDS_Dcps_Export LogLevel log_level
DDS::ReturnCode_t assert_liveliness_by_participant()
virtual void add_association(const GUID_t &yourId, const ReaderAssociation &reader, bool active)
bool eval(const T &sample, const DDS::StringSeq &params) const
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
virtual DDS::Publisher_ptr get_publisher()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
virtual DDS::ReturnCode_t enable()
SendStateDataSampleList STL-style iterator implementation.
DDS::ReturnCode_t write_w_timestamp(const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
Sequence number abstraction. Only allows positive 64 bit values.
void to_type_info(XTypes::TypeInformation &type_info) const
void init(TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< DomainParticipantImpl > participant_servant, PublisherImpl *publisher_servant)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
static const ACE_Time_Value zero
void control_delivered(const Message_Block_Ptr &sample)
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
virtual DDS::ReturnCode_t get_liveliness_lost_status(DDS::LivelinessLostStatus &status)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
const ReturnCode_t RETCODE_ERROR
DDS::ReturnCode_t register_instance_from_durable_data(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
virtual SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::ReturnCode_t get_or_create_instance_handle(DDS::InstanceHandle_t &handle, const Sample &sample, const DDS::Time_t &source_timestamp)
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
LifespanQosPolicy lifespan
static TransportRegistry * instance()
Return a singleton instance of this class.
DDS::DynamicType_var dynamic_type_
TypeSupportImpl * get_type_support() const
WriterDataLifecycleQosPolicy writer_data_lifecycle
virtual void on_sample_sent(DDS::DataWriter_ptr, const Sample &)
Definition: Observer.h:91
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
size_t buffer_size(const Sample &sample) const
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
virtual void on_associated(DDS::DataWriter_ptr, const GUID_t &)
Definition: Observer.h:85
virtual void on_qos_changed(DDS::DataWriter_ptr)
Definition: Observer.h:81
ACE_Thread_Mutex reader_info_lock_
::DDS::InstanceHandleSeq subscription_handles
const char * to_string(MessageId value)
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY
LivelinessQosPolicyKind kind
ACE_Recursive_Thread_Mutex & get_lock() const
virtual DDS::ReturnCode_t get_matched_subscriptions(DDS::InstanceHandleSeq &subscription_handles)
void wait_messages_pending(const char *caller)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
DDS::ReturnCode_t send_request_ack()
const long LENGTH_UNLIMITED
ACE_Recursive_Thread_Mutex lock_
#define ACE_ERROR_RETURN(X, Y)
LivelinessQosPolicy liveliness
RcHandle< T > lock() const
Definition: RcObject.h:188
virtual DDS::ReturnCode_t get_matched_subscription_data(DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle)
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const character_type * in(void) const
DataRepresentationIdSeq value
unsigned long StatusKind
virtual DDS::ReturnCode_t get_publication_matched_status(DDS::PublicationMatchedStatus &status)
void check_and_set_repo_id(const GUID_t &id)
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
RepoIdToHandleMap id_to_handle_map_
void track_sequence_number(GUIDSeq *filter_out)
InstanceValuesToHandles instance_values_to_handles_
virtual int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Handle the assert liveliness timeout.
CORBA::Long last_deadline_missed_total_count_
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
LM_ERROR
GUID_t publication_id_
The repository id of this datawriter/publication.
virtual bool serialize(Serializer &ser) const =0
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void update_incompatible_qos(const IncompatibleQosStatus &status)
DDS::ReturnCode_t register_instance_i(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
RcHandle< WriteDataContainer > data_container_
The sample data container.
void set_wait_pending_deadline(const MonotonicTimePoint &deadline)
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
virtual void transport_assoc_done(int flags, const GUID_t &remote_id)
LivelinessQosPolicyKind
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
Boolean is_nil(T x)
CORBA::String_var type_name_
The type name of associated topic.
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
virtual const void * native_data() const =0
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.
DDS::ReturnCode_t unregister_instance_w_timestamp(const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &timestamp)
RcHandle< BitSubscriber > get_builtin_subscriber_proxy() const
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool has_non_key_fields(const TypeSupportImpl &ts) const
SendControlStatus
Return code type for send_control() operations.
static const TimeDuration max_value
Definition: TimeDuration.h:32
const ReturnCode_t RETCODE_BAD_PARAMETER