00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_REACTORINTERCEPTOR_H
00009 #define OPENDDS_DCPS_REACTORINTERCEPTOR_H
00010
00011 #include "PoolAllocator.h"
00012 #include "PoolAllocationBase.h"
00013 #include "ace/Reactor.h"
00014 #include "ace/Thread.h"
00015 #include "ace/Condition_Thread_Mutex.h"
00016 #include "dcps_export.h"
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 class OpenDDS_Dcps_Export ReactorInterceptor : public ACE_Event_Handler, public PoolAllocationBase {
00022 public:
00023
00024 class Command : public PoolAllocationBase {
00025 public:
00026 virtual ~Command() { }
00027 virtual void execute() = 0;
00028 };
00029
00030 ReactorInterceptor(ACE_Reactor* reactor,
00031 ACE_thread_t owner);
00032
00033 bool should_execute_immediately();
00034 void process_command_queue();
00035
00036 template<typename T>
00037 void execute_or_enqueue(T& t)
00038 {
00039 if (should_execute_immediately()) {
00040 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00041 process_command_queue();
00042 t.execute();
00043 } else {
00044 enqueue(t);
00045 }
00046 }
00047
00048 template<typename T>
00049 void enqueue(T& t)
00050 {
00051 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00052 command_queue_.push(new T(t));
00053 ++registration_counter_;
00054 this->reactor()->notify(this);
00055 }
00056
00057 void wait();
00058
00059 void destroy();
00060
00061 virtual bool reactor_is_shut_down() const = 0;
00062
00063 protected:
00064 virtual ~ReactorInterceptor();
00065
00066 private:
00067 int handle_exception(ACE_HANDLE );
00068 int handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard);
00069 ACE_thread_t owner_;
00070 ACE_Thread_Mutex mutex_;
00071 ACE_Condition_Thread_Mutex condition_;
00072 OPENDDS_QUEUE(Command*) command_queue_;
00073 ACE_UINT64 registration_counter_;
00074 bool destroy_;
00075 };
00076
00077 }
00078 }
00079
00080 #endif