ReceivedDataStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "Qos_Helper.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "ReceivedDataStrategy.h"
00013 #include "GuidUtils.h"
00014 
00015 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00016 
00017 namespace {
00018 
00019 class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter {
00020 public:
00021   CoherentFilter (OpenDDS::DCPS::PublicationId& writer,
00022                   OpenDDS::DCPS::RepoId& publisher)
00023   : writer_ (writer),
00024     publisher_ (publisher),
00025     group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00026   {}
00027 
00028   bool operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00029     if (this->group_coherent_) {
00030       return data_sample->coherent_change_
00031              && (this->publisher_ == data_sample->publisher_id_);
00032     }
00033     else {
00034       return data_sample->coherent_change_
00035              && (this->writer_ == data_sample->pub_);
00036     }
00037   }
00038 
00039 private:
00040 
00041   OpenDDS::DCPS::PublicationId& writer_;
00042   OpenDDS::DCPS::RepoId& publisher_;
00043   bool group_coherent_;
00044 };
00045 
00046 class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation {
00047 public:
00048   AcceptCoherent (OpenDDS::DCPS::PublicationId& writer,
00049                   OpenDDS::DCPS::RepoId& publisher)
00050   : writer_ (writer),
00051     publisher_ (publisher),
00052     group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00053   {}
00054 
00055   void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00056     // Clear coherent_change_ flag; this makes
00057     // the data available for read/take operations.
00058     if (this->group_coherent_) {
00059       if (data_sample->coherent_change_
00060           && (this->publisher_ == data_sample->publisher_id_)) {
00061         data_sample->coherent_change_ = false;
00062       }
00063     }
00064     else {
00065       if (data_sample->coherent_change_ && (this->writer_ == data_sample->pub_)) {
00066         data_sample->coherent_change_ = false;
00067       }
00068     }
00069   }
00070 
00071 private:
00072 
00073   OpenDDS::DCPS::PublicationId& writer_;
00074   OpenDDS::DCPS::RepoId& publisher_;
00075   bool group_coherent_;
00076 };
00077 
00078 } // namespace
00079 
00080 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00081 
00082 namespace OpenDDS {
00083 namespace DCPS {
00084 
00085 ReceivedDataStrategy::ReceivedDataStrategy(
00086   ReceivedDataElementList& rcvd_samples)
00087   : rcvd_samples_(rcvd_samples)
00088 {}
00089 
00090 ReceivedDataStrategy::~ReceivedDataStrategy()
00091 {}
00092 
00093 void
00094 ReceivedDataStrategy::add(ReceivedDataElement* data_sample)
00095 {
00096   this->rcvd_samples_.add(data_sample);
00097 }
00098 
00099 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00100 void
00101 ReceivedDataStrategy::accept_coherent(PublicationId& writer,
00102                                       RepoId& publisher)
00103 {
00104   CoherentFilter    filter = CoherentFilter(writer, publisher);
00105   AcceptCoherent operation = AcceptCoherent(writer, publisher);
00106   this->rcvd_samples_.apply_all(filter, operation);
00107 }
00108 
00109 void
00110 ReceivedDataStrategy::reject_coherent(PublicationId& writer,
00111                                       RepoId& publisher)
00112 {
00113   CoherentFilter filter = CoherentFilter(writer, publisher);
00114   this->rcvd_samples_.remove(filter, true);
00115 }
00116 #endif
00117 
00118 ReceptionDataStrategy::ReceptionDataStrategy(
00119   ReceivedDataElementList& rcvd_samples)
00120   : ReceivedDataStrategy(rcvd_samples)
00121 {}
00122 
00123 ReceptionDataStrategy::~ReceptionDataStrategy()
00124 {}
00125 
00126 SourceDataStrategy::SourceDataStrategy(
00127   ReceivedDataElementList& rcvd_samples)
00128   : ReceivedDataStrategy(rcvd_samples)
00129 {}
00130 
00131 SourceDataStrategy::~SourceDataStrategy()
00132 {}
00133 
00134 void
00135 SourceDataStrategy::add(ReceivedDataElement* data_sample)
00136 {
00137   for (ReceivedDataElement* it = this->rcvd_samples_.head_;
00138        it != 0; it = it->next_data_sample_) {
00139     if (data_sample->source_timestamp_ < it->source_timestamp_) {
00140       data_sample->previous_data_sample_ = it->previous_data_sample_;
00141       data_sample->next_data_sample_ = it;
00142 
00143       // Are we replacing the head?
00144       if (it->previous_data_sample_ == 0) {
00145         this->rcvd_samples_.head_ = data_sample;
00146 
00147       } else {
00148         it->previous_data_sample_->next_data_sample_ = data_sample;
00149       }
00150 
00151       it->previous_data_sample_ = data_sample;
00152 
00153       ++this->rcvd_samples_.size_;
00154 
00155       return;
00156     }
00157   }
00158 
00159   this->rcvd_samples_.add(data_sample);
00160 }
00161 
00162 } // namespace DCPS
00163 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by doxygen 1.4.7