OpenDDS  Snapshot(2023/04/28-20:55)
MultiTask.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_MULTI_TASK_H
9 #define OPENDDS_DCPS_MULTI_TASK_H
10 
11 #include "RcEventHandler.h"
12 #include "ReactorInterceptor.h"
13 #include "TimeTypes.h"
14 #include "Service_Participant.h"
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
21 class MultiTask : public virtual RcEventHandler {
22 public:
23  explicit MultiTask(RcHandle<ReactorInterceptor> interceptor, const TimeDuration& delay)
24  : interceptor_(interceptor)
25  , delay_(delay)
26  , timer_(-1)
27  , next_time_()
29  {
30  reactor(interceptor->reactor());
31  }
32 
33  virtual ~MultiTask() {}
34 
35  void enable(const TimeDuration& delay)
36  {
37  bool worth_passing_along = false;
38  {
40  worth_passing_along = (timer_ == -1) || ((MonotonicTimePoint::now() + delay + cancel_estimate_) < next_time_);
41  }
42  if (worth_passing_along) {
43  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
44  if (interceptor) {
45  interceptor->execute_or_enqueue(make_rch<ScheduleEnableCommand>(rchandle_from(this), delay));
46  }
47  }
48  }
49 
50  void disable()
51  {
52  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
53  if (interceptor) {
54  interceptor->execute_or_enqueue(make_rch<ScheduleDisableCommand>(rchandle_from(this)));
55  }
56  }
57 
58  virtual void execute(const MonotonicTimePoint& now) = 0;
59 
60 private:
63  long timer_;
67 
70  : multi_task_(multi_task), delay_(delay)
71  { }
72 
73  virtual void execute()
74  {
75  RcHandle<MultiTask> multi_task = multi_task_.lock();
76  if (multi_task) {
77  multi_task->enable_i(delay_);
78  }
79  }
80 
83  };
84 
87  : multi_task_(multi_task)
88  { }
89 
90  virtual void execute()
91  {
92  RcHandle<MultiTask> multi_task = multi_task_.lock();
93  if (multi_task) {
94  multi_task->disable_i();
95  }
96  }
97 
99  };
100 
101  int handle_timeout(const ACE_Time_Value& tv, const void*)
102  {
103  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
104 
105  const MonotonicTimePoint now(tv);
106  {
107  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
108  next_time_ = now + delay_;
109  }
110  execute(now);
111  return 0;
112  }
113 
114  void enable_i(const TimeDuration& per)
115  {
116  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
118  if (timer_ == -1) {
119  timer_ = reactor()->schedule_timer(this, 0, per.value(), delay_.value());
120 
121  if (timer_ == -1) {
122  ACE_ERROR((LM_ERROR, "(%P|%t) MultiTask::enable"
123  " failed to schedule timer %p\n", ACE_TEXT("")));
124  } else {
125  next_time_ = now + per;
126  }
127  } else {
128  const MonotonicTimePoint estimated_next_time = now + per + cancel_estimate_;
129  if (estimated_next_time < next_time_) {
130  reactor()->cancel_timer(timer_);
132  timer_ = reactor()->schedule_timer(this, 0, per.value(), delay_.value());
133  cancel_estimate_ = now2 - now;
134 
135  if (timer_ == -1) {
136  ACE_ERROR((LM_ERROR, "(%P|%t) MultiTask::enable"
137  " failed to reschedule timer %p\n", ACE_TEXT("")));
138  } else {
139  next_time_ = now2 + per;
140  }
141  }
142  }
143  }
144 
145  void
147  {
148  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
149  if (timer_ != -1) {
150  reactor()->cancel_timer(timer_);
151  timer_ = -1;
152  }
153  }
154 };
155 
156 template <typename Delegate>
157 class PmfMultiTask : public MultiTask {
158 public:
159  typedef void (Delegate::*PMF)(const MonotonicTimePoint&);
160 
162  const TimeDuration& delay,
163  RcHandle<Delegate> delegate,
164  PMF function)
165  : MultiTask(interceptor, delay)
166  , delegate_(delegate)
167  , function_(function) {}
168 
169 private:
172 
173  void execute(const MonotonicTimePoint& now)
174  {
175  RcHandle<Delegate> handle = delegate_.lock();
176  if (handle) {
177  ((*handle).*function_)(now);
178  }
179  }
180 };
181 
182 } // namespace DCPS
183 } // namespace OpenDDS
184 
186 
187 #endif /* OPENDDS_DCPS_MULTI_TASK_H */
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
MultiTask(RcHandle< ReactorInterceptor > interceptor, const TimeDuration &delay)
Definition: MultiTask.h:23
void enable(const TimeDuration &delay)
Definition: MultiTask.h:35
CommandPtr execute_or_enqueue(CommandPtr command)
const ACE_Time_Value & value() const
WeakRcHandle< Delegate > delegate_
Definition: MultiTask.h:170
WeakRcHandle< MultiTask > const multi_task_
Definition: MultiTask.h:81
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
virtual void execute(const MonotonicTimePoint &now)=0
virtual void reactor(ACE_Reactor *reactor)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
ScheduleDisableCommand(WeakRcHandle< MultiTask > multi_task)
Definition: MultiTask.h:86
int handle_timeout(const ACE_Time_Value &tv, const void *)
Definition: MultiTask.h:101
virtual ACE_Reactor * reactor(void) const
void enable_i(const TimeDuration &per)
Definition: MultiTask.h:114
ScheduleEnableCommand(WeakRcHandle< MultiTask > multi_task, const TimeDuration &delay)
Definition: MultiTask.h:69
ACE_TEXT("TCP_Factory")
MonotonicTimePoint next_time_
Definition: MultiTask.h:64
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
TimeDuration cancel_estimate_
Definition: MultiTask.h:65
WeakRcHandle< ReactorInterceptor > interceptor_
Definition: MultiTask.h:61
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void execute(const MonotonicTimePoint &now)
Definition: MultiTask.h:173
RcHandle< T > lock() const
Definition: RcObject.h:188
WeakRcHandle< MultiTask > const multi_task_
Definition: MultiTask.h:98
const TimeDuration delay_
Definition: MultiTask.h:62
#define TheServiceParticipant
LM_ERROR
PmfMultiTask(RcHandle< ReactorInterceptor > interceptor, const TimeDuration &delay, RcHandle< Delegate > delegate, PMF function)
Definition: MultiTask.h:161
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ACE_Thread_Mutex mutex_
Definition: MultiTask.h:66