OpenDDS  Snapshot(2023/04/28-20:55)
DispatchService.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_DISPATCH_SERVICE_H
9 #define OPENDDS_DCPS_DISPATCH_SERVICE_H
10 
11 #include "ConditionVariable.h"
12 #include "Definitions.h"
13 #include "RcObject.h"
14 #include "ThreadPool.h"
15 #include "TimePoint_T.h"
16 
17 #include <ace/Thread_Mutex.h>
18 
20 
21 namespace OpenDDS {
22 namespace DCPS {
23 
25 public:
26 
27  /// Helper function for adapting arbitrary function objects (with operator())
28  template <typename T>
29  static void fun_ptr_proxy(void* arg)
30  {
31  T& ref = *static_cast<T*>(arg);
32  ref();
33  }
34 
35  /// Typedef for Dispatch Return Values
36  typedef bool DispatchStatus;
37 
38  /// DispatchStatus Success Constant
39  static const bool DS_SUCCESS = true;
40 
41  /// DispatchStatus Error Constant
42  static const bool DS_ERROR = false;
43 
44  /// Typedef for Schedule Return Values
45  typedef long TimerId;
46 
47  /// TimerId Failure Constant
48  static const long TI_FAILURE = -1;
49 
50  typedef void (*FunPtr)(void*);
51  typedef std::pair<FunPtr, void*> FunArgPair;
52  typedef OPENDDS_DEQUE(FunArgPair) EventQueue;
53 
54  /**
55  * Create a DispatchService
56  * @param count the requested size of the internal thread pool
57  */
58  explicit DispatchService(size_t count = 1);
59 
60  virtual ~DispatchService();
61 
62  /**
63  * Request shutdown of this DispatchService, which prevents sucessful future
64  * calls to either dispatch or schedule and cancels all scheduled events.
65  * @param immediate prevent any further dispatches from event queue, otherwise allow current queue to empty
66  * @param pending An EventQueue object in which to store canceled events in case extra processing is required
67  */
68  void shutdown(bool immediate = false, EventQueue* pending = 0);
69 
70  /**
71  * Dispatch an event
72  * @param fun the function pointer to dispatch
73  * @param arg the argument to pass during dispatch
74  * @return true if event successfully enqueue, otherwise false
75  */
76  DispatchStatus dispatch(FunPtr fun, void* arg = 0);
77 
78  /// Helper function to dispatch arbitrary function objects (see fun_ptr_proxy)
79  template <typename T>
80  DispatchStatus dispatch(T& ref)
81  {
82  return dispatch(fun_ptr_proxy<T>, &ref);
83  }
84 
85  /**
86  * Schedule the future dispatch of an event
87  * @param fun the function pointer to schedule
88  * @param arg the argument to pass during dispatch
89  * @param expiration the requested dispatch time (no earlier than)
90  * @return -1 on a failure, otherwise the timer id for the future dispatch
91  */
92  TimerId schedule(FunPtr fun, void* arg = 0, const MonotonicTimePoint& expiration = MonotonicTimePoint::now());
93 
94  /// Helper function to schedule arbitrary function objects (see fun_ptr_proxy)
95  template <typename T>
96  TimerId schedule(T& ref, const MonotonicTimePoint& expiration = MonotonicTimePoint::now())
97  {
98  return schedule(fun_ptr_proxy<T>, &ref, expiration);
99  }
100 
101  /**
102  * Cancel a scheduled event by id
103  * @param id the scheduled timer id to cancel
104  * @param arg if specified, arg will be set to the arg value passed in at time of scheduling
105  * @return the number of timers successfully canceled (0 or 1)
106  */
107  size_t cancel(TimerId id, void** arg = 0);
108 
109  /**
110  * Cancel a scheduled event by function pointer
111  * @param fun the function pointer of the event/s to cancel
112  * @param arg the argument passed at time of scheduling
113  * @return the number of timers successfully canceled (potentially several)
114  */
115  size_t cancel(FunPtr fun, void* arg = 0);
116 
117  /// Helper function to cancel arbitrary function objects (see fun_ptr_proxy)
118  template <typename T>
119  size_t cancel(T& ref)
120  {
121  return cancel(fun_ptr_proxy<T>, &ref);
122  }
123 
124 private:
125 
126  static ACE_THR_FUNC_RETURN run(void* arg);
127  void run_event_loop();
128 
129  typedef std::pair<FunArgPair, TimerId> TimerPair;
130  typedef OPENDDS_MULTIMAP(MonotonicTimePoint, TimerPair) TimerQueueMap;
131  typedef OPENDDS_MAP(TimerId, TimerQueueMap::iterator) TimerIdMap;
132 
137  bool running_;
139  EventQueue event_queue_;
140  TimerQueueMap timer_queue_map_;
141  TimerIdMap timer_id_map_;
142  TimerId max_timer_id_;
144 };
146 
147 } // DCPS
148 } // OpenDDS
149 
151 
152 #endif // OPENDDS_DCPS_DISPATCH_SERVICE_H
ConditionVariable< ACE_Thread_Mutex > cv_
RcHandle< DispatchService > DispatchService_rch
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
std::pair< FunArgPair, TimerId > TimerPair
long TimerId
Typedef for Schedule Return Values.
#define OPENDDS_DEQUE(T)
std::pair< FunPtr, void * > FunArgPair
#define OPENDDS_MULTIMAP(K, T)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool DispatchStatus
Typedef for Dispatch Return Values.
DispatchStatus dispatch(T &ref)
Helper function to dispatch arbitrary function objects (see fun_ptr_proxy)
TimerId schedule(T &ref, const MonotonicTimePoint &expiration=MonotonicTimePoint::now())
Helper function to schedule arbitrary function objects (see fun_ptr_proxy)
size_t cancel(T &ref)
Helper function to cancel arbitrary function objects (see fun_ptr_proxy)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static void fun_ptr_proxy(void *arg)
Helper function for adapting arbitrary function objects (with operator())
int shutdown(ACE_HANDLE handle, int how)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.