OpenDDS  Snapshot(2023/04/28-20:55)
WriterInfo.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 
9 #ifndef OPENDDS_DCPS_WRITERINFO_H
10 #define OPENDDS_DCPS_WRITERINFO_H
11 
12 #include "Atomic.h"
13 #include "CoherentChangeControl.h"
14 #include "ConditionVariable.h"
15 #include "Definitions.h"
16 #include "DisjointSequence.h"
17 #include "PoolAllocator.h"
18 #include "RcObject.h"
19 #include "TimeTypes.h"
21 
22 #include <dds/DdsDcpsInfoUtilsC.h>
23 #include <dds/DdsDcpsCoreC.h>
24 
26 class ACE_Reactor;
27 class ACE_Event_Handler;
29 
31 
32 namespace OpenDDS {
33 namespace DCPS {
34 
35 class WriterInfo;
36 class EndHistoricSamplesMissedSweeper;
37 
39 {
40 public:
42  virtual ~WriterInfoListener();
43 
45 
46  /// The time interval for checking liveliness.
47  /// TBD: Should this be initialized with
48  /// DDS::DURATION_INFINITE_SEC and DDS::DURATION_INFINITE_NSEC
49  /// instead of ACE_Time_Value::zero to be consistent with default
50  /// duration qos ? Or should we simply use the ACE_Time_Value::zero
51  /// to indicate the INFINITY duration ?
53 
54  /// tell instances when a DataWriter transitions to being alive
55  /// The writer state is inout parameter, it has to be set ALIVE before
56  /// handle_timeout is called since some subroutine use the state.
57  virtual void writer_became_alive(WriterInfo& info,
58  const MonotonicTimePoint& when);
59 
60  /// tell instances when a DataWriter transitions to DEAD
61  /// The writer state is inout parameter, the state is set to DEAD
62  /// when it returns.
63  virtual void writer_became_dead(WriterInfo& info);
64 
65  /// tell instance when a DataWriter is removed.
66  /// The liveliness status need update.
67  virtual void writer_removed(WriterInfo& info);
68 };
69 
71 
72 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
77 };
78 #endif
79 
80 /// Keeps track of a DataWriter's liveliness for a DataReader.
81 class OpenDDS_Dcps_Export WriterInfo : public virtual RcObject {
82  friend class WriteInfoListner;
83 
84 public:
85  enum WriterState { NOT_SET, ALIVE, DEAD };
86  enum TimerState { NO_TIMER = -1 };
87 
88  WriterInfo(const WriterInfoListener_rch& reader,
89  const GUID_t& writer_id,
90  const DDS::DataWriterQos& writer_qos);
91 
92  /// check to see if this writer is alive (called by handle_timeout).
93  /// @param now next monotonic time this DataWriter will become not active (not alive)
94  /// if no sample or liveliness message is received.
95  /// @returns montonic time when the Writer will become not active (if no activity)
96  /// of MonotonicTimePoint::max_value if the writer is already or became not alive
97  MonotonicTimePoint check_activity(const MonotonicTimePoint& now);
98 
99  /// called when a sample or other activity is received from this writer.
100  void received_activity(const MonotonicTimePoint& when);
101 
102  /// returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
104  {
105  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
106  return state_;
107  };
108 
110  {
111  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
112  state_ = state;
113  };
114 
116  {
117  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
118  return handle_;
119  }
120 
122  {
123  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
124  handle_ = handle;
125  };
126 
128  {
129  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
130  return writer_id_;
131  }
132 
134  {
135  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
136  return writer_qos_.ownership_strength.value;
137  }
138 
139  void writer_qos_ownership_strength(const CORBA::Long writer_qos_ownership_strength)
140  {
141  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
142  writer_qos_.ownership_strength.value = writer_qos_ownership_strength;
143  }
144 
146  {
147  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
148  return waiting_for_end_historic_samples_;
149  }
150 
151  void waiting_for_end_historic_samples(bool waiting_for_end_historic_samples)
152  {
153  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
154  waiting_for_end_historic_samples_ = waiting_for_end_historic_samples;
155  }
156 
158  {
159  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
160  return last_historic_seq_;
161  }
162 
163  void last_historic_seq(const SequenceNumber& last_historic_seq)
164  {
165  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
166  last_historic_seq_ = last_historic_seq;
167  }
168 
169  const char* get_state_str() const;
170 
171  /// update liveliness when remove_association is called.
172  void removed();
173 
174 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
175  Coherent_State coherent_change_received();
176  void reset_coherent_info();
177  void set_group_info(const CoherentChangeControl& info);
178  void add_coherent_samples(const SequenceNumber& seq);
179  void coherent_change(bool group_coherent, const GUID_t& publisher_id);
180 
181  bool group_coherent() const {
182  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
183  return group_coherent_;
184  }
185 
187  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
188  return publisher_id_;
189  }
190 
191 #endif
192 
193  void clear_owner_evaluated();
194  void set_owner_evaluated(DDS::InstanceHandle_t instance, bool flag);
195  bool is_owner_evaluated(DDS::InstanceHandle_t instance);
196 
197  void schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper, const ACE_Time_Value& ten_seconds);
198  void cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper);
199  bool check_end_historic_samples(EndHistoricSamplesMissedSweeper* sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& to_deliver);
200  bool check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq);
201  void finished_delivering_historic();
202 
203 private:
204 
206 
207  /// Timestamp of last write/dispose/assert_liveliness from this DataWriter
209 
210  // Non-negative if this a durable writer which has a timer scheduled
212 
213  /// Temporary holding place for samples received before
214  /// the END_HISTORIC_SAMPLES control message.
215  OPENDDS_MAP(SequenceNumber, ReceivedDataSample) historic_samples_;
216 
217  /// After receiving END_HISTORIC_SAMPLES, check for duplicates
219 
221 
224 
225  /// State of the writer.
227 
228  /// The DataReader owning this WriterInfo
230 
231  /// DCPSInfoRepo ID of the DataWriter
233 
234  /// Writer qos
236 
237  /// The publication entity instance handle.
239 
240  /// Number of received coherent changes in active change set.
242 
243  /// Is this writer evaluated for owner ?
244  typedef OPENDDS_MAP(DDS::InstanceHandle_t, bool) OwnerEvaluateFlags;
245  OwnerEvaluateFlags owner_evaluated_;
246 
247  /// Data to support GROUP access scope.
248 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
253  GroupCoherentSamples group_coherent_samples_;
254 #endif
255 
256 };
257 
258 inline
259 void
261 {
262  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
263 
264  last_liveliness_activity_time_ = when;
265 
266  if (state_ != ALIVE) { // NOT_SET || DEAD
267  RcHandle<WriterInfoListener> reader = reader_.lock();
268  guard.release();
269  if (reader) {
270  reader->writer_became_alive(*this, when);
271  }
272  }
273 }
274 
276 
277 } // namespace DCPS
278 } // namespace
279 
281 
282 #endif /* end of include guard: OPENDDS_DCPS_WRITERINFO_H */
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
ACE_CDR::Long Long
void handle(DDS::InstanceHandle_t handle)
Definition: WriterInfo.h:121
OwnerEvaluateFlags owner_evaluated_
Definition: WriterInfo.h:245
DisjointSequence coherent_sample_sequence_
Definition: WriterInfo.h:251
Atomic< ACE_UINT32 > coherent_samples_
Number of received coherent changes in active change set.
Definition: WriterInfo.h:241
DDS::InstanceHandle_t handle() const
Definition: WriterInfo.h:115
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
GroupCoherentSamples group_coherent_samples_
Definition: WriterInfo.h:253
WriterCoherentSample writer_coherent_samples_
Definition: WriterInfo.h:252
int release(void)
ACE_Thread_Mutex mutex_
Definition: WriterInfo.h:205
void last_historic_seq(const SequenceNumber &last_historic_seq)
Definition: WriterInfo.h:163
bool waiting_for_end_historic_samples() const
Definition: WriterInfo.h:145
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness from this DataWriter.
Definition: WriterInfo.h:208
DDS::DataWriterQos writer_qos_
Writer qos.
Definition: WriterInfo.h:235
WeakRcHandle< WriterInfoListener > reader_
The DataReader owning this WriterInfo.
Definition: WriterInfo.h:229
void writer_qos_ownership_strength(const CORBA::Long writer_qos_ownership_strength)
Definition: WriterInfo.h:139
RcHandle< WriterInfoListener > WriterInfoListener_rch
Definition: WriterInfo.h:70
GUID_t publisher_id() const
Definition: WriterInfo.h:186
bool group_coherent() const
Definition: WriterInfo.h:181
bool group_coherent_
Data to support GROUP access scope.
Definition: WriterInfo.h:249
Holds a data sample received by the transport.
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
SequenceNumber last_historic_seq_
After receiving END_HISTORIC_SAMPLES, check for duplicates.
Definition: WriterInfo.h:218
#define ACE_END_VERSIONED_NAMESPACE_DECL
void waiting_for_end_historic_samples(bool waiting_for_end_historic_samples)
Definition: WriterInfo.h:151
DDS::InstanceHandle_t handle_
The publication entity instance handle.
Definition: WriterInfo.h:238
SequenceNumber last_historic_seq() const
Definition: WriterInfo.h:157
TimeDuration liveliness_lease_duration_
Definition: WriterInfo.h:52
WriterState state_
State of the writer.
Definition: WriterInfo.h:226
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void received_activity(const MonotonicTimePoint &when)
called when a sample or other activity is received from this writer.
Definition: WriterInfo.h:260
void state(WriterState state)
Definition: WriterInfo.h:109
GUID_t writer_id_
DCPSInfoRepo ID of the DataWriter.
Definition: WriterInfo.h:232
GUID_t writer_id() const
Definition: WriterInfo.h:127
Keeps track of a DataWriter&#39;s liveliness for a DataReader.
Definition: WriterInfo.h:81
ConditionVariable< ACE_Thread_Mutex > delivering_historic_samples_cv_
Definition: WriterInfo.h:223
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
CORBA::Long writer_qos_ownership_strength() const
Definition: WriterInfo.h:133
WriterState state() const
returns 1 if the DataWriter is lively; 2 if dead; otherwise returns 0.
Definition: WriterInfo.h:103