OpenDDS  Snapshot(2023/04/28-20:55)
ReceivedDataElementList.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
9 #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
10 
11 #include "Atomic.h"
12 #include "dcps_export.h"
13 #include "DataSampleHeader.h"
14 #include "Definitions.h"
15 #include "GuidUtils.h"
16 #include "InstanceState.h"
17 #include "Time_Helper.h"
18 #include "unique_ptr.h"
19 
20 #include <dds/DdsDcpsInfrastructureC.h>
21 
22 #include "ace/Thread_Mutex.h"
23 
24 #if !defined (ACE_LACKS_PRAGMA_ONCE)
25 #pragma once
26 #endif /* ACE_LACKS_PRAGMA_ONCE */
27 
29 
30 namespace OpenDDS {
31 namespace DCPS {
32 
34 public:
36  : pub_(header.publication_id_),
37  registered_data_(received_data),
38  sample_state_(DDS::NOT_READ_SAMPLE_STATE),
39 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
40  coherent_change_(header.coherent_change_),
41  group_coherent_(header.group_coherent_),
42  publisher_id_(header.publisher_id_),
43 #endif
44  valid_data_(received_data != 0),
45  disposed_generation_count_(0),
46  no_writers_generation_count_(0),
47  zero_copy_cnt_(0),
48  sequence_(header.sequence_),
49  previous_data_sample_(0),
50  next_data_sample_(0),
51  ref_count_(1),
52  mx_(mx)
53  {
54  destination_timestamp_ = SystemTimePoint::now().to_dds_time();
55 
56  source_timestamp_.sec = header.source_timestamp_sec_;
57  source_timestamp_.nanosec = header.source_timestamp_nanosec_;
58 
59  /*
60  * In some situations, we will not have data to give to the user and
61  * valid_data is how we communcate that to the user through a SampleInfo.
62  */
63  if (!header.valid_data()) {
64  valid_data_ = false;
65  }
66  }
67 
69 
70  void dec_ref()
71  {
72  if (0 == --ref_count_) {
73  delete this;
74  }
75  }
76 
77  void inc_ref()
78  {
79  ++ref_count_;
80  }
81 
82  long ref_count()
83  {
84  return ref_count_;
85  }
86 
88 
89  /**
90  * Data sample received, could only be the key fields in case we received
91  * dispose and/or unregister message.
92  */
93  void* const registered_data_; // ugly, but works....
94 
95  /// Sample state for this data sample:
96  /// DDS::NOT_READ_SAMPLE_STATE/DDS::READ_SAMPLE_STATE
98 
99  /// Source time stamp for this data sample
101 
102  /// Reception time stamp for this data sample
104 
105 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
106  /// Sample belongs to an active coherent change set
108 
109  /// Sample belongs to a group coherent changes.
111 
112  /// Publisher id represent group identifier.
114 #endif
115 
116  /// Do we contain valid data
118 
119  /// The data sample's instance's disposed_generation_count_
120  /// at the time the sample was received
122 
123  /// The data sample's instance's no_writers_generation_count_
124  /// at the time the sample was received
126 
127  /// This is needed to know if delete DataReader should fail with
128  /// PRECONDITION_NOT_MET because there are outstanding loans.
130 
131  /// The data sample's sequence number
133 
134  /// the previous data sample in the ReceivedDataElementList
136 
137  /// the next data sample in the ReceivedDataElementList
139 
140  void* operator new(size_t size, ACE_New_Allocator& pool);
141  void operator delete(void* memory);
142  void operator delete(void* memory, ACE_New_Allocator& pool);
143 
144 private:
146 protected:
148 }; // class ReceivedDataElement
149 
151 {
154 };
155 
156 
157 template <typename DataTypeWithAllocator>
159 {
160 public:
161  ReceivedDataElementWithType(const DataSampleHeader& header, DataTypeWithAllocator* received_data, ACE_Recursive_Thread_Mutex* mx)
162  : ReceivedDataElement(header, received_data, mx)
163  {
164  }
165 
168  guard,
169  *mx_)
170  delete static_cast<DataTypeWithAllocator*> (registered_data_);
171  }
172 };
173 
175 public:
177  virtual ~ReceivedDataFilter() {}
178 
179  virtual bool operator()(ReceivedDataElement* data_sample) = 0;
180 };
181 
183 public:
186 
187  virtual void operator()(ReceivedDataElement* data_sample) = 0;
188 };
189 
191 public:
192  explicit ReceivedDataElementList(const DataReaderImpl_rch& reader, const InstanceState_rch& instance_state = InstanceState_rch());
193 
195 
196  void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func);
197 
198  // adds a data sample to the end of the list
199  void add(ReceivedDataElement* data_sample);
200  void add_by_timestamp(ReceivedDataElement* data_sample);
201 
202  // returns true if the instance was released
203  bool remove(ReceivedDataElement* data_sample);
204 
205  // returns true if the instance was released
206  bool remove(ReceivedDataFilter& match, bool eval_all);
207 
208  const ReceivedDataElement* peek_tail() { return tail_; }
209 
210  ReceivedDataElement* remove_head();
211  ReceivedDataElement* remove_tail();
212 
213  size_t size() const { return size_; }
214 
215  bool has_zero_copies() const;
216  bool matches(CORBA::ULong sample_states) const;
217  ReceivedDataElement* get_next_match(CORBA::ULong sample_states, ReceivedDataElement* prev);
218 
219  void mark_read(ReceivedDataElement* item);
220 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
221  void accept_coherent_change(ReceivedDataElement* item);
222 #endif
223 
224 private:
226 
227  /// The first element of the list.
229 
230  /// The last element of the list.
232 
233  /// Number of elements in the list.
234  size_t size_;
235 
239 
240  void increment_read_count();
241  void decrement_read_count();
242  void increment_not_read_count();
243  void decrement_not_read_count();
245 
246  bool sanity_check();
247  bool sanity_check(ReceivedDataElement* item);
248 }; // ReceivedDataElementList
249 
250 } // namespace DCPS
251 } // namespace OpenDDS
252 
254 
255 #if defined (__ACE_INLINE__)
256 # include "ReceivedDataElementList.inl"
257 #endif /* __ACE_INLINE__ */
258 
259 #endif /* OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H */
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
ReceivedDataElement * previous_data_sample_
the previous data sample in the ReceivedDataElementList
DDS::Time_t source_timestamp_
Source time stamp for this data sample.
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
DDS::Time_t destination_timestamp_
Reception time stamp for this data sample.
bool group_coherent_
Sample belongs to a group coherent changes.
ReceivedDataElement(const DataSampleHeader &header, void *received_data, ACE_Recursive_Thread_Mutex *mx)
size_t size_
Number of elements in the list.
ReceivedDataElement * head_
The first element of the list.
unsigned long SampleStateKind
bool valid_data() const
Returns true if the sample has a complete serialized payload.
bool valid_data_
Do we contain valid data.
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
SequenceNumber sequence_
The data sample&#39;s sequence number.
ACE_CDR::ULong ULong
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
size_t size_
ReceivedDataElementWithType(const DataSampleHeader &header, DataTypeWithAllocator *received_data, ACE_Recursive_Thread_Mutex *mx)
GUID_t publisher_id_
Publisher id represent group identifier.
RcHandle< InstanceState > InstanceState_rch
Definition: InstanceState.h:33
The End User API.
ReceivedDataElement * tail_
The last element of the list.
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const SampleStateKind NOT_READ_SAMPLE_STATE
ReceivedDataElement * next_data_sample_
the next data sample in the ReceivedDataElementList
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
bool coherent_change_
Sample belongs to an active coherent change set.