OpenDDS  Snapshot(2023/04/28-20:55)
WaitSet.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "WaitSet.h"
11 
12 #include "ConditionImpl.h"
13 #include "Time_Helper.h"
14 #include "TimeTypes.h"
15 #include "Service_Participant.h"
16 
17 namespace {
18 
19 void copyInto(DDS::ConditionSeq& target,
20  const DDS::WaitSet::ConditionSet& source)
21 {
22  const CORBA::ULong size = static_cast<CORBA::ULong>(source.size());
23  target.length(size);
24  CORBA::ULong index = 0;
25 
26  for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(),
27  end = source.end(); iter != end; ++iter, ++index) {
28  target[index] = *iter;
29  }
30 }
31 
32 } // namespace
33 
35 
36 namespace DDS {
37 
39 {
 // Body of WaitSet::attach_condition(Condition_ptr cond). The signature
 // (listing line 38) and listing lines 40 and 43-44 are missing from this
 // extract, so whatever they do is not visible here.
 // NOTE(review): presumably a using-declaration and a lock guard -- confirm
 // against the real file.
 //
 // Returns RETCODE_BAD_PARAMETER when cond is not a ConditionImpl,
 // RETCODE_OK when the condition was attached or was already attached, and
 // otherwise whatever ConditionImpl::attach_to_ws() reported.

 // Take our own reference; this _var is what gets stored in the set.
41  Condition_var condv(Condition::_duplicate(cond));
42 
45  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
46 
47  if (!ci) return RETCODE_BAD_PARAMETER;
48 
49  ReturnCode_t ret = ci->attach_to_ws(this);
50 
51  if (ret == RETCODE_OK) {
52  attached_conditions_.insert(condv);
53 
 // A condition that is already triggered is signaled immediately.
54  if (condv->get_trigger_value()) signal(condv.in());
55 
56  return RETCODE_OK;
57 
58  } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
59  // RETCODE_PRECONDITION_NOT_MET means it was already in the set
60  return RETCODE_OK;
61  }
62 
 // Any other code from attach_to_ws() is passed through unchanged.
63  return ret;
64 }
65 
67 {
 // Body of WaitSet::detach_condition(Condition_ptr cond). The signature
 // (listing line 66) and listing lines 68-69 are missing from this extract.
 // The visible part simply forwards to detach_i() and returns its result.
70  return detach_i(cond);
71 }
72 
74 {
 // Body of WaitSet::detach_conditions(); `conds` is the sequence parameter.
 // The signature (listing line 73) and listing lines 75-76 are missing from
 // this extract.
77 
 // Detach in sequence order and stop at the first failure, returning its
 // code. Conditions detached before the failure stay detached; the ones
 // after it are not processed.
78  for (CORBA::ULong i = 0; i < conds.length(); ++i) {
79  ReturnCode_t ret = detach_i(conds[ i]);
80 
81  if (ret != RETCODE_OK) {
82  return ret;
83  }
84  }
85 
 // Every element detached successfully (also the result for an empty
 // sequence).
86  return RETCODE_OK;
87 }
88 
// Detaches a single condition from this WaitSet; used by detach_condition()
// and detach_conditions().
//
// Returns RETCODE_BAD_PARAMETER when cond is not a ConditionImpl; otherwise
// returns the result of ConditionImpl::detach_from_ws().
// Listing line 91 is missing from this extract.
// NOTE(review): presumably a using-declaration for ConditionImpl -- confirm.
89 ReturnCode_t WaitSet::detach_i(const Condition_ptr cond)
90 {
 // Build a _var holding its own reference to use as the lookup key for the
 // erase() calls below.
92  Condition_var condv(Condition::_duplicate(cond));
93 
94  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
95 
96  if (!ci) return RETCODE_BAD_PARAMETER;
97 
98  ReturnCode_t ret = ci->detach_from_ws(this);
 // The condition is removed from both sets regardless of what
 // detach_from_ws() returned.
99  attached_conditions_.erase(condv);
100  signaled_conditions_.erase(condv);
101  return ret;
102 }
103 
105 {
 // Body of WaitSet::get_conditions(); `conds` is the output sequence.
 // The signature (listing line 104) and listing lines 106-107 are missing
 // from this extract.
 // Overwrites conds with the currently attached conditions; the visible
 // path always reports RETCODE_OK.
108  copyInto(conds, attached_conditions_);
109  return RETCODE_OK;
110 }
111 
113  const Duration_t& timeout)
114 {
 // Body of WaitSet::wait(active_conditions, timeout). Blocks until at least
 // one attached condition is signaled or the timeout expires, then copies
 // the signaled conditions into active_conditions.
 //
 // Visible results: RETCODE_OK when the wait ended without a timeout,
 // RETCODE_TIMEOUT on timeout, RETCODE_ERROR for any other wait status.
 //
 // This extract is missing the first signature line (listing line 112) and
 // listing lines 118, 127-128 and 131, so the two empty-looking `if` bodies
 // below and any locking are not visible here.
115  using namespace OpenDDS::DCPS;
116 
 // NOTE(review): the body of this check (listing line 118) is missing;
 // presumably it rejects a negative timeout with an error code -- confirm.
117  if (!non_negative_duration(timeout)) {
119  }
120 
 // A finite timeout is converted once into an absolute monotonic deadline;
 // an infinite timeout leaves `deadline` default-constructed and unused.
121  MonotonicTimePoint deadline;
122  const bool use_deadline = !is_infinite(timeout);
123  if (use_deadline) {
124  deadline = MonotonicTimePoint::now() + TimeDuration(timeout);
125  }
126 
 // NOTE(review): listing lines 127-128 are missing; presumably the lock is
 // taken here -- confirm.
129 
 // NOTE(review): the body of this check (listing line 131) is missing;
 // presumably it refuses a second concurrent waiter -- confirm.
130  if (waiting_) {
132  }
133 
134  waiting_ = true;
135  signaled_conditions_.clear();
136 
 // Seed the signaled set with every attached condition that is already
 // triggered, so the loop below can be skipped entirely.
137  for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
138  end = attached_conditions_.end(); iter != end; ++iter) {
139  if ((*iter)->get_trigger_value()) {
140  signaled_conditions_.insert(*iter);
141  }
142  }
143 
144  CvStatus status = CvStatus_NoTimeout;
145  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
 // Keep waiting on cond_ while nothing is attached or nothing has been
 // signaled, and the previous wait returned CvStatus_NoTimeout. signal()
 // is what fills signaled_conditions_ and notifies cond_.
146  while ((attached_conditions_.empty() || signaled_conditions_.empty()) &&
147  status == CvStatus_NoTimeout) {
148  status = use_deadline ? cond_.wait_until(deadline, thread_status_manager) : cond_.wait(thread_status_manager);
149  }
150 
 // Hand the signaled conditions to the caller (empty on timeout when
 // nothing was signaled) and reset the per-wait state.
151  copyInto(active_conditions, signaled_conditions_);
152  signaled_conditions_.clear();
153  waiting_ = false;
154 
 // Translate the condition-variable status into a DDS return code.
155  switch (status) {
156  case CvStatus_NoTimeout:
157  return RETCODE_OK;
158 
159  case CvStatus_Timeout:
160  return RETCODE_TIMEOUT;
161 
162  case CvStatus_Error:
163  default:
 // The failure is logged only when DCPS_debug_level is non-zero.
164  if (DCPS_debug_level) {
165  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WaitSet::wait: wait_until failed\n"));
166  }
167  return RETCODE_ERROR;
168  }
169 }
170 
// Records `condition` as signaled and wakes a thread blocked on cond_, but
// only when the condition is currently attached to this WaitSet. A condition
// that is not attached is ignored.
// Listing line 174 is missing from this extract.
// NOTE(review): presumably a lock guard -- confirm against the real file.
171 void WaitSet::signal(Condition_ptr condition)
172 {
 // Build a _var holding its own reference to use as the key for find() and
 // insert().
173  Condition_var condv(Condition::_duplicate(condition));
175 
176  if (attached_conditions_.find(condv) != attached_conditions_.end()) {
177  signaled_conditions_.insert(condv);
178  cond_.notify_one();
179  }
180 }
181 
183 {
 // Body of WaitSet::_duplicate(WaitSet_ptr obj); the signature (listing
 // line 182) is missing from this extract.
 // Adds a reference to obj when it is not nil and returns the same pointer;
 // a nil obj is returned untouched.
184  if (!CORBA::is_nil(obj)) obj->_add_ref();
185 
186  return obj;
187 }
188 
189 } // namespace DDS
190 
192 
195 {
 // Signature lines (listing lines 193-194) are missing from this extract.
 // NOTE(review): presumably the TAO object-reference traits `duplicate`
 // hook for DDS::WaitSet -- confirm.
 // Forwards to DDS::WaitSet::_duplicate() and returns its result.
196  return DDS::WaitSet::_duplicate(p);
197 }
198 
199 void
201 {
 // The declarator line (listing line 200) is missing from this extract.
 // NOTE(review): presumably the TAO object-reference traits `release` hook
 // for DDS::WaitSet -- confirm.
 // Hands p to CORBA::release().
202  CORBA::release(p);
203 }
204 
207 {
 // Signature lines (listing lines 205-206) are missing from this extract.
 // NOTE(review): presumably the TAO object-reference traits `nil` hook for
 // DDS::WaitSet -- confirm.
 // Returns a null DDS::WaitSet_ptr.
208  return static_cast<DDS::WaitSet_ptr>(0);
209 }
210 
213  TAO_OutputCDR& cdr)
214 {
 // The first signature lines (listing lines 211-212) are missing from this
 // extract; `p` is a parameter declared on them.
 // NOTE(review): presumably the TAO object-reference traits `marshal` hook
 // for DDS::WaitSet -- confirm.
 // Delegates to CORBA::Object::marshal(p, cdr) and returns its result.
215  return CORBA::Object::marshal(p, cdr);
216 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
void release(T x)
ReturnCode_t wait(ConditionSeq &active_conditions, const Duration_t &timeout)
Definition: WaitSet.cpp:112
ReturnCode_t detach_conditions(const ConditionSeq &conditions)
Definition: WaitSet.cpp:73
ACE_INLINE OpenDDS_Dcps_Export bool non_negative_duration(const DDS::Duration_t &t)
ReturnCode_t detach_i(const Condition_ptr cond)
Definition: WaitSet.cpp:89
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
bool notify_one()
Unblock one of the threads waiting on this condition.
virtual CORBA::Boolean marshal(TAO_OutputCDR &cdr)
Definition: Object.cpp:75
sequence< Condition > ConditionSeq
ReturnCode_t detach_condition(Condition_ptr cond)
Definition: WaitSet.cpp:66
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ReturnCode_t get_conditions(ConditionSeq &attached_conditions)
Definition: WaitSet.cpp:104
ACE_CDR::ULong ULong
ACE_CDR::Boolean Boolean
const ReturnCode_t RETCODE_TIMEOUT
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ConditionVariableType cond_
Definition: WaitSet.h:90
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
static WaitSet_ptr _duplicate(WaitSet_ptr obj)
Definition: WaitSet.cpp:182
The End User API.
The wait has returned because of a timeout.
ReturnCode_t attach_condition(Condition_ptr cond)
Definition: WaitSet.cpp:38
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
ACE_INLINE OpenDDS_Dcps_Export bool is_infinite(const DDS::Duration_t &value)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::ReturnCode_t detach_from_ws(DDS::WaitSet_ptr ws)
DDS::ReturnCode_t attach_to_ws(DDS::WaitSet_ptr ws)
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const ReturnCode_t RETCODE_OK
ConditionSet signaled_conditions_
Definition: WaitSet.h:94
bool waiting_
Definition: WaitSet.h:92
The wait has returned because it was woken up.
#define TheServiceParticipant
LM_ERROR
void signal(Condition_ptr cond)
Definition: WaitSet.cpp:171
Boolean is_nil(T x)
ConditionSet attached_conditions_
Definition: WaitSet.h:93
const ReturnCode_t RETCODE_BAD_PARAMETER