00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00009 #define OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00010
00011
00012 #include "WriterInfo.h"
00013 #include "ReactorInterceptor.h"
00014 #include "Service_Participant.h"
00015 #include "GuidConverter.h"
00016
00017 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00018 #pragma once
00019 #endif
00020
00021 class DDS_TEST;
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026
00027 template <typename T>
00028 class RemoveAssociationSweeper : public ReactorInterceptor {
00029 public:
00030 RemoveAssociationSweeper(ACE_Reactor* reactor,
00031 ACE_thread_t owner,
00032 T* reader);
00033
00034 void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback);
00035 void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00036
00037
00038 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00039
00040 virtual bool reactor_is_shut_down() const
00041 {
00042 return TheServiceParticipant->is_shut_down();
00043 }
00044
00045 private:
00046 ~RemoveAssociationSweeper();
00047
00048 T* reader_;
00049
00050 class CommandBase : public Command {
00051 public:
00052 CommandBase(RemoveAssociationSweeper<T>* sweeper,
00053 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00054 : sweeper_ (sweeper)
00055 , info_(info)
00056 { }
00057
00058 protected:
00059 RemoveAssociationSweeper<T>* sweeper_;
00060 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00061 };
00062
00063 class ScheduleCommand : public CommandBase {
00064 public:
00065 ScheduleCommand(RemoveAssociationSweeper<T>* sweeper,
00066 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00067 : CommandBase(sweeper, info)
00068 { }
00069 virtual void execute();
00070 };
00071
00072 class CancelCommand : public CommandBase {
00073 public:
00074 CancelCommand(RemoveAssociationSweeper<T>* sweeper,
00075 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00076 : CommandBase(sweeper, info)
00077 { }
00078 virtual void execute();
00079 };
00080 };
00081
00082 template <typename T>
00083 RemoveAssociationSweeper<T>::RemoveAssociationSweeper(ACE_Reactor* reactor,
00084 ACE_thread_t owner,
00085 T* reader)
00086 : ReactorInterceptor (reactor, owner)
00087 , reader_(reader)
00088 { }
00089
00090 template <typename T>
00091 RemoveAssociationSweeper<T>::~RemoveAssociationSweeper()
00092 { }
00093
00094 template <typename T>
00095 void RemoveAssociationSweeper<T>::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback)
00096 {
00097 info->scheduled_for_removal_ = true;
00098 info->notify_lost_ = callback;
00099 ACE_Time_Value ten_seconds(10);
00100 info->removal_deadline_ = ACE_OS::gettimeofday() + ten_seconds;
00101 ScheduleCommand c(this, info);
00102 execute_or_enqueue(c);
00103 }
00104
00105 template <typename T>
00106 void RemoveAssociationSweeper<T>::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00107 {
00108 info->scheduled_for_removal_ = false;
00109 info->removal_deadline_ = ACE_Time_Value::zero;
00110 CancelCommand c(this, info);
00111 execute_or_enqueue(c);
00112 }
00113
00114 template <typename T>
00115 int RemoveAssociationSweeper<T>::handle_timeout(
00116 const ACE_Time_Value& ,
00117 const void* arg)
00118 {
00119 PublicationId pub_id = reinterpret_cast<const WriterInfo*>(arg)->writer_id_;
00120
00121 if (DCPS_debug_level >= 1) {
00122 GuidConverter sub_repo(reader_->get_repo_id());
00123 GuidConverter pub_repo(pub_id);
00124 ACE_DEBUG((LM_INFO, "((%P|%t)) RemoveAssociationSweeper::handle_timeout reader: %C waiting on writer: %C\n",
00125 OPENDDS_STRING(sub_repo).c_str(),
00126 OPENDDS_STRING(pub_repo).c_str()));
00127 }
00128
00129 reader_->remove_or_reschedule(pub_id);
00130 return 0;
00131 }
00132
00133 template <typename T>
00134 void RemoveAssociationSweeper<T>::ScheduleCommand::execute()
00135 {
00136 static const ACE_Time_Value two_seconds(2);
00137
00138
00139 const void* arg = reinterpret_cast<const void*>(this->info_.in());
00140 this->info_->_add_ref();
00141
00142 this->info_->remove_association_timer_ = this->sweeper_->reactor()->schedule_timer(this->sweeper_,
00143 arg,
00144 two_seconds);
00145 if (DCPS_debug_level) {
00146 ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", this->info_->remove_association_timer_));
00147 }
00148 }
00149
00150 template <typename T>
00151 void RemoveAssociationSweeper<T>::CancelCommand::execute()
00152 {
00153 if (this->info_->remove_association_timer_ != WriterInfo::NO_TIMER) {
00154 this->sweeper_->reactor()->cancel_timer(this->info_->remove_association_timer_);
00155 if (DCPS_debug_level) {
00156 ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", this->info_->remove_association_timer_));
00157 }
00158 this->info_->remove_association_timer_ = WriterInfo::NO_TIMER;
00159 this->info_->_remove_ref();
00160 }
00161 }
00162
00163
00164 }
00165 }
00166
00167 #endif