LCOV - code coverage report
Current view: top level - DCPS - SporadicTask.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 77 77 100.0 %
Date: 2023-04-30 01:32:43 Functions: 11 129 8.5 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_SPORADIC_TASK_H
       9             : #define OPENDDS_DCPS_SPORADIC_TASK_H
      10             : 
      11             : #include "RcEventHandler.h"
      12             : #include "ReactorInterceptor.h"
      13             : #include "TimeSource.h"
      14             : #include "Service_Participant.h"
      15             : 
      16             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      17             : 
      18             : namespace OpenDDS {
      19             : namespace DCPS {
      20             : 
      21             : class SporadicTask : public virtual RcEventHandler {
      22             : public:
      23          14 :   SporadicTask(const TimeSource& time_source,
      24             :                RcHandle<ReactorInterceptor> interceptor)
      25          14 :     : time_source_(time_source)
      26          14 :     , interceptor_(interceptor)
      27          14 :     , desired_scheduled_(false)
      28          14 :     , timer_id_(-1)
      29          14 :     , sporadic_command_(make_rch<SporadicCommand>(rchandle_from(this)))
      30             :   {
      31          14 :     reactor(interceptor->reactor());
      32          14 :   }
      33             : 
      34          14 :   virtual ~SporadicTask() {}
      35             : 
      36          15 :   void schedule(const TimeDuration& delay)
      37             :   {
      38          15 :     const MonotonicTimePoint next_time = time_source_.monotonic_time_point_now() + delay;
      39             :     {
      40          15 :       ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      41          15 :       if (!desired_scheduled_ || next_time < desired_next_time_) {
      42          14 :         desired_scheduled_ = true;
      43          14 :         desired_next_time_ = next_time;
      44          14 :         desired_delay_ = delay;
      45             :       } else {
      46           1 :         return;
      47             :       }
      48          15 :     }
      49             : 
      50          14 :     RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
      51          14 :     if (interceptor) {
      52          13 :       interceptor->execute_or_enqueue(sporadic_command_);
      53           1 :     } else if (log_level >= LogLevel::Error) {
      54           1 :       ACE_ERROR((LM_ERROR,
      55             :                  "(%P|%t) ERROR: SporadicTask::schedule: "
      56             :                  "failed to receive ReactorInterceptor handle\n"));
      57             :     }
      58          15 :   }
      59             : 
      60           3 :   void cancel()
      61             :   {
      62             :     {
      63           3 :       ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      64           3 :       if (!desired_scheduled_) {
      65           1 :         return;
      66             :       }
      67             : 
      68           2 :       desired_scheduled_ = false;
      69           3 :     }
      70             : 
      71           2 :     RcHandle<ReactorInterceptor> interceptor = interceptor_.lock();
      72           2 :     if (interceptor) {
      73           1 :       interceptor->execute_or_enqueue(sporadic_command_);
      74           1 :     } else if (log_level >= LogLevel::Error) {
      75           1 :       ACE_ERROR((LM_ERROR,
      76             :                  "(%P|%t) ERROR: SporadicTask::cancel: "
      77             :                  "failed to receive ReactorInterceptor handle\n"));
      78             :     }
      79           2 :   }
      80             : 
      81             :   virtual void execute(const MonotonicTimePoint& now) = 0;
      82             : 
      83             : protected:
      84           2 :   long get_timer_id() { return timer_id_; }
      85             : 
      86             : private:
      87             :   struct SporadicCommand : public ReactorInterceptor::Command {
      88          14 :     explicit SporadicCommand(WeakRcHandle<SporadicTask> sporadic_task)
      89          14 :       : sporadic_task_(sporadic_task)
      90          14 :     { }
      91             : 
      92          14 :     virtual void execute()
      93             :     {
      94          14 :       RcHandle<SporadicTask> st = sporadic_task_.lock();
      95          14 :       if (st) {
      96          14 :         st->execute_i();
      97             :       }
      98          14 :     }
      99             : 
     100             :     WeakRcHandle<SporadicTask> sporadic_task_;
     101             :   };
     102             : 
     103             :   const TimeSource& time_source_;
     104             :   WeakRcHandle<ReactorInterceptor> interceptor_;
     105             :   bool desired_scheduled_;
     106             :   MonotonicTimePoint desired_next_time_;
     107             :   TimeDuration desired_delay_;
     108             :   long timer_id_;
     109             :   MonotonicTimePoint actual_next_time_;
     110             :   RcHandle<SporadicCommand> sporadic_command_;
     111             :   mutable ACE_Thread_Mutex mutex_;
     112             : 
     113          14 :   void execute_i()
     114             :   {
     115          14 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     116             : 
     117          27 :     if ((!desired_scheduled_ && timer_id_ != -1) ||
     118          13 :         (desired_scheduled_ && timer_id_ != -1 && desired_next_time_ != actual_next_time_)) {
     119           2 :       reactor()->cancel_timer(timer_id_);
     120           2 :       timer_id_ = -1;
     121             :     }
     122             : 
     123          14 :     if (desired_scheduled_ && timer_id_ == -1) {
     124          13 :       timer_id_ = reactor()->schedule_timer(this, 0, desired_delay_.value());
     125          13 :       if (timer_id_ == -1) {
     126           1 :         if (log_level >= LogLevel::Error) {
     127           1 :           ACE_ERROR((LM_ERROR,
     128             :                      "(%P|%t) ERROR: SporadicTask::execute_i: "
     129             :                      "failed to schedule timer %p\n", ""));
     130             :         }
     131             :       } else {
     132          12 :         actual_next_time_ = desired_next_time_;
     133             :       }
     134             :     }
     135          14 :   }
     136             : 
     137           2 :   int handle_timeout(const ACE_Time_Value& tv, const void*)
     138             :   {
     139           2 :     ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
     140             : 
     141           2 :     const MonotonicTimePoint now(tv);
     142             :     {
     143           2 :       ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     144           2 :       desired_scheduled_ = false;
     145           2 :       timer_id_ = -1;
     146           2 :     }
     147           2 :     execute(now);
     148           2 :     return 0;
     149           2 :   }
     150             : };
     151             : 
     152             : template <typename Delegate>
     153             : class PmfSporadicTask : public SporadicTask {
     154             : public:
     155             :   typedef void (Delegate::*PMF)(const MonotonicTimePoint&);
     156             : 
     157           1 :   PmfSporadicTask(const TimeSource& time_source,
     158             :                   RcHandle<ReactorInterceptor> interceptor,
     159             :                   RcHandle<Delegate> delegate,
     160             :                   PMF function)
     161             :     : SporadicTask(time_source, interceptor)
     162           1 :     , delegate_(delegate)
     163           1 :     , function_(function)
     164           1 :   {}
     165             : 
     166             : private:
     167             :   WeakRcHandle<Delegate> delegate_;
     168             :   PMF function_;
     169             : 
     170           1 :   void execute(const MonotonicTimePoint& now)
     171             :   {
     172           1 :     RcHandle<Delegate> handle = delegate_.lock();
     173           1 :     if (handle) {
     174           1 :       ((*handle).*function_)(now);
     175             :     }
     176           1 :   }
     177             : };
     178             : 
     179             : } // namespace DCPS
     180             : } // namespace OpenDDS
     181             : 
     182             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     183             : 
     184             : #endif /* OPENDDS_DCPS_SPORADIC_TASK_H  */

Generated by: LCOV version 1.16