Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_REACTORTASK_H 9 : #define OPENDDS_DCPS_REACTORTASK_H 10 : 11 : #include "dcps_export.h" 12 : #include "RcObject.h" 13 : #include "TimeTypes.h" 14 : #include "ReactorInterceptor.h" 15 : #include "SafetyProfileStreams.h" 16 : #include "ConditionVariable.h" 17 : #include "ThreadStatusManager.h" 18 : 19 : #include <ace/Task.h> 20 : #include <ace/Synch_Traits.h> 21 : #include <ace/Timer_Heap_T.h> 22 : #include <ace/Event_Handler_Handle_Timeout_Upcall.h> 23 : 24 : ACE_BEGIN_VERSIONED_NAMESPACE_DECL 25 : class ACE_Proactor; 26 : class ACE_Reactor; 27 : ACE_END_VERSIONED_NAMESPACE_DECL 28 : 29 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 30 : 31 : namespace OpenDDS { 32 : namespace DCPS { 33 : 34 : class OpenDDS_Dcps_Export ReactorTask : public virtual ACE_Task_Base, 35 : public virtual RcObject { 36 : 37 : public: 38 : 39 : explicit ReactorTask(bool useAsyncSend); 40 : virtual ~ReactorTask(); 41 : 42 : public: 43 : int open_reactor_task(void*, 44 : ThreadStatusManager* thread_status_manager = 0, 45 : const String& name = ""); 46 0 : virtual int open(void* ptr) { 47 0 : return open_reactor_task(ptr); 48 : } 49 : virtual int svc(); 50 : virtual int close(u_long flags = 0); 51 : 52 : void stop(); 53 : 54 : ACE_Reactor* get_reactor(); 55 : const ACE_Reactor* get_reactor() const; 56 : 57 : ACE_thread_t get_reactor_owner() const; 58 : 59 : ACE_Proactor* get_proactor(); 60 : const ACE_Proactor* get_proactor() const; 61 : 62 : void wait_for_startup() const; 63 : 64 : bool is_shut_down() const; 65 : 66 : ReactorInterceptor_rch interceptor() const; 67 : 68 : OPENDDS_POOL_ALLOCATION_FWD 69 : 70 : private: 71 : 72 : virtual void reactor(ACE_Reactor* reactor); 73 : virtual ACE_Reactor* reactor() const; 74 : 75 : void cleanup(); 76 : void wait_for_startup_i() const; 77 : 78 : typedef ACE_SYNCH_MUTEX LockType; 79 : typedef ACE_Guard<LockType> GuardType; 80 : typedef ConditionVariable<LockType> ConditionVariableType; 81 : typedef ACE_Timer_Heap_T< 82 : ACE_Event_Handler*, ACE_Event_Handler_Handle_Timeout_Upcall, 83 : ACE_SYNCH_RECURSIVE_MUTEX, MonotonicClock> TimerQueueType; 84 : 85 : enum State { STATE_UNINITIALIZED, STATE_OPENING, STATE_RUNNING, STATE_SHUT_DOWN }; 86 : 87 : class Interceptor : public DCPS::ReactorInterceptor { 88 : public: 89 9 : Interceptor(DCPS::ReactorTask* task, ACE_Reactor* reactor, ACE_thread_t owner) 90 9 : : ReactorInterceptor(reactor, owner) 91 9 : , task_(task) 92 9 : {} 93 23 : bool reactor_is_shut_down() const 94 : { 95 23 : return task_->is_shut_down(); 96 : } 97 : 98 : private: 99 : DCPS::ReactorTask* const task_; 100 : }; 101 : 102 : mutable LockType lock_; 103 : mutable ConditionVariableType condition_; 104 : State state_; 105 : ACE_Reactor* reactor_; 106 : ACE_thread_t reactor_owner_; 107 : ACE_Proactor* proactor_; 108 : 109 : #if defined ACE_WIN32 && defined ACE_HAS_WIN32_OVERLAPPED_IO 110 : #define OPENDDS_REACTOR_TASK_ASYNC 111 : bool use_async_send_; 112 : #endif 113 : 114 : TimerQueueType* timer_queue_; 115 : 116 : // thread status reporting 117 : String name_; 118 : 119 : ReactorInterceptor_rch interceptor_; 120 : ThreadStatusManager* thread_status_manager_; 121 : }; 122 : 123 : } // namespace DCPS 124 : } // namespace OpenDDS 125 : 126 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 127 : 128 : #if defined (__ACE_INLINE__) 129 : #include "ReactorTask.inl" 130 : #endif /* __ACE_INLINE__ */ 131 : 132 : #endif /* OPENDDS_DCPS_REACTORTASK_H */