LCOV - code coverage report
Current view: top level - DCPS - InternalDataReaderListener.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 32 32 100.0 %
Date: 2023-04-30 01:32:43 Functions: 5 6 83.3 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H
       9             : #define OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H
      10             : 
      11             : #include "dcps_export.h"
      12             : 
      13             : #ifndef ACE_LACKS_PRAGMA_ONCE
      14             : #  pragma once
      15             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      16             : 
      17             : #include "JobQueue.h"
      18             : 
      19             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      20             : 
      21             : namespace OpenDDS {
      22             : namespace DCPS {
      23             : 
      24             : template <typename T>
      25             : class InternalDataReader;
      26             : 
      27             : template <typename T>
      28             : class InternalDataReaderListener : public virtual RcObject {
      29             : public:
      30             :   typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch;
      31             : 
      32             :   InternalDataReaderListener()
      33             :     : job_(make_rch<ListenerJob>(rchandle_from(this)))
      34             :   {}
      35             : 
      36           1 :   explicit InternalDataReaderListener(JobQueue_rch job_queue)
      37           1 :     : job_(make_rch<ListenerJob>(rchandle_from(this)))
      38           2 :     , job_queue_(job_queue)
      39           1 :   {}
      40             : 
      41             :   void job_queue(JobQueue_rch job_queue)
      42             :   {
      43             :     job_queue_ = job_queue;
      44             :   }
      45             : 
      46             :   virtual void on_data_available(InternalDataReader_rch reader) = 0;
      47             : 
      48             :   /// @name InternalDataReader Interface
      49             :   /// @{
      50           1 :   void schedule(InternalDataReader_rch reader)
      51             :   {
      52           1 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
      53           1 :     JobQueue_rch lock = job_queue_.lock();
      54           1 :     if (lock) {
      55           1 :       const bool enqueue = readers_.empty();
      56           1 :       readers_.insert(reader);
      57           1 :       if (enqueue) {
      58           1 :         lock->enqueue(job_);
      59             :       }
      60             :     }
      61           1 :   }
      62             :   /// @}
      63             : 
      64             : private:
      65             :   class ListenerJob : public Job {
      66             :   public:
      67           1 :     explicit ListenerJob(RcHandle<InternalDataReaderListener> listener)
      68           1 :       : listener_(listener)
      69           1 :     {}
      70             : 
      71           1 :     void execute()
      72             :     {
      73           1 :       RcHandle<InternalDataReaderListener> listener = listener_.lock();
      74           1 :       if (listener) {
      75           1 :         listener->execute();
      76             :       }
      77           1 :     }
      78             : 
      79             :   private:
      80             :     WeakRcHandle<InternalDataReaderListener> listener_;
      81             :   };
      82             : 
      83             :   JobPtr job_;
      84             :   JobQueue_wrch job_queue_;
      85             :   typedef WeakRcHandle<InternalDataReader<T> > Reader;
      86             :   typedef OPENDDS_SET(Reader) ReaderSet;
      87             :   ReaderSet readers_;
      88             : 
      89             :   mutable ACE_Thread_Mutex mutex_;
      90             : 
      91           1 :   void execute()
      92             :   {
      93           1 :     ReaderSet readers;
      94             : 
      95             :     {
      96           1 :       ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
      97           1 :       std::swap(readers, readers_);
      98           1 :     }
      99             : 
     100           2 :     for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end();
     101           2 :          pos != limit; ++pos) {
     102           1 :       InternalDataReader_rch reader = pos->lock();
     103           1 :       if (reader) {
     104           1 :         on_data_available(reader);
     105             :       }
     106             :     }
     107           1 :   }
     108             : };
     109             : 
     110             : } // namespace DCPS
     111             : } // namespace OpenDDS
     112             : 
     113             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     114             : 
     115             : #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H */

Generated by: LCOV version 1.16