OpenDDS  Snapshot(2023/04/28-20:55)
ReactorInterceptor.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "ace/Log_Msg.h"
11 #include "ace/Reverse_Lock_T.h"
12 #include "ace/Synch.h"
13 
14 #include "ReactorInterceptor.h"
15 #include "Service_Participant.h"
16 
18 
19 namespace OpenDDS {
20 namespace DCPS {
21 
23  : reactor_(0)
24 {}
25 
27  ACE_thread_t owner)
28  : owner_(owner)
29  , state_(RS_NONE)
30 {
31  RcEventHandler::reactor(reactor);
32 }
33 
35 {
36 }
37 
39 {
40  OPENDDS_ASSERT(command);
41 
43 
44  // Only allow immediate execution if running on the reactor thread, otherwise we risk deadlock
45  // when calling into the reactor object.
46  const bool is_owner = ACE_OS::thr_equal(owner_, ACE_Thread::self());
47 
48  // If state is set to processing, the conents of command_queue_ have been swapped out
49  // so immediate execution may run jobs out of the expected order.
50  const bool is_not_processing = state_ != RS_PROCESSING;
51 
52  // If the command_queue_ is not empty, allowing execution will potentially run unexpected code
53  // which is problematic since we may be holding locks used by the unexpected code.
54  const bool is_empty = command_queue_.empty();
55 
56  // If all three of these conditions are met, it should be safe to execute
57  const bool is_safe_to_execute = is_owner && is_not_processing && is_empty;
58 
59  // Even if it's not normally safe to execute, allow immediate execution if the reactor is shut down
60  const bool immediate = is_safe_to_execute || reactor_is_shut_down();
61 
62  // Always set reactor and push to the queue
63  ACE_Reactor* local_reactor = ACE_Event_Handler::reactor();
64  command->set_reactor(local_reactor);
65  command_queue_.push_back(command);
66 
67  // But depending on whether we're running it immediately or not, we either process or notify
68  if (immediate) {
70  } else if (state_ == RS_NONE) {
72  guard.release();
73  local_reactor->notify(this);
74  }
75  return command;
76 }
77 
79 {
80  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
81 
84  return 0;
85 }
86 
88 {
89  Queue cq;
91 
93  if (!command_queue_.empty()) {
94  cq.swap(command_queue_);
96  for (Queue::const_iterator pos = cq.begin(), limit = cq.end(); pos != limit; ++pos) {
97  (*pos)->execute();
98  }
99  }
100  if (!command_queue_.empty()) {
103  guard.release();
104  reactor->notify(this);
105  } else {
106  state_ = RS_NONE;
107  }
108 }
109 
111 {
114 }
115 
117 {
120 }
121 
123 {
124  if (reactor()->register_handler(io_handle_, event_handler_, mask_) != 0) {
125  if (log_level >= LogLevel::Error) {
127  "(%P|%t) ERROR: RegisterHandler::execute: failed to register handler for socket %d\n",
128  io_handle_));
129  }
130  }
131 }
132 
134 {
135  if (reactor()->remove_handler(io_handle_, mask_) != 0) {
136  if (log_level >= LogLevel::Error) {
138  "(%P|%t) ERROR: UnregisterHandler::execute: failed to remove handler for socket %d\n",
139  io_handle_));
140  }
141  }
142 }
143 
144 }
145 }
146 
#define ACE_ERROR(X)
virtual bool reactor_is_shut_down() const =0
CommandPtr execute_or_enqueue(CommandPtr command)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
virtual ACE_Reactor * reactor() const
void process_command_queue_i(ACE_Guard< ACE_Thread_Mutex > &guard)
int release(void)
int notify(ACE_Event_Handler *event_handler=0, ACE_Reactor_Mask masks=ACE_Event_Handler::EXCEPT_MASK, ACE_Time_Value *timeout=0)
DWORD ACE_thread_t
static ACE_thread_t self(void)
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
virtual ACE_Reactor * reactor(void) const
OpenDDS_Dcps_Export LogLevel log_level
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ReactorInterceptor(ACE_Reactor *reactor, ACE_thread_t owner)