ReceivedDataStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "Time_Helper.h"
00011 #include "ReceivedDataElementList.h"
00012 #include "ReceivedDataStrategy.h"
00013 #include "GuidUtils.h"
00014 
00015 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00016 
00017 namespace {
00018 
00019 class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter {
00020 public:
00021   CoherentFilter (OpenDDS::DCPS::PublicationId& writer,
00022                   OpenDDS::DCPS::RepoId& publisher)
00023   : writer_ (writer),
00024     publisher_ (publisher),
00025     group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00026   {}
00027 
00028   bool operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00029     if (this->group_coherent_) {
00030       return data_sample->coherent_change_
00031              && (this->publisher_ == data_sample->publisher_id_);
00032     }
00033     else {
00034       return data_sample->coherent_change_
00035              && (this->writer_ == data_sample->pub_);
00036     }
00037   }
00038 
00039 private:
00040 
00041   OpenDDS::DCPS::PublicationId& writer_;
00042   OpenDDS::DCPS::RepoId& publisher_;
00043   bool group_coherent_;
00044 };
00045 
00046 class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation {
00047 public:
00048   AcceptCoherent (OpenDDS::DCPS::PublicationId& writer,
00049                   OpenDDS::DCPS::RepoId& publisher)
00050   : writer_ (writer),
00051     publisher_ (publisher),
00052     group_coherent_ (! (this->publisher_ == ::OpenDDS::DCPS::GUID_UNKNOWN))
00053   {}
00054 
00055   void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) {
00056     // Clear coherent_change_ flag; this makes
00057     // the data available for read/take operations.
00058     if (this->group_coherent_) {
00059       if (data_sample->coherent_change_
00060           && (this->publisher_ == data_sample->publisher_id_)) {
00061         data_sample->coherent_change_ = false;
00062       }
00063     }
00064     else {
00065       if (data_sample->coherent_change_ && (this->writer_ == data_sample->pub_)) {
00066         data_sample->coherent_change_ = false;
00067       }
00068     }
00069   }
00070 
00071 private:
00072 
00073   OpenDDS::DCPS::PublicationId& writer_;
00074   OpenDDS::DCPS::RepoId& publisher_;
00075   bool group_coherent_;
00076 };
00077 
00078 } // namespace
00079 
00080 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00081 
00082 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00083 
00084 namespace OpenDDS {
00085 namespace DCPS {
00086 
00087 ReceivedDataStrategy::ReceivedDataStrategy(
00088   ReceivedDataElementList& rcvd_samples)
00089   : rcvd_samples_(rcvd_samples)
00090 {}
00091 
00092 ReceivedDataStrategy::~ReceivedDataStrategy()
00093 {}
00094 
00095 void
00096 ReceivedDataStrategy::add(ReceivedDataElement* data_sample)
00097 {
00098   this->rcvd_samples_.add(data_sample);
00099 }
00100 
00101 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00102 void
00103 ReceivedDataStrategy::accept_coherent(PublicationId& writer,
00104                                       RepoId& publisher)
00105 {
00106   CoherentFilter    filter = CoherentFilter(writer, publisher);
00107   AcceptCoherent operation = AcceptCoherent(writer, publisher);
00108   this->rcvd_samples_.apply_all(filter, operation);
00109 }
00110 
00111 void
00112 ReceivedDataStrategy::reject_coherent(PublicationId& writer,
00113                                       RepoId& publisher)
00114 {
00115   CoherentFilter filter = CoherentFilter(writer, publisher);
00116   this->rcvd_samples_.remove(filter, true);
00117 }
00118 #endif
00119 
00120 ReceptionDataStrategy::ReceptionDataStrategy(
00121   ReceivedDataElementList& rcvd_samples)
00122   : ReceivedDataStrategy(rcvd_samples)
00123 {}
00124 
00125 ReceptionDataStrategy::~ReceptionDataStrategy()
00126 {}
00127 
00128 SourceDataStrategy::SourceDataStrategy(
00129   ReceivedDataElementList& rcvd_samples)
00130   : ReceivedDataStrategy(rcvd_samples)
00131 {}
00132 
00133 SourceDataStrategy::~SourceDataStrategy()
00134 {}
00135 
00136 void
00137 SourceDataStrategy::add(ReceivedDataElement* data_sample)
00138 {
00139   for (ReceivedDataElement* it = this->rcvd_samples_.head_;
00140        it != 0; it = it->next_data_sample_) {
00141     if (data_sample->source_timestamp_ < it->source_timestamp_) {
00142       data_sample->previous_data_sample_ = it->previous_data_sample_;
00143       data_sample->next_data_sample_ = it;
00144 
00145       // Are we replacing the head?
00146       if (it->previous_data_sample_ == 0) {
00147         this->rcvd_samples_.head_ = data_sample;
00148 
00149       } else {
00150         it->previous_data_sample_->next_data_sample_ = data_sample;
00151       }
00152 
00153       it->previous_data_sample_ = data_sample;
00154 
00155       ++this->rcvd_samples_.size_;
00156 
00157       return;
00158     }
00159   }
00160 
00161   this->rcvd_samples_.add(data_sample);
00162 }
00163 
00164 } // namespace DCPS
00165 } // namespace OpenDDS
00166 
00167 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1