LCOV - code coverage report
Current view: top level - DCPS - ReplayerImpl.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 6 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 5 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_REPLAYERIMPL_H
       9             : #define OPENDDS_DCPS_REPLAYERIMPL_H
      10             : 
      11             : #include "Replayer.h"
      12             : #include "DataWriterCallbacks.h"
      13             : #include "WriteDataContainer.h"
      14             : #include "Definitions.h"
      15             : #include "DataSampleHeader.h"
      16             : #include "TopicImpl.h"
      17             : #include "Time_Helper.h"
      18             : #include "CoherentChangeControl.h"
      19             : #include "GuidUtils.h"
      20             : #include "unique_ptr.h"
      21             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
      22             : #  include "FilterEvaluator.h"
      23             : #endif
      24             : #include "ConditionVariable.h"
      25             : #include "transport/framework/TransportSendListener.h"
      26             : #include "transport/framework/TransportClient.h"
      27             : 
      28             : #include <dds/DdsDcpsDomainC.h>
      29             : #include <dds/DdsDcpsTopicC.h>
      30             : 
      31             : #include <ace/Event_Handler.h>
      32             : #include <ace/OS_NS_sys_time.h>
      33             : #include <ace/Recursive_Thread_Mutex.h>
      34             : 
      35             : #include <memory>
      36             : 
      37             : #if !defined (ACE_LACKS_PRAGMA_ONCE)
      38             : #pragma once
      39             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      40             : 
      41             : class DDS_TEST;
      42             : 
      43             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      44             : 
      45             : namespace OpenDDS {
      46             : namespace DCPS {
      47             : 
      48             : class SendStateDataSampleList;
      49             : class DataSampleElement;
      50             : 
      51             : /**
      52             :  * @class ReplayerImpl
      53             :  *
      54             :  * @brief Implementation of Replayer functionality
      55             :  *
      56             :  * This class is the implementation of the Replayer.
      57             :  * Inheritance is used to limit the application's access to
      58             :  * the underlying system methods.
      59             :  */
      60             : 
      61             : class OpenDDS_Dcps_Export ReplayerImpl : public Replayer,
      62             :   public TransportClient,
      63             :   public TransportSendListener,
      64             :   public DataWriterCallbacks,
      65             :   public EntityImpl
      66             : {
      67             : public:
      68             :   ReplayerImpl();
      69             :   ~ReplayerImpl();
      70             : 
      71             :   /**
      72             :    * Clean up this replayer; the outcome is reported as a DDS return code.
      73             :    */
      74             :   DDS::ReturnCode_t cleanup();
      75             : 
      76             :   /**
      77             :    * Initialize the data members.
      78             :    */
      79             :   virtual void init(
      80             :     DDS::Topic_ptr                        topic,
      81             :     TopicImpl*                            topic_servant,
      82             :     const DDS::DataWriterQos &            qos,
      83             :     ReplayerListener_rch                  a_listener,
      84             :     const DDS::StatusMask &               mask,
      85             :     OpenDDS::DCPS::DomainParticipantImpl* participant_servant,
      86             :     const DDS::PublisherQos&              publisher_qos);
      87             : 
      88             :   // Implement Replayer
      89             :   virtual DDS::ReturnCode_t write (const RawDataSample& sample );
      90             :   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription,
      91             :                                              const RawDataSample&  sample );
      92             :   virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t    subscription,
      93             :                                              const RawDataSampleList& samples );
      94             :   virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos & publisher_qos,
      95             :                                      const DDS::DataWriterQos &  datawriter_qos);
      96             :   virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &  publisher_qos,
      97             :                                      DDS::DataWriterQos & datawriter_qos);
      98             :   virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch & a_listener,
      99             :                                           DDS::StatusMask              mask);
     100             :   virtual ReplayerListener_rch get_listener ();
     101             : 
     102             :   // Implement TransportClient
     103             :   virtual bool check_transport_qos(const TransportInst& inst);
     104             :   virtual GUID_t get_guid() const;
     105           0 :   DDS::DomainId_t domain_id() const { return this->domain_id_; }
     106             :   virtual CORBA::Long get_priority_value(const AssociationData& data) const;
     107           0 :   SequenceNumber get_max_sn() const { return sequence_number_; }
     108             : 
     109             : 
     110             :   // Implement TransportSendListener
     111             :   virtual void data_delivered(const DataSampleElement* sample);
     112             :   virtual void data_dropped(const DataSampleElement* sample,
     113             :                             bool                         dropped_by_transport);
     114             : 
     115             :   virtual void control_delivered(const Message_Block_Ptr& sample);
     116             :   virtual void control_dropped(const Message_Block_Ptr& sample,
     117             :                                bool               dropped_by_transport);
     118             : 
     119             :   virtual void notify_publication_disconnected(const ReaderIdSeq& subids);
     120             :   virtual void notify_publication_reconnected(const ReaderIdSeq& subids);
     121             :   virtual void notify_publication_lost(const ReaderIdSeq& subids);
     122             : 
     123             :   /// Statistics counters (public); presumably maintained by data_dropped()/data_delivered() -- TODO confirm.
     124             :   int data_dropped_count_;
     125             :   int data_delivered_count_;
     126             : 
     127             : 
     128             :   virtual void retrieve_inline_qos_data(InlineQosData& qos_data) const;
     129             : 
     130             :   // implement DataWriterCallbacks
     131             :   virtual void add_association(const GUID_t&            yourId,
     132             :                                const ReaderAssociation& reader,
     133             :                                bool                     active);
     134             : 
     135             :   virtual void remove_associations(const ReaderIdSeq& readers,
     136             :                                    CORBA::Boolean     callback);
     137             : 
     138           0 :   virtual void replay_durable_data_for(const GUID_t&) {} // no-op: empty body
     139             : 
     140             :   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
     141             : 
     142             :   virtual void update_subscription_params(const GUID_t&         readerId,
     143             :                                           const DDS::StringSeq& exprParams);
     144             : 
     145             :   void remove_all_associations();
     146             : 
     147             :   virtual void register_for_reader(const GUID_t& participant,
     148             :                                    const GUID_t& writerid,
     149             :                                    const GUID_t& readerid,
     150             :                                    const TransportLocatorSeq& locators,
     151             :                                    DiscoveryListener* listener);
     152             : 
     153             :   virtual void unregister_for_reader(const GUID_t& participant,
     154             :                                      const GUID_t& writerid,
     155             :                                      const GUID_t& readerid);
     156             : 
     157           0 :   virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint() { return DCPS::WeakRcHandle<ICE::Endpoint>(); }
     158             : 
     159             :   DDS::ReturnCode_t enable();
     160             : 
     161           0 :   DomainParticipantImpl*          participant() {
     162           0 :     return participant_servant_;
     163             :   }
     164             : 
     165             :   virtual DDS::InstanceHandle_t get_instance_handle();
     166             : 
     167             : private:
     168             :   void notify_publication_lost(const DDS::InstanceHandleSeq& handles);
     169             : 
     170             :   DDS::ReturnCode_t write (const RawDataSample* sample_array, int array_size, DDS::InstanceHandle_t* reader);
     171             : 
     172             :   DDS::ReturnCode_t
     173             :   create_sample_data_message(Message_Block_Ptr   data,
     174             :                              DataSampleHeader&   header_data,
     175             :                              Message_Block_Ptr&  message,
     176             :                              const DDS::Time_t&  source_timestamp,
     177             :                              bool                content_filter);
     178             :   bool need_sequence_repair() const;
     179             : 
     180             :   /// Look up the instance handles for the given subscription repo ids.
     181             :   void lookup_instance_handles(const ReaderIdSeq&      ids,
     182             :                                DDS::InstanceHandleSeq& hdls);
     183             :   /// The number of chunks for the cached allocator.
     184             :   size_t n_chunks_;
     185             : 
     186             :   /// The multiplier for allocators affected by associations
     187             :   size_t association_chunk_multiplier_;
     188             : 
     189             :   /// The type name of associated topic.
     190             :   CORBA::String_var type_name_;
     191             : 
     192             :   /// The qos policy list of this datawriter.
     193             :   DDS::DataWriterQos qos_;
     194             :   /// The qos policy passed in by the user.
     195             :   /// Differs from qos_ because representation has been interpreted.
     196             :   DDS::DataWriterQos passed_qos_;
     197             : 
     198             :   /// The participant servant which creates the publisher that
     199             :   /// creates this datawriter.
     200             :   DomainParticipantImpl*          participant_servant_;
     201             : 
     202             :   struct ReaderInfo {
     203             :     SequenceNumber expected_sequence_;
     204             :     bool durable_;
     205             :     ReaderInfo(const char* filter, const DDS::StringSeq& params,
     206             :                DomainParticipantImpl* participant, bool durable);
     207             :     ~ReaderInfo();
     208             :   };
     209             : 
     210             :   typedef OPENDDS_MAP_CMP(GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap;
     211             :   RepoIdToReaderInfoMap reader_info_;
     212             : 
     213             :   void association_complete_i(const GUID_t& remote_id);
     214             : 
     215             :   friend class ::DDS_TEST; // allows tests to get at privates
     216             : 
     217             :   /// The name of associated topic.
     218             :   CORBA::String_var topic_name_;
     219             :   /// The associated topic repository id.
     220             :   GUID_t topic_id_;
     221             :   /// The object reference of the associated topic.
     222             :   DDS::Topic_var topic_objref_;
     223             :   /// The topic servant.
     224             :   TopicDescriptionPtr<TopicImpl> topic_servant_;
     225             : 
     226             :   /// The StatusKind bit mask indicates which status condition change
     227             :   /// can be notified by the listener of this entity.
     228             :   DDS::StatusMask listener_mask_;
     229             :   /// Used to notify the entity for relevant events.
     230             :   ReplayerListener_rch listener_;
     231             :   /// The domain id.
     232             :   DDS::DomainId_t domain_id_;
     233             :   /// The publisher servant which creates this datawriter.
     234             :   PublisherImpl*                  publisher_servant_;
     235             :   DDS::PublisherQos publisher_qos_;
     236             : 
     237             :   /// The repository id of this datawriter/publication.
     238             :   GUID_t publication_id_;
     239             :   /// The sequence number unique in DataWriter scope.
     240             :   SequenceNumber sequence_number_;
     241             : 
     242             :   // The sample data container (member is commented out).
     243             :   // WriteDataContainer*             data_container_;
     244             :   /// The lock to protect the active subscriptions
     245             :   /// and status changes.
     246             :   ACE_Recursive_Thread_Mutex lock_;
     247             : 
     248             :   typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
     249             : 
     250             :   RepoIdToHandleMap id_to_handle_map_;
     251             : 
     252             :   RepoIdSet readers_;
     253             : 
     254             :   /// Status conditions.
     255             :   // DDS::LivelinessLostStatus liveliness_lost_status_;
     256             :   // DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_;
     257             :   DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
     258             :   DDS::PublicationMatchedStatus publication_match_status_;
     259             : 
     260             :   // True if the writer failed to actively signal its liveliness within
     261             :   // its offered liveliness period (member is commented out).
     262             :   // bool liveliness_lost_;
     263             : 
     264             :   /**
     265             :    * @todo The publication_lost_status_ and
     266             :    *       publication_reconnecting_status_ are left here for
     267             :    *       future use when we add get_publication_lost_status()
     268             :    *       and get_publication_reconnecting_status() methods.
     269             :    */
     270             :   // Statistics of the lost publications due to lost connection.
     271             :   // PublicationLostStatus               publication_lost_status_;
     272             :   // Statistics of the publications that associates with a
     273             :   // reconnecting datalink.
     274             :   // PublicationReconnectingStatus       publication_reconnecting_status_;
     275             : 
     276             :   // The message block allocator.
     277             :   unique_ptr<MessageBlockAllocator>     mb_allocator_;
     278             :   // The data block allocator.
     279             :   unique_ptr<DataBlockAllocator>        db_allocator_;
     280             :   // The header data allocator.
     281             :   unique_ptr<DataSampleHeaderAllocator> header_allocator_;
     282             : 
     283             :   /// The cached allocator to allocate DataSampleElement
     284             :   /// objects.
     285             :   unique_ptr<DataSampleElementAllocator> sample_list_element_allocator_;
     286             : 
     287             :   // The orb's reactor to be used to register the liveliness
     288             :   // timer.
     289             :   // ACE_Reactor_Timer_Interface* reactor_;
     290             :   // The time interval for sending liveliness message.
     291             :   // ACE_Time_Value             liveliness_check_interval_;
     292             :   // Timestamp of last write/dispose/assert_liveliness.
     293             :   // ACE_Time_Value             last_liveliness_activity_time_;
     294             :   // Total number of offered deadlines missed during last offered
     295             :   // deadline status check.
     296             :   // CORBA::Long last_deadline_missed_total_count_;
     297             :   // Watchdog responsible for reporting missed offered
     298             :   // deadlines.
     299             :   // unique_ptr<OfferedDeadlineWatchdog> watchdog_;
     300             :   // The flag indicates whether the liveliness timer is scheduled and
     301             :   // needs to be cancelled.
     302             :   // bool                       cancel_timer_;
     303             : 
     304             :   /// Flag indicates that this datawriter is a builtin topic
     305             :   /// datawriter.
     306             :   bool is_bit_;
     307             : 
     308             :   typedef OPENDDS_MAP_CMP(GUID_t, SequenceNumber, GUID_tKeyLessThan)
     309             :   RepoIdToSequenceMap;
     310             : 
     311             :   RepoIdToSequenceMap idToSequence_;
     312             : 
     313             :   ConditionVariable<ACE_Recursive_Thread_Mutex> empty_condition_;
     314             :   int pending_write_count_;
     315             : };
     316             : 
     317             : } // namespace DCPS
     318             : } // namespace
     319             : 
     320             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     321             : 
     322             : #endif /* end of include guard: OPENDDS_DCPS_REPLAYERIMPL_H */

Generated by: LCOV version 1.16