DataLinkWatchdog_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_DATALINKWATCHDOG_T_H
00009 #define DCPS_DATALINKWATCHDOG_T_H
00010 
00011 #include "ace/Global_Macros.h"
00012 #include "ace/Event_Handler.h"
00013 #include "ace/Log_Msg.h"
00014 #include "ace/Mutex.h"
00015 #include "ace/Reactor.h"
00016 #include "ace/Time_Value.h"
00017 #include "ace/OS_NS_time.h"
00018 #include "ace/Reverse_Lock_T.h"
00019 #include "dds/DCPS/ReactorInterceptor.h"
00020 
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 class DataLinkWatchdog : public ReactorInterceptor {
00027 public:
00028 
00029   bool schedule(const void* arg = 0) {
00030     ScheduleCommand c(this, arg, false);
00031     execute_or_enqueue(c);
00032     return true;
00033   }
00034 
00035   bool schedule_now(const void* arg = 0) {
00036     ScheduleCommand c(this, arg, true);
00037     execute_or_enqueue(c);
00038     return true;
00039   }
00040 
00041   void cancel() {
00042     CancelCommand c(this);
00043     execute_or_enqueue(c);
00044   }
00045 
00046   int handle_timeout(const ACE_Time_Value& now, const void* arg) {
00047     ACE_Time_Value timeout = next_timeout();
00048 
00049     if (timeout != ACE_Time_Value::zero) {
00050       timeout += this->epoch_;
00051       if (now > timeout) {
00052         on_timeout(arg);
00053         {
00054           cancel_i();
00055         }
00056         return 0;
00057       }
00058     }
00059 
00060     on_interval(arg);
00061 
00062     {
00063       if (!schedule_i(arg, false)) {
00064         ACE_ERROR((LM_WARNING,
00065                    ACE_TEXT("(%P|%t) WARNING: ")
00066                    ACE_TEXT("DataLinkWatchdog::handle_timeout: ")
00067                    ACE_TEXT("unable to reschedule watchdog timer!\n")));
00068       }
00069     }
00070 
00071     return 0;
00072   }
00073 
00074 protected:
00075   DataLinkWatchdog(ACE_Reactor* reactor,
00076                    ACE_thread_t owner)
00077     : ReactorInterceptor(reactor, owner)
00078     , timer_id_(-1)
00079     , cancelled_(false)
00080   {}
00081 
00082   virtual ~DataLinkWatchdog() {
00083   }
00084 
00085   virtual ACE_Time_Value next_interval() = 0;
00086   virtual void on_interval(const void* arg) = 0;
00087 
00088   virtual ACE_Time_Value next_timeout() { return ACE_Time_Value::zero; }
00089   virtual void on_timeout(const void* /*arg*/) {}
00090 
00091 private:
00092   class CommandBase : public Command {
00093   public:
00094     CommandBase(DataLinkWatchdog* watchdog)
00095       : watchdog_(watchdog)
00096     { }
00097   protected:
00098     DataLinkWatchdog* watchdog_;
00099   };
00100 
00101   class ScheduleCommand : public CommandBase {
00102   public:
00103     ScheduleCommand (DataLinkWatchdog* watchdog, const void* arg, bool nodelay)
00104       : CommandBase(watchdog)
00105       , arg_ (arg)
00106       , nodelay_ (nodelay)
00107     { }
00108 
00109     virtual void execute() {
00110       watchdog_->schedule_i(arg_, nodelay_);
00111     }
00112 
00113   private:
00114     const void* arg_;
00115     bool nodelay_;
00116   };
00117 
00118   class CancelCommand : public CommandBase {
00119   public:
00120     CancelCommand(DataLinkWatchdog* watchdog)
00121       : CommandBase(watchdog)
00122     { }
00123 
00124     virtual void execute() {
00125       watchdog_->cancel_i();
00126     }
00127   };
00128 
00129   long timer_id_;
00130 
00131   ACE_Time_Value epoch_;
00132   bool cancelled_;
00133 
00134   bool schedule_i(const void* arg, bool nodelay) {
00135     if (this->cancelled_) return true;
00136 
00137     ACE_Time_Value delay;
00138     if (!nodelay) delay = next_interval();
00139 
00140     if (this->epoch_ == ACE_Time_Value::zero) {
00141       this->epoch_ = ACE_OS::gettimeofday();
00142     }
00143 
00144     long timer_id = -1;
00145     {
00146       timer_id = reactor()->schedule_timer(this,  // event_handler
00147                                            arg,
00148                                            delay);
00149 
00150       if (timer_id == -1) {
00151         ACE_ERROR_RETURN ((LM_ERROR,
00152                   ACE_TEXT("(%P|%t) ERROR: ")
00153                   ACE_TEXT("DataLinkWatchdog::schedule_i: ")
00154                   ACE_TEXT("failed to register timer %p!\n"),
00155                   ACE_TEXT("schedule_timer")), false);
00156       }
00157     }
00158 
00159     //after re-acquiring lock_ need to check cancelled_
00160     if (this->cancelled_) {
00161       reactor()->cancel_timer(timer_id);
00162       return true;
00163     }
00164     else {
00165       this->timer_id_ = timer_id;
00166     }
00167 
00168     return this->timer_id_ != -1;
00169   }
00170 
00171   void cancel_i() {
00172     if (this->timer_id_ == -1) return;
00173 
00174     this->timer_id_ = -1;
00175     this->cancelled_ = true;
00176     reactor()->cancel_timer(this);
00177   }
00178 };
00179 
00180 } // namespace DCPS
00181 } // namespace OpenDDS
00182 
00183 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00184 
00185 #endif  /* DCPS_DATALINKWATCHDOG_T_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1