#include <RakeResults_T.h>
Classes | |
struct | InstanceData |
class | SortedSetCmp |
Public Member Functions | |
RakeResults (DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper) | |
bool | insert_sample (ReceivedDataElement *sample, SubscriptionInstance_rch i, size_t index_in_instance) |
bool | copy_to_user () |
Private Member Functions | |
template<class FwdIter > | |
bool | copy_into (FwdIter begin, FwdIter end, typename SampleSeq::PrivateMemberAccess &received_data_p) |
RakeResults (const RakeResults &) | |
RakeResults & | operator= (const RakeResults &) |
typedef | OPENDDS_MULTISET_CMP (RakeData, SortedSetCmp) SortedSet |
OPENDDS_VECTOR (RakeData) unsorted_ | |
typedef | OPENDDS_VECTOR (CORBA::ULong) IndexList |
Private Attributes | |
DataReaderImpl * | reader_ |
SampleSeq & | received_data_ |
DDS::SampleInfoSeq & | info_seq_ |
CORBA::ULong | max_samples_ |
DDS::QueryCondition_ptr | cond_ |
Operation_t | oper_ |
bool | do_sort_ |
bool | do_filter_ |
SortedSet | sorted_ |
Rake is an abbreviation for "read or take". This class manages the results of a read() or take() operation, namely the received_data and info_seq sequences that the user passes in by reference.
Definition at line 33 of file RakeResults_T.h.
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults | ( | DataReaderImpl * | reader, | |
SampleSeq & | received_data, | |||
DDS::SampleInfoSeq & | info_seq, | |||
CORBA::Long | max_samples, | |||
DDS::PresentationQosPolicy | presentation, | |||
DDS::QueryCondition_ptr | cond, | |||
Operation_t | oper | |||
) | [inline] |
Definition at line 22 of file RakeResults_T.cpp.
References DDS::PresentationQosPolicy::access_scope, ACE_TEXT(), OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::getOrderBys(), OpenDDS::DCPS::QueryConditionImpl::hasFilter(), LM_DEBUG, OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_, and DDS::TOPIC_PRESENTATION_QOS.
00031 : reader_(reader) 00032 , received_data_(received_data) 00033 , info_seq_(info_seq) 00034 , max_samples_(max_samples) 00035 #ifndef OPENDDS_NO_QUERY_CONDITION 00036 , cond_(cond) 00037 #endif 00038 , oper_(oper) 00039 , do_sort_(false) 00040 , do_filter_(false) 00041 { 00042 #ifndef OPENDDS_NO_QUERY_CONDITION 00043 00044 if (cond_) { 00045 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_); 00046 if (!qci) { 00047 ACE_ERROR((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ") 00048 ACE_TEXT("failed to obtain QueryConditionImpl\n"))); 00049 return; 00050 } 00051 do_filter_ = qci->hasFilter(); 00052 std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys(); 00053 do_sort_ = order_bys.size() > 0; 00054 00055 if (do_sort_) { 00056 ComparatorBase::Ptr cmp; 00057 00058 // Iterate in reverse over the comma-separated fields so that the 00059 // top-level comparison is the leftmost. The others will be chained. 00060 for (size_t i = order_bys.size(); i > 0; --i) { 00061 const OPENDDS_STRING& fieldspec = order_bys[i - 1]; 00062 //FUTURE: handle ASC / DESC as an extension to the DDS spec? 00063 cmp = getMetaStruct<typename SampleSeq::value_type>() 00064 .create_qc_comparator(fieldspec.c_str(), cmp); 00065 } 00066 00067 SortedSetCmp comparator(cmp); 00068 SortedSet actual_sort(comparator); 00069 sorted_.swap(actual_sort); 00070 } 00071 00072 } else { 00073 #endif 00074 // PRESENTATION ordered access (TOPIC) 00075 this->do_sort_ = presentation.ordered_access == true && 00076 presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS; 00077 #ifndef OPENDDS_NO_QUERY_CONDITION 00078 } 00079 00080 #endif 00081 }
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults | ( | const RakeResults< SampleSeq > & | ) | [private] |
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into | ( | FwdIter | begin, | |
FwdIter | end, | |||
typename SampleSeq::PrivateMemberAccess & | received_data_p | |||
) | [inline, private] |
Definition at line 121 of file RakeResults_T.cpp.
References DDS::SampleInfo::absolute_generation_rank, OpenDDS::DCPS::InstanceState::accessed(), OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::InstanceState::most_recent_generation(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET, OpenDDS::DCPS::RakeResults< SampleSeq >::oper_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, OpenDDS::DCPS::ReceivedDataElementList::remove(), OpenDDS::DCPS::InstanceState::sample_info(), DDS::SampleInfo::sample_rank, OpenDDS::DCPS::ReceivedDataElement::sample_state_, and OpenDDS::DCPS::ReceivedDataElementList::tail_.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
00123 { 00124 typedef typename SampleSeq::value_type Sample; 00125 typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap; 00126 InstanceMap inst_map; 00127 00128 typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet; 00129 InstanceSet released_instances; 00130 00131 for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) { 00132 // 1. Populate the Received Data sequence 00133 ReceivedDataElement* rde = iter->rde_; 00134 00135 if (received_data_.maximum() != 0) { 00136 if (rde->registered_data_ == 0) { 00137 received_data_p.assign_sample(idx, Sample()); 00138 00139 } else { 00140 received_data_p.assign_sample(idx, 00141 *static_cast<Sample*>(rde->registered_data_)); 00142 } 00143 00144 } else { 00145 received_data_p.assign_ptr(idx, rde); 00146 } 00147 00148 // 2. Per-sample SampleInfo (not the three *_rank variables) and state 00149 SubscriptionInstance& inst = *iter->si_; 00150 inst.instance_state_.sample_info(info_seq_[idx], rde); 00151 rde->sample_state_ = DDS::READ_SAMPLE_STATE; 00152 00153 // 3. 
Record some info about per-instance SampleInfo (*_rank) so that 00154 // we can fill in the ranks after the loop has completed 00155 std::pair<typename InstanceMap::iterator, bool> result = 00156 inst_map.insert(std::make_pair(&inst, InstanceData())); 00157 InstanceData& id = result.first->second; 00158 00159 if (result.second) { // first time we've seen this Instance 00160 ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_; 00161 id.MRS_disposed_gc_ = 00162 static_cast<CORBA::Long>(mrs.disposed_generation_count_); 00163 id.MRS_nowriters_gc_ = 00164 static_cast<CORBA::Long>(mrs.no_writers_generation_count_); 00165 } 00166 00167 if (iter->index_in_instance_ >= id.MRSIC_index_) { 00168 id.MRSIC_index_ = iter->index_in_instance_; 00169 id.MRSIC_disposed_gc_ = 00170 static_cast<CORBA::Long>(rde->disposed_generation_count_); 00171 id.MRSIC_nowriters_gc_ = 00172 static_cast<CORBA::Long>(rde->no_writers_generation_count_); 00173 } 00174 00175 if (!id.most_recent_generation_) { 00176 id.most_recent_generation_ = 00177 inst.instance_state_.most_recent_generation(rde); 00178 } 00179 00180 id.sampleinfo_positions_.push_back(idx); 00181 00182 // 4. 
Take 00183 if (oper_ == DDS_OPERATION_TAKE) { 00184 // If removing the sample releases it 00185 if (inst.rcvd_samples_.remove(rde)) { 00186 // Prevent access of the SampleInfo, below 00187 released_instances.insert(&inst); 00188 } 00189 rde->dec_ref(); 00190 } 00191 } 00192 00193 // Fill in the *_ranks in the SampleInfo, and set instance state (mrg) 00194 for (typename InstanceMap::iterator i_iter(inst_map.begin()), 00195 i_end(inst_map.end()); i_iter != i_end; ++i_iter) { 00196 00197 InstanceData& id = i_iter->second; 00198 { // Danger, limit visibility of inst 00199 SubscriptionInstance& inst = *i_iter->first; 00200 // If this instance has not been released 00201 if (released_instances.find(&inst) == released_instances.end()) { 00202 if (id.most_recent_generation_) { 00203 inst.instance_state_.accessed(); 00204 } 00205 } 00206 } 00207 00208 CORBA::Long sample_rank = 00209 static_cast<CORBA::Long>(id.sampleinfo_positions_.size()); 00210 00211 for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()), 00212 s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) { 00213 DDS::SampleInfo& si = info_seq_[*s_iter]; 00214 si.sample_rank = --sample_rank; 00215 si.generation_rank = id.MRSIC_disposed_gc_ 00216 + id.MRSIC_nowriters_gc_ - si.generation_rank; 00217 si.absolute_generation_rank = id.MRS_disposed_gc_ + 00218 id.MRS_nowriters_gc_ - si.absolute_generation_rank; 00219 } 00220 } 00221 00222 return true; 00223 }
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user | ( | ) | [inline] |
Definition at line 226 of file RakeResults_T.cpp.
References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, len, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.
Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i().
00227 { 00228 typename SampleSeq::PrivateMemberAccess received_data_p(received_data_); 00229 00230 if (do_sort_) { 00231 size_t len = std::min(static_cast<size_t>(sorted_.size()), 00232 static_cast<size_t>(max_samples_)); 00233 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len)); 00234 info_seq_.length(static_cast<CORBA::ULong>(len)); 00235 return copy_into(sorted_.begin(), sorted_.end(), received_data_p); 00236 00237 } else { 00238 size_t len = unsorted_.size(); //can't be larger than max_samples_ 00239 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len)); 00240 info_seq_.length(static_cast<CORBA::ULong>(len)); 00241 return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p); 00242 } 00243 }
bool OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample | ( | ReceivedDataElement * | sample, | |
SubscriptionInstance_rch | i, | |||
size_t | index_in_instance | |||
) | [inline] |
Returns false if the sample will definitely not be part of the resulting dataset. If this returns true, the sample may still be excluded (due to sorting and max_samples).
Definition at line 84 of file RakeResults_T.cpp.
References OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::filter(), OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.
Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i().
00087 { 00088 #ifndef OPENDDS_NO_QUERY_CONDITION 00089 00090 if (do_filter_) { 00091 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_); 00092 typedef typename SampleSeq::value_type VT; 00093 const VT* typed_sample = static_cast<VT*>(sample->registered_data_); 00094 if (!qci || !typed_sample || !qci->filter(*typed_sample)) return false; 00095 } 00096 00097 #endif 00098 00099 if (do_sort_) { 00100 // N.B. Until a better heuristic is found, non-valid 00101 // samples are elided when sorting by QueryCondition. 00102 #ifndef OPENDDS_NO_QUERY_CONDITION 00103 if (cond_ && !sample->registered_data_) return false; 00104 #endif 00105 00106 RakeData rd = {sample, instance, index_in_instance}; 00107 sorted_.insert(rd); 00108 00109 } else { 00110 if (unsorted_.size() == max_samples_) return false; 00111 00112 RakeData rd = {sample, instance, index_in_instance}; 00113 unsorted_.push_back(rd); 00114 } 00115 00116 return true; 00117 }
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_MULTISET_CMP | ( | RakeData | , | |
SortedSetCmp | ||||
) | [private] |
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR | ( | CORBA::ULong | ) | [private] |
OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR | ( | RakeData | ) | [private] |
RakeResults& OpenDDS::DCPS::RakeResults< SampleSeq >::operator= | ( | const RakeResults< SampleSeq > & | ) | [private] |
DDS::QueryCondition_ptr OpenDDS::DCPS::RakeResults< SampleSeq >::cond_ [private] |
Definition at line 66 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_ [private] |
Definition at line 90 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_ [private] |
Definition at line 90 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
DDS::SampleInfoSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_ [private] |
Definition at line 63 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
CORBA::ULong OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_ [private] |
Definition at line 64 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), and OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample().
Operation_t OpenDDS::DCPS::RakeResults< SampleSeq >::oper_ [private] |
Definition at line 68 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().
DataReaderImpl* OpenDDS::DCPS::RakeResults< SampleSeq >::reader_ [private] |
Definition at line 61 of file RakeResults_T.h.
SampleSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_ [private] |
Definition at line 62 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
SortedSet OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_ [private] |
Definition at line 94 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().