LCOV - code coverage report
Current view: top level - DCPS - InternalDataWriter.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 58 65 89.2 %
Date: 2023-04-30 01:32:43 Functions: 12 17 70.6 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_INTERNAL_DATA_WRITER_H
       9             : #define OPENDDS_DCPS_INTERNAL_DATA_WRITER_H
      10             : 
      11             : #include "dcps_export.h"
      12             : 
      13             : #ifndef ACE_LACKS_PRAGMA_ONCE
      14             : #  pragma once
      15             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      16             : 
      17             : #include "RcObject.h"
      18             : #include "InternalDataReader.h"
      19             : 
      20             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      21             : 
      22             : namespace OpenDDS {
      23             : namespace DCPS {
      24             : 
      25             : /*
      26             :   The InternalDataWriter supports the following QoS:
      27             : 
      28             :   DurabilityQosPolicy durability; => VOLATILE, TRANSIENT_LOCAL
      29             :   DurabilityServiceQosPolicy durability_service; => None
      30             :   DeadlineQosPolicy deadline; => None
      31             :   LatencyBudgetQosPolicy latency_budget => None
      32             :   LivelinessQosPolicy liveliness => None
      33             :   ReliabilityQosPolicy reliability => RELIABLE
      34             :   DestinationOrderQosPolicy destination_order; => None
      35             :   HistoryQosPolicy history; => KEEP_LAST_HISTORY, KEEP_ALL_HISTORY
      36             :   ResourceLimitsQosPolicy resource_limits; => None
      37             :   TransportPriorityQosPolicy transport_priority; => None
      38             :   LifespanQosPolicy lifespan; => None
      39             :   UserDataQosPolicy user_data; => None
      40             :   OwnershipQosPolicy ownership; => None
      41             :   OwnershipStrengthQosPolicy ownership_strength; => None
      42             :   WriterDataLifecycleQosPolicy writer_data_lifecycle; => Yes
      43             : */
      44             : 
      45             : template <typename T>
      46             : class InternalDataWriter : public InternalEntity {
      47             : public:
      48             :   typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch;
      49             :   typedef WeakRcHandle<InternalDataReader<T> > InternalDataReader_wrch;
      50             : 
      51          23 :   explicit InternalDataWriter(const DDS::DataWriterQos& qos)
      52          23 :     : qos_(qos)
      53          23 :   {}
      54             : 
      55             :   /// @name InternalTopic Interface
      56             :   /// @{
      57          16 :   void add_reader(InternalDataReader_rch reader)
      58             :   {
      59          16 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
      60          16 :     readers_.insert(reader);
      61             : 
      62          16 :     if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
      63           2 :       for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end();
      64           6 :            pos != limit; ++pos) {
      65           4 :         pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
      66             :       }
      67             :     }
      68          16 :   }
      69             : 
      70           3 :   void remove_reader(InternalDataReader_rch reader)
      71             :   {
      72           3 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
      73           3 :     if (readers_.erase(reader)) {
      74           3 :       reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances);
      75             :     }
      76           3 :   }
      77             : 
      78          10 :   bool has_reader(InternalDataReader_rch reader)
      79             :   {
      80          10 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, false);
      81          10 :     return readers_.count(reader);
      82          10 :   }
      83             : 
      84             :   InternalEntity_wrch publication_handle()
      85             :   {
      86             :     return static_rchandle_cast<InternalEntity>(rchandle_from(this));
      87             :   }
      88             :   /// @}
      89             : 
      90             :   /// @name User Interface
      91             :   /// @{
      92          65 :   void write(const T& sample)
      93             :   {
      94          65 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
      95             : 
      96          65 :     if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
      97          60 :       const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, SampleHolder()));
      98          60 :       p.first->second.write(sample, qos_);
      99             :     }
     100             : 
     101          70 :     for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
     102           5 :       InternalDataReader_rch reader = pos->lock();
     103           5 :       if (reader) {
     104           5 :         reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
     105             :       }
     106             :     }
     107          65 :   }
     108             : 
     109           1 :   void dispose(const T& sample)
     110             :   {
     111           1 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     112             : 
     113           1 :     if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
     114           0 :       typename InstanceMap::iterator pos = instance_map_.find(sample);
     115           0 :       if (pos != instance_map_.end()) {
     116           0 :         pos->second.dispose();
     117             :       }
     118             :     }
     119             : 
     120           2 :     for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
     121           1 :       InternalDataReader_rch reader = pos->lock();
     122           1 :       if (reader) {
     123           1 :         reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
     124             :       }
     125             :     }
     126           1 :   }
     127             : 
     128           2 :   void unregister_instance(const T& sample)
     129             :   {
     130           2 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     131             : 
     132           2 :     if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
     133           0 :       instance_map_.erase(sample);
     134             :     }
     135             : 
     136           4 :     for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
     137           2 :       InternalDataReader_rch reader = pos->lock();
     138           2 :       if (reader) {
     139           2 :         if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
     140           1 :           reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
     141             :         }
     142           2 :         reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
     143             :       }
     144             :     }
     145           2 :   }
     146             :   /// @}
     147             : 
     148             : private:
     149             :   const DDS::DataWriterQos qos_;
     150             : 
     151             :   typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet;
     152             :   ReaderSet readers_;
     153             : 
     154             :   class SampleHolder {
     155             :   public:
     156             :     bool empty() const { return samples_.empty(); }
     157             : 
     158           4 :     void add_reader(InternalDataReader_rch reader, RcHandle<InternalEntity> writer)
     159             :     {
     160           9 :       for (typename SampleList::const_iterator pos = samples_.begin(), limit = samples_.end(); pos != limit; ++pos) {
     161           5 :         reader->write(writer, *pos);
     162             :       }
     163           4 :     }
     164             : 
     165          60 :     void write(const T& sample,
     166             :                const DDS::DataWriterQos& qos)
     167             :     {
     168          60 :       samples_.push_back(sample);
     169          60 :       if (qos.history.kind == DDS::KEEP_LAST_HISTORY_QOS) {
     170          61 :         while (samples_.size() > static_cast<std::size_t>(qos.history.depth)) {
     171           1 :           samples_.pop_front();
     172             :         }
     173             :       }
     174          60 :     }
     175             : 
     176           0 :     void dispose()
     177             :     {
     178           0 :       samples_.clear();
     179           0 :     }
     180             : 
     181             :   private:
     182             :     typedef OPENDDS_LIST(T) SampleList;
     183             :     SampleList samples_;
     184             :   };
     185             : 
     186             :   typedef OPENDDS_MAP_T(T, SampleHolder) InstanceMap;
     187             :   InstanceMap instance_map_;
     188             : 
     189             :   ACE_Thread_Mutex mutex_;
     190             : };
     191             : 
     192             : } // namespace DCPS
     193             : } // namespace OpenDDS
     194             : 
     195             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     196             : 
     197             : #endif /* OPENDDS_DCPS_INTERNAL_DATA_WRITER_H */

Generated by: LCOV version 1.16