#include <RakeResults_T.h>
Collaboration diagram for OpenDDS::DCPS::RakeResults< SampleSeq >:
Public Member Functions | |
RakeResults (DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper) | |
bool | insert_sample (ReceivedDataElement *sample, SubscriptionInstance *i, size_t index_in_instance) |
bool | copy_to_user () |
Private Member Functions | |
template<class FwdIter> | |
bool | copy_into (FwdIter begin, FwdIter end, typename SampleSeq::PrivateMemberAccess &received_data_p) |
RakeResults (const RakeResults &) | |
RakeResults & | operator= (const RakeResults &) |
typedef | OPENDDS_MULTISET_CMP (RakeData, SortedSetCmp) SortedSet |
OPENDDS_VECTOR (RakeData) unsorted_ | |
typedef | OPENDDS_VECTOR (CORBA::ULong) IndexList |
Private Attributes | |
DataReaderImpl * | reader_ |
SampleSeq & | received_data_ |
DDS::SampleInfoSeq & | info_seq_ |
CORBA::ULong | max_samples_ |
DDS::QueryCondition_ptr | cond_ |
Operation_t | oper_ |
bool | do_sort_ |
bool | do_filter_ |
SortedSet | sorted_ |
Classes | |
struct | InstanceData |
class | SortedSetCmp |
Definition at line 31 of file RakeResults_T.h.
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults | ( | DataReaderImpl * | reader, | |
SampleSeq & | received_data, | |||
DDS::SampleInfoSeq & | info_seq, | |||
CORBA::Long | max_samples, | |||
DDS::PresentationQosPolicy | presentation, | |||
DDS::QueryCondition_ptr | cond, | |||
Operation_t | oper | |||
) |
Definition at line 20 of file RakeResults_T.cpp.
References DDS::PresentationQosPolicy::access_scope, OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::getOrderBys(), OpenDDS::DCPS::QueryConditionImpl::hasFilter(), OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_, and DDS::TOPIC_PRESENTATION_QOS.
00029 : reader_(reader) 00030 , received_data_(received_data) 00031 , info_seq_(info_seq) 00032 , max_samples_(max_samples) 00033 #ifndef OPENDDS_NO_QUERY_CONDITION 00034 , cond_(cond) 00035 #endif 00036 , oper_(oper) 00037 , do_sort_(false) 00038 , do_filter_(false) 00039 { 00040 #ifndef OPENDDS_NO_QUERY_CONDITION 00041 00042 if (cond_) { 00043 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_); 00044 do_filter_ = qci->hasFilter(); 00045 std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys(); 00046 do_sort_ = order_bys.size() > 0; 00047 00048 if (do_sort_) { 00049 ComparatorBase::Ptr cmp = 0; 00050 00051 // Iterate in reverse over the comma-separated fields so that the 00052 // top-level comparison is the leftmost. The others will be chained. 00053 for (size_t i = order_bys.size(); i > 0; --i) { 00054 const OPENDDS_STRING& fieldspec = order_bys[i - 1]; 00055 //FUTURE: handle ASC / DESC as an extension to the DDS spec? 00056 cmp = getMetaStruct<typename SampleSeq::value_type>() 00057 .create_qc_comparator(fieldspec.c_str(), cmp); 00058 } 00059 00060 SortedSetCmp comparator(cmp); 00061 SortedSet actual_sort(comparator); 00062 sorted_.swap(actual_sort); 00063 } 00064 00065 } else { 00066 #endif 00067 // PRESENTATION ordered access (TOPIC) 00068 this->do_sort_ = presentation.ordered_access == true && 00069 presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS; 00070 #ifndef OPENDDS_NO_QUERY_CONDITION 00071 } 00072 00073 #endif 00074 }
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults | ( | const RakeResults< SampleSeq > & | ) | [private] |
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into | ( | FwdIter | begin, | |
FwdIter | end, | |||
typename SampleSeq::PrivateMemberAccess & | received_data_p | |||
) | [private] |
Definition at line 114 of file RakeResults_T.cpp.
References OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::DataReaderImpl::dec_ref_data_element(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET, OpenDDS::DCPS::RakeResults< SampleSeq >::oper_, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::RakeResults< SampleSeq >::reader_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::InstanceState::sample_info().
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
00116 { 00117 typedef typename SampleSeq::value_type Sample; 00118 typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap; 00119 InstanceMap inst_map; 00120 00121 typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet; 00122 InstanceSet released_instances; 00123 00124 for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) { 00125 // 1. Populate the Received Data sequence 00126 ReceivedDataElement* rde = iter->rde_; 00127 00128 if (received_data_.maximum() != 0) { 00129 if (rde->registered_data_ == 0) { 00130 received_data_p.assign_sample(idx, Sample()); 00131 00132 } else { 00133 received_data_p.assign_sample(idx, 00134 *static_cast<Sample*>(rde->registered_data_)); 00135 } 00136 00137 } else { 00138 received_data_p.assign_ptr(idx, rde); 00139 } 00140 00141 // 2. Per-sample SampleInfo (not the three *_rank variables) and state 00142 SubscriptionInstance& inst = *iter->si_; 00143 inst.instance_state_.sample_info(info_seq_[idx], rde); 00144 rde->sample_state_ = DDS::READ_SAMPLE_STATE; 00145 00146 // 3. Record some info about per-instance SampleInfo (*_rank) so that 00147 // we can fill in the ranks after the loop has completed 00148 std::pair<typename InstanceMap::iterator, bool> result = 00149 inst_map.insert(std::make_pair(&inst, InstanceData())); 00150 InstanceData& id = result.first->second; 00151 00152 if (result.second) { // first time we've seen this Instance 00153 ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_; 00154 id.MRS_disposed_gc_ = 00155 static_cast<CORBA::Long>(mrs.disposed_generation_count_); 00156 id.MRS_nowriters_gc_ = 00157 static_cast<CORBA::Long>(mrs.no_writers_generation_count_); 00158 } 00159 00160 if (iter->index_in_instance_ >= id.MRSIC_index_) { 00161 id.MRSIC_index_ = iter->index_in_instance_; 00162 id.MRSIC_disposed_gc_ = 00163 static_cast<CORBA::Long>(rde->disposed_generation_count_); 00164 id.MRSIC_nowriters_gc_ = 00165 static_cast<CORBA::Long>(rde->no_writers_generation_count_); 00166 } 00167 00168 if (!id.most_recent_generation_) { 00169 id.most_recent_generation_ = 00170 inst.instance_state_.most_recent_generation(rde); 00171 } 00172 00173 id.sampleinfo_positions_.push_back(idx); 00174 00175 // 4. Take 00176 if (oper_ == DDS_OPERATION_TAKE) { 00177 // If removing the sample releases it 00178 if (inst.rcvd_samples_.remove(rde)) { 00179 // Prevent access of the SampleInfo, below 00180 released_instances.insert(&inst); 00181 } 00182 this->reader_->dec_ref_data_element(rde); 00183 } 00184 } 00185 00186 // Fill in the *_ranks in the SampleInfo, and set instance state (mrg) 00187 for (typename InstanceMap::iterator i_iter(inst_map.begin()), 00188 i_end(inst_map.end()); i_iter != i_end; ++i_iter) { 00189 00190 InstanceData& id = i_iter->second; 00191 { // Danger, limit visibility of inst 00192 SubscriptionInstance& inst = *i_iter->first; 00193 // If this instance has not been released 00194 if (released_instances.find(&inst) == released_instances.end()) { 00195 if (id.most_recent_generation_) { 00196 inst.instance_state_.accessed(); 00197 } 00198 } 00199 } 00200 00201 CORBA::Long sample_rank = 00202 static_cast<CORBA::Long>(id.sampleinfo_positions_.size()); 00203 00204 for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()), 00205 s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) { 00206 DDS::SampleInfo& si = info_seq_[*s_iter]; 00207 si.sample_rank = --sample_rank; 00208 si.generation_rank = id.MRSIC_disposed_gc_ 00209 + id.MRSIC_nowriters_gc_ - si.generation_rank; 00210 si.absolute_generation_rank = id.MRS_disposed_gc_ + 00211 id.MRS_nowriters_gc_ - si.absolute_generation_rank; 00212 } 00213 } 00214 00215 return true; 00216 }
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user | ( | ) |
Definition at line 219 of file RakeResults_T.cpp.
References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.
00220 { 00221 typename SampleSeq::PrivateMemberAccess received_data_p(received_data_); 00222 00223 if (do_sort_) { 00224 size_t len = std::min(static_cast<size_t>(sorted_.size()), 00225 static_cast<size_t>(max_samples_)); 00226 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len)); 00227 info_seq_.length(static_cast<CORBA::ULong>(len)); 00228 return copy_into(sorted_.begin(), sorted_.end(), received_data_p); 00229 00230 } else { 00231 size_t len = unsorted_.size(); //can't be larger than max_samples_ 00232 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len)); 00233 info_seq_.length(static_cast<CORBA::ULong>(len)); 00234 return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p); 00235 } 00236 }
bool OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample | ( | ReceivedDataElement * | sample, | |
SubscriptionInstance * | i, | |||
size_t | index_in_instance | |||
) |
Returns false if the sample will definitely not be part of the resulting dataset, however if this returns true it still may be excluded (due to sorting and max_samples).
Definition at line 77 of file RakeResults_T.cpp.
References OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::filter(), OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.
00080 { 00081 #ifndef OPENDDS_NO_QUERY_CONDITION 00082 00083 if (do_filter_) { 00084 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_); 00085 typedef typename SampleSeq::value_type VT; 00086 const VT* typed_sample = static_cast<VT*>(sample->registered_data_); 00087 if (!qci->filter(*typed_sample)) return false; 00088 } 00089 00090 #endif 00091 00092 if (do_sort_) { 00093 // N.B. Until a better heuristic is found, non-valid 00094 // samples are elided when sorting by QueryCondition. 00095 #ifndef OPENDDS_NO_QUERY_CONDITION 00096 if (cond_ && !sample->registered_data_) return false; 00097 #endif 00098 00099 RakeData rd = {sample, instance, index_in_instance}; 00100 sorted_.insert(rd); 00101 00102 } else { 00103 if (unsorted_.size() == max_samples_) return false; 00104 00105 RakeData rd = {sample, instance, index_in_instance}; 00106 unsorted_.push_back(rd); 00107 } 00108 00109 return true; 00110 }
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_MULTISET_CMP | ( | RakeData | , | |
SortedSetCmp | ||||
) | [private] |
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR | ( | CORBA::ULong | ) | [private] |
OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR | ( | RakeData | ) | [private] |
RakeResults& OpenDDS::DCPS::RakeResults< SampleSeq >::operator= | ( | const RakeResults< SampleSeq > & | ) | [private] |
DDS::QueryCondition_ptr OpenDDS::DCPS::RakeResults< SampleSeq >::cond_ [private] |
Definition at line 64 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_ [private] |
Definition at line 87 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_ [private] |
Definition at line 87 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().
DDS::SampleInfoSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_ [private] |
Definition at line 61 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
CORBA::ULong OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_ [private] |
Definition at line 62 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), and OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample().
Operation_t OpenDDS::DCPS::RakeResults< SampleSeq >::oper_ [private] |
Definition at line 66 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().
DataReaderImpl* OpenDDS::DCPS::RakeResults< SampleSeq >::reader_ [private] |
Definition at line 59 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().
SampleSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_ [private] |
Definition at line 60 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().
SortedSet OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_ [private] |
Definition at line 91 of file RakeResults_T.h.
Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().