OpenDDS  Snapshot(2023/04/28-20:55)
WriterInfo.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
8 #include "dcps_export.h"
9 
10 #include "DataReaderImpl.h"
11 #include "GuidConverter.h"
12 #include "Service_Participant.h"
13 #include "Time_Helper.h"
14 #include "WriterInfo.h"
15 
16 #include "ace/OS_NS_sys_time.h"
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
24  : subscription_id_(GUID_UNKNOWN)
25 {
26 }
27 
29 {
30 }
31 
32 /// tell instances when a DataWriter transitions to being alive
33 /// The writer state is inout parameter, it has to be set ALIVE before
34 /// handle_timeout is called since some subroutine use the state.
35 void
37  const MonotonicTimePoint&)
38 {
39 }
40 
41 /// tell instances when a DataWriter transitions to DEAD
42 /// The writer state is inout parameter, the state is set to DEAD
43 /// when it returns.
44 void
46 {
47 }
48 
49 /// tell instance when a DataWriter is removed.
50 /// The liveliness status need update.
51 void
53 {
54 }
55 
57  const GUID_t& writer_id,
58  const ::DDS::DataWriterQos& writer_qos)
59  : last_liveliness_activity_time_(MonotonicTimePoint::now())
60  , historic_samples_timer_(NO_TIMER)
61  , last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
62  , waiting_for_end_historic_samples_(false)
63  , delivering_historic_samples_(false)
64  , delivering_historic_samples_cv_(mutex_)
65  , state_(NOT_SET)
66  , reader_(reader)
67  , writer_id_(writer_id)
68  , writer_qos_(writer_qos)
69  , handle_(DDS::HANDLE_NIL)
70 {
71 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
72  reset_coherent_info();
73 #endif
74 
75  if (DCPS_debug_level >= 5) {
77  ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
78  ACE_TEXT("writer %C added to reader %C.\n"),
79  LogGuid(writer_id).c_str(),
80  LogGuid(reader->subscription_id_).c_str()));
81  }
82 }
83 
84 const char* WriterInfo::get_state_str() const
85 {
86  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
87  switch (state_) {
88  case NOT_SET:
89  return "NOT_SET";
90  case ALIVE:
91  return "ALIVE";
92  case DEAD:
93  return "DEAD";
94  default:
95  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: WriterInfo::get_state_str: ")
96  ACE_TEXT("%d is either invalid or not recognized.\n"),
97  state_));
98  return "Invalid state";
99  }
100 }
101 
102 void
104 {
105  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
106  const void* arg = reinterpret_cast<const void*>(this);
107  historic_samples_timer_ = sweeper->reactor()->schedule_timer(sweeper, arg, ten_seconds);
108 }
109 
110 void
112 {
113  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
114  if (historic_samples_timer_ != WriterInfo::NO_TIMER) {
115  sweeper->reactor()->cancel_timer(historic_samples_timer_);
116  historic_samples_timer_ = WriterInfo::NO_TIMER;
117  }
118 }
119 
120 bool
122 {
123  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
124  while (delivering_historic_samples_) {
125  delivering_historic_samples_cv_.wait(TheServiceParticipant->get_thread_status_manager());
126  }
127  if (waiting_for_end_historic_samples_) {
128  bool result = false;
129  RcHandle<WriterInfo> info = rchandle_from(this);
130  if (!historic_samples_.empty()) {
131  last_historic_seq_ = historic_samples_.rbegin()->first;
132  delivering_historic_samples_ = true;
133  to_deliver.swap(historic_samples_);
134  result = true;
135  }
136  guard.release();
137  sweeper->cancel_timer(info);
138  return result;
139  }
140  return false;
141 }
142 
143 bool
144 WriterInfo::check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq)
145 {
146  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
147  last_historic_seq = last_historic_seq_;
148  if (waiting_for_end_historic_samples_) {
149  historic_samples_.insert(std::make_pair(seq, sample));
150  return true;
151  }
152  return false;
153 }
154 
155 void
157 {
158  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
159  delivering_historic_samples_ = false;
160  delivering_historic_samples_cv_.notify_all();
161 }
162 
163 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
164 void
166 {
167  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
168  if (coherent_samples_ == 0) {
169  static const SequenceNumber defaultSN;
170  const SequenceRange resetRange(defaultSN, seq);
171  coherent_sample_sequence_.reset();
172  coherent_sample_sequence_.insert(resetRange);
173  }
174  else {
175  coherent_sample_sequence_.insert(seq);
176  }
177 }
178 
179 void
180 WriterInfo::coherent_change(bool group_coherent, const GUID_t& publisher_id)
181 {
182  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
183  group_coherent_ = group_coherent;
184  publisher_id_ = publisher_id;
185  ++coherent_samples_;
186 }
187 #endif
188 
189 void
191 {
192  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
193  owner_evaluated_.clear();
194 }
195 
196 void
198 {
199  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
200  if (flag ||
201  (!flag && owner_evaluated_.find(instance) != owner_evaluated_.end())) {
202  owner_evaluated_[instance] = flag;
203  }
204 }
205 
206 bool
208 {
209  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
210  OwnerEvaluateFlags::iterator iter = owner_evaluated_.find(instance);
211  if (iter == owner_evaluated_.end()) {
212  owner_evaluated_.insert(OwnerEvaluateFlags::value_type(instance, false));
213  return false;
214  }
215  else
216  return iter->second;
217 }
218 
221 {
223 
224  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
225 
226  WriterInfoListener_rch reader = reader_.lock();
227 
228  // We only need check the liveliness with the non-zero liveliness_lease_duration_.
229  if (state_ == ALIVE && reader && !reader->liveliness_lease_duration_.is_zero()) {
230  expires_at = last_liveliness_activity_time_ + reader->liveliness_lease_duration_;
231 
232  if (expires_at <= now) {
233  // let all instances know this write is not alive.
234  guard.release();
235  reader->writer_became_dead(*this);
236  expires_at = MonotonicTimePoint::max_value;
237  }
238  }
239 
240  return expires_at;
241 }
242 
243 void
245 {
246  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
247  WriterInfoListener_rch reader = reader_.lock();
248  guard.release();
249  if (reader) {
250  reader->writer_removed(*this);
251  }
252 }
253 
254 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
257 {
258  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
259  if (writer_coherent_samples_.num_samples_ == 0) {
260  return NOT_COMPLETED_YET;
261  }
262 
263  if (!coherent_sample_sequence_.disjoint()
264  && (coherent_sample_sequence_.high()
265  == writer_coherent_samples_.last_sample_)) {
266  return COMPLETED;
267  }
268 
269  if (coherent_sample_sequence_.high() >
270  writer_coherent_samples_.last_sample_) {
271  return REJECTED;
272  }
273 
274  return NOT_COMPLETED_YET;
275 }
276 
277 void
279 {
280  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
281  coherent_samples_ = 0;
282  group_coherent_ = false;
283  publisher_id_ = GUID_UNKNOWN;
284  coherent_sample_sequence_.reset();
285  writer_coherent_samples_.reset();
286  group_coherent_samples_.clear();
287 }
288 
289 
290 void
292 {
293  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
294  if (!(publisher_id_ == info.publisher_id_)
295  || group_coherent_ != info.group_coherent_) {
296  WriterInfoListener_rch reader = reader_.lock();
298  ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
299  ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
300  reader ? LogGuid(reader->subscription_id_).c_str() : "",
301  LogGuid(writer_id_).c_str()));
302  }
303 
304  writer_coherent_samples_ = info.coherent_samples_;
305  group_coherent_samples_ = info.group_coherent_samples_;
306 }
307 
308 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
309 
310 }
311 }
312 
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
void cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper *sweeper)
Definition: WriterInfo.cpp:111
const InstanceHandle_t HANDLE_NIL
bool is_owner_evaluated(DDS::InstanceHandle_t instance)
Definition: WriterInfo.cpp:207
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void set_group_info(const CoherentChangeControl &info)
Definition: WriterInfo.cpp:291
void schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper *sweeper, const ACE_Time_Value &ten_seconds)
Definition: WriterInfo.cpp:103
const char * c_str() const
virtual void writer_became_dead(WriterInfo &info)
Definition: WriterInfo.cpp:45
int release(void)
Coherent_State coherent_change_received()
Definition: WriterInfo.cpp:256
MonotonicTimePoint check_activity(const MonotonicTimePoint &now)
Definition: WriterInfo.cpp:220
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool check_historic(const SequenceNumber &seq, const ReceivedDataSample &sample, SequenceNumber &last_historic_seq)
Definition: WriterInfo.cpp:144
LM_DEBUG
const char * get_state_str() const
Definition: WriterInfo.cpp:84
virtual void reactor(ACE_Reactor *reactor)
Holds a data sample received by the transport.
void set_owner_evaluated(DDS::InstanceHandle_t instance, bool flag)
Definition: WriterInfo.cpp:197
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
void coherent_change(bool group_coherent, const GUID_t &publisher_id)
Definition: WriterInfo.cpp:180
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void add_coherent_samples(const SequenceNumber &seq)
Definition: WriterInfo.cpp:165
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void removed()
update liveliness when remove_association is called.
Definition: WriterInfo.cpp:244
Sequence number abstraction. Only allows positive 64 bit values.
WriterInfo(const WriterInfoListener_rch &reader, const GUID_t &writer_id, const DDS::DataWriterQos &writer_qos)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool check_end_historic_samples(EndHistoricSamplesMissedSweeper *sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&to_deliver)
Definition: WriterInfo.cpp:121
virtual void writer_became_alive(WriterInfo &info, const MonotonicTimePoint &when)
Definition: WriterInfo.cpp:36
virtual void writer_removed(WriterInfo &info)
Definition: WriterInfo.cpp:52
#define TheServiceParticipant
Keeps track of a DataWriter&#39;s liveliness for a DataReader.
Definition: WriterInfo.h:81
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.