RemoveAssociationSweeper.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00009 #define OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00010 
00011 
00012 #include "WriterInfo.h"
00013 #include "ReactorInterceptor.h"
00014 #include "Service_Participant.h"
00015 #include "GuidConverter.h"
00016 
00017 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00018 #pragma once
00019 #endif /* ACE_LACKS_PRAGMA_ONCE */
00020 
00021 class DDS_TEST;
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 // Class to cleanup associations scheduled for removal
00027 template <typename T>
00028 class RemoveAssociationSweeper : public ReactorInterceptor {
00029 public:
00030   RemoveAssociationSweeper(ACE_Reactor* reactor,
00031                            ACE_thread_t owner,
00032                            T* reader);
00033 
00034   void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback);
00035   void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00036 
00037   // Arg will be PublicationId
00038   int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00039 
00040   virtual bool reactor_is_shut_down() const
00041   {
00042     return TheServiceParticipant->is_shut_down();
00043   }
00044 
00045 private:
00046   ~RemoveAssociationSweeper();
00047 
00048   T* reader_;
00049 
00050   class CommandBase : public Command {
00051   public:
00052     CommandBase(RemoveAssociationSweeper<T>* sweeper,
00053                 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00054       : sweeper_ (sweeper)
00055       , info_(info)
00056     { }
00057 
00058   protected:
00059     RemoveAssociationSweeper<T>* sweeper_;
00060     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00061   };
00062 
00063   class ScheduleCommand : public CommandBase {
00064   public:
00065     ScheduleCommand(RemoveAssociationSweeper<T>* sweeper,
00066                     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00067       : CommandBase(sweeper, info)
00068     { }
00069     virtual void execute();
00070   };
00071 
00072   class CancelCommand : public CommandBase {
00073   public:
00074     CancelCommand(RemoveAssociationSweeper<T>* sweeper,
00075                   OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00076       : CommandBase(sweeper, info)
00077     { }
00078     virtual void execute();
00079   };
00080 };
00081 //Starting RemoveAssociationSweeper
00082 template <typename T>
00083 RemoveAssociationSweeper<T>::RemoveAssociationSweeper(ACE_Reactor* reactor,
00084                                                    ACE_thread_t owner,
00085                                                    T* reader)
00086   : ReactorInterceptor (reactor, owner)
00087   , reader_(reader)
00088 { }
00089 
00090 template <typename T>
00091 RemoveAssociationSweeper<T>::~RemoveAssociationSweeper()
00092 { }
00093 
00094 template <typename T>
00095 void RemoveAssociationSweeper<T>::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback)
00096 {
00097   info->scheduled_for_removal_ = true;
00098   info->notify_lost_ = callback;
00099   ACE_Time_Value ten_seconds(10);
00100   info->removal_deadline_ = ACE_OS::gettimeofday() + ten_seconds;
00101   ScheduleCommand c(this, info);
00102   execute_or_enqueue(c);
00103 }
00104 
00105 template <typename T>
00106 void RemoveAssociationSweeper<T>::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00107 {
00108   info->scheduled_for_removal_ = false;
00109   info->removal_deadline_ = ACE_Time_Value::zero;
00110   CancelCommand c(this, info);
00111   execute_or_enqueue(c);
00112 }
00113 
00114 template <typename T>
00115 int RemoveAssociationSweeper<T>::handle_timeout(
00116     const ACE_Time_Value& ,
00117     const void* arg)
00118 {
00119   PublicationId pub_id = reinterpret_cast<const WriterInfo*>(arg)->writer_id_;
00120 
00121   if (DCPS_debug_level >= 1) {
00122     GuidConverter sub_repo(reader_->get_repo_id());
00123     GuidConverter pub_repo(pub_id);
00124     ACE_DEBUG((LM_INFO, "((%P|%t)) RemoveAssociationSweeper::handle_timeout reader: %C waiting on writer: %C\n",
00125                OPENDDS_STRING(sub_repo).c_str(),
00126                OPENDDS_STRING(pub_repo).c_str()));
00127   }
00128 
00129   reader_->remove_or_reschedule(pub_id);
00130   return 0;
00131 }
00132 
00133 template <typename T>
00134 void RemoveAssociationSweeper<T>::ScheduleCommand::execute()
00135 {
00136   static const ACE_Time_Value two_seconds(2);
00137 
00138   //Pass pointer to writer info for timer to use, must decrease ref count when canceling timer
00139   const void* arg = reinterpret_cast<const void*>(this->info_.in());
00140   this->info_->_add_ref();
00141 
00142   this->info_->remove_association_timer_ = this->sweeper_->reactor()->schedule_timer(this->sweeper_,
00143                                                                        arg,
00144                                                                        two_seconds);
00145   if (DCPS_debug_level) {
00146     ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", this->info_->remove_association_timer_));
00147   }
00148 }
00149 
00150 template <typename T>
00151 void RemoveAssociationSweeper<T>::CancelCommand::execute()
00152 {
00153   if (this->info_->remove_association_timer_ != WriterInfo::NO_TIMER) {
00154       this->sweeper_->reactor()->cancel_timer(this->info_->remove_association_timer_);
00155     if (DCPS_debug_level) {
00156       ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", this->info_->remove_association_timer_));
00157     }
00158     this->info_->remove_association_timer_ = WriterInfo::NO_TIMER;
00159     this->info_->_remove_ref();
00160   }
00161 }
00162 //End RemoveAssociationSweeper
00163 
00164 } // namespace DCPS
00165 } // namespace OpenDDS
00166 
00167 #endif /* OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H  */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7