Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 8 : #include "ReceivedDataElementList.h" 9 : 10 : #include "DataReaderImpl.h" 11 : 12 : #if !defined (__ACE_INLINE__) 13 : # include "ReceivedDataElementList.inl" 14 : #endif /* !__ACE_INLINE__ */ 15 : 16 : namespace { 17 : 18 : class IdentityFilter 19 : : public OpenDDS::DCPS::ReceivedDataFilter { 20 : public: 21 : explicit IdentityFilter(OpenDDS::DCPS::ReceivedDataElement* data_sample) 22 : : data_sample_(data_sample) 23 : {} 24 : 25 : bool 26 : operator()(OpenDDS::DCPS::ReceivedDataElement* data_sample) { 27 : return this->data_sample_ == data_sample; 28 : } 29 : 30 : private: 31 : OpenDDS::DCPS::ReceivedDataElement* data_sample_; 32 : }; 33 : 34 : } // namespace 35 : 36 : 37 0 : void* OpenDDS::DCPS::ReceivedDataElement::operator new(size_t , ACE_New_Allocator& pool) 38 : { 39 0 : OpenDDS::DCPS::ReceivedDataElementMemoryBlock* block = static_cast<OpenDDS::DCPS::ReceivedDataElementMemoryBlock*>(pool.malloc(sizeof(OpenDDS::DCPS::ReceivedDataElementMemoryBlock))); 40 0 : block->allocator_ = &pool; 41 0 : return block; 42 : } 43 : 44 0 : void OpenDDS::DCPS::ReceivedDataElement::operator delete(void* memory) 45 : { 46 0 : if (memory) { 47 0 : OpenDDS::DCPS::ReceivedDataElementMemoryBlock* block = static_cast<OpenDDS::DCPS::ReceivedDataElementMemoryBlock*>(memory); 48 0 : block->allocator_->free(block); 49 : } 50 0 : } 51 : 52 0 : void OpenDDS::DCPS::ReceivedDataElement::operator delete(void* memory, ACE_New_Allocator&) 53 : { 54 0 : operator delete(memory); 55 0 : } 56 : 57 0 : OpenDDS::DCPS::ReceivedDataElementList::ReceivedDataElementList(const DataReaderImpl_rch& reader, const InstanceState_rch& instance_state) 58 0 : : reader_(reader), head_(0), tail_(0), size_(0) 59 0 : , read_sample_count_(0), not_read_sample_count_(0), sample_states_(0) 60 0 : , instance_state_(instance_state) 61 : { 62 0 : } 63 : 64 0 : OpenDDS::DCPS::ReceivedDataElementList::~ReceivedDataElementList() 65 : { 66 : // The memory pointed to by instance_state_ is owned by 67 : // another object. 68 0 : } 69 : 70 : void 71 0 : OpenDDS::DCPS::ReceivedDataElementList::add_by_timestamp(ReceivedDataElement *data_sample) 72 : { 73 0 : data_sample->previous_data_sample_ = 0; 74 0 : data_sample->next_data_sample_ = 0; 75 : 76 0 : for (ReceivedDataElement* it = head_; it != 0; it = it->next_data_sample_) { 77 0 : if (data_sample->source_timestamp_ < it->source_timestamp_) { 78 0 : data_sample->previous_data_sample_ = it->previous_data_sample_; 79 0 : data_sample->next_data_sample_ = it; 80 : 81 : // Are we replacing the head? 82 0 : if (it->previous_data_sample_ == 0) { 83 0 : head_ = data_sample; 84 : } else { 85 0 : it->previous_data_sample_->next_data_sample_ = data_sample; 86 : } 87 0 : it->previous_data_sample_ = data_sample; 88 : 89 0 : ++size_; 90 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 91 0 : if (!data_sample->coherent_change_) 92 : #endif 93 : { 94 0 : if (data_sample->sample_state_ == DDS::NOT_READ_SAMPLE_STATE) { 95 0 : increment_not_read_count(); 96 : } else { 97 0 : increment_read_count(); 98 : } 99 : } 100 : 101 0 : return; 102 : } 103 : } 104 : 105 0 : add(data_sample); 106 : } 107 : 108 : void 109 0 : OpenDDS::DCPS::ReceivedDataElementList::apply_all( 110 : ReceivedDataFilter& match, 111 : ReceivedDataOperation& op) 112 : { 113 0 : for (ReceivedDataElement* it = head_; it != 0; it = it->next_data_sample_) { 114 0 : if (match(it)) { 115 0 : op(it); 116 : } 117 : } 118 0 : } 119 : 120 : bool 121 0 : OpenDDS::DCPS::ReceivedDataElementList::remove( 122 : ReceivedDataFilter& match, 123 : bool eval_all) 124 : { 125 0 : if (!head_) { 126 0 : return false; 127 : } 128 : 129 0 : bool released = false; 130 : 131 0 : for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) { 132 0 : if (match(item)) { 133 0 : released = released || remove(item); 134 0 : if (!eval_all) break; 135 : } 136 : } 137 : 138 0 : return released; 139 : } 140 : 141 : bool 142 0 : OpenDDS::DCPS::ReceivedDataElementList::remove(ReceivedDataElement* item) 143 : { 144 0 : OPENDDS_ASSERT(sanity_check(item)); 145 : 146 0 : if (!head_) { 147 0 : return false; 148 : } 149 : 150 0 : bool released = false; 151 : 152 0 : size_--; 153 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 154 0 : if (!item->coherent_change_) 155 : #endif 156 : { 157 0 : if (item->sample_state_ == DDS::NOT_READ_SAMPLE_STATE) { 158 0 : decrement_not_read_count(); 159 : } else { 160 0 : decrement_read_count(); 161 : } 162 : } 163 0 : if (item == head_) { 164 0 : if (head_ == tail_) { 165 0 : head_ = tail_ = 0; 166 : 167 : } else { 168 0 : head_ = item->next_data_sample_; 169 : 170 0 : if (head_) { 171 0 : head_->previous_data_sample_ = 0; 172 : } 173 : } 174 : 175 0 : } else if (item == tail_) { 176 0 : tail_ = item->previous_data_sample_; 177 : 178 0 : if (tail_) { 179 0 : tail_->next_data_sample_ = 0; 180 : } 181 : 182 : } else { 183 0 : item->previous_data_sample_->next_data_sample_ = 184 0 : item->next_data_sample_; 185 0 : item->next_data_sample_->previous_data_sample_ = 186 0 : item->previous_data_sample_; 187 : } 188 : 189 0 : item->previous_data_sample_ = 0; 190 0 : item->next_data_sample_ = 0; 191 : 192 0 : if (instance_state_ && size_ == 0) { 193 : // let the instance know it is empty 194 0 : released = instance_state_->empty(true); 195 : } 196 : 197 0 : return released; 198 : } 199 : 200 : bool 201 0 : OpenDDS::DCPS::ReceivedDataElementList::has_zero_copies() const 202 : { 203 0 : for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) { 204 0 : if (item->zero_copy_cnt_) { 205 0 : return true; 206 : } 207 : } 208 0 : return false; 209 : } 210 : 211 : bool 212 0 : OpenDDS::DCPS::ReceivedDataElementList::matches(CORBA::ULong sample_states) const 213 : { 214 0 : return sample_states_ & sample_states; 215 : } 216 : 217 : OpenDDS::DCPS::ReceivedDataElement* 218 0 : OpenDDS::DCPS::ReceivedDataElementList::get_next_match(CORBA::ULong sample_states, ReceivedDataElement* prev) 219 : { 220 0 : OPENDDS_ASSERT(sanity_check(prev)); 221 0 : if (prev == tail_) { 222 0 : return 0; 223 : } 224 0 : ReceivedDataElement* item = prev ? prev->next_data_sample_ : head_; 225 0 : for (; item != 0; item = item->next_data_sample_) { 226 0 : if ((item->sample_state_ & sample_states) 227 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 228 0 : && !item->coherent_change_ 229 : #endif 230 : ) { 231 0 : return item; 232 : } 233 : } 234 0 : return 0; 235 : } 236 : 237 : void 238 0 : OpenDDS::DCPS::ReceivedDataElementList::mark_read(ReceivedDataElement* item) 239 : { 240 0 : OPENDDS_ASSERT(sanity_check(item)); 241 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 242 0 : if (!item->coherent_change_) 243 : #endif 244 : { 245 0 : if (item->sample_state_ & DDS::NOT_READ_SAMPLE_STATE) { 246 0 : item->sample_state_ = DDS::READ_SAMPLE_STATE; 247 0 : decrement_not_read_count(); 248 0 : increment_read_count(); 249 : } 250 : } 251 0 : } 252 : 253 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 254 : void 255 0 : OpenDDS::DCPS::ReceivedDataElementList::accept_coherent_change(OpenDDS::DCPS::ReceivedDataElement* item) 256 : { 257 0 : OPENDDS_ASSERT(sanity_check(item)); 258 0 : if (item->coherent_change_) { 259 0 : item->coherent_change_ = false; 260 0 : increment_not_read_count(); 261 : } 262 0 : } 263 : #endif 264 : 265 0 : void OpenDDS::DCPS::ReceivedDataElementList::increment_read_count() 266 : { 267 0 : if (!read_sample_count_) { 268 0 : sample_states_ |= DDS::READ_SAMPLE_STATE; 269 0 : DataReaderImpl_rch reader(reader_.lock()); 270 0 : if (reader) { 271 0 : reader->state_updated(instance_state_->instance_handle()); 272 : } 273 0 : } 274 0 : ++read_sample_count_; 275 0 : } 276 : 277 0 : void OpenDDS::DCPS::ReceivedDataElementList::decrement_read_count() 278 : { 279 0 : OPENDDS_ASSERT(read_sample_count_); 280 0 : --read_sample_count_; 281 0 : if (!read_sample_count_) { 282 0 : sample_states_ &= (~DDS::READ_SAMPLE_STATE); 283 0 : DataReaderImpl_rch reader(reader_.lock()); 284 0 : if (reader) { 285 0 : reader->state_updated(instance_state_->instance_handle()); 286 : } 287 0 : } 288 0 : } 289 : 290 0 : void OpenDDS::DCPS::ReceivedDataElementList::increment_not_read_count() 291 : { 292 0 : if (!not_read_sample_count_) { 293 0 : sample_states_ |= DDS::NOT_READ_SAMPLE_STATE; 294 0 : DataReaderImpl_rch reader(reader_.lock()); 295 0 : if (reader) { 296 0 : reader->state_updated(instance_state_->instance_handle()); 297 : } 298 0 : } 299 0 : ++not_read_sample_count_; 300 0 : } 301 : 302 0 : void OpenDDS::DCPS::ReceivedDataElementList::decrement_not_read_count() 303 : { 304 0 : OPENDDS_ASSERT(not_read_sample_count_); 305 0 : --not_read_sample_count_; 306 0 : if (!not_read_sample_count_) { 307 0 : sample_states_ &= (~DDS::NOT_READ_SAMPLE_STATE); 308 0 : DataReaderImpl_rch reader(reader_.lock()); 309 0 : if (reader) { 310 0 : reader->state_updated(instance_state_->instance_handle()); 311 : } 312 0 : } 313 0 : } 314 : 315 0 : bool OpenDDS::DCPS::ReceivedDataElementList::sanity_check() 316 : { 317 0 : OPENDDS_ASSERT(head_ == 0 || head_->previous_data_sample_ == 0); 318 0 : for (ReceivedDataElement* item = head_; item != 0; item = item->next_data_sample_) { 319 0 : OPENDDS_ASSERT(sanity_check(item)); 320 : } 321 0 : OPENDDS_ASSERT(tail_ == 0 || tail_->next_data_sample_ == 0); 322 0 : return true; 323 : } 324 : 325 0 : bool OpenDDS::DCPS::ReceivedDataElementList::sanity_check(ReceivedDataElement* item) 326 : { 327 : ACE_UNUSED_ARG(item); 328 0 : OPENDDS_ASSERT(item == 0 || (item->next_data_sample_ == 0 && item == tail_) || (item->next_data_sample_ && item->next_data_sample_->previous_data_sample_ == item)); 329 0 : OPENDDS_ASSERT(item == 0 || (item->previous_data_sample_ == 0 && item == head_) || (item->previous_data_sample_ && item->previous_data_sample_->next_data_sample_ == item)); 330 0 : return true; 331 : }