OpenDDS  Snapshot(2023/04/28-20:55)
InternalDataReaderListener.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H
9 #define OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H
10 
11 #include "dcps_export.h"
12 
13 #ifndef ACE_LACKS_PRAGMA_ONCE
14 # pragma once
15 #endif /* ACE_LACKS_PRAGMA_ONCE */
16 
17 #include "JobQueue.h"
18 
20 
21 namespace OpenDDS {
22 namespace DCPS {
23 
24 template <typename T>
25 class InternalDataReader;
26 
27 template <typename T>
28 class InternalDataReaderListener : public virtual RcObject {
29 public:
31 
33  : job_(make_rch<ListenerJob>(rchandle_from(this)))
34  {}
35 
37  : job_(make_rch<ListenerJob>(rchandle_from(this)))
38  , job_queue_(job_queue)
39  {}
40 
42  {
44  }
45 
46  virtual void on_data_available(InternalDataReader_rch reader) = 0;
47 
48  /// @name InternalDataReader Interface
49  /// @{
50  void schedule(InternalDataReader_rch reader)
51  {
53  JobQueue_rch lock = job_queue_.lock();
54  if (lock) {
55  const bool enqueue = readers_.empty();
56  readers_.insert(reader);
57  if (enqueue) {
58  lock->enqueue(job_);
59  }
60  }
61  }
62  /// @}
63 
64 private:
65  class ListenerJob : public Job {
66  public:
68  : listener_(listener)
69  {}
70 
71  void execute()
72  {
74  if (listener) {
75  listener->execute();
76  }
77  }
78 
79  private:
81  };
82 
86  typedef OPENDDS_SET(Reader) ReaderSet;
87  ReaderSet readers_;
88 
90 
91  void execute()
92  {
93  ReaderSet readers;
94 
95  {
96  ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
97  std::swap(readers, readers_);
98  }
99 
100  for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end();
101  pos != limit; ++pos) {
102  InternalDataReader_rch reader = pos->lock();
103  if (reader) {
104  on_data_available(reader);
105  }
106  }
107  }
108 };
109 
110 } // namespace DCPS
111 } // namespace OpenDDS
112 
114 
115 #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H */
typedef OPENDDS_SET(Reader) ReaderSet
void swap(MessageBlock &lhs, MessageBlock &rhs)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
ListenerJob(RcHandle< InternalDataReaderListener > listener)
void enqueue(JobPtr job)
Definition: JobQueue.h:61
virtual void on_data_available(InternalDataReader_rch reader)=0
void schedule(InternalDataReader_rch reader)
WeakRcHandle< InternalDataReader< T > > Reader
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< T > lock() const
Definition: RcObject.h:188
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
RcHandle< InternalDataReader< T > > InternalDataReader_rch
WeakRcHandle< InternalDataReaderListener > listener_