Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_DISPATCH_SERVICE_H 9 : #define OPENDDS_DCPS_DISPATCH_SERVICE_H 10 : 11 : #include "ConditionVariable.h" 12 : #include "Definitions.h" 13 : #include "RcObject.h" 14 : #include "ThreadPool.h" 15 : #include "TimePoint_T.h" 16 : 17 : #include <ace/Thread_Mutex.h> 18 : 19 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 20 : 21 : namespace OpenDDS { 22 : namespace DCPS { 23 : 24 : class OpenDDS_Dcps_Export DispatchService : public virtual RcObject { 25 : public: 26 : 27 : /// Helper function for adapting arbitrary function objects (with operator()) 28 : template <typename T> 29 4189 : static void fun_ptr_proxy(void* arg) 30 : { 31 4189 : T& ref = *static_cast<T*>(arg); 32 4189 : ref(); 33 4189 : } 34 : 35 : /// Typedef for Dispatch Return Values 36 : typedef bool DispatchStatus; 37 : 38 : /// DispatchStatus Success Constant 39 : static const bool DS_SUCCESS = true; 40 : 41 : /// DispatchStatus Error Constant 42 : static const bool DS_ERROR = false; 43 : 44 : /// Typedef for Schedule Return Values 45 : typedef long TimerId; 46 : 47 : /// TimerId Failure Constant 48 : static const long TI_FAILURE = -1; 49 : 50 : typedef void (*FunPtr)(void*); 51 : typedef std::pair<FunPtr, void*> FunArgPair; 52 : typedef OPENDDS_DEQUE(FunArgPair) EventQueue; 53 : 54 : /** 55 : * Create a DispatchService 56 : * @param count the requested size of the internal thread pool 57 : */ 58 : explicit DispatchService(size_t count = 1); 59 : 60 : virtual ~DispatchService(); 61 : 62 : /** 63 : * Request shutdown of this DispatchService, which prevents sucessful future 64 : * calls to either dispatch or schedule and cancels all scheduled events. 65 : * @param immediate prevent any further dispatches from event queue, otherwise allow current queue to empty 66 : * @param pending An EventQueue object in which to store canceled events in case extra processing is required 67 : */ 68 : void shutdown(bool immediate = false, EventQueue* pending = 0); 69 : 70 : /** 71 : * Dispatch an event 72 : * @param fun the function pointer to dispatch 73 : * @param arg the argument to pass during dispatch 74 : * @return true if event successfully enqueue, otherwise false 75 : */ 76 : DispatchStatus dispatch(FunPtr fun, void* arg = 0); 77 : 78 : /// Helper function to dispatch arbitrary function objects (see fun_ptr_proxy) 79 : template <typename T> 80 4147 : DispatchStatus dispatch(T& ref) 81 : { 82 4147 : return dispatch(fun_ptr_proxy<T>, &ref); 83 : } 84 : 85 : /** 86 : * Schedule the future dispatch of an event 87 : * @param fun the function pointer to schedule 88 : * @param arg the argument to pass during dispatch 89 : * @param expiration the requested dispatch time (no earlier than) 90 : * @return -1 on a failure, otherwise the timer id for the future dispatch 91 : */ 92 : TimerId schedule(FunPtr fun, void* arg = 0, const MonotonicTimePoint& expiration = MonotonicTimePoint::now()); 93 : 94 : /// Helper function to schedule arbitrary function objects (see fun_ptr_proxy) 95 : template <typename T> 96 84 : TimerId schedule(T& ref, const MonotonicTimePoint& expiration = MonotonicTimePoint::now()) 97 : { 98 84 : return schedule(fun_ptr_proxy<T>, &ref, expiration); 99 : } 100 : 101 : /** 102 : * Cancel a scheduled event by id 103 : * @param id the scheduled timer id to cancel 104 : * @param arg if specified, arg will be set to the arg value passed in at time of scheduling 105 : * @return the number of timers successfully canceled (0 or 1) 106 : */ 107 : size_t cancel(TimerId id, void** arg = 0); 108 : 109 : /** 110 : * Cancel a scheduled event by function pointer 111 : * @param fun the function pointer of the event/s to cancel 112 : * @param arg the argument passed at time of scheduling 113 : * @return the number of timers successfully canceled (potentially several) 114 : */ 115 : size_t cancel(FunPtr fun, void* arg = 0); 116 : 117 : /// Helper function to cancel arbitrary function objects (see fun_ptr_proxy) 118 : template <typename T> 119 4 : size_t cancel(T& ref) 120 : { 121 4 : return cancel(fun_ptr_proxy<T>, &ref); 122 : } 123 : 124 : private: 125 : 126 : static ACE_THR_FUNC_RETURN run(void* arg); 127 : void run_event_loop(); 128 : 129 : typedef std::pair<FunArgPair, TimerId> TimerPair; 130 : typedef OPENDDS_MULTIMAP(MonotonicTimePoint, TimerPair) TimerQueueMap; 131 : typedef OPENDDS_MAP(TimerId, TimerQueueMap::iterator) TimerIdMap; 132 : 133 : mutable ACE_Thread_Mutex mutex_; 134 : mutable ConditionVariable<ACE_Thread_Mutex> cv_; 135 : bool allow_dispatch_; 136 : bool stop_when_empty_; 137 : bool running_; 138 : size_t running_threads_; 139 : EventQueue event_queue_; 140 : TimerQueueMap timer_queue_map_; 141 : TimerIdMap timer_id_map_; 142 : TimerId max_timer_id_; 143 : ThreadPool pool_; 144 : }; 145 : typedef RcHandle<DispatchService> DispatchService_rch; 146 : 147 : } // DCPS 148 : } // OpenDDS 149 : 150 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 151 : 152 : #endif // OPENDDS_DCPS_DISPATCH_SERVICE_H