OpenDDS  Snapshot(2023/04/28-20:55)
GroupRakeData.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "GroupRakeData.h"
10 #include "SubscriptionInstance.h"
11 #include "DataReaderImpl.h"
12 #include "QueryConditionImpl.h"
13 
15 
16 namespace OpenDDS {
17 namespace DCPS {
18 
// NOTE(review): the signature line for this body was lost in extraction;
// presumably this is the GroupRakeData default constructor. The body is
// intentionally empty - no explicit work is done here.
20 {
21 }
22 
23 
// insert_sample (remaining parameters and body; the leading part of the
// signature, which declares `sample` and `rdel`, is not visible here).
//
// Records one received sample in the sorted_ collection.
//   sample            - the received data element; skipped if it has no valid data
//   rdel              - stored in the RakeData entry alongside the sample
//   instance          - stored in the RakeData entry alongside the sample
//   index_in_instance - stored in the RakeData entry alongside the sample
// Returns false if the sample was skipped, true if it was inserted.
26  SubscriptionInstance_rch instance,
27  size_t index_in_instance)
28 {
29  // Ignore DISPOSE and UNREGISTER messages (samples without valid data). They
30  // should not be sent within group coherent changes, but guard against it anyway.
31  if (!sample->valid_data_) {
32  return false;
33  }
34 
35  RakeData rd = {sample, rdel, instance, index_in_instance};
36  this->sorted_.insert(rd);
37 
   // Restart iteration from the first element after every insertion.
38  this->current_sample_ = this->sorted_.begin();
39  return true;
40 }
41 
42 
// get_datareaders: fills `readers` with the data readers reachable from the
// entries of sorted_, visited in sorted_'s iteration order.
// NOTE(review): the line carrying the function name and parameter list was lost
// in extraction. When OPENDDS_NO_OBJECT_MODEL_PROFILE is defined, the body
// reduces to the ACE_UNUSED_ARG line and `readers` is left untouched.
43 void
45 {
   // `readers` is otherwise unreferenced when the #ifndef section below is compiled out.
46  ACE_UNUSED_ARG(readers);
47 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
   // Start with one slot per entry; the length is reduced below for every entry
   // whose reader handle cannot be obtained.
48  readers.length(static_cast<CORBA::ULong>(this->sorted_.size()));
49  CORBA::ULong i = 0;
50  SortedSet::iterator itEnd = this->sorted_.end();
51  for (SortedSet::iterator it = this->sorted_.begin(); it != itEnd; ++it) {
52  RcHandle<DDS::DataReader> reader = it->si_->instance_state_->data_reader().lock();
53  if (reader) {
   // `i` advances only on success, so filled slots stay contiguous from index 0.
54  readers[i++] = DDS::DataReader::_duplicate(reader.in());
55  } else {
   // lock() produced an empty handle (presumably the reader is gone - confirm);
   // drop the slot that was reserved for it.
56  readers.length(readers.length() - 1);
57  }
58  }
59 #endif
60 }
61 
62 
// Empties the sorted_ collection.
// NOTE(review): the line carrying the function name was lost in extraction
// (presumably GroupRakeData::reset() - confirm).
// NOTE(review): current_sample_ is not updated here. If SortedSet is a standard
// container, clear() invalidates that iterator, so it must not be used until
// it is assigned again - confirm callers.
63 void
65 {
66  this->sorted_.clear();
67 }
68 
69 
// Returns a copy of the entry at current_sample_ and advances current_sample_
// to the next entry.
// NOTE(review): the return type and function name lines were lost in extraction
// (presumably RakeData GroupRakeData::get_data() - confirm).
// NOTE(review): there is no check against sorted_.end() before the dereference;
// callers are presumably expected not to call this more times than there are
// entries - confirm.
72 {
73  RakeData data = *this->current_sample_;
74  ++this->current_sample_;
75  return data;
76 }
77 
78 } // namespace DCPS
79 } // namespace OpenDDS
80 
void get_datareaders(DDS::DataReaderSeq &readers)
bool valid_data_
Do we contain valid data.
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch i, size_t index_in_instance)
ACE_CDR::ULong ULong
SortedSet::iterator current_sample_
Definition: GroupRakeData.h:79
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
sequence< DataReader > DataReaderSeq
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28