Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H 9 : #define OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H 10 : 11 : #include "dcps_export.h" 12 : 13 : #ifndef ACE_LACKS_PRAGMA_ONCE 14 : # pragma once 15 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 16 : 17 : #include "JobQueue.h" 18 : 19 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 20 : 21 : namespace OpenDDS { 22 : namespace DCPS { 23 : 24 : template <typename T> 25 : class InternalDataReader; 26 : 27 : template <typename T> 28 : class InternalDataReaderListener : public virtual RcObject { 29 : public: 30 : typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch; 31 : 32 : InternalDataReaderListener() 33 : : job_(make_rch<ListenerJob>(rchandle_from(this))) 34 : {} 35 : 36 1 : explicit InternalDataReaderListener(JobQueue_rch job_queue) 37 1 : : job_(make_rch<ListenerJob>(rchandle_from(this))) 38 2 : , job_queue_(job_queue) 39 1 : {} 40 : 41 : void job_queue(JobQueue_rch job_queue) 42 : { 43 : job_queue_ = job_queue; 44 : } 45 : 46 : virtual void on_data_available(InternalDataReader_rch reader) = 0; 47 : 48 : /// @name InternalDataReader Interface 49 : /// @{ 50 1 : void schedule(InternalDataReader_rch reader) 51 : { 52 1 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 53 1 : JobQueue_rch lock = job_queue_.lock(); 54 1 : if (lock) { 55 1 : const bool enqueue = readers_.empty(); 56 1 : readers_.insert(reader); 57 1 : if (enqueue) { 58 1 : lock->enqueue(job_); 59 : } 60 : } 61 1 : } 62 : /// @} 63 : 64 : private: 65 : class ListenerJob : public Job { 66 : public: 67 1 : explicit ListenerJob(RcHandle<InternalDataReaderListener> listener) 68 1 : : listener_(listener) 69 1 : {} 70 : 71 1 : void execute() 72 : { 73 1 : RcHandle<InternalDataReaderListener> listener = listener_.lock(); 74 1 : if (listener) { 75 1 : listener->execute(); 76 : } 77 1 : } 78 : 79 : private: 80 : WeakRcHandle<InternalDataReaderListener> listener_; 81 : }; 82 : 83 : JobPtr job_; 84 : JobQueue_wrch job_queue_; 85 : typedef WeakRcHandle<InternalDataReader<T> > Reader; 86 : typedef OPENDDS_SET(Reader) ReaderSet; 87 : ReaderSet readers_; 88 : 89 : mutable ACE_Thread_Mutex mutex_; 90 : 91 1 : void execute() 92 : { 93 1 : ReaderSet readers; 94 : 95 : { 96 1 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_); 97 1 : std::swap(readers, readers_); 98 1 : } 99 : 100 2 : for (typename ReaderSet::const_iterator pos = readers.begin(), limit = readers.end(); 101 2 : pos != limit; ++pos) { 102 1 : InternalDataReader_rch reader = pos->lock(); 103 1 : if (reader) { 104 1 : on_data_available(reader); 105 : } 106 : } 107 1 : } 108 : }; 109 : 110 : } // namespace DCPS 111 : } // namespace OpenDDS 112 : 113 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 114 : 115 : #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_LISTENER_H */