WaitSet.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "WaitSet.h"
00010 #include "ConditionImpl.h"
00011 #include "Qos_Helper.h"
00012 
00013 #include "ace/OS_NS_sys_time.h"
00014 
00015 namespace {
00016 
00017 void copyInto(DDS::ConditionSeq& target,
00018               const DDS::WaitSet::ConditionSet& source)
00019 {
00020   size_t size = source.size();
00021   target.length(static_cast<CORBA::ULong>(size));
00022   CORBA::ULong index = 0;
00023 
00024   for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(),
00025        end = source.end(); iter != end; ++iter, ++index) {
00026     target[index] = *iter;
00027   }
00028 }
00029 
00030 } // namespace
00031 
00032 namespace DDS {
00033 
00034 ReturnCode_t WaitSet::attach_condition(Condition_ptr cond)
00035 {
00036   using OpenDDS::DCPS::ConditionImpl;
00037   Condition_var condv(Condition::_duplicate(cond));
00038 
00039   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00040                    RETCODE_OUT_OF_RESOURCES);
00041   ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00042 
00043   if (!ci) return RETCODE_BAD_PARAMETER;
00044 
00045   ReturnCode_t ret = ci->attach_to_ws(this);
00046 
00047   if (ret == RETCODE_OK) {
00048     attached_conditions_.insert(condv);
00049 
00050     if (condv->get_trigger_value()) signal(condv.in());
00051 
00052     return RETCODE_OK;
00053 
00054   } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
00055     // RETCODE_PRECONDITION_NOT_MET means it was already in the set
00056     return RETCODE_OK;
00057   }
00058 
00059   return ret;
00060 }
00061 
00062 ReturnCode_t WaitSet::detach_condition(Condition_ptr cond)
00063 {
00064   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00065                    RETCODE_OUT_OF_RESOURCES);
00066   return detach_i(cond);
00067 }
00068 
00069 ReturnCode_t WaitSet::detach_conditions(const ConditionSeq& conds)
00070 {
00071   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00072                    RETCODE_OUT_OF_RESOURCES);
00073 
00074   for (CORBA::ULong i = 0; i < conds.length(); ++i) {
00075     ReturnCode_t ret = detach_i(conds[ i]);
00076 
00077     if (ret != RETCODE_OK) {
00078       return ret;
00079     }
00080   }
00081 
00082   return RETCODE_OK;
00083 }
00084 
00085 ReturnCode_t WaitSet::detach_i(const Condition_ptr cond)
00086 {
00087   using OpenDDS::DCPS::ConditionImpl;
00088   Condition_var condv(Condition::_duplicate(cond));
00089 
00090   ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
00091 
00092   if (!ci) return RETCODE_BAD_PARAMETER;
00093 
00094   ReturnCode_t ret = ci->detach_from_ws(this);
00095   attached_conditions_.erase(condv);
00096   signaled_conditions_.erase(condv);
00097   return ret;
00098 }
00099 
00100 ReturnCode_t WaitSet::get_conditions(ConditionSeq& conds)
00101 {
00102   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00103                    RETCODE_OUT_OF_RESOURCES);
00104   copyInto(conds, attached_conditions_);
00105   return RETCODE_OK;
00106 }
00107 
00108 ReturnCode_t WaitSet::wait(ConditionSeq& active_conditions,
00109                            const Duration_t& timeout)
00110 {
00111   if (waiting_.value()) return RETCODE_PRECONDITION_NOT_MET;
00112 
00113   if (!OpenDDS::DCPS::non_negative_duration(timeout))
00114     return DDS::RETCODE_BAD_PARAMETER;
00115 
00116   ACE_Time_Value deadline;
00117   ACE_Time_Value* p_deadline = 0;
00118 
00119   if (timeout.sec != DURATION_INFINITE_SEC ||
00120       timeout.nanosec != DURATION_INFINITE_NSEC) {
00121     deadline = OpenDDS::DCPS::duration_to_absolute_time_value(timeout);
00122     p_deadline = &deadline;
00123   }
00124 
00125   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
00126                    RETCODE_OUT_OF_RESOURCES);
00127   waiting_ = 1;
00128   signaled_conditions_.clear();
00129 
00130   for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
00131        end = attached_conditions_.end(); iter != end; ++iter) {
00132     if ((*iter)->get_trigger_value()) {
00133       signaled_conditions_.insert(*iter);
00134     }
00135   }
00136 
00137   int error = 0;
00138 
00139   while ((attached_conditions_.empty() || signaled_conditions_.empty())
00140          && !error) {
00141     if (cond_.wait(p_deadline) == -1) error = errno;
00142   }
00143 
00144   copyInto(active_conditions, signaled_conditions_);
00145   signaled_conditions_.clear();
00146   waiting_ = 0;
00147 
00148   switch (error) {
00149   case 0:
00150     return RETCODE_OK;
00151   case ETIME:
00152     return RETCODE_TIMEOUT;
00153   default:
00154     return RETCODE_ERROR;
00155   }
00156 }
00157 
00158 void WaitSet::signal(Condition_ptr condition)
00159 {
00160   Condition_var condv(Condition::_duplicate(condition));
00161   ACE_GUARD(ACE_Recursive_Thread_Mutex, g, lock_);
00162 
00163   if (attached_conditions_.find(condv) != attached_conditions_.end()) {
00164     signaled_conditions_.insert(condv);
00165     cond_.signal();
00166   }
00167 }
00168 
00169 WaitSet_ptr WaitSet::_duplicate(WaitSet_ptr obj)
00170 {
00171   if (!CORBA::is_nil(obj)) obj->_add_ref();
00172 
00173   return obj;
00174 }
00175 
00176 } // namespace DDS
00177 
00178 DDS::WaitSet_ptr
00179 TAO::Objref_Traits<DDS::WaitSet>::duplicate(DDS::WaitSet_ptr p)
00180 {
00181   return DDS::WaitSet::_duplicate(p);
00182 }
00183 
00184 void
00185 TAO::Objref_Traits<DDS::WaitSet>::release(DDS::WaitSet_ptr p)
00186 {
00187   CORBA::release(p);
00188 }
00189 
00190 DDS::WaitSet_ptr
00191 TAO::Objref_Traits<DDS::WaitSet>::nil()
00192 {
00193   return static_cast<DDS::WaitSet_ptr>(0);
00194 }
00195 
00196 CORBA::Boolean
00197 TAO::Objref_Traits<DDS::WaitSet>::marshal(const DDS::WaitSet_ptr p,
00198                                           TAO_OutputCDR& cdr)
00199 {
00200   return CORBA::Object::marshal(p, cdr);
00201 }

Generated on Fri Feb 12 20:05:29 2016 for OpenDDS by  doxygen 1.4.7