OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::RakeResults< MessageType > Class Template Reference

#include <RakeResults_T.h>

Collaboration diagram for OpenDDS::DCPS::RakeResults< MessageType >:
Collaboration graph
[legend]

Classes

struct  InstanceData
 
class  SortedSetCmp
 

Public Member Functions

 RakeResults (DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper)
 
bool insert_sample (ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch instance, size_t index_in_instance)
 
bool copy_to_user ()
 

Private Types

typedef DDSTraits< MessageType >::MessageSequenceType SampleSeq
 
typedef DDSTraits< MessageType >::MessageSequenceAdapterType MessageSequenceAdapterType
 

Private Member Functions

template<class FwdIter >
bool copy_into (FwdIter begin, FwdIter end, MessageSequenceAdapterType &received_data_p)
 
 RakeResults (const RakeResults &)
 
RakeResults & operator= (const RakeResults &)
 
typedef OPENDDS_MULTISET_CMP (RakeData, SortedSetCmp) SortedSet
 
 OPENDDS_VECTOR (RakeData) unsorted_
 
typedef OPENDDS_VECTOR (CORBA::ULong) IndexList
 

Private Attributes

DataReaderImpl * reader_
 
SampleSeq & received_data_
 
DDS::SampleInfoSeq & info_seq_
 
CORBA::ULong max_samples_
 
DDS::QueryCondition_ptr cond_
 
Operation_t oper_
 
bool do_sort_
 
bool do_filter_
 
SortedSet sorted_
 

Detailed Description

template<class MessageType>
class OpenDDS::DCPS::RakeResults< MessageType >

Rake is an abbreviation for "read or take". This class manages the results from a read() or take() operation, which are the received_data and the info_seq sequences passed in by reference from the user.

Definition at line 34 of file RakeResults_T.h.

Member Typedef Documentation

◆ MessageSequenceAdapterType

template<class MessageType>
typedef DDSTraits<MessageType>::MessageSequenceAdapterType OpenDDS::DCPS::RakeResults< MessageType >::MessageSequenceAdapterType
private

Definition at line 37 of file RakeResults_T.h.

◆ SampleSeq

template<class MessageType>
typedef DDSTraits<MessageType>::MessageSequenceType OpenDDS::DCPS::RakeResults< MessageType >::SampleSeq
private

Definition at line 36 of file RakeResults_T.h.

Constructor & Destructor Documentation

◆ RakeResults() [1/2]

template<class MessageType >
OpenDDS::DCPS::RakeResults< MessageType >::RakeResults ( DataReaderImpl * reader,
SampleSeq & received_data,
DDS::SampleInfoSeq & info_seq,
CORBA::Long  max_samples,
DDS::PresentationQosPolicy  presentation,
DDS::QueryCondition_ptr  cond,
Operation_t  oper 
)

Definition at line 22 of file RakeResults_T.cpp.

References DDS::PresentationQosPolicy::access_scope, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::RakeResults< MessageType >::cond_, OpenDDS::DCPS::RakeResults< MessageType >::do_filter_, OpenDDS::DCPS::RakeResults< MessageType >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::getOrderBys(), OpenDDS::DCPS::QueryConditionImpl::hasFilter(), LM_ERROR, DDS::PresentationQosPolicy::ordered_access, OpenDDS::DCPS::RakeResults< MessageType >::sorted_, and DDS::TOPIC_PRESENTATION_QOS.

31  : reader_(reader)
32  , received_data_(received_data)
35 #ifndef OPENDDS_NO_QUERY_CONDITION
36  , cond_(cond)
37 #endif
38  , oper_(oper)
39  , do_sort_(false)
40  , do_filter_(false)
41 {
42 #ifndef OPENDDS_NO_QUERY_CONDITION
43 
44  if (cond_) {
45  const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
46  if (!qci) {
47  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
48  ACE_TEXT("failed to obtain QueryConditionImpl\n")));
49  return;
50  }
51  do_filter_ = qci->hasFilter();
52  std::vector<String> order_bys = qci->getOrderBys();
53  do_sort_ = order_bys.size() > 0;
54 
55  if (do_sort_) {
57 
58  // Iterate in reverse over the comma-separated fields so that the
59  // top-level comparison is the leftmost. The others will be chained.
60  for (size_t i = order_bys.size(); i > 0; --i) {
61  const String& fieldspec = order_bys[i - 1];
62  //FUTURE: handle ASC / DESC as an extension to the DDS spec?
63  cmp = getMetaStruct<MessageType>().create_qc_comparator(fieldspec.c_str(), cmp);
64  }
65 
66  SortedSetCmp comparator(cmp);
67  SortedSet actual_sort(comparator);
68  sorted_.swap(actual_sort);
69  }
70 
71  } else {
72 #endif
73  // PRESENTATION ordered access (TOPIC)
74  do_sort_ = presentation.ordered_access && presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
75 #ifndef OPENDDS_NO_QUERY_CONDITION
76  }
77 
78 #endif
79 }
#define ACE_ERROR(X)
DDS::SampleInfoSeq & info_seq_
Definition: RakeResults_T.h:70
DataReaderImpl * reader_
Definition: RakeResults_T.h:68
RcHandle< ComparatorBase > Ptr
Definition: Comparator_T.h:30
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
ACE_TEXT("TCP_Factory")
std::string String
DDS::QueryCondition_ptr cond_
Definition: RakeResults_T.h:73
PresentationQosPolicyAccessScopeKind access_scope
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ RakeResults() [2/2]

template<class MessageType>
OpenDDS::DCPS::RakeResults< MessageType >::RakeResults ( const RakeResults< MessageType > &  )
private

Member Function Documentation

◆ copy_into()

template<class MessageType >
template<class FwdIter >
bool OpenDDS::DCPS::RakeResults< MessageType >::copy_into ( FwdIter  begin,
FwdIter  end,
MessageSequenceAdapterType & received_data_p 
)
private

Definition at line 121 of file RakeResults_T.cpp.

References DDS::SampleInfo::absolute_generation_rank, OpenDDS::DCPS::InstanceState::accessed(), OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::RakeResults< MessageType >::info_seq_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::ReceivedDataElementList::mark_read(), OpenDDS::DCPS::RakeResults< MessageType >::max_samples_, OpenDDS::DCPS::InstanceState::most_recent_generation(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::OPENDDS_SET(), OpenDDS::DCPS::RakeResults< MessageType >::oper_, OpenDDS::DCPS::ReceivedDataElementList::peek_tail(), OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, OpenDDS::DCPS::RakeResults< MessageType >::received_data_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, OpenDDS::DCPS::ReceivedDataElementList::remove(), OpenDDS::DCPS::InstanceState::sample_info(), and DDS::SampleInfo::sample_rank.

Referenced by OpenDDS::DCPS::RakeResults< MessageType >::copy_to_user().

123 {
124  typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
125  InstanceMap inst_map;
126 
127  typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
128  InstanceSet released_instances;
129 
130  for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
131  // 1. Populate the Received Data sequence
132  ReceivedDataElement* rde = iter->rde_;
133  ReceivedDataElementList* rdel = iter->rdel_;
134 
135  if (received_data_.maximum() != 0) {
136  if (rde->registered_data_ == 0) {
137  received_data_p.assign_sample(idx, MessageType());
138 
139  } else {
140  received_data_p.assign_sample(idx,
141  *static_cast<MessageType*>(rde->registered_data_));
142  }
143 
144  } else {
145  received_data_p.assign_ptr(idx, rde);
146  }
147 
148  // 2. Per-sample SampleInfo (not the three *_rank variables) and state
149  SubscriptionInstance& inst = *iter->si_;
150  inst.instance_state_->sample_info(info_seq_[idx], rde);
151  rdel->mark_read(rde);
152 
153  // 3. Record some info about per-instance SampleInfo (*_rank) so that
154  // we can fill in the ranks after the loop has completed
155  std::pair<typename InstanceMap::iterator, bool> result =
156  inst_map.insert(std::make_pair(&inst, InstanceData()));
157  InstanceData& id = result.first->second;
158 
159  if (result.second) { // first time we've seen this Instance
160  const ReceivedDataElement& mrs = *inst.rcvd_samples_.peek_tail();
161  id.MRS_disposed_gc_ =
162  static_cast<CORBA::Long>(mrs.disposed_generation_count_);
163  id.MRS_nowriters_gc_ =
164  static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
165  }
166 
167  if (iter->index_in_instance_ >= id.MRSIC_index_) {
168  id.MRSIC_index_ = iter->index_in_instance_;
169  id.MRSIC_disposed_gc_ =
170  static_cast<CORBA::Long>(rde->disposed_generation_count_);
171  id.MRSIC_nowriters_gc_ =
172  static_cast<CORBA::Long>(rde->no_writers_generation_count_);
173  }
174 
175  if (!id.most_recent_generation_) {
176  id.most_recent_generation_ =
177  inst.instance_state_->most_recent_generation(rde);
178  }
179 
180  id.sampleinfo_positions_.push_back(idx);
181 
182  // 4. Take
183  if (oper_ == DDS_OPERATION_TAKE) {
184  // If removing the sample releases it
185  if (inst.rcvd_samples_.remove(rde)) {
186  // Prevent access of the SampleInfo, below
187  released_instances.insert(&inst);
188  }
189  rde->dec_ref();
190  }
191  }
192 
193  // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
194  for (typename InstanceMap::iterator i_iter(inst_map.begin()),
195  i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
196 
197  InstanceData& id = i_iter->second;
198  { // Danger, limit visibility of inst
199  SubscriptionInstance& inst = *i_iter->first;
200  // If this instance has not been released
201  if (released_instances.find(&inst) == released_instances.end()) {
202  if (id.most_recent_generation_) {
203  inst.instance_state_->accessed();
204  }
205  }
206  }
207 
208  CORBA::Long sample_rank =
209  static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
210 
211  for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
212  s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
213  DDS::SampleInfo& si = info_seq_[*s_iter];
214  si.sample_rank = --sample_rank;
215  si.generation_rank = id.MRSIC_disposed_gc_
216  + id.MRSIC_nowriters_gc_ - si.generation_rank;
217  si.absolute_generation_rank = id.MRS_disposed_gc_ +
218  id.MRS_nowriters_gc_ - si.absolute_generation_rank;
219  }
220  }
221 
222  return true;
223 }
ACE_CDR::Long Long
long absolute_generation_rank
typedef OPENDDS_SET(NetworkAddress) AddrSet
DDS::SampleInfoSeq & info_seq_
Definition: RakeResults_T.h:70
ACE_CDR::ULong ULong
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.

◆ copy_to_user()

template<class MessageType >
bool OpenDDS::DCPS::RakeResults< MessageType >::copy_to_user ( )

Definition at line 226 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< MessageType >::copy_into(), OpenDDS::DCPS::RakeResults< MessageType >::do_sort_, OpenDDS::DCPS::RakeResults< MessageType >::info_seq_, OpenDDS::DCPS::RakeResults< MessageType >::max_samples_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OpenDDS::DCPS::RakeResults< MessageType >::received_data_, and OpenDDS::DCPS::RakeResults< MessageType >::sorted_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_instance_i().

227 {
229 
230  if (do_sort_) {
231  size_t len = std::min(static_cast<size_t>(sorted_.size()),
232  static_cast<size_t>(max_samples_));
233  received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
234  info_seq_.length(static_cast<CORBA::ULong>(len));
235  return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
236 
237  } else {
238  size_t len = unsorted_.size(); //can't be larger than max_samples_
239  received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
240  info_seq_.length(static_cast<CORBA::ULong>(len));
241  return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
242  }
243 }
bool copy_into(FwdIter begin, FwdIter end, MessageSequenceAdapterType &received_data_p)
DDS::SampleInfoSeq & info_seq_
Definition: RakeResults_T.h:70
DDSTraits< MessageType >::MessageSequenceAdapterType MessageSequenceAdapterType
Definition: RakeResults_T.h:37

◆ insert_sample()

template<class MessageType >
bool OpenDDS::DCPS::RakeResults< MessageType >::insert_sample ( ReceivedDataElement * sample,
ReceivedDataElementList * rdel,
SubscriptionInstance_rch  instance,
size_t  index_in_instance 
)

Returns false if the sample will definitely not be part of the resulting dataset. If this returns true, the sample may still be excluded later (due to sorting and max_samples).

Definition at line 82 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< MessageType >::cond_, OpenDDS::DCPS::RakeResults< MessageType >::do_filter_, OpenDDS::DCPS::RakeResults< MessageType >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::filter(), OpenDDS::DCPS::RakeResults< MessageType >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, OpenDDS::DCPS::RakeResults< MessageType >::sorted_, and OpenDDS::DCPS::ReceivedDataElement::valid_data_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::take_instance_i().

86 {
87 #ifndef OPENDDS_NO_QUERY_CONDITION
88 
89  if (do_filter_) {
90  const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
91  const MessageType* typed_sample = static_cast<MessageType*>(sample->registered_data_);
92  if (!qci || !typed_sample || !qci->filter(*typed_sample, !sample->valid_data_)) {
93  return false;
94  }
95  }
96 
97 #endif
98 
99  if (do_sort_) {
100  // N.B. Until a better heuristic is found, non-valid
101  // samples are elided when sorting by QueryCondition.
102 #ifndef OPENDDS_NO_QUERY_CONDITION
103  if (cond_ && !sample->registered_data_) return false;
104 #endif
105 
106  RakeData rd = {sample, rdel, instance, index_in_instance};
107  sorted_.insert(rd);
108 
109  } else {
110  if (unsorted_.size() == max_samples_) return false;
111 
112  RakeData rd = {sample, rdel, instance, index_in_instance};
113  unsorted_.push_back(rd);
114  }
115 
116  return true;
117 }
DDS::QueryCondition_ptr cond_
Definition: RakeResults_T.h:73

◆ OPENDDS_MULTISET_CMP()

template<class MessageType>
typedef OpenDDS::DCPS::RakeResults< MessageType >::OPENDDS_MULTISET_CMP ( RakeData  ,
SortedSetCmp   
)
private

◆ OPENDDS_VECTOR() [1/2]

template<class MessageType>
OpenDDS::DCPS::RakeResults< MessageType >::OPENDDS_VECTOR ( RakeData  )
private

◆ OPENDDS_VECTOR() [2/2]

template<class MessageType>
typedef OpenDDS::DCPS::RakeResults< MessageType >::OPENDDS_VECTOR ( CORBA::ULong  )
private

◆ operator=()

template<class MessageType>
RakeResults& OpenDDS::DCPS::RakeResults< MessageType >::operator= ( const RakeResults< MessageType > &  )
private

Member Data Documentation

◆ cond_

template<class MessageType>
DDS::QueryCondition_ptr OpenDDS::DCPS::RakeResults< MessageType >::cond_
private

◆ do_filter_

template<class MessageType>
bool OpenDDS::DCPS::RakeResults< MessageType >::do_filter_
private

◆ do_sort_

template<class MessageType>
bool OpenDDS::DCPS::RakeResults< MessageType >::do_sort_
private

◆ info_seq_

template<class MessageType>
DDS::SampleInfoSeq& OpenDDS::DCPS::RakeResults< MessageType >::info_seq_
private

◆ max_samples_

template<class MessageType>
CORBA::ULong OpenDDS::DCPS::RakeResults< MessageType >::max_samples_
private

◆ oper_

template<class MessageType>
Operation_t OpenDDS::DCPS::RakeResults< MessageType >::oper_
private

◆ reader_

template<class MessageType>
DataReaderImpl* OpenDDS::DCPS::RakeResults< MessageType >::reader_
private

Definition at line 68 of file RakeResults_T.h.

◆ received_data_

template<class MessageType>
SampleSeq& OpenDDS::DCPS::RakeResults< MessageType >::received_data_
private

◆ sorted_

template<class MessageType>
SortedSet OpenDDS::DCPS::RakeResults< MessageType >::sorted_
private

The documentation for this class was generated from the following files: