Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_SPORADIC_TASK_H 9 : #define OPENDDS_DCPS_SPORADIC_TASK_H 10 : 11 : #include "RcEventHandler.h" 12 : #include "ReactorInterceptor.h" 13 : #include "TimeSource.h" 14 : #include "Service_Participant.h" 15 : 16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 17 : 18 : namespace OpenDDS { 19 : namespace DCPS { 20 : 21 : class SporadicTask : public virtual RcEventHandler { 22 : public: 23 14 : SporadicTask(const TimeSource& time_source, 24 : RcHandle<ReactorInterceptor> interceptor) 25 14 : : time_source_(time_source) 26 14 : , interceptor_(interceptor) 27 14 : , desired_scheduled_(false) 28 14 : , timer_id_(-1) 29 14 : , sporadic_command_(make_rch<SporadicCommand>(rchandle_from(this))) 30 : { 31 14 : reactor(interceptor->reactor()); 32 14 : } 33 : 34 14 : virtual ~SporadicTask() {} 35 : 36 15 : void schedule(const TimeDuration& delay) 37 : { 38 15 : const MonotonicTimePoint next_time = time_source_.monotonic_time_point_now() + delay; 39 : { 40 15 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 41 15 : if (!desired_scheduled_ || next_time < desired_next_time_) { 42 14 : desired_scheduled_ = true; 43 14 : desired_next_time_ = next_time; 44 14 : desired_delay_ = delay; 45 : } else { 46 1 : return; 47 : } 48 15 : } 49 : 50 14 : RcHandle<ReactorInterceptor> interceptor = interceptor_.lock(); 51 14 : if (interceptor) { 52 13 : interceptor->execute_or_enqueue(sporadic_command_); 53 1 : } else if (log_level >= LogLevel::Error) { 54 1 : ACE_ERROR((LM_ERROR, 55 : "(%P|%t) ERROR: SporadicTask::schedule: " 56 : "failed to receive ReactorInterceptor handle\n")); 57 : } 58 15 : } 59 : 60 3 : void cancel() 61 : { 62 : { 63 3 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 64 3 : if (!desired_scheduled_) { 65 1 : return; 66 : } 67 : 68 2 : desired_scheduled_ = false; 69 3 : } 70 : 71 2 : RcHandle<ReactorInterceptor> interceptor = interceptor_.lock(); 72 2 : if (interceptor) { 73 1 : interceptor->execute_or_enqueue(sporadic_command_); 74 1 : } else if (log_level >= LogLevel::Error) { 75 1 : ACE_ERROR((LM_ERROR, 76 : "(%P|%t) ERROR: SporadicTask::cancel: " 77 : "failed to receive ReactorInterceptor handle\n")); 78 : } 79 2 : } 80 : 81 : virtual void execute(const MonotonicTimePoint& now) = 0; 82 : 83 : protected: 84 2 : long get_timer_id() { return timer_id_; } 85 : 86 : private: 87 : struct SporadicCommand : public ReactorInterceptor::Command { 88 14 : explicit SporadicCommand(WeakRcHandle<SporadicTask> sporadic_task) 89 14 : : sporadic_task_(sporadic_task) 90 14 : { } 91 : 92 14 : virtual void execute() 93 : { 94 14 : RcHandle<SporadicTask> st = sporadic_task_.lock(); 95 14 : if (st) { 96 14 : st->execute_i(); 97 : } 98 14 : } 99 : 100 : WeakRcHandle<SporadicTask> sporadic_task_; 101 : }; 102 : 103 : const TimeSource& time_source_; 104 : WeakRcHandle<ReactorInterceptor> interceptor_; 105 : bool desired_scheduled_; 106 : MonotonicTimePoint desired_next_time_; 107 : TimeDuration desired_delay_; 108 : long timer_id_; 109 : MonotonicTimePoint actual_next_time_; 110 : RcHandle<SporadicCommand> sporadic_command_; 111 : mutable ACE_Thread_Mutex mutex_; 112 : 113 14 : void execute_i() 114 : { 115 14 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 116 : 117 27 : if ((!desired_scheduled_ && timer_id_ != -1) || 118 13 : (desired_scheduled_ && timer_id_ != -1 && desired_next_time_ != actual_next_time_)) { 119 2 : reactor()->cancel_timer(timer_id_); 120 2 : timer_id_ = -1; 121 : } 122 : 123 14 : if (desired_scheduled_ && timer_id_ == -1) { 124 13 : timer_id_ = reactor()->schedule_timer(this, 0, desired_delay_.value()); 125 13 : if (timer_id_ == -1) { 126 1 : if (log_level >= LogLevel::Error) { 127 1 : ACE_ERROR((LM_ERROR, 128 : "(%P|%t) ERROR: SporadicTask::execute_i: " 129 : "failed to schedule timer %p\n", "")); 130 : } 131 : } else { 132 12 : actual_next_time_ = desired_next_time_; 133 : } 134 : } 135 14 : } 136 : 137 2 : int handle_timeout(const ACE_Time_Value& tv, const void*) 138 : { 139 2 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager()); 140 : 141 2 : const MonotonicTimePoint now(tv); 142 : { 143 2 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 144 2 : desired_scheduled_ = false; 145 2 : timer_id_ = -1; 146 2 : } 147 2 : execute(now); 148 2 : return 0; 149 2 : } 150 : }; 151 : 152 : template <typename Delegate> 153 : class PmfSporadicTask : public SporadicTask { 154 : public: 155 : typedef void (Delegate::*PMF)(const MonotonicTimePoint&); 156 : 157 1 : PmfSporadicTask(const TimeSource& time_source, 158 : RcHandle<ReactorInterceptor> interceptor, 159 : RcHandle<Delegate> delegate, 160 : PMF function) 161 : : SporadicTask(time_source, interceptor) 162 1 : , delegate_(delegate) 163 1 : , function_(function) 164 1 : {} 165 : 166 : private: 167 : WeakRcHandle<Delegate> delegate_; 168 : PMF function_; 169 : 170 1 : void execute(const MonotonicTimePoint& now) 171 : { 172 1 : RcHandle<Delegate> handle = delegate_.lock(); 173 1 : if (handle) { 174 1 : ((*handle).*function_)(now); 175 : } 176 1 : } 177 : }; 178 : 179 : } // namespace DCPS 180 : } // namespace OpenDDS 181 : 182 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 183 : 184 : #endif /* OPENDDS_DCPS_SPORADIC_TASK_H */