OpenDDS  Snapshot(2023/04/28-20:55)
ReactorTask.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
9 
10 #include "ReactorTask.h"
11 #if !defined (__ACE_INLINE__)
12 #include "ReactorTask.inl"
13 #endif /* __ACE_INLINE__ */
14 
15 #include "Service_Participant.h"
16 
17 #include <ace/Select_Reactor.h>
18 #include <ace/WFMO_Reactor.h>
19 #include <ace/Proactor.h>
20 #include <ace/Proactor_Impl.h>
21 #include <ace/WIN32_Proactor.h>
22 #include <ace/OS_NS_Thread.h>
23 
24 #include <exception>
25 #include <cstring>
26 
28 
29 namespace OpenDDS {
30 namespace DCPS {
31 
32 ReactorTask::ReactorTask(bool useAsyncSend)
33  : condition_(lock_)
34  , state_(STATE_UNINITIALIZED)
35  , reactor_(0)
36  , reactor_owner_(ACE_OS::NULL_thread)
37  , proactor_(0)
38 #ifdef OPENDDS_REACTOR_TASK_ASYNC
39  , use_async_send_(useAsyncSend)
40 #endif
41  , timer_queue_(0)
42  , thread_status_manager_(0)
43 {
44  ACE_UNUSED_ARG(useAsyncSend);
45 }
46 
48 {
49  cleanup();
50 }
51 
53 {
57  TheServiceParticipant->get_thread_status_manager());
58  }
59 }
60 
62 {
63 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
64  if (proactor_) {
68  delete proactor_;
69  proactor_ = 0;
70  }
71 #endif
72 
73  delete reactor_;
74  reactor_ = 0;
75  delete timer_queue_;
76  timer_queue_ = 0;
77 }
78 
80  ThreadStatusManager* thread_status_manager,
81  const String& name)
82 {
83  GuardType guard(lock_);
84 
85  // If we've already been opened, let's clean up the old stuff
86  cleanup();
87 
88  // thread status reporting support
89  thread_status_manager_ = thread_status_manager;
90  name_ = name;
91 
92  // Set our reactor and proactor pointers to a new reactor/proactor objects.
93 #ifdef OPENDDS_REACTOR_TASK_ASYNC
94  if (use_async_send_ && !reactor_) {
96 
97  ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
98  proactor_ = new ACE_Proactor(proactor_impl, 1);
99  reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
100  } else
101 #endif
102  if (!reactor_) {
103  reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
104  proactor_ = 0;
105  }
106 
107  if (!timer_queue_) {
110  }
111 
114 
115  if (activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
117  "(%P|%t) ERROR: ReactorTask Failed to activate "
118  "itself.\n"),
119  -1);
120  }
121 
122  while (state_ != STATE_RUNNING) {
123  condition_.wait(*thread_status_manager);
124  }
125 
126  return 0;
127 }
128 
130 {
132 
133  {
134  GuardType guard(lock_);
135 
136  // First off - We need to obtain our own reference to ourselves such
137  // that we don't get deleted while still running in our own thread.
138  // In essence, our current thread "owns" a copy of our reference.
139  // It's all done with the magic of intrusive reference counting!
140  _add_ref();
141 
142  // Ignore all signals to avoid
143  // ERROR: <something descriptive> Interrupted system call
144  // The main thread will handle signals.
145  sigset_t set;
146  ACE_OS::sigfillset(&set);
147  ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
148 
149  // Tell the reactor that this thread will be its owner
152  "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
153  }
155 
156  interceptor_ = make_rch<Interceptor>(this, reactor_, reactor_owner_);
157 
158  // Advance the state.
161  }
162 
163  // Tell the reactor to handle events.
165  while (state_ == STATE_RUNNING) {
169  }
170 
171  } else {
173  }
174 
175  return 0;
176 }
177 
178 int ReactorTask::close(u_long flags)
179 {
180  ACE_UNUSED_ARG(flags);
181  // This is called after the reactor threads exit.
182  // We should not set state here since we are not
183  // sure how many reactor threads we will use.
184  // If there is one reactor thread then we should
185  // set the state so the stop will not call
186  // end_reactor_event_loop.
187  // If there are multiple reactor threads, we still
188  // need call end_reactor_event_loop in stop() while
189  // one reactor thread already exited.
190 //MJM: Right.
191 
192  _remove_ref();
193  return 0;
194 }
195 
197 {
198  ACE_Reactor* reactor = 0;
199  {
200  GuardType guard(lock_);
201 
203  // We are already "stopped". Just return.
204  return;
205  }
206 
208 
209 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
210  // Remove the proactor handler so the reactor stops forwarding messages.
211  if (proactor_) {
215  }
216 #endif
217  reactor = reactor_;
218  }
219 
220  if (reactor) {
221  // We can't hold the lock when we call this, because the reactor threads may need to
222  // access the lock as part of normal execution before they return to the reactor control loop
223  reactor->end_reactor_event_loop();
224  }
225 
226  {
227  GuardType guard(lock_);
228 
229  // In the future, we will likely want to replace this assert with a new "SHUTTING_DOWN" state
230  // which can be used to delay any potential new calls to open_reactor_task()
232 
233  // Let's wait for the reactor task's thread to complete before we
234  // leave this stop method.
237  wait();
238 
239  // Reset the thread manager in case it goes away before the next open.
240  thr_mgr(0);
241  }
242  }
243 }
244 
246 {
248 }
249 
251 {
253 }
254 
255 } // namespace DCPS
256 } // namespace OpenDDS
257 
ConditionVariableType condition_
Definition: ReactorTask.h:103
void thread_status_interval(const TimeDuration &thread_status_interval)
#define ACE_ERROR(X)
ACE_Timer_Heap_T< ACE_Event_Handler *, ACE_Event_Handler_Handle_Timeout_Upcall, ACE_SYNCH_RECURSIVE_MUTEX, MonotonicClock > TimerQueueType
Definition: ReactorTask.h:83
std::string String
virtual int close(u_long flags=0)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ThreadStatusManager * thread_status_manager_
Definition: ReactorTask.h:120
virtual ACE_HANDLE get_handle(void) const=0
ACE_Proactor_Impl * implementation(void) const
virtual ACE_HANDLE get_handle(void) const
ACE_Guard< ACE_Thread_Mutex > lock_
ReactorInterceptor_rch interceptor_
Definition: ReactorTask.h:119
ACE_thread_t NULL_thread
virtual void _add_ref()
Definition: RcObject.h:69
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
ACE_Proactor * proactor_
Definition: ReactorTask.h:107
int thr_sigsetmask(int how, const sigset_t *nsm, sigset_t *osm)
void wait_for_startup_i() const
Definition: ReactorTask.cpp:52
ACE_Thread_Manager * thr_mgr(void) const
static ACE_Thread_Manager * instance(void)
int end_reactor_event_loop(void)
virtual void _remove_ref()
Definition: RcObject.h:74
virtual ACE_Reactor * reactor() const
virtual ACE_Reactor * reactor(void) const
bool notify_all()
Unblock all of the threads waiting on this condition.
const char *const name
Definition: debug.cpp:60
virtual int wait(void)
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define SIG_SETMASK
TimerQueueType * timer_queue_
Definition: ReactorTask.h:114
ReactorTask(bool useAsyncSend)
Definition: ReactorTask.cpp:32
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int thr_self(ACE_hthread_t &)
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
#define TheServiceParticipant
int open_reactor_task(void *, ThreadStatusManager *thread_status_manager=0, const String &name="")
Definition: ReactorTask.cpp:79
int sigfillset(sigset_t *s)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int owner(ACE_thread_t new_owner, ACE_thread_t *old_owner=0)
int timer_queue(ACE_Timer_Queue *tq)
int run_reactor_event_loop(REACTOR_EVENT_HOOK=0)