RakeResults_T.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 #ifndef RAKERESULTS_T_CPP
00008 #define RAKERESULTS_T_CPP
00009
00010 #include "dds/DCPS/RakeResults_T.h"
00011 #include "dds/DCPS/SubscriptionInstance.h"
00012 #include "dds/DCPS/DataReaderImpl.h"
00013 #include "dds/DCPS/QueryConditionImpl.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 template <class SampleSeq>
00022 RakeResults<SampleSeq>::RakeResults(DataReaderImpl* reader,
00023 SampleSeq& received_data,
00024 DDS::SampleInfoSeq& info_seq,
00025 CORBA::Long max_samples,
00026 DDS::PresentationQosPolicy presentation,
00027 #ifndef OPENDDS_NO_QUERY_CONDITION
00028 DDS::QueryCondition_ptr cond,
00029 #endif
00030 Operation_t oper)
00031 : reader_(reader)
00032 , received_data_(received_data)
00033 , info_seq_(info_seq)
00034 , max_samples_(max_samples)
00035 #ifndef OPENDDS_NO_QUERY_CONDITION
00036 , cond_(cond)
00037 #endif
00038 , oper_(oper)
00039 , do_sort_(false)
00040 , do_filter_(false)
00041 {
00042 #ifndef OPENDDS_NO_QUERY_CONDITION
00043
00044 if (cond_) {
00045 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00046 if (!qci) {
00047 ACE_ERROR((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
00048 ACE_TEXT("failed to obtain QueryConditionImpl\n")));
00049 return;
00050 }
00051 do_filter_ = qci->hasFilter();
00052 std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys();
00053 do_sort_ = order_bys.size() > 0;
00054
00055 if (do_sort_) {
00056 ComparatorBase::Ptr cmp;
00057
00058
00059
00060 for (size_t i = order_bys.size(); i > 0; --i) {
00061 const OPENDDS_STRING& fieldspec = order_bys[i - 1];
00062
00063 cmp = getMetaStruct<typename SampleSeq::value_type>()
00064 .create_qc_comparator(fieldspec.c_str(), cmp);
00065 }
00066
00067 SortedSetCmp comparator(cmp);
00068 SortedSet actual_sort(comparator);
00069 sorted_.swap(actual_sort);
00070 }
00071
00072 } else {
00073 #endif
00074
00075 this->do_sort_ = presentation.ordered_access == true &&
00076 presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
00077 #ifndef OPENDDS_NO_QUERY_CONDITION
00078 }
00079
00080 #endif
00081 }
00082
00083 template <class SampleSeq>
00084 bool RakeResults<SampleSeq>::insert_sample(ReceivedDataElement* sample,
00085 SubscriptionInstance_rch instance,
00086 size_t index_in_instance)
00087 {
00088 #ifndef OPENDDS_NO_QUERY_CONDITION
00089
00090 if (do_filter_) {
00091 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00092 typedef typename SampleSeq::value_type VT;
00093 const VT* typed_sample = static_cast<VT*>(sample->registered_data_);
00094 if (!qci || !typed_sample || !qci->filter(*typed_sample)) return false;
00095 }
00096
00097 #endif
00098
00099 if (do_sort_) {
00100
00101
00102 #ifndef OPENDDS_NO_QUERY_CONDITION
00103 if (cond_ && !sample->registered_data_) return false;
00104 #endif
00105
00106 RakeData rd = {sample, instance, index_in_instance};
00107 sorted_.insert(rd);
00108
00109 } else {
00110 if (unsorted_.size() == max_samples_) return false;
00111
00112 RakeData rd = {sample, instance, index_in_instance};
00113 unsorted_.push_back(rd);
00114 }
00115
00116 return true;
00117 }
00118
00119 template <class SampleSeq>
00120 template <class FwdIter>
00121 bool RakeResults<SampleSeq>::copy_into(FwdIter iter, FwdIter end,
00122 typename SampleSeq::PrivateMemberAccess& received_data_p)
00123 {
00124 typedef typename SampleSeq::value_type Sample;
00125 typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
00126 InstanceMap inst_map;
00127
00128 typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
00129 InstanceSet released_instances;
00130
00131 for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
00132
00133 ReceivedDataElement* rde = iter->rde_;
00134
00135 if (received_data_.maximum() != 0) {
00136 if (rde->registered_data_ == 0) {
00137 received_data_p.assign_sample(idx, Sample());
00138
00139 } else {
00140 received_data_p.assign_sample(idx,
00141 *static_cast<Sample*>(rde->registered_data_));
00142 }
00143
00144 } else {
00145 received_data_p.assign_ptr(idx, rde);
00146 }
00147
00148
00149 SubscriptionInstance& inst = *iter->si_;
00150 inst.instance_state_.sample_info(info_seq_[idx], rde);
00151 rde->sample_state_ = DDS::READ_SAMPLE_STATE;
00152
00153
00154
00155 std::pair<typename InstanceMap::iterator, bool> result =
00156 inst_map.insert(std::make_pair(&inst, InstanceData()));
00157 InstanceData& id = result.first->second;
00158
00159 if (result.second) {
00160 ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_;
00161 id.MRS_disposed_gc_ =
00162 static_cast<CORBA::Long>(mrs.disposed_generation_count_);
00163 id.MRS_nowriters_gc_ =
00164 static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
00165 }
00166
00167 if (iter->index_in_instance_ >= id.MRSIC_index_) {
00168 id.MRSIC_index_ = iter->index_in_instance_;
00169 id.MRSIC_disposed_gc_ =
00170 static_cast<CORBA::Long>(rde->disposed_generation_count_);
00171 id.MRSIC_nowriters_gc_ =
00172 static_cast<CORBA::Long>(rde->no_writers_generation_count_);
00173 }
00174
00175 if (!id.most_recent_generation_) {
00176 id.most_recent_generation_ =
00177 inst.instance_state_.most_recent_generation(rde);
00178 }
00179
00180 id.sampleinfo_positions_.push_back(idx);
00181
00182
00183 if (oper_ == DDS_OPERATION_TAKE) {
00184
00185 if (inst.rcvd_samples_.remove(rde)) {
00186
00187 released_instances.insert(&inst);
00188 }
00189 rde->dec_ref();
00190 }
00191 }
00192
00193
00194 for (typename InstanceMap::iterator i_iter(inst_map.begin()),
00195 i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
00196
00197 InstanceData& id = i_iter->second;
00198 {
00199 SubscriptionInstance& inst = *i_iter->first;
00200
00201 if (released_instances.find(&inst) == released_instances.end()) {
00202 if (id.most_recent_generation_) {
00203 inst.instance_state_.accessed();
00204 }
00205 }
00206 }
00207
00208 CORBA::Long sample_rank =
00209 static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
00210
00211 for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
00212 s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
00213 DDS::SampleInfo& si = info_seq_[*s_iter];
00214 si.sample_rank = --sample_rank;
00215 si.generation_rank = id.MRSIC_disposed_gc_
00216 + id.MRSIC_nowriters_gc_ - si.generation_rank;
00217 si.absolute_generation_rank = id.MRS_disposed_gc_ +
00218 id.MRS_nowriters_gc_ - si.absolute_generation_rank;
00219 }
00220 }
00221
00222 return true;
00223 }
00224
00225 template <class SampleSeq>
00226 bool RakeResults<SampleSeq>::copy_to_user()
00227 {
00228 typename SampleSeq::PrivateMemberAccess received_data_p(received_data_);
00229
00230 if (do_sort_) {
00231 size_t len = std::min(static_cast<size_t>(sorted_.size()),
00232 static_cast<size_t>(max_samples_));
00233 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00234 info_seq_.length(static_cast<CORBA::ULong>(len));
00235 return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
00236
00237 } else {
00238 size_t len = unsorted_.size();
00239 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00240 info_seq_.length(static_cast<CORBA::ULong>(len));
00241 return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
00242 }
00243 }
00244
00245 }
00246 }
00247
00248 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00249
00250 #endif