00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_REACTORINTERCEPTOR_H 00009 #define OPENDDS_DCPS_REACTORINTERCEPTOR_H 00010 00011 #include "PoolAllocator.h" 00012 #include "PoolAllocationBase.h" 00013 #include "ace/Reactor.h" 00014 #include "ace/Thread.h" 00015 #include "ace/Condition_Thread_Mutex.h" 00016 #include "RcEventHandler.h" 00017 #include "dcps_export.h" 00018 #include "unique_ptr.h" 00019 00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00021 00022 namespace OpenDDS { 00023 namespace DCPS { 00024 00025 class OpenDDS_Dcps_Export ReactorInterceptor : public RcEventHandler { 00026 public: 00027 00028 class Command 00029 : public PoolAllocationBase 00030 , public EnableContainerSupportedUniquePtr<Command> { 00031 public: 00032 virtual ~Command() { } 00033 virtual void execute() = 0; 00034 }; 00035 typedef container_supported_unique_ptr<Command> CommandPtr; 00036 00037 bool should_execute_immediately(); 00038 void process_command_queue(); 00039 00040 template<typename T> 00041 void execute_or_enqueue(T& t) 00042 { 00043 if (should_execute_immediately()) { 00044 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00045 process_command_queue(); 00046 t.execute(); 00047 } else { 00048 enqueue(t); 00049 } 00050 } 00051 00052 template<typename T> 00053 void enqueue(T& t) 00054 { 00055 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00056 command_queue_.push(CommandPtr(new T(t))); 00057 this->reactor()->notify(this); 00058 } 00059 00060 void wait(); 00061 00062 virtual bool reactor_is_shut_down() const = 0; 00063 00064 protected: 00065 ReactorInterceptor(ACE_Reactor* reactor, 00066 ACE_thread_t owner); 00067 00068 virtual ~ReactorInterceptor(); 00069 int handle_exception(ACE_HANDLE /*fd*/); 00070 int handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard); 00071 ACE_thread_t owner_; 00072 ACE_Thread_Mutex mutex_; 00073 ACE_Condition_Thread_Mutex condition_; 00074 OPENDDS_QUEUE(CommandPtr) command_queue_; 00075 }; 00076 00077 } // namespace DCPS 00078 } // namespace OpenDDS 00079 00080 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00081 00082 #endif /* OPENDDS_DCPS_REACTORINTERCEPTOR_H */