Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "WaitSet.h" 11 : 12 : #include "ConditionImpl.h" 13 : #include "Time_Helper.h" 14 : #include "TimeTypes.h" 15 : #include "Service_Participant.h" 16 : 17 : namespace { 18 : 19 23 : void copyInto(DDS::ConditionSeq& target, 20 : const DDS::WaitSet::ConditionSet& source) 21 : { 22 23 : const CORBA::ULong size = static_cast<CORBA::ULong>(source.size()); 23 23 : target.length(size); 24 23 : CORBA::ULong index = 0; 25 : 26 46 : for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(), 27 77 : end = source.end(); iter != end; ++iter, ++index) { 28 31 : target[index] = *iter; 29 : } 30 23 : } 31 : 32 : } // namespace 33 : 34 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 35 : 36 : namespace DDS { 37 : 38 18 : DDS::ReturnCode_t WaitSet::attach_condition(Condition_ptr cond) 39 : { 40 : using OpenDDS::DCPS::ConditionImpl; 41 18 : Condition_var condv(Condition::_duplicate(cond)); 42 : 43 18 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_, 44 : RETCODE_OUT_OF_RESOURCES); 45 18 : ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond); 46 : 47 18 : if (!ci) return RETCODE_BAD_PARAMETER; 48 : 49 18 : ReturnCode_t ret = ci->attach_to_ws(this); 50 : 51 18 : if (ret == RETCODE_OK) { 52 17 : attached_conditions_.insert(condv); 53 : 54 17 : if (condv->get_trigger_value()) signal(condv.in()); 55 : 56 17 : return RETCODE_OK; 57 : 58 1 : } else if (ret == RETCODE_PRECONDITION_NOT_MET) { 59 : // RETCODE_PRECONDITION_NOT_MET means it was already in the set 60 1 : return RETCODE_OK; 61 : } 62 : 63 0 : return ret; 64 18 : } 65 : 66 11 : ReturnCode_t WaitSet::detach_condition(Condition_ptr cond) 67 : { 68 11 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_, 69 : RETCODE_OUT_OF_RESOURCES); 70 11 : return detach_i(cond); 71 11 : } 72 : 73 1 : ReturnCode_t WaitSet::detach_conditions(const ConditionSeq& conds) 74 : { 75 1 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_, 76 : RETCODE_OUT_OF_RESOURCES); 77 : 78 4 : for (CORBA::ULong i = 0; i < conds.length(); ++i) { 79 3 : ReturnCode_t ret = detach_i(conds[ i]); 80 : 81 3 : if (ret != RETCODE_OK) { 82 0 : return ret; 83 : } 84 : } 85 : 86 1 : return RETCODE_OK; 87 1 : } 88 : 89 14 : ReturnCode_t WaitSet::detach_i(const Condition_ptr cond) 90 : { 91 : using OpenDDS::DCPS::ConditionImpl; 92 14 : Condition_var condv(Condition::_duplicate(cond)); 93 : 94 14 : ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond); 95 : 96 14 : if (!ci) return RETCODE_BAD_PARAMETER; 97 : 98 14 : ReturnCode_t ret = ci->detach_from_ws(this); 99 14 : attached_conditions_.erase(condv); 100 14 : signaled_conditions_.erase(condv); 101 14 : return ret; 102 14 : } 103 : 104 18 : ReturnCode_t WaitSet::get_conditions(ConditionSeq& conds) 105 : { 106 18 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_, 107 : RETCODE_OUT_OF_RESOURCES); 108 18 : copyInto(conds, attached_conditions_); 109 18 : return RETCODE_OK; 110 18 : } 111 : 112 8 : ReturnCode_t WaitSet::wait(ConditionSeq& active_conditions, 113 : const Duration_t& timeout) 114 : { 115 : using namespace OpenDDS::DCPS; 116 : 117 8 : if (!non_negative_duration(timeout)) { 118 1 : return DDS::RETCODE_BAD_PARAMETER; 119 : } 120 : 121 7 : MonotonicTimePoint deadline; 122 7 : const bool use_deadline = !is_infinite(timeout); 123 7 : if (use_deadline) { 124 3 : deadline = MonotonicTimePoint::now() + TimeDuration(timeout); 125 : } 126 : 127 7 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_, 128 : RETCODE_OUT_OF_RESOURCES); 129 : 130 7 : if (waiting_) { 131 2 : return RETCODE_PRECONDITION_NOT_MET; 132 : } 133 : 134 5 : waiting_ = true; 135 5 : signaled_conditions_.clear(); 136 : 137 10 : for (ConditionSet::const_iterator iter = attached_conditions_.begin(), 138 15 : end = attached_conditions_.end(); iter != end; ++iter) { 139 5 : if ((*iter)->get_trigger_value()) { 140 1 : signaled_conditions_.insert(*iter); 141 : } 142 : } 143 : 144 5 : CvStatus status = CvStatus_NoTimeout; 145 5 : ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); 146 9 : while ((attached_conditions_.empty() || signaled_conditions_.empty()) && 147 : status == CvStatus_NoTimeout) { 148 4 : status = use_deadline ? cond_.wait_until(deadline, thread_status_manager) : cond_.wait(thread_status_manager); 149 : } 150 : 151 5 : copyInto(active_conditions, signaled_conditions_); 152 5 : signaled_conditions_.clear(); 153 5 : waiting_ = false; 154 : 155 5 : switch (status) { 156 4 : case CvStatus_NoTimeout: 157 4 : return RETCODE_OK; 158 : 159 1 : case CvStatus_Timeout: 160 1 : return RETCODE_TIMEOUT; 161 : 162 0 : case CvStatus_Error: 163 : default: 164 0 : if (DCPS_debug_level) { 165 0 : ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WaitSet::wait: wait_until failed\n")); 166 : } 167 0 : return RETCODE_ERROR; 168 : } 169 7 : } 170 : 171 4 : void WaitSet::signal(Condition_ptr condition) 172 : { 173 4 : Condition_var condv(Condition::_duplicate(condition)); 174 4 : ACE_GUARD(ACE_Recursive_Thread_Mutex, g, lock_); 175 : 176 4 : if (attached_conditions_.find(condv) != attached_conditions_.end()) { 177 4 : signaled_conditions_.insert(condv); 178 4 : cond_.notify_one(); 179 : } 180 4 : } 181 : 182 4 : WaitSet_ptr WaitSet::_duplicate(WaitSet_ptr obj) 183 : { 184 4 : if (!CORBA::is_nil(obj)) obj->_add_ref(); 185 : 186 4 : return obj; 187 : } 188 : 189 : } // namespace DDS 190 : 191 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 192 : 193 : DDS::WaitSet_ptr 194 4 : TAO::Objref_Traits<DDS::WaitSet>::duplicate(DDS::WaitSet_ptr p) 195 : { 196 4 : return DDS::WaitSet::_duplicate(p); 197 : } 198 : 199 : void 200 16 : TAO::Objref_Traits<DDS::WaitSet>::release(DDS::WaitSet_ptr p) 201 : { 202 16 : CORBA::release(p); 203 16 : } 204 : 205 : DDS::WaitSet_ptr 206 1 : TAO::Objref_Traits<DDS::WaitSet>::nil() 207 : { 208 1 : return static_cast<DDS::WaitSet_ptr>(0); 209 : } 210 : 211 : CORBA::Boolean 212 0 : TAO::Objref_Traits<DDS::WaitSet>::marshal(const DDS::WaitSet_ptr p, 213 : TAO_OutputCDR& cdr) 214 : { 215 0 : return CORBA::Object::marshal(p, cdr); 216 : }