TransportSendBuffer.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_TRANSPORTSENDBUFFER_H
00009 #define DCPS_TRANSPORTSENDBUFFER_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012
00013 #include "TransportRetainedElement.h"
00014 #include "TransportReplacedElement.h"
00015 #include "TransportSendStrategy.h"
00016
00017 #include "dds/DCPS/Definitions.h"
00018
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "ace/Synch_Traits.h"
00021 #include <utility>
00022
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024 class ACE_Message_Block;
00025 ACE_END_VERSIONED_NAMESPACE_DECL
00026
00027 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032 class DisjointSequence;
00033
00034
00035
00036
00037
00038 class OpenDDS_Dcps_Export TransportSendBuffer {
00039 public:
00040 size_t capacity() const;
00041 void bind(TransportSendStrategy* strategy);
00042
00043 virtual void retain_all(RepoId pub_id) = 0;
00044 virtual void insert(SequenceNumber sequence,
00045 TransportSendStrategy::QueueType* queue,
00046 ACE_Message_Block* chain) = 0;
00047
00048 typedef TransportSendStrategy::LockType LockType;
00049 LockType& strategy_lock() { return this->strategy_->lock_; }
00050
00051 protected:
00052 explicit TransportSendBuffer(size_t capacity)
00053 : strategy_(0), capacity_(capacity) {}
00054 virtual ~TransportSendBuffer();
00055
00056 typedef TransportSendStrategy::QueueType QueueType;
00057 typedef std::pair<QueueType*, ACE_Message_Block*> BufferType;
00058
00059 void resend_one(const BufferType& buffer);
00060
00061 TransportSendStrategy* strategy_;
00062 const size_t capacity_;
00063
00064 private:
00065 TransportSendBuffer(const TransportSendBuffer&);
00066 TransportSendBuffer& operator=(const TransportSendBuffer&);
00067 };
00068
00069
00070
00071
00072 class OpenDDS_Dcps_Export SingleSendBuffer
00073 : public TransportSendBuffer, public RcObject {
00074 public:
00075
00076 static const size_t UNLIMITED;
00077
00078 void release_all();
00079 typedef OPENDDS_MAP(SequenceNumber, BufferType) BufferMap;
00080 void release_acked(SequenceNumber seq);
00081 void release(BufferMap::iterator buffer_iter);
00082
00083 size_t n_chunks() const;
00084
00085 SingleSendBuffer(size_t capacity, size_t max_samples_per_packet);
00086 ~SingleSendBuffer();
00087
00088 bool resend(const SequenceRange& range, DisjointSequence* gaps = 0);
00089
00090
00091 bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0);
00092 bool resend_i(const SequenceRange& range, DisjointSequence* gaps,
00093 const RepoId& destination);
00094
00095 void resend_fragments_i(const SequenceNumber& sequence,
00096 const DisjointSequence& fragments);
00097
00098 SequenceNumber low() const;
00099 SequenceNumber high() const;
00100 bool empty() const;
00101 bool contains(const SequenceNumber& seq) const;
00102
00103 void retain_all(RepoId pub_id);
00104 void insert(SequenceNumber sequence,
00105 TransportSendStrategy::QueueType* queue,
00106 ACE_Message_Block* chain);
00107 void insert_fragment(SequenceNumber sequence,
00108 SequenceNumber fragment,
00109 TransportSendStrategy::QueueType* queue,
00110 ACE_Message_Block* chain);
00111
00112 private:
00113 void check_capacity();
00114 RemoveResult retain_buffer(const RepoId& pub_id, BufferType& buffer);
00115 void insert_buffer(BufferType& buffer,
00116 TransportSendStrategy::QueueType* queue,
00117 ACE_Message_Block* chain);
00118
00119 size_t n_chunks_;
00120
00121 MessageBlockAllocator retained_mb_allocator_;
00122 DataBlockAllocator retained_db_allocator_;
00123 MessageBlockAllocator replaced_mb_allocator_;
00124 DataBlockAllocator replaced_db_allocator_;
00125
00126 BufferMap buffers_;
00127
00128 typedef OPENDDS_MAP(SequenceNumber, BufferMap) FragmentMap;
00129 FragmentMap fragments_;
00130
00131 typedef OPENDDS_MAP(SequenceNumber, RepoId) DestinationMap;
00132 DestinationMap destinations_;
00133 };
00134
00135 }
00136 }
00137
00138 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00139
00140 #ifdef __ACE_INLINE__
00141 # include "TransportSendBuffer.inl"
00142 #endif
00143
00144 #endif