TransportSendBuffer.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_TRANSPORTSENDBUFFER_H
00009 #define DCPS_TRANSPORTSENDBUFFER_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 
00013 #include "TransportRetainedElement.h"
00014 #include "TransportReplacedElement.h"
00015 #include "TransportSendStrategy.h"
00016 
00017 #include "dds/DCPS/Definitions.h"
00018 
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "ace/Synch_Traits.h"
00021 #include <utility>
00022 
00023 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00024 class ACE_Message_Block;
00025 ACE_END_VERSIONED_NAMESPACE_DECL
00026 
00027 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00028 
00029 namespace OpenDDS {
00030 namespace DCPS {
00031 
00032 class DisjointSequence;
00033 
00034 /// Abstract base class that forms the interface for TransportSendStrategy
00035 /// to store data for potential retransmission.  Derived classes actually
00036 /// store the data and can utilize TransportSendBuffer's friendship in
00037 /// TransportSendStrategy to retransmit (see method "resend_one").
      ///
      /// The constructor and destructor are protected and the copy operations are
      /// private and unimplemented, so instances exist only as the base part of
      /// a derived class and cannot be copied.
00038 class OpenDDS_Dcps_Export TransportSendBuffer {
00039 public:
        /// Accessor declared here and defined outside this header; presumably
        /// reports the capacity_ value fixed at construction -- TODO confirm.
00040   size_t capacity() const;
        /// Associates this buffer with a send strategy.  Defined outside this
        /// header; presumably stores @a strategy in strategy_ -- TODO confirm.
00041   void bind(TransportSendStrategy* strategy);
00042 
        /// Pure virtual; derived classes decide what "retaining" means for the
        /// given @a pub_id.  NOTE(review): the retention semantics are not
        /// visible from this header.
00043   virtual void retain_all(RepoId pub_id) = 0;
        /// Pure virtual; derived classes record the @a queue / @a chain pair
        /// associated with @a sequence so it can be retransmitted later.
        /// NOTE(review): ownership of the two pointers is not visible here.
00044   virtual void insert(SequenceNumber sequence,
00045                       TransportSendStrategy::QueueType* queue,
00046                       ACE_Message_Block* chain) = 0;
00047 
        /// Lock type used by the associated TransportSendStrategy.
00048   typedef TransportSendStrategy::LockType LockType;
        /// Returns a reference to the bound strategy's lock_ member.
        /// strategy_ is dereferenced unconditionally and is initialized to null
        /// by the constructor, so this must not be called until strategy_ has
        /// been set to a valid strategy (presumably via bind()).
00049   LockType& strategy_lock() { return this->strategy_->lock_; }
00050 
00051 protected:
        /// Records the requested @a capacity and starts with no strategy
        /// (strategy_ is null).  Protected: only derived classes construct it.
00052   explicit TransportSendBuffer(size_t capacity)
00053     : strategy_(0), capacity_(capacity) {}
        /// Virtual and protected: code outside the class hierarchy cannot
        /// delete an object through a TransportSendBuffer pointer.
00054   virtual ~TransportSendBuffer();
00055 
        /// Queue type used by the associated TransportSendStrategy.
00056   typedef TransportSendStrategy::QueueType QueueType;
        /// One retained entry: a queue pointer paired with a message block
        /// chain pointer.
00057   typedef std::pair<QueueType*, ACE_Message_Block*> BufferType;
00058 
        /// Retransmits a single retained entry.  Per the class comment, this is
        /// the helper derived classes use to resend through the strategy;
        /// the implementation lives outside this header.
00059   void resend_one(const BufferType& buffer);
00060 
        /// Strategy this buffer is associated with; null after construction.
00061   TransportSendStrategy* strategy_;
        /// Capacity supplied to the constructor; const, so fixed for the
        /// lifetime of the object.
00062   const size_t capacity_;
00063 
00064 private:
        // Copying is disabled: both operations are declared private and are
        // never defined.
00065   TransportSendBuffer(const TransportSendBuffer&); // unimplemented
00066   TransportSendBuffer& operator=(const TransportSendBuffer&); // unimplemented
00067 };
00068 
00069 /// Implementation of TransportSendBuffer that manages data for a single
00070 /// domain of SequenceNumbers -- for a given SingleSendBuffer object, the
00071 /// sequence numbers passed to insert() must be generated from the same place.
      ///
      /// Retained entries are kept in maps keyed by SequenceNumber (see
      /// buffers_, fragments_ and destinations_ below).  The class also derives
      /// from RcObject, which is declared elsewhere (presumably it supplies
      /// reference counting -- TODO confirm).
00072 class OpenDDS_Dcps_Export SingleSendBuffer
00073   : public TransportSendBuffer, public RcObject {
00074 public:
00075 
        /// Constant defined outside this header.  NOTE(review): the name
        /// suggests a capacity value meaning "no limit" -- confirm against the
        /// definition.
00076   static const size_t UNLIMITED;
00077 
        /// Defined outside this header; presumably releases every retained
        /// entry -- TODO confirm.
00078   void release_all();
        /// Map from sequence number to the retained (queue, chain) entry.
00079   typedef OPENDDS_MAP(SequenceNumber, BufferType) BufferMap;
        /// Defined outside this header.  NOTE(review): presumably releases
        /// entries acknowledged up to @a seq; whether @a seq itself is included
        /// is not visible here.
00080   void release_acked(SequenceNumber seq);
        /// Releases the single entry designated by @a buffer_iter, an iterator
        /// into a BufferMap.  Implementation is outside this header.
00081   void release(BufferMap::iterator buffer_iter);
00082 
        /// Accessor defined outside this header; presumably reports n_chunks_
        /// -- TODO confirm.
00083   size_t n_chunks() const;
00084 
        /// @param capacity forwarded meaning is defined by the implementation;
        ///        presumably passed to the TransportSendBuffer base -- TODO confirm.
        /// @param max_samples_per_packet NOTE(review): how this is used (for
        ///        example to size the allocators below) is not visible here.
00085   SingleSendBuffer(size_t capacity, size_t max_samples_per_packet);
00086   ~SingleSendBuffer();
00087 
        /// Requests retransmission of the sequence numbers in @a range.
        /// @param gaps optional output (defaults to null).  NOTE(review):
        ///        presumably receives the sequence numbers that could not be
        ///        resent -- confirm against the implementation.
        /// Unlike resend_i(), no locking precondition is stated for this
        /// overload; presumably it acquires the send strategy lock itself.
00088   bool resend(const SequenceRange& range, DisjointSequence* gaps = 0);
00089 
00090   // caller must already have the send strategy lock
00091   bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0);
        /// Overload that additionally takes a @a destination; @a gaps has no
        /// default here, so pass null explicitly when it is not wanted.
        /// NOTE(review): presumably the same locking precondition as the
        /// overload above applies -- confirm.
00092   bool resend_i(const SequenceRange& range, DisjointSequence* gaps,
00093                 const RepoId& destination);
00094 
        /// Resends selected fragments of the sample identified by @a sequence.
        /// NOTE(review): the "_i" suffix suggests the caller must already hold
        /// the send strategy lock, as for resend_i() -- confirm.
00095   void resend_fragments_i(const SequenceNumber& sequence,
00096                           const DisjointSequence& fragments);
00097 
        // Queries over the retained sequence numbers.  All are defined outside
        // this header; their behavior on an empty buffer is not visible here.
00098   SequenceNumber low() const;
00099   SequenceNumber high() const;
00100   bool empty() const;
00101   bool contains(const SequenceNumber& seq) const;
00102 
        // Implementations of the pure virtual functions declared in
        // TransportSendBuffer (same signatures as the base declarations).
00103   void retain_all(RepoId pub_id);
00104   void insert(SequenceNumber sequence,
00105               TransportSendStrategy::QueueType* queue,
00106               ACE_Message_Block* chain);
        /// Fragment counterpart of insert(): takes a @a fragment number in
        /// addition to the sample's @a sequence.  Presumably recorded in
        /// fragments_ -- TODO confirm.
00107   void insert_fragment(SequenceNumber sequence,
00108                        SequenceNumber fragment,
00109                        TransportSendStrategy::QueueType* queue,
00110                        ACE_Message_Block* chain);
00111 
00112 private:
        // Internal helpers; implementations are outside this header.
        // NOTE(review): check_capacity() presumably enforces capacity_ by
        // releasing entries -- confirm.
00113   void check_capacity();
00114   RemoveResult retain_buffer(const RepoId& pub_id, BufferType& buffer);
00115   void insert_buffer(BufferType& buffer,
00116                      TransportSendStrategy::QueueType* queue,
00117                      ACE_Message_Block* chain);
00118 
        /// Chunk count; presumably the value reported by n_chunks().
00119   size_t n_chunks_;
00120 
        // Two pairs of message-block / data-block allocators, named for
        // "retained" and "replaced" elements (compare the
        // TransportRetainedElement.h and TransportReplacedElement.h includes).
00121   MessageBlockAllocator retained_mb_allocator_;
00122   DataBlockAllocator retained_db_allocator_;
00123   MessageBlockAllocator replaced_mb_allocator_;
00124   DataBlockAllocator replaced_db_allocator_;
00125 
        /// Retained entries keyed by sequence number.
00126   BufferMap buffers_;
00127 
        /// Two-level map: sequence number -> BufferMap.  Presumably the inner
        /// key is the fragment number passed to insert_fragment() -- TODO confirm.
00128   typedef OPENDDS_MAP(SequenceNumber, BufferMap) FragmentMap;
00129   FragmentMap fragments_;
00130 
        /// Maps a sequence number to a RepoId.  NOTE(review): presumably the
        /// destination used by the three-argument resend_i() -- confirm.
00131   typedef OPENDDS_MAP(SequenceNumber, RepoId) DestinationMap;
00132   DestinationMap destinations_;
00133 };
00134 
00135 } // namespace DCPS
00136 } // namespace OpenDDS
00137 
00138 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00139 
00140 #ifdef __ACE_INLINE__
00141 # include "TransportSendBuffer.inl"
00142 #endif  /* __ACE_INLINE__ */
00143 
00144 #endif  /* DCPS_TRANSPORTSENDBUFFER_H */
 All | Classes | Namespaces | Files | Functions | Variables | Typedefs | Enumerations | Enumerator | Friends | Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1