LCOV - code coverage report
Current view: top level - DCPS - ReceivedDataElementList.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 43 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 175 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
       9             : #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
      10             : 
      11             : #include "Atomic.h"
      12             : #include "dcps_export.h"
      13             : #include "DataSampleHeader.h"
      14             : #include "Definitions.h"
      15             : #include "GuidUtils.h"
      16             : #include "InstanceState.h"
      17             : #include "Time_Helper.h"
      18             : #include "unique_ptr.h"
      19             : 
      20             : #include <dds/DdsDcpsInfrastructureC.h>
      21             : 
      22             : #include "ace/Thread_Mutex.h"
      23             : 
      24             : #if !defined (ACE_LACKS_PRAGMA_ONCE)
      25             : #pragma once
      26             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      27             : 
      28             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      29             : 
      30             : namespace OpenDDS {
      31             : namespace DCPS {
      32             : 
      33             : class OpenDDS_Dcps_Export ReceivedDataElement { // One received sample: its metadata, an untyped pointer to its data, list links, and a reference count.
      34             : public:
      35           0 :   ReceivedDataElement(const DataSampleHeader& header, void *received_data, ACE_Recursive_Thread_Mutex* mx) // Copies identity, coherency flags, sequence number and source time from header; stores received_data and mx as given.
      36           0 :     : pub_(header.publication_id_),
      37           0 :       registered_data_(received_data),
      38           0 :       sample_state_(DDS::NOT_READ_SAMPLE_STATE), // every new element starts in the NOT_READ state
      39             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      40           0 :       coherent_change_(header.coherent_change_),
      41           0 :       group_coherent_(header.group_coherent_),
      42           0 :       publisher_id_(header.publisher_id_),
      43             : #endif
      44           0 :       valid_data_(received_data != 0), // provisional; cleared in the body when the header reports no valid data
      45           0 :       disposed_generation_count_(0),
      46           0 :       no_writers_generation_count_(0),
      47           0 :       zero_copy_cnt_(0),
      48           0 :       sequence_(header.sequence_),
      49           0 :       previous_data_sample_(0), // not linked into any list yet
      50           0 :       next_data_sample_(0),
      51           0 :       ref_count_(1), // starts with a single reference
      52           0 :       mx_(mx)
      53             :   {
      54           0 :     destination_timestamp_ = SystemTimePoint::now().to_dds_time(); // reception time is sampled at construction
      55             : 
      56           0 :     source_timestamp_.sec = header.source_timestamp_sec_;
      57           0 :     source_timestamp_.nanosec = header.source_timestamp_nanosec_;
      58             : 
      59             :     /*
      60             :      * In some situations, we will not have data to give to the user and
      61             :      * valid_data is how we communicate that to the user through a SampleInfo.
      62             :      */
      63           0 :     if (!header.valid_data()) {
      64           0 :       valid_data_ = false;
      65             :     }
      66           0 :   }
      67             : 
      68           0 :   virtual ~ReceivedDataElement(){} // virtual: dec_ref() deletes through this base type
      69             : 
      70           0 :   void dec_ref() // Drops one reference and deletes this object when the count reaches zero.
      71             :   {
      72           0 :     if (0 == --ref_count_) {
      73           0 :       delete this;
      74             :     }
      75           0 :   }
      76             : 
      77           0 :   void inc_ref() // Adds one reference.
      78             :   {
      79           0 :     ++ref_count_;
      80           0 :   }
      81             : 
      82             :   long ref_count() // Returns the current value of the reference count.
      83             :   {
      84             :     return ref_count_;
      85             :   }
      86             : 
      87             :   GUID_t pub_; // publication id copied from the sample header
      88             : 
      89             :   /**
      90             :    * Data sample received, could only be the key fields in case we received
      91             :    * dispose and/or unregister message.
      92             :    */
      93             :   void* const registered_data_;  // type-erased (ugly, but works); the pointer itself is const and never reseated
      94             : 
      95             :   /// Sample state for this data sample:
      96             :   /// DDS::NOT_READ_SAMPLE_STATE/DDS::READ_SAMPLE_STATE
      97             :   DDS::SampleStateKind sample_state_;
      98             : 
      99             :   /// Source time stamp for this data sample
     100             :   DDS::Time_t source_timestamp_;
     101             : 
     102             :   /// Reception time stamp for this data sample
     103             :   DDS::Time_t destination_timestamp_;
     104             : 
     105             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     106             :   /// Sample belongs to an active coherent change set
     107             :   bool coherent_change_;
     108             : 
     109             :   /// Sample belongs to a group coherent change set.
     110             :   bool group_coherent_;
     111             : 
     112             :   /// Publisher id; represents the group identifier.
     113             :   GUID_t publisher_id_;
     114             : #endif
     115             : 
     116             :   /// Do we contain valid data
     117             :   bool valid_data_;
     118             : 
     119             :   /// The data sample's instance's disposed_generation_count_
     120             :   /// at the time the sample was received
     121             :   size_t disposed_generation_count_;
     122             : 
     123             :   /// The data sample's instance's no_writers_generation_count_
     124             :   /// at the time the sample was received
     125             :   size_t no_writers_generation_count_;
     126             : 
     127             :   /// This is needed to know if delete DataReader should fail with
     128             :   /// PRECONDITION_NOT_MET because there are outstanding loans.
     129             :   Atomic<long> zero_copy_cnt_;
     130             : 
     131             :   /// The data sample's sequence number
     132             :   SequenceNumber sequence_;
     133             : 
     134             :   /// the previous data sample in the ReceivedDataElementList
     135             :   ReceivedDataElement* previous_data_sample_;
     136             : 
     137             :   /// the next data sample in the ReceivedDataElementList
     138             :   ReceivedDataElement* next_data_sample_;
     139             : 
     140             :   void* operator new(size_t size, ACE_New_Allocator& pool); // class-specific allocation taking an allocator; definition not visible in this listing
     141             :   void operator delete(void* memory); // matching deallocation used by "delete this" in dec_ref()
     142             :   void operator delete(void* memory, ACE_New_Allocator& pool); // placement form paired with the operator new above
     143             : 
     144             : private:
     145             :   Atomic<long> ref_count_; // private: changed only through inc_ref()/dec_ref()
     146             : protected:
     147             :   ACE_Recursive_Thread_Mutex* mx_; // stored as given to the constructor; locked by ReceivedDataElementWithType's destructor
     148             : }; // class ReceivedDataElement
     149             : 
     150             : struct ReceivedDataElementMemoryBlock // Pairs an element with an allocator pointer. NOTE(review): presumably the layout the class-specific operator new/delete use to find the owning pool; confirm in the implementation file.
     151             : {
     152             :   ReceivedDataElement element_; // declared first, so it sits at the start of the block
     153             :   ACE_New_Allocator* allocator_;
     154             : };
     155             : 
     156             : 
     157             : template <typename DataTypeWithAllocator>
     158             : class ReceivedDataElementWithType : public ReceivedDataElement // Typed wrapper: keeps the concrete sample type so the destructor can delete registered_data_ as that type.
     159             : {
     160             : public:
     161           0 :   ReceivedDataElementWithType(const DataSampleHeader& header, DataTypeWithAllocator* received_data, ACE_Recursive_Thread_Mutex* mx) // Forwards all arguments to the base constructor unchanged.
     162           0 :     : ReceivedDataElement(header, received_data, mx)
     163             :   {
     164           0 :   }
     165             : 
     166           0 :   ~ReceivedDataElementWithType() { // NOTE(review): ACE_GUARD is understood to return early if the lock is not acquired, which would skip the delete below; confirm against the ACE macro definition.
     167           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex,
     168             :               guard,
     169             :               *mx_)
     170           0 :     delete static_cast<DataTypeWithAllocator*> (registered_data_); // cast back from void* to the concrete type before deleting
     171           0 :   }
     172             : };
     173             : 
     174             : class OpenDDS_Dcps_Export ReceivedDataFilter { // Abstract predicate over a ReceivedDataElement; the "match" argument type of ReceivedDataElementList::apply_all and remove.
     175             : public:
     176           0 :   ReceivedDataFilter() {}
     177           0 :   virtual ~ReceivedDataFilter() {}
     178             : 
     179             :   virtual bool operator()(ReceivedDataElement* data_sample) = 0; // implemented by subclasses; returns a bool verdict for data_sample
     180             : };
     181             : 
     182             : class OpenDDS_Dcps_Export ReceivedDataOperation { // Abstract action on a ReceivedDataElement; the "func" argument type of ReceivedDataElementList::apply_all.
     183             : public:
     184           0 :   ReceivedDataOperation() {}
     185           0 :   virtual ~ReceivedDataOperation() {}
     186             : 
     187             :   virtual void operator()(ReceivedDataElement* data_sample) = 0; // implemented by subclasses; no return value
     188             : };
     189             : 
     190             : class OpenDDS_Dcps_Export ReceivedDataElementList { // List of ReceivedDataElement nodes tracked through head_/tail_, with an element count and per-sample-state counters.
     191             : public:
     192             :   explicit ReceivedDataElementList(const DataReaderImpl_rch& reader, const InstanceState_rch& instance_state = InstanceState_rch()); // instance_state is optional and defaults to an empty handle
     193             : 
     194             :   ~ReceivedDataElementList();
     195             : 
     196             :   void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func); // NOTE(review): presumably runs func on each element that match accepts; body not visible here
     197             : 
     198             :   // adds a data sample to the end of the list
     199             :   void add(ReceivedDataElement* data_sample);
     200             :   void add_by_timestamp(ReceivedDataElement* data_sample); // NOTE(review): presumably inserts in timestamp order rather than at the end; confirm in the implementation
     201             : 
     202             :   // returns true if the instance was released
     203             :   bool remove(ReceivedDataElement* data_sample);
     204             : 
     205             :   // returns true if the instance was released
     206             :   bool remove(ReceivedDataFilter& match, bool eval_all);
     207             : 
     208           0 :   const ReceivedDataElement* peek_tail() { return tail_; } // read-only view of the last element; the list is not modified
     209             : 
     210             :   ReceivedDataElement* remove_head();
     211             :   ReceivedDataElement* remove_tail();
     212             : 
     213           0 :   size_t size() const { return size_; } // returns the stored element count
     214             : 
     215             :   bool has_zero_copies() const;
     216             :   bool matches(CORBA::ULong sample_states) const;
     217             :   ReceivedDataElement* get_next_match(CORBA::ULong sample_states, ReceivedDataElement* prev);
     218             : 
     219             :   void mark_read(ReceivedDataElement* item);
     220             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     221             :   void accept_coherent_change(ReceivedDataElement* item);
     222             : #endif
     223             : 
     224             : private:
     225             :   DataReaderImpl_wrch reader_; // NOTE(review): presumably set from the constructor's reader argument; the "wrch" suffix suggests a weak handle; confirm
     226             : 
     227             :   /// The first element of the list.
     228             :   ReceivedDataElement* head_;
     229             : 
     230             :   /// The last element of the list.
     231             :   ReceivedDataElement* tail_;
     232             : 
     233             :   /// Number of elements in the list.
     234             :   size_t size_;
     235             : 
     236             :   CORBA::ULong read_sample_count_; // NOTE(review): presumably the number of elements in READ state; confirm
     237             :   CORBA::ULong not_read_sample_count_; // NOTE(review): presumably the number of elements in NOT_READ state; confirm
     238             :   CORBA::ULong sample_states_; // NOTE(review): presumably a mask of the sample states currently present (compare matches()); confirm
     239             : 
     240             :   void increment_read_count();
     241             :   void decrement_read_count();
     242             :   void increment_not_read_count();
     243             :   void decrement_not_read_count();
     244             :   InstanceState_rch instance_state_;
     245             : 
     246             :   bool sanity_check(); // NOTE(review): looks like an internal consistency check of the whole list; body not visible here
     247             :   bool sanity_check(ReceivedDataElement* item); // per-element overload of the check above
     248             : }; // ReceivedDataElementList
     249             : 
     250             : } // namespace DCPS
     251             : } // namespace OpenDDS
     252             : 
     253             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     254             : 
     255             : #if defined (__ACE_INLINE__)
     256             : # include "ReceivedDataElementList.inl"
     257             : #endif  /* __ACE_INLINE__ */
     258             : 
     259             : #endif /* OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H  */

Generated by: LCOV version 1.16