DataLinkWatchdog_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_DATALINKWATCHDOG_T_H
00009 #define DCPS_DATALINKWATCHDOG_T_H
00010 
00011 #include "ace/Global_Macros.h"
00012 #include "ace/Event_Handler.h"
00013 #include "ace/Log_Msg.h"
00014 #include "ace/Mutex.h"
00015 #include "ace/Reactor.h"
00016 #include "ace/Time_Value.h"
00017 #include "ace/OS_NS_time.h"
00018 #include "ace/Reverse_Lock_T.h"
00019 #include "dds/DCPS/ReactorInterceptor.h"
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 class DataLinkWatchdog : public ReactorInterceptor {
00025 public:
00026 
00027   bool schedule(const void* arg = 0) {
00028     ScheduleCommand c(this, arg, false);
00029     execute_or_enqueue(c);
00030     return true;
00031   }
00032 
00033   bool schedule_now(const void* arg = 0) {
00034     ScheduleCommand c(this, arg, true);
00035     execute_or_enqueue(c);
00036     return true;
00037   }
00038 
00039   void cancel() {
00040     CancelCommand c(this);
00041     execute_or_enqueue(c);
00042   }
00043 
00044   int handle_timeout(const ACE_Time_Value& now, const void* arg) {
00045     ACE_Time_Value timeout = next_timeout();
00046 
00047     if (timeout != ACE_Time_Value::zero) {
00048       timeout += this->epoch_;
00049       if (now > timeout) {
00050         on_timeout(arg);
00051         {
00052           cancel_i();
00053         }
00054         return 0;
00055       }
00056     }
00057 
00058     on_interval(arg);
00059 
00060     {
00061       if (!schedule_i(arg, false)) {
00062         ACE_ERROR((LM_WARNING,
00063                    ACE_TEXT("(%P|%t) WARNING: ")
00064                    ACE_TEXT("DataLinkWatchdog::handle_timeout: ")
00065                    ACE_TEXT("unable to reschedule watchdog timer!\n")));
00066       }
00067     }
00068 
00069     return 0;
00070   }
00071 
00072 protected:
00073   DataLinkWatchdog(ACE_Reactor* reactor,
00074                    ACE_thread_t owner)
00075     : ReactorInterceptor(reactor, owner)
00076     , timer_id_(-1)
00077     , cancelled_(false)
00078   {}
00079 
00080   virtual ~DataLinkWatchdog() {
00081   }
00082 
00083   virtual ACE_Time_Value next_interval() = 0;
00084   virtual void on_interval(const void* arg) = 0;
00085 
00086   virtual ACE_Time_Value next_timeout() { return ACE_Time_Value::zero; }
00087   virtual void on_timeout(const void* /*arg*/) {}
00088 
00089 private:
00090   class CommandBase : public Command {
00091   public:
00092     CommandBase(DataLinkWatchdog* watchdog)
00093       : watchdog_(watchdog)
00094     { }
00095   protected:
00096     DataLinkWatchdog* watchdog_;
00097   };
00098 
00099   class ScheduleCommand : public CommandBase {
00100   public:
00101     ScheduleCommand (DataLinkWatchdog* watchdog, const void* arg, bool nodelay)
00102       : CommandBase(watchdog)
00103       , arg_ (arg)
00104       , nodelay_ (nodelay)
00105     { }
00106 
00107     virtual void execute() {
00108       watchdog_->schedule_i(arg_, nodelay_);
00109     }
00110 
00111   private:
00112     const void* arg_;
00113     bool nodelay_;
00114   };
00115 
00116   class CancelCommand : public CommandBase {
00117   public:
00118     CancelCommand(DataLinkWatchdog* watchdog)
00119       : CommandBase(watchdog)
00120     { }
00121 
00122     virtual void execute() {
00123       watchdog_->cancel_i();
00124     }
00125   };
00126 
00127   long timer_id_;
00128 
00129   ACE_Time_Value epoch_;
00130   bool cancelled_;
00131 
00132   bool schedule_i(const void* arg, bool nodelay) {
00133     if (this->cancelled_) return true;
00134 
00135     ACE_Time_Value delay;
00136     if (!nodelay) delay = next_interval();
00137 
00138     if (this->epoch_ == ACE_Time_Value::zero) {
00139       this->epoch_ = ACE_OS::gettimeofday();
00140     }
00141 
00142     long timer_id = -1;
00143     {
00144       timer_id = reactor()->schedule_timer(this,  // event_handler
00145                                            arg,
00146                                            delay);
00147 
00148       if (timer_id == -1) {
00149         ACE_ERROR_RETURN ((LM_ERROR,
00150                   ACE_TEXT("(%P|%t) ERROR: ")
00151                   ACE_TEXT("DataLinkWatchdog::schedule_i: ")
00152                   ACE_TEXT("failed to register timer %p!\n"),
00153                   ACE_TEXT("schedule_timer")), false);
00154       }
00155     }
00156 
00157     //after re-acquiring lock_ need to check cancelled_
00158     if (this->cancelled_) {
00159       reactor()->cancel_timer(timer_id);
00160       return true;
00161     }
00162     else {
00163       this->timer_id_ = timer_id;
00164     }
00165 
00166     return this->timer_id_ != -1;
00167   }
00168 
00169   void cancel_i() {
00170     if (this->timer_id_ == -1) return;
00171 
00172     this->timer_id_ = -1;
00173     this->cancelled_ = true;
00174     reactor()->cancel_timer(this);
00175   }
00176 };
00177 
00178 } // namespace DCPS
00179 } // namespace OpenDDS
00180 
00181 #endif  /* DCPS_DATALINKWATCHDOG_T_H */

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7