OpenDDS  Snapshot(2023/04/28-20:55)
InstanceState.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "ace/Event_Handler.h"
10 #include "ace/Reactor.h"
11 #include "InstanceState.h"
12 #include "DataReaderImpl.h"
13 #include "SubscriptionInstance.h"
15 #include "Time_Helper.h"
16 #include "DomainParticipantImpl.h"
17 #include "GuidConverter.h"
18 
19 #if !defined (__ACE_INLINE__)
20 # include "InstanceState.inl"
21 #endif /* !__ACE_INLINE__ */
22 
24 
25 namespace OpenDDS {
26 namespace DCPS {
27 
30  DDS::InstanceHandle_t handle)
32  TheServiceParticipant->reactor_owner()),
33  lock_(lock),
34  instance_state_(0),
35  view_state_(0),
36  disposed_generation_count_(0),
37  no_writers_generation_count_(0),
38  empty_(true),
39  release_pending_(false),
40  release_timer_id_(-1),
41  reader_(reader),
42  handle_(handle),
43  owner_(GUID_UNKNOWN),
44 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
45  exclusive_(reader->qos_.ownership.kind == DDS::EXCLUSIVE_OWNERSHIP_QOS),
46 #endif
47  registered_(false)
48 {}
49 
51 {
52 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
53  if (registered_) {
55  if (reader) {
56  DataReaderImpl::OwnershipManagerPtr om = reader->ownership_manager();
57  if (om) om->remove_instance(this);
58  }
59  }
60 #endif
61 }
62 
64 {
65  si.sample_state = de->sample_state_;
75  if (reader) {
76  RcHandle<DomainParticipantImpl> participant = reader->participant_servant_.lock();
77  si.publication_handle = participant ? participant->lookup_handle(de->pub_) : DDS::HANDLE_NIL;
78  } else {
80  }
81  si.valid_data = de->valid_data_;
82  /*
83  * These are actually calculated later...
84  */
85  si.sample_rank = 0;
86 
87  // these aren't the real value, they're being saved
88  // for a later calculation. the actual value is
89  // calculated in DataReaderImpl::sample_info using
90  // these values.
91  si.generation_rank =
92  static_cast<CORBA::Long>(de->disposed_generation_count_ +
95  static_cast<CORBA::Long>(de->disposed_generation_count_ +
97 
99 }
100 
101 // cannot ACE_INLINE because of #include loop
102 
104 {
105  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
106 
107  if (DCPS_debug_level) {
109  ACE_TEXT("(%P|%t) NOTICE:")
110  ACE_TEXT(" InstanceState::handle_timeout:")
111  ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
112  handle_));
113  }
114  release();
115 
116  return 0;
117 }
118 
120 {
122  writers_.erase(writer_id);
123 
124  //
125  // Manage the instance state on disposal here.
126  //
127  // If disposed by owner then the owner is not re-elected, it can
128  // resume if the writer sends message again.
130 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
132  if (reader) {
133  DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
134  if (! exclusive_
135  || (owner_manager && owner_manager->is_owner (handle_, writer_id))) {
136 #endif
138  state_updated();
140  return true;
141 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
142  }
143  }
144 #endif
145  }
146 
147  return false;
148 }
149 
151 {
152  if (DCPS_debug_level > 1) {
153  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) InstanceState::unregister_was_received on %C\n"),
154  LogGuid(writer_id).c_str()
155  ));
156  }
157 
159  writers_.erase(writer_id);
160 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
161  if (exclusive_) {
162  // If unregistered by owner then the ownership should be transferred to another
163  // writer.
165  if (reader) {
166  DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
167  if (owner_manager)
168  owner_manager->remove_writer (handle_, writer_id);
169  }
170  }
171 #endif
172 
175  state_updated();
177  return true;
178  }
179 
180  return false;
181 }
182 
184 {
185  release_pending_ = true;
186 }
187 
189 {
191  if (reader) {
192  reader->state_updated(handle_);
193  }
194 }
195 
197 {
198  DDS::DataReaderQos qos;
200  if (reader) {
201  qos = reader->qos_;
202  } else {
203  cancel_release();
204  return;
205  }
206 
207  DDS::Duration_t delay;
208 
209  switch (instance_state_) {
212  break;
213 
216  break;
217 
218  default:
220  ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
221  ACE_TEXT(" Unsupported instance state: %d!\n"),
222  instance_state_));
223  return;
224  }
225 
226  if (delay.sec != DDS::DURATION_INFINITE_SEC &&
228 
229  execute_or_enqueue(make_rch<ScheduleCommand>(this, TimeDuration(delay)));
230 
231  } else {
232  // N.B. instance transitions are always followed by a non-valid
233  // sample being queued to the ReceivedDataElementList; marking
234  // the release as pending prevents this sample from being lost
235  // if all samples have been already removed from the instance.
237  }
238 }
239 
241 {
242  release_pending_ = false;
243  execute_or_enqueue(make_rch<CancelCommand>(this));
244 }
245 
247 {
248  bool released = false;
249  if (empty_ && writers_.empty()) {
250  release();
251  released = true;
252  } else {
254  }
255  return released;
256 }
257 
259 {
261  if (reader) {
262  reader->release_instance(handle_);
263  }
264 }
265 
267 {
269  owner_ = owner;
270 }
271 
273 {
275  return owner_;
276 }
277 
279 {
280  return exclusive_;
281 }
282 
284 {
286  const bool ret = registered_;
287  registered_ = true;
288  return ret;
289 }
290 
292 {
294  registered_ = flag;
295 }
296 
298 {
301  registered_ = false;
302 
304  if (reader) {
305  reader->reset_ownership(instance);
306  }
307 }
308 
310 {
313 }
314 
316 {
317  return TheServiceParticipant->is_shut_down();
318 }
319 
321 {
322  if (instance_state_->release_timer_id_ != -1) {
323  instance_state_->reactor()->cancel_timer(instance_state_);
324  instance_state_->release_timer_id_ = -1;
325  }
326 }
327 
329 {
330  if (instance_state_->release_timer_id_ != -1) {
331  instance_state_->reactor()->cancel_timer(instance_state_);
332  }
333 
334  instance_state_->release_timer_id_ =
335  instance_state_->reactor()->schedule_timer(instance_state_, 0, delay_.value());
336 
337  if (instance_state_->release_timer_id_ == -1) {
339  ACE_TEXT("(%P|%t) ERROR: InstanceState::ScheduleCommand::execute:")
340  ACE_TEXT(" Unable to schedule timer!\n")));
341  }
342 }
343 
345 {
346  switch (value) {
348  return "ALIVE_INSTANCE_STATE";
350  return "NOT_ALIVE_INSTANCE_STATE";
352  return "NOT_ALIVE_DISPOSED_INSTANCE_STATE";
354  return "NOT_ALIVE_NO_WRITERS_INSTANCE_STATE";
356  return "ANY_INSTANCE_STATE";
357  default:
358  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: InstanceState::instance_state_string: ")
359  ACE_TEXT("%d is either invalid or not recognized.\n"),
360  value));
361 
362  return "Invalid instance state";
363  }
364 }
365 
367 {
368  if (mask == DDS::ANY_INSTANCE_STATE) {
370  }
371  if (mask == DDS::NOT_ALIVE_INSTANCE_STATE) {
373  }
374  OPENDDS_STRING str;
375  if (mask & DDS::ALIVE_INSTANCE_STATE) {
376  str = instance_state_string(DDS::ALIVE_INSTANCE_STATE);
377  }
379  if (!str.empty()) str += " | ";
380  str += instance_state_string(DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
381  }
383  if (!str.empty()) str += " | ";
384  str += instance_state_string(DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE);
385  }
386  return str;
387 }
388 
389 }
390 }
391 
#define ACE_DEBUG(X)
ACE_CDR::Long Long
#define ACE_ERROR(X)
const LogLevel::Value value
Definition: debug.cpp:61
InstanceStateKind instance_state
const InstanceHandle_t HANDLE_NIL
DDS::Time_t source_timestamp_
Source time stamp for this data sample.
ViewStateKind view_state
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool is_owner(const DDS::InstanceHandle_t &instance_handle, const GUID_t &pub_id)
bool dispose_was_received(const GUID_t &writer_id)
virtual int handle_timeout(const ACE_Time_Value &current_time, const void *arg)
void release()
Remove the instance immediately.
DDS::ViewStateKind view_state_
CommandPtr execute_or_enqueue(CommandPtr command)
long absolute_generation_rank
void remove_instance(InstanceState *instance_state)
void set_owner(const GUID_t &owner)
unsigned long InstanceStateMask
bool unregister_was_received(const GUID_t &writer_id)
ACE_Guard< ACE_Thread_Mutex > lock_
static OPENDDS_STRING instance_state_mask_string(DDS::InstanceStateMask mask)
Return string representation of the instance state mask passed.
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
bool valid_data_
Do we contain valid data.
long long opendds_reserved_publication_seq
SequenceNumber sequence_
The data sample&#39;s sequence number.
#define OPENDDS_STRING
ACE_Thread_Mutex owner_lock_
LM_DEBUG
const char * instance_state_string() const
Return string of the name of the current instance state.
InstanceHandle_t publication_handle
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
InstanceHandle_t instance_handle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unsigned long InstanceStateKind
long no_writers_generation_count
const InstanceStateMask ANY_INSTANCE_STATE
LM_NOTICE
DDS::InstanceStateKind instance_state_
SampleStateKind sample_state
bool most_recent_generation(ReceivedDataElement *item) const
The End User API.
void reset_ownership(DDS::InstanceHandle_t instance)
ReaderDataLifecycleQosPolicy reader_data_lifecycle
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_Recursive_Thread_Mutex & lock_
ACE_TEXT("TCP_Factory")
DDS::InstanceHandle_t handle_
long disposed_generation_count
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
void remove_writer(const GUID_t &pub_id)
void schedule_release()
Schedule an immediate release of resources.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void schedule_pending()
Schedule a pending release of resources.
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const InstanceStateMask NOT_ALIVE_INSTANCE_STATE
RcHandle< T > lock() const
Definition: RcObject.h:188
void cancel_release()
Cancel a scheduled or pending release of resources.
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
Time_t source_timestamp
const InstanceStateKind ALIVE_INSTANCE_STATE
InstanceState(const DataReaderImpl_rch &reader, ACE_Recursive_Thread_Mutex &lock, DDS::InstanceHandle_t handle)
WeakRcHandle< DataReaderImpl > reader_