RemoveAssociationSweeper.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00009 #define OPENDDS_DCPS_REMOVEASSOCIATIONSWEEPER_H
00010
00011
00012 #include "WriterInfo.h"
00013 #include "ReactorInterceptor.h"
00014 #include "Service_Participant.h"
00015 #include "GuidConverter.h"
00016
00017 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00018 #pragma once
00019 #endif
00020
00021 class DDS_TEST;
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028
00029 template <typename T>
00030 class RemoveAssociationSweeper : public ReactorInterceptor {
00031 public:
00032 RemoveAssociationSweeper(ACE_Reactor* reactor,
00033 ACE_thread_t owner,
00034 T* reader);
00035
00036 void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback);
00037 void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00038
00039
00040 int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00041
00042 virtual bool reactor_is_shut_down() const
00043 {
00044 return TheServiceParticipant->is_shut_down();
00045 }
00046
00047
00048 int remove_info(WriterInfo* const info);
00049 private:
00050 ~RemoveAssociationSweeper();
00051
00052
00053 WeakRcHandle<T> reader_;
00054 OPENDDS_VECTOR(RcHandle<WriterInfo>) info_set_;
00055
00056 class CommandBase : public Command {
00057 public:
00058 CommandBase(RemoveAssociationSweeper<T>* sweeper,
00059 RcHandle<WriterInfo> info)
00060 : sweeper_ (sweeper)
00061 , info_(info)
00062 { }
00063
00064 protected:
00065 RemoveAssociationSweeper<T>* sweeper_;
00066 RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00067 };
00068
00069 class ScheduleCommand : public CommandBase {
00070 public:
00071 ScheduleCommand(RemoveAssociationSweeper<T>* sweeper,
00072 RcHandle<WriterInfo> info)
00073 : CommandBase(sweeper, info)
00074 { }
00075 virtual void execute();
00076 };
00077
00078 class CancelCommand : public CommandBase {
00079 public:
00080 CancelCommand(RemoveAssociationSweeper<T>* sweeper,
00081 RcHandle<WriterInfo> info)
00082 : CommandBase(sweeper, info)
00083 { }
00084 virtual void execute();
00085 };
00086
00087 friend class CancelCommand;
00088 };
00089
00090 template <typename T>
00091 RemoveAssociationSweeper<T>::RemoveAssociationSweeper(ACE_Reactor* reactor,
00092 ACE_thread_t owner,
00093 T* reader)
00094 : ReactorInterceptor (reactor, owner)
00095 , reader_(*reader)
00096 { }
00097
00098 template <typename T>
00099 RemoveAssociationSweeper<T>::~RemoveAssociationSweeper()
00100 { }
00101
00102 template <typename T>
00103 void RemoveAssociationSweeper<T>::schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info, bool callback)
00104 {
00105 info->scheduled_for_removal_ = true;
00106 info->notify_lost_ = callback;
00107 ACE_Time_Value time_to_deadline(info->activity_wait_period());
00108
00109 if (time_to_deadline > ACE_Time_Value(10))
00110 time_to_deadline = ACE_Time_Value(10);
00111
00112 info->removal_deadline_ = ACE_OS::gettimeofday() + time_to_deadline;
00113 ScheduleCommand c(this, info);
00114 execute_or_enqueue(c);
00115 }
00116
00117 template <typename T>
00118 void RemoveAssociationSweeper<T>::cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00119 {
00120 info->scheduled_for_removal_ = false;
00121 info->removal_deadline_ = ACE_Time_Value::zero;
00122 CancelCommand c(this, info);
00123 execute_or_enqueue(c);
00124 }
00125
00126 template <typename T>
00127 int RemoveAssociationSweeper<T>::remove_info(WriterInfo* const info)
00128 {
00129 OPENDDS_VECTOR(RcHandle<WriterInfo>)::iterator itr, last = --info_set_.end();
00130
00131
00132
00133 for (itr = info_set_.begin(); itr != info_set_.end(); ++itr){
00134 if (itr->in() == info) {
00135 if (itr != last) {
00136 std::swap(*itr, info_set_.back());
00137 }
00138 info_set_.pop_back();
00139 return 0;
00140 }
00141 }
00142 return -1;
00143 }
00144
00145 template <typename T>
00146 int RemoveAssociationSweeper<T>::handle_timeout(
00147 const ACE_Time_Value& ,
00148 const void* arg)
00149 {
00150 WriterInfo* const info =
00151 const_cast<WriterInfo*>(reinterpret_cast<const WriterInfo*>(arg));
00152
00153 {
00154
00155
00156 ACE_Guard<ACE_Thread_Mutex> guard(this->mutex_);
00157 if (this->remove_info(info) == -1)
00158 return 0;
00159 }
00160
00161 info->remove_association_timer_ = WriterInfo::NO_TIMER;
00162 const PublicationId pub_id = info->writer_id_;
00163
00164 RcHandle<T> reader = reader_.lock();
00165 if (!reader)
00166 return 0;
00167
00168 if (DCPS_debug_level >= 1) {
00169 GuidConverter sub_repo(reader->get_repo_id());
00170 GuidConverter pub_repo(pub_id);
00171 ACE_DEBUG((LM_INFO, "((%P|%t)) RemoveAssociationSweeper::handle_timeout reader: %C waiting on writer: %C\n",
00172 OPENDDS_STRING(sub_repo).c_str(),
00173 OPENDDS_STRING(pub_repo).c_str()));
00174 }
00175
00176 reader->remove_publication(pub_id);
00177 return 0;
00178 }
00179
00180 template <typename T>
00181 void RemoveAssociationSweeper<T>::ScheduleCommand::execute()
00182 {
00183
00184 const void* arg = reinterpret_cast<const void*>(this->info_.in());
00185 this->sweeper_->info_set_.push_back(this->info_);
00186
00187 this->info_->remove_association_timer_ = this->sweeper_->reactor()->schedule_timer(this->sweeper_,
00188 arg,
00189 this->info_->removal_deadline_ - ACE_OS::gettimeofday());
00190 if (DCPS_debug_level) {
00191 ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::ScheduleCommand::execute() - Scheduled sweeper %d\n", this->info_->remove_association_timer_));
00192 }
00193 }
00194
00195 template <typename T>
00196 void RemoveAssociationSweeper<T>::CancelCommand::execute()
00197 {
00198 if (this->info_->remove_association_timer_ != WriterInfo::NO_TIMER) {
00199 this->sweeper_->reactor()->cancel_timer(this->info_->remove_association_timer_);
00200 if (DCPS_debug_level) {
00201 ACE_DEBUG((LM_INFO, "(%P|%t) RemoveAssociationSweeper::CancelCommand::execute() - Unscheduled sweeper %d\n", this->info_->remove_association_timer_));
00202 }
00203 this->info_->remove_association_timer_ = WriterInfo::NO_TIMER;
00204 this->sweeper_->remove_info(this->info_.in());
00205 }
00206 }
00207
00208
00209 }
00210 }
00211
00212 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00213
00214 #endif