LCOV - code coverage report
Current view: top level - DCPS - RakeResults_T.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 113 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 270 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : #ifndef RAKERESULTS_T_CPP
       8             : #define RAKERESULTS_T_CPP
       9             : 
      10             : #include "RakeResults_T.h"
      11             : #include "SubscriptionInstance.h"
      12             : #include "DataReaderImpl.h"
      13             : #include "QueryConditionImpl.h"
      14             : #include "PoolAllocator.h"
      15             : 
      16             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      17             : 
      18             : namespace OpenDDS {
      19             : namespace DCPS {
      20             : 
      21             : template <class MessageType>
                       // Binds the caller's output sequences and the read/take parameters, then
                       // decides whether collected samples must be filtered (do_filter_) and/or
                       // sorted (do_sort_).  Both flags start out false.
                       //
                       // With a query condition (only compiled when OPENDDS_NO_QUERY_CONDITION is
                       // not defined): do_filter_ is taken from the condition's hasFilter() and
                       // do_sort_ is set when getOrderBys() yields at least one field.
                       // Without a query condition: do_sort_ is set only when `presentation`
                       // requests ordered access with TOPIC access scope.  `presentation` is
                       // consulted only in that branch and is not stored.
      22           0 : RakeResults<MessageType>::RakeResults(DataReaderImpl* reader,
      23             :                                       SampleSeq& received_data,
      24             :                                       DDS::SampleInfoSeq& info_seq,
      25             :                                       CORBA::Long max_samples,
      26             :                                       DDS::PresentationQosPolicy presentation,
      27             : #ifndef OPENDDS_NO_QUERY_CONDITION
      28             :                                       DDS::QueryCondition_ptr cond,
      29             : #endif
      30             :                                       Operation_t oper)
      31           0 :   : reader_(reader)
      32           0 :   , received_data_(received_data)
      33           0 :   , info_seq_(info_seq)
      34           0 :   , max_samples_(max_samples)
      35             : #ifndef OPENDDS_NO_QUERY_CONDITION
      36           0 :   , cond_(cond)
      37             : #endif
      38           0 :   , oper_(oper)
      39           0 :   , do_sort_(false)
      40           0 :   , do_filter_(false)
      41             : {
      42             : #ifndef OPENDDS_NO_QUERY_CONDITION
      43             : 
      44           0 :   if (cond_) {
                         // The filter and ordering accessors live on the concrete implementation
                         // class, so the condition pointer must be downcast first.
      45           0 :     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
      46           0 :     if (!qci) {
      47           0 :       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
      48             :         ACE_TEXT("failed to obtain QueryConditionImpl\n")));
                           // Bail out with do_sort_ and do_filter_ still at their initial false.
      49           0 :       return;
      50             :     }
      51           0 :     do_filter_ = qci->hasFilter();
      52           0 :     std::vector<String> order_bys = qci->getOrderBys();
      53           0 :     do_sort_ = order_bys.size() > 0;
      54             : 
      55           0 :     if (do_sort_) {
                           // Starts out empty; each loop iteration wraps the previous comparator.
      56           0 :       ComparatorBase::Ptr cmp;
      57             : 
      58             :       // Iterate in reverse over the comma-separated fields so that the
      59             :       // top-level comparison is the leftmost.  The others will be chained.
      60           0 :       for (size_t i = order_bys.size(); i > 0; --i) {
      61           0 :         const String& fieldspec = order_bys[i - 1];
      62             :         //FUTURE: handle ASC / DESC as an extension to the DDS spec?
      63           0 :         cmp = getMetaStruct<MessageType>().create_qc_comparator(fieldspec.c_str(), cmp);
      64             :       }
      65             : 
                           // Build a set that orders by the chained comparator and swap it into
                           // sorted_, replacing whatever sorted_ held before.
      66           0 :       SortedSetCmp comparator(cmp);
      67           0 :       SortedSet actual_sort(comparator);
      68           0 :       sorted_.swap(actual_sort);
      69           0 :     }
      70             : 
      71           0 :   } else {
      72             : #endif
      73             :     // PRESENTATION ordered access (TOPIC)
      74           0 :     do_sort_ = presentation.ordered_access && presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
      75             : #ifndef OPENDDS_NO_QUERY_CONDITION
      76             :   }
      77             : 
      78             : #endif
      79           0 : }
      80             : 
      81             : template <class MessageType>
                       // Offers one received sample to the result collection.
                       //
                       // Records `sample` together with its list, owning instance and position
                       // within that instance, in sorted_ or unsorted_ depending on do_sort_.
                       //
                       // Returns true when the sample was recorded, false when it was rejected:
                       //  - filtering is enabled and the query condition's filter() does not
                       //    accept it (a failed cast or null registered_data_ also rejects);
                       //  - sorting by a query condition and the sample has no registered_data_;
                       //  - not sorting and unsorted_ already holds max_samples_ entries.
      82           0 : bool RakeResults<MessageType>::insert_sample(ReceivedDataElement* sample,
      83             :                                              ReceivedDataElementList* rdel,
      84             :                                              SubscriptionInstance_rch instance,
      85             :                                              size_t index_in_instance)
      86             : {
      87             : #ifndef OPENDDS_NO_QUERY_CONDITION
      88             : 
      89           0 :   if (do_filter_) {
                         // NOTE(review): this downcast is repeated for every offered sample even
                         // though the constructor already performed the same cast once.
      90           0 :     const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
      91           0 :     const MessageType* typed_sample = static_cast<MessageType*>(sample->registered_data_);
                         // The second filter() argument is the negation of valid_data_;
                         // presumably it tells the filter that only key fields are meaningful --
                         // TODO confirm against QueryConditionImpl::filter.
      92           0 :     if (!qci || !typed_sample || !qci->filter(*typed_sample, !sample->valid_data_)) {
      93           0 :       return false;
      94             :     }
      95             :   }
      96             : 
      97             : #endif
      98             : 
      99           0 :   if (do_sort_) {
     100             :     // N.B. Until a better heuristic is found, non-valid
     101             :     // samples are elided when sorting by QueryCondition.
     102             : #ifndef OPENDDS_NO_QUERY_CONDITION
     103           0 :     if (cond_ && !sample->registered_data_) return false;
     104             : #endif
     105             : 
                         // No size cap is applied on this path: every accepted sample goes into
                         // sorted_ regardless of max_samples_.
     106           0 :     RakeData rd = {sample, rdel, instance, index_in_instance};
     107           0 :     sorted_.insert(rd);
     108             : 
     109           0 :   } else {
                         // Unsorted results are capped at insertion time.
     110           0 :     if (unsorted_.size() == max_samples_) return false;
     111             : 
     112           0 :     RakeData rd = {sample, rdel, instance, index_in_instance};
     113           0 :     unsorted_.push_back(rd);
     114           0 :   }
     115             : 
     116           0 :   return true;
     117             : }
     118             : 
     119             : template <class MessageType>
     120             : template <class FwdIter>
                       // Transfers at most max_samples_ entries from [iter, end) to the caller's
                       // sequences and completes the per-instance rank fields of their SampleInfo.
                       //
                       // For each entry, in order: store the sample in received_data_p, fill
                       // info_seq_[idx], mark the element as read, accumulate per-instance
                       // bookkeeping, and -- when oper_ is DDS_OPERATION_TAKE -- remove the element
                       // from its instance and drop a reference.  A second pass over the touched
                       // instances then writes sample_rank, generation_rank and
                       // absolute_generation_rank.
                       //
                       // Both sequences are indexed directly with idx, so they must already be
                       // long enough; copy_to_user() sets their lengths before calling here.
                       // Always returns true.
     121           0 : bool RakeResults<MessageType>::copy_into(FwdIter iter, FwdIter end,
     122             :                                          MessageSequenceAdapterType& received_data_p)
     123             : {
                         // Per-instance bookkeeping, keyed by instance address.
     124             :   typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
     125           0 :   InstanceMap inst_map;
     126             : 
                         // Instances for which remove() returned true during a take.
     127             :   typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
     128           0 :   InstanceSet released_instances;
     129             : 
     130           0 :   for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
     131             :     // 1. Populate the Received Data sequence
     132           0 :     ReceivedDataElement* rde = iter->rde_;
     133           0 :     ReceivedDataElementList* rdel = iter->rdel_;
     134             : 
                         // A non-zero maximum() selects copying by value (a default-constructed
                         // MessageType stands in when the element carries no data); otherwise
                         // the element pointer itself is handed to the sequence -- presumably the
                         // zero-copy loan case, TODO confirm.
     135           0 :     if (received_data_.maximum() != 0) {
     136           0 :       if (rde->registered_data_ == 0) {
     137           0 :         received_data_p.assign_sample(idx, MessageType());
     138             : 
     139             :       } else {
     140           0 :         received_data_p.assign_sample(idx,
     141           0 :                                       *static_cast<MessageType*>(rde->registered_data_));
     142             :       }
     143             : 
     144             :     } else {
     145           0 :       received_data_p.assign_ptr(idx, rde);
     146             :     }
     147             : 
     148             :     // 2. Per-sample SampleInfo (not the three *_rank variables) and state
     149           0 :     SubscriptionInstance& inst = *iter->si_;
     150           0 :     inst.instance_state_->sample_info(info_seq_[idx], rde);
     151           0 :     rdel->mark_read(rde);
     152             : 
     153             :     // 3. Record some info about per-instance SampleInfo (*_rank) so that
     154             :     //    we can fill in the ranks after the loop has completed
     155             :     std::pair<typename InstanceMap::iterator, bool> result =
     156           0 :       inst_map.insert(std::make_pair(&inst, InstanceData()));
     157           0 :     InstanceData& id = result.first->second;
     158             : 
     159           0 :     if (result.second) { // first time we've seen this Instance
                           // Snapshot the generation counts of the instance's tail element.
                           // This runs before the take step below removes anything from the
                           // instance in this iteration.
     160           0 :       const ReceivedDataElement& mrs = *inst.rcvd_samples_.peek_tail();
     161           0 :       id.MRS_disposed_gc_ =
     162           0 :         static_cast<CORBA::Long>(mrs.disposed_generation_count_);
     163           0 :       id.MRS_nowriters_gc_ =
     164           0 :         static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
     165             :     }
     166             : 
                         // Track the returned entry with the highest index_in_instance_ for this
                         // instance ("MRSIC" presumably stands for "most recent sample in
                         // collection" -- confirm against the InstanceData declaration).
     167           0 :     if (iter->index_in_instance_ >= id.MRSIC_index_) {
     168           0 :       id.MRSIC_index_ = iter->index_in_instance_;
     169           0 :       id.MRSIC_disposed_gc_ =
     170           0 :         static_cast<CORBA::Long>(rde->disposed_generation_count_);
     171           0 :       id.MRSIC_nowriters_gc_ =
     172           0 :         static_cast<CORBA::Long>(rde->no_writers_generation_count_);
     173             :     }
     174             : 
                         // Once set for an instance the flag is never re-evaluated.
     175           0 :     if (!id.most_recent_generation_) {
     176           0 :       id.most_recent_generation_ =
     177           0 :         inst.instance_state_->most_recent_generation(rde);
     178             :     }
     179             : 
                         // Remember which info_seq_ slots belong to this instance.
     180           0 :     id.sampleinfo_positions_.push_back(idx);
     181             : 
     182             :     // 4. Take
     183           0 :     if (oper_ == DDS_OPERATION_TAKE) {
     184             :       // If removing the sample releases it
     185           0 :       if (inst.rcvd_samples_.remove(rde)) {
     186             :         // Prevent access of the SampleInfo, below
     187           0 :         released_instances.insert(&inst);
     188             :       }
     189           0 :       rde->dec_ref();
     190             :     }
     191             :   }
     192             : 
     193             :   // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
     194           0 :   for (typename InstanceMap::iterator i_iter(inst_map.begin()),
     195           0 :        i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
     196             : 
     197           0 :     InstanceData& id = i_iter->second;
     198             :     {  // Danger, limit visibility of inst
     199           0 :       SubscriptionInstance& inst = *i_iter->first;
     200             :       // If this instance has not been released
     201           0 :       if (released_instances.find(&inst) == released_instances.end()) {
     202           0 :         if (id.most_recent_generation_) {
     203           0 :           inst.instance_state_->accessed();
     204             :         }
     205             :       }
     206             :     }
     207             : 
                         // sample_rank counts down: the first slot recorded for the instance gets
                         // (count - 1) and the last one gets 0.
     208           0 :     CORBA::Long sample_rank =
     209           0 :       static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
     210             : 
     211           0 :     for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
     212           0 :          s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
     213           0 :       DDS::SampleInfo& si = info_seq_[*s_iter];
     214           0 :       si.sample_rank = --sample_rank;
                           // Each rank is a reference generation total minus the value already
                           // stored in the field; presumably sample_info() left the sample's own
                           // generation total there -- TODO confirm.
     215           0 :       si.generation_rank = id.MRSIC_disposed_gc_
     216           0 :                            + id.MRSIC_nowriters_gc_ - si.generation_rank;
     217           0 :       si.absolute_generation_rank = id.MRS_disposed_gc_ +
     218           0 :                                     id.MRS_nowriters_gc_ - si.absolute_generation_rank;
     219             :     }
     220             :   }
     221             : 
     222           0 :   return true;
     223           0 : }
     224             : 
     225             : template <class MessageType>
                       // Publishes the collected samples to the caller's sequences.
                       //
                       // Sets the length of both the data sequence (through its adapter) and
                       // info_seq_ to the number of samples that will be delivered, then hands
                       // the chosen container (sorted_ or unsorted_, per do_sort_) to copy_into().
                       // Returns whatever copy_into() returns.
     226           0 : bool RakeResults<MessageType>::copy_to_user()
     227             : {
     228           0 :   MessageSequenceAdapterType received_data_p(received_data_);
     229             : 
     230           0 :   if (do_sort_) {
                         // sorted_ is not capped on insertion, so clamp to max_samples_ here.
     231           0 :     size_t len = std::min(static_cast<size_t>(sorted_.size()),
     232           0 :                           static_cast<size_t>(max_samples_));
     233           0 :     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
     234           0 :     info_seq_.length(static_cast<CORBA::ULong>(len));
     235           0 :     return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
     236             : 
     237             :   } else {
                         // insert_sample() refuses further entries once unsorted_ reaches
                         // max_samples_, which is why no clamp is needed on this path.
     238           0 :     size_t len = unsorted_.size(); //can't be larger than max_samples_
     239           0 :     received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
     240           0 :     info_seq_.length(static_cast<CORBA::ULong>(len));
     241           0 :     return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
     242             :   }
     243             : }
     244             : 
     245             : } // namespace DCPS
     246             : } // namespace OpenDDS
     247             : 
     248             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     249             : 
     250             : #endif // RAKERESULTS_T_CPP

Generated by: LCOV version 1.16