OpenDDS  Snapshot(2023/04/28-20:55)
MultiTopicDataReader_T.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
9 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
10 
11 #ifndef OPENDDS_NO_MULTI_TOPIC
12 
13 #include <stdexcept>
14 #include <sstream>
15 
16 
18 
19 namespace OpenDDS {
20 namespace DCPS {
21 
22 template<typename Sample, typename TypedDataReader>
23 void
25 {
26  typed_reader_ = TypedDataReader::Interface::_narrow(dr);
27 }
28 
29 template<typename Sample, typename TypedDataReader>
30 const MetaStruct&
32 {
33  return getMetaStruct<Sample>();
34 }
35 
36 template<typename Sample, typename TypedDataReader>
37 void
39  void* incoming, const MultiTopicDataReaderBase::QueryPlan& qp, const MetaStruct& meta)
40 {
41  using namespace std;
42  const vector<SubjectFieldSpec>& proj = qp.projection_;
43  const MetaStruct& resulting_meta = getResultingMeta();
44 
45  typedef vector<SubjectFieldSpec>::const_iterator iter_t;
46  for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
47  resulting_meta.assign(&resulting, iter->resulting_name_.c_str(),
48  incoming, iter->incoming_name_.c_str(), meta);
49  }
50 
51  const vector<OPENDDS_STRING>& proj_out = qp.keys_projected_out_;
52  for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
53  iter != proj_out.end(); ++iter) {
54  resulting_meta.assign(&resulting, iter->c_str(),
55  incoming, iter->c_str(), meta);
56  }
57 }
58 
59 template<typename Sample, typename TypedDataReader>
60 void
62  Sample& target, const Sample& source, const TopicSet& other_topics)
63 {
64  using namespace std;
65  const MetaStruct& resulting_meta = getResultingMeta();
66 
67  for (TopicSet::const_iterator iterTopic = other_topics.begin();
68  iterTopic != other_topics.end(); ++iterTopic) {
69  const vector<SubjectFieldSpec>& proj = query_plans_[*iterTopic].projection_;
70  typedef vector<SubjectFieldSpec>::const_iterator iter_t;
71  for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
72  resulting_meta.assign(&target, iter->resulting_name_.c_str(),
73  &source, iter->resulting_name_.c_str(), resulting_meta);
74  }
75  }
76 }
77 
78 template<typename Sample, typename TypedDataReader>
79 bool
81  SampleVec& resulting, const SampleWithInfo& prototype,
82  const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
83  DDS::DataReader_ptr other_dr, const MetaStruct& other_meta)
84 {
85  using namespace DDS;
86  DataReaderImpl* other_dri = dynamic_cast<DataReaderImpl*>(other_dr);
87  if (!other_dri) {
88  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReader_T::join: ")
89  ACE_TEXT("Failed to get DataReaderImpl.\n")));
90  return false;
91  }
92 
93  TopicDescription_var other_td = other_dri->get_topicdescription();
94  CORBA::String_var other_topic = other_td->get_name();
95  const QueryPlan& other_qp = query_plans_[other_topic.in()];
96  const size_t n_keys = key_names.size();
97 
98  if (n_keys > 0 && other_meta.numDcpsKeys() == n_keys) { // complete key
99  InstanceHandle_t ih = other_dri->lookup_instance_generic(key_data);
100  if (ih != HANDLE_NIL) {
101  GenericData other_data(other_meta, false);
102  SampleInfo info;
103  const ReturnCode_t ret = other_dri->read_instance_generic(other_data.ptr_,
105  if (ret != DDS::RETCODE_OK && ret != DDS::RETCODE_NO_DATA) {
106  if (log_level >= LogLevel::Notice) {
107  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: MultiTopicDataReader_T::join: read_instance_generic"
108  " for topic %C returns %C\n", other_topic.in(), retcode_to_string(ret)));
109  }
110  return false;
111  }
112  if (ret == DDS::RETCODE_NO_DATA || !info.valid_data) {
113  return false;
114  }
115 
116  resulting.push_back(prototype);
117  resulting.back().combine(SampleWithInfo(other_topic.in(), info));
118  assign_fields(resulting.back().sample_, other_data.ptr_, other_qp, other_meta);
119  }
120  } else { // incomplete key or cross-join (0 key fields)
121  ReturnCode_t ret = RETCODE_OK;
122  for (InstanceHandle_t ih = HANDLE_NIL; ret != RETCODE_NO_DATA;) {
123  GenericData other_data(other_meta, false);
124  SampleInfo info;
125  const ReturnCode_t ret = other_dri->read_next_instance_generic(other_data.ptr_,
127  if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
128  if (log_level >= LogLevel::Notice) {
129  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: MultiTopicDataReader_T::join:"
130  " read_next_instance_generic for topic %C returns %C\n",
131  other_topic.in(), retcode_to_string(ret)));
132  }
133  return false;
134  }
135  if (ret == RETCODE_NO_DATA || !info.valid_data) {
136  break;
137  }
138  ih = info.instance_handle;
139 
140  bool match = true;
141  for (size_t i = 0; match && i < key_names.size(); ++i) {
142  if (!other_meta.compare(key_data, other_data.ptr_, key_names[i].c_str())) {
143  match = false;
144  }
145  }
146 
147  if (match) {
148  resulting.push_back(prototype);
149  resulting.back().combine(SampleWithInfo(other_topic.in(), info));
150  assign_fields(resulting.back().sample_, other_data.ptr_, other_qp, other_meta);
151  }
152  }
153  }
154  return true;
155 }
156 
157 template<typename Sample, typename TypedDataReader>
158 void
160  SampleVec& resulting, const SampleVec& other,
161  const std::vector<OPENDDS_STRING>& key_names, const TopicSet& other_topics)
162 {
163  const MetaStruct& meta = getResultingMeta();
164  SampleVec new_data;
165  for (typename SampleVec::iterator it_res = resulting.begin();
166  it_res != resulting.end(); /*incremented in loop*/) {
167  bool found_one_match = false;
168  for (typename SampleVec::const_iterator it_other = other.begin();
169  it_other != other.end(); ++it_other) {
170  bool match = true;
171  for (size_t i = 0; match && i < key_names.size(); ++i) {
172  if (!meta.compare(&*it_res, &*it_other, key_names[i].c_str())) {
173  match = false;
174  }
175  }
176  if (!match) {
177  continue;
178  }
179  if (found_one_match) {
180  new_data.push_back(*it_res);
181  new_data.back().combine(*it_other);
182  assign_resulting_fields(new_data.back().sample_, it_other->sample_, other_topics);
183  } else {
184  found_one_match = true;
185  it_res->combine(*it_other);
186  assign_resulting_fields(it_res->sample_, it_other->sample_, other_topics);
187  }
188  }
189  if (found_one_match) {
190  ++it_res;
191  } else {
192  // no match found in 'other' so data must not appear in result set
193  it_res = resulting.erase(it_res);
194  }
195  }
196  resulting.insert(resulting.end(), new_data.begin(), new_data.end());
197 }
198 
199 /**
200  * Two constituent topics are joinable directly if they have some common join keys,
201  * or indirectly via a third topic which has common join keys with each of them.
202  * The constituent topics form one or more groups of connected topics. In each groups,
203  * topics are connected. But any two groups are not connected (otherwise, they would
204  * have been a single group). And so groups are cross-joined with each other.
205  * Within a group, topics are joined using process_joins(). Starting from an incoming
206  * sample of a given topic, it computes partial resulting samples from the samples of
207  * the constituent topics in the same group that are already received, in a DFS manner.
208  * @partial_results contains entries for sets of topics, each corresponding to a path
209  * of topics which have been visited, starting from the incoming topic.
210  * For example, if the graph of visited topics looks like this
211  * A (incoming topic)
212  * / \
213  * B C
214  * / / | \
215  * D E F G
216  * / . . .
217  * H . . .
218  * . . . .
219  * then @a partial_results contains entries for topic sets: {A,B,D,H}, {A,C,E},
220  * {A,C,F}, {A,C,G}.
221  * When traverse a path, if the next adjacent topic appears on a different path, i.e.,
222  * there is an entry corresponding to that path in @a partial_results that has it in
223  * its topic set, then the two entries are combined into a new entry which contains
224  * topics from both of them. For example, if we are traversing path (A,C,G) and
225  * the next adjacent topic to G is B, which appears in path (A,B,D,H), then entries
226  * keyed by (A,C,G) and (A,B,D,H) are combined into a single entry with key
227  * (A,B,C,D,G,H). Entries (A,C,G) and (A,B,D,H) are removed from @a partial_results.
228  */
229 template<typename Sample, typename TypedDataReader>
232  std::map<TopicSet, SampleVec>& partial_results, SampleVec starting,
233  const TopicSet& seen, const QueryPlan& qp)
234 {
235  using namespace std;
236  const MetaStruct& resulting_meta = getResultingMeta();
237  OPENDDS_STRING this_topic;
238  {
240  this_topic = topicNameFor(qp.data_reader_);
241  }
242  typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
243  for (iter_t iter = qp.adjacent_joins_.begin(); iter != qp.adjacent_joins_.end();) {
244  // for each topic we're joining
245  const OPENDDS_STRING& other_topic = iter->first;
246  iter_t range_end = qp.adjacent_joins_.upper_bound(other_topic);
247  const QueryPlan& other_qp = query_plans_[other_topic];
248  DDS::DataReader_ptr other_dr = other_qp.data_reader_;
249  const MetaStruct& other_meta = metaStructFor(other_dr);
250 
251  vector<OPENDDS_STRING> keys;
252  for (; iter != range_end; ++iter) { // for each key in common w/ this topic
253  keys.push_back(iter->second);
254  }
255 
256  typename std::map<TopicSet, SampleVec>::iterator found =
257  find_if(partial_results.begin(), partial_results.end(), Contains(other_topic));
258 
259  if (found == partial_results.end()) { // haven't seen this topic yet
260  partial_results.erase(seen);
261  TopicSet with_join(seen);
262  with_join.insert(other_topic);
263  SampleVec& join_result = partial_results[with_join];
264  for (size_t i = 0; i < starting.size(); ++i) {
265  GenericData other_keys(other_meta);
266  for (size_t j = 0; j < keys.size(); ++j) {
267  other_meta.assign(other_keys.ptr_, keys[j].c_str(),
268  &starting[i], keys[j].c_str(), resulting_meta);
269  }
270  if (!join(join_result, starting[i], keys, other_keys.ptr_, other_dr, other_meta)) {
271  return DDS::RETCODE_ERROR;
272  }
273  }
274 
275  if (!join_result.empty() && !seen.count(other_topic)) {
276  // Recursively join with topics that are adjacent to other_topic.
277  const DDS::ReturnCode_t ret = process_joins(partial_results, join_result,
278  with_join, other_qp);
279  if (ret != DDS::RETCODE_OK) {
280  return ret;
281  }
282  }
283  } else if (!found->first.count(this_topic) /*avoid looping back*/) {
284  // We have partialResults for this topic, use them instead of recursing.
285  // Combine the partial samples for the TopicSet seen and found->first.
286  // Store the result into a new entry keyed with all topics in seen and found->first.
287  // The existing two entries are removed since they are not needed anymore.
288  combine(starting, found->second, keys, found->first);
289  TopicSet new_topics(seen);
290  for (set<OPENDDS_STRING>::const_iterator it = found->first.begin(); it != found->first.end(); ++it) {
291  new_topics.insert(*it);
292  }
293 
294  partial_results.erase(found);
295  partial_results.erase(seen);
296  partial_results[new_topics] = starting;
297  }
298  }
299  return DDS::RETCODE_OK;
300 }
301 
302 template<typename Sample, typename TypedDataReader>
305  std::map<TopicSet, SampleVec>& partial_results, const TopicSet& seen,
306  const QueryPlan& qp)
307 {
308  using namespace std;
309  const MetaStruct& other_meta = metaStructFor(qp.data_reader_);
310  vector<OPENDDS_STRING> no_keys;
311  for (typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
312  it_pr != partial_results.end(); ++it_pr) {
313  SampleVec resulting;
314  for (typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
315  if (!join(resulting, *i, no_keys, 0, qp.data_reader_, other_meta)) {
316  return DDS::RETCODE_ERROR;
317  }
318  }
319  resulting.swap(it_pr->second);
320  }
321 
322  TopicSet with_join(seen);
323  with_join.insert(topicNameFor(qp.data_reader_));
324  partial_results[with_join].swap(partial_results[seen]);
325  partial_results.erase(seen);
326  const DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[with_join],
327  with_join, qp);
328  if (ret != DDS::RETCODE_OK) {
329  partial_results.erase(with_join);
330  return ret;
331  }
332 
333  return DDS::RETCODE_OK;
334 }
335 
336 template<typename Sample, typename TypedDataReader>
337 void
339  const DDS::SampleInfo& info, const char* topic, const MetaStruct& meta)
340 {
341  using namespace std;
342  using namespace DDS;
343  const QueryPlan& qp = query_plans_[topic];
344 
345  // Track results of joins along multiple paths through the MultiTopic keys.
346  std::map<TopicSet, SampleVec> partial_results;
347  TopicSet seen;
348  seen.insert(topic);
349  partial_results[seen].push_back(SampleWithInfo(topic, info));
350  assign_fields(partial_results[seen].back().sample_, sample, qp, meta);
351 
352  DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[seen], seen, qp);
353  if (ret != DDS::RETCODE_OK) {
354  return;
355  }
356 
357  // Any topic we haven't seen needs to be cross-joined
358  for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
359  iter != query_plans_.end(); ++iter) {
360  typename std::map<TopicSet, SampleVec>::iterator found =
361  find_if(partial_results.begin(), partial_results.end(), Contains(iter->first));
362  if (found == partial_results.end()) {
363  ret = cross_join(partial_results, seen, iter->second);
364  if (ret != DDS::RETCODE_OK) {
365  return;
366  }
367  }
368  }
369 
370  TypedDataReader* tdr = dynamic_cast<TypedDataReader*>(typed_reader_.in());
371  if (!tdr) {
372  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReader_T::incoming_sample:")
373  ACE_TEXT(" Failed to get TypedDataReader.\n")));
374  return;
375  }
376 
377  for (typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
378  it_pr != partial_results.end(); ++it_pr) {
379  for (typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
380  InstanceHandle_t ih = tdr->store_synthetic_data(i->sample_, i->view_);
381  if (ih != HANDLE_NIL) {
382  typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
383  for (mapiter_t it_map = i->info_.begin(); it_map != i->info_.end(); ++it_map) {
384  query_plans_[it_map->first].instances_.insert(make_pair(it_map->second, ih));
385  }
386  }
387  }
388  }
389 }
390 
391 // The following methods implement the FooDataReader API by delegating
392 // to the typed_reader_.
393 
394 template<typename Sample, typename TypedDataReader>
400 {
401  return typed_reader_->read(received_data, info_seq, max_samples,
402  sample_states, view_states, instance_states);
403 }
404 
405 template<typename Sample, typename TypedDataReader>
411 {
412  return typed_reader_->take(received_data, info_seq, max_samples,
413  sample_states, view_states, instance_states);
414 }
415 
416 template<typename Sample, typename TypedDataReader>
419  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
420  CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
421 {
422  return typed_reader_->read_w_condition(data_values, sample_infos,
423  max_samples, a_condition);
424 }
425 
426 template<typename Sample, typename TypedDataReader>
429  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
430  CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
431 {
432  return typed_reader_->take_w_condition(data_values, sample_infos,
433  max_samples, a_condition);
434 }
435 
436 template<typename Sample, typename TypedDataReader>
439  Sample& received_data, DDS::SampleInfo& sample_info)
440 {
441  return typed_reader_->read_next_sample(received_data, sample_info);
442 }
443 
444 template<typename Sample, typename TypedDataReader>
447  Sample& received_data, DDS::SampleInfo& sample_info)
448 {
449  return typed_reader_->take_next_sample(received_data, sample_info);
450 }
451 
452 template<typename Sample, typename TypedDataReader>
455  SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
459 {
460  return typed_reader_->read_instance(received_data, info_seq, max_samples,
461  a_handle, sample_states, view_states, instance_states);
462 }
463 
464 template<typename Sample, typename TypedDataReader>
467  SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
471 {
472  return typed_reader_->take_instance(received_data, info_seq, max_samples,
473  a_handle, sample_states, view_states, instance_states);
474 }
475 
476 template<typename Sample, typename TypedDataReader>
479  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
481  DDS::ReadCondition_ptr a_condition)
482 {
483  return typed_reader_->read_instance_w_condition(data_values,
484  sample_infos, max_samples, handle, a_condition);
485 }
486 
487 template<typename Sample, typename TypedDataReader>
490  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
492  DDS::ReadCondition_ptr a_condition)
493 {
494  return typed_reader_->take_instance_w_condition(data_values,
495  sample_infos, max_samples, handle, a_condition);
496 }
497 
498 template<typename Sample, typename TypedDataReader>
501  SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
505 {
506  return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
507  a_handle, sample_states, view_states, instance_states);
508 }
509 
510 template<typename Sample, typename TypedDataReader>
513  SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
517 {
518  return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
519  a_handle, sample_states, view_states, instance_states);
520 }
521 
522 template<typename Sample, typename TypedDataReader>
525  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
527  DDS::ReadCondition_ptr a_condition)
528 {
529  return typed_reader_->read_next_instance_w_condition(data_values,
530  sample_infos, max_samples, previous_handle, a_condition);
531 }
532 
533 template<typename Sample, typename TypedDataReader>
536  SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
538  DDS::ReadCondition_ptr a_condition)
539 {
540  return typed_reader_->take_next_instance_w_condition(data_values,
541  sample_infos, max_samples, previous_handle, a_condition);
542 }
543 
544 template<typename Sample, typename TypedDataReader>
547  SampleSeq& received_data, DDS::SampleInfoSeq& info_seq)
548 {
549  return typed_reader_->return_loan(received_data, info_seq);
550 }
551 
552 template<typename Sample, typename TypedDataReader>
555  Sample& key_holder, DDS::InstanceHandle_t handle)
556 {
557  return typed_reader_->get_key_value(key_holder, handle);
558 }
559 
560 template<typename Sample, typename TypedDataReader>
563  const Sample& instance_data)
564 {
565  return typed_reader_->lookup_instance(instance_data);
566 }
567 
568 }
569 }
570 
572 
573 #endif
574 #endif
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
ACE_CDR::Long Long
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
virtual size_t numDcpsKeys() const =0
virtual bool compare(const void *lhs, const void *rhs, const char *fieldSpec) const =0
void incoming_sample(void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
virtual DDS::TopicDescription_ptr get_topicdescription()
DDS::ReturnCode_t read(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void assign_fields(Sample &resulting, void *incoming, const QueryPlan &qp, const MetaStruct &meta)
std::vector< SampleWithInfo > SampleVec
virtual void assign(void *lhs, const char *lhsFieldSpec, const void *rhs, const char *rhsFieldSpec, const MetaStruct &rhsMeta) const =0
sequence< SampleInfo > SampleInfoSeq
DDS::ReturnCode_t take_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t take_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t take(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
unsigned long InstanceStateMask
DDS::ReturnCode_t read_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool join(SampleVec &resulting, const SampleWithInfo &prototype, const std::vector< OPENDDS_STRING > &key_names, const void *key_data, DDS::DataReader_ptr other_dr, const MetaStruct &other_meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t get_key_value(Sample &key_holder, DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
void assign_resulting_fields(Sample &target, const Sample &source, const TopicSet &other_topics)
DDS::ReturnCode_t read_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
InstanceHandle_t instance_handle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
STL namespace.
DDS::ReturnCode_t process_joins(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, SampleVec starting, const TopicSet &seen, const QueryPlan &qp)
Implements the DDS::DataReader interface.
void combine(SampleVec &resulting, const SampleVec &other, const std::vector< OPENDDS_STRING > &key_names, const TopicSet &other_topics)
LM_NOTICE
DDS::ReturnCode_t read_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
const ViewStateMask ANY_VIEW_STATE
DDS::ReturnCode_t return_loan(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
The End User API.
unsigned long SampleStateMask
DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const SampleStateKind READ_SAMPLE_STATE
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_NO_DATA
OpenDDS_Dcps_Export LogLevel log_level
DDS::ReturnCode_t take_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::InstanceHandle_t lookup_instance(const Sample &instance_data)
virtual DDS::ReturnCode_t read_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t read_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
const character_type * in(void) const
DDS::ReturnCode_t take_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
std::multimap< OPENDDS_STRING, OPENDDS_STRING > adjacent_joins_
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
const InstanceStateKind ALIVE_INSTANCE_STATE
virtual DDS::InstanceHandle_t lookup_instance_generic(const void *data)=0
DDS::ReturnCode_t take_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t read_next_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
DDS::ReturnCode_t cross_join(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, const TopicSet &seen, const QueryPlan &qp)