OpenDDS::DCPS::RakeResults< SampleSeq > Class Template Reference

#include <RakeResults_T.h>

Collaboration diagram for OpenDDS::DCPS::RakeResults< SampleSeq >:
Collaboration graph
[legend]

List of all members.

Classes

struct  InstanceData
class  SortedSetCmp

Public Member Functions

 RakeResults (DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper)
bool insert_sample (ReceivedDataElement *sample, SubscriptionInstance_rch i, size_t index_in_instance)
bool copy_to_user ()

Private Member Functions

template<class FwdIter >
bool copy_into (FwdIter begin, FwdIter end, typename SampleSeq::PrivateMemberAccess &received_data_p)
 RakeResults (const RakeResults &)
RakeResultsoperator= (const RakeResults &)
typedef OPENDDS_MULTISET_CMP (RakeData, SortedSetCmp) SortedSet
 OPENDDS_VECTOR (RakeData) unsorted_
typedef OPENDDS_VECTOR (CORBA::ULong) IndexList

Private Attributes

DataReaderImplreader_
SampleSeq & received_data_
DDS::SampleInfoSeqinfo_seq_
CORBA::ULong max_samples_
DDS::QueryCondition_ptr cond_
Operation_t oper_
bool do_sort_
bool do_filter_
SortedSet sorted_

Detailed Description

template<class SampleSeq>
class OpenDDS::DCPS::RakeResults< SampleSeq >

Rake is an abbreviation for "read or take". This class manages the results from a read() or take() operation, which are the received_data and the info_seq sequences passed in by-reference from the user.

Definition at line 33 of file RakeResults_T.h.


Constructor & Destructor Documentation

template<class SampleSeq >
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults ( DataReaderImpl reader,
SampleSeq &  received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::PresentationQosPolicy  presentation,
DDS::QueryCondition_ptr  cond,
Operation_t  oper 
) [inline]

Definition at line 22 of file RakeResults_T.cpp.

References DDS::PresentationQosPolicy::access_scope, ACE_TEXT(), OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::getOrderBys(), OpenDDS::DCPS::QueryConditionImpl::hasFilter(), LM_DEBUG, OPENDDS_STRING, DDS::PresentationQosPolicy::ordered_access, OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_, and DDS::TOPIC_PRESENTATION_QOS.

00031   : reader_(reader)
00032   , received_data_(received_data)
00033   , info_seq_(info_seq)
00034   , max_samples_(max_samples)
00035 #ifndef OPENDDS_NO_QUERY_CONDITION
00036   , cond_(cond)
00037 #endif
00038   , oper_(oper)
00039   , do_sort_(false)
00040   , do_filter_(false)
00041 {
00042 #ifndef OPENDDS_NO_QUERY_CONDITION
00043 
00044   if (cond_) {
00045     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00046     if (!qci) {
00047       ACE_ERROR((LM_DEBUG, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
00048         ACE_TEXT("failed to obtain QueryConditionImpl\n")));
00049       return;
00050     }
00051     do_filter_ = qci->hasFilter();
00052     std::vector<OPENDDS_STRING> order_bys = qci->getOrderBys();
00053     do_sort_ = order_bys.size() > 0;
00054 
00055     if (do_sort_) {
00056       ComparatorBase::Ptr cmp;
00057 
00058       // Iterate in reverse over the comma-separated fields so that the
00059       // top-level comparison is the leftmost.  The others will be chained.
00060       for (size_t i = order_bys.size(); i > 0; --i) {
00061         const OPENDDS_STRING& fieldspec = order_bys[i - 1];
00062         //FUTURE: handle ASC / DESC as an extension to the DDS spec?
00063         cmp = getMetaStruct<typename SampleSeq::value_type>()
00064           .create_qc_comparator(fieldspec.c_str(), cmp);
00065       }
00066 
00067       SortedSetCmp comparator(cmp);
00068       SortedSet actual_sort(comparator);
00069       sorted_.swap(actual_sort);
00070     }
00071 
00072   } else {
00073 #endif
00074     // PRESENTATION ordered access (TOPIC)
00075     this->do_sort_ = presentation.ordered_access == true &&
00076                      presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
00077 #ifndef OPENDDS_NO_QUERY_CONDITION
00078   }
00079 
00080 #endif
00081 }

Here is the call graph for this function:

template<class SampleSeq>
OpenDDS::DCPS::RakeResults< SampleSeq >::RakeResults ( const RakeResults< SampleSeq > &   )  [private]

Member Function Documentation

template<class SampleSeq >
template<class FwdIter >
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into ( FwdIter  begin,
FwdIter  end,
typename SampleSeq::PrivateMemberAccess &  received_data_p 
) [inline, private]

Definition at line 121 of file RakeResults_T.cpp.

References DDS::SampleInfo::absolute_generation_rank, OpenDDS::DCPS::InstanceState::accessed(), OpenDDS::DCPS::DDS_OPERATION_TAKE, OpenDDS::DCPS::ReceivedDataElement::dec_ref(), OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::InstanceState::most_recent_generation(), OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET, OpenDDS::DCPS::RakeResults< SampleSeq >::oper_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, DDS::READ_SAMPLE_STATE, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, OpenDDS::DCPS::ReceivedDataElementList::remove(), OpenDDS::DCPS::InstanceState::sample_info(), DDS::SampleInfo::sample_rank, OpenDDS::DCPS::ReceivedDataElement::sample_state_, and OpenDDS::DCPS::ReceivedDataElementList::tail_.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user().

00123 {
00124   typedef typename SampleSeq::value_type Sample;
00125   typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
00126   InstanceMap inst_map;
00127 
00128   typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
00129   InstanceSet released_instances;
00130 
00131   for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
00132     // 1. Populate the Received Data sequence
00133     ReceivedDataElement* rde = iter->rde_;
00134 
00135     if (received_data_.maximum() != 0) {
00136       if (rde->registered_data_ == 0) {
00137         received_data_p.assign_sample(idx, Sample());
00138 
00139       } else {
00140         received_data_p.assign_sample(idx,
00141                                       *static_cast<Sample*>(rde->registered_data_));
00142       }
00143 
00144     } else {
00145       received_data_p.assign_ptr(idx, rde);
00146     }
00147 
00148     // 2. Per-sample SampleInfo (not the three *_rank variables) and state
00149     SubscriptionInstance& inst = *iter->si_;
00150     inst.instance_state_.sample_info(info_seq_[idx], rde);
00151     rde->sample_state_ = DDS::READ_SAMPLE_STATE;
00152 
00153     // 3. Record some info about per-instance SampleInfo (*_rank) so that
00154     //    we can fill in the ranks after the loop has completed
00155     std::pair<typename InstanceMap::iterator, bool> result =
00156       inst_map.insert(std::make_pair(&inst, InstanceData()));
00157     InstanceData& id = result.first->second;
00158 
00159     if (result.second) { // first time we've seen this Instance
00160       ReceivedDataElement& mrs = *inst.rcvd_samples_.tail_;
00161       id.MRS_disposed_gc_ =
00162         static_cast<CORBA::Long>(mrs.disposed_generation_count_);
00163       id.MRS_nowriters_gc_ =
00164         static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
00165     }
00166 
00167     if (iter->index_in_instance_ >= id.MRSIC_index_) {
00168       id.MRSIC_index_ = iter->index_in_instance_;
00169       id.MRSIC_disposed_gc_ =
00170         static_cast<CORBA::Long>(rde->disposed_generation_count_);
00171       id.MRSIC_nowriters_gc_ =
00172         static_cast<CORBA::Long>(rde->no_writers_generation_count_);
00173     }
00174 
00175     if (!id.most_recent_generation_) {
00176       id.most_recent_generation_ =
00177         inst.instance_state_.most_recent_generation(rde);
00178     }
00179 
00180     id.sampleinfo_positions_.push_back(idx);
00181 
00182     // 4. Take
00183     if (oper_ == DDS_OPERATION_TAKE) {
00184       // If removing the sample releases it
00185       if (inst.rcvd_samples_.remove(rde)) {
00186         // Prevent access of the SampleInfo, below
00187         released_instances.insert(&inst);
00188       }
00189       rde->dec_ref();
00190     }
00191   }
00192 
00193   // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
00194   for (typename InstanceMap::iterator i_iter(inst_map.begin()),
00195        i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
00196 
00197     InstanceData& id = i_iter->second;
00198     {  // Danger, limit visibility of inst
00199       SubscriptionInstance& inst = *i_iter->first;
00200       // If this instance has not been released
00201       if (released_instances.find(&inst) == released_instances.end()) {
00202         if (id.most_recent_generation_) {
00203           inst.instance_state_.accessed();
00204         }
00205       }
00206     }
00207 
00208     CORBA::Long sample_rank =
00209       static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
00210 
00211     for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
00212          s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
00213       DDS::SampleInfo& si = info_seq_[*s_iter];
00214       si.sample_rank = --sample_rank;
00215       si.generation_rank = id.MRSIC_disposed_gc_
00216                            + id.MRSIC_nowriters_gc_ - si.generation_rank;
00217       si.absolute_generation_rank = id.MRS_disposed_gc_ +
00218                                     id.MRS_nowriters_gc_ - si.absolute_generation_rank;
00219     }
00220   }
00221 
00222   return true;
00223 }

Here is the call graph for this function:

Here is the caller graph for this function:

template<class SampleSeq >
bool OpenDDS::DCPS::RakeResults< SampleSeq >::copy_to_user (  )  [inline]

Definition at line 226 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into(), OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_, len, OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i().

00227 {
00228   typename SampleSeq::PrivateMemberAccess received_data_p(received_data_);
00229 
00230   if (do_sort_) {
00231     size_t len = std::min(static_cast<size_t>(sorted_.size()),
00232                           static_cast<size_t>(max_samples_));
00233     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00234     info_seq_.length(static_cast<CORBA::ULong>(len));
00235     return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
00236 
00237   } else {
00238     size_t len = unsorted_.size(); //can't be larger than max_samples_
00239     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
00240     info_seq_.length(static_cast<CORBA::ULong>(len));
00241     return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
00242   }
00243 }

Here is the call graph for this function:

Here is the caller graph for this function:

template<class SampleSeq >
bool OpenDDS::DCPS::RakeResults< SampleSeq >::insert_sample ( ReceivedDataElement sample,
SubscriptionInstance_rch  i,
size_t  index_in_instance 
) [inline]

Returns false if the sample will definitely not be part of the resulting dataset, however if this returns true it still may be excluded (due to sorting and max_samples).

Definition at line 84 of file RakeResults_T.cpp.

References OpenDDS::DCPS::RakeResults< SampleSeq >::cond_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_, OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_, OpenDDS::DCPS::QueryConditionImpl::filter(), OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_, OpenDDS::DCPS::ReceivedDataElement::registered_data_, and OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i(), OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i(), and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i().

00087 {
00088 #ifndef OPENDDS_NO_QUERY_CONDITION
00089 
00090   if (do_filter_) {
00091     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
00092     typedef typename SampleSeq::value_type VT;
00093     const VT* typed_sample = static_cast<VT*>(sample->registered_data_);
00094     if (!qci || !typed_sample || !qci->filter(*typed_sample)) return false;
00095   }
00096 
00097 #endif
00098 
00099   if (do_sort_) {
00100     // N.B. Until a better heuristic is found, non-valid
00101     // samples are elided when sorting by QueryCondition.
00102 #ifndef OPENDDS_NO_QUERY_CONDITION
00103     if (cond_ && !sample->registered_data_) return false;
00104 #endif
00105 
00106     RakeData rd = {sample, instance, index_in_instance};
00107     sorted_.insert(rd);
00108 
00109   } else {
00110     if (unsorted_.size() == max_samples_) return false;
00111 
00112     RakeData rd = {sample, instance, index_in_instance};
00113     unsorted_.push_back(rd);
00114   }
00115 
00116   return true;
00117 }

Here is the call graph for this function:

Here is the caller graph for this function:

template<class SampleSeq>
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_MULTISET_CMP ( RakeData  ,
SortedSetCmp   
) [private]
template<class SampleSeq>
typedef OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR ( CORBA::ULong   )  [private]
template<class SampleSeq>
OpenDDS::DCPS::RakeResults< SampleSeq >::OPENDDS_VECTOR ( RakeData   )  [private]
template<class SampleSeq>
RakeResults& OpenDDS::DCPS::RakeResults< SampleSeq >::operator= ( const RakeResults< SampleSeq > &   )  [private]

Member Data Documentation

template<class SampleSeq>
DDS::QueryCondition_ptr OpenDDS::DCPS::RakeResults< SampleSeq >::cond_ [private]
template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_filter_ [private]
template<class SampleSeq>
bool OpenDDS::DCPS::RakeResults< SampleSeq >::do_sort_ [private]
template<class SampleSeq>
DDS::SampleInfoSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::info_seq_ [private]
template<class SampleSeq>
CORBA::ULong OpenDDS::DCPS::RakeResults< SampleSeq >::max_samples_ [private]
template<class SampleSeq>
Operation_t OpenDDS::DCPS::RakeResults< SampleSeq >::oper_ [private]

Definition at line 68 of file RakeResults_T.h.

Referenced by OpenDDS::DCPS::RakeResults< SampleSeq >::copy_into().

template<class SampleSeq>
DataReaderImpl* OpenDDS::DCPS::RakeResults< SampleSeq >::reader_ [private]

Definition at line 61 of file RakeResults_T.h.

template<class SampleSeq>
SampleSeq& OpenDDS::DCPS::RakeResults< SampleSeq >::received_data_ [private]
template<class SampleSeq>
SortedSet OpenDDS::DCPS::RakeResults< SampleSeq >::sorted_ [private]

The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1