Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_REACTORINTERCEPTOR_H 9 : #define OPENDDS_DCPS_REACTORINTERCEPTOR_H 10 : 11 : #include "PoolAllocator.h" 12 : #include "PoolAllocationBase.h" 13 : #include "RcEventHandler.h" 14 : #include "dcps_export.h" 15 : #include "unique_ptr.h" 16 : #include "RcHandle_T.h" 17 : #include "ConditionVariable.h" 18 : 19 : #include <ace/Reactor.h> 20 : #include <ace/Thread.h> 21 : #include <ace/Thread_Mutex.h> 22 : 23 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 24 : 25 : namespace OpenDDS { 26 : namespace DCPS { 27 : 28 : class OpenDDS_Dcps_Export ReactorInterceptor : public virtual RcEventHandler { 29 : public: 30 : 31 : class OpenDDS_Dcps_Export Command 32 : : public virtual RcObject { 33 : public: 34 : Command(); 35 32 : virtual ~Command() { } 36 : 37 : virtual void execute() = 0; 38 : 39 : protected: 40 : const ACE_Reactor* reactor() const { return reactor_; } 41 0 : ACE_Reactor* reactor() { return reactor_; } 42 : 43 : private: 44 : friend class OpenDDS::DCPS::ReactorInterceptor; 45 32 : void set_reactor(ACE_Reactor* reactor) { reactor_ = reactor; } 46 : 47 : ACE_Reactor* reactor_; 48 : }; 49 : typedef RcHandle<Command> CommandPtr; 50 : 51 : CommandPtr execute_or_enqueue(CommandPtr command); 52 : 53 : virtual bool reactor_is_shut_down() const = 0; 54 : 55 : virtual void reactor(ACE_Reactor *reactor); 56 : virtual ACE_Reactor* reactor() const; 57 : 58 : protected: 59 : 60 : enum ReactorState { 61 : RS_NONE, 62 : RS_NOTIFIED, 63 : RS_PROCESSING 64 : }; 65 : 66 : ReactorInterceptor(ACE_Reactor* reactor, 67 : ACE_thread_t owner); 68 : 69 : virtual ~ReactorInterceptor(); 70 : int handle_exception(ACE_HANDLE /*fd*/); 71 : void process_command_queue_i(ACE_Guard<ACE_Thread_Mutex>& guard); 72 : 73 : ACE_thread_t owner_; 74 : mutable ACE_Thread_Mutex mutex_; 75 : typedef OPENDDS_VECTOR(CommandPtr) Queue; 76 : Queue command_queue_; 77 : ReactorState state_; 78 : }; 79 : 80 : typedef RcHandle<ReactorInterceptor> ReactorInterceptor_rch; 81 : typedef WeakRcHandle<ReactorInterceptor> ReactorInterceptor_wrch; 82 : 83 : class OpenDDS_Dcps_Export RegisterHandler : public ReactorInterceptor::Command { 84 : public: 85 : RegisterHandler(ACE_HANDLE io_handle, 86 : ACE_Event_Handler* event_handler, 87 : ACE_Reactor_Mask mask) 88 : : io_handle_(io_handle) 89 : , event_handler_(event_handler) 90 : , mask_(mask) 91 : {} 92 : 93 : private: 94 : ACE_HANDLE io_handle_; 95 : ACE_Event_Handler* event_handler_; 96 : ACE_Reactor_Mask mask_; 97 : 98 : void execute(); 99 : }; 100 : 101 : class OpenDDS_Dcps_Export RemoveHandler : public ReactorInterceptor::Command { 102 : public: 103 : RemoveHandler(ACE_HANDLE io_handle, 104 : ACE_Reactor_Mask mask) 105 : : io_handle_(io_handle) 106 : , mask_(mask) 107 : {} 108 : 109 : private: 110 : ACE_HANDLE io_handle_; 111 : ACE_Reactor_Mask mask_; 112 : 113 : void execute(); 114 : }; 115 : 116 : } // namespace DCPS 117 : } // namespace OpenDDS 118 : 119 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 120 : 121 : #endif /* OPENDDS_DCPS_REACTORINTERCEPTOR_H */