OpenDDS Snapshot (2023/04/28-20:55)
ReactorTask.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_REACTORTASK_H
9 #define OPENDDS_DCPS_REACTORTASK_H
10 
11 #include "dcps_export.h"
12 #include "RcObject.h"
13 #include "TimeTypes.h"
14 #include "ReactorInterceptor.h"
15 #include "SafetyProfileStreams.h"
16 #include "ConditionVariable.h"
17 #include "ThreadStatusManager.h"
18 
19 #include <ace/Task.h>
20 #include <ace/Synch_Traits.h>
21 #include <ace/Timer_Heap_T.h>
23 
25 class ACE_Proactor;
26 class ACE_Reactor;
28 
30 
31 namespace OpenDDS {
32 namespace DCPS {
33 
35 public virtual RcObject {
36 
37 public:
38 
39  explicit ReactorTask(bool useAsyncSend);
40  virtual ~ReactorTask();
41 
42 public:
43  int open_reactor_task(void*,
44  ThreadStatusManager* thread_status_manager = 0,
45  const String& name = "");
46  virtual int open(void* ptr) {
47  return open_reactor_task(ptr);
48  }
49  virtual int svc();
50  virtual int close(u_long flags = 0);
51 
52  void stop();
53 
54  ACE_Reactor* get_reactor();
55  const ACE_Reactor* get_reactor() const;
56 
57  ACE_thread_t get_reactor_owner() const;
58 
59  ACE_Proactor* get_proactor();
60  const ACE_Proactor* get_proactor() const;
61 
62  void wait_for_startup() const;
63 
64  bool is_shut_down() const;
65 
66  ReactorInterceptor_rch interceptor() const;
67 
69 
70 private:
71 
72  virtual void reactor(ACE_Reactor* reactor);
73  virtual ACE_Reactor* reactor() const;
74 
75  void cleanup();
76  void wait_for_startup_i() const;
77 
81  typedef ACE_Timer_Heap_T<
84 
85  enum State { STATE_UNINITIALIZED, STATE_OPENING, STATE_RUNNING, STATE_SHUT_DOWN };
86 
88  public:
90  : ReactorInterceptor(reactor, owner)
91  , task_(task)
92  {}
93  bool reactor_is_shut_down() const
94  {
95  return task_->is_shut_down();
96  }
97 
98  private:
100  };
101 
102  mutable LockType lock_;
103  mutable ConditionVariableType condition_;
108 
109 #if defined ACE_WIN32 && defined ACE_HAS_WIN32_OVERLAPPED_IO
110 #define OPENDDS_REACTOR_TASK_ASYNC
111  bool use_async_send_;
112 #endif
113 
114  TimerQueueType* timer_queue_;
115 
116  // thread status reporting
118 
121 };
122 
123 } // namespace DCPS
124 } // namespace OpenDDS
125 
127 
128 #if defined (__ACE_INLINE__)
129 #include "ReactorTask.inl"
130 #endif /* __ACE_INLINE__ */
131 
132 #endif /* OPENDDS_DCPS_REACTORTASK_H */
ConditionVariableType condition_
Definition: ReactorTask.h:103
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
#define ACE_SYNCH_MUTEX
ACE_Timer_Heap_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX, MonotonicClock > TimerQueueType
Definition: ReactorTask.h:83
std::string String
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
ThreadStatusManager * thread_status_manager_
Definition: ReactorTask.h:120
ReactorInterceptor_rch interceptor_
Definition: ReactorTask.h:119
int close(ACE_HANDLE handle)
ACE_Guard< LockType > GuardType
Definition: ReactorTask.h:79
ACE_Proactor * proactor_
Definition: ReactorTask.h:107
#define ACE_SYNCH_RECURSIVE_MUTEX
DWORD ACE_thread_t
#define ACE_END_VERSIONED_NAMESPACE_DECL
const char *const name
Definition: debug.cpp:60
ConditionVariable< LockType > ConditionVariableType
Definition: ReactorTask.h:80
TimerQueueType * timer_queue_
Definition: ReactorTask.h:114
#define OPENDDS_POOL_ALLOCATION_FWD
ACE_SYNCH_MUTEX LockType
Definition: ReactorTask.h:78
virtual int open(void *ptr)
Definition: ReactorTask.h:46
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Interceptor(DCPS::ReactorTask *task, ACE_Reactor *reactor, ACE_thread_t owner)
Definition: ReactorTask.h:89
DCPS::ReactorTask *const task_
Definition: ReactorTask.h:99
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28