Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "Time_Helper.h" 11 : #include "ReceivedDataElementList.h" 12 : #include "ReceivedDataStrategy.h" 13 : #include "GuidUtils.h" 14 : 15 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 16 : 17 : namespace { 18 : 19 : class CoherentFilter : public OpenDDS::DCPS::ReceivedDataFilter { 20 : public: 21 0 : CoherentFilter(const OpenDDS::DCPS::GUID_t& writer, 22 : const OpenDDS::DCPS::GUID_t& publisher) 23 0 : : writer_(writer), 24 0 : publisher_(publisher), 25 0 : group_coherent_(publisher_ != ::OpenDDS::DCPS::GUID_UNKNOWN) 26 0 : {} 27 : 28 0 : bool operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) 29 : { 30 0 : if (group_coherent_) { 31 0 : return data_sample->coherent_change_ 32 0 : && (publisher_ == data_sample->publisher_id_); 33 : } else { 34 0 : return data_sample->coherent_change_ 35 0 : && (writer_ == data_sample->pub_); 36 : } 37 : } 38 : 39 : private: 40 : 41 : const OpenDDS::DCPS::GUID_t& writer_; 42 : const OpenDDS::DCPS::GUID_t& publisher_; 43 : bool group_coherent_; 44 : }; 45 : 46 : class AcceptCoherent : public OpenDDS::DCPS::ReceivedDataOperation { 47 : public: 48 0 : AcceptCoherent(const OpenDDS::DCPS::GUID_t& writer, 49 : const OpenDDS::DCPS::GUID_t& publisher, 50 : OpenDDS::DCPS::ReceivedDataElementList* rdel) 51 0 : : writer_(writer) 52 0 : , publisher_(publisher) 53 0 : , rdel_(rdel) 54 0 : , group_coherent_(publisher_ != ::OpenDDS::DCPS::GUID_UNKNOWN) 55 0 : {} 56 : 57 0 : void operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) 58 : { 59 : // Clear coherent_change_ flag; this makes 60 : // the data available for read/take operations. 61 0 : if (group_coherent_) { 62 0 : if (data_sample->coherent_change_ 63 0 : && (publisher_ == data_sample->publisher_id_)) { 64 0 : rdel_->accept_coherent_change(data_sample); 65 : } 66 : } else { 67 0 : if (data_sample->coherent_change_ && (writer_ == data_sample->pub_)) { 68 0 : rdel_->accept_coherent_change(data_sample); 69 : } 70 : } 71 0 : } 72 : 73 : private: 74 : 75 : const OpenDDS::DCPS::GUID_t& writer_; 76 : const OpenDDS::DCPS::GUID_t& publisher_; 77 : OpenDDS::DCPS::ReceivedDataElementList* rdel_; 78 : bool group_coherent_; 79 : 80 : }; 81 : 82 : } // namespace 83 : 84 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 85 : 86 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 87 : 88 : namespace OpenDDS { 89 : namespace DCPS { 90 : 91 0 : ReceivedDataStrategy::ReceivedDataStrategy( 92 0 : ReceivedDataElementList& rcvd_samples) 93 0 : : rcvd_samples_(rcvd_samples) 94 0 : {} 95 : 96 0 : ReceivedDataStrategy::~ReceivedDataStrategy() 97 0 : {} 98 : 99 : void 100 0 : ReceivedDataStrategy::add(ReceivedDataElement* data_sample) 101 : { 102 0 : this->rcvd_samples_.add(data_sample); 103 0 : } 104 : 105 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 106 : void 107 0 : ReceivedDataStrategy::accept_coherent(const GUID_t& writer, 108 : const GUID_t& publisher) 109 : { 110 0 : CoherentFilter filter(writer, publisher); 111 0 : AcceptCoherent operation(writer, publisher, &rcvd_samples_); 112 0 : rcvd_samples_.apply_all(filter, operation); 113 0 : } 114 : 115 : void 116 0 : ReceivedDataStrategy::reject_coherent(const GUID_t& writer, 117 : const GUID_t& publisher) 118 : { 119 0 : CoherentFilter filter(writer, publisher); 120 0 : this->rcvd_samples_.remove(filter, true); 121 0 : } 122 : #endif 123 : 124 0 : ReceptionDataStrategy::ReceptionDataStrategy( 125 0 : ReceivedDataElementList& rcvd_samples) 126 0 : : ReceivedDataStrategy(rcvd_samples) 127 0 : {} 128 : 129 0 : ReceptionDataStrategy::~ReceptionDataStrategy() 130 0 : {} 131 : 132 0 : SourceDataStrategy::SourceDataStrategy( 133 0 : ReceivedDataElementList& rcvd_samples) 134 0 : : ReceivedDataStrategy(rcvd_samples) 135 0 : {} 136 : 137 0 : SourceDataStrategy::~SourceDataStrategy() 138 0 : {} 139 : 140 : void 141 0 : SourceDataStrategy::add(ReceivedDataElement* data_sample) 142 : { 143 0 : rcvd_samples_.add_by_timestamp(data_sample); 144 0 : } 145 : 146 : } // namespace DCPS 147 : } // namespace OpenDDS 148 : 149 : OPENDDS_END_VERSIONED_NAMESPACE_DECL