OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::InstanceState Class Reference

manage the states of a received data instance. More...

#include <InstanceState.h>

Inheritance diagram for OpenDDS::DCPS::InstanceState:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::InstanceState:
Collaboration graph
[legend]

Classes

struct  CancelCommand
 
struct  CommandBase
 
struct  ScheduleCommand
 

Public Member Functions

 InstanceState (const DataReaderImpl_rch &reader, ACE_Recursive_Thread_Mutex &lock, DDS::InstanceHandle_t handle)
 
virtual ~InstanceState ()
 
void sample_info (DDS::SampleInfo &si, const ReceivedDataElement *de)
 Populate the SampleInfo structure. More...
 
DDS::InstanceStateKind instance_state () const
 Access instance state. More...
 
DDS::ViewStateKind view_state () const
 Access view state. More...
 
bool match (DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
 
size_t disposed_generation_count () const
 Access disposed generation count. More...
 
size_t no_writers_generation_count () const
 Access no writers generation count. More...
 
bool dispose_was_received (const GUID_t &writer_id)
 
bool unregister_was_received (const GUID_t &writer_id)
 
void data_was_received (const GUID_t &writer_id)
 Data sample received for this instance. More...
 
void lively (const GUID_t &writer_id)
 LIVELINESS message received for this DataWriter. More...
 
void accessed ()
 A read or take operation has been performed on this instance. More...
 
bool most_recent_generation (ReceivedDataElement *item) const
 
bool empty (bool value)
 DataReader has become empty. Returns true if the instance was released. More...
 
void schedule_pending ()
 Schedule a pending release of resources. More...
 
void schedule_release ()
 Schedule an immediate release of resources. More...
 
void cancel_release ()
 Cancel a scheduled or pending release of resources. More...
 
bool release_if_empty ()
 
void release ()
 Remove the instance immediately. More...
 
bool writes_instance (const GUID_t &writer_id) const
 Returns true if the writer is a writer of this instance. More...
 
WeakRcHandle< DataReaderImpldata_reader () const
 
void state_updated () const
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *arg)
 
void set_owner (const GUID_t &owner)
 
GUID_t get_owner ()
 
bool is_exclusive () const
 
bool registered ()
 
void registered (bool flag)
 
bool is_last (const GUID_t &pub)
 
bool no_writer () const
 
void reset_ownership (DDS::InstanceHandle_t instance)
 
DDS::InstanceHandle_t instance_handle () const
 
const char * instance_state_string () const
 Return string of the name of the current instance state. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
CommandPtr execute_or_enqueue (CommandPtr command)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Static Public Member Functions

static const char * instance_state_string (DDS::InstanceStateKind value)
 Return string of the name of the instance state kind passed. More...
 
static OPENDDS_STRING instance_state_mask_string (DDS::InstanceStateMask mask)
 Return string representation of the instance state mask passed. More...
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 

Private Member Functions

bool reactor_is_shut_down () const
 

Private Attributes

ACE_Recursive_Thread_Mutexlock_
 
ACE_Thread_Mutex owner_lock_
 
DDS::InstanceStateKind instance_state_
 
DDS::ViewStateKind view_state_
 
size_t disposed_generation_count_
 
size_t no_writers_generation_count_
 
bool empty_
 
bool release_pending_
 
long release_timer_id_
 
WeakRcHandle< DataReaderImplreader_
 
DDS::InstanceHandle_t handle_
 
RepoIdSet writers_
 
GUID_t owner_
 
bool exclusive_
 
bool registered_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::ReactorInterceptor
typedef RcHandle< CommandCommandPtr
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::ReactorInterceptor
enum  ReactorState { RS_NONE, RS_NOTIFIED, RS_PROCESSING }
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
 ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner)
 
virtual ~ReactorInterceptor ()
 
int handle_exception (ACE_HANDLE)
 
void process_command_queue_i (ACE_Guard< ACE_Thread_Mutex > &guard)
 
typedef OPENDDS_VECTOR (CommandPtr) Queue
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::ReactorInterceptor
ACE_thread_t owner_
 
ACE_Thread_Mutex mutex_
 
Queue command_queue_
 
ReactorState state_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

manage the states of a received data instance.

Provide a mechanism to manage the view state and instance state values for an instance contained within a DataReader. The instance_state and view_state are managed by this class. Accessors are provided to query the current value of each of these states.

Definition at line 49 of file InstanceState.h.

Constructor & Destructor Documentation

◆ InstanceState()

OpenDDS::DCPS::InstanceState::InstanceState ( const DataReaderImpl_rch reader,
ACE_Recursive_Thread_Mutex lock,
DDS::InstanceHandle_t  handle 
)

Definition at line 28 of file InstanceState.cpp.

32  TheServiceParticipant->reactor_owner()),
33  lock_(lock),
34  instance_state_(0),
35  view_state_(0),
38  empty_(true),
39  release_pending_(false),
41  reader_(reader),
42  handle_(handle),
44 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
45  exclusive_(reader->qos_.ownership.kind == DDS::EXCLUSIVE_OWNERSHIP_QOS),
46 #endif
47  registered_(false)
48 {}
DDS::InstanceStateKind instance_state_
ReactorInterceptor(ACE_Reactor *reactor, ACE_thread_t owner)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
WeakRcHandle< DataReaderImpl > reader_
DDS::ViewStateKind view_state_
DDS::InstanceHandle_t handle_
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex & lock_

◆ ~InstanceState()

OpenDDS::DCPS::InstanceState::~InstanceState ( )
virtual

Definition at line 50 of file InstanceState.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), reader_, registered_, and OpenDDS::DCPS::OwnershipManager::remove_instance().

51 {
52 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
53  if (registered_) {
54  RcHandle<DataReaderImpl> reader = reader_.lock();
55  if (reader) {
56  DataReaderImpl::OwnershipManagerPtr om = reader->ownership_manager();
57  if (om) om->remove_instance(this);
58  }
59  }
60 #endif
61 }
RcHandle< T > lock() const
Definition: RcObject.h:188
WeakRcHandle< DataReaderImpl > reader_

Member Function Documentation

◆ accessed()

ACE_INLINE void OpenDDS::DCPS::InstanceState::accessed ( )

A read or take operation has been performed on this instance.

Definition at line 22 of file InstanceState.inl.

References ACE_GUARD, ACE_INLINE, DDS::ANY_VIEW_STATE, lock_, DDS::NOT_NEW_VIEW_STATE, state_updated(), and view_state_.

Referenced by OpenDDS::DCPS::RakeResults< MessageType >::copy_into(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_next_sample(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::store_synthetic_data(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_next_sample().

23 {
25  //
26  // Manage the view state due to data access here.
27  //
29  const bool updated = (view_state_ != DDS::NOT_NEW_VIEW_STATE);
31  if (updated) {
32  state_updated();
33  }
34  }
35 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ViewStateKind NOT_NEW_VIEW_STATE
const ViewStateMask ANY_VIEW_STATE
DDS::ViewStateKind view_state_
ACE_Recursive_Thread_Mutex & lock_

◆ cancel_release()

void OpenDDS::DCPS::InstanceState::cancel_release ( )

Cancel a scheduled or pending release of resources.

Definition at line 240 of file InstanceState.cpp.

References OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), and release_pending_.

Referenced by data_was_received(), lively(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::purge_data(), and schedule_release().

241 {
242  release_pending_ = false;
243  execute_or_enqueue(make_rch<CancelCommand>(this));
244 }
CommandPtr execute_or_enqueue(CommandPtr command)

◆ data_reader()

Definition at line 15 of file InstanceState.inl.

References ACE_INLINE, and reader_.

Referenced by OpenDDS::DCPS::SubscriptionInstance::~SubscriptionInstance().

16 {
17  return reader_;
18 }
WeakRcHandle< DataReaderImpl > reader_

◆ data_was_received()

ACE_INLINE void OpenDDS::DCPS::InstanceState::data_was_received ( const GUID_t writer_id)

Data sample received for this instance.

Definition at line 71 of file InstanceState.inl.

References ACE_GUARD, ACE_INLINE, DDS::ALIVE_INSTANCE_STATE, cancel_release(), disposed_generation_count_, instance_state_, lock_, DDS::NEW_VIEW_STATE, no_writers_generation_count_, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, DDS::NOT_ALIVE_INSTANCE_STATE, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, DDS::NOT_NEW_VIEW_STATE, state_updated(), view_state_, and writers_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

72 {
75 
76  //
77  // Update the view state here, since only sample data received affects
78  // this state value. Then manage the data sample only transitions
79  // here. Let the lively() method manage the other transitions.
80  //
81  writers_.insert(writer_id);
82 
83  const CORBA::ULong old_view_state = view_state_;
84  const CORBA::ULong old_instance_state = instance_state_;
85 
86  switch (view_state_) {
88  break;
89 
93  }
94  break;
95 
96  default:
98  break;
99  }
100 
101  switch (instance_state_) {
104  break;
105 
108  break;
109 
110  default:
111  break;
112  }
113 
115 
116  if (view_state_ != old_view_state || instance_state_ != old_instance_state) {
117  state_updated();
118  }
119 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::InstanceStateKind instance_state_
const ViewStateKind NEW_VIEW_STATE
const InstanceStateKind ALIVE_INSTANCE_STATE
const InstanceStateMask NOT_ALIVE_INSTANCE_STATE
ACE_CDR::ULong ULong
const ViewStateKind NOT_NEW_VIEW_STATE
void cancel_release()
Cancel a scheduled or pending release of resources.
DDS::ViewStateKind view_state_
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
ACE_Recursive_Thread_Mutex & lock_

◆ dispose_was_received()

bool OpenDDS::DCPS::InstanceState::dispose_was_received ( const GUID_t writer_id)

DISPOSE message received for this instance. Return flag indicates whether the instance state was changed. This flag is used by concrete DataReader to determine whether it should notify listener. If state is not changed, the dispose message is ignored.

Definition at line 119 of file InstanceState.cpp.

References ACE_GUARD_RETURN, DDS::ALIVE_INSTANCE_STATE, exclusive_, handle_, instance_state_, OpenDDS::DCPS::OwnershipManager::is_owner(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, reader_, schedule_release(), state_updated(), and writers_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

120 {
122  writers_.erase(writer_id);
123 
124  //
125  // Manage the instance state on disposal here.
126  //
127  // If disposed by owner then the owner is not re-elected, it can
128  // resume if the writer sends message again.
130 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
131  RcHandle<DataReaderImpl> reader = reader_.lock();
132  if (reader) {
133  DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
134  if (! exclusive_
135  || (owner_manager && owner_manager->is_owner (handle_, writer_id))) {
136 #endif
138  state_updated();
140  return true;
141 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
142  }
143  }
144 #endif
145  }
146 
147  return false;
148 }
DDS::InstanceStateKind instance_state_
const InstanceStateKind ALIVE_INSTANCE_STATE
RcHandle< T > lock() const
Definition: RcObject.h:188
void schedule_release()
Schedule an immediate release of resources.
WeakRcHandle< DataReaderImpl > reader_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DDS::InstanceHandle_t handle_
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
ACE_Recursive_Thread_Mutex & lock_

◆ disposed_generation_count()

ACE_INLINE size_t OpenDDS::DCPS::InstanceState::disposed_generation_count ( ) const

Access disposed generation count.

Definition at line 58 of file InstanceState.inl.

References ACE_INLINE, and disposed_generation_count_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

59 {
61 }

◆ empty()

ACE_INLINE bool OpenDDS::DCPS::InstanceState::empty ( bool  value)

DataReader has become empty. Returns true if the instance was released.

Definition at line 147 of file InstanceState.inl.

References ACE_INLINE, empty_, release_if_empty(), and release_pending_.

Referenced by OpenDDS::DCPS::ReceivedDataElementList::add(), and OpenDDS::DCPS::ReceivedDataElementList::remove().

148 {
149  //
150  // Manage the instance state due to the DataReader becoming empty
151  // here.
152  //
153  if ((empty_ = value) && release_pending_) {
154  return release_if_empty();
155  } else {
156  return false;
157  }
158 }
const LogLevel::Value value
Definition: debug.cpp:61

◆ get_owner()

GUID_t OpenDDS::DCPS::InstanceState::get_owner ( )

◆ handle_timeout()

int OpenDDS::DCPS::InstanceState::handle_timeout ( const ACE_Time_Value current_time,
const void *  arg 
)
virtual

Reimplemented from ACE_Event_Handler.

Definition at line 103 of file InstanceState.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, handle_, LM_NOTICE, release(), and TheServiceParticipant.

104 {
105  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
106 
107  if (DCPS_debug_level) {
108  ACE_DEBUG((LM_NOTICE,
109  ACE_TEXT("(%P|%t) NOTICE:")
110  ACE_TEXT(" InstanceState::handle_timeout:")
111  ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
112  handle_));
113  }
114  release();
115 
116  return 0;
117 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
DDS::InstanceHandle_t handle_
void release()
Remove the instance immediately.
#define TheServiceParticipant

◆ instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::InstanceState::instance_handle ( ) const
inline

◆ instance_state()

ACE_INLINE DDS::InstanceStateKind OpenDDS::DCPS::InstanceState::instance_state ( ) const

◆ instance_state_mask_string()

OPENDDS_STRING OpenDDS::DCPS::InstanceState::instance_state_mask_string ( DDS::InstanceStateMask  mask)
static

Return string representation of the instance state mask passed.

Definition at line 366 of file InstanceState.cpp.

References DDS::ALIVE_INSTANCE_STATE, DDS::ANY_INSTANCE_STATE, instance_state_string(), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, DDS::NOT_ALIVE_INSTANCE_STATE, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OPENDDS_STRING.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_instance_i().

367 {
368  if (mask == DDS::ANY_INSTANCE_STATE) {
370  }
371  if (mask == DDS::NOT_ALIVE_INSTANCE_STATE) {
373  }
374  OPENDDS_STRING str;
375  if (mask & DDS::ALIVE_INSTANCE_STATE) {
376  str = instance_state_string(DDS::ALIVE_INSTANCE_STATE);
377  }
379  if (!str.empty()) str += " | ";
380  str += instance_state_string(DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
381  }
383  if (!str.empty()) str += " | ";
384  str += instance_state_string(DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE);
385  }
386  return str;
387 }
const InstanceStateKind ALIVE_INSTANCE_STATE
const InstanceStateMask NOT_ALIVE_INSTANCE_STATE
const InstanceStateMask ANY_INSTANCE_STATE
#define OPENDDS_STRING
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
const char * instance_state_string() const
Return string of the name of the current instance state.
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE

◆ instance_state_string() [1/2]

ACE_INLINE const char * OpenDDS::DCPS::InstanceState::instance_state_string ( ) const

Return string of the name of the current instance state.

Definition at line 180 of file InstanceState.inl.

References instance_state_, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

Referenced by instance_state_mask_string(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_instance_i().

181 {
183 }
DDS::InstanceStateKind instance_state_
const char * instance_state_string() const
Return string of the name of the current instance state.

◆ instance_state_string() [2/2]

const char * OpenDDS::DCPS::InstanceState::instance_state_string ( DDS::InstanceStateKind  value)
static

Return string of the name of the instance state kind passed.

Definition at line 344 of file InstanceState.cpp.

References ACE_ERROR, ACE_TEXT(), DDS::ALIVE_INSTANCE_STATE, DDS::ANY_INSTANCE_STATE, LM_ERROR, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, DDS::NOT_ALIVE_INSTANCE_STATE, and DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE.

345 {
346  switch (value) {
348  return "ALIVE_INSTANCE_STATE";
350  return "NOT_ALIVE_INSTANCE_STATE";
352  return "NOT_ALIVE_DISPOSED_INSTANCE_STATE";
354  return "NOT_ALIVE_NO_WRITERS_INSTANCE_STATE";
356  return "ANY_INSTANCE_STATE";
357  default:
358  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: InstanceState::instance_state_string: ")
359  ACE_TEXT("%d is either invalid or not recognized.\n"),
360  value));
361 
362  return "Invalid instance state";
363  }
364 }
#define ACE_ERROR(X)
const LogLevel::Value value
Definition: debug.cpp:61
const InstanceStateKind ALIVE_INSTANCE_STATE
const InstanceStateMask NOT_ALIVE_INSTANCE_STATE
const InstanceStateMask ANY_INSTANCE_STATE
ACE_TEXT("TCP_Factory")
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE

◆ is_exclusive()

bool OpenDDS::DCPS::InstanceState::is_exclusive ( ) const

Definition at line 278 of file InstanceState.cpp.

References exclusive_.

Referenced by OpenDDS::DCPS::DataReaderImpl::process_deadline().

279 {
280  return exclusive_;
281 }

◆ is_last()

ACE_INLINE bool OpenDDS::DCPS::InstanceState::is_last ( const GUID_t pub)

Definition at line 163 of file InstanceState.inl.

References ACE_GUARD_RETURN, ACE_INLINE, lock_, and writers_.

Referenced by OpenDDS::DCPS::DataReaderImpl::data_received().

164 {
166 
167  return writers_.size() == 1 && *writers_.begin() == pub;
168 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex & lock_

◆ lively()

ACE_INLINE void OpenDDS::DCPS::InstanceState::lively ( const GUID_t writer_id)

LIVELINESS message received for this DataWriter.

Definition at line 123 of file InstanceState.inl.

References ACE_GUARD, ACE_INLINE, DDS::ALIVE_INSTANCE_STATE, cancel_release(), instance_state_, lock_, no_writers_generation_count_, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, state_updated(), and writers_.

Referenced by OpenDDS::DCPS::DataReaderImpl::signal_liveliness(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::store_instance_data().

124 {
126 
127  //
128  // Manage transisitions in the instance state that do not require a
129  // data sample, but merely the notion of liveliness.
130  //
131  writers_.insert(writer_id);
132 
134  cancel_release(); // cancel unregister
135 
137  const bool updated = (instance_state_ != DDS::ALIVE_INSTANCE_STATE);
139  if (updated) {
140  state_updated();
141  }
142  }
143 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::InstanceStateKind instance_state_
const InstanceStateKind ALIVE_INSTANCE_STATE
void cancel_release()
Cancel a scheduled or pending release of resources.
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
ACE_Recursive_Thread_Mutex & lock_

◆ match()

ACE_INLINE bool OpenDDS::DCPS::InstanceState::match ( DDS::ViewStateMask  view,
DDS::InstanceStateMask  inst 
) const

◆ most_recent_generation()

bool OpenDDS::DCPS::InstanceState::most_recent_generation ( ReceivedDataElement item) const

◆ no_writer()

ACE_INLINE bool OpenDDS::DCPS::InstanceState::no_writer ( ) const

Definition at line 172 of file InstanceState.inl.

References ACE_GUARD_RETURN, ACE_INLINE, lock_, and writers_.

173 {
175 
176  return writers_.empty();
177 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex & lock_

◆ no_writers_generation_count()

ACE_INLINE size_t OpenDDS::DCPS::InstanceState::no_writers_generation_count ( ) const

Access no writers generation count.

Definition at line 64 of file InstanceState.inl.

References ACE_INLINE, and no_writers_generation_count_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

65 {
67 }

◆ reactor_is_shut_down()

bool OpenDDS::DCPS::InstanceState::reactor_is_shut_down ( ) const
privatevirtual

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 315 of file InstanceState.cpp.

References TheServiceParticipant.

316 {
317  return TheServiceParticipant->is_shut_down();
318 }
#define TheServiceParticipant

◆ registered() [1/2]

bool OpenDDS::DCPS::InstanceState::registered ( )

Definition at line 283 of file InstanceState.cpp.

References lock_, and registered_.

Referenced by OpenDDS::DCPS::OwnershipManager::select_owner().

284 {
286  const bool ret = registered_;
287  registered_ = true;
288  return ret;
289 }
ACE_Recursive_Thread_Mutex & lock_

◆ registered() [2/2]

void OpenDDS::DCPS::InstanceState::registered ( bool  flag)

Definition at line 291 of file InstanceState.cpp.

References lock_, and registered_.

292 {
294  registered_ = flag;
295 }
ACE_Recursive_Thread_Mutex & lock_

◆ release()

void OpenDDS::DCPS::InstanceState::release ( void  )

Remove the instance immediately.

Definition at line 258 of file InstanceState.cpp.

References handle_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and reader_.

Referenced by handle_timeout(), and release_if_empty().

259 {
260  RcHandle<DataReaderImpl> reader = reader_.lock();
261  if (reader) {
262  reader->release_instance(handle_);
263  }
264 }
RcHandle< T > lock() const
Definition: RcObject.h:188
WeakRcHandle< DataReaderImpl > reader_
DDS::InstanceHandle_t handle_

◆ release_if_empty()

bool OpenDDS::DCPS::InstanceState::release_if_empty ( )

Remove the instance if it's instance has no samples and no writers. Returns true if the instance was released.

Definition at line 246 of file InstanceState.cpp.

References empty_, release(), schedule_pending(), and writers_.

Referenced by empty().

247 {
248  bool released = false;
249  if (empty_ && writers_.empty()) {
250  release();
251  released = true;
252  } else {
254  }
255  return released;
256 }
void schedule_pending()
Schedule a pending release of resources.
void release()
Remove the instance immediately.

◆ reset_ownership()

void OpenDDS::DCPS::InstanceState::reset_ownership ( DDS::InstanceHandle_t  instance)

Definition at line 297 of file InstanceState.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, reader_, registered_, and set_owner().

298 {
301  registered_ = false;
302 
303  RcHandle<DataReaderImpl> reader = reader_.lock();
304  if (reader) {
305  reader->reset_ownership(instance);
306  }
307 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RcHandle< T > lock() const
Definition: RcObject.h:188
WeakRcHandle< DataReaderImpl > reader_
void set_owner(const GUID_t &owner)
ACE_Recursive_Thread_Mutex & lock_

◆ sample_info()

void OpenDDS::DCPS::InstanceState::sample_info ( DDS::SampleInfo si,
const ReceivedDataElement de 
)

Populate the SampleInfo structure.

Definition at line 63 of file InstanceState.cpp.

References DDS::SampleInfo::absolute_generation_rank, DDS::SampleInfo::disposed_generation_count, OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::SequenceNumber::getValue(), handle_, DDS::HANDLE_NIL, DDS::SampleInfo::instance_handle, DDS::SampleInfo::instance_state, instance_state_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), DDS::SampleInfo::no_writers_generation_count, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, no_writers_generation_count_, DDS::SampleInfo::opendds_reserved_publication_seq, OpenDDS::DCPS::ReceivedDataElement::pub_, DDS::SampleInfo::publication_handle, reader_, DDS::SampleInfo::sample_rank, DDS::SampleInfo::sample_state, OpenDDS::DCPS::ReceivedDataElement::sample_state_, OpenDDS::DCPS::ReceivedDataElement::sequence_, DDS::SampleInfo::source_timestamp, OpenDDS::DCPS::ReceivedDataElement::source_timestamp_, DDS::SampleInfo::valid_data, OpenDDS::DCPS::ReceivedDataElement::valid_data_, DDS::SampleInfo::view_state, and view_state_.

Referenced by OpenDDS::DCPS::RakeResults< MessageType >::copy_into(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_next_sample(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_next_sample().

64 {
65  si.sample_state = de->sample_state_;
72  si.source_timestamp = de->source_timestamp_;
74  RcHandle<DataReaderImpl> reader = reader_.lock();
75  if (reader) {
76  RcHandle<DomainParticipantImpl> participant = reader->participant_servant_.lock();
77  si.publication_handle = participant ? participant->lookup_handle(de->pub_) : DDS::HANDLE_NIL;
78  } else {
80  }
81  si.valid_data = de->valid_data_;
82  /*
83  * These are actually calculated later...
84  */
85  si.sample_rank = 0;
86 
87  // these aren't the real value, they're being saved
88  // for a later calculation. the actual value is
89  // calculated in DataReaderImpl::sample_info using
90  // these values.
91  si.generation_rank =
92  static_cast<CORBA::Long>(de->disposed_generation_count_ +
93  de->no_writers_generation_count_);
95  static_cast<CORBA::Long>(de->disposed_generation_count_ +
96  de->no_writers_generation_count_);
97 
98  si.opendds_reserved_publication_seq = de->sequence_.getValue();
99 }
ACE_CDR::Long Long
long absolute_generation_rank
DDS::InstanceStateKind instance_state_
InstanceHandle_t publication_handle
long long opendds_reserved_publication_seq
InstanceHandle_t instance_handle
const InstanceHandle_t HANDLE_NIL
RcHandle< T > lock() const
Definition: RcObject.h:188
SampleStateKind sample_state
ViewStateKind view_state
WeakRcHandle< DataReaderImpl > reader_
Time_t source_timestamp
InstanceStateKind instance_state
DDS::ViewStateKind view_state_
long no_writers_generation_count
long disposed_generation_count
DDS::InstanceHandle_t handle_

◆ schedule_pending()

void OpenDDS::DCPS::InstanceState::schedule_pending ( )

Schedule a pending release of resources.

Definition at line 183 of file InstanceState.cpp.

References release_pending_.

Referenced by release_if_empty(), and schedule_release().

184 {
185  release_pending_ = true;
186 }

◆ schedule_release()

void OpenDDS::DCPS::InstanceState::schedule_release ( )

Schedule an immediate release of resources.

Definition at line 196 of file InstanceState.cpp.

References ACE_ERROR, ACE_TEXT(), DDS::ReaderDataLifecycleQosPolicy::autopurge_disposed_samples_delay, DDS::ReaderDataLifecycleQosPolicy::autopurge_nowriter_samples_delay, cancel_release(), DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue(), instance_state_, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), DDS::Duration_t::nanosec, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, reader_, DDS::DataReaderQos::reader_data_lifecycle, schedule_pending(), and DDS::Duration_t::sec.

Referenced by dispose_was_received(), and unregister_was_received().

197 {
198  DDS::DataReaderQos qos;
199  RcHandle<DataReaderImpl> reader = reader_.lock();
200  if (reader) {
201  qos = reader->qos_;
202  } else {
203  cancel_release();
204  return;
205  }
206 
207  DDS::Duration_t delay;
208 
209  switch (instance_state_) {
212  break;
213 
216  break;
217 
218  default:
219  ACE_ERROR((LM_ERROR,
220  ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
221  ACE_TEXT(" Unsupported instance state: %d!\n"),
222  instance_state_));
223  return;
224  }
225 
226  if (delay.sec != DDS::DURATION_INFINITE_SEC &&
228 
229  execute_or_enqueue(make_rch<ScheduleCommand>(this, TimeDuration(delay)));
230 
231  } else {
232  // N.B. instance transitions are always followed by a non-valid
233  // sample being queued to the ReceivedDataElementList; marking
234  // the release as pending prevents this sample from being lost
235  // if all samples have been already removed from the instance.
237  }
238 }
#define ACE_ERROR(X)
DDS::InstanceStateKind instance_state_
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
RcHandle< T > lock() const
Definition: RcObject.h:188
ReaderDataLifecycleQosPolicy reader_data_lifecycle
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
WeakRcHandle< DataReaderImpl > reader_
void cancel_release()
Cancel a scheduled or pending release of resources.
void schedule_pending()
Schedule a pending release of resources.
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
ACE_TEXT("TCP_Factory")
CommandPtr execute_or_enqueue(CommandPtr command)
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE

◆ set_owner()

void OpenDDS::DCPS::InstanceState::set_owner ( const GUID_t owner)

Definition at line 266 of file InstanceState.cpp.

References owner_, and owner_lock_.

Referenced by reset_ownership().

◆ state_updated()

void OpenDDS::DCPS::InstanceState::state_updated ( ) const

Definition at line 188 of file InstanceState.cpp.

References handle_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), and reader_.

Referenced by accessed(), data_was_received(), dispose_was_received(), lively(), and unregister_was_received().

189 {
190  RcHandle<DataReaderImpl> reader = reader_.lock();
191  if (reader) {
192  reader->state_updated(handle_);
193  }
194 }
RcHandle< T > lock() const
Definition: RcObject.h:188
WeakRcHandle< DataReaderImpl > reader_
DDS::InstanceHandle_t handle_

◆ unregister_was_received()

bool OpenDDS::DCPS::InstanceState::unregister_was_received ( const GUID_t writer_id)

UNREGISTER message received for this instance. Return flag indicates whether the instance state was changed. This flag is used by concrete DataReader to determine whether it should notify listener. If state is not changed, the unregister message is ignored.

Definition at line 150 of file InstanceState.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), DDS::ALIVE_INSTANCE_STATE, OpenDDS::DCPS::DCPS_debug_level, exclusive_, handle_, instance_state_, LM_DEBUG, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, reader_, OpenDDS::DCPS::OwnershipManager::remove_writer(), schedule_release(), state_updated(), and writers_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

151 {
152  if (DCPS_debug_level > 1) {
153  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) InstanceState::unregister_was_received on %C\n"),
154  LogGuid(writer_id).c_str()
155  ));
156  }
157 
159  writers_.erase(writer_id);
160 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
161  if (exclusive_) {
162  // If unregistered by owner then the ownership should be transferred to another
163  // writer.
164  RcHandle<DataReaderImpl> reader = reader_.lock();
165  if (reader) {
166  DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
167  if (owner_manager)
168  owner_manager->remove_writer (handle_, writer_id);
169  }
170  }
171 #endif
172 
175  state_updated();
177  return true;
178  }
179 
180  return false;
181 }
#define ACE_DEBUG(X)
DDS::InstanceStateKind instance_state_
const InstanceStateKind ALIVE_INSTANCE_STATE
RcHandle< T > lock() const
Definition: RcObject.h:188
void schedule_release()
Schedule an immediate release of resources.
WeakRcHandle< DataReaderImpl > reader_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
DDS::InstanceHandle_t handle_
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
ACE_Recursive_Thread_Mutex & lock_

◆ view_state()

ACE_INLINE DDS::ViewStateKind OpenDDS::DCPS::InstanceState::view_state ( ) const

Access view state.

Definition at line 46 of file InstanceState.inl.

References ACE_INLINE, and view_state_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_instance_i().

47 {
48  return view_state_;
49 }
DDS::ViewStateKind view_state_

◆ writes_instance()

bool OpenDDS::DCPS::InstanceState::writes_instance ( const GUID_t writer_id) const
inline

Returns true if the writer is a writer of this instance.

Definition at line 121 of file InstanceState.h.

References ACE_GUARD_RETURN, and lock_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::store_instance_data_check().

122  {
124  return writers_.count(writer_id);
125  }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex & lock_

Member Data Documentation

◆ disposed_generation_count_

size_t OpenDDS::DCPS::InstanceState::disposed_generation_count_
private

Number of times the instance state changes from NOT_ALIVE_DISPOSED to ALIVE.

Definition at line 193 of file InstanceState.h.

Referenced by data_was_received(), disposed_generation_count(), most_recent_generation(), and sample_info().

◆ empty_

bool OpenDDS::DCPS::InstanceState::empty_
private

Keep track of whether the DataReader is empty or not.

Definition at line 202 of file InstanceState.h.

Referenced by empty(), and release_if_empty().

◆ exclusive_

bool OpenDDS::DCPS::InstanceState::exclusive_
private

Definition at line 225 of file InstanceState.h.

Referenced by dispose_was_received(), is_exclusive(), and unregister_was_received().

◆ handle_

DDS::InstanceHandle_t OpenDDS::DCPS::InstanceState::handle_
private

◆ instance_state_

DDS::InstanceStateKind OpenDDS::DCPS::InstanceState::instance_state_
private

◆ lock_

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::InstanceState::lock_
private

◆ no_writers_generation_count_

size_t OpenDDS::DCPS::InstanceState::no_writers_generation_count_
private

Number of times the instance state changes from NOT_ALIVE_NO_WRITERS to ALIVE.

Definition at line 197 of file InstanceState.h.

Referenced by data_was_received(), lively(), most_recent_generation(), no_writers_generation_count(), and sample_info().

◆ owner_

GUID_t OpenDDS::DCPS::InstanceState::owner_
private

Definition at line 224 of file InstanceState.h.

Referenced by get_owner(), and set_owner().

◆ owner_lock_

ACE_Thread_Mutex OpenDDS::DCPS::InstanceState::owner_lock_
private

Definition at line 159 of file InstanceState.h.

Referenced by get_owner(), and set_owner().

◆ reader_

WeakRcHandle<DataReaderImpl> OpenDDS::DCPS::InstanceState::reader_
private

Reference to our containing reader. This is used to call back and notify the reader that liveliness has been lost on this instance. It is also queried to determine if the DataReader is empty – that it contains no more sample data.

Definition at line 220 of file InstanceState.h.

Referenced by data_reader(), dispose_was_received(), release(), reset_ownership(), sample_info(), schedule_release(), state_updated(), unregister_was_received(), and ~InstanceState().

◆ registered_

bool OpenDDS::DCPS::InstanceState::registered_
private

registered with participant so it can be called back as the owner is updated.

Definition at line 228 of file InstanceState.h.

Referenced by registered(), reset_ownership(), and ~InstanceState().

◆ release_pending_

bool OpenDDS::DCPS::InstanceState::release_pending_
private

Keep track of whether the instance is waiting to be released.

Definition at line 207 of file InstanceState.h.

Referenced by cancel_release(), empty(), and schedule_pending().

◆ release_timer_id_

long OpenDDS::DCPS::InstanceState::release_timer_id_
private

Keep track of a scheduled release timer.

Definition at line 212 of file InstanceState.h.

◆ view_state_

DDS::ViewStateKind OpenDDS::DCPS::InstanceState::view_state_
private

Current instance view state.

Can have values defined as:

DDS::NEW_VIEW_STATE DDS::NOT_NEW_VIEW_STATE

and can be checked with the mask:

DDS::ANY_VIEW_STATE

Definition at line 189 of file InstanceState.h.

Referenced by accessed(), data_was_received(), match(), sample_info(), and view_state().

◆ writers_

RepoIdSet OpenDDS::DCPS::InstanceState::writers_
private

The documentation for this class was generated from the following files: