8 #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_H 9 #define OPENDDS_DCPS_INTERNAL_DATA_READER_H 13 #ifndef ACE_LACKS_PRAGMA_ONCE 23 #include <dds/DdsDcpsCoreC.h> 24 #include <dds/DdsDcpsInfrastructureC.h> 62 #ifndef OPENDDS_SAFETY_PROFILE 88 return si == other.
si;
102 Listener_rch listener = Listener_rch())
104 , listener_(listener)
111 void remove_publication(InternalEntity_wrch publication_handle,
bool autodispose_unregistered_instances)
116 bool schedule =
false;
117 for (
typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ++pos) {
118 if (autodispose_unregistered_instances && pos->second.dispose(publication_handle, qos_)) {
121 if (pos->second.unregister_instance(publication_handle, qos_)) {
127 const Listener_rch listener = listener_.lock();
135 void write(InternalEntity_wrch publication_handle,
const T& sample)
139 const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, Instance()));
140 p.first->second.write(publication_handle, sample, qos_);
142 const Listener_rch listener = listener_.lock();
148 void dispose(InternalEntity_wrch publication_handle,
const T& sample)
152 typename InstanceMap::iterator pos = instance_map_.find(sample);
153 if (pos == instance_map_.end()) {
157 if (pos->second.dispose(publication_handle, qos_)) {
158 const Listener_rch listener = listener_.lock();
169 typename InstanceMap::iterator pos = instance_map_.find(sample);
170 if (pos == instance_map_.end()) {
174 if (pos->second.unregister_instance(publication_handle, qos_)) {
175 const Listener_rch listener = listener_.lock();
188 listener_ = listener;
194 return listener_.lock();
197 void read(SampleSequence& samples,
198 InternalSampleInfoSequence& infos,
209 for (
typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
210 pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
211 pos->second.purge_samples(qos_);
212 if (pos->second.can_purge_instance(qos_)) {
213 instance_map_.erase(pos++);
220 void take(SampleSequence& samples,
221 InternalSampleInfoSequence& infos,
232 for (
typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
233 pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
234 pos->second.purge_samples(qos_);
235 if (pos->second.can_purge_instance(qos_)) {
236 instance_map_.erase(pos++);
244 InternalSampleInfoSequence& infos,
255 typename InstanceMap::iterator pos = instance_map_.find(key);
256 if (pos != instance_map_.end()) {
257 pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
258 pos->second.purge_samples(qos_);
259 if (pos->second.can_purge_instance(qos_)) {
260 instance_map_.erase(pos);
266 InternalSampleInfoSequence& infos,
277 typename InstanceMap::iterator pos = instance_map_.find(key);
278 if (pos != instance_map_.end()) {
279 pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
280 pos->second.purge_samples(qos_);
281 if (pos->second.can_purge_instance(qos_)) {
282 instance_map_.erase(pos);
303 , disposed_generation_count_(0)
304 , no_writers_generation_count_(0)
305 , informed_of_not_alive_(false)
307 disposed_expiration_date_.sec = 0;
308 disposed_expiration_date_.nanosec = 0;
309 no_writers_expiration_date_.sec = 0;
310 no_writers_expiration_date_.nanosec = 0;
322 not_read_samples_.clear();
323 read_samples_.clear();
330 not_read_samples_.empty() &&
331 read_samples_.empty() &&
332 publication_set_.empty()) {
346 SampleSequence& samples,
347 InternalSampleInfoSequence& infos,
353 if (!((view_states & view_state_) && (instance_states & instance_state_))) {
360 for (
typename SampleList::const_iterator pos = read_samples_.begin(), limit = read_samples_.end();
362 samples.push_back(pos->sample);
363 infos.push_back(
make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0,
true));
369 typename SampleList::iterator pos = not_read_samples_.begin();
370 for (
typename SampleList::iterator limit = not_read_samples_.end();
372 samples.push_back(pos->sample);
373 infos.push_back(
make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0,
true));
376 read_samples_.splice(read_samples_.end(), not_read_samples_, not_read_samples_.begin(), pos);
379 if (sample_count == 0 &&
381 !informed_of_not_alive_) {
382 samples.push_back(key);
383 infos.push_back(
make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0,
false));
388 compute_ranks(sample_count, infos);
392 informed_of_not_alive_ =
true;
397 SampleSequence& samples,
398 InternalSampleInfoSequence& infos,
404 if (!((view_states & view_state_) && (instance_states & instance_state_))) {
411 typename SampleList::iterator pos = read_samples_.begin();
412 for (
typename SampleList::iterator limit = read_samples_.end();
414 samples.push_back(pos->sample);
415 infos.push_back(
make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0,
true));
418 read_samples_.erase(read_samples_.begin(), pos);
422 typename SampleList::iterator pos = not_read_samples_.begin();
423 for (
typename SampleList::iterator limit = not_read_samples_.end();
425 samples.push_back(pos->sample);
426 infos.push_back(
make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0,
true));
429 not_read_samples_.erase(not_read_samples_.begin(), pos);
432 if (sample_count == 0 &&
434 !informed_of_not_alive_) {
435 samples.push_back(key);
436 infos.push_back(
make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0,
false));
441 compute_ranks(sample_count, infos);
445 informed_of_not_alive_ =
true;
449 void write(InternalEntity_wrch publication_handle,
453 publication_set_.insert(publication_handle);
459 switch (instance_state_) {
463 ++disposed_generation_count_;
466 ++no_writers_generation_count_;
473 while (read_samples_.size() + not_read_samples_.size() >=
static_cast<size_t>(qos.
history.
depth)) {
474 if (!read_samples_.empty()) {
475 read_samples_.pop_front();
477 not_read_samples_.pop_front();
482 not_read_samples_.push_back(
SampleHolder(sample, disposed_generation_count_, no_writers_generation_count_));
485 bool dispose(InternalEntity_wrch publication_handle,
488 publication_set_.insert(publication_handle);
493 informed_of_not_alive_ =
false;
503 publication_set_.erase(publication_handle);
508 informed_of_not_alive_ =
false;
525 , disposed_generation_count(dgc)
526 , no_writers_generation_count(nwgc)
546 if (sample_count == 0) {
550 typename InternalSampleInfoSequence::reverse_iterator pos = infos.rbegin();
551 const CORBA::Long mrsic = pos->disposed_generation_count + pos->no_writers_generation_count;
552 const CORBA::Long mrs = disposed_generation_count_ + no_writers_generation_count_;
554 for (
CORBA::Long rank = 0; rank != sample_count; ++rank, ++pos) {
555 pos->sample_rank = rank;
556 pos->generation_rank = mrsic - (pos->disposed_generation_count + pos->no_writers_generation_count);
557 pos->absolute_generation_rank = mrs - (pos->disposed_generation_count + pos->no_writers_generation_count);
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Listener_rch get_listener() const
void read(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
RcHandle< T > rchandle_from(T *pointer)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::Time_t to_dds_time() const
void dispose(InternalEntity_wrch publication_handle, const T &sample)
InstanceStateKind instance_state
const InstanceHandle_t HANDLE_NIL
void write(InternalEntity_wrch publication_handle, const T &sample, const DDS::DataReaderQos &qos)
void take(const T &key, SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
bool can_purge_instance(const DDS::DataReaderQos &qos) const
HistoryQosPolicyKind kind
void read_instance(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, const T &key, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool informed_of_not_alive_
DDS::SampleInfo make_sample_info(DDS::SampleStateKind sample_state, DDS::ViewStateKind view_state, DDS::InstanceStateKind instance_state, CORBA::Long disposed_generation_count, CORBA::Long no_writers_generation_count, CORBA::Long sample_rank, CORBA::Long generation_rank, CORBA::Long absolute_generation_rank, bool valid_data)
bool dispose(InternalEntity_wrch publication_handle, const DDS::DataReaderQos &qos)
void remove_publication(InternalEntity_wrch publication_handle, bool autodispose_unregistered_instances)
DDS::Time_t no_writers_expiration_date_
long absolute_generation_rank
unsigned long InstanceStateMask
InternalDataReader(const DDS::DataReaderQos qos, Listener_rch listener=Listener_rch())
RcHandle< InternalDataReaderListener< T > > Listener_rch
unsigned long SampleStateKind
CORBA::Long disposed_generation_count
void read(const T &key, SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool operator==(const DisjointSequence::OrderedRanges< T > &a, const DisjointSequence::OrderedRanges< T > &b)
SampleInfoWrapper(const DDS::SampleInfo &sample_info)
sequence< Publication > PublicationSet
DDS::ViewStateKind view_state_
Duration_t autopurge_disposed_samples_delay
DDS::ViewStateKind view_state() const
void unregister_instance(InternalEntity_wrch publication_handle, const T &sample)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
static TimePoint_T< SystemClock > now()
WeakRcHandle< InternalDataReaderListener< T > > Listener_wrch
const ViewStateKind NOT_NEW_VIEW_STATE
SampleHolder(const T &s, CORBA::Long dgc, CORBA::Long nwgc)
const DDS::DataReaderQos qos_
DDS::Time_t disposed_expiration_date_
InstanceHandle_t publication_handle
CORBA::Long no_writers_generation_count
WeakRcHandle< InternalEntity > InternalEntity_wrch
InstanceHandle_t instance_handle
bool unregister_instance(InternalEntity_wrch publication_handle, const DDS::DataReaderQos &qos)
bool operator==(const SampleInfoWrapper &other) const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unsigned long InstanceStateKind
long no_writers_generation_count
InstanceMap instance_map_
void take(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
CORBA::Long no_writers_generation_count_
void purge_samples(const DDS::DataReaderQos &qos)
const ViewStateKind NEW_VIEW_STATE
DDS::InstanceStateKind instance_state_
SampleStateKind sample_state
void compute_ranks(CORBA::Long sample_count, InternalSampleInfoSequence &infos)
unsigned long SampleStateMask
ReaderDataLifecycleQosPolicy reader_data_lifecycle
const SampleStateKind READ_SAMPLE_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
long disposed_generation_count
ACE_INLINE OpenDDS_Dcps_Export DDS::Time_t make_time_t(int sec, unsigned long nanosec)
ACE_INLINE OpenDDS_Dcps_Export bool is_infinite(const DDS::Duration_t &value)
void take_instance(SampleSequence &samples, InternalSampleInfoSequence &infos, CORBA::Long max_samples, const T &key, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
CORBA::Long disposed_generation_count_
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const SampleStateKind NOT_READ_SAMPLE_STATE
DDS::InstanceStateKind instance_state() const
const long LENGTH_UNLIMITED
PublicationSet publication_set_
unsigned long ViewStateKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
void set_listener(Listener_rch listener)
The Internal API and Implementation of OpenDDS.
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
unsigned long ViewStateMask
Duration_t autopurge_nowriter_samples_delay
typedef OPENDDS_LIST(SubsectionPair) KeyList
const InstanceStateKind ALIVE_INSTANCE_STATE
void write(InternalEntity_wrch publication_handle, const T &sample)
SampleList not_read_samples_
typedef OPENDDS_SET(NetworkAddress) AddrSet