LCOV - code coverage report
Current view: top level - DCPS - WriterInfo.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 50 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 11 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : 
       9             : #ifndef OPENDDS_DCPS_WRITERINFO_H
      10             : #define OPENDDS_DCPS_WRITERINFO_H
      11             : 
      12             : #include "Atomic.h"
      13             : #include "CoherentChangeControl.h"
      14             : #include "ConditionVariable.h"
      15             : #include "Definitions.h"
      16             : #include "DisjointSequence.h"
      17             : #include "PoolAllocator.h"
      18             : #include "RcObject.h"
      19             : #include "TimeTypes.h"
      20             : #include "transport/framework/ReceivedDataSample.h"
      21             : 
      22             : #include <dds/DdsDcpsInfoUtilsC.h>
      23             : #include <dds/DdsDcpsCoreC.h>
      24             : 
      25             : ACE_BEGIN_VERSIONED_NAMESPACE_DECL
      26             : class ACE_Reactor;
      27             : class ACE_Event_Handler;
      28             : ACE_END_VERSIONED_NAMESPACE_DECL
      29             : 
      30             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      31             : 
      32             : namespace OpenDDS {
      33             : namespace DCPS {
      34             : 
      35             : class WriterInfo;
      36             : class EndHistoricSamplesMissedSweeper;
      37             : 
      38             : class OpenDDS_Dcps_Export WriterInfoListener: public virtual RcObject
      39             : {
      40             : public:
      41             :   WriterInfoListener();
      42             :   virtual ~WriterInfoListener();
      43             : 
      44             :   GUID_t subscription_id_;
      45             : 
      46             :   /// The time interval for checking liveliness.
      47             :   /// TBD: Should this be initialized with
      48             :   ///      DDS::DURATION_INFINITE_SEC and DDS::DURATION_INFINITE_NSEC
      49             :   ///      instead of ACE_Time_Value::zero to be consistent with default
      50             :   ///      duration qos ? Or should we simply use the ACE_Time_Value::zero
      51             :   ///      to indicate the INFINITY duration ?
      52             :   TimeDuration liveliness_lease_duration_;
      53             : 
      54             :   /// tell instances when a DataWriter transitions to being alive
      55             :   /// The writer state is inout parameter, it has to be set ALIVE before
      56             :   /// handle_timeout is called since some subroutine use the state.
      57             :   virtual void writer_became_alive(WriterInfo& info,
      58             :                                    const MonotonicTimePoint& when);
      59             : 
      60             :   /// tell instances when a DataWriter transitions to DEAD
      61             :   /// The writer state is inout parameter, the state is set to DEAD
      62             :   /// when it returns.
      63             :   virtual void writer_became_dead(WriterInfo& info);
      64             : 
      65             :   /// tell instance when a DataWriter is removed.
      66             :   /// The liveliness status need update.
      67             :   virtual void writer_removed(WriterInfo& info);
      68             : };
      69             : 
      70             : typedef RcHandle<WriterInfoListener> WriterInfoListener_rch;
      71             : 
      72             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      73             : enum Coherent_State {
      74             :   NOT_COMPLETED_YET,
      75             :   COMPLETED,
      76             :   REJECTED
      77             : };
      78             : #endif
      79             : 
      80             : /// Keeps track of a DataWriter's liveliness for a DataReader.
      81             : class OpenDDS_Dcps_Export WriterInfo : public virtual RcObject {
      82             :   friend class WriteInfoListner;
      83             : 
      84             : public:
      85             :   enum WriterState { NOT_SET, ALIVE, DEAD };
      86             :   enum TimerState { NO_TIMER = -1 };
      87             : 
      88             :   WriterInfo(const WriterInfoListener_rch& reader,
      89             :              const GUID_t& writer_id,
      90             :              const DDS::DataWriterQos& writer_qos);
      91             : 
      92             :   /// check to see if this writer is alive (called by handle_timeout).
      93             :   /// @param now next monotonic time this DataWriter will become not active (not alive)
      94             :   ///      if no sample or liveliness message is received.
      95             :   /// @returns montonic time when the Writer will become not active (if no activity)
      96             :   ///          of MonotonicTimePoint::max_value if the writer is already or became not alive
      97             :   MonotonicTimePoint check_activity(const MonotonicTimePoint& now);
      98             : 
      99             :   /// called when a sample or other activity is received from this writer.
     100             :   void received_activity(const MonotonicTimePoint& when);
     101             : 
     102             :   /// returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
     103           0 :   WriterState state() const
     104             :   {
     105           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     106           0 :     return state_;
     107           0 :   };
     108             : 
     109           0 :   void state(WriterState state)
     110             :   {
     111           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     112           0 :     state_ = state;
     113           0 :   };
     114             : 
     115           0 :   DDS::InstanceHandle_t handle() const
     116             :   {
     117           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     118           0 :     return handle_;
     119           0 :   }
     120             : 
     121           0 :   void handle(DDS::InstanceHandle_t handle)
     122             :   {
     123           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     124           0 :     handle_ = handle;
     125           0 :   };
     126             : 
     127           0 :   GUID_t writer_id() const
     128             :   {
     129           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     130           0 :     return writer_id_;
     131           0 :   }
     132             : 
     133           0 :   CORBA::Long writer_qos_ownership_strength() const
     134             :   {
     135           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     136           0 :     return writer_qos_.ownership_strength.value;
     137           0 :   }
     138             : 
     139           0 :   void writer_qos_ownership_strength(const CORBA::Long writer_qos_ownership_strength)
     140             :   {
     141           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     142           0 :     writer_qos_.ownership_strength.value = writer_qos_ownership_strength;
     143           0 :   }
     144             : 
     145             :   bool waiting_for_end_historic_samples() const
     146             :   {
     147             :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     148             :     return waiting_for_end_historic_samples_;
     149             :   }
     150             : 
     151           0 :   void waiting_for_end_historic_samples(bool waiting_for_end_historic_samples)
     152             :   {
     153           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     154           0 :     waiting_for_end_historic_samples_ = waiting_for_end_historic_samples;
     155           0 :   }
     156             : 
     157             :   SequenceNumber last_historic_seq() const
     158             :   {
     159             :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     160             :     return last_historic_seq_;
     161             :   }
     162             : 
     163             :   void last_historic_seq(const SequenceNumber& last_historic_seq)
     164             :   {
     165             :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     166             :     last_historic_seq_ = last_historic_seq;
     167             :   }
     168             : 
     169             :   const char* get_state_str() const;
     170             : 
     171             :   /// update liveliness when remove_association is called.
     172             :   void removed();
     173             : 
     174             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     175             :   Coherent_State coherent_change_received();
     176             :   void reset_coherent_info();
     177             :   void set_group_info(const CoherentChangeControl& info);
     178             :   void add_coherent_samples(const SequenceNumber& seq);
     179             :   void coherent_change(bool group_coherent, const GUID_t& publisher_id);
     180             : 
     181           0 :   bool group_coherent() const {
     182           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     183           0 :     return group_coherent_;
     184           0 :   }
     185             : 
     186           0 :   GUID_t publisher_id() const {
     187           0 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     188           0 :     return publisher_id_;
     189           0 :   }
     190             : 
     191             : #endif
     192             : 
     193             :   void clear_owner_evaluated();
     194             :   void set_owner_evaluated(DDS::InstanceHandle_t instance, bool flag);
     195             :   bool is_owner_evaluated(DDS::InstanceHandle_t instance);
     196             : 
     197             :   void schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper, const ACE_Time_Value& ten_seconds);
     198             :   void cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper);
     199             :   bool check_end_historic_samples(EndHistoricSamplesMissedSweeper* sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& to_deliver);
     200             :   bool check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq);
     201             :   void finished_delivering_historic();
     202             : 
     203             : private:
     204             : 
     205             :   mutable ACE_Thread_Mutex mutex_;
     206             : 
     207             :   /// Timestamp of last write/dispose/assert_liveliness from this DataWriter
     208             :   MonotonicTimePoint last_liveliness_activity_time_;
     209             : 
     210             :   // Non-negative if this a durable writer which has a timer scheduled
     211             :   long historic_samples_timer_;
     212             : 
     213             :   /// Temporary holding place for samples received before
     214             :   /// the END_HISTORIC_SAMPLES control message.
     215             :   OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_;
     216             : 
     217             :   /// After receiving END_HISTORIC_SAMPLES, check for duplicates
     218             :   SequenceNumber last_historic_seq_;
     219             : 
     220             :   bool waiting_for_end_historic_samples_;
     221             : 
     222             :   bool delivering_historic_samples_;
     223             :   ConditionVariable<ACE_Thread_Mutex> delivering_historic_samples_cv_;
     224             : 
     225             :   /// State of the writer.
     226             :   WriterState state_;
     227             : 
     228             :   /// The DataReader owning this WriterInfo
     229             :   WeakRcHandle<WriterInfoListener> reader_;
     230             : 
     231             :   /// DCPSInfoRepo ID of the DataWriter
     232             :   GUID_t writer_id_;
     233             : 
     234             :   /// Writer qos
     235             :   DDS::DataWriterQos writer_qos_;
     236             : 
     237             :   /// The publication entity instance handle.
     238             :   DDS::InstanceHandle_t handle_;
     239             : 
     240             :   /// Number of received coherent changes in active change set.
     241             :   Atomic<ACE_UINT32> coherent_samples_;
     242             : 
     243             :   /// Is this writer evaluated for owner ?
     244             :   typedef OPENDDS_MAP(DDS::InstanceHandle_t, bool) OwnerEvaluateFlags;
     245             :   OwnerEvaluateFlags owner_evaluated_;
     246             : 
     247             :   /// Data to support GROUP access scope.
     248             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     249             :   bool group_coherent_;
     250             :   GUID_t publisher_id_;
     251             :   DisjointSequence coherent_sample_sequence_;
     252             :   WriterCoherentSample writer_coherent_samples_;
     253             :   GroupCoherentSamples group_coherent_samples_;
     254             : #endif
     255             : 
     256             : };
     257             : 
     258             : inline
     259             : void
     260           0 : WriterInfo::received_activity(const MonotonicTimePoint& when)
     261             : {
     262           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     263             : 
     264           0 :   last_liveliness_activity_time_ = when;
     265             : 
     266           0 :   if (state_ != ALIVE) { // NOT_SET || DEAD
     267           0 :     RcHandle<WriterInfoListener> reader = reader_.lock();
     268           0 :     guard.release();
     269           0 :     if (reader) {
     270           0 :       reader->writer_became_alive(*this, when);
     271             :     }
     272           0 :   }
     273           0 : }
     274             : 
     275             : typedef RcHandle<WriterInfo> WriterInfo_rch;
     276             : 
     277             : } // namespace DCPS
     278             : } // namespace
     279             : 
     280             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     281             : 
     282             : #endif  /* end of include guard: OPENDDS_DCPS_WRITERINFO_H */

Generated by: LCOV version 1.16