00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "Qos_Helper.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "ReceivedDataStrategy.h"
00013 #include "GuidUtils.h"
00014
00015 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00016
00017 namespace {
00018
00019 class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter {
00020 public:
00021 CoherentFilter (OpenDDS::DCPS::PublicationId& writer,
00022 OpenDDS::DCPS::RepoId& publisher)
00023 : writer_ (writer),
00024 publisher_ (publisher),
00025 group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00026 {}
00027
00028 bool operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00029 if (this->group_coherent_) {
00030 return data_sample->coherent_change_
00031 && (this->publisher_ == data_sample->publisher_id_);
00032 }
00033 else {
00034 return data_sample->coherent_change_
00035 && (this->writer_ == data_sample->pub_);
00036 }
00037 }
00038
00039 private:
00040
00041 OpenDDS::DCPS::PublicationId& writer_;
00042 OpenDDS::DCPS::RepoId& publisher_;
00043 bool group_coherent_;
00044 };
00045
00046 class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation {
00047 public:
00048 AcceptCoherent (OpenDDS::DCPS::PublicationId& writer,
00049 OpenDDS::DCPS::RepoId& publisher)
00050 : writer_ (writer),
00051 publisher_ (publisher),
00052 group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00053 {}
00054
00055 void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00056
00057
00058 if (this->group_coherent_) {
00059 if (data_sample->coherent_change_
00060 && (this->publisher_ == data_sample->publisher_id_)) {
00061 data_sample->coherent_change_ = false;
00062 }
00063 }
00064 else {
00065 if (data_sample->coherent_change_ && (this->writer_ == data_sample->pub_)) {
00066 data_sample->coherent_change_ = false;
00067 }
00068 }
00069 }
00070
00071 private:
00072
00073 OpenDDS::DCPS::PublicationId& writer_;
00074 OpenDDS::DCPS::RepoId& publisher_;
00075 bool group_coherent_;
00076 };
00077
00078 }
00079
00080 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00081
00082 namespace OpenDDS {
00083 namespace DCPS {
00084
00085 ReceivedDataStrategy::ReceivedDataStrategy(
00086 ReceivedDataElementList& rcvd_samples)
00087 : rcvd_samples_(rcvd_samples)
00088 {}
00089
00090 ReceivedDataStrategy::~ReceivedDataStrategy()
00091 {}
00092
00093 void
00094 ReceivedDataStrategy::add(ReceivedDataElement* data_sample)
00095 {
00096 this->rcvd_samples_.add(data_sample);
00097 }
00098
00099 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00100 void
00101 ReceivedDataStrategy::accept_coherent(PublicationId& writer,
00102 RepoId& publisher)
00103 {
00104 CoherentFilter filter = CoherentFilter(writer, publisher);
00105 AcceptCoherent operation = AcceptCoherent(writer, publisher);
00106 this->rcvd_samples_.apply_all(filter, operation);
00107 }
00108
00109 void
00110 ReceivedDataStrategy::reject_coherent(PublicationId& writer,
00111 RepoId& publisher)
00112 {
00113 CoherentFilter filter = CoherentFilter(writer, publisher);
00114 this->rcvd_samples_.remove(filter, true);
00115 }
00116 #endif
00117
00118 ReceptionDataStrategy::ReceptionDataStrategy(
00119 ReceivedDataElementList& rcvd_samples)
00120 : ReceivedDataStrategy(rcvd_samples)
00121 {}
00122
00123 ReceptionDataStrategy::~ReceptionDataStrategy()
00124 {}
00125
00126 SourceDataStrategy::SourceDataStrategy(
00127 ReceivedDataElementList& rcvd_samples)
00128 : ReceivedDataStrategy(rcvd_samples)
00129 {}
00130
00131 SourceDataStrategy::~SourceDataStrategy()
00132 {}
00133
00134 void
00135 SourceDataStrategy::add(ReceivedDataElement* data_sample)
00136 {
00137 for (ReceivedDataElement* it = this->rcvd_samples_.head_;
00138 it != 0; it = it->next_data_sample_) {
00139 if (data_sample->source_timestamp_ < it->source_timestamp_) {
00140 data_sample->previous_data_sample_ = it->previous_data_sample_;
00141 data_sample->next_data_sample_ = it;
00142
00143
00144 if (it->previous_data_sample_ == 0) {
00145 this->rcvd_samples_.head_ = data_sample;
00146
00147 } else {
00148 it->previous_data_sample_->next_data_sample_ = data_sample;
00149 }
00150
00151 it->previous_data_sample_ = data_sample;
00152
00153 ++this->rcvd_samples_.size_;
00154
00155 return;
00156 }
00157 }
00158
00159 this->rcvd_samples_.add(data_sample);
00160 }
00161
00162 }
00163 }