OpenDDS  Snapshot(2023/04/28-20:55)
JobQueue.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_JOB_QUEUE_H
9 #define OPENDDS_DCPS_JOB_QUEUE_H
10 
11 #include "RcEventHandler.h"
12 #include "PoolAllocator.h"
13 #include "dcps_export.h"
14 
15 #include <ace/Reactor.h>
16 #include <ace/Thread_Mutex.h>
17 #include <ace/Reverse_Lock_T.h>
18 
20 
21 namespace OpenDDS {
22 namespace DCPS {
23 
24 class Job : public virtual RcObject {
25 public:
26  virtual ~Job() { }
27  virtual void execute() = 0;
28 };
30 
31 template <typename Delegate>
32 class PmfJob : public Job {
33 public:
34  typedef void (Delegate::*PMF)();
35 
37  PMF function)
38  : delegate_(delegate)
39  , function_(function)
40  {}
41 
42  virtual ~PmfJob() {}
43 
44 private:
46  PMF function_;
47 
48  void execute()
49  {
50  RcHandle<Delegate> handle = delegate_.lock();
51  if (handle) {
52  ((*handle).*function_)();
53  }
54  }
55 };
56 
58 public:
59  explicit JobQueue(ACE_Reactor* reactor);
60 
61  void enqueue(JobPtr job)
62  {
63  ACE_GUARD(ACE_Thread_Mutex, guard, mutex_);
64  const bool empty = job_queue_.empty();
65  job_queue_.push_back(job);
66  if (empty) {
67  guard.release();
68  reactor()->notify(this);
69  }
70  }
71 
72 private:
74  typedef OPENDDS_VECTOR(JobPtr) Queue;
75  Queue job_queue_;
76 
77  int handle_exception(ACE_HANDLE /*fd*/);
78 };
79 
82 
83 } // namespace DCPS
84 } // namespace OpenDDS
85 
87 
88 #endif /* OPENDDS_DCPS_JOB_QUEUE_H */
#define ACE_GUARD(MUTEX, OBJ, LOCK)
PmfJob(RcHandle< Delegate > delegate, PMF function)
Definition: JobQueue.h:36
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
virtual ~PmfJob()
Definition: JobQueue.h:42
virtual void execute()=0
void enqueue(JobPtr job)
Definition: JobQueue.h:61
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
RcHandle< JobQueue > JobQueue_rch
Definition: JobQueue.h:80
WeakRcHandle< Delegate > delegate_
Definition: JobQueue.h:45
RcHandle< Job > JobPtr
Definition: JobQueue.h:29
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Thread_Mutex mutex_
Definition: JobQueue.h:73
RcHandle< T > lock() const
Definition: RcObject.h:188
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
WeakRcHandle< JobQueue > JobQueue_wrch
Definition: JobQueue.h:81
virtual ~Job()
Definition: JobQueue.h:26
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28