WaitSet.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "WaitSet.h"
00010 #include "ConditionImpl.h"
00011 #include "Time_Helper.h"
00012
00013 #include "ace/OS_NS_sys_time.h"
00014
00015 namespace {
00016
00017 void copyInto(DDS::ConditionSeq& target,
00018 const DDS::WaitSet::ConditionSet& source)
00019 {
00020 const CORBA::ULong size = static_cast<CORBA::ULong>(source.size());
00021 target.length(size);
00022 CORBA::ULong index = 0;
00023
00024 for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(),
00025 end = source.end(); iter != end; ++iter, ++index) {
00026 target[index] = *iter;
00027 }
00028 }
00029
00030 }
00031
00032 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00033
00034 namespace DDS {
00035
00036 DDS::ReturnCode_t WaitSet::attach_condition(Condition_ptr cond)
00037 {
00038 using OpenDDS::DCPS::ConditionImpl;
00039 Condition_var condv(Condition::_duplicate(cond));
00040
00041 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00042 RETCODE_OUT_OF_RESOURCES);
00043 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00044
00045 if (!ci) return RETCODE_BAD_PARAMETER;
00046
00047 ReturnCode_t ret = ci->attach_to_ws(this);
00048
00049 if (ret == RETCODE_OK) {
00050 attached_conditions_.insert(condv);
00051
00052 if (condv->get_trigger_value()) signal(condv.in());
00053
00054 return RETCODE_OK;
00055
00056 } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
00057
00058 return RETCODE_OK;
00059 }
00060
00061 return ret;
00062 }
00063
00064 ReturnCode_t WaitSet::detach_condition(Condition_ptr cond)
00065 {
00066 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00067 RETCODE_OUT_OF_RESOURCES);
00068 return detach_i(cond);
00069 }
00070
00071 ReturnCode_t WaitSet::detach_conditions(const ConditionSeq& conds)
00072 {
00073 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00074 RETCODE_OUT_OF_RESOURCES);
00075
00076 for (CORBA::ULong i = 0; i < conds.length(); ++i) {
00077 ReturnCode_t ret = detach_i(conds[ i]);
00078
00079 if (ret != RETCODE_OK) {
00080 return ret;
00081 }
00082 }
00083
00084 return RETCODE_OK;
00085 }
00086
00087 ReturnCode_t WaitSet::detach_i(const Condition_ptr cond)
00088 {
00089 using OpenDDS::DCPS::ConditionImpl;
00090 Condition_var condv(Condition::_duplicate(cond));
00091
00092 ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00093
00094 if (!ci) return RETCODE_BAD_PARAMETER;
00095
00096 ReturnCode_t ret = ci->detach_from_ws(this);
00097 attached_conditions_.erase(condv);
00098 signaled_conditions_.erase(condv);
00099 return ret;
00100 }
00101
00102 ReturnCode_t WaitSet::get_conditions(ConditionSeq& conds)
00103 {
00104 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00105 RETCODE_OUT_OF_RESOURCES);
00106 copyInto(conds, attached_conditions_);
00107 return RETCODE_OK;
00108 }
00109
00110 ReturnCode_t WaitSet::wait(ConditionSeq& active_conditions,
00111 const Duration_t& timeout)
00112 {
00113 if (waiting_.value()) return RETCODE_PRECONDITION_NOT_MET;
00114
00115 if (!OpenDDS::DCPS::non_negative_duration(timeout))
00116 return DDS::RETCODE_BAD_PARAMETER;
00117
00118 ACE_Time_Value deadline;
00119 ACE_Time_Value* p_deadline = 0;
00120
00121 if (timeout.sec != DURATION_INFINITE_SEC ||
00122 timeout.nanosec != DURATION_INFINITE_NSEC) {
00123 deadline = OpenDDS::DCPS::duration_to_absolute_time_value(timeout);
00124 p_deadline = &deadline;
00125 }
00126
00127 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00128 RETCODE_OUT_OF_RESOURCES);
00129 waiting_ = 1;
00130 signaled_conditions_.clear();
00131
00132 for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
00133 end = attached_conditions_.end(); iter != end; ++iter) {
00134 if ((*iter)->get_trigger_value()) {
00135 signaled_conditions_.insert(*iter);
00136 }
00137 }
00138
00139 int error = 0;
00140
00141 while ((attached_conditions_.empty() || signaled_conditions_.empty())
00142 && !error) {
00143 if (cond_.wait(p_deadline) == -1) error = errno;
00144 }
00145
00146 copyInto(active_conditions, signaled_conditions_);
00147 signaled_conditions_.clear();
00148 waiting_ = 0;
00149
00150 switch (error) {
00151 case 0:
00152 return RETCODE_OK;
00153 case ETIME:
00154 return RETCODE_TIMEOUT;
00155 default:
00156 return RETCODE_ERROR;
00157 }
00158 }
00159
00160 void WaitSet::signal(Condition_ptr condition)
00161 {
00162 Condition_var condv(Condition::_duplicate(condition));
00163 ACE_GUARD(ACE_Recursive_Thread_Mutex, g, lock_);
00164
00165 if (attached_conditions_.find(condv) != attached_conditions_.end()) {
00166 signaled_conditions_.insert(condv);
00167 cond_.signal();
00168 }
00169 }
00170
00171 WaitSet_ptr WaitSet::_duplicate(WaitSet_ptr obj)
00172 {
00173 if (!CORBA::is_nil(obj)) obj->_add_ref();
00174
00175 return obj;
00176 }
00177
00178 }
00179
00180 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00181
00182 DDS::WaitSet_ptr
00183 TAO::Objref_Traits<DDS::WaitSet>::duplicate(DDS::WaitSet_ptr p)
00184 {
00185 return DDS::WaitSet::_duplicate(p);
00186 }
00187
00188 void
00189 TAO::Objref_Traits<DDS::WaitSet>::release(DDS::WaitSet_ptr p)
00190 {
00191 CORBA::release(p);
00192 }
00193
00194 DDS::WaitSet_ptr
00195 TAO::Objref_Traits<DDS::WaitSet>::nil()
00196 {
00197 return static_cast<DDS::WaitSet_ptr>(0);
00198 }
00199
00200 CORBA::Boolean
00201 TAO::Objref_Traits<DDS::WaitSet>::marshal(const DDS::WaitSet_ptr p,
00202 TAO_OutputCDR& cdr)
00203 {
00204 return CORBA::Object::marshal(p, cdr);
00205 }