00001
00002
00003
00004
00005
00006
00007 #ifndef RAKERESULTS_T_CPP
00008 #define RAKERESULTS_T_CPP
00009
00010 #include "dds/DCPS/RakeResults_T.h"
00011 #include "dds/DCPS/SubscriptionInstance.h"
00012 #include "dds/DCPS/DataReaderImpl.h"
00013 #include "dds/DCPS/QueryConditionImpl.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 template <class SampleSeq>
00020 RakeResults<SampleSeq>::RakeResults(DataReaderImpl* reader,
00021 SampleSeq& received_data,
00022 DDS::SampleInfoSeq& info_seq,
00023 CORBA::Long max_samples,
00024 DDS::PresentationQosPolicy presentation,
00025 #ifndef OPENDDS_NO_QUERY_CONDITION
00026 DDS::QueryCondition_ptr cond,
00027 #endif
00028 Operation_t oper)
00029 : reader_(reader)
00030 , received_data_(received_data)
00031 , info_seq_(info_seq)
00032 , max_samples_(max_samples)
00033 #ifndef OPENDDS_NO_QUERY_CONDITION
00034 , cond_(cond)
00035 #endif
00036 , oper_(oper)
00037 , do_sort_(false)
00038 , do_filter_(false)
00039 {
00040 #ifndef OPENDDS_NO_QUERY_CONDITION
00041
00042 if (cond_) {
00043 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00044 do_filter_ = qci->hasFilter();
00045 std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys();
00046 do_sort_ = order_bys.size() > 0;
00047
00048 if (do_sort_) {
00049 ComparatorBase::Ptr cmp = 0;
00050
00051
00052
00053 for (size_t i = order_bys.size(); i > 0; --i) {
00054 const OPENDDS_STRING& fieldspec = order_bys[i - 1];
00055
00056 cmp = getMetaStruct<typename SampleSeq::value_type>()
00057 .create_qc_comparator(fieldspec.c_str(), cmp);
00058 }
00059
00060 SortedSetCmp comparator(cmp);
00061 SortedSet actual_sort(comparator);
00062 sorted_.swap(actual_sort);
00063 }
00064
00065 } else {
00066 #endif
00067
00068 this->do_sort_ = presentation.ordered_access == true &&
00069 presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
00070 #ifndef OPENDDS_NO_QUERY_CONDITION
00071 }
00072
00073 #endif
00074 }
00075
00076 template <class SampleSeq>
00077 bool RakeResults<SampleSeq>::insert_sample(ReceivedDataElement* sample,
00078 SubscriptionInstance* instance,
00079 size_t index_in_instance)
00080 {
00081 #ifndef OPENDDS_NO_QUERY_CONDITION
00082
00083 if (do_filter_) {
00084 const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00085 typedef typename SampleSeq::value_type VT;
00086 const VT* typed_sample = static_cast<VT*>(sample->registered_data_);
00087 if (!qci->filter(*typed_sample)) return false;
00088 }
00089
00090 #endif
00091
00092 if (do_sort_) {
00093
00094
00095 #ifndef OPENDDS_NO_QUERY_CONDITION
00096 if (cond_ && !sample->registered_data_) return false;
00097 #endif
00098
00099 RakeData rd = {sample, instance, index_in_instance};
00100 sorted_.insert(rd);
00101
00102 } else {
00103 if (unsorted_.size() == max_samples_) return false;
00104
00105 RakeData rd = {sample, instance, index_in_instance};
00106 unsorted_.push_back(rd);
00107 }
00108
00109 return true;
00110 }
00111
00112 template <class SampleSeq>
00113 template <class FwdIter>
00114 bool RakeResults<SampleSeq>::copy_into(FwdIter iter, FwdIter end,
00115 typename SampleSeq::PrivateMemberAccess& received_data_p)
00116 {
00117 typedef typename SampleSeq::value_type Sample;
00118 typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
00119 InstanceMap inst_map;
00120
00121 typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
00122 InstanceSet released_instances;
00123
00124 for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
00125
00126 ReceivedDataElement* rde = iter->rde_;
00127
00128 if (received_data_.maximum() != 0) {
00129 if (rde->registered_data_ == 0) {
00130 received_data_p.assign_sample(idx, Sample());
00131
00132 } else {
00133 received_data_p.assign_sample(idx,
00134 *static_cast<Sample*>(rde->registered_data_));
00135 }
00136
00137 } else {
00138 received_data_p.assign_ptr(idx, rde);
00139 }
00140
00141
00142 SubscriptionInstance& inst = *iter->si_;
00143 inst.instance_state_.sample_info(info_seq_[idx], rde);
00144 rde->sample_state_ = DDS::READ_SAMPLE_STATE;
00145
00146
00147
00148 std::pair<typename InstanceMap::iterator, bool> result =
00149 inst_map.insert(std::make_pair(&inst, InstanceData()));
00150 InstanceData& id = result.first->second;
00151
00152 if (result.second) {
00153 ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_;
00154 id.MRS_disposed_gc_ =
00155 static_cast<CORBA::Long>(mrs.disposed_generation_count_);
00156 id.MRS_nowriters_gc_ =
00157 static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
00158 }
00159
00160 if (iter->index_in_instance_ >= id.MRSIC_index_) {
00161 id.MRSIC_index_ = iter->index_in_instance_;
00162 id.MRSIC_disposed_gc_ =
00163 static_cast<CORBA::Long>(rde->disposed_generation_count_);
00164 id.MRSIC_nowriters_gc_ =
00165 static_cast<CORBA::Long>(rde->no_writers_generation_count_);
00166 }
00167
00168 if (!id.most_recent_generation_) {
00169 id.most_recent_generation_ =
00170 inst.instance_state_.most_recent_generation(rde);
00171 }
00172
00173 id.sampleinfo_positions_.push_back(idx);
00174
00175
00176 if (oper_ == DDS_OPERATION_TAKE) {
00177
00178 if (inst.rcvd_samples_.remove(rde)) {
00179
00180 released_instances.insert(&inst);
00181 }
00182 this->reader_->dec_ref_data_element(rde);
00183 }
00184 }
00185
00186
00187 for (typename InstanceMap::iterator i_iter(inst_map.begin()),
00188 i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
00189
00190 InstanceData& id = i_iter->second;
00191 {
00192 SubscriptionInstance& inst = *i_iter->first;
00193
00194 if (released_instances.find(&inst) == released_instances.end()) {
00195 if (id.most_recent_generation_) {
00196 inst.instance_state_.accessed();
00197 }
00198 }
00199 }
00200
00201 CORBA::Long sample_rank =
00202 static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
00203
00204 for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
00205 s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
00206 DDS::SampleInfo& si = info_seq_[*s_iter];
00207 si.sample_rank = --sample_rank;
00208 si.generation_rank = id.MRSIC_disposed_gc_
00209 + id.MRSIC_nowriters_gc_ - si.generation_rank;
00210 si.absolute_generation_rank = id.MRS_disposed_gc_ +
00211 id.MRS_nowriters_gc_ - si.absolute_generation_rank;
00212 }
00213 }
00214
00215 return true;
00216 }
00217
00218 template <class SampleSeq>
00219 bool RakeResults<SampleSeq>::copy_to_user()
00220 {
00221 typename SampleSeq::PrivateMemberAccess received_data_p(received_data_);
00222
00223 if (do_sort_) {
00224 size_t len = std::min(static_cast<size_t>(sorted_.size()),
00225 static_cast<size_t>(max_samples_));
00226 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00227 info_seq_.length(static_cast<CORBA::ULong>(len));
00228 return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
00229
00230 } else {
00231 size_t len = unsorted_.size();
00232 received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00233 info_seq_.length(static_cast<CORBA::ULong>(len));
00234 return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
00235 }
00236 }
00237
00238 }
00239 }
00240
00241 #endif