OpenDDS  Snapshot(2023/04/28-20:55)
PeriodicTask.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_PERIODIC_TASK_H
9 #define OPENDDS_DCPS_PERIODIC_TASK_H
10 
11 #include "RcEventHandler.h"
12 #include "ReactorInterceptor.h"
13 
15 
16 namespace OpenDDS {
17 namespace DCPS {
18 
19 class PeriodicTask : public virtual RcEventHandler {
20 public:
22  : user_enabled_(false)
23  , interceptor_(interceptor)
24  , enabled_(false)
25  {
26  reactor(interceptor->reactor());
27  }
28 
29  virtual ~PeriodicTask() {}
30 
31  void enable(bool reenable, const TimeDuration& period)
32  {
33  {
35  user_enabled_ = true;
36  }
37  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
38  if (interceptor) {
39  interceptor->execute_or_enqueue(make_rch<ScheduleEnableCommand>(rchandle_from(this), reenable, period));
40  }
41  }
42 
43  void disable()
44  {
45  {
47  user_enabled_ = false;
48  }
49  RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
50  if (interceptor) {
51  interceptor->execute_or_enqueue(make_rch<ScheduleDisableCommand>(rchandle_from(this)));
52  }
53  }
54 
55  bool enabled() const
56  {
58  return user_enabled_;
59  }
60 
61  virtual void execute(const MonotonicTimePoint& now) = 0;
62 
63 private:
67  bool enabled_;
68 
71  : periodic_task_(hb), reenable_(reenable), period_(period)
72  { }
73 
74  virtual void execute()
75  {
76  RcHandle<PeriodicTask> periodic_task = periodic_task_.lock();
77  if (periodic_task) {
78  periodic_task->enable_i(reenable_, period_);
79  }
80  }
81 
83  const bool reenable_;
85  };
86 
89  : periodic_task_(hb)
90  { }
91 
92  virtual void execute()
93  {
94  RcHandle<PeriodicTask> periodic_task = periodic_task_.lock();
95  if (periodic_task) {
96  periodic_task->disable_i();
97  }
98  }
99 
101  };
102 
103  int handle_timeout(const ACE_Time_Value& tv, const void*)
104  {
105  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
106 
107  const MonotonicTimePoint now(tv);
108  execute(now);
109  return 0;
110  }
111 
112  void enable_i(bool reenable, const TimeDuration& per)
113  {
114  if (!enabled_) {
115  const long timer =
116  reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, per.value());
117 
118  if (timer == -1) {
119  ACE_ERROR((LM_ERROR, "(%P|%t) PeriodicTask::enable"
120  " failed to schedule timer %p\n", ACE_TEXT("")));
121  } else {
122  enabled_ = true;
123  }
124  } else if (reenable) {
125  disable_i();
126  enable_i(false, per);
127  }
128  }
129 
130  void
132  {
133  if (enabled_) {
134  reactor()->cancel_timer(this);
135  enabled_ = false;
136  }
137  }
138 };
139 
140 template <typename Delegate>
142 public:
143  typedef void (Delegate::*PMF)(const MonotonicTimePoint&);
144 
145  PmfPeriodicTask(RcHandle<ReactorInterceptor> interceptor, const Delegate& delegate, PMF function)
146  : PeriodicTask(interceptor)
147  , delegate_(delegate)
148  , function_(function) {}
149 
150 private:
153 
154  void execute(const MonotonicTimePoint& now)
155  {
156  RcHandle<Delegate> handle = delegate_.lock();
157  if (handle) {
158  ((*handle).*function_)(now);
159  }
160  }
161 };
162 
163 } // namespace DCPS
164 } // namespace OpenDDS
165 
167 
168 #endif /* OPENDDS_DCPS_PERIODIC_TASK_H */
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex mutex_
Definition: PeriodicTask.h:64
WeakRcHandle< Delegate > delegate_
Definition: PeriodicTask.h:151
CommandPtr execute_or_enqueue(CommandPtr command)
const ACE_Time_Value & value() const
WeakRcHandle< PeriodicTask > const periodic_task_
Definition: PeriodicTask.h:100
virtual void execute(const MonotonicTimePoint &now)=0
PeriodicTask(RcHandle< ReactorInterceptor > interceptor)
Definition: PeriodicTask.h:21
PmfPeriodicTask(RcHandle< ReactorInterceptor > interceptor, const Delegate &delegate, PMF function)
Definition: PeriodicTask.h:145
WeakRcHandle< PeriodicTask > const periodic_task_
Definition: PeriodicTask.h:82
ScheduleEnableCommand(WeakRcHandle< PeriodicTask > hb, bool reenable, const TimeDuration &period)
Definition: PeriodicTask.h:70
virtual void reactor(ACE_Reactor *reactor)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
WeakRcHandle< ReactorInterceptor > interceptor_
Definition: PeriodicTask.h:66
virtual ACE_Reactor * reactor(void) const
ScheduleDisableCommand(WeakRcHandle< PeriodicTask > hb)
Definition: PeriodicTask.h:88
ACE_TEXT("TCP_Factory")
int handle_timeout(const ACE_Time_Value &tv, const void *)
Definition: PeriodicTask.h:103
void enable(bool reenable, const TimeDuration &period)
Definition: PeriodicTask.h:31
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
static const ACE_Time_Value zero
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RcHandle< T > lock() const
Definition: RcObject.h:188
#define TheServiceParticipant
void enable_i(bool reenable, const TimeDuration &per)
Definition: PeriodicTask.h:112
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
void execute(const MonotonicTimePoint &now)
Definition: PeriodicTask.h:154