OpenDDS  Snapshot(2023/04/28-20:55)
DataWriterImpl.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_DATAWRITERIMPL_H
7 #define OPENDDS_DCPS_DATAWRITERIMPL_H
8 
9 #include "Atomic.h"
10 #include "Sample.h"
11 #include "DataWriterCallbacks.h"
14 #include "MessageTracker.h"
15 #include "DataBlockLockPool.h"
16 #include "PoolAllocator.h"
17 #include "WriteDataContainer.h"
18 #include "Definitions.h"
19 #include "DataSampleHeader.h"
20 #include "TopicImpl.h"
21 #include "Time_Helper.h"
22 #include "CoherentChangeControl.h"
23 #include "GuidUtils.h"
24 #include "RcEventHandler.h"
25 #include "unique_ptr.h"
26 #include "Message_Block_Ptr.h"
27 #include "TimeTypes.h"
28 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
29 # include "FilterEvaluator.h"
30 #endif
31 
32 #include <dds/DdsDcpsDomainC.h>
33 #include <dds/DdsDcpsTopicC.h>
34 
35 #include <ace/Event_Handler.h>
36 #include <ace/OS_NS_sys_time.h>
37 
38 #include <memory>
39 
40 #ifndef ACE_LACKS_PRAGMA_ONCE
41 # pragma once
42 #endif /* ACE_LACKS_PRAGMA_ONCE */
43 
44 class DDS_TEST;
45 
47 
48 namespace OpenDDS {
49 namespace DCPS {
50 
51 class PublisherImpl;
52 class DomainParticipantImpl;
53 class Monitor;
54 class DataSampleElement;
55 class SendStateDataSampleList;
56 struct AssociationData;
57 class LivenessTimer;
58 
59 /**
60  * @class DataWriterImpl
61  *
62  * @brief Implements the OpenDDS::DCPS::DataWriterRemote interfaces and
63  * DDS::DataWriter interfaces.
64  *
65  * See the DDS specification, OMG formal/2015-04-10, for a description of
66  * the interface this class is implementing.
67  *
68  * This class must be inherited by the type-specific datawriter which
69  * is specific to the data-type associated with the topic.
70  *
71  * @note: This class is responsible for allocating memory for the
72  * header message block
73  * (MessageBlock + DataBlock + DataSampleHeader) and the
74  * DataSampleElement.
75  * The data-type datawriter is responsible for allocating
76  * memory for the sample data message block.
77  * (e.g. MessageBlock + DataBlock + Foo data). But it gives
78  * up ownership to this WriteDataContainer.
79  */
81  : public virtual LocalObject<DDS::DataWriter>
82  , public virtual DataWriterCallbacks
83  , public virtual EntityImpl
84  , public virtual TransportClient
85  , public virtual TransportSendListener
86 {
87 public:
88  friend class WriteDataContainer;
89  friend class PublisherImpl;
90 
91  typedef OPENDDS_MAP_CMP(GUID_t, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
93 
94  struct AckToken {
98 
99  AckToken(const DDS::Duration_t& max_wait,
100  const SequenceNumber& sequence)
101  : tstamp_(MonotonicTimePoint::now())
102  , max_wait_(max_wait)
103  , sequence_(sequence)
104  {
105  }
106 
108 
110  {
111  return tstamp_ + TimeDuration(max_wait_);
112  }
113 
114  bool deadline_is_infinite() const
115  {
116  return max_wait_.sec == DDS::DURATION_INFINITE_SEC && max_wait_.nanosec == DDS::DURATION_INFINITE_NSEC;
117  }
118  };
119 
120  DataWriterImpl();
121 
122  virtual ~DataWriterImpl();
123 
125  {
126  skip_serialize_ = value;
127  }
128 
130  {
131  return skip_serialize_;
132  }
133 
134  DataAllocator* data_allocator() const
135  {
136  return data_allocator_.get();
137  }
138 
139  virtual DDS::InstanceHandle_t get_instance_handle();
140 
141  virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
142 
143  virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
144 
145  virtual DDS::ReturnCode_t set_listener(
146  DDS::DataWriterListener_ptr a_listener,
147  DDS::StatusMask mask);
148 
149  virtual DDS::DataWriterListener_ptr get_listener();
150 
151  virtual DDS::Topic_ptr get_topic();
152 
153  virtual DDS::ReturnCode_t wait_for_acknowledgments(
154  const DDS::Duration_t & max_wait);
155 
156  virtual DDS::Publisher_ptr get_publisher();
157 
158  virtual DDS::ReturnCode_t get_liveliness_lost_status(
159  DDS::LivelinessLostStatus & status);
160 
161  virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
163 
164  virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
166 
167  virtual DDS::ReturnCode_t get_publication_matched_status(
169 
170  TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
171 
172  bool participant_liveliness_activity_after(const MonotonicTimePoint& tv);
173 
174  virtual DDS::ReturnCode_t assert_liveliness();
175 
176  DDS::ReturnCode_t assert_liveliness_by_participant();
177 
178  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
179  void get_instance_handles(InstanceHandleVec& instance_handles);
180 
181  void get_readers(RepoIdSet& readers);
182 
183  virtual DDS::ReturnCode_t get_matched_subscriptions(
184  DDS::InstanceHandleSeq & subscription_handles);
185 
186 #if !defined (DDS_HAS_MINIMUM_BIT)
187  virtual DDS::ReturnCode_t get_matched_subscription_data(
188  DDS::SubscriptionBuiltinTopicData & subscription_data,
189  DDS::InstanceHandle_t subscription_handle);
190 #endif // !defined (DDS_HAS_MINIMUM_BIT)
191 
192  virtual DDS::ReturnCode_t enable();
193 
194  virtual void add_association(const GUID_t& yourId,
195  const ReaderAssociation& reader,
196  bool active);
197 
198  virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
199 
200  virtual void remove_associations(const ReaderIdSeq & readers,
201  bool callback);
202 
203  virtual void replay_durable_data_for(const GUID_t& remote_sub_id);
204 
205  virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
206 
207  virtual void update_subscription_params(const GUID_t& readerId,
208  const DDS::StringSeq& params);
209 
210  /**
211  * cleanup the DataWriter.
212  */
213  void cleanup();
214 
215  /**
216  * Initialize the data members.
217  */
218  void init(
219  TopicImpl* topic_servant,
220  const DDS::DataWriterQos& qos,
221  DDS::DataWriterListener_ptr a_listener,
222  const DDS::StatusMask& mask,
223  WeakRcHandle<DomainParticipantImpl> participant_servant,
224  PublisherImpl* publisher_servant);
225 
226  void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
227 
228  /**
229  * Delegate to the WriteDataContainer to register
230  * Must tell the transport to broadcast the registered
231  * instance upon returning.
232  */
234  register_instance_i(
235  DDS::InstanceHandle_t& handle,
236  Message_Block_Ptr data,
237  const DDS::Time_t& source_timestamp);
238 
239  /**
240  * Delegate to the WriteDataContainer to register and tell
241  * the transport to broadcast the registered instance.
242  */
244  register_instance_from_durable_data(
245  DDS::InstanceHandle_t& handle,
246  Message_Block_Ptr data,
247  const DDS::Time_t & source_timestamp);
248 
249  /**
250  * Delegate to the WriteDataContainer to unregister and tell
251  * the transport to broadcast the unregistered instance.
252  */
254  unregister_instance_i(
255  DDS::InstanceHandle_t handle,
256  const DDS::Time_t & source_timestamp);
257 
258  /**
259  * Unregister all registered instances and tell the transport
260  * to broadcast the unregistered instances.
261  */
262  void unregister_instances(const DDS::Time_t& source_timestamp);
263 
264  /**
265  * Delegate to the WriteDataContainer to queue the instance
266  * sample and finally tell the transport to send the sample.
267  * \param filter_out can either be null (if the writer can't
268  * or won't evaluate the filters), or a list of
269  * associated reader GUID_ts that should NOT get the
270  * data sample due to content filtering.
271  */
273  DDS::InstanceHandle_t handle,
274  const DDS::Time_t& source_timestamp,
275  GUIDSeq* filter_out,
276  const void* real_data);
277 
278  DDS::ReturnCode_t write_sample(
279  const Sample& sample,
280  DDS::InstanceHandle_t handle,
281  const DDS::Time_t& source_timestamp,
282  GUIDSeq* filter_out);
283 
284  /**
285  * Delegate to the WriteDataContainer to dispose all data
286  * samples for a given instance and tell the transport to
287  * broadcast the disposed instance.
288  */
290  const DDS::Time_t & source_timestamp);
291 
292  /**
293  * Return the number of samples for a given instance.
294  */
295  DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
296  size_t& size);
297 
298  /**
299  * Retrieve the unsent data from the WriteDataContainer.
300  */
302  {
303  return data_container_->get_unsent_data(list);
304  }
305 
307  {
308  return data_container_->get_resend_data();
309  }
310 
311  /**
312  * Accessor of the repository id of the domain participant.
313  */
314  GUID_t get_dp_id();
315 
316  /**
317  * Delegate to WriteDataContainer to unregister all instances.
318  */
319  void unregister_all();
320 
321  /**
322  * This is called by transport to notify that the sample is
323  * delivered and it is delegated to WriteDataContainer
324  * to adjust the internal data sample threads.
325  */
326  void data_delivered(const DataSampleElement* sample);
327 
328  void transport_discovery_change();
329 
330  /**
331  * This is called by transport to notify that the control
332  * message is delivered.
333  */
334  void control_delivered(const Message_Block_Ptr& sample);
335 
336  /// Does this writer have samples to be acknowledged?
337  bool should_ack() const;
338 
339  /// Create an AckToken for ack operations.
340  AckToken create_ack_token(DDS::Duration_t max_wait) const;
341 
342  virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
343 
344  virtual bool check_transport_qos(const TransportInst& inst);
345 
346 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
347 
348  /// Are coherent changes pending?
349  bool coherent_changes_pending();
350 
351  /// Starts a coherent change set; should only be called once.
352  void begin_coherent_changes();
353 
354  /// Ends a coherent change set; should only be called once.
355  void end_coherent_changes(const GroupCoherentSamples& group_samples);
356 
357 #endif
358 
359  /**
360  * Get associated topic type name.
361  */
362  char const* get_type_name() const;
363 
364  /**
365  * This mothod is called by transport to notify the instance
366  * sample is dropped and it delegates to WriteDataContainer
367  * to update the internal list.
368  */
369  void data_dropped(const DataSampleElement* element,
370  bool dropped_by_transport);
371 
372  /**
373  * This is called by transport to notify that the control
374  * message is dropped.
375  */
376  void control_dropped(const Message_Block_Ptr& sample,
377  bool dropped_by_transport);
378 
379  /**
380  * Accessor of the WriterDataContainer's lock.
381  */
383  {
384  return data_container_->lock_;
385  }
386 
387  /**
388  * This is used to retrieve the listener for a certain status
389  * change.
390  *
391  * If this datawriter has a registered listener and the status
392  * kind is in the listener mask then the listener is returned.
393  * Otherwise, the query for the listener is propagated up to the
394  * factory/publisher.
395  */
396  DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
397 
398  /// Handle the assert liveliness timeout.
399  virtual int handle_timeout(const ACE_Time_Value &tv,
400  const void *arg);
401 
402  /// Called by the PublisherImpl to indicate that the Publisher is now
403  /// resumed and any data collected while it was suspended should now be sent.
404  void send_suspended_data();
405 
406  void remove_all_associations();
407 
408  virtual void register_for_reader(const GUID_t& participant,
409  const GUID_t& writerid,
410  const GUID_t& readerid,
411  const TransportLocatorSeq& locators,
412  DiscoveryListener* listener);
413 
414  virtual void unregister_for_reader(const GUID_t& participant,
415  const GUID_t& writerid,
416  const GUID_t& readerid);
417 
418  virtual void update_locators(const GUID_t& remote,
419  const TransportLocatorSeq& locators);
420 
421  void notify_publication_disconnected(const ReaderIdSeq& subids);
422  void notify_publication_reconnected(const ReaderIdSeq& subids);
423  void notify_publication_lost(const ReaderIdSeq& subids);
424 
425  /// Statistics counter.
428 
430 
431  /**
432  * This method create a header message block and chain with
433  * the sample data. The header contains the information
434  * needed. e.g. message id, length of whole message...
435  * The fast allocator is used to allocate the message block,
436  * data block and header.
437  */
439  create_sample_data_message(Message_Block_Ptr data,
440  DDS::InstanceHandle_t instance_handle,
441  DataSampleHeader& header_data,
442  Message_Block_Ptr& message,
443  const DDS::Time_t& source_timestamp,
444  bool content_filter);
445 
446 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
447  /// Make sent data available beyond the lifetime of this
448  /// @c DataWriter.
449  bool persist_data();
450 #endif
451 
452  /// Wait for pending data and control messages to drain.
453  void wait_pending();
454 
455  /**
456  * Set deadline to complete wait_pending by. If 0, then wait_pending will
457  * wait indefinitely if needed.
458  */
459  void set_wait_pending_deadline(const MonotonicTimePoint& deadline);
460 
461  /**
462  * Get an instance handle for a new instance.
463  */
464  DDS::InstanceHandle_t get_next_handle();
465 
466  virtual RcHandle<EntityImpl> parent() const;
467 
468 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
469  bool filter_out(const DataSampleElement& elt,
470  const OPENDDS_STRING& filterClassName,
471  const FilterEvaluator& evaluator,
472  const DDS::StringSeq& expression_params) const;
473 #endif
474 
476  {
477  return db_lock_pool_->get_lock();
478  }
479 
480  /**
481  * Attempt to locate an existing instance for the given handle.
482  */
483  PublicationInstance_rch get_handle_instance(
484  DDS::InstanceHandle_t handle);
485 
486  virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
487 
488  GUID_t get_guid() const
489  {
491  return publication_id_;
492  }
493 
495  {
496  ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
497  return sequence_number_;
498  }
499 
501  {
502  return dynamic_cast<const ValueDispatcher*>(type_support_);
503  }
504 
508  const Sample& sample, const DDS::Time_t& timestamp);
510  const Sample& sample,
511  DDS::InstanceHandle_t instance_handle,
512  const DDS::Time_t& timestamp);
514  const Sample& sample,
515  DDS::InstanceHandle_t instance_handle,
516  const DDS::Time_t& source_timestamp);
517 
518 protected:
519 
521  {
523  if (GUID_UNKNOWN == publication_id_) {
524  publication_id_ = id;
525  }
526  }
527 
529  {
530  ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
531  return get_next_sn_i();
532  }
533 
535  {
536  if (sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
537  sequence_number_ = SequenceNumber();
538  } else {
539  ++sequence_number_;
540  }
541  return sequence_number_;
542  }
543 
544  // Perform cast to get extended version of listener (otherwise nil)
545  DataWriterListener_ptr get_ext_listener();
546 
547  DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
548 
549  void prepare_to_delete();
550 
551  /**
552  * Setup CDR serialization options.
553  */
554  DDS::ReturnCode_t setup_serialization();
555 
556  ACE_Message_Block* serialize_sample(const Sample& sample);
557 
558  /// The number of chunks for the cached allocator.
559  size_t n_chunks_;
560 
561  /// The multiplier for allocators affected by associations
563 
564 
565  /// The type name of associated topic.
567 
568  /// The qos policy list of this datawriter.
570  /// The qos policy passed in by the user.
571  /// Differs from qos_ because representation has been interpreted.
573 
574  /// The participant servant which creats the publisher that
575  /// creates this datawriter.
577 
578  //This lock should be used to protect access to reader_info_
580 
581  struct ReaderInfo {
582 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
588 #endif
590  bool durable_;
591  ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
592  WeakRcHandle<DomainParticipantImpl> participant, bool durable);
593  ~ReaderInfo();
594  };
595 
596  typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
597  RepoIdToReaderInfoMap reader_info_;
598 
602  explicit AckCustomization(AckToken& at) : token_(at) {}
603  };
604 
605  virtual SendControlStatus send_control(const DataSampleHeader& header,
606  Message_Block_Ptr msg);
607 
609 
610  /**
611  * Used to hold the encoding and get the buffer sizes needed to store the
612  * results of the encoding.
613  */
614  class EncodingMode {
615  public:
617  : valid_(false)
618  , header_size_(0)
619  {
620  }
621 
622  EncodingMode(const TypeSupportImpl* ts, Encoding::Kind kind, bool swap_the_bytes)
623  : valid_(true)
624  , encoding_(kind, swap_the_bytes)
625  , header_size_(encoding_.is_encapsulated() ? EncapsulationHeader::serialized_size : 0)
626  , bound_(ts->serialized_size_bound(encoding_))
627  , key_only_bound_(ts->key_only_serialized_size_bound(encoding_))
628  {
629  }
630 
631  bool valid() const
632  {
633  return valid_;
634  }
635 
636  const Encoding& encoding() const
637  {
638  return encoding_;
639  }
640 
641  bool bound() const
642  {
643  return bound_;
644  }
645 
647  {
648  return bound_ ? SerializedSizeBound(header_size_ + bound_.get()) : SerializedSizeBound();
649  }
650 
651  size_t buffer_size(const Sample& sample) const
652  {
653  const SerializedSizeBound bound = sample.key_only() ? key_only_bound_ : bound_;
654  return header_size_ + (bound ? bound.get() : sample.serialized_size(encoding_));
655  }
656 
657  private:
658  bool valid_;
660  size_t header_size_;
663  } encoding_mode_;
664 
666  {
667  return type_support_;
668  }
669 
670  DDS::ReturnCode_t instance_must_exist(
671  const char* method_name,
672  const Sample& sample,
673  DDS::InstanceHandle_t& instance_handle,
674  bool remove = false);
675 
676  DDS::ReturnCode_t get_or_create_instance_handle(
677  DDS::InstanceHandle_t& handle,
678  const Sample& sample,
679  const DDS::Time_t& source_timestamp);
680 
682  const Sample& sample,
683  DDS::InstanceHandle_t handle,
684  const DDS::Time_t& source_timestamp);
685 
686 private:
687 
688  void track_sequence_number(GUIDSeq* filter_out);
689 
690  void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
691 
692  DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
693  const DDS::Time_t& timestamp);
694 
695  /**
696  * This method create a header message block and chain with
697  * the registered sample. The header contains the information
698  * needed. e.g. message id, length of whole message...
699  * The fast allocator is not used for the header.
700  */
702  create_control_message(MessageId message_id,
703  DataSampleHeader& header,
704  Message_Block_Ptr data,
705  const DDS::Time_t& source_timestamp);
706 
707  /// Send the liveliness message.
708  bool send_liveliness(const MonotonicTimePoint& now);
709 
710  /// Lookup the instance handles by the subscription repo ids
711  void lookup_instance_handles(const ReaderIdSeq& ids,
712  DDS::InstanceHandleSeq& hdls);
713 
714  RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const;
715 
717  {
718  return this->domain_id_;
719  }
720 
722  {
723  return this->qos_.transport_priority.value;
724  }
725 
726 #ifdef OPENDDS_SECURITY
727  DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
728 #endif
729 
730  void association_complete_i(const GUID_t& remote_id);
731 
732  void return_handle(DDS::InstanceHandle_t handle);
733 
734  friend class ::DDS_TEST; // allows tests to get at privates
735 
736 
737  // Data block local pool for this data writer.
739 
740  /// The name of associated topic.
742  /// The associated topic repository id.
744  /// The topic servant.
747 
748  /// Mutex to protect listener info
750  /// The StatusKind bit mask indicates which status condition change
751  /// can be notified by the listener of this entity.
753  /// Used to notify the entity for relevant events.
754  DDS::DataWriterListener_var listener_;
755  /// The domain id.
758  /// The publisher servant which creates this datawriter.
760  /// The repository id of this datawriter/publication.
762  /// The sequence number unique in DataWriter scope.
764  /// Mutex for sequence_number_
766  /// Flag indicating DataWriter current belongs to
767  /// a coherent change set.
768  bool coherent_;
769  /// The number of samples belonging to the current
770  /// coherent change set.
771  ACE_UINT32 coherent_samples_;
772  /// The sample data container.
774  /// The lock to protect the activate subscriptions
775  /// and status changes.
777 
778  typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
779  RepoIdToHandleMap id_to_handle_map_;
780 
782 
783  /// Status conditions.
788 
789  /// True if the writer failed to actively signal its liveliness within
790  /// its offered liveliness period.
792 
793  /**
794  * @todo The publication_lost_status_ and
795  * publication_reconnecting_status_ are left here for
796  * future use when we add get_publication_lost_status()
797  * and get_publication_reconnecting_status() methods.
798  */
799  // Statistics of the lost publications due to lost connection.
800  // PublicationLostStatus publication_lost_status_;
801  // Statistics of the publications that associates with a
802  // reconnecting datalink.
803  // PublicationReconnectingStatus publication_reconnecting_status_;
804 
805  /// The message block allocator.
807  /// The data block allocator.
809  /// The header data allocator.
812 
813  /// The orb's reactor to be used to register the liveliness
814  /// timer.
816  /// The time interval for sending liveliness message.
818  /// Timestamp of last write/dispose/assert_liveliness.
820  /// Total number of offered deadlines missed during last offered
821  /// deadline status check.
823 
824  /// Flag indicates that this datawriter is a builtin topic
825  /// datawriter.
826  bool is_bit_;
827 
828  /// The cached available data while suspending and associated transaction ids.
832 
833  /// Monitor object for this entity
835 
836  /// Periodic Monitor object for this entity
838 
839 
840  // Do we need to set the sequence repair header bit?
841  // must call prior to incrementing sequence number
842  bool need_sequence_repair();
843  bool need_sequence_repair_i() const;
844 
845  DDS::ReturnCode_t send_end_historic_samples(const GUID_t& readerId);
846  DDS::ReturnCode_t send_request_ack();
847 
849 
850  // Lock used to synchronize remove_associations calls from discovery
851  // and unregister_instances during deletion of datawriter from application
854 
856 
857  typedef OPENDDS_MAP(DDS::InstanceHandle_t, Sample_rch) InstanceHandlesToValues;
858  InstanceHandlesToValues instance_handles_to_values_;
859  typedef OPENDDS_MAP_CMP(Sample_rch, DDS::InstanceHandle_t, SampleRchCmp) InstanceValuesToHandles;
860  InstanceValuesToHandles instance_values_to_handles_;
861 
862  bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch& sample);
863  InstanceValuesToHandles::iterator find_instance(const Sample& sample);
864 
865 #ifdef OPENDDS_SECURITY
866 protected:
869  DDS::DynamicType_var dynamic_type_;
870 #endif
871 };
872 
874 
875 
876 class LivenessTimer : public virtual RcEventHandler
877 {
878 public:
880  : writer_(writer)
881  {
882  }
883 
884  /// Handle the assert liveliness timeout.
885  virtual int handle_timeout(const ACE_Time_Value& tv, const void* arg);
886 
887 private:
889 };
890 
891 } // namespace DCPS
892 } // namespace OpenDDS
893 
895 
896 #endif
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
Defines the interface for Discovery callbacks into the DataWriter.
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
ACE_CDR::Long Long
SerializedSizeBound buffer_size_bound() const
size_t n_chunks_
The number of chunks for the cached allocator.
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
const LogLevel::Value value
Definition: debug.cpp:61
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
CORBA::String_var topic_name_
The name of associated topic.
void set_marshal_skip_serialize(bool value)
::DDS::InstanceHandle_t register_instance_w_timestamp(in<%SCOPED%> instance, in ::DDS::Time_t timestamp)
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
SendStateDataSampleList get_resend_data()
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
SendStateDataSampleList available_data_list_
DDS::Security::PermissionsHandle participant_permissions_handle_
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
CORBA::Long get_priority_value(const AssociationData &) const
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
GuidSet RepoIdSet
Definition: GuidUtils.h:113
DataBlockLockPool::DataBlockLock * get_db_lock()
SequenceNumber get_max_sn() const
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
sequence< TransportLocator > TransportLocatorSeq
MonotonicTimePoint deadline() const
MonotonicTimePoint wait_pending_deadline_
ACE_Reactor_Timer_Interface * reactor_
bool key_only() const
Definition: Sample.h:75
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
RcHandle< DataWriterImpl > DataWriterImpl_rch
ACE_Guard< ACE_Thread_Mutex > lock_
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const ValueDispatcher * get_value_dispatcher() const
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RepoIdToReaderInfoMap reader_info_
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
::DDS::ReturnCode_t dispose(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t instance_handle)
MessageId
One byte message id (<256)
Security::SecurityConfig_rch security_config_
Implements the DDS::Topic interface.
Definition: TopicImpl.h:37
#define OPENDDS_STRING
RcHandle< LivenessTimer > liveness_timer_
sequence< GUID_t > ReaderIdSeq
DOMAINID_TYPE_NATIVE DomainId_t
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
DDS::DataWriterQos passed_qos_
WeakRcHandle< DomainParticipantImpl > participant_servant_
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
::DDS::ReturnCode_t write_w_timestamp(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle, in ::DDS::Time_t source_timestamp)
GUID_t topic_id_
The associated topic repository id.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
InstanceHandlesToValues instance_handles_to_values_
Atomic< int > data_dropped_count_
Statistics counter.
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
AckToken(const DDS::Duration_t &max_wait, const SequenceNumber &sequence)
int init(void)
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
unique_ptr< DataAllocator > data_allocator_
DDS::DomainId_t domain_id() const
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
Mix-in class for DDS entities which directly use the transport layer.
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long StatusMask
ACE_recursive_thread_mutex_t lock_
TransportPriorityQosPolicy transport_priority
EncodingMode(const TypeSupportImpl *ts, Encoding::Kind kind, bool swap_the_bytes)
unsigned long long ACE_UINT64
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
DDS::PublicationMatchedStatus publication_match_status_
virtual size_t serialized_size(const Encoding &enc) const =0
WeakRcHandle< DomainParticipantImpl > participant_
DataAllocator * data_allocator() const
Sequence number abstraction. Only allows positive 64 bit values.
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
DDS::DynamicType_var dynamic_type_
TypeSupportImpl * get_type_support() const
::DDS::ReturnCode_t unregister_instance_w_timestamp(in<%SCOPED%> instance, in ::DDS::InstanceHandle_t handle, in ::DDS::Time_t timestamp)
::DDS::ReturnCode_t write(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t handle)
WeakRcHandle< DataWriterImpl > writer_
::DDS::ReturnCode_t get_key_value(inout<%SCOPED%> key_holder, in ::DDS::InstanceHandle_t handle)
size_t buffer_size(const Sample &sample) const
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_Recursive_Thread_Mutex lock_
unsigned long StatusKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
void check_and_set_repo_id(const GUID_t &id)
unique_ptr< DataBlockLockPool > db_lock_pool_
RepoIdToHandleMap id_to_handle_map_
InstanceValuesToHandles instance_values_to_handles_
CORBA::Long last_deadline_missed_total_count_
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
LivenessTimer(DataWriterImpl &writer)
GUID_t publication_id_
The repository id of this datawriter/publication.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
A container for instances sample data.
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
RcHandle< WriteDataContainer > data_container_
The sample data container.
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
LivelinessQosPolicyKind
CORBA::String_var type_name_
The type name of associated topic.
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
SendControlStatus
Return code type for send_control() operations.
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
const Encoding & encoding_
::DDS::ReturnCode_t dispose_w_timestamp(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t instance_handle, in ::DDS::Time_t source_timestamp)