RakeResults_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 #ifndef RAKERESULTS_T_CPP
00008 #define RAKERESULTS_T_CPP
00009 
00010 #include "dds/DCPS/RakeResults_T.h"
00011 #include "dds/DCPS/SubscriptionInstance.h"
00012 #include "dds/DCPS/DataReaderImpl.h"
00013 #include "dds/DCPS/QueryConditionImpl.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
00021 template <class SampleSeq>
00022 RakeResults<SampleSeq>::RakeResults(DataReaderImpl* reader,
00023                                     SampleSeq& received_data,
00024                                     DDS::SampleInfoSeq& info_seq,
00025                                     CORBA::Long max_samples,
00026                                     DDS::PresentationQosPolicy presentation,
00027 #ifndef OPENDDS_NO_QUERY_CONDITION
00028                                     DDS::QueryCondition_ptr cond,
00029 #endif
00030                                     Operation_t oper)
00031   : reader_(reader)
00032   , received_data_(received_data)
00033   , info_seq_(info_seq)
00034   , max_samples_(max_samples)
00035 #ifndef OPENDDS_NO_QUERY_CONDITION
00036   , cond_(cond)
00037 #endif
00038   , oper_(oper)
00039   , do_sort_(false)
00040   , do_filter_(false)
00041 {
00042 #ifndef OPENDDS_NO_QUERY_CONDITION
00043 
00044   if (cond_) {
00045     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00046     if (!qci) {
00047       ACE_ERROR((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
00048         ACE_TEXT("failed to obtain QueryConditionImpl\n")));
00049       return;
00050     }
00051     do_filter_ = qci->hasFilter();
00052     std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys();
00053     do_sort_ = order_bys.size() > 0;
00054 
00055     if (do_sort_) {
00056       ComparatorBase::Ptr cmp;
00057 
00058       // Iterate in reverse over the comma-separated fields so that the
00059       // top-level comparison is the leftmost.  The others will be chained.
00060       for (size_t i = order_bys.size(); i > 0; --i) {
00061         const OPENDDS_STRING& fieldspec = order_bys[i - 1];
00062         //FUTURE: handle ASC / DESC as an extension to the DDS spec?
00063         cmp = getMetaStruct<typename SampleSeq::value_type>()
00064           .create_qc_comparator(fieldspec.c_str(), cmp);
00065       }
00066 
00067       SortedSetCmp comparator(cmp);
00068       SortedSet actual_sort(comparator);
00069       sorted_.swap(actual_sort);
00070     }
00071 
00072   } else {
00073 #endif
00074     // PRESENTATION ordered access (TOPIC)
00075     this->do_sort_ = presentation.ordered_access == true &&
00076                      presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
00077 #ifndef OPENDDS_NO_QUERY_CONDITION
00078   }
00079 
00080 #endif
00081 }
00082 
00083 template <class SampleSeq>
00084 bool RakeResults<SampleSeq>::insert_sample(ReceivedDataElement* sample,
00085                                            SubscriptionInstance_rch instance,
00086                                            size_t index_in_instance)
00087 {
00088 #ifndef OPENDDS_NO_QUERY_CONDITION
00089 
00090   if (do_filter_) {
00091     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00092     typedef typename SampleSeq::value_type VT;
00093     const VT* typed_sample = static_cast<VT*>(sample->registered_data_);
00094     if (!qci || !typed_sample || !qci->filter(*typed_sample)) return false;
00095   }
00096 
00097 #endif
00098 
00099   if (do_sort_) {
00100     // N.B. Until a better heuristic is found, non-valid
00101     // samples are elided when sorting by QueryCondition.
00102 #ifndef OPENDDS_NO_QUERY_CONDITION
00103     if (cond_ && !sample->registered_data_) return false;
00104 #endif
00105 
00106     RakeData rd = {sample, instance, index_in_instance};
00107     sorted_.insert(rd);
00108 
00109   } else {
00110     if (unsorted_.size() == max_samples_) return false;
00111 
00112     RakeData rd = {sample, instance, index_in_instance};
00113     unsorted_.push_back(rd);
00114   }
00115 
00116   return true;
00117 }
00118 
00119 template <class SampleSeq>
00120 template <class FwdIter>
00121 bool RakeResults<SampleSeq>::copy_into(FwdIter iter, FwdIter end,
00122                                        typename SampleSeq::PrivateMemberAccess& received_data_p)
00123 {
00124   typedef typename SampleSeq::value_type Sample;
00125   typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
00126   InstanceMap inst_map;
00127 
00128   typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
00129   InstanceSet released_instances;
00130 
00131   for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
00132     // 1. Populate the Received Data sequence
00133     ReceivedDataElement* rde = iter->rde_;
00134 
00135     if (received_data_.maximum() != 0) {
00136       if (rde->registered_data_ == 0) {
00137         received_data_p.assign_sample(idx, Sample());
00138 
00139       } else {
00140         received_data_p.assign_sample(idx,
00141                                       *static_cast<Sample*>(rde->registered_data_));
00142       }
00143 
00144     } else {
00145       received_data_p.assign_ptr(idx, rde);
00146     }
00147 
00148     // 2. Per-sample SampleInfo (not the three *_rank variables) and state
00149     SubscriptionInstance& inst = *iter->si_;
00150     inst.instance_state_.sample_info(info_seq_[idx], rde);
00151     rde->sample_state_ = DDS::READ_SAMPLE_STATE;
00152 
00153     // 3. Record some info about per-instance SampleInfo (*_rank) so that
00154     //    we can fill in the ranks after the loop has completed
00155     std::pair<typename InstanceMap::iterator, bool> result =
00156       inst_map.insert(std::make_pair(&inst, InstanceData()));
00157     InstanceData& id = result.first->second;
00158 
00159     if (result.second) { // first time we've seen this Instance
00160       ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_;
00161       id.MRS_disposed_gc_ =
00162         static_cast<CORBA::Long>(mrs.disposed_generation_count_);
00163       id.MRS_nowriters_gc_ =
00164         static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
00165     }
00166 
00167     if (iter->index_in_instance_ >= id.MRSIC_index_) {
00168       id.MRSIC_index_ = iter->index_in_instance_;
00169       id.MRSIC_disposed_gc_ =
00170         static_cast<CORBA::Long>(rde->disposed_generation_count_);
00171       id.MRSIC_nowriters_gc_ =
00172         static_cast<CORBA::Long>(rde->no_writers_generation_count_);
00173     }
00174 
00175     if (!id.most_recent_generation_) {
00176       id.most_recent_generation_ =
00177         inst.instance_state_.most_recent_generation(rde);
00178     }
00179 
00180     id.sampleinfo_positions_.push_back(idx);
00181 
00182     // 4. Take
00183     if (oper_ == DDS_OPERATION_TAKE) {
00184       // If removing the sample releases it
00185       if (inst.rcvd_samples_.remove(rde)) {
00186         // Prevent access of the SampleInfo, below
00187         released_instances.insert(&inst);
00188       }
00189       rde->dec_ref();
00190     }
00191   }
00192 
00193   // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
00194   for (typename InstanceMap::iterator i_iter(inst_map.begin()),
00195        i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
00196 
00197     InstanceData& id = i_iter->second;
00198     {  // Danger, limit visibility of inst
00199       SubscriptionInstance& inst = *i_iter->first;
00200       // If this instance has not been released
00201       if (released_instances.find(&inst) == released_instances.end()) {
00202         if (id.most_recent_generation_) {
00203           inst.instance_state_.accessed();
00204         }
00205       }
00206     }
00207 
00208     CORBA::Long sample_rank =
00209       static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
00210 
00211     for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
00212          s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
00213       DDS::SampleInfo& si = info_seq_[*s_iter];
00214       si.sample_rank = --sample_rank;
00215       si.generation_rank = id.MRSIC_disposed_gc_
00216                            + id.MRSIC_nowriters_gc_ - si.generation_rank;
00217       si.absolute_generation_rank = id.MRS_disposed_gc_ +
00218                                     id.MRS_nowriters_gc_ - si.absolute_generation_rank;
00219     }
00220   }
00221 
00222   return true;
00223 }
00224 
00225 template <class SampleSeq>
00226 bool RakeResults<SampleSeq>::copy_to_user()
00227 {
00228   typename SampleSeq::PrivateMemberAccess received_data_p(received_data_);
00229 
00230   if (do_sort_) {
00231     size_t len = std::min(static_cast<size_t>(sorted_.size()),
00232                           static_cast<size_t>(max_samples_));
00233     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00234     info_seq_.length(static_cast<CORBA::ULong>(len));
00235     return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
00236 
00237   } else {
00238     size_t len = unsorted_.size(); //can't be larger than max_samples_
00239     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00240     info_seq_.length(static_cast<CORBA::ULong>(len));
00241     return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
00242   }
00243 }
00244 
00245 } // namespace DCPS
00246 } // namespace OpenDDS
00247 
00248 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00249 
00250 #endif /* RAKERESULTS_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1