00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00009 #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00010
00011 #include "ace/Atomic_Op_T.h"
00012 #include "ace/Thread_Mutex.h"
00013
00014 #include "dcps_export.h"
00015 #include "Definitions.h"
00016 #include "GuidUtils.h"
00017 #include "DataSampleHeader.h"
00018 #include "Qos_Helper.h"
00019
00020 #include "dds/DdsDcpsInfrastructureC.h"
00021
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 #pragma once
00024 #endif
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 class InstanceState;
00030
00031 class OpenDDS_Dcps_Export ReceivedDataElement {
00032 public:
00033 ReceivedDataElement(const DataSampleHeader& header, void *received_data)
00034 : pub_(header.publication_id_),
00035 registered_data_(received_data),
00036 sample_state_(DDS::NOT_READ_SAMPLE_STATE),
00037 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00038 coherent_change_(header.coherent_change_),
00039 group_coherent_(header.group_coherent_),
00040 publisher_id_ (header.publisher_id_),
00041 #endif
00042 disposed_generation_count_(0),
00043 no_writers_generation_count_(0),
00044 zero_copy_cnt_(0),
00045 sequence_(header.sequence_),
00046 previous_data_sample_(0),
00047 next_data_sample_(0),
00048 ref_count_(1)
00049 {
00050
00051 this->destination_timestamp_ = time_value_to_time(ACE_OS::gettimeofday());
00052
00053 this->source_timestamp_.sec = header.source_timestamp_sec_;
00054 this->source_timestamp_.nanosec = header.source_timestamp_nanosec_;
00055 }
00056
00057 long dec_ref() {
00058 return --this->ref_count_;
00059 }
00060
00061 long inc_ref() {
00062 return ++this->ref_count_;
00063 }
00064
00065 long ref_count() {
00066 return this->ref_count_.value();
00067 }
00068
00069 PublicationId pub_;
00070
00071
00072 void *registered_data_;
00073
00074
00075
00076 DDS::SampleStateKind sample_state_ ;
00077
00078
00079 DDS::Time_t source_timestamp_;
00080
00081
00082 DDS::Time_t destination_timestamp_;
00083
00084 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00085
00086 bool coherent_change_;
00087
00088
00089 bool group_coherent_;
00090
00091
00092 RepoId publisher_id_;
00093 #endif
00094
00095
00096
00097 size_t disposed_generation_count_ ;
00098
00099
00100
00101 size_t no_writers_generation_count_ ;
00102
00103
00104
00105 ACE_Atomic_Op<ACE_Thread_Mutex, long> zero_copy_cnt_;
00106
00107
00108 SequenceNumber sequence_ ;
00109
00110
00111 ReceivedDataElement *previous_data_sample_ ;
00112
00113
00114 ReceivedDataElement *next_data_sample_ ;
00115
00116 private:
00117 ACE_Atomic_Op<ACE_Thread_Mutex, long> ref_count_;
00118
00119 };
00120
00121 class OpenDDS_Dcps_Export ReceivedDataFilter {
00122 public:
00123 ReceivedDataFilter() {}
00124 virtual ~ReceivedDataFilter() {}
00125
00126 virtual bool operator()(ReceivedDataElement* data_sample) = 0;
00127 };
00128
00129 class OpenDDS_Dcps_Export ReceivedDataOperation {
00130 public:
00131 ReceivedDataOperation() {}
00132 virtual ~ReceivedDataOperation() {}
00133
00134 virtual void operator()(ReceivedDataElement* data_sample) = 0;
00135 };
00136
00137 class OpenDDS_Dcps_Export ReceivedDataElementList {
00138 public:
00139 ReceivedDataElementList(InstanceState *instance_state = 0) ;
00140
00141 ~ReceivedDataElementList() ;
00142
00143 void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func);
00144
00145
00146 void add(ReceivedDataElement *data_sample) ;
00147
00148
00149 bool remove(ReceivedDataElement *data_sample) ;
00150
00151
00152 bool remove(ReceivedDataFilter& match, bool eval_all);
00153
00154 ReceivedDataElement *remove_head() ;
00155 ReceivedDataElement *remove_tail() ;
00156
00157
00158 ReceivedDataElement* head_ ;
00159
00160
00161 ReceivedDataElement* tail_ ;
00162
00163
00164 ssize_t size_ ;
00165
00166 private:
00167 InstanceState *instance_state_ ;
00168 };
00169
00170 }
00171 }
00172
00173 #if defined (__ACE_INLINE__)
00174 # include "ReceivedDataElementList.inl"
00175 #endif
00176
00177 #endif