00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "WaitSet.h"
00010 #include "ConditionImpl.h"
00011 #include "Qos_Helper.h"
00012
00013 #include "ace/OS_NS_sys_time.h"
00014
00015 namespace {
00016
00017 void copyInto(DDS::ConditionSeq& target,
00018 const DDS::WaitSet::ConditionSet& source)
00019 {
00020 size_t size = source.size();
00021 target.length(static_cast<CORBA::ULong>(size));
00022 CORBA::ULong index = 0;
00023
00024 for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(),
00025 end = source.end(); iter != end; ++iter, ++index) {
00026 target[index] = *iter;
00027 }
00028 }
00029
00030 }
00031
00032 namespace DDS {
00033
00034 ReturnCode_t WaitSet::attach_condition(Condition_ptr cond)
00035 {
00036 using OpenDDS::DCPS::ConditionImpl;
00037 Condition_var condv(Condition::_duplicate(cond));
00038
00039 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00040 RETCODE_OUT_OF_RESOURCES);
00041 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00042
00043 if (!ci) return RETCODE_BAD_PARAMETER;
00044
00045 ReturnCode_t ret = ci->attach_to_ws(this);
00046
00047 if (ret == RETCODE_OK) {
00048 attached_conditions_.insert(condv);
00049
00050 if (condv->get_trigger_value()) signal(condv.in());
00051
00052 return RETCODE_OK;
00053
00054 } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
00055
00056 return RETCODE_OK;
00057 }
00058
00059 return ret;
00060 }
00061
00062 ReturnCode_t WaitSet::detach_condition(Condition_ptr cond)
00063 {
00064 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00065 RETCODE_OUT_OF_RESOURCES);
00066 return detach_i(cond);
00067 }
00068
00069 ReturnCode_t WaitSet::detach_conditions(const ConditionSeq& conds)
00070 {
00071 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00072 RETCODE_OUT_OF_RESOURCES);
00073
00074 for (CORBA::ULong i = 0; i < conds.length(); ++i) {
00075 ReturnCode_t ret = detach_i(conds[ i]);
00076
00077 if (ret != RETCODE_OK) {
00078 return ret;
00079 }
00080 }
00081
00082 return RETCODE_OK;
00083 }
00084
00085 ReturnCode_t WaitSet::detach_i(const Condition_ptr cond)
00086 {
00087 using OpenDDS::DCPS::ConditionImpl;
00088 Condition_var condv(Condition::_duplicate(cond));
00089
00090 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00091
00092 if (!ci) return RETCODE_BAD_PARAMETER;
00093
00094 ReturnCode_t ret = ci->detach_from_ws(this);
00095 attached_conditions_.erase(condv);
00096 signaled_conditions_.erase(condv);
00097 return ret;
00098 }
00099
00100 ReturnCode_t WaitSet::get_conditions(ConditionSeq& conds)
00101 {
00102 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00103 RETCODE_OUT_OF_RESOURCES);
00104 copyInto(conds, attached_conditions_);
00105 return RETCODE_OK;
00106 }
00107
00108 ReturnCode_t WaitSet::wait(ConditionSeq& active_conditions,
00109 const Duration_t& timeout)
00110 {
00111 if (waiting_.value()) return RETCODE_PRECONDITION_NOT_MET;
00112
00113 if (!OpenDDS::DCPS::non_negative_duration(timeout))
00114 return DDS::RETCODE_BAD_PARAMETER;
00115
00116 ACE_Time_Value deadline;
00117 ACE_Time_Value* p_deadline = 0;
00118
00119 if (timeout.sec != DURATION_INFINITE_SEC ||
00120 timeout.nanosec != DURATION_INFINITE_NSEC) {
00121 deadline = OpenDDS::DCPS::duration_to_absolute_time_value(timeout);
00122 p_deadline = &deadline;
00123 }
00124
00125 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00126 RETCODE_OUT_OF_RESOURCES);
00127 waiting_ = 1;
00128 signaled_conditions_.clear();
00129
00130 for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
00131 end = attached_conditions_.end(); iter != end; ++iter) {
00132 if ((*iter)->get_trigger_value()) {
00133 signaled_conditions_.insert(*iter);
00134 }
00135 }
00136
00137 int error = 0;
00138
00139 while ((attached_conditions_.empty() || signaled_conditions_.empty())
00140 && !error) {
00141 if (cond_.wait(p_deadline) == -1) error = errno;
00142 }
00143
00144 copyInto(active_conditions, signaled_conditions_);
00145 signaled_conditions_.clear();
00146 waiting_ = 0;
00147
00148 switch (error) {
00149 case 0:
00150 return RETCODE_OK;
00151 case ETIME:
00152 return RETCODE_TIMEOUT;
00153 default:
00154 return RETCODE_ERROR;
00155 }
00156 }
00157
00158 void WaitSet::signal(Condition_ptr condition)
00159 {
00160 Condition_var condv(Condition::_duplicate(condition));
00161 ACE_GUARD(ACE_Recursive_Thread_Mutex, g, lock_);
00162
00163 if (attached_conditions_.find(condv) != attached_conditions_.end()) {
00164 signaled_conditions_.insert(condv);
00165 cond_.signal();
00166 }
00167 }
00168
00169 WaitSet_ptr WaitSet::_duplicate(WaitSet_ptr obj)
00170 {
00171 if (!CORBA::is_nil(obj)) obj->_add_ref();
00172
00173 return obj;
00174 }
00175
00176 }
00177
00178 DDS::WaitSet_ptr
00179 TAO::Objref_Traits<DDS::WaitSet>::duplicate(DDS::WaitSet_ptr p)
00180 {
00181 return DDS::WaitSet::_duplicate(p);
00182 }
00183
00184 void
00185 TAO::Objref_Traits<DDS::WaitSet>::release(DDS::WaitSet_ptr p)
00186 {
00187 CORBA::release(p);
00188 }
00189
00190 DDS::WaitSet_ptr
00191 TAO::Objref_Traits<DDS::WaitSet>::nil()
00192 {
00193 return static_cast<DDS::WaitSet_ptr>(0);
00194 }
00195
00196 CORBA::Boolean
00197 TAO::Objref_Traits<DDS::WaitSet>::marshal(const DDS::WaitSet_ptr p,
00198 TAO_OutputCDR& cdr)
00199 {
00200 return CORBA::Object::marshal(p, cdr);
00201 }