TransportSendBuffer.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_TRANSPORTSENDBUFFER_H
00009 #define DCPS_TRANSPORTSENDBUFFER_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 
00013 #include "TransportRetainedElement.h"
00014 #include "TransportReplacedElement.h"
00015 #include "TransportSendStrategy.h"
00016 
00017 #include "dds/DCPS/Definitions.h"
00018 
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include <utility>
00021 
00022 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00023 class ACE_Message_Block;
00024 ACE_END_VERSIONED_NAMESPACE_DECL
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class DisjointSequence;
00030 
00031 /// Abstract base class that forms the interface for TransportSendStrategy
00032 /// to store data for potential retransmission.  Derived classes actually
00033 /// store the data and can utilize TransportSendBuffer's friendship in
00034 /// TransportSendStrategy to retransmit (see method "resend_one").
00035 class OpenDDS_Dcps_Export TransportSendBuffer {
00036 public:
00037   size_t capacity() const;
00038   void bind(TransportSendStrategy* strategy);
00039 
00040   virtual void retain_all(RepoId pub_id) = 0;
00041   virtual void insert(SequenceNumber sequence,
00042                       TransportSendStrategy::QueueType* queue,
00043                       ACE_Message_Block* chain) = 0;
00044 
00045   typedef TransportSendStrategy::LockType LockType;
00046   LockType& strategy_lock() { return this->strategy_->lock_; }
00047 
00048 protected:
00049   explicit TransportSendBuffer(size_t capacity)
00050     : strategy_(0), capacity_(capacity) {}
00051   virtual ~TransportSendBuffer();
00052 
00053   typedef TransportSendStrategy::QueueType QueueType;
00054   typedef std::pair<QueueType*, ACE_Message_Block*> BufferType;
00055 
00056   void resend_one(const BufferType& buffer);
00057 
00058   TransportSendStrategy* strategy_;
00059   const size_t capacity_;
00060 
00061 private:
00062   TransportSendBuffer(const TransportSendBuffer&); // unimplemented
00063   TransportSendBuffer& operator=(const TransportSendBuffer&); // unimplemented
00064 };
00065 
00066 /// Implementation of TransportSendBuffer that manages data for a single
00067 /// domain of SequenceNumbers -- for a given SingleSendBuffer object, the
00068 /// sequence numbers passed to insert() must be generated from the same place.
00069 class OpenDDS_Dcps_Export SingleSendBuffer
00070   : public TransportSendBuffer, public RcObject<ACE_SYNCH_MUTEX> {
00071 public:
00072 
00073   static const size_t UNLIMITED = 0;
00074 
00075   void release_all();
00076   typedef OPENDDS_MAP(SequenceNumber, BufferType) BufferMap;
00077   void release_acked(SequenceNumber seq);
00078   void release(BufferMap::iterator buffer_iter);
00079 
00080   size_t n_chunks() const;
00081 
00082   SingleSendBuffer(size_t capacity, size_t max_samples_per_packet);
00083   ~SingleSendBuffer();
00084 
00085   bool resend(const SequenceRange& range, DisjointSequence* gaps = 0);
00086 
00087   // caller must already have the send strategy lock
00088   bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0);
00089 
00090   void resend_fragments_i(const SequenceNumber& sequence,
00091                           const DisjointSequence& fragments);
00092 
00093   SequenceNumber low() const;
00094   SequenceNumber high() const;
00095   bool empty() const;
00096   bool contains(const SequenceNumber& seq) const;
00097 
00098   void retain_all(RepoId pub_id);
00099   void insert(SequenceNumber sequence,
00100               TransportSendStrategy::QueueType* queue,
00101               ACE_Message_Block* chain);
00102   void insert_fragment(SequenceNumber sequence,
00103                        SequenceNumber fragment,
00104                        TransportSendStrategy::QueueType* queue,
00105                        ACE_Message_Block* chain);
00106 
00107 private:
00108   void check_capacity();
00109   RemoveResult retain_buffer(const RepoId& pub_id, BufferType& buffer);
00110   void insert_buffer(BufferType& buffer,
00111                      TransportSendStrategy::QueueType* queue,
00112                      ACE_Message_Block* chain);
00113 
00114   size_t n_chunks_;
00115 
00116   TransportRetainedElementAllocator retained_allocator_;
00117   MessageBlockAllocator retained_mb_allocator_;
00118   DataBlockAllocator retained_db_allocator_;
00119   TransportReplacedElementAllocator replaced_allocator_;
00120   MessageBlockAllocator replaced_mb_allocator_;
00121   DataBlockAllocator replaced_db_allocator_;
00122 
00123   BufferMap buffers_;
00124 
00125   typedef OPENDDS_MAP(SequenceNumber, BufferMap) FragmentMap;
00126   FragmentMap fragments_;
00127 };
00128 
00129 } // namespace DCPS
00130 } // namespace OpenDDS
00131 
00132 #ifdef __ACE_INLINE__
00133 # include "TransportSendBuffer.inl"
00134 #endif  /* __ACE_INLINE__ */
00135 
00136 #endif  /* DCPS_TRANSPORTSENDBUFFER_H */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7