00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_TRANSPORTSENDBUFFER_H
00009 #define DCPS_TRANSPORTSENDBUFFER_H
00010
00011 #include "dds/DCPS/dcps_export.h"
00012
00013 #include "TransportRetainedElement.h"
00014 #include "TransportReplacedElement.h"
00015 #include "TransportSendStrategy.h"
00016
00017 #include "dds/DCPS/Definitions.h"
00018
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include <utility>
00021
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023 class ACE_Message_Block;
00024 ACE_END_VERSIONED_NAMESPACE_DECL
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 class DisjointSequence;
00030
00031
00032
00033
00034
00035 class OpenDDS_Dcps_Export TransportSendBuffer {
00036 public:
00037 size_t capacity() const;
00038 void bind(TransportSendStrategy* strategy);
00039
00040 virtual void retain_all(RepoId pub_id) = 0;
00041 virtual void insert(SequenceNumber sequence,
00042 TransportSendStrategy::QueueType* queue,
00043 ACE_Message_Block* chain) = 0;
00044
00045 typedef TransportSendStrategy::LockType LockType;
00046 LockType& strategy_lock() { return this->strategy_->lock_; }
00047
00048 protected:
00049 explicit TransportSendBuffer(size_t capacity)
00050 : strategy_(0), capacity_(capacity) {}
00051 virtual ~TransportSendBuffer();
00052
00053 typedef TransportSendStrategy::QueueType QueueType;
00054 typedef std::pair<QueueType*, ACE_Message_Block*> BufferType;
00055
00056 void resend_one(const BufferType& buffer);
00057
00058 TransportSendStrategy* strategy_;
00059 const size_t capacity_;
00060
00061 private:
00062 TransportSendBuffer(const TransportSendBuffer&);
00063 TransportSendBuffer& operator=(const TransportSendBuffer&);
00064 };
00065
00066
00067
00068
00069 class OpenDDS_Dcps_Export SingleSendBuffer
00070 : public TransportSendBuffer, public RcObject<ACE_SYNCH_MUTEX> {
00071 public:
00072
00073 static const size_t UNLIMITED = 0;
00074
00075 void release_all();
00076 typedef OPENDDS_MAP(SequenceNumber, BufferType) BufferMap;
00077 void release_acked(SequenceNumber seq);
00078 void release(BufferMap::iterator buffer_iter);
00079
00080 size_t n_chunks() const;
00081
00082 SingleSendBuffer(size_t capacity, size_t max_samples_per_packet);
00083 ~SingleSendBuffer();
00084
00085 bool resend(const SequenceRange& range, DisjointSequence* gaps = 0);
00086
00087
00088 bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0);
00089
00090 void resend_fragments_i(const SequenceNumber& sequence,
00091 const DisjointSequence& fragments);
00092
00093 SequenceNumber low() const;
00094 SequenceNumber high() const;
00095 bool empty() const;
00096 bool contains(const SequenceNumber& seq) const;
00097
00098 void retain_all(RepoId pub_id);
00099 void insert(SequenceNumber sequence,
00100 TransportSendStrategy::QueueType* queue,
00101 ACE_Message_Block* chain);
00102 void insert_fragment(SequenceNumber sequence,
00103 SequenceNumber fragment,
00104 TransportSendStrategy::QueueType* queue,
00105 ACE_Message_Block* chain);
00106
00107 private:
00108 void check_capacity();
00109 RemoveResult retain_buffer(const RepoId& pub_id, BufferType& buffer);
00110 void insert_buffer(BufferType& buffer,
00111 TransportSendStrategy::QueueType* queue,
00112 ACE_Message_Block* chain);
00113
00114 size_t n_chunks_;
00115
00116 TransportRetainedElementAllocator retained_allocator_;
00117 MessageBlockAllocator retained_mb_allocator_;
00118 DataBlockAllocator retained_db_allocator_;
00119 TransportReplacedElementAllocator replaced_allocator_;
00120 MessageBlockAllocator replaced_mb_allocator_;
00121 DataBlockAllocator replaced_db_allocator_;
00122
00123 BufferMap buffers_;
00124
00125 typedef OPENDDS_MAP(SequenceNumber, BufferMap) FragmentMap;
00126 FragmentMap fragments_;
00127 };
00128
00129 }
00130 }
00131
00132 #ifdef __ACE_INLINE__
00133 # include "TransportSendBuffer.inl"
00134 #endif
00135
00136 #endif