OpenDDS  Snapshot(2023/04/28-20:55)
ReceivedDataStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "Time_Helper.h"
12 #include "ReceivedDataStrategy.h"
13 #include "GuidUtils.h"
14 
15 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
16 
17 namespace {
18 
19 class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter {
20 public:
21  CoherentFilter(const OpenDDS::DCPS::GUID_t& writer,
22  const OpenDDS::DCPS::GUID_t& publisher)
23  : writer_(writer),
24  publisher_(publisher),
25  group_coherent_(publisher_ != ::OpenDDS::DCPS::GUID_UNKNOWN)
26  {}
27 
29  {
30  if (group_coherent_) {
31  return data_sample->coherent_change_
32  && (publisher_ == data_sample->publisher_id_);
33  } else {
34  return data_sample->coherent_change_
35  && (writer_ == data_sample->pub_);
36  }
37  }
38 
39 private:
40 
41  const OpenDDS::DCPS::GUID_t& writer_;
42  const OpenDDS::DCPS::GUID_t& publisher_;
43  bool group_coherent_;
44 };
45 
46 class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation {
47 public:
48  AcceptCoherent(const OpenDDS::DCPS::GUID_t& writer,
49  const OpenDDS::DCPS::GUID_t& publisher,
51  : writer_(writer)
52  , publisher_(publisher)
53  , rdel_(rdel)
54  , group_coherent_(publisher_ != ::OpenDDS::DCPS::GUID_UNKNOWN)
55  {}
56 
57  void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample)
58  {
59  // Clear coherent_change_ flag; this makes
60  // the data available for read/take operations.
61  if (group_coherent_) {
62  if (data_sample->coherent_change_
63  && (publisher_ == data_sample->publisher_id_)) {
64  rdel_->accept_coherent_change(data_sample);
65  }
66  } else {
67  if (data_sample->coherent_change_ && (writer_ == data_sample->pub_)) {
68  rdel_->accept_coherent_change(data_sample);
69  }
70  }
71  }
72 
73 private:
74 
75  const OpenDDS::DCPS::GUID_t& writer_;
76  const OpenDDS::DCPS::GUID_t& publisher_;
78  bool group_coherent_;
79 
80 };
81 
82 } // namespace
83 
84 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
85 
87 
88 namespace OpenDDS {
89 namespace DCPS {
90 
91 ReceivedDataStrategy::ReceivedDataStrategy(
92  ReceivedDataElementList& rcvd_samples)
93  : rcvd_samples_(rcvd_samples)
94 {}
95 
97 {}
98 
99 void
101 {
102  this->rcvd_samples_.add(data_sample);
103 }
104 
105 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
106 void
108  const GUID_t& publisher)
109 {
110  CoherentFilter filter(writer, publisher);
111  AcceptCoherent operation(writer, publisher, &rcvd_samples_);
112  rcvd_samples_.apply_all(filter, operation);
113 }
114 
115 void
117  const GUID_t& publisher)
118 {
119  CoherentFilter filter(writer, publisher);
120  this->rcvd_samples_.remove(filter, true);
121 }
122 #endif
123 
125  ReceivedDataElementList& rcvd_samples)
126  : ReceivedDataStrategy(rcvd_samples)
127 {}
128 
130 {}
131 
133  ReceivedDataElementList& rcvd_samples)
134  : ReceivedDataStrategy(rcvd_samples)
135 {}
136 
138 {}
139 
140 void
142 {
143  rcvd_samples_.add_by_timestamp(data_sample);
144 }
145 
146 } // namespace DCPS
147 } // namespace OpenDDS
148 
virtual void add(ReceivedDataElement *data_sample)
void add_by_timestamp(ReceivedDataElement *data_sample)
ReceivedDataElementList & rcvd_samples_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void add(ReceivedDataElement *data_sample)
virtual void reject_coherent(const GUID_t &writer, const GUID_t &publisher)
virtual void add(ReceivedDataElement *data_sample)
virtual void accept_coherent(const GUID_t &writer, const GUID_t &publisher)
SourceDataStrategy(ReceivedDataElementList &rcvd_samples)
virtual bool operator()(ReceivedDataElement *data_sample)=0
GUID_t publisher_id_
Publisher id represent group identifier.
bool remove(ReceivedDataElement *data_sample)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void apply_all(ReceivedDataFilter &match, ReceivedDataOperation &func)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
bool coherent_change_
Sample belongs to an active coherent change set.
ReceptionDataStrategy(ReceivedDataElementList &rcvd_samples)