ReceivedDataElementList.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00009 #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00010 
00011 #include "ace/Atomic_Op_T.h"
00012 #include "ace/Thread_Mutex.h"
00013 
00014 #include "dcps_export.h"
00015 #include "Definitions.h"
00016 #include "GuidUtils.h"
00017 #include "DataSampleHeader.h"
00018 #include "Qos_Helper.h"
00019 
00020 #include "dds/DdsDcpsInfrastructureC.h"
00021 
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 #pragma once
00024 #endif /* ACE_LACKS_PRAGMA_ONCE */
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class InstanceState;
00030 
00031 class OpenDDS_Dcps_Export ReceivedDataElement {
00032 public:
00033   ReceivedDataElement(const DataSampleHeader& header, void *received_data)
00034     : pub_(header.publication_id_),
00035       registered_data_(received_data),
00036       sample_state_(DDS::NOT_READ_SAMPLE_STATE),
00037 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00038       coherent_change_(header.coherent_change_),
00039       group_coherent_(header.group_coherent_),
00040       publisher_id_ (header.publisher_id_),
00041 #endif
00042       disposed_generation_count_(0),
00043       no_writers_generation_count_(0),
00044       zero_copy_cnt_(0),
00045       sequence_(header.sequence_),
00046       previous_data_sample_(0),
00047       next_data_sample_(0),
00048       ref_count_(1)
00049   {
00050 
00051     this->destination_timestamp_ = time_value_to_time(ACE_OS::gettimeofday());
00052 
00053     this->source_timestamp_.sec = header.source_timestamp_sec_;
00054     this->source_timestamp_.nanosec = header.source_timestamp_nanosec_;
00055   }
00056 
00057   long dec_ref() {
00058     return --this->ref_count_;
00059   }
00060 
00061   long inc_ref() {
00062     return ++this->ref_count_;
00063   }
00064 
00065   long ref_count() {
00066     return this->ref_count_.value();
00067   }
00068 
00069   PublicationId pub_;
00070 
00071   /// Data sample received
00072   void *registered_data_;  // ugly, but works....
00073 
00074   /// Sample state for this data sample:
00075   /// DDS::NOT_READ_SAMPLE_STATE/DDS::READ_SAMPLE_STATE
00076   DDS::SampleStateKind sample_state_ ;
00077 
00078   ///Source time stamp for this data sample
00079   DDS::Time_t source_timestamp_;
00080 
00081   /// Reception time stamp for this data sample
00082   DDS::Time_t destination_timestamp_;
00083 
00084 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00085   /// Sample belongs to an active coherent change set
00086   bool coherent_change_;
00087 
00088   /// Sample belongs to a group coherent changes.
00089   bool group_coherent_;
00090 
00091   /// Publisher id represent group identifier.
00092   RepoId publisher_id_;
00093 #endif
00094 
00095   /// The data sample's instance's disposed_generation_count_
00096   /// at the time the sample was received
00097   size_t disposed_generation_count_ ;
00098 
00099   /// The data sample's instance's no_writers_generation_count_
00100   /// at the time the sample was received
00101   size_t no_writers_generation_count_ ;
00102 
00103   /// This is needed to know if delete DataReader should fail with
00104   /// PRECONDITION_NOT_MET because there are outstanding loans.
00105   ACE_Atomic_Op<ACE_Thread_Mutex, long> zero_copy_cnt_;
00106 
00107   /// The data sample's sequence number
00108   SequenceNumber sequence_ ;
00109 
00110   /// the previous data sample in the ReceivedDataElementList
00111   ReceivedDataElement *previous_data_sample_ ;
00112 
00113   /// the next data sample in the ReceivedDataElementList
00114   ReceivedDataElement *next_data_sample_ ;
00115 
00116 private:
00117   ACE_Atomic_Op<ACE_Thread_Mutex, long> ref_count_;
00118 
00119 }; // class ReceivedDataElement
00120 
00121 class OpenDDS_Dcps_Export ReceivedDataFilter {
00122 public:
00123   ReceivedDataFilter() {}
00124   virtual ~ReceivedDataFilter() {}
00125 
00126   virtual bool operator()(ReceivedDataElement* data_sample) = 0;
00127 };
00128 
00129 class OpenDDS_Dcps_Export ReceivedDataOperation {
00130 public:
00131   ReceivedDataOperation() {}
00132   virtual ~ReceivedDataOperation() {}
00133 
00134   virtual void operator()(ReceivedDataElement* data_sample) = 0;
00135 };
00136 
00137 class OpenDDS_Dcps_Export ReceivedDataElementList {
00138 public:
00139   ReceivedDataElementList(InstanceState *instance_state = 0) ;
00140 
00141   ~ReceivedDataElementList() ;
00142 
00143   void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func);
00144 
00145   // adds a data sample to the end of the list
00146   void add(ReceivedDataElement *data_sample) ;
00147 
00148   // returns true if the instance was released
00149   bool remove(ReceivedDataElement *data_sample) ;
00150 
00151   // returns true if the instance was released
00152   bool remove(ReceivedDataFilter& match, bool eval_all);
00153 
00154   ReceivedDataElement *remove_head() ;
00155   ReceivedDataElement *remove_tail() ;
00156 
00157   /// The first element of the list.
00158   ReceivedDataElement* head_ ;
00159 
00160   /// The last element of the list.
00161   ReceivedDataElement* tail_ ;
00162 
00163   /// Number of elements in the list.
00164   ssize_t              size_ ;
00165 
00166 private:
00167   InstanceState *instance_state_ ;
00168 }; // ReceivedDataElementList
00169 
00170 } // namespace DCPS
00171 } // namespace OpenDDS
00172 
00173 #if defined (__ACE_INLINE__)
00174 # include "ReceivedDataElementList.inl"
00175 #endif  /* __ACE_INLINE__ */
00176 
00177 #endif /* OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H  */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7