OpenDDS  Snapshot(2023/04/07-19:43)
Public Types | Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
DDS::WaitSet Class Reference

#include <WaitSet.h>

Inheritance diagram for DDS::WaitSet:
Inheritance graph
[legend]
Collaboration diagram for DDS::WaitSet:
Collaboration graph
[legend]

Public Types

typedef WaitSet_ptr _ptr_type
 
typedef WaitSet_var _var_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< WaitSetInterf >
typedef WaitSetInterf ::_ptr_type _ptr_type
 
typedef WaitSetInterf ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 

Public Member Functions

 WaitSet ()
 
virtual ~WaitSet ()
 
ReturnCode_t wait (ConditionSeq &active_conditions, const Duration_t &timeout)
 
ReturnCode_t attach_condition (Condition_ptr cond)
 
ReturnCode_t detach_condition (Condition_ptr cond)
 
ReturnCode_t get_conditions (ConditionSeq &attached_conditions)
 
ReturnCode_t detach_conditions (const ConditionSeq &conditions)
 
typedef OPENDDS_SET_CMP (Condition_var, OpenDDS::DCPS::VarLess< Condition >) ConditionSet
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Static Public Member Functions

static WaitSet_ptr _duplicate (WaitSet_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< WaitSetInterf >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 

Private Types

typedef OpenDDS::DCPS::ConditionVariable< ACE_Recursive_Thread_Mutex > ConditionVariableType
 

Private Member Functions

ReturnCode_t detach_i (const Condition_ptr cond)
 
void signal (Condition_ptr cond)
 

Private Attributes

ACE_Recursive_Thread_Mutex lock_
 
ConditionVariableType cond_
 
bool waiting_
 
ConditionSet attached_conditions_
 
ConditionSet signaled_conditions_
 

Friends

class OpenDDS::DCPS::ConditionImpl
 

Additional Inherited Members

- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 

Detailed Description

Definition at line 52 of file WaitSet.h.

Member Typedef Documentation

◆ _ptr_type

Definition at line 55 of file WaitSet.h.

◆ _var_type

Definition at line 56 of file WaitSet.h.

◆ ConditionVariableType

Definition at line 89 of file WaitSet.h.

Constructor & Destructor Documentation

◆ WaitSet()

DDS::WaitSet::WaitSet ( )
inline

Definition at line 58 of file WaitSet.h.

59  : cond_(lock_)
60  , waiting_(false)
61  {}
bool waiting_
Definition: WaitSet.h:92
ConditionVariableType cond_
Definition: WaitSet.h:90
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87

◆ ~WaitSet()

virtual DDS::WaitSet::~WaitSet ( )
inline virtual

Definition at line 63 of file WaitSet.h.

References _duplicate(), OpenDDS::DCPS::OPENDDS_SET_CMP(), signal(), and wait().

63 {}

Member Function Documentation

◆ _duplicate()

WaitSet_ptr DDS::WaitSet::_duplicate ( WaitSet_ptr  obj)
static

Definition at line 182 of file WaitSet.cpp.

References OpenDDS::DCPS::LocalObjectBase::_add_ref(), CORBA::is_nil(), CORBA::Object::marshal(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, and CORBA::release().

183 {
184  if (!CORBA::is_nil(obj)) obj->_add_ref();
185 
186  return obj;
187 }
Boolean is_nil(T x)

◆ attach_condition()

DDS::ReturnCode_t DDS::WaitSet::attach_condition ( Condition_ptr  cond)

Definition at line 38 of file WaitSet.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::ConditionImpl::attach_to_ws(), attached_conditions_, lock_, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, and signal().

39 {
41  Condition_var condv(Condition::_duplicate(cond));
42 
45  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
46 
47  if (!ci) return RETCODE_BAD_PARAMETER;
48 
49  ReturnCode_t ret = ci->attach_to_ws(this);
50 
51  if (ret == RETCODE_OK) {
52  attached_conditions_.insert(condv);
53 
54  if (condv->get_trigger_value()) signal(condv.in());
55 
56  return RETCODE_OK;
57 
58  } else if (ret == RETCODE_PRECONDITION_NOT_MET) {
59  // RETCODE_PRECONDITION_NOT_MET means it was already in the set
60  return RETCODE_OK;
61  }
62 
63  return ret;
64 }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
void signal(Condition_ptr cond)
Definition: WaitSet.cpp:171
ConditionSet attached_conditions_
Definition: WaitSet.h:93
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
DDS::ReturnCode_t attach_to_ws(DDS::WaitSet_ptr ws)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES

◆ detach_condition()

ReturnCode_t DDS::WaitSet::detach_condition ( Condition_ptr  cond)

Definition at line 66 of file WaitSet.cpp.

References ACE_GUARD_RETURN, detach_i(), lock_, and DDS::RETCODE_OUT_OF_RESOURCES.

67 {
70  return detach_i(cond);
71 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
ReturnCode_t detach_i(const Condition_ptr cond)
Definition: WaitSet.cpp:89

◆ detach_conditions()

ReturnCode_t DDS::WaitSet::detach_conditions ( const ConditionSeq & conditions)

Convenience method for detaching multiple conditions, for example when shutting down.

Definition at line 73 of file WaitSet.cpp.

References ACE_GUARD_RETURN, detach_i(), lock_, DDS::RETCODE_OK, and DDS::RETCODE_OUT_OF_RESOURCES.

74 {
77 
78  for (CORBA::ULong i = 0; i < conds.length(); ++i) {
79  ReturnCode_t ret = detach_i(conds[ i]);
80 
81  if (ret != RETCODE_OK) {
82  return ret;
83  }
84  }
85 
86  return RETCODE_OK;
87 }
const ReturnCode_t RETCODE_OK
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
ReturnCode_t detach_i(const Condition_ptr cond)
Definition: WaitSet.cpp:89

◆ detach_i()

ReturnCode_t DDS::WaitSet::detach_i ( const Condition_ptr  cond)
private

Definition at line 89 of file WaitSet.cpp.

References attached_conditions_, OpenDDS::DCPS::ConditionImpl::detach_from_ws(), DDS::RETCODE_BAD_PARAMETER, and signaled_conditions_.

Referenced by detach_condition(), and detach_conditions().

90 {
92  Condition_var condv(Condition::_duplicate(cond));
93 
94  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(cond);
95 
96  if (!ci) return RETCODE_BAD_PARAMETER;
97 
98  ReturnCode_t ret = ci->detach_from_ws(this);
99  attached_conditions_.erase(condv);
100  signaled_conditions_.erase(condv);
101  return ret;
102 }
const ReturnCode_t RETCODE_BAD_PARAMETER
ConditionSet signaled_conditions_
Definition: WaitSet.h:94
ConditionSet attached_conditions_
Definition: WaitSet.h:93
DDS::ReturnCode_t detach_from_ws(DDS::WaitSet_ptr ws)

◆ get_conditions()

ReturnCode_t DDS::WaitSet::get_conditions ( ConditionSeq & attached_conditions)

Definition at line 104 of file WaitSet.cpp.

References ACE_GUARD_RETURN, attached_conditions_, lock_, DDS::RETCODE_OK, and DDS::RETCODE_OUT_OF_RESOURCES.

105 {
108  copyInto(conds, attached_conditions_);
109  return RETCODE_OK;
110 }
const ReturnCode_t RETCODE_OK
ConditionSet attached_conditions_
Definition: WaitSet.h:93
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
const ReturnCode_t RETCODE_OUT_OF_RESOURCES

◆ OPENDDS_SET_CMP()

typedef DDS::WaitSet::OPENDDS_SET_CMP ( Condition_var  ,
OpenDDS::DCPS::VarLess< Condition >
)

◆ signal()

void DDS::WaitSet::signal ( Condition_ptr  cond)
private

Definition at line 171 of file WaitSet.cpp.

References ACE_GUARD, attached_conditions_, cond_, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), and signaled_conditions_.

Referenced by attach_condition().

172 {
173  Condition_var condv(Condition::_duplicate(condition));
175 
176  if (attached_conditions_.find(condv) != attached_conditions_.end()) {
177  signaled_conditions_.insert(condv);
178  cond_.notify_one();
179  }
180 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ConditionSet signaled_conditions_
Definition: WaitSet.h:94
ConditionSet attached_conditions_
Definition: WaitSet.h:93
ConditionVariableType cond_
Definition: WaitSet.h:90
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
bool notify_one()
Unblock one of the threads waiting on this condition.

◆ wait()

ReturnCode_t DDS::WaitSet::wait ( ConditionSeq & active_conditions,
const Duration_t & timeout
)

Definition at line 112 of file WaitSet.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, attached_conditions_, cond_, OpenDDS::DCPS::CvStatus_Error, OpenDDS::DCPS::CvStatus_NoTimeout, OpenDDS::DCPS::CvStatus_Timeout, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::is_infinite(), LM_ERROR, lock_, OpenDDS::DCPS::non_negative_duration(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, DDS::RETCODE_TIMEOUT, signaled_conditions_, TheServiceParticipant, OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until(), and waiting_.

114 {
115  using namespace OpenDDS::DCPS;
116 
117  if (!non_negative_duration(timeout)) {
119  }
120 
121  MonotonicTimePoint deadline;
122  const bool use_deadline = !is_infinite(timeout);
123  if (use_deadline) {
124  deadline = MonotonicTimePoint::now() + TimeDuration(timeout);
125  }
126 
129 
130  if (waiting_) {
132  }
133 
134  waiting_ = true;
135  signaled_conditions_.clear();
136 
137  for (ConditionSet::const_iterator iter = attached_conditions_.begin(),
138  end = attached_conditions_.end(); iter != end; ++iter) {
139  if ((*iter)->get_trigger_value()) {
140  signaled_conditions_.insert(*iter);
141  }
142  }
143 
144  CvStatus status = CvStatus_NoTimeout;
145  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
146  while ((attached_conditions_.empty() || signaled_conditions_.empty()) &&
147  status == CvStatus_NoTimeout) {
148  status = use_deadline ? cond_.wait_until(deadline, thread_status_manager) : cond_.wait(thread_status_manager);
149  }
150 
151  copyInto(active_conditions, signaled_conditions_);
152  signaled_conditions_.clear();
153  waiting_ = false;
154 
155  switch (status) {
156  case CvStatus_NoTimeout:
157  return RETCODE_OK;
158 
159  case CvStatus_Timeout:
160  return RETCODE_TIMEOUT;
161 
162  case CvStatus_Error:
163  default:
164  if (DCPS_debug_level) {
165  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: WaitSet::wait: wait_until failed\n"));
166  }
167  return RETCODE_ERROR;
168  }
169 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ConditionSet signaled_conditions_
Definition: WaitSet.h:94
ConditionSet attached_conditions_
Definition: WaitSet.h:93
bool waiting_
Definition: WaitSet.h:92
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
ConditionVariableType cond_
Definition: WaitSet.h:90
ACE_INLINE OpenDDS_Dcps_Export bool non_negative_duration(const DDS::Duration_t &t)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
The wait has returned because of a timeout.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Recursive_Thread_Mutex lock_
Definition: WaitSet.h:87
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
ACE_INLINE OpenDDS_Dcps_Export bool is_infinite(const DDS::Duration_t &value)
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
The wait has returned because it was woken up.
const ReturnCode_t RETCODE_TIMEOUT

Friends And Related Function Documentation

◆ OpenDDS::DCPS::ConditionImpl

friend class OpenDDS::DCPS::ConditionImpl
friend

Definition at line 85 of file WaitSet.h.

Member Data Documentation

◆ attached_conditions_

ConditionSet DDS::WaitSet::attached_conditions_
private

Definition at line 93 of file WaitSet.h.

Referenced by attach_condition(), detach_i(), get_conditions(), signal(), and wait().

◆ cond_

ConditionVariableType DDS::WaitSet::cond_
private

Definition at line 90 of file WaitSet.h.

Referenced by signal(), and wait().

◆ lock_

ACE_Recursive_Thread_Mutex DDS::WaitSet::lock_
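private

Definition at line 87 of file WaitSet.h.

Referenced by attach_condition(), detach_condition(), detach_conditions(), get_conditions(), signal(), and wait().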

◆ signaled_conditions_

ConditionSet DDS::WaitSet::signaled_conditions_
private

Definition at line 94 of file WaitSet.h.

Referenced by detach_i(), signal(), and wait().

◆ waiting_

bool DDS::WaitSet::waiting_
private

Definition at line 92 of file WaitSet.h.

Referenced by wait().


The documentation for this class was generated from the following files: