LCOV - code coverage report
Current view: top level - DCPS - WaitSet.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 92 100 92.0 %
Date: 2023-04-30 01:32:43 Functions: 12 13 92.3 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "WaitSet.h"
      11             : 
      12             : #include "ConditionImpl.h"
      13             : #include "Time_Helper.h"
      14             : #include "TimeTypes.h"
      15             : #include "Service_Participant.h"
      16             : 
      17             : namespace {
      18             : 
      19          23 : void copyInto(DDS::ConditionSeq& target,
      20             :               const DDS::WaitSet::ConditionSet& source)
      21             : {
      22          23 :   const CORBA::ULong size = static_cast<CORBA::ULong>(source.size());
      23          23 :   target.length(size);
      24          23 :   CORBA::ULong index = 0;
      25             : 
      26          46 :   for (DDS::WaitSet::ConditionSet::const_iterator iter = source.begin(),
      27          77 :        end = source.end(); iter != end; ++iter, ++index) {
      28          31 :     target[index] = *iter;
      29             :   }
      30          23 : }
      31             : 
      32             : } // namespace
      33             : 
      34             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      35             : 
      36             : namespace DDS {
      37             : 
      38          18 : DDS::ReturnCode_t WaitSet::attach_condition(Condition_ptr cond)
      39             : {
      40             :   using OpenDDS::DCPS::ConditionImpl;
      41          18 :   Condition_var condv(Condition::_duplicate(cond));
      42             : 
      43          18 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
      44             :                    RETCODE_OUT_OF_RESOURCES);
      45          18 :   ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
      46             : 
      47          18 :   if (!ci) return RETCODE_BAD_PARAMETER;
      48             : 
      49          18 :   ReturnCode_t ret = ci->attach_to_ws(this);
      50             : 
      51          18 :   if (ret == RETCODE_OK) {
      52          17 :     attached_conditions_.insert(condv);
      53             : 
      54          17 :     if (condv->get_trigger_value()) signal(condv.in());
      55             : 
      56          17 :     return RETCODE_OK;
      57             : 
      58           1 :   } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
      59             :     // RETCODE_PRECONDITION_NOT_MET means it was already in the set
      60           1 :     return RETCODE_OK;
      61             :   }
      62             : 
      63           0 :   return ret;
      64          18 : }
      65             : 
      66          11 : ReturnCode_t WaitSet::detach_condition(Condition_ptr cond)
      67             : {
      68          11 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
      69             :                    RETCODE_OUT_OF_RESOURCES);
      70          11 :   return detach_i(cond);
      71          11 : }
      72             : 
      73           1 : ReturnCode_t WaitSet::detach_conditions(const ConditionSeq& conds)
      74             : {
      75           1 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
      76             :                    RETCODE_OUT_OF_RESOURCES);
      77             : 
      78           4 :   for (CORBA::ULong i = 0; i < conds.length(); ++i) {
      79           3 :     ReturnCode_t ret = detach_i(conds[ i]);
      80             : 
      81           3 :     if (ret != RETCODE_OK) {
      82           0 :       return ret;
      83             :     }
      84             :   }
      85             : 
      86           1 :   return RETCODE_OK;
      87           1 : }
      88             : 
      89          14 : ReturnCode_t WaitSet::detach_i(const Condition_ptr cond)
      90             : {
      91             :   using OpenDDS::DCPS::ConditionImpl;
      92          14 :   Condition_var condv(Condition::_duplicate(cond));
      93             : 
      94          14 :   ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
      95             : 
      96          14 :   if (!ci) return RETCODE_BAD_PARAMETER;
      97             : 
      98          14 :   ReturnCode_t ret = ci->detach_from_ws(this);
      99          14 :   attached_conditions_.erase(condv);
     100          14 :   signaled_conditions_.erase(condv);
     101          14 :   return ret;
     102          14 : }
     103             : 
     104          18 : ReturnCode_t WaitSet::get_conditions(ConditionSeq& conds)
     105             : {
     106          18 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
     107             :                    RETCODE_OUT_OF_RESOURCES);
     108          18 :   copyInto(conds, attached_conditions_);
     109          18 :   return RETCODE_OK;
     110          18 : }
     111             : 
     112           8 : ReturnCode_t WaitSet::wait(ConditionSeq& active_conditions,
     113             :                            const Duration_t& timeout)
     114             : {
     115             :   using namespace OpenDDS::DCPS;
     116             : 
     117           8 :   if (!non_negative_duration(timeout)) {
     118           1 :     return DDS::RETCODE_BAD_PARAMETER;
     119             :   }
     120             : 
     121           7 :   MonotonicTimePoint deadline;
     122           7 :   const bool use_deadline = !is_infinite(timeout);
     123           7 :   if (use_deadline) {
     124           3 :     deadline = MonotonicTimePoint::now() + TimeDuration(timeout);
     125             :   }
     126             : 
     127           7 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, g, lock_,
     128             :                    RETCODE_OUT_OF_RESOURCES);
     129             : 
     130           7 :   if (waiting_) {
     131           2 :     return RETCODE_PRECONDITION_NOT_MET;
     132             :   }
     133             : 
     134           5 :   waiting_ = true;
     135           5 :   signaled_conditions_.clear();
     136             : 
     137          10 :   for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
     138          15 :        end = attached_conditions_.end(); iter != end; ++iter) {
     139           5 :     if ((*iter)->get_trigger_value()) {
     140           1 :       signaled_conditions_.insert(*iter);
     141             :     }
     142             :   }
     143             : 
     144           5 :   CvStatus status = CvStatus_NoTimeout;
     145           5 :   ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
     146           9 :   while ((attached_conditions_.empty() || signaled_conditions_.empty()) &&
     147             :       status == CvStatus_NoTimeout) {
     148           4 :     status = use_deadline ? cond_.wait_until(deadline, thread_status_manager) : cond_.wait(thread_status_manager);
     149             :   }
     150             : 
     151           5 :   copyInto(active_conditions, signaled_conditions_);
     152           5 :   signaled_conditions_.clear();
     153           5 :   waiting_ = false;
     154             : 
     155           5 :   switch (status) {
     156           4 :   case CvStatus_NoTimeout:
     157           4 :     return RETCODE_OK;
     158             : 
     159           1 :   case CvStatus_Timeout:
     160           1 :     return RETCODE_TIMEOUT;
     161             : 
     162           0 :   case CvStatus_Error:
     163             :   default:
     164           0 :     if (DCPS_debug_level) {
     165           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WaitSet::wait: wait_until failed\n"));
     166             :     }
     167           0 :     return RETCODE_ERROR;
     168             :   }
     169           7 : }
     170             : 
     171           4 : void WaitSet::signal(Condition_ptr condition)
     172             : {
     173           4 :   Condition_var condv(Condition::_duplicate(condition));
     174           4 :   ACE_GUARD(ACE_Recursive_Thread_Mutex, g, lock_);
     175             : 
     176           4 :   if (attached_conditions_.find(condv) != attached_conditions_.end()) {
     177           4 :     signaled_conditions_.insert(condv);
     178           4 :     cond_.notify_one();
     179             :   }
     180           4 : }
     181             : 
     182           4 : WaitSet_ptr WaitSet::_duplicate(WaitSet_ptr obj)
     183             : {
     184           4 :   if (!CORBA::is_nil(obj)) obj->_add_ref();
     185             : 
     186           4 :   return obj;
     187             : }
     188             : 
     189             : } // namespace DDS
     190             : 
     191             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     192             : 
     193             : DDS::WaitSet_ptr
     194           4 : TAO::Objref_Traits<DDS::WaitSet>::duplicate(DDS::WaitSet_ptr p)
     195             : {
     196           4 :   return DDS::WaitSet::_duplicate(p);
     197             : }
     198             : 
     199             : void
     200          16 : TAO::Objref_Traits<DDS::WaitSet>::release(DDS::WaitSet_ptr p)
     201             : {
     202          16 :   CORBA::release(p);
     203          16 : }
     204             : 
     205             : DDS::WaitSet_ptr
     206           1 : TAO::Objref_Traits<DDS::WaitSet>::nil()
     207             : {
     208           1 :   return static_cast<DDS::WaitSet_ptr>(0);
     209             : }
     210             : 
     211             : CORBA::Boolean
     212           0 : TAO::Objref_Traits<DDS::WaitSet>::marshal(const DDS::WaitSet_ptr p,
     213             :                                           TAO_OutputCDR& cdr)
     214             : {
     215           0 :   return CORBA::Object::marshal(p, cdr);
     216             : }

Generated by: LCOV version 1.16