OpenDDS  Snapshot(2023/04/28-20:55)
ReceivedDataElementList.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "DataReaderImpl.h"
11 
12 #if !defined (__ACE_INLINE__)
14 #endif /* !__ACE_INLINE__ */
15 
16 namespace {
17 
18 class IdentityFilter
20 public:
21  explicit IdentityFilter(OpenDDS::DCPS::ReceivedDataElement* data_sample)
22  : data_sample_(data_sample)
23  {}
24 
25  bool
27  return this->data_sample_ == data_sample;
28  }
29 
30 private:
32 };
33 
34 } // namespace
35 
36 
37 void* OpenDDS::DCPS::ReceivedDataElement::operator new(size_t , ACE_New_Allocator& pool)
38 {
40  block->allocator_ = &pool;
41  return block;
42 }
43 
44 void OpenDDS::DCPS::ReceivedDataElement::operator delete(void* memory)
45 {
46  if (memory) {
48  block->allocator_->free(block);
49  }
50 }
51 
52 void OpenDDS::DCPS::ReceivedDataElement::operator delete(void* memory, ACE_New_Allocator&)
53 {
54  operator delete(memory);
55 }
56 
58  : reader_(reader), head_(0), tail_(0), size_(0)
59  , read_sample_count_(0), not_read_sample_count_(0), sample_states_(0)
60  , instance_state_(instance_state)
61 {
62 }
63 
65 {
66  // The memory pointed to by instance_state_ is owned by
67  // another object.
68 }
69 
70 void
72 {
73  data_sample->previous_data_sample_ = 0;
74  data_sample->next_data_sample_ = 0;
75 
76  for (ReceivedDataElement* it = head_; it != 0; it = it->next_data_sample_) {
77  if (data_sample->source_timestamp_ < it->source_timestamp_) {
79  data_sample->next_data_sample_ = it;
80 
81  // Are we replacing the head?
82  if (it->previous_data_sample_ == 0) {
83  head_ = data_sample;
84  } else {
85  it->previous_data_sample_->next_data_sample_ = data_sample;
86  }
87  it->previous_data_sample_ = data_sample;
88 
89  ++size_;
90 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
91  if (!data_sample->coherent_change_)
92 #endif
93  {
94  if (data_sample->sample_state_ == DDS::NOT_READ_SAMPLE_STATE) {
96  } else {
98  }
99  }
100 
101  return;
102  }
103  }
104 
105  add(data_sample);
106 }
107 
108 void
110  ReceivedDataFilter& match,
112 {
113  for (ReceivedDataElement* it = head_; it != 0; it = it->next_data_sample_) {
114  if (match(it)) {
115  op(it);
116  }
117  }
118 }
119 
120 bool
122  ReceivedDataFilter& match,
123  bool eval_all)
124 {
125  if (!head_) {
126  return false;
127  }
128 
129  bool released = false;
130 
131  for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) {
132  if (match(item)) {
133  released = released || remove(item);
134  if (!eval_all) break;
135  }
136  }
137 
138  return released;
139 }
140 
141 bool
143 {
145 
146  if (!head_) {
147  return false;
148  }
149 
150  bool released = false;
151 
152  size_--;
153 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
154  if (!item->coherent_change_)
155 #endif
156  {
159  } else {
161  }
162  }
163  if (item == head_) {
164  if (head_ == tail_) {
165  head_ = tail_ = 0;
166 
167  } else {
168  head_ = item->next_data_sample_;
169 
170  if (head_) {
172  }
173  }
174 
175  } else if (item == tail_) {
177 
178  if (tail_) {
180  }
181 
182  } else {
184  item->next_data_sample_;
186  item->previous_data_sample_;
187  }
188 
189  item->previous_data_sample_ = 0;
190  item->next_data_sample_ = 0;
191 
192  if (instance_state_ && size_ == 0) {
193  // let the instance know it is empty
194  released = instance_state_->empty(true);
195  }
196 
197  return released;
198 }
199 
200 bool
202 {
203  for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) {
204  if (item->zero_copy_cnt_) {
205  return true;
206  }
207  }
208  return false;
209 }
210 
211 bool
213 {
214  return sample_states_ & sample_states;
215 }
216 
219 {
221  if (prev == tail_) {
222  return 0;
223  }
224  ReceivedDataElement* item = prev ? prev->next_data_sample_ : head_;
225  for (; item != 0; item = item->next_data_sample_) {
226  if ((item->sample_state_ & sample_states)
227 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
228  && !item->coherent_change_
229 #endif
230  ) {
231  return item;
232  }
233  }
234  return 0;
235 }
236 
237 void
239 {
241 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
242  if (!item->coherent_change_)
243 #endif
244  {
249  }
250  }
251 }
252 
253 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
254 void
256 {
258  if (item->coherent_change_) {
259  item->coherent_change_ = false;
261  }
262 }
263 #endif
264 
266 {
267  if (!read_sample_count_) {
269  DataReaderImpl_rch reader(reader_.lock());
270  if (reader) {
271  reader->state_updated(instance_state_->instance_handle());
272  }
273  }
275 }
276 
278 {
281  if (!read_sample_count_) {
283  DataReaderImpl_rch reader(reader_.lock());
284  if (reader) {
285  reader->state_updated(instance_state_->instance_handle());
286  }
287  }
288 }
289 
291 {
292  if (!not_read_sample_count_) {
294  DataReaderImpl_rch reader(reader_.lock());
295  if (reader) {
296  reader->state_updated(instance_state_->instance_handle());
297  }
298  }
300 }
301 
303 {
306  if (!not_read_sample_count_) {
308  DataReaderImpl_rch reader(reader_.lock());
309  if (reader) {
310  reader->state_updated(instance_state_->instance_handle());
311  }
312  }
313 }
314 
316 {
318  for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) {
320  }
322  return true;
323 }
324 
326 {
327  ACE_UNUSED_ARG(item);
328  OPENDDS_ASSERT(item == 0 || (item->next_data_sample_ == 0 && item == tail_) || (item->next_data_sample_ && item->next_data_sample_->previous_data_sample_ == item));
329  OPENDDS_ASSERT(item == 0 || (item->previous_data_sample_ == 0 && item == head_) || (item->previous_data_sample_ && item->previous_data_sample_->next_data_sample_ == item));
330  return true;
331 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
virtual void free(void *ptr)
ReceivedDataElement * previous_data_sample_
the previous data sample in the ReceivedDataElementList
DDS::Time_t source_timestamp_
Source time stamp for this data sample.
bool empty(bool value)
DataReader has become empty. Returns true if the instance was released.
void add_by_timestamp(ReceivedDataElement *data_sample)
ACE_Message_Block & head_
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
void add(ReceivedDataElement *data_sample)
size_t size_
Number of elements in the list.
ReceivedDataElement * head_
The first element of the list.
ACE_CDR::ULong ULong
virtual bool operator()(ReceivedDataElement *data_sample)=0
void accept_coherent_change(ReceivedDataElement *item)
DDS::InstanceHandle_t instance_handle() const
size_t size_
The End User API.
ReceivedDataElement * tail_
The last element of the list.
const SampleStateKind READ_SAMPLE_STATE
bool remove(ReceivedDataElement *data_sample)
ReceivedDataElementList(const DataReaderImpl_rch &reader, const InstanceState_rch &instance_state=InstanceState_rch())
const SampleStateKind NOT_READ_SAMPLE_STATE
void apply_all(ReceivedDataFilter &match, ReceivedDataOperation &func)
RcHandle< T > lock() const
Definition: RcObject.h:188
bool matches(CORBA::ULong sample_states) const
ReceivedDataElement * next_data_sample_
the next data sample in the ReceivedDataElementList
bool coherent_change_
Sample belongs to an active coherent change set.