ReceivedDataElementList.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00009 #define OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H
00010 
00011 #include "ace/Atomic_Op_T.h"
00012 #include "ace/Thread_Mutex.h"
00013 
00014 #include "dcps_export.h"
00015 #include "Definitions.h"
00016 #include "GuidUtils.h"
00017 #include "DataSampleHeader.h"
00018 #include "Time_Helper.h"
00019 #include "unique_ptr.h"
00020 
00021 #include "dds/DdsDcpsInfrastructureC.h"
00022 
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 #pragma once
00025 #endif /* ACE_LACKS_PRAGMA_ONCE */
00026 
00027 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00028 
00029 namespace OpenDDS {
00030 namespace DCPS {
00031 
00032 class InstanceState;
00033 
00034 class OpenDDS_Dcps_Export ReceivedDataElement {
00035 public:
00036   ReceivedDataElement(const DataSampleHeader& header, void *received_data, ACE_Recursive_Thread_Mutex* mx)
00037     : pub_(header.publication_id_),
00038       registered_data_(received_data),
00039       sample_state_(DDS::NOT_READ_SAMPLE_STATE),
00040 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00041       coherent_change_(header.coherent_change_),
00042       group_coherent_(header.group_coherent_),
00043       publisher_id_ (header.publisher_id_),
00044 #endif
00045       disposed_generation_count_(0),
00046       no_writers_generation_count_(0),
00047       zero_copy_cnt_(0),
00048       sequence_(header.sequence_),
00049       previous_data_sample_(0),
00050       next_data_sample_(0),
00051       ref_count_(1),
00052       mx_(mx)
00053   {
00054 
00055     this->destination_timestamp_ = time_value_to_time(ACE_OS::gettimeofday());
00056 
00057     this->source_timestamp_.sec = header.source_timestamp_sec_;
00058     this->source_timestamp_.nanosec = header.source_timestamp_nanosec_;
00059   }
00060 
00061   virtual ~ReceivedDataElement(){}
00062 
00063   void dec_ref() {
00064      if (0 == --this->ref_count_)
00065        delete this;
00066   }
00067 
00068   void inc_ref() {
00069     ++this->ref_count_;
00070   }
00071 
00072   long ref_count() {
00073     return this->ref_count_.value();
00074   }
00075 
00076   PublicationId pub_;
00077 
00078   /// Data sample received
00079   void * const registered_data_;  // ugly, but works....
00080 
00081   /// Sample state for this data sample:
00082   /// DDS::NOT_READ_SAMPLE_STATE/DDS::READ_SAMPLE_STATE
00083   DDS::SampleStateKind sample_state_ ;
00084 
00085   ///Source time stamp for this data sample
00086   DDS::Time_t source_timestamp_;
00087 
00088   /// Reception time stamp for this data sample
00089   DDS::Time_t destination_timestamp_;
00090 
00091 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00092   /// Sample belongs to an active coherent change set
00093   bool coherent_change_;
00094 
00095   /// Sample belongs to a group coherent changes.
00096   bool group_coherent_;
00097 
00098   /// Publisher id represent group identifier.
00099   RepoId publisher_id_;
00100 #endif
00101 
00102   /// The data sample's instance's disposed_generation_count_
00103   /// at the time the sample was received
00104   size_t disposed_generation_count_ ;
00105 
00106   /// The data sample's instance's no_writers_generation_count_
00107   /// at the time the sample was received
00108   size_t no_writers_generation_count_ ;
00109 
00110   /// This is needed to know if delete DataReader should fail with
00111   /// PRECONDITION_NOT_MET because there are outstanding loans.
00112   ACE_Atomic_Op<ACE_Thread_Mutex, long> zero_copy_cnt_;
00113 
00114   /// The data sample's sequence number
00115   SequenceNumber sequence_ ;
00116 
00117   /// the previous data sample in the ReceivedDataElementList
00118   ReceivedDataElement *previous_data_sample_ ;
00119 
00120   /// the next data sample in the ReceivedDataElementList
00121   ReceivedDataElement *next_data_sample_ ;
00122 
00123   void* operator new(size_t size, ACE_New_Allocator& pool);
00124   void operator delete(void* memory);
00125   void operator delete(void* memory, ACE_New_Allocator& pool);
00126 
00127 private:
00128   ACE_Atomic_Op<ACE_Thread_Mutex, long> ref_count_;
00129 protected:
00130   ACE_Recursive_Thread_Mutex* mx_;
00131 }; // class ReceivedDataElement
00132 
00133 struct ReceivedDataElementMemoryBlock
00134 {
00135   ReceivedDataElement element_;
00136   ACE_New_Allocator* allocator_;
00137 };
00138 
00139 
00140 template <typename DataTypeWithAllocator>
00141 class ReceivedDataElementWithType : public ReceivedDataElement
00142 {
00143 public:
00144   ReceivedDataElementWithType(const DataSampleHeader& header, DataTypeWithAllocator* received_data, ACE_Recursive_Thread_Mutex* mx)
00145     : ReceivedDataElement(header, received_data, mx)
00146   {
00147   }
00148 
00149   ~ReceivedDataElementWithType() {
00150     ACE_GUARD(ACE_Recursive_Thread_Mutex,
00151               guard,
00152               *this->mx_)
00153     delete static_cast<DataTypeWithAllocator*> (registered_data_);
00154   }
00155 };
00156 
00157 class OpenDDS_Dcps_Export ReceivedDataFilter {
00158 public:
00159   ReceivedDataFilter() {}
00160   virtual ~ReceivedDataFilter() {}
00161 
00162   virtual bool operator()(ReceivedDataElement* data_sample) = 0;
00163 };
00164 
00165 class OpenDDS_Dcps_Export ReceivedDataOperation {
00166 public:
00167   ReceivedDataOperation() {}
00168   virtual ~ReceivedDataOperation() {}
00169 
00170   virtual void operator()(ReceivedDataElement* data_sample) = 0;
00171 };
00172 
00173 class OpenDDS_Dcps_Export ReceivedDataElementList {
00174 public:
00175   ReceivedDataElementList(InstanceState *instance_state = 0) ;
00176 
00177   ~ReceivedDataElementList() ;
00178 
00179   void apply_all(ReceivedDataFilter& match, ReceivedDataOperation& func);
00180 
00181   // adds a data sample to the end of the list
00182   void add(ReceivedDataElement *data_sample) ;
00183 
00184   // returns true if the instance was released
00185   bool remove(ReceivedDataElement *data_sample) ;
00186 
00187   // returns true if the instance was released
00188   bool remove(ReceivedDataFilter& match, bool eval_all);
00189 
00190   ReceivedDataElement *remove_head() ;
00191   ReceivedDataElement *remove_tail() ;
00192 
00193   /// The first element of the list.
00194   ReceivedDataElement* head_ ;
00195 
00196   /// The last element of the list.
00197   ReceivedDataElement* tail_ ;
00198 
00199   /// Number of elements in the list.
00200   ssize_t              size_ ;
00201 
00202 private:
00203   InstanceState *instance_state_ ;
00204 }; // ReceivedDataElementList
00205 
00206 } // namespace DCPS
00207 } // namespace OpenDDS
00208 
00209 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00210 
00211 #if defined (__ACE_INLINE__)
00212 # include "ReceivedDataElementList.inl"
00213 #endif  /* __ACE_INLINE__ */
00214 
00215 #endif /* OPENDDS_DCPS_RECEIVEDDATAELEMENTLIST_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1