OpenDDS Snapshot (2023/04/28-20:55)
SporadicTask.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_SPORADIC_TASK_H
9 #define OPENDDS_DCPS_SPORADIC_TASK_H
10 
11 #include "RcEventHandler.h"
12 #include "ReactorInterceptor.h"
13 #include "TimeSource.h"
14 #include "Service_Participant.h"
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
// SporadicTask: an event handler (virtually derived from RcEventHandler) that
// records a "desired" fire time via schedule()/cancel() and invokes the pure
// virtual execute() from handle_timeout().
//
// NOTE(review): this listing is an excerpt. Several original lines (29, 40,
// 52, 54, 63, 73, 75, 87-88, 94, 100, 103-107, 109-111, 127) are not present
// here, so comments about them below are marked as assumptions.
21 class SporadicTask : public virtual RcEventHandler {
22 public:
    // Stores the time source and the interceptor, starts with nothing
    // scheduled (desired_scheduled_ = false, timer_id_ = -1), and adopts the
    // interceptor's reactor as this handler's reactor.
    // NOTE(review): `interceptor` is dereferenced in the body without a null
    // check; callers presumably must pass a valid handle - confirm.
    // NOTE(review): listing line 29 is absent; presumably it initializes
    // sporadic_command_ - TODO confirm against the full header.
23  SporadicTask(const TimeSource& time_source,
24  RcHandle<ReactorInterceptor> interceptor)
25  : time_source_(time_source)
26  , interceptor_(interceptor)
27  , desired_scheduled_(false)
28  , timer_id_(-1)
30  {
31  reactor(interceptor->reactor());
32  }
33 
34  virtual ~SporadicTask() {}
35 
    // Requests that the task fire `delay` from now (per time_source_).
    // The request is recorded only if nothing is currently desired or the new
    // time is earlier than the one already desired; otherwise the call
    // returns without doing anything further.
36  void schedule(const TimeDuration& delay)
37  {
38  const MonotonicTimePoint next_time = time_source_.monotonic_time_point_now() + delay;
39  {
    // NOTE(review): listing line 40 is absent; presumably a guard on mutex_
    // like the ones in execute_i() and handle_timeout() - TODO confirm.
41  if (!desired_scheduled_ || next_time < desired_next_time_) {
42  desired_scheduled_ = true;
43  desired_next_time_ = next_time;
44  desired_delay_ = delay;
45  } else {
46  return;
47  }
48  }
49 
    // Obtain a strong handle to the interceptor; if that fails, the only
    // visible reaction is an error log (when log_level permits).
    // NOTE(review): listing lines 52 and 54 are absent; presumably the
    // success branch hands work to the interceptor and line 54 opens the
    // logging call - TODO confirm.
50  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
51  if (interceptor) {
53  } else if (log_level >= LogLevel::Error) {
55  "(%P|%t) ERROR: SporadicTask::schedule: "
56  "failed to receive ReactorInterceptor handle\n"));
57  }
58  }
59 
    // Withdraws a pending request. Returns immediately if nothing is
    // currently desired; otherwise clears desired_scheduled_ and then goes
    // through the same interceptor lookup as schedule().
60  void cancel()
61  {
62  {
    // NOTE(review): listing line 63 is absent; presumably a guard on mutex_
    // - TODO confirm.
64  if (!desired_scheduled_) {
65  return;
66  }
67 
68  desired_scheduled_ = false;
69  }
70 
    // NOTE(review): listing lines 73 and 75 are absent; presumably they
    // mirror lines 52 and 54 of schedule() - TODO confirm.
71  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
72  if (interceptor) {
74  } else if (log_level >= LogLevel::Error) {
76  "(%P|%t) ERROR: SporadicTask::cancel: "
77  "failed to receive ReactorInterceptor handle\n"));
78  }
79  }
80 
    // Work to perform when the timer fires; implemented by subclasses.
    // Called from handle_timeout() with a time point built from the timeout
    // value the handler received.
81  virtual void execute(const MonotonicTimePoint& now) = 0;
82 
83 protected:
    // Returns the current timer id (-1 is the value used throughout this
    // class for "no timer").
    // NOTE(review): timer_id_ is read here without taking mutex_, while
    // execute_i() and handle_timeout() write it under mutex_ - confirm this
    // is intentional.
84  long get_timer_id() { return timer_id_; }
85 
86 private:
    // NOTE(review): listing lines 87-88 are absent. The initializer below
    // presumably belongs to the constructor of a nested SporadicCommand type
    // that keeps a handle to the owning task - TODO confirm.
89  : sporadic_task_(sporadic_task)
90  { }
91 
    // Forwards to the owning task's execute_i() when `st` is non-null.
    // NOTE(review): listing line 94 (where `st` is obtained) is absent;
    // presumably st = sporadic_task_.lock() - TODO confirm.
92  virtual void execute()
93  {
95  if (st) {
96  st->execute_i();
97  }
98  }
99 
    // NOTE(review): listing line 100 (presumably the sporadic_task_ member
    // declaration) is absent.
101  };
102 
    // NOTE(review): listing lines 103-107 and 109-111 are absent; they
    // presumably declare the other data members used in this class
    // (time_source_, interceptor_, desired_*, actual_next_time_,
    // sporadic_command_, mutex_) - TODO confirm.
108  long timer_id_;
112 
    // Reconciles the reactor timer with the desired state, under mutex_:
    //  1. If a timer exists but is no longer wanted, or is wanted for a
    //     different time than the one actually armed, cancel it.
    //  2. If a timer is wanted and none exists, arm one.
113  void execute_i()
114  {
115  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
116 
117  if ((!desired_scheduled_ && timer_id_ != -1) ||
118  (desired_scheduled_ && timer_id_ != -1 && desired_next_time_ != actual_next_time_)) {
119  reactor()->cancel_timer(timer_id_);
120  timer_id_ = -1;
121  }
122 
123  if (desired_scheduled_ && timer_id_ == -1) {
    // The timer is armed with desired_delay_ as the delay argument.
    // NOTE(review): the delay is not reduced by the time elapsed since
    // schedule() computed desired_next_time_, so the timer presumably fires
    // somewhat later than desired_next_time_ - confirm this is acceptable.
124  timer_id_ = reactor()->schedule_timer(this, 0, desired_delay_.value());
125  if (timer_id_ == -1) {
    // Scheduling failed: log only; desired_scheduled_ stays set and
    // actual_next_time_ is not updated.
    // NOTE(review): listing line 127 (presumably the opening of the logging
    // call) is absent.
126  if (log_level >= LogLevel::Error) {
128  "(%P|%t) ERROR: SporadicTask::execute_i: "
129  "failed to schedule timer %p\n", ""));
130  }
131  } else {
    // Remember which desired time the armed timer corresponds to, so a later
    // change of desired_next_time_ can be detected above.
132  actual_next_time_ = desired_next_time_;
133  }
134  }
135  }
136 
    // Timer callback. Clears the scheduled flag and the timer id while
    // holding mutex_, then calls execute() after the guard has been
    // released. Always returns 0.
137  int handle_timeout(const ACE_Time_Value& tv, const void*)
138  {
139  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
140 
141  const MonotonicTimePoint now(tv);
142  {
143  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
144  desired_scheduled_ = false;
145  timer_id_ = -1;
146  }
147  execute(now);
148  return 0;
149  }
150 };
151 
// PmfSporadicTask<Delegate>: a SporadicTask whose execute() forwards to a
// pointer-to-member-function on a Delegate object.
// NOTE(review): listing line 153 (the class head) is absent from this
// excerpt; the constructor below initializes a SporadicTask base, so the
// class presumably derives from SporadicTask - TODO confirm.
152 template <typename Delegate>
154 public:
    // Signature required of the delegate's callback: a member function taking
    // the current monotonic time and returning void.
155  typedef void (Delegate::*PMF)(const MonotonicTimePoint&);
156 
    // Forwards time_source and interceptor to the SporadicTask base and stores
    // the delegate handle and the member function to call on it.
157  PmfSporadicTask(const TimeSource& time_source,
158  RcHandle<ReactorInterceptor> interceptor,
159  RcHandle<Delegate> delegate,
160  PMF function)
161  : SporadicTask(time_source, interceptor)
162  , delegate_(delegate)
163  , function_(function)
164  {}
165 
166 private:
    // NOTE(review): listing lines 167-168 are absent; they presumably declare
    // delegate_ and function_ - TODO confirm.
169 
    // Obtains a handle to the delegate via delegate_.lock() and, if it is
    // valid, invokes function_ on it with `now`. If the handle is not valid
    // the call does nothing (no logging is visible here).
170  void execute(const MonotonicTimePoint& now)
171  {
172  RcHandle<Delegate> handle = delegate_.lock();
173  if (handle) {
174  ((*handle).*function_)(now);
175  }
176  }
177 };
178 
179 } // namespace DCPS
180 } // namespace OpenDDS
181 
183 
184 #endif /* OPENDDS_DCPS_SPORADIC_TASK_H */
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
int handle_timeout(const ACE_Time_Value &tv, const void *)
Definition: SporadicTask.h:137
WeakRcHandle< SporadicTask > sporadic_task_
Definition: SporadicTask.h:100
MonotonicTimePoint actual_next_time_
Definition: SporadicTask.h:109
RcHandle< SporadicCommand > sporadic_command_
Definition: SporadicTask.h:110
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
CommandPtr execute_or_enqueue(CommandPtr command)
void execute(const MonotonicTimePoint &now)
Definition: SporadicTask.h:170
const ACE_Time_Value & value() const
MonotonicTimePoint desired_next_time_
Definition: SporadicTask.h:106
WeakRcHandle< ReactorInterceptor > interceptor_
Definition: SporadicTask.h:104
void schedule(const TimeDuration &delay)
Definition: SporadicTask.h:36
virtual void reactor(ACE_Reactor *reactor)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
virtual MonotonicTimePoint monotonic_time_point_now() const
Definition: TimeSource.h:26
virtual void execute(const MonotonicTimePoint &now)=0
virtual ACE_Reactor * reactor(void) const
SporadicTask(const TimeSource &time_source, RcHandle< ReactorInterceptor > interceptor)
Definition: SporadicTask.h:23
SporadicCommand(WeakRcHandle< SporadicTask > sporadic_task)
Definition: SporadicTask.h:88
OpenDDS_Dcps_Export LogLevel log_level
WeakRcHandle< Delegate > delegate_
Definition: SporadicTask.h:167
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const TimeSource & time_source_
Definition: SporadicTask.h:103
RcHandle< T > lock() const
Definition: RcObject.h:188
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
PmfSporadicTask(const TimeSource &time_source, RcHandle< ReactorInterceptor > interceptor, RcHandle< Delegate > delegate, PMF function)
Definition: SporadicTask.h:157