RemoveAssociationSweeper.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00009 #define OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00010 
00011 
00012 #include "WriterInfo.h"
00013 #include "ReactorInterceptor.h"
00014 #include "Service_Participant.h"
00015 #include "GuidConverter.h"
00016 
00017 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00018 #pragma once
00019 #endif /* ACE_LACKS_PRAGMA_ONCE */
00020 
00021 class DDS_TEST;
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 // Class to cleanup associations scheduled for removal
00029 template <typename T>
00030 class RemoveAssociationSweeper : public ReactorInterceptor {
00031 public:
00032   RemoveAssociationSweeper(ACE_Reactor* reactor,
00033                            ACE_thread_t owner,
00034                            T* reader);
00035 
00036   void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback);
00037   void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00038 
00039   // Arg will be PublicationId
00040   int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00041 
00042   virtual bool reactor_is_shut_down() const
00043   {
00044     return TheServiceParticipant->is_shut_down();
00045   }
00046 
00047 
00048   int remove_info(WriterInfo* const info);
00049 private:
00050   ~RemoveAssociationSweeper();
00051 
00052 
00053   WeakRcHandle<T> reader_;
00054   OPENDDS_VECTOR(RcHandle<WriterInfo>) info_set_;
00055 
00056   class CommandBase : public Command {
00057   public:
00058     CommandBase(RemoveAssociationSweeper<T>* sweeper,
00059                 RcHandle<WriterInfo> info)
00060       : sweeper_ (sweeper)
00061       , info_(info)
00062     { }
00063 
00064   protected:
00065     RemoveAssociationSweeper<T>* sweeper_;
00066     RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00067   };
00068 
00069   class ScheduleCommand : public CommandBase {
00070   public:
00071     ScheduleCommand(RemoveAssociationSweeper<T>* sweeper,
00072                     RcHandle<WriterInfo> info)
00073       : CommandBase(sweeper, info)
00074     { }
00075     virtual void execute();
00076   };
00077 
00078   class CancelCommand : public CommandBase {
00079   public:
00080     CancelCommand(RemoveAssociationSweeper<T>* sweeper,
00081                   RcHandle<WriterInfo> info)
00082       : CommandBase(sweeper, info)
00083     { }
00084     virtual void execute();
00085   };
00086 
00087   friend class CancelCommand;
00088 };
00089 //Starting RemoveAssociationSweeper
00090 template <typename T>
00091 RemoveAssociationSweeper<T>::RemoveAssociationSweeper(ACE_Reactor* reactor,
00092                                                    ACE_thread_t owner,
00093                                                    T* reader)
00094   : ReactorInterceptor (reactor, owner)
00095   , reader_(*reader)
00096 { }
00097 
00098 template <typename T>
00099 RemoveAssociationSweeper<T>::~RemoveAssociationSweeper()
00100 { }
00101 
00102 template <typename T>
00103 void RemoveAssociationSweeper<T>::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback)
00104 {
00105   info->scheduled_for_removal_ = true;
00106   info->notify_lost_ = callback;
00107   ACE_Time_Value time_to_deadline(info->activity_wait_period());
00108 
00109   if (time_to_deadline > ACE_Time_Value(10))
00110     time_to_deadline = ACE_Time_Value(10);
00111 
00112   info->removal_deadline_ = ACE_OS::gettimeofday() + time_to_deadline;
00113   ScheduleCommand c(this, info);
00114   execute_or_enqueue(c);
00115 }
00116 
00117 template <typename T>
00118 void RemoveAssociationSweeper<T>::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00119 {
00120   info->scheduled_for_removal_ = false;
00121   info->removal_deadline_ = ACE_Time_Value::zero;
00122   CancelCommand c(this, info);
00123   execute_or_enqueue(c);
00124 }
00125 
00126 template <typename T>
00127 int RemoveAssociationSweeper<T>::remove_info(WriterInfo* const info)
00128 {
00129   OPENDDS_VECTOR(RcHandle<WriterInfo>)::iterator itr, last = --info_set_.end();
00130   // find the RcHandle holds the pointer info in this->info_set_
00131   // and then swap the found element with the last element in the
00132   // info_set_ vector so  we can delete it.
00133   for (itr = info_set_.begin(); itr != info_set_.end(); ++itr){
00134     if (itr->in() == info) {
00135       if (itr != last) {
00136         std::swap(*itr, info_set_.back());
00137       }
00138       info_set_.pop_back();
00139       return 0;
00140     }
00141   }
00142   return -1;
00143 }
00144 
00145 template <typename T>
00146 int RemoveAssociationSweeper<T>::handle_timeout(
00147     const ACE_Time_Value& ,
00148     const void* arg)
00149 {
00150   WriterInfo* const info =
00151     const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
00152 
00153   {
00154     // info may be destroyed at this moment, we can only access it
00155     // if it is in the info_set_. This could happen when cancel_timer() handle it first.
00156     ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
00157     if (this->remove_info(info) == -1)
00158       return 0;
00159   }
00160 
00161   info->remove_association_timer_ = WriterInfo::NO_TIMER;
00162   const PublicationId pub_id = info->writer_id_;
00163 
00164   RcHandle<T> reader = reader_.lock();
00165   if (!reader)
00166     return 0;
00167 
00168   if (DCPS_debug_level >= 1) {
00169     GuidConverter sub_repo(reader->get_repo_id());
00170     GuidConverter pub_repo(pub_id);
00171     ACE_DEBUG((LM_INFO, "((%P|%t)) RemoveAssociationSweeper::handle_timeout reader: %C waiting on writer: %C\n",
00172                OPENDDS_STRING(sub_repo).c_str(),
00173                OPENDDS_STRING(pub_repo).c_str()));
00174   }
00175 
00176   reader->remove_publication(pub_id);
00177   return 0;
00178 }
00179 
00180 template <typename T>
00181 void RemoveAssociationSweeper<T>::ScheduleCommand::execute()
00182 {
00183   //Pass pointer to writer info for timer to use, must decrease ref count when canceling timer
00184   const void* arg = reinterpret_cast<const void*>(this->info_.in());
00185   this->sweeper_->info_set_.push_back(this->info_);
00186 
00187   this->info_->remove_association_timer_ = this->sweeper_->reactor()->schedule_timer(this->sweeper_,
00188                                                                        arg,
00189                                                                        this->info_->removal_deadline_ - ACE_OS::gettimeofday());
00190   if (DCPS_debug_level) {
00191     ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", this->info_->remove_association_timer_));
00192   }
00193 }
00194 
00195 template <typename T>
00196 void RemoveAssociationSweeper<T>::CancelCommand::execute()
00197 {
00198   if (this->info_->remove_association_timer_ != WriterInfo::NO_TIMER) {
00199       this->sweeper_->reactor()->cancel_timer(this->info_->remove_association_timer_);
00200     if (DCPS_debug_level) {
00201       ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", this->info_->remove_association_timer_));
00202     }
00203     this->info_->remove_association_timer_ = WriterInfo::NO_TIMER;
00204     this->sweeper_->remove_info(this->info_.in());
00205   }
00206 }
00207 //End RemoveAssociationSweeper
00208 
00209 } // namespace DCPS
00210 } // namespace OpenDDS
00211 
00212 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00213 
00214 #endif /* OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1