Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_JOB_QUEUE_H 9 : #define OPENDDS_DCPS_JOB_QUEUE_H 10 : 11 : #include "RcEventHandler.h" 12 : #include "PoolAllocator.h" 13 : #include "dcps_export.h" 14 : 15 : #include <ace/Reactor.h> 16 : #include <ace/Thread_Mutex.h> 17 : #include <ace/Reverse_Lock_T.h> 18 : 19 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 20 : 21 : namespace OpenDDS { 22 : namespace DCPS { 23 : 24 : class Job : public virtual RcObject { 25 : public: 26 1 : virtual ~Job() { } 27 : virtual void execute() = 0; 28 : }; 29 : typedef RcHandle<Job> JobPtr; 30 : 31 : template <typename Delegate> 32 : class PmfJob : public Job { 33 : public: 34 : typedef void (Delegate::*PMF)(); 35 : 36 : PmfJob(RcHandle<Delegate> delegate, 37 : PMF function) 38 : : delegate_(delegate) 39 : , function_(function) 40 : {} 41 : 42 : virtual ~PmfJob() {} 43 : 44 : private: 45 : WeakRcHandle<Delegate> delegate_; 46 : PMF function_; 47 : 48 : void execute() 49 : { 50 : RcHandle<Delegate> handle = delegate_.lock(); 51 : if (handle) { 52 : ((*handle).*function_)(); 53 : } 54 : } 55 : }; 56 : 57 : class OpenDDS_Dcps_Export JobQueue : public virtual RcEventHandler { 58 : public: 59 : explicit JobQueue(ACE_Reactor* reactor); 60 : 61 1 : void enqueue(JobPtr job) 62 : { 63 1 : ACE_GUARD(ACE_Thread_Mutex, guard, mutex_); 64 1 : const bool empty = job_queue_.empty(); 65 1 : job_queue_.push_back(job); 66 1 : if (empty) { 67 1 : guard.release(); 68 1 : reactor()->notify(this); 69 : } 70 1 : } 71 : 72 : private: 73 : ACE_Thread_Mutex mutex_; 74 : typedef OPENDDS_VECTOR(JobPtr) Queue; 75 : Queue job_queue_; 76 : 77 : int handle_exception(ACE_HANDLE /*fd*/); 78 : }; 79 : 80 : typedef RcHandle<JobQueue> JobQueue_rch; 81 : typedef WeakRcHandle<JobQueue> JobQueue_wrch; 82 : 83 : } // namespace DCPS 84 : } // namespace OpenDDS 85 : 86 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 87 : 88 : #endif /* OPENDDS_DCPS_JOB_QUEUE_H */