LCOV - code coverage report
Current view: top level - DCPS - DataWriterImpl.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 67 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 23 0.0 %

          Line data    Source code
       1             : /*
       2             :  * Distributed under the OpenDDS License.
       3             :  * See: http://www.opendds.org/license.html
       4             :  */
       5             : 
       6             : #ifndef OPENDDS_DCPS_DATAWRITERIMPL_H
       7             : #define OPENDDS_DCPS_DATAWRITERIMPL_H
       8             : 
       9             : #include "Atomic.h"
      10             : #include "Sample.h"
      11             : #include "DataWriterCallbacks.h"
      12             : #include "transport/framework/TransportSendListener.h"
      13             : #include "transport/framework/TransportClient.h"
      14             : #include "MessageTracker.h"
      15             : #include "DataBlockLockPool.h"
      16             : #include "PoolAllocator.h"
      17             : #include "WriteDataContainer.h"
      18             : #include "Definitions.h"
      19             : #include "DataSampleHeader.h"
      20             : #include "TopicImpl.h"
      21             : #include "Time_Helper.h"
      22             : #include "CoherentChangeControl.h"
      23             : #include "GuidUtils.h"
      24             : #include "RcEventHandler.h"
      25             : #include "unique_ptr.h"
      26             : #include "Message_Block_Ptr.h"
      27             : #include "TimeTypes.h"
      28             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
      29             : #  include "FilterEvaluator.h"
      30             : #endif
      31             : 
      32             : #include <dds/DdsDcpsDomainC.h>
      33             : #include <dds/DdsDcpsTopicC.h>
      34             : 
      35             : #include <ace/Event_Handler.h>
      36             : #include <ace/OS_NS_sys_time.h>
      37             : 
      38             : #include <memory>
      39             : 
      40             : #ifndef ACE_LACKS_PRAGMA_ONCE
      41             : #  pragma once
      42             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      43             : 
      44             : class DDS_TEST;
      45             : 
      46             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      47             : 
      48             : namespace OpenDDS {
      49             : namespace DCPS {
      50             : 
      51             : class PublisherImpl;
      52             : class DomainParticipantImpl;
      53             : class Monitor;
      54             : class DataSampleElement;
      55             : class SendStateDataSampleList;
      56             : struct AssociationData;
      57             : class LivenessTimer;
      58             : 
      59             : /**
      60             :  * @class DataWriterImpl
      61             :  *
      62             :  * @brief Implements the OpenDDS::DCPS::DataWriterRemote interfaces and
      63             :  *        DDS::DataWriter interfaces.
      64             :  *
      65             :  * See the DDS specification, OMG formal/2015-04-10, for a description of
      66             :  * the interface this class is implementing.
      67             :  *
      68             :  * This class must be inherited by the type-specific datawriter which
      69             :  * is specific to the data-type associated with the topic.
      70             :  *
      71             :  * @note: This class is responsible for allocating memory for the
      72             :  *        header message block
      73             :  *        (MessageBlock + DataBlock + DataSampleHeader) and the
      74             :  *        DataSampleElement.
      75             :  *        The data-type datawriter is responsible for allocating
      76             :  *        memory for the sample data message block.
      77             :  *        (e.g. MessageBlock + DataBlock + Foo data). But it gives
      78             :  *        up ownership to this WriteDataContainer.
      79             :  */
      80             : class OpenDDS_Dcps_Export DataWriterImpl
      81             :   : public virtual LocalObject<DDS::DataWriter>
      82             :   , public virtual DataWriterCallbacks
      83             :   , public virtual EntityImpl
      84             :   , public virtual TransportClient
      85             :   , public virtual TransportSendListener
      86             : {
      87             : public:
      88             :   friend class WriteDataContainer;
      89             :   friend class PublisherImpl;
      90             : 
      91             :   typedef OPENDDS_MAP_CMP(GUID_t, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap;
      92             :   typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;
      93             : 
      94             :   struct AckToken {
      95             :     MonotonicTimePoint tstamp_;
      96             :     DDS::Duration_t max_wait_;
      97             :     SequenceNumber sequence_;
      98             : 
      99           0 :     AckToken(const DDS::Duration_t& max_wait,
     100             :              const SequenceNumber& sequence)
     101           0 :       : tstamp_(MonotonicTimePoint::now())
     102           0 :       , max_wait_(max_wait)
     103           0 :       , sequence_(sequence)
     104             :     {
     105           0 :     }
     106             : 
     107           0 :     ~AckToken() {}
     108             : 
     109           0 :     MonotonicTimePoint deadline() const
     110             :     {
     111           0 :       return tstamp_ + TimeDuration(max_wait_);
     112             :     }
     113             : 
     114           0 :     bool deadline_is_infinite() const
     115             :     {
     116           0 :       return max_wait_.sec == DDS::DURATION_INFINITE_SEC && max_wait_.nanosec == DDS::DURATION_INFINITE_NSEC;
     117             :     }
     118             :   };
     119             : 
     120             :   DataWriterImpl();
     121             : 
     122             :   virtual ~DataWriterImpl();
     123             : 
     124             :   void set_marshal_skip_serialize(bool value)
     125             :   {
     126             :     skip_serialize_ = value;
     127             :   }
     128             : 
     129             :   bool get_marshal_skip_serialize() const
     130             :   {
     131             :     return skip_serialize_;
     132             :   }
     133             : 
     134             :   DataAllocator* data_allocator() const
     135             :   {
     136             :     return data_allocator_.get();
     137             :   }
     138             : 
     139             :   virtual DDS::InstanceHandle_t get_instance_handle();
     140             : 
     141             :   virtual DDS::ReturnCode_t set_qos(const DDS::DataWriterQos & qos);
     142             : 
     143             :   virtual DDS::ReturnCode_t get_qos(DDS::DataWriterQos & qos);
     144             : 
     145             :   virtual DDS::ReturnCode_t set_listener(
     146             :     DDS::DataWriterListener_ptr a_listener,
     147             :     DDS::StatusMask mask);
     148             : 
     149             :   virtual DDS::DataWriterListener_ptr get_listener();
     150             : 
     151             :   virtual DDS::Topic_ptr get_topic();
     152             : 
     153             :   virtual DDS::ReturnCode_t wait_for_acknowledgments(
     154             :     const DDS::Duration_t & max_wait);
     155             : 
     156             :   virtual DDS::Publisher_ptr get_publisher();
     157             : 
     158             :   virtual DDS::ReturnCode_t get_liveliness_lost_status(
     159             :     DDS::LivelinessLostStatus & status);
     160             : 
     161             :   virtual DDS::ReturnCode_t get_offered_deadline_missed_status(
     162             :     DDS::OfferedDeadlineMissedStatus & status);
     163             : 
     164             :   virtual DDS::ReturnCode_t get_offered_incompatible_qos_status(
     165             :     DDS::OfferedIncompatibleQosStatus & status);
     166             : 
     167             :   virtual DDS::ReturnCode_t get_publication_matched_status(
     168             :     DDS::PublicationMatchedStatus & status);
     169             : 
     170             :   TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
     171             : 
     172             :   bool participant_liveliness_activity_after(const MonotonicTimePoint& tv);
     173             : 
     174             :   virtual DDS::ReturnCode_t assert_liveliness();
     175             : 
     176             :   DDS::ReturnCode_t assert_liveliness_by_participant();
     177             : 
     178             :   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
     179             :   void get_instance_handles(InstanceHandleVec& instance_handles);
     180             : 
     181             :   void get_readers(RepoIdSet& readers);
     182             : 
     183             :   virtual DDS::ReturnCode_t get_matched_subscriptions(
     184             :     DDS::InstanceHandleSeq & subscription_handles);
     185             : 
     186             : #if !defined (DDS_HAS_MINIMUM_BIT)
     187             :   virtual DDS::ReturnCode_t get_matched_subscription_data(
     188             :     DDS::SubscriptionBuiltinTopicData & subscription_data,
     189             :     DDS::InstanceHandle_t subscription_handle);
     190             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
     191             : 
     192             :   virtual DDS::ReturnCode_t enable();
     193             : 
     194             :   virtual void add_association(const GUID_t& yourId,
     195             :                                const ReaderAssociation& reader,
     196             :                                bool active);
     197             : 
     198             :   virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
     199             : 
     200             :   virtual void remove_associations(const ReaderIdSeq & readers,
     201             :                                    bool callback);
     202             : 
     203             :   virtual void replay_durable_data_for(const GUID_t& remote_sub_id);
     204             : 
     205             :   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
     206             : 
     207             :   virtual void update_subscription_params(const GUID_t& readerId,
     208             :                                           const DDS::StringSeq& params);
     209             : 
     210             :   /**
     211             :    * Clean up the DataWriter.
     212             :    */
     213             :   void cleanup();
     214             : 
     215             :   /**
     216             :    * Initialize the data members.
     217             :    */
     218             :   void init(
     219             :     TopicImpl* topic_servant,
     220             :     const DDS::DataWriterQos& qos,
     221             :     DDS::DataWriterListener_ptr a_listener,
     222             :     const DDS::StatusMask& mask,
     223             :     WeakRcHandle<DomainParticipantImpl> participant_servant,
     224             :     PublisherImpl* publisher_servant);
     225             : 
     226             :   void send_all_to_flush_control(ACE_Guard<ACE_Recursive_Thread_Mutex>& guard);
     227             : 
     228             :   /**
     229             :    * Delegate to the WriteDataContainer to register the instance.
     230             :    * Must tell the transport to broadcast the registered
     231             :    * instance upon returning.
     232             :    */
     233             :   DDS::ReturnCode_t
     234             :   register_instance_i(
     235             :     DDS::InstanceHandle_t& handle,
     236             :     Message_Block_Ptr data,
     237             :     const DDS::Time_t& source_timestamp);
     238             : 
     239             :   /**
     240             :    * Delegate to the WriteDataContainer to register and tell
     241             :    * the transport to broadcast the registered instance.
     242             :    */
     243             :   DDS::ReturnCode_t
     244             :   register_instance_from_durable_data(
     245             :     DDS::InstanceHandle_t& handle,
     246             :     Message_Block_Ptr data,
     247             :     const DDS::Time_t & source_timestamp);
     248             : 
     249             :   /**
     250             :    * Delegate to the WriteDataContainer to unregister and tell
     251             :    * the transport to broadcast the unregistered instance.
     252             :    */
     253             :   DDS::ReturnCode_t
     254             :   unregister_instance_i(
     255             :     DDS::InstanceHandle_t handle,
     256             :     const DDS::Time_t & source_timestamp);
     257             : 
     258             :   /**
     259             :    * Unregister all registered instances and tell the transport
     260             :    * to broadcast the unregistered instances.
     261             :    */
     262             :   void unregister_instances(const DDS::Time_t& source_timestamp);
     263             : 
     264             :   /**
     265             :    * Delegate to the WriteDataContainer to queue the instance
     266             :    * sample and finally tell the transport to send the sample.
     267             :    * \param filter_out can either be null (if the writer can't
     268             :    *        or won't evaluate the filters), or a list of
     269             :    *        associated reader GUID_ts that should NOT get the
     270             :    *        data sample due to content filtering.
     271             :    */
     272             :   DDS::ReturnCode_t write(Message_Block_Ptr sample,
     273             :                           DDS::InstanceHandle_t handle,
     274             :                           const DDS::Time_t& source_timestamp,
     275             :                           GUIDSeq* filter_out,
     276             :                           const void* real_data);
     277             : 
     278             :   DDS::ReturnCode_t write_sample(
     279             :     const Sample& sample,
     280             :     DDS::InstanceHandle_t handle,
     281             :     const DDS::Time_t& source_timestamp,
     282             :     GUIDSeq* filter_out);
     283             : 
     284             :   /**
     285             :    * Delegate to the WriteDataContainer to dispose all data
     286             :    * samples for a given instance and tell the transport to
     287             :    * broadcast the disposed instance.
     288             :    */
     289             :   DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle,
     290             :                             const DDS::Time_t & source_timestamp);
     291             : 
     292             :   /**
     293             :    * Return the number of samples for a given instance.
     294             :    */
     295             :   DDS::ReturnCode_t num_samples(DDS::InstanceHandle_t handle,
     296             :                                 size_t&               size);
     297             : 
     298             :   /**
     299             :    * Retrieve the unsent data from the WriteDataContainer.
     300             :    */
     301           0 :   ACE_UINT64 get_unsent_data(SendStateDataSampleList& list)
     302             :   {
     303           0 :     return data_container_->get_unsent_data(list);
     304             :   }
     305             : 
     306           0 :   SendStateDataSampleList get_resend_data()
     307             :   {
     308           0 :     return data_container_->get_resend_data();
     309             :   }
     310             : 
     311             :   /**
     312             :    * Accessor of the repository id of the domain participant.
     313             :    */
     314             :   GUID_t get_dp_id();
     315             : 
     316             :   /**
     317             :    * Delegate to WriteDataContainer to unregister all instances.
     318             :    */
     319             :   void unregister_all();
     320             : 
     321             :   /**
     322             :    * This is called by transport to notify that the sample is
     323             :    * delivered and it is delegated to WriteDataContainer
     324             :    * to adjust the internal data sample threads.
     325             :    */
     326             :   void data_delivered(const DataSampleElement* sample);
     327             : 
     328             :   void transport_discovery_change();
     329             : 
     330             :   /**
     331             :    * This is called by transport to notify that the control
     332             :    * message is delivered.
     333             :    */
     334             :   void control_delivered(const Message_Block_Ptr& sample);
     335             : 
     336             :   /// Does this writer have samples to be acknowledged?
     337             :   bool should_ack() const;
     338             : 
     339             :   /// Create an AckToken for ack operations.
     340             :   AckToken create_ack_token(DDS::Duration_t max_wait) const;
     341             : 
     342             :   virtual void retrieve_inline_qos_data(TransportSendListener::InlineQosData& qos_data) const;
     343             : 
     344             :   virtual bool check_transport_qos(const TransportInst& inst);
     345             : 
     346             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     347             : 
     348             :   /// Are coherent changes pending?
     349             :   bool coherent_changes_pending();
     350             : 
     351             :   /// Starts a coherent change set; should only be called once.
     352             :   void begin_coherent_changes();
     353             : 
     354             :   /// Ends a coherent change set; should only be called once.
     355             :   void end_coherent_changes(const GroupCoherentSamples& group_samples);
     356             : 
     357             : #endif
     358             : 
     359             :   /**
     360             :    * Get associated topic type name.
     361             :    */
     362             :   char const* get_type_name() const;
     363             : 
     364             :   /**
     365             :    * This method is called by transport to notify that the instance
     366             :    * sample is dropped and it delegates to WriteDataContainer
     367             :    * to update the internal list.
     368             :    */
     369             :   void data_dropped(const DataSampleElement* element,
     370             :                     bool dropped_by_transport);
     371             : 
     372             :   /**
     373             :    * This is called by transport to notify that the control
     374             :    * message is dropped.
     375             :    */
     376             :   void control_dropped(const Message_Block_Ptr& sample,
     377             :                        bool dropped_by_transport);
     378             : 
     379             :   /**
     380             :    * Accessor of the WriteDataContainer's lock.
     381             :    */
     382           0 :   ACE_Recursive_Thread_Mutex& get_lock() const
     383             :   {
     384           0 :     return data_container_->lock_;
     385             :   }
     386             : 
     387             :   /**
     388             :    * This is used to retrieve the listener for a certain status
     389             :    * change.
     390             :    *
     391             :    * If this datawriter has a registered listener and the status
     392             :    * kind is in the listener mask then the listener is returned.
     393             :    * Otherwise, the query for the listener is propagated up to the
     394             :    * factory/publisher.
     395             :    */
     396             :   DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind);
     397             : 
     398             :   /// Handle the assert liveliness timeout.
     399             :   virtual int handle_timeout(const ACE_Time_Value &tv,
     400             :                              const void *arg);
     401             : 
     402             :   /// Called by the PublisherImpl to indicate that the Publisher is now
     403             :   /// resumed and any data collected while it was suspended should now be sent.
     404             :   void send_suspended_data();
     405             : 
     406             :   void remove_all_associations();
     407             : 
     408             :   virtual void register_for_reader(const GUID_t& participant,
     409             :                                    const GUID_t& writerid,
     410             :                                    const GUID_t& readerid,
     411             :                                    const TransportLocatorSeq& locators,
     412             :                                    DiscoveryListener* listener);
     413             : 
     414             :   virtual void unregister_for_reader(const GUID_t& participant,
     415             :                                      const GUID_t& writerid,
     416             :                                      const GUID_t& readerid);
     417             : 
     418             :   virtual void update_locators(const GUID_t& remote,
     419             :                                const TransportLocatorSeq& locators);
     420             : 
     421             :   void notify_publication_disconnected(const ReaderIdSeq& subids);
     422             :   void notify_publication_reconnected(const ReaderIdSeq& subids);
     423             :   void notify_publication_lost(const ReaderIdSeq& subids);
     424             : 
     425             :   /// Statistics counter.
     426             :   Atomic<int> data_dropped_count_;
     427             :   Atomic<int> data_delivered_count_;
     428             : 
     429             :   MessageTracker controlTracker;
     430             : 
     431             :   /**
     432             :    * This method creates a header message block and chains it with
     433             :    * the sample data. The header contains the information
     434             :    * needed. e.g. message id, length of whole message...
     435             :    * The fast allocator is used to allocate the message block,
     436             :    * data block and header.
     437             :    */
     438             :   DDS::ReturnCode_t
     439             :   create_sample_data_message(Message_Block_Ptr data,
     440             :                              DDS::InstanceHandle_t instance_handle,
     441             :                              DataSampleHeader& header_data,
     442             :                              Message_Block_Ptr& message,
     443             :                              const DDS::Time_t& source_timestamp,
     444             :                              bool content_filter);
     445             : 
     446             : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
     447             :   /// Make sent data available beyond the lifetime of this
     448             :   /// @c DataWriter.
     449             :   bool persist_data();
     450             : #endif
     451             : 
     452             :   /// Wait for pending data and control messages to drain.
     453             :   void wait_pending();
     454             : 
     455             :   /**
     456             :    * Set deadline to complete wait_pending by. If 0, then wait_pending will
     457             :    * wait indefinitely if needed.
     458             :    */
     459             :   void set_wait_pending_deadline(const MonotonicTimePoint& deadline);
     460             : 
     461             :   /**
     462             :    * Get an instance handle for a new instance.
     463             :    */
     464             :   DDS::InstanceHandle_t get_next_handle();
     465             : 
     466             :   virtual RcHandle<EntityImpl> parent() const;
     467             : 
     468             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     469             :   bool filter_out(const DataSampleElement& elt,
     470             :                   const OPENDDS_STRING& filterClassName,
     471             :                   const FilterEvaluator& evaluator,
     472             :                   const DDS::StringSeq& expression_params) const;
     473             : #endif
     474             : 
     475           0 :   DataBlockLockPool::DataBlockLock* get_db_lock()
     476             :   {
     477           0 :     return db_lock_pool_->get_lock();
     478             :   }
     479             : 
     480             :   /**
     481             :    *  Attempt to locate an existing instance for the given handle.
     482             :    */
     483             :   PublicationInstance_rch get_handle_instance(
     484             :     DDS::InstanceHandle_t handle);
     485             : 
     486             :   virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
     487             : 
     488           0 :   GUID_t get_guid() const
     489             :   {
     490           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     491           0 :     return publication_id_;
     492           0 :   }
     493             : 
     494           0 :   SequenceNumber get_max_sn() const
     495             :   {
     496           0 :     ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
     497           0 :     return sequence_number_;
     498           0 :   }
     499             : 
     500           0 :   const ValueDispatcher* get_value_dispatcher() const
     501             :   {
     502           0 :     return dynamic_cast<const ValueDispatcher*>(type_support_);
     503             :   }
     504             : 
     505             :   DDS::ReturnCode_t get_key_value(Sample_rch& sample, DDS::InstanceHandle_t handle);
     506             :   DDS::InstanceHandle_t lookup_instance(const Sample& sample);
     507             :   DDS::InstanceHandle_t register_instance_w_timestamp(
     508             :     const Sample& sample, const DDS::Time_t& timestamp);
     509             :   DDS::ReturnCode_t unregister_instance_w_timestamp(
     510             :     const Sample& sample,
     511             :     DDS::InstanceHandle_t instance_handle,
     512             :     const DDS::Time_t& timestamp);
     513             :   DDS::ReturnCode_t dispose_w_timestamp(
     514             :     const Sample& sample,
     515             :     DDS::InstanceHandle_t instance_handle,
     516             :     const DDS::Time_t& source_timestamp);
     517             : 
     518             : protected:
     519             : 
     520           0 :   void check_and_set_repo_id(const GUID_t& id)
     521             :   {
     522           0 :     ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     523           0 :     if (GUID_UNKNOWN == publication_id_) {
     524           0 :       publication_id_ = id;
     525             :     }
     526           0 :   }
     527             : 
     528             :   SequenceNumber get_next_sn()
     529             :   {
     530             :     ACE_Guard<ACE_Thread_Mutex> guard(sn_lock_);
     531             :     return get_next_sn_i();
     532             :   }
     533             : 
     534           0 :   SequenceNumber get_next_sn_i()
     535             :   {
     536           0 :     if (sequence_number_ == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
     537           0 :       sequence_number_ = SequenceNumber();
     538             :     } else {
     539           0 :       ++sequence_number_;
     540             :     }
     541           0 :     return sequence_number_;
     542             :   }
     543             : 
     544             :   // Perform cast to get extended version of listener (otherwise nil)
     545             :   DataWriterListener_ptr get_ext_listener();
     546             : 
     547             :   DDS::ReturnCode_t wait_for_specific_ack(const AckToken& token);
     548             : 
     549             :   void prepare_to_delete();
     550             : 
     551             :   /**
     552             :    * Setup CDR serialization options.
     553             :    */
     554             :   DDS::ReturnCode_t setup_serialization();
     555             : 
     556             :   ACE_Message_Block* serialize_sample(const Sample& sample);
     557             : 
     558             :   /// The number of chunks for the cached allocator.
     559             :   size_t n_chunks_;
     560             : 
     561             :   /// The multiplier for allocators affected by associations
     562             :   size_t association_chunk_multiplier_;
     563             : 
     564             : 
     565             :   /// The type name of associated topic.
     566             :   CORBA::String_var type_name_;
     567             : 
     568             :   /// The qos policy list of this datawriter.
     569             :   DDS::DataWriterQos qos_;
     570             :   /// The qos policy passed in by the user.
     571             :   /// Differs from qos_ because representation has been interpreted.
     572             :   DDS::DataWriterQos passed_qos_;
     573             : 
     574             :   /// The participant servant which creates the publisher that
     575             :   /// creates this datawriter.
     576             :   WeakRcHandle<DomainParticipantImpl> participant_servant_;
     577             : 
     578             :   //This lock should be used to protect access to reader_info_
     579             :   ACE_Thread_Mutex reader_info_lock_;
     580             : 
     581             :   struct ReaderInfo {
     582             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     583             :     WeakRcHandle<DomainParticipantImpl> participant_;
     584             :     OPENDDS_STRING filter_class_name_;
     585             :     OPENDDS_STRING filter_;
     586             :     DDS::StringSeq expression_params_;
     587             :     RcHandle<FilterEvaluator> eval_;
     588             : #endif
     589             :     SequenceNumber expected_sequence_;
     590             :     bool durable_;
     591             :     ReaderInfo(const char* filter_class_name, const char* filter, const DDS::StringSeq& params,
     592             :                WeakRcHandle<DomainParticipantImpl> participant, bool durable);
     593             :     ~ReaderInfo();
     594             :   };
     595             : 
     596             :   typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
     597             :   RepoIdToReaderInfoMap reader_info_;
     598             : 
     599             :   struct AckCustomization { // bundles a GUIDSeq with a reference to an existing AckToken
     600             :     GUIDSeq customized_; // default-constructed; the constructor below does not touch it
     601             :     AckToken& token_; // reference member: the AckToken given to the constructor must outlive this object
     602             :     explicit AckCustomization(AckToken& at) : token_(at) {}
     603             :   };
     604             : 
     605             :   virtual SendControlStatus send_control(const DataSampleHeader& header,
     606             :                                          Message_Block_Ptr msg);
     607             : 
     608             :   bool skip_serialize_;
     609             : 
     610             :   /**
     611             :    * Holds an Encoding together with the serialized-size bounds obtained from a
     612             :    * TypeSupportImpl, and computes the buffer sizes needed for the encoded results.
     613             :    */
     614             :   class EncodingMode {
     615             :   public:
     616           0 :     EncodingMode() // default state: valid() is false, header size is 0, other members default-constructed
     617           0 :     : valid_(false)
     618           0 :     , header_size_(0)
     619             :     {
     620           0 :     }
     621             : 
     622           0 :     EncodingMode(const TypeSupportImpl* ts, Encoding::Kind kind, bool swap_the_bytes) // marks the object valid and caches both size bounds
     623           0 :     : valid_(true)
     624           0 :     , encoding_(kind, swap_the_bytes)
     625           0 :     , header_size_(encoding_.is_encapsulated() ? EncapsulationHeader::serialized_size : 0) // reads encoding_, which is declared (hence initialized) before header_size_
     626           0 :     , bound_(ts->serialized_size_bound(encoding_)) // ts is dereferenced without a null check
     627           0 :     , key_only_bound_(ts->key_only_serialized_size_bound(encoding_))
     628             :     {
     629           0 :     }
     630             : 
     631           0 :     bool valid() const // false after default construction, true after the three-argument constructor
     632             :     {
     633           0 :       return valid_;
     634             :     }
     635             : 
     636           0 :     const Encoding& encoding() const // reference to the stored Encoding; valid only while this object lives
     637             :     {
     638           0 :       return encoding_;
     639             :     }
     640             : 
     641             :     bool bound() const // converts bound_ to bool; presumably true when the full-sample size is bounded -- confirm against SerializedSizeBound
     642             :     {
     643             :       return bound_;
     644             :     }
     645             : 
     646           0 :     SerializedSizeBound buffer_size_bound() const // header size plus the full-sample bound, or a default-constructed bound
     647             :     {
     648           0 :       return bound_ ? SerializedSizeBound(header_size_ + bound_.get()) : SerializedSizeBound();
     649             :     }
     650             : 
     651           0 :     size_t buffer_size(const Sample& sample) const // header size plus the payload size for this particular sample
     652             :     {
     653           0 :       const SerializedSizeBound bound = sample.key_only() ? key_only_bound_ : bound_; // key-only samples use the key-only bound
     654           0 :       return header_size_ + (bound ? bound.get() : sample.serialized_size(encoding_)); // no usable bound: ask the sample for its size
     655             :     }
     656             : 
     657             :   private:
     658             :     bool valid_;
     659             :     Encoding encoding_;
     660             :     size_t header_size_;
     661             :     SerializedSizeBound bound_;
     662             :     SerializedSizeBound key_only_bound_;
     663             :   } encoding_mode_; // the class definition also declares this data member of the enclosing class
     664             : 
     665           0 :   TypeSupportImpl* get_type_support() const // accessor: hands back the raw pointer stored in type_support_
     666             :   {
     667           0 :     return type_support_;
     668             :   }
     669             : 
     670             :   DDS::ReturnCode_t instance_must_exist(
     671             :     const char* method_name,
     672             :     const Sample& sample,
     673             :     DDS::InstanceHandle_t& instance_handle,
     674             :     bool remove = false);
     675             : 
     676             :   DDS::ReturnCode_t get_or_create_instance_handle(
     677             :     DDS::InstanceHandle_t& handle,
     678             :     const Sample& sample,
     679             :     const DDS::Time_t& source_timestamp);
     680             : 
     681             :   DDS::ReturnCode_t write_w_timestamp(
     682             :     const Sample& sample,
     683             :     DDS::InstanceHandle_t handle,
     684             :     const DDS::Time_t& source_timestamp);
     685             : 
     686             : private:
     687             : 
     688             :   void track_sequence_number(GUIDSeq* filter_out);
     689             : 
     690             :   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
     691             : 
     692             :   DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle,
     693             :                                            const DDS::Time_t& timestamp);
     694             : 
     695             :   /**
     696             :    * This method creates a header message block and chains it with
     697             :    * the registered sample. The header contains the information
     698             :    * needed, e.g. message id, length of whole message...
     699             :    * The fast allocator is not used for the header.
     700             :    */
     701             :   ACE_Message_Block*
     702             :   create_control_message(MessageId message_id,
     703             :                          DataSampleHeader& header,
     704             :                          Message_Block_Ptr data,
     705             :                          const DDS::Time_t& source_timestamp);
     706             : 
     707             :   /// Send the liveliness message.
     708             :   bool send_liveliness(const MonotonicTimePoint& now);
     709             : 
     710             :   /// Lookup the instance handles by the subscription repo ids
     711             :   void lookup_instance_handles(const ReaderIdSeq& ids,
     712             :                                DDS::InstanceHandleSeq& hdls);
     713             : 
     714             :   RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const;
     715             : 
     716           0 :   DDS::DomainId_t domain_id() const // accessor: returns the value stored in domain_id_
     717             :   {
     718           0 :     return domain_id_;
     719             :   }
     720             : 
     721           0 :   CORBA::Long get_priority_value(const AssociationData&) const // the AssociationData argument is unnamed and unused
     722             :   {
     723           0 :     return qos_.transport_priority.value;
     724             :   }
     725             : 
     726             : #ifdef OPENDDS_SECURITY
     727             :   DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
     728             : #endif
     729             : 
     730             :   void association_complete_i(const GUID_t& remote_id);
     731             : 
     732             :   void return_handle(DDS::InstanceHandle_t handle);
     733             : 
     734             :   friend class ::DDS_TEST; // allows tests to get at privates
     735             : 
     736             : 
     737             :   // Data block local pool for this data writer.
     738             :   unique_ptr<DataBlockLockPool> db_lock_pool_;
     739             : 
     740             :   /// The name of associated topic.
     741             :   CORBA::String_var topic_name_;
     742             :   /// The associated topic repository id.
     743             :   GUID_t topic_id_;
     744             :   /// The topic servant.
     745             :   TopicDescriptionPtr<TopicImpl> topic_servant_;
     746             :   TypeSupportImpl* type_support_;
     747             : 
     748             :   /// Mutex to protect listener info
     749             :   ACE_Thread_Mutex listener_mutex_;
     750             :   /// The StatusKind bit mask indicates which status condition change
     751             :   /// can be notified by the listener of this entity.
     752             :   DDS::StatusMask listener_mask_;
     753             :   /// Used to notify the entity for relevant events.
     754             :   DDS::DataWriterListener_var listener_;
     755             :   /// The domain id.
     756             :   DDS::DomainId_t domain_id_;
     757             :   GUID_t dp_id_;
     758             :   /// The publisher servant which creates this datawriter.
     759             :   WeakRcHandle<PublisherImpl> publisher_servant_;
     760             :   /// The repository id of this datawriter/publication.
     761             :   GUID_t publication_id_;
     762             :   /// The sequence number unique in DataWriter scope.
     763             :   SequenceNumber sequence_number_;
     764             :   /// Mutex for sequence_number_
     765             :   mutable ACE_Thread_Mutex sn_lock_;
     766             :   /// Flag indicating DataWriter currently belongs to
     767             :   /// a coherent change set.
     768             :   bool coherent_;
     769             :   /// The number of samples belonging to the current
     770             :   /// coherent change set.
     771             :   ACE_UINT32 coherent_samples_;
     772             :   /// The sample data container.
     773             :   RcHandle<WriteDataContainer> data_container_;
     774             :   /// The lock to protect the active subscriptions
     775             :   /// and status changes.
     776             :   mutable ACE_Recursive_Thread_Mutex lock_;
     777             : 
     778             :   typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
     779             :   RepoIdToHandleMap id_to_handle_map_;
     780             : 
     781             :   RepoIdSet readers_;
     782             : 
     783             :   /// Status conditions.
     784             :   DDS::LivelinessLostStatus liveliness_lost_status_ ;
     785             :   DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_ ;
     786             :   DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_ ;
     787             :   DDS::PublicationMatchedStatus publication_match_status_ ;
     788             : 
     789             :   /// True if the writer failed to actively signal its liveliness within
     790             :   /// its offered liveliness period.
     791             :   bool liveliness_lost_;
     792             : 
     793             :   /**
     794             :    * @todo The publication_lost_status_ and
     795             :    *       publication_reconnecting_status_ are left here for
     796             :    *       future use when we add get_publication_lost_status()
     797             :    *       and get_publication_reconnecting_status() methods.
     798             :    */
     799             :   // Statistics of the lost publications due to lost connection.
     800             :   // PublicationLostStatus publication_lost_status_;
     801             :   // Statistics of the publications that associate with a
     802             :   // reconnecting datalink.
     803             :   // PublicationReconnectingStatus publication_reconnecting_status_;
     804             : 
     805             :   /// The message block allocator.
     806             :   unique_ptr<MessageBlockAllocator> mb_allocator_;
     807             :   /// The data block allocator.
     808             :   unique_ptr<DataBlockAllocator> db_allocator_;
     809             :   /// The header data allocator.
     810             :   unique_ptr<DataSampleHeaderAllocator> header_allocator_;
     811             :   unique_ptr<DataAllocator> data_allocator_;
     812             : 
     813             :   /// The orb's reactor to be used to register the liveliness
     814             :   /// timer.
     815             :   ACE_Reactor_Timer_Interface* reactor_;
     816             :   /// The time interval for sending liveliness message.
     817             :   TimeDuration liveliness_check_interval_;
     818             :   /// Timestamp of last write/dispose/assert_liveliness.
     819             :   MonotonicTimePoint last_liveliness_activity_time_;
     820             :   /// Total number of offered deadlines missed during last offered
     821             :   /// deadline status check.
     822             :   CORBA::Long last_deadline_missed_total_count_;
     823             : 
     824             :   /// Flag indicating that this datawriter is a builtin topic
     825             :   /// datawriter.
     826             :   bool is_bit_;
     827             : 
     828             :   /// The cached available data while suspending and associated transaction ids.
     829             :   ACE_UINT64 min_suspended_transaction_id_;
     830             :   ACE_UINT64 max_suspended_transaction_id_;
     831             :   SendStateDataSampleList available_data_list_;
     832             : 
     833             :   /// Monitor object for this entity
     834             :   unique_ptr<Monitor> monitor_;
     835             : 
     836             :   /// Periodic Monitor object for this entity
     837             :   unique_ptr<Monitor> periodic_monitor_;
     838             : 
     839             : 
     840             :   // Do we need to set the sequence repair header bit?
     841             :   //   must call prior to incrementing sequence number
     842             :   bool need_sequence_repair();
     843             :   bool need_sequence_repair_i() const;
     844             : 
     845             :   DDS::ReturnCode_t send_end_historic_samples(const GUID_t& readerId);
     846             :   DDS::ReturnCode_t send_request_ack();
     847             : 
     848             :   bool liveliness_asserted_;
     849             : 
     850             :   // Lock used to synchronize remove_associations calls from discovery
     851             :   // and unregister_instances during deletion of datawriter from application
     852             :   ACE_Thread_Mutex sync_unreg_rem_assocs_lock_;
     853             :   RcHandle<LivenessTimer> liveness_timer_;
     854             : 
     855             :   MonotonicTimePoint wait_pending_deadline_;
     856             : 
     857             :   typedef OPENDDS_MAP(DDS::InstanceHandle_t, Sample_rch) InstanceHandlesToValues;
     858             :   InstanceHandlesToValues instance_handles_to_values_;
     859             :   typedef OPENDDS_MAP_CMP(Sample_rch, DDS::InstanceHandle_t, SampleRchCmp) InstanceValuesToHandles;
     860             :   InstanceValuesToHandles instance_values_to_handles_;
     861             : 
     862             :   bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch& sample);
     863             :   InstanceValuesToHandles::iterator find_instance(const Sample& sample);
     864             : 
     865             : #ifdef OPENDDS_SECURITY
     866             : protected:
     867             :   Security::SecurityConfig_rch security_config_;
     868             :   DDS::Security::PermissionsHandle participant_permissions_handle_;
     869             :   DDS::DynamicType_var dynamic_type_;
     870             : #endif
     871             : };
     872             : 
     873             : typedef RcHandle<DataWriterImpl> DataWriterImpl_rch;
     874             : 
     875             : 
     876             : class LivenessTimer : public virtual RcEventHandler // event handler bound to one DataWriterImpl through a weak handle
     877             : {
     878             : public:
     879           0 :   explicit LivenessTimer(DataWriterImpl& writer) // explicit: a DataWriterImpl must not convert implicitly into a timer
     880           0 :     : writer_(writer)
     881             :   {
     882           0 :   }
     883             : 
     884             :   /// Handle the assert liveliness timeout.
     885             :   virtual int handle_timeout(const ACE_Time_Value& tv, const void* arg);
     886             : 
     887             : private:
     888             :   WeakRcHandle<DataWriterImpl> writer_; // presumably does not keep the writer alive -- confirm against WeakRcHandle
     889             : };
     890             : 
     891             : } // namespace DCPS
     892             : } // namespace OpenDDS
     893             : 
     894             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     895             : 
     896             : #endif

Generated by: LCOV version 1.16