OpenDDS  Snapshot(2023/04/28-20:55)
ConditionVariable.h
Go to the documentation of this file.
1 #ifndef OPENDDS_DCPS_CONDITIONVARIABLE_H
2 #define OPENDDS_DCPS_CONDITIONVARIABLE_H
3 
4 #include "ThreadStatusManager.h"
5 #include "TimeTypes.h"
6 #include "debug.h"
7 
10 #include <ace/Condition_T.h>
12 #include <ace/OS_NS_Thread.h>
13 
14 #ifndef ACE_LACKS_PRAGMA_ONCE
15 # pragma once
16 #endif
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
23 /**
24  * This is the return type of ConditionVariable::wait* functions.
25  */
26 enum CvStatus {
27  CvStatus_NoTimeout, ///< The wait has returned because it was woken up
28  CvStatus_Timeout, ///< The wait has returned because of a timeout
30  * The wait has returned because of an error.
31  * The errno-given reason was logged.
32  */
33 };
34 
35 /**
36  * ACE_Condition wrapper based on std::condition_variable that enforces
37  * monotonic time behavior.
38  *
39  * Besides the fact that it only works with ACE Mutexes, the major difference
40  * between this and std::condition_variable_any, the generalized form of
41  * std::condition_variable, is that it takes the mutex as a constructor
42  * argument, where the std::condition_variables take them as method arguments.
43  */
44 template <typename Mutex>
46 public:
47  explicit ConditionVariable(Mutex& mutex)
49  {
50  }
51 
52  /// Block until thread is woken up.
53  CvStatus wait(ThreadStatusManager& thread_status_manager)
54  {
55  ThreadStatusManager::Sleeper s(thread_status_manager);
56  if (impl_.wait() == 0) {
57  return CvStatus_NoTimeout;
58  }
59  if (DCPS_debug_level) {
60  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::wait: %p\n"));
61  }
62  return CvStatus_Error;
63  }
64 
65  /// Block until woken up or until expire_at. Same as wait() if expire_at is zero.
66  CvStatus wait_until(const MonotonicTimePoint& expire_at, ThreadStatusManager& thread_status_manager)
67  {
68  if (expire_at.is_zero()) {
69  return wait(thread_status_manager);
70  }
71  ThreadStatusManager::Sleeper s(thread_status_manager);
72  if (impl_.wait(&expire_at.value()) == 0) {
73  return CvStatus_NoTimeout;
74  } else if (errno == ETIME) {
75  return CvStatus_Timeout;
76  }
77  if (DCPS_debug_level) {
78  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::wait_until: %m\n"));
79  }
80  return CvStatus_Error;
81  }
82 
83  /// Block until woken up or for expire_in. Same as wait() if expire_in is zero.
84  CvStatus wait_for(const TimeDuration& expire_in, ThreadStatusManager& thread_status_manager)
85  {
86  if (expire_in.is_zero()) {
87  return wait(thread_status_manager);
88  }
89  ThreadStatusManager::Sleeper s(thread_status_manager);
90  return wait_until(MonotonicTimePoint::now() + expire_in);
91  }
92 
93  /// Unblock one of the threads waiting on this condition.
94  bool notify_one()
95  {
96  if (impl_.signal() == 0) {
97  return true;
98  }
99  if (DCPS_debug_level) {
100  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::notify_one: %m\n"));
101  }
102  return false;
103  }
104 
105  /// Unblock all of the threads waiting on this condition.
106  bool notify_all()
107  {
108  if (impl_.broadcast() == 0) {
109  return true;
110  }
111  if (DCPS_debug_level) {
112  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::notify_all: %m\n"));
113  }
114  return false;
115  }
116 
117 protected:
119 };
120 
121 } // namespace DCPS
122 } // namespace OpenDDS
123 
125 
126 #endif // OPENDDS_DCPS_CONDITIONVARIABLE_H
#define ACE_ERROR(X)
bool notify_one()
Unblock one of the threads waiting on this condition.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
int wait(const ACE_Time_Value *abstime)
#define ETIME
bool notify_all()
Unblock all of the threads waiting on this condition.
The wait has returned because of a timeout.
CvStatus wait_for(const TimeDuration &expire_in, ThreadStatusManager &thread_status_manager)
Block until woken up or for expire_in. Same as wait() if expire_in is zero.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_SYNCH_MUTEX Mutex
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int broadcast(void)
The wait has returned because it was woken up.
LM_ERROR
const ACE_Time_Value_T< AceClock > & value() const
Definition: TimePoint_T.inl:49
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int signal(void)