OpenDDS::DCPS::RakeResults< SampleSeq > Class Template Reference

#include <RakeResults_T.h>

Collaboration diagram for OpenDDS::DCPS::RakeResults< SampleSeq >:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 RakeResults (DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper)
bool insert_sample (ReceivedDataElement *sample, SubscriptionInstance *i, size_t index_in_instance)
bool copy_to_user ()

Private Member Functions

template<class FwdIter>
bool copy_into (FwdIter begin, FwdIter end, typename SampleSeq::PrivateMemberAccess &received_data_p)
 RakeResults (const RakeResults &)
RakeResultsoperator= (const RakeResults &)
typedef OPENDDS_MULTISET_CMP (RakeData, SortedSetCmp) SortedSet
 OPENDDS_VECTOR (RakeData) unsorted_
typedef OPENDDS_VECTOR (CORBA::ULong) IndexList

Private Attributes

DataReaderImplreader_
SampleSeq & received_data_
DDS::SampleInfoSeqinfo_seq_
CORBA::ULong max_samples_
DDS::QueryCondition_ptr cond_
Operation_t oper_
bool do_sort_
bool do_filter_
SortedSet sorted_

Classes

struct  InstanceData
class  SortedSetCmp

Detailed Description

template<class SampleSeq>
class OpenDDS::DCPS::RakeResults< SampleSeq >

Rake is an abbreviation for "read or take". This class manages the results from a read() or take() operation, which are the received_data and the info_seq sequences passed in by-reference from the user.

Definition at line 31 of file RakeResults_T.h.


Constructor & Destructor Documentation

template<class SampleSeq>
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults ( DataReaderImpl reader,
SampleSeq &  received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::PresentationQosPolicy  presentation,
DDS::QueryCondition_ptr  cond,
Operation_t  oper 
)

Definition at line 20 of file RakeResults_T.cpp.

References DDS::PresentationQosPolicy::access_scope, OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::getOrderBys(), OpenDDS::DCPS::QueryConditionImpl::hasFilter(), OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_, and DDS::TOPIC_PRESENTATION_QOS.

00029   : reader_(reader)
00030   , received_data_(received_data)
00031   , info_seq_(info_seq)
00032   , max_samples_(max_samples)
00033 #ifndef OPENDDS_NO_QUERY_CONDITION
00034   , cond_(cond)
00035 #endif
00036   , oper_(oper)
00037   , do_sort_(false)
00038   , do_filter_(false)
00039 {
00040 #ifndef OPENDDS_NO_QUERY_CONDITION
00041 
00042   if (cond_) {
00043     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00044     do_filter_ = qci->hasFilter();
00045     std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys();
00046     do_sort_ = order_bys.size() > 0;
00047 
00048     if (do_sort_) {
00049       ComparatorBase::Ptr cmp = 0;
00050 
00051       // Iterate in reverse over the comma-separated fields so that the
00052       // top-level comparison is the leftmost.  The others will be chained.
00053       for (size_t i = order_bys.size(); i > 0; --i) {
00054         const OPENDDS_STRING& fieldspec = order_bys[i - 1];
00055         //FUTURE: handle ASC / DESC as an extension to the DDS spec?
00056         cmp = getMetaStruct<typename SampleSeq::value_type>()
00057           .create_qc_comparator(fieldspec.c_str(), cmp);
00058       }
00059 
00060       SortedSetCmp comparator(cmp);
00061       SortedSet actual_sort(comparator);
00062       sorted_.swap(actual_sort);
00063     }
00064 
00065   } else {
00066 #endif
00067     // PRESENTATION ordered access (TOPIC)
00068     this->do_sort_ = presentation.ordered_access == true &&
00069                      presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
00070 #ifndef OPENDDS_NO_QUERY_CONDITION
00071   }
00072 
00073 #endif
00074 }

template<class SampleSeq>
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults ( const RakeResults< SampleSeq > &   )  [private]


Member Function Documentation

template<class SampleSeq>
template<class FwdIter>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into ( FwdIter  begin,
FwdIter  end,
typename SampleSeq::PrivateMemberAccess &  received_data_p 
) [private]

Definition at line 114 of file RakeResults_T.cpp.

References OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::DataReaderImpl::dec_ref_data_element(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET, OpenDDS::DCPS::RakeResults< SampleSeq >::oper_, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::RakeResults< SampleSeq >::reader_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::InstanceState::sample_info().

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().

00116 {
00117   typedef typename SampleSeq::value_type Sample;
00118   typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
00119   InstanceMap inst_map;
00120 
00121   typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
00122   InstanceSet released_instances;
00123 
00124   for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
00125     // 1. Populate the Received Data sequence
00126     ReceivedDataElement* rde = iter->rde_;
00127 
00128     if (received_data_.maximum() != 0) {
00129       if (rde->registered_data_ == 0) {
00130         received_data_p.assign_sample(idx, Sample());
00131 
00132       } else {
00133         received_data_p.assign_sample(idx,
00134                                       *static_cast<Sample*>(rde->registered_data_));
00135       }
00136 
00137     } else {
00138       received_data_p.assign_ptr(idx, rde);
00139     }
00140 
00141     // 2. Per-sample SampleInfo (not the three *_rank variables) and state
00142     SubscriptionInstance& inst = *iter->si_;
00143     inst.instance_state_.sample_info(info_seq_[idx], rde);
00144     rde->sample_state_ = DDS::READ_SAMPLE_STATE;
00145 
00146     // 3. Record some info about per-instance SampleInfo (*_rank) so that
00147     //    we can fill in the ranks after the loop has completed
00148     std::pair<typename InstanceMap::iterator, bool> result =
00149       inst_map.insert(std::make_pair(&inst, InstanceData()));
00150     InstanceData& id = result.first->second;
00151 
00152     if (result.second) { // first time we've seen this Instance
00153       ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_;
00154       id.MRS_disposed_gc_ =
00155         static_cast<CORBA::Long>(mrs.disposed_generation_count_);
00156       id.MRS_nowriters_gc_ =
00157         static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
00158     }
00159 
00160     if (iter->index_in_instance_ >= id.MRSIC_index_) {
00161       id.MRSIC_index_ = iter->index_in_instance_;
00162       id.MRSIC_disposed_gc_ =
00163         static_cast<CORBA::Long>(rde->disposed_generation_count_);
00164       id.MRSIC_nowriters_gc_ =
00165         static_cast<CORBA::Long>(rde->no_writers_generation_count_);
00166     }
00167 
00168     if (!id.most_recent_generation_) {
00169       id.most_recent_generation_ =
00170         inst.instance_state_.most_recent_generation(rde);
00171     }
00172 
00173     id.sampleinfo_positions_.push_back(idx);
00174 
00175     // 4. Take
00176     if (oper_ == DDS_OPERATION_TAKE) {
00177       // If removing the sample releases it
00178       if (inst.rcvd_samples_.remove(rde)) {
00179         // Prevent access of the SampleInfo, below
00180         released_instances.insert(&inst);
00181       }
00182       this->reader_->dec_ref_data_element(rde);
00183     }
00184   }
00185 
00186   // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
00187   for (typename InstanceMap::iterator i_iter(inst_map.begin()),
00188        i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
00189 
00190     InstanceData& id = i_iter->second;
00191     {  // Danger, limit visibility of inst
00192       SubscriptionInstance& inst = *i_iter->first;
00193       // If this instance has not been released
00194       if (released_instances.find(&inst) == released_instances.end()) {
00195         if (id.most_recent_generation_) {
00196           inst.instance_state_.accessed();
00197         }
00198       }
00199     }
00200 
00201     CORBA::Long sample_rank =
00202       static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
00203 
00204     for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
00205          s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
00206       DDS::SampleInfo& si = info_seq_[*s_iter];
00207       si.sample_rank = --sample_rank;
00208       si.generation_rank = id.MRSIC_disposed_gc_
00209                            + id.MRSIC_nowriters_gc_ - si.generation_rank;
00210       si.absolute_generation_rank = id.MRS_disposed_gc_ +
00211                                     id.MRS_nowriters_gc_ - si.absolute_generation_rank;
00212     }
00213   }
00214 
00215   return true;
00216 }

template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user (  ) 

Definition at line 219 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.

00220 {
00221   typename SampleSeq::PrivateMemberAccess received_data_p(received_data_);
00222 
00223   if (do_sort_) {
00224     size_t len = std::min(static_cast<size_t>(sorted_.size()),
00225                           static_cast<size_t>(max_samples_));
00226     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00227     info_seq_.length(static_cast<CORBA::ULong>(len));
00228     return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
00229 
00230   } else {
00231     size_t len = unsorted_.size(); //can't be larger than max_samples_
00232     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00233     info_seq_.length(static_cast<CORBA::ULong>(len));
00234     return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
00235   }
00236 }

template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample ( ReceivedDataElement sample,
SubscriptionInstance i,
size_t  index_in_instance 
)

Returns false if the sample will definitely not be part of the resulting dataset, however if this returns true it still may be excluded (due to sorting and max_samples).

Definition at line 77 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::filter(), OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.

00080 {
00081 #ifndef OPENDDS_NO_QUERY_CONDITION
00082 
00083   if (do_filter_) {
00084     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00085     typedef typename SampleSeq::value_type VT;
00086     const VT* typed_sample = static_cast<VT*>(sample->registered_data_);
00087     if (!qci->filter(*typed_sample)) return false;
00088   }
00089 
00090 #endif
00091 
00092   if (do_sort_) {
00093     // N.B. Until a better heuristic is found, non-valid
00094     // samples are elided when sorting by QueryCondition.
00095 #ifndef OPENDDS_NO_QUERY_CONDITION
00096     if (cond_ && !sample->registered_data_) return false;
00097 #endif
00098 
00099     RakeData rd = {sample, instance, index_in_instance};
00100     sorted_.insert(rd);
00101 
00102   } else {
00103     if (unsorted_.size() == max_samples_) return false;
00104 
00105     RakeData rd = {sample, instance, index_in_instance};
00106     unsorted_.push_back(rd);
00107   }
00108 
00109   return true;
00110 }

template<class SampleSeq>
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_MULTISET_CMP ( RakeData  ,
SortedSetCmp   
) [private]

template<class SampleSeq>
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR ( CORBA::ULong   )  [private]

template<class SampleSeq>
OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR ( RakeData   )  [private]

template<class SampleSeq>
RakeResults& OpenDDS::DCPS::RakeResults< SampleSeq >::operator= ( const RakeResults< SampleSeq > &   )  [private]


Member Data Documentation

template<class SampleSeq>
DDS::QueryCondition_ptr OpenDDS::DCPS::RakeResults< SampleSeq >::cond_ [private]

Definition at line 64 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().

template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_ [private]

Definition at line 87 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().

template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_ [private]

Definition at line 87 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().

template<class SampleSeq>
DDS::SampleInfoSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_ [private]

Definition at line 61 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().

template<class SampleSeq>
CORBA::ULong OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_ [private]

Definition at line 62 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), and OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample().

template<class SampleSeq>
Operation_t OpenDDS::DCPS::RakeResults< SampleSeq >::oper_ [private]

Definition at line 66 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().

template<class SampleSeq>
DataReaderImpl* OpenDDS::DCPS::RakeResults< SampleSeq >::reader_ [private]

Definition at line 59 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().

template<class SampleSeq>
SampleSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_ [private]

Definition at line 60 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), and OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().

template<class SampleSeq>
SortedSet OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_ [private]

Definition at line 91 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user(), OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample(), and OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:21 2016 for OpenDDS by  doxygen 1.4.7