ReceivedDataStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "Time_Helper.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "ReceivedDataStrategy.h"
00013 #include "GuidUtils.h"
00014
00015 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00016
00017 namespace {
00018
00019 class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter {
00020 public:
00021 CoherentFilter (OpenDDS::DCPS::PublicationId& writer,
00022 OpenDDS::DCPS::RepoId& publisher)
00023 : writer_ (writer),
00024 publisher_ (publisher),
00025 group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00026 {}
00027
00028 bool operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00029 if (this->group_coherent_) {
00030 return data_sample->coherent_change_
00031 && (this->publisher_ == data_sample->publisher_id_);
00032 }
00033 else {
00034 return data_sample->coherent_change_
00035 && (this->writer_ == data_sample->pub_);
00036 }
00037 }
00038
00039 private:
00040
00041 OpenDDS::DCPS::PublicationId& writer_;
00042 OpenDDS::DCPS::RepoId& publisher_;
00043 bool group_coherent_;
00044 };
00045
00046 class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation {
00047 public:
00048 AcceptCoherent (OpenDDS::DCPS::PublicationId& writer,
00049 OpenDDS::DCPS::RepoId& publisher)
00050 : writer_ (writer),
00051 publisher_ (publisher),
00052 group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00053 {}
00054
00055 void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00056
00057
00058 if (this->group_coherent_) {
00059 if (data_sample->coherent_change_
00060 && (this->publisher_ == data_sample->publisher_id_)) {
00061 data_sample->coherent_change_ = false;
00062 }
00063 }
00064 else {
00065 if (data_sample->coherent_change_ && (this->writer_ == data_sample->pub_)) {
00066 data_sample->coherent_change_ = false;
00067 }
00068 }
00069 }
00070
00071 private:
00072
00073 OpenDDS::DCPS::PublicationId& writer_;
00074 OpenDDS::DCPS::RepoId& publisher_;
00075 bool group_coherent_;
00076 };
00077
00078 }
00079
00080 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00081
00082 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00083
00084 namespace OpenDDS {
00085 namespace DCPS {
00086
00087 ReceivedDataStrategy::ReceivedDataStrategy(
00088 ReceivedDataElementList& rcvd_samples)
00089 : rcvd_samples_(rcvd_samples)
00090 {}
00091
00092 ReceivedDataStrategy::~ReceivedDataStrategy()
00093 {}
00094
00095 void
00096 ReceivedDataStrategy::add(ReceivedDataElement* data_sample)
00097 {
00098 this->rcvd_samples_.add(data_sample);
00099 }
00100
00101 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00102 void
00103 ReceivedDataStrategy::accept_coherent(PublicationId& writer,
00104 RepoId& publisher)
00105 {
00106 CoherentFilter filter = CoherentFilter(writer, publisher);
00107 AcceptCoherent operation = AcceptCoherent(writer, publisher);
00108 this->rcvd_samples_.apply_all(filter, operation);
00109 }
00110
00111 void
00112 ReceivedDataStrategy::reject_coherent(PublicationId& writer,
00113 RepoId& publisher)
00114 {
00115 CoherentFilter filter = CoherentFilter(writer, publisher);
00116 this->rcvd_samples_.remove(filter, true);
00117 }
00118 #endif
00119
00120 ReceptionDataStrategy::ReceptionDataStrategy(
00121 ReceivedDataElementList& rcvd_samples)
00122 : ReceivedDataStrategy(rcvd_samples)
00123 {}
00124
00125 ReceptionDataStrategy::~ReceptionDataStrategy()
00126 {}
00127
00128 SourceDataStrategy::SourceDataStrategy(
00129 ReceivedDataElementList& rcvd_samples)
00130 : ReceivedDataStrategy(rcvd_samples)
00131 {}
00132
00133 SourceDataStrategy::~SourceDataStrategy()
00134 {}
00135
00136 void
00137 SourceDataStrategy::add(ReceivedDataElement* data_sample)
00138 {
00139 for (ReceivedDataElement* it = this->rcvd_samples_.head_;
00140 it != 0; it = it->next_data_sample_) {
00141 if (data_sample->source_timestamp_ < it->source_timestamp_) {
00142 data_sample->previous_data_sample_ = it->previous_data_sample_;
00143 data_sample->next_data_sample_ = it;
00144
00145
00146 if (it->previous_data_sample_ == 0) {
00147 this->rcvd_samples_.head_ = data_sample;
00148
00149 } else {
00150 it->previous_data_sample_->next_data_sample_ = data_sample;
00151 }
00152
00153 it->previous_data_sample_ = data_sample;
00154
00155 ++this->rcvd_samples_.size_;
00156
00157 return;
00158 }
00159 }
00160
00161 this->rcvd_samples_.add(data_sample);
00162 }
00163
00164 }
00165 }
00166
00167 OPENDDS_END_VERSIONED_NAMESPACE_DECL