Line data Source code
1 : #ifndef OPENDDS_DCPS_CONDITIONVARIABLE_H 2 : #define OPENDDS_DCPS_CONDITIONVARIABLE_H 3 : 4 : #include "ThreadStatusManager.h" 5 : #include "TimeTypes.h" 6 : #include "debug.h" 7 : 8 : #include <ace/Condition_Thread_Mutex.h> 9 : #include <ace/Condition_Recursive_Thread_Mutex.h> 10 : #include <ace/Condition_T.h> 11 : #include <ace/Condition_Attributes.h> 12 : #include <ace/OS_NS_Thread.h> 13 : 14 : #ifndef ACE_LACKS_PRAGMA_ONCE 15 : # pragma once 16 : #endif 17 : 18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 19 : 20 : namespace OpenDDS { 21 : namespace DCPS { 22 : 23 : /** 24 : * This is the return type of ConditionVariable::wait* functions. 25 : */ 26 : enum CvStatus { 27 : CvStatus_NoTimeout, ///< The wait has returned because it was woken up 28 : CvStatus_Timeout, ///< The wait has returned because of a timeout 29 : CvStatus_Error /**< 30 : * The wait has returned because of an error. 31 : * The errno-given reason was logged. 32 : */ 33 : }; 34 : 35 : /** 36 : * ACE_Condition wrapper based on std::condition_variable that enforces 37 : * monotonic time behavior. 38 : * 39 : * Besides the fact that it only works with ACE Mutexes, the major difference 40 : * between this and std::condition_variable_any, the generalized form of 41 : * std::condition_variable, is that it takes the mutex as a constructor 42 : * argument, where the std::condition_variables take them as method arguments. 43 : */ 44 : template <typename Mutex> 45 : class ConditionVariable { 46 : public: 47 178 : explicit ConditionVariable(Mutex& mutex) 48 178 : : impl_(mutex, ACE_Condition_Attributes_T<MonotonicClock>()) 49 : { 50 178 : } 51 : 52 : /// Block until thread is woken up. 53 2590 : CvStatus wait(ThreadStatusManager& thread_status_manager) 54 : { 55 2590 : ThreadStatusManager::Sleeper s(thread_status_manager); 56 2590 : if (impl_.wait() == 0) { 57 2590 : return CvStatus_NoTimeout; 58 : } 59 0 : if (DCPS_debug_level) { 60 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::wait: %p\n")); 61 : } 62 0 : return CvStatus_Error; 63 2590 : } 64 : 65 : /// Block until woken up or until expire_at. Same as wait() if expire_at is zero. 66 128 : CvStatus wait_until(const MonotonicTimePoint& expire_at, ThreadStatusManager& thread_status_manager) 67 : { 68 128 : if (expire_at.is_zero()) { 69 0 : return wait(thread_status_manager); 70 : } 71 128 : ThreadStatusManager::Sleeper s(thread_status_manager); 72 128 : if (impl_.wait(&expire_at.value()) == 0) { 73 38 : return CvStatus_NoTimeout; 74 90 : } else if (errno == ETIME) { 75 90 : return CvStatus_Timeout; 76 : } 77 0 : if (DCPS_debug_level) { 78 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::wait_until: %m\n")); 79 : } 80 0 : return CvStatus_Error; 81 128 : } 82 : 83 : /// Block until woken up or for expire_in. Same as wait() if expire_in is zero. 84 : CvStatus wait_for(const TimeDuration& expire_in, ThreadStatusManager& thread_status_manager) 85 : { 86 : if (expire_in.is_zero()) { 87 : return wait(thread_status_manager); 88 : } 89 : ThreadStatusManager::Sleeper s(thread_status_manager); 90 : return wait_until(MonotonicTimePoint::now() + expire_in); 91 : } 92 : 93 : /// Unblock one of the threads waiting on this condition. 94 4233 : bool notify_one() 95 : { 96 4233 : if (impl_.signal() == 0) { 97 4233 : return true; 98 : } 99 0 : if (DCPS_debug_level) { 100 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::notify_one: %m\n")); 101 : } 102 0 : return false; 103 : } 104 : 105 : /// Unblock all of the threads waiting on this condition. 106 4673 : bool notify_all() 107 : { 108 4673 : if (impl_.broadcast() == 0) { 109 4673 : return true; 110 : } 111 0 : if (DCPS_debug_level) { 112 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ConditionVariable::notify_all: %m\n")); 113 : } 114 0 : return false; 115 : } 116 : 117 : protected: 118 : ACE_Condition<Mutex> impl_; 119 : }; 120 : 121 : } // namespace DCPS 122 : } // namespace OpenDDS 123 : 124 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 125 : 126 : #endif // OPENDDS_DCPS_CONDITIONVARIABLE_H