ReceivedDataElementList.h
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00009 #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00010
00011 #include "ace/Atomic_Op_T.h"
00012 #include "ace/Thread_Mutex.h"
00013
00014 #include "dcps_export.h"
00015 #include "Definitions.h"
00016 #include "GuidUtils.h"
00017 #include "DataSampleHeader.h"
00018 #include "Time_Helper.h"
00019 #include "unique_ptr.h"
00020
00021 #include "dds/DdsDcpsInfrastructureC.h"
00022
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 #pragma once
00025 #endif
00026
00027 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032 class InstanceState;
00033
00034 class OpenDDS_Dcps_Export ReceivedDataElement {
00035 public:
00036 ReceivedDataElement(const DataSampleHeader& header, void *received_data, ACE_Recursive_Thread_Mutex* mx)
00037 : pub_(header.publication_id_),
00038 registered_data_(received_data),
00039 sample_state_(DDS::NOT_READ_SAMPLE_STATE),
00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00041 coherent_change_(header.coherent_change_),
00042 group_coherent_(header.group_coherent_),
00043 publisher_id_ (header.publisher_id_),
00044 #endif
00045 disposed_generation_count_(0),
00046 no_writers_generation_count_(0),
00047 zero_copy_cnt_(0),
00048 sequence_(header.sequence_),
00049 previous_data_sample_(0),
00050 next_data_sample_(0),
00051 ref_count_(1),
00052 mx_(mx)
00053 {
00054
00055 this->destination_timestamp_ = time_value_to_time(ACE_OS::gettimeofday());
00056
00057 this->source_timestamp_.sec = header.source_timestamp_sec_;
00058 this->source_timestamp_.nanosec = header.source_timestamp_nanosec_;
00059 }
00060
00061 virtual ~ReceivedDataElement(){}
00062
00063 void dec_ref() {
00064 if (0 == --this->ref_count_)
00065 delete this;
00066 }
00067
00068 void inc_ref() {
00069 ++this->ref_count_;
00070 }
00071
00072 long ref_count() {
00073 return this->ref_count_.value();
00074 }
00075
00076 PublicationId pub_;
00077
00078
00079 void * const registered_data_;
00080
00081
00082
00083 DDS::SampleStateKind sample_state_ ;
00084
00085
00086 DDS::Time_t source_timestamp_;
00087
00088
00089 DDS::Time_t destination_timestamp_;
00090
00091 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00092
00093 bool coherent_change_;
00094
00095
00096 bool group_coherent_;
00097
00098
00099 RepoId publisher_id_;
00100 #endif
00101
00102
00103
00104 size_t disposed_generation_count_ ;
00105
00106
00107
00108 size_t no_writers_generation_count_ ;
00109
00110
00111
00112 ACE_Atomic_Op<ACE_Thread_Mutex, long> zero_copy_cnt_;
00113
00114
00115 SequenceNumber sequence_ ;
00116
00117
00118 ReceivedDataElement *previous_data_sample_ ;
00119
00120
00121 ReceivedDataElement *next_data_sample_ ;
00122
00123 void* operator new(size_t size, ACE_New_Allocator& pool);
00124 void operator delete(void* memory);
00125 void operator delete(void* memory, ACE_New_Allocator& pool);
00126
00127 private:
00128 ACE_Atomic_Op<ACE_Thread_Mutex, long> ref_count_;
00129 protected:
00130 ACE_Recursive_Thread_Mutex* mx_;
00131 };
00132
00133 struct ReceivedDataElementMemoryBlock
00134 {
00135 ReceivedDataElement element_;
00136 ACE_New_Allocator* allocator_;
00137 };
00138
00139
00140 template <typename DataTypeWithAllocator>
00141 class ReceivedDataElementWithType : public ReceivedDataElement
00142 {
00143 public:
00144 ReceivedDataElementWithType(const DataSampleHeader& header, DataTypeWithAllocator* received_data, ACE_Recursive_Thread_Mutex* mx)
00145 : ReceivedDataElement(header, received_data, mx)
00146 {
00147 }
00148
00149 ~ReceivedDataElementWithType() {
00150 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00151 guard,
00152 *this->mx_)
00153 delete static_cast<DataTypeWithAllocator*> (registered_data_);
00154 }
00155 };
00156
00157 class OpenDDS_Dcps_Export ReceivedDataFilter {
00158 public:
00159 ReceivedDataFilter() {}
00160 virtual ~ReceivedDataFilter() {}
00161
00162 virtual bool operator()(ReceivedDataElement* data_sample) = 0;
00163 };
00164
00165 class OpenDDS_Dcps_Export ReceivedDataOperation {
00166 public:
00167 ReceivedDataOperation() {}
00168 virtual ~ReceivedDataOperation() {}
00169
00170 virtual void operator()(ReceivedDataElement* data_sample) = 0;
00171 };
00172
00173 class OpenDDS_Dcps_Export ReceivedDataElementList {
00174 public:
00175 ReceivedDataElementList(InstanceState *instance_state = 0) ;
00176
00177 ~ReceivedDataElementList() ;
00178
00179 void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func);
00180
00181
00182 void add(ReceivedDataElement *data_sample) ;
00183
00184
00185 bool remove(ReceivedDataElement *data_sample) ;
00186
00187
00188 bool remove(ReceivedDataFilter& match, bool eval_all);
00189
00190 ReceivedDataElement *remove_head() ;
00191 ReceivedDataElement *remove_tail() ;
00192
00193
00194 ReceivedDataElement* head_ ;
00195
00196
00197 ReceivedDataElement* tail_ ;
00198
00199
00200 ssize_t size_ ;
00201
00202 private:
00203 InstanceState *instance_state_ ;
00204 };
00205
00206 }
00207 }
00208
00209 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00210
00211 #if defined (__ACE_INLINE__)
00212 # include "ReceivedDataElementList.inl"
00213 #endif
00214
00215 #endif