Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H 9 : #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H 10 : 11 : #include "Atomic.h" 12 : #include "dcps_export.h" 13 : #include "DataSampleHeader.h" 14 : #include "Definitions.h" 15 : #include "GuidUtils.h" 16 : #include "InstanceState.h" 17 : #include "Time_Helper.h" 18 : #include "unique_ptr.h" 19 : 20 : #include <dds/DdsDcpsInfrastructureC.h> 21 : 22 : #include "ace/Thread_Mutex.h" 23 : 24 : #if !defined (ACE_LACKS_PRAGMA_ONCE) 25 : #pragma once 26 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 27 : 28 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 29 : 30 : namespace OpenDDS { 31 : namespace DCPS { 32 : 33 : class OpenDDS_Dcps_Export ReceivedDataElement { 34 : public: 35 0 : ReceivedDataElement(const DataSampleHeader& header, void *received_data, ACE_Recursive_Thread_Mutex* mx) 36 0 : : pub_(header.publication_id_), 37 0 : registered_data_(received_data), 38 0 : sample_state_(DDS::NOT_READ_SAMPLE_STATE), 39 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 40 0 : coherent_change_(header.coherent_change_), 41 0 : group_coherent_(header.group_coherent_), 42 0 : publisher_id_(header.publisher_id_), 43 : #endif 44 0 : valid_data_(received_data != 0), 45 0 : disposed_generation_count_(0), 46 0 : no_writers_generation_count_(0), 47 0 : zero_copy_cnt_(0), 48 0 : sequence_(header.sequence_), 49 0 : previous_data_sample_(0), 50 0 : next_data_sample_(0), 51 0 : ref_count_(1), 52 0 : mx_(mx) 53 : { 54 0 : destination_timestamp_ = SystemTimePoint::now().to_dds_time(); 55 : 56 0 : source_timestamp_.sec = header.source_timestamp_sec_; 57 0 : source_timestamp_.nanosec = header.source_timestamp_nanosec_; 58 : 59 : /* 60 : * In some situations, we will not have data to give to the user and 61 : * valid_data is how we communcate that to the user through a SampleInfo. 62 : */ 63 0 : if (!header.valid_data()) { 64 0 : valid_data_ = false; 65 : } 66 0 : } 67 : 68 0 : virtual ~ReceivedDataElement(){} 69 : 70 0 : void dec_ref() 71 : { 72 0 : if (0 == --ref_count_) { 73 0 : delete this; 74 : } 75 0 : } 76 : 77 0 : void inc_ref() 78 : { 79 0 : ++ref_count_; 80 0 : } 81 : 82 : long ref_count() 83 : { 84 : return ref_count_; 85 : } 86 : 87 : GUID_t pub_; 88 : 89 : /** 90 : * Data sample received, could only be the key fields in case we received 91 : * dispose and/or unregister message. 92 : */ 93 : void* const registered_data_; // ugly, but works.... 94 : 95 : /// Sample state for this data sample: 96 : /// DDS::NOT_READ_SAMPLE_STATE/DDS::READ_SAMPLE_STATE 97 : DDS::SampleStateKind sample_state_; 98 : 99 : /// Source time stamp for this data sample 100 : DDS::Time_t source_timestamp_; 101 : 102 : /// Reception time stamp for this data sample 103 : DDS::Time_t destination_timestamp_; 104 : 105 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 106 : /// Sample belongs to an active coherent change set 107 : bool coherent_change_; 108 : 109 : /// Sample belongs to a group coherent changes. 110 : bool group_coherent_; 111 : 112 : /// Publisher id represent group identifier. 113 : GUID_t publisher_id_; 114 : #endif 115 : 116 : /// Do we contain valid data 117 : bool valid_data_; 118 : 119 : /// The data sample's instance's disposed_generation_count_ 120 : /// at the time the sample was received 121 : size_t disposed_generation_count_; 122 : 123 : /// The data sample's instance's no_writers_generation_count_ 124 : /// at the time the sample was received 125 : size_t no_writers_generation_count_; 126 : 127 : /// This is needed to know if delete DataReader should fail with 128 : /// PRECONDITION_NOT_MET because there are outstanding loans. 129 : Atomic<long> zero_copy_cnt_; 130 : 131 : /// The data sample's sequence number 132 : SequenceNumber sequence_; 133 : 134 : /// the previous data sample in the ReceivedDataElementList 135 : ReceivedDataElement* previous_data_sample_; 136 : 137 : /// the next data sample in the ReceivedDataElementList 138 : ReceivedDataElement* next_data_sample_; 139 : 140 : void* operator new(size_t size, ACE_New_Allocator& pool); 141 : void operator delete(void* memory); 142 : void operator delete(void* memory, ACE_New_Allocator& pool); 143 : 144 : private: 145 : Atomic<long> ref_count_; 146 : protected: 147 : ACE_Recursive_Thread_Mutex* mx_; 148 : }; // class ReceivedDataElement 149 : 150 : struct ReceivedDataElementMemoryBlock 151 : { 152 : ReceivedDataElement element_; 153 : ACE_New_Allocator* allocator_; 154 : }; 155 : 156 : 157 : template <typename DataTypeWithAllocator> 158 : class ReceivedDataElementWithType : public ReceivedDataElement 159 : { 160 : public: 161 0 : ReceivedDataElementWithType(const DataSampleHeader& header, DataTypeWithAllocator* received_data, ACE_Recursive_Thread_Mutex* mx) 162 0 : : ReceivedDataElement(header, received_data, mx) 163 : { 164 0 : } 165 : 166 0 : ~ReceivedDataElementWithType() { 167 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, 168 : guard, 169 : *mx_) 170 0 : delete static_cast<DataTypeWithAllocator*> (registered_data_); 171 0 : } 172 : }; 173 : 174 : class OpenDDS_Dcps_Export ReceivedDataFilter { 175 : public: 176 0 : ReceivedDataFilter() {} 177 0 : virtual ~ReceivedDataFilter() {} 178 : 179 : virtual bool operator()(ReceivedDataElement* data_sample) = 0; 180 : }; 181 : 182 : class OpenDDS_Dcps_Export ReceivedDataOperation { 183 : public: 184 0 : ReceivedDataOperation() {} 185 0 : virtual ~ReceivedDataOperation() {} 186 : 187 : virtual void operator()(ReceivedDataElement* data_sample) = 0; 188 : }; 189 : 190 : class OpenDDS_Dcps_Export ReceivedDataElementList { 191 : public: 192 : explicit ReceivedDataElementList(const DataReaderImpl_rch& reader, const InstanceState_rch& instance_state = InstanceState_rch()); 193 : 194 : ~ReceivedDataElementList(); 195 : 196 : void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func); 197 : 198 : // adds a data sample to the end of the list 199 : void add(ReceivedDataElement* data_sample); 200 : void add_by_timestamp(ReceivedDataElement* data_sample); 201 : 202 : // returns true if the instance was released 203 : bool remove(ReceivedDataElement* data_sample); 204 : 205 : // returns true if the instance was released 206 : bool remove(ReceivedDataFilter& match, bool eval_all); 207 : 208 0 : const ReceivedDataElement* peek_tail() { return tail_; } 209 : 210 : ReceivedDataElement* remove_head(); 211 : ReceivedDataElement* remove_tail(); 212 : 213 0 : size_t size() const { return size_; } 214 : 215 : bool has_zero_copies() const; 216 : bool matches(CORBA::ULong sample_states) const; 217 : ReceivedDataElement* get_next_match(CORBA::ULong sample_states, ReceivedDataElement* prev); 218 : 219 : void mark_read(ReceivedDataElement* item); 220 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 221 : void accept_coherent_change(ReceivedDataElement* item); 222 : #endif 223 : 224 : private: 225 : DataReaderImpl_wrch reader_; 226 : 227 : /// The first element of the list. 228 : ReceivedDataElement* head_; 229 : 230 : /// The last element of the list. 231 : ReceivedDataElement* tail_; 232 : 233 : /// Number of elements in the list. 234 : size_t size_; 235 : 236 : CORBA::ULong read_sample_count_; 237 : CORBA::ULong not_read_sample_count_; 238 : CORBA::ULong sample_states_; 239 : 240 : void increment_read_count(); 241 : void decrement_read_count(); 242 : void increment_not_read_count(); 243 : void decrement_not_read_count(); 244 : InstanceState_rch instance_state_; 245 : 246 : bool sanity_check(); 247 : bool sanity_check(ReceivedDataElement* item); 248 : }; // ReceivedDataElementList 249 : 250 : } // namespace DCPS 251 : } // namespace OpenDDS 252 : 253 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 254 : 255 : #if defined (__ACE_INLINE__) 256 : # include "ReceivedDataElementList.inl" 257 : #endif /* __ACE_INLINE__ */ 258 : 259 : #endif /* OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H */