Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 : #ifndef RAKERESULTS_T_CPP
8 : #define RAKERESULTS_T_CPP
9 :
10 : #include "RakeResults_T.h"
11 : #include "SubscriptionInstance.h"
12 : #include "DataReaderImpl.h"
13 : #include "QueryConditionImpl.h"
14 : #include "PoolAllocator.h"
15 :
16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
17 :
18 : namespace OpenDDS {
19 : namespace DCPS {
20 :
21 : template <class MessageType>
22 0 : RakeResults<MessageType>::RakeResults(DataReaderImpl* reader,
23 : SampleSeq& received_data,
24 : DDS::SampleInfoSeq& info_seq,
25 : CORBA::Long max_samples,
26 : DDS::PresentationQosPolicy presentation,
27 : #ifndef OPENDDS_NO_QUERY_CONDITION
28 : DDS::QueryCondition_ptr cond,
29 : #endif
30 : Operation_t oper)
31 0 : : reader_(reader)
32 0 : , received_data_(received_data)
33 0 : , info_seq_(info_seq)
34 0 : , max_samples_(max_samples)
35 : #ifndef OPENDDS_NO_QUERY_CONDITION
36 0 : , cond_(cond)
37 : #endif
38 0 : , oper_(oper)
39 0 : , do_sort_(false)
40 0 : , do_filter_(false)
41 : {
42 : #ifndef OPENDDS_NO_QUERY_CONDITION
43 :
44 0 : if (cond_) {
45 0 : const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
46 0 : if (!qci) {
47 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RakeResults(): ")
48 : ACE_TEXT("failed to obtain QueryConditionImpl\n")));
49 0 : return;
50 : }
51 0 : do_filter_ = qci->hasFilter();
52 0 : std::vector<String> order_bys = qci->getOrderBys();
53 0 : do_sort_ = order_bys.size() > 0;
54 :
55 0 : if (do_sort_) {
56 0 : ComparatorBase::Ptr cmp;
57 :
58 : // Iterate in reverse over the comma-separated fields so that the
59 : // top-level comparison is the leftmost. The others will be chained.
60 0 : for (size_t i = order_bys.size(); i > 0; --i) {
61 0 : const String& fieldspec = order_bys[i - 1];
62 : //FUTURE: handle ASC / DESC as an extension to the DDS spec?
63 0 : cmp = getMetaStruct<MessageType>().create_qc_comparator(fieldspec.c_str(), cmp);
64 : }
65 :
66 0 : SortedSetCmp comparator(cmp);
67 0 : SortedSet actual_sort(comparator);
68 0 : sorted_.swap(actual_sort);
69 0 : }
70 :
71 0 : } else {
72 : #endif
73 : // PRESENTATION ordered access (TOPIC)
74 0 : do_sort_ = presentation.ordered_access && presentation.access_scope == DDS::TOPIC_PRESENTATION_QOS;
75 : #ifndef OPENDDS_NO_QUERY_CONDITION
76 : }
77 :
78 : #endif
79 0 : }
80 :
81 : template <class MessageType>
82 0 : bool RakeResults<MessageType>::insert_sample(ReceivedDataElement* sample,
83 : ReceivedDataElementList* rdel,
84 : SubscriptionInstance_rch instance,
85 : size_t index_in_instance)
86 : {
87 : #ifndef OPENDDS_NO_QUERY_CONDITION
88 :
89 0 : if (do_filter_) {
90 0 : const QueryConditionImpl* qci = dynamic_cast<QueryConditionImpl*>(cond_);
91 0 : const MessageType* typed_sample = static_cast<MessageType*>(sample->registered_data_);
92 0 : if (!qci || !typed_sample || !qci->filter(*typed_sample, !sample->valid_data_)) {
93 0 : return false;
94 : }
95 : }
96 :
97 : #endif
98 :
99 0 : if (do_sort_) {
100 : // N.B. Until a better heuristic is found, non-valid
101 : // samples are elided when sorting by QueryCondition.
102 : #ifndef OPENDDS_NO_QUERY_CONDITION
103 0 : if (cond_ && !sample->registered_data_) return false;
104 : #endif
105 :
106 0 : RakeData rd = {sample, rdel, instance, index_in_instance};
107 0 : sorted_.insert(rd);
108 :
109 0 : } else {
110 0 : if (unsorted_.size() == max_samples_) return false;
111 :
112 0 : RakeData rd = {sample, rdel, instance, index_in_instance};
113 0 : unsorted_.push_back(rd);
114 0 : }
115 :
116 0 : return true;
117 : }
118 :
119 : template <class MessageType>
120 : template <class FwdIter>
121 0 : bool RakeResults<MessageType>::copy_into(FwdIter iter, FwdIter end,
122 : MessageSequenceAdapterType& received_data_p)
123 : {
124 : typedef OPENDDS_MAP(SubscriptionInstance*, InstanceData) InstanceMap;
125 0 : InstanceMap inst_map;
126 :
127 : typedef OPENDDS_SET(SubscriptionInstance*) InstanceSet;
128 0 : InstanceSet released_instances;
129 :
130 0 : for (CORBA::ULong idx = 0; iter != end && idx < max_samples_; ++idx, ++iter) {
131 : // 1. Populate the Received Data sequence
132 0 : ReceivedDataElement* rde = iter->rde_;
133 0 : ReceivedDataElementList* rdel = iter->rdel_;
134 :
135 0 : if (received_data_.maximum() != 0) {
136 0 : if (rde->registered_data_ == 0) {
137 0 : received_data_p.assign_sample(idx, MessageType());
138 :
139 : } else {
140 0 : received_data_p.assign_sample(idx,
141 0 : *static_cast<MessageType*>(rde->registered_data_));
142 : }
143 :
144 : } else {
145 0 : received_data_p.assign_ptr(idx, rde);
146 : }
147 :
148 : // 2. Per-sample SampleInfo (not the three *_rank variables) and state
149 0 : SubscriptionInstance& inst = *iter->si_;
150 0 : inst.instance_state_->sample_info(info_seq_[idx], rde);
151 0 : rdel->mark_read(rde);
152 :
153 : // 3. Record some info about per-instance SampleInfo (*_rank) so that
154 : // we can fill in the ranks after the loop has completed
155 : std::pair<typename InstanceMap::iterator, bool> result =
156 0 : inst_map.insert(std::make_pair(&inst, InstanceData()));
157 0 : InstanceData& id = result.first->second;
158 :
159 0 : if (result.second) { // first time we've seen this Instance
160 0 : const ReceivedDataElement& mrs = *inst.rcvd_samples_.peek_tail();
161 0 : id.MRS_disposed_gc_ =
162 0 : static_cast<CORBA::Long>(mrs.disposed_generation_count_);
163 0 : id.MRS_nowriters_gc_ =
164 0 : static_cast<CORBA::Long>(mrs.no_writers_generation_count_);
165 : }
166 :
167 0 : if (iter->index_in_instance_ >= id.MRSIC_index_) {
168 0 : id.MRSIC_index_ = iter->index_in_instance_;
169 0 : id.MRSIC_disposed_gc_ =
170 0 : static_cast<CORBA::Long>(rde->disposed_generation_count_);
171 0 : id.MRSIC_nowriters_gc_ =
172 0 : static_cast<CORBA::Long>(rde->no_writers_generation_count_);
173 : }
174 :
175 0 : if (!id.most_recent_generation_) {
176 0 : id.most_recent_generation_ =
177 0 : inst.instance_state_->most_recent_generation(rde);
178 : }
179 :
180 0 : id.sampleinfo_positions_.push_back(idx);
181 :
182 : // 4. Take
183 0 : if (oper_ == DDS_OPERATION_TAKE) {
184 : // If removing the sample releases it
185 0 : if (inst.rcvd_samples_.remove(rde)) {
186 : // Prevent access of the SampleInfo, below
187 0 : released_instances.insert(&inst);
188 : }
189 0 : rde->dec_ref();
190 : }
191 : }
192 :
193 : // Fill in the *_ranks in the SampleInfo, and set instance state (mrg)
194 0 : for (typename InstanceMap::iterator i_iter(inst_map.begin()),
195 0 : i_end(inst_map.end()); i_iter != i_end; ++i_iter) {
196 :
197 0 : InstanceData& id = i_iter->second;
198 : { // Danger, limit visibility of inst
199 0 : SubscriptionInstance& inst = *i_iter->first;
200 : // If this instance has not been released
201 0 : if (released_instances.find(&inst) == released_instances.end()) {
202 0 : if (id.most_recent_generation_) {
203 0 : inst.instance_state_->accessed();
204 : }
205 : }
206 : }
207 :
208 0 : CORBA::Long sample_rank =
209 0 : static_cast<CORBA::Long>(id.sampleinfo_positions_.size());
210 :
211 0 : for (IndexList::iterator s_iter(id.sampleinfo_positions_.begin()),
212 0 : s_end(id.sampleinfo_positions_.end()); s_iter != s_end; ++s_iter) {
213 0 : DDS::SampleInfo& si = info_seq_[*s_iter];
214 0 : si.sample_rank = --sample_rank;
215 0 : si.generation_rank = id.MRSIC_disposed_gc_
216 0 : + id.MRSIC_nowriters_gc_ - si.generation_rank;
217 0 : si.absolute_generation_rank = id.MRS_disposed_gc_ +
218 0 : id.MRS_nowriters_gc_ - si.absolute_generation_rank;
219 : }
220 : }
221 :
222 0 : return true;
223 0 : }
224 :
225 : template <class MessageType>
226 0 : bool RakeResults<MessageType>::copy_to_user()
227 : {
228 0 : MessageSequenceAdapterType received_data_p(received_data_);
229 :
230 0 : if (do_sort_) {
231 0 : size_t len = std::min(static_cast<size_t>(sorted_.size()),
232 0 : static_cast<size_t>(max_samples_));
233 0 : received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
234 0 : info_seq_.length(static_cast<CORBA::ULong>(len));
235 0 : return copy_into(sorted_.begin(), sorted_.end(), received_data_p);
236 :
237 : } else {
238 0 : size_t len = unsorted_.size(); //can't be larger than max_samples_
239 0 : received_data_p.internal_set_length(static_cast<CORBA::ULong>(len));
240 0 : info_seq_.length(static_cast<CORBA::ULong>(len));
241 0 : return copy_into(unsorted_.begin(), unsorted_.end(), received_data_p);
242 : }
243 : }
244 :
245 : } // namespace DCPS
246 : } // namespace OpenDDS
247 :
248 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
249 :
250 : #endif // RAKERESULTS_T_CPP
|