22 #ifndef __ACE_INLINE__ 54 size_t max_samples_per_packet)
56 n_chunks_(capacity * max_samples_per_packet),
57 retained_mb_allocator_(n_chunks_ * 2),
58 retained_db_allocator_(n_chunks_ * 2),
59 replaced_mb_allocator_(n_chunks_ * 2),
60 replaced_db_allocator_(n_chunks_ * 2)
73 for (BufferMap::iterator it =
buffers_.begin();
82 BufferMap::iterator buffer_iter =
buffers_.find(seq);
92 BufferMap::iterator buffer_iter =
buffers_.find(seq);
105 ACE_TEXT(
"(%P|%t) SingleSendBuffer::release() - ")
106 ACE_TEXT(
"releasing buffer at: (0x%@,0x%@)\n"),
107 buffer.first, buffer.second
111 if (buffer.first && buffer.second) {
114 buffer.first->accept_remove_visitor(visitor);
122 const FragmentMap::iterator fm_it =
fragments_.find(buffer_iter->first);
124 for (BufferMap::iterator bm_it = fm_it->second.begin();
125 bm_it != fm_it->second.end(); ++bm_it) {
127 bm_it->second.first->accept_remove_visitor(visitor);
128 delete bm_it->second.first;
131 bm_it->second.second = 0;
147 ACE_TEXT(
"(%P|%t) SingleSendBuffer::release() - ")
148 ACE_TEXT(
"releasing buffer at: (0x%@,0x%@)\n"),
149 buffer.first, buffer.second
153 if (buffer.first && buffer.second) {
155 removed.push_back(buffer);
158 const FragmentMap::iterator fm_it =
fragments_.find(buffer_iter->first);
160 for (BufferMap::iterator bm_it = fm_it->second.begin();
161 bm_it != fm_it->second.end(); ++bm_it) {
162 removed.push_back(bm_it->second);
178 ACE_TEXT(
"(%P|%t) SingleSendBuffer::retain_all() - ")
179 ACE_TEXT(
"copying out blocks for publication: %C\n"),
184 for (BufferMap::iterator it(
buffers_.begin());
186 if (it->second.first && it->second.second) {
191 ACE_TEXT(
"SingleSendBuffer::retain_all: ")
192 ACE_TEXT(
"failed to retain data from publication: %C!\n"),
200 const FragmentMap::iterator fm_it =
fragments_.find(it->first);
202 for (BufferMap::iterator bm_it = fm_it->second.begin();
203 bm_it != fm_it->second.end();) {
208 ACE_TEXT(
"SingleSendBuffer::retain_all: failed to ")
209 ACE_TEXT(
"retain fragment data from publication: %C!\n"),
232 buffer.first->accept_replace_visitor(visitor);
262 ACE_TEXT(
"(%P|%t) SingleSendBuffer::insert() - ")
263 ACE_TEXT(
"saved PDU: %q as buffer(0x%@,0x%@)\n"),
265 buffer.first, buffer.second
269 if (queue && queue->
size() == 1) {
279 for (
size_t i = 0; i < removed.size(); ++i) {
281 removed[i].first->accept_remove_visitor(visitor);
282 delete removed[i].first;
308 bool is_last_fragment,
322 buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
323 static_cast<ACE_Message_Block*>(0));
326 if (is_last_fragment) {
333 ACE_TEXT(
"(%P|%t) SingleSendBuffer::insert_fragment() - ")
334 ACE_TEXT(
"saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
336 buffer.first, buffer.second
340 for (
size_t i = 0; i < removed.size(); ++i) {
342 removed[i].first->accept_remove_visitor(visitor);
343 delete removed[i].first;
351 if (
capacity_ == SingleSendBuffer::UNLIMITED) {
356 BufferMap::iterator it(
buffers_.begin());
361 ACE_TEXT(
"(%P|%t) SingleSendBuffer::check_capacity() - ")
362 ACE_TEXT(
"aging off PDU: %q as buffer(0x%@,0x%@)\n"),
363 it->first.getValue(),
364 it->second.first, it->second.second
394 const GUID_t& destination)
397 if (
buffers_.empty())
throw std::exception();
402 sequence <= range.second; ++sequence) {
405 BufferMap::iterator it(
buffers_.find(sequence));
406 DestinationMap::iterator dest_data;
411 dest_data->second != destination))) {
418 ACE_TEXT(
"(%P|%t) SingleSendBuffer::resend() - ")
419 ACE_TEXT(
"resending PDU: %q, (0x%@,0x%@)\n"),
424 if (it->second.first && it->second.second) {
427 const FragmentMap::iterator fm_it =
fragments_.find(it->first);
429 for (BufferMap::iterator bm_it = fm_it->second.begin();
430 bm_it != fm_it->second.end(); ++bm_it) {
438 return lowForAllResent >=
buffers_.begin()->first && range.second <=
buffers_.rbegin()->first;
444 size_t& cumulative_send_count)
449 const FragmentMap::const_iterator fm_it =
fragments_.find(seq);
453 const BufferMap& buffers = fm_it->second;
456 BufferMap::const_iterator it = buffers.lower_bound(psr.front().first);
457 BufferMap::const_iterator end = buffers.lower_bound(psr.back().second);
458 if (end != buffers.end()) {
466 while (i < psr.size() && it != end) {
467 if (psr[i].second < frag_min) {
473 if (it->first >= psr[i].first) {
475 ++cumulative_send_count;
477 frag_min = it->first + 1;
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void insert_buffer(BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
void retain_all(const GUID_t &pub_id)
SequenceNumberSet pre_seq_
#define ACE_NEW(POINTER, CONSTRUCTOR)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
RemoveResult retain_buffer(const GUID_t &pub_id, BufferType &buffer)
bool has_frags(const SequenceNumber &seq) const
ssize_t do_send_packet(const ACE_Message_Block *packet, int &bp)
Form an IOV and call the send_bytes() template method.
void release_acked(SequenceNumber seq)
TransportSendStrategy::LockType LockType
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
const char * c_str() const
void check_capacity_i(BufferVec &removed)
void resend_fragments_i(SequenceNumber sequence, const DisjointSequence &fragments, size_t &cumulative_send_count)
static const size_t UNLIMITED
std::pair< QueueType *, ACE_Message_Block * > BufferType
bool resend_i(const SequenceRange &range, DisjointSequence *gaps=0)
void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
virtual void retain_all(const GUID_t &pub_id)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
LockType & strategy_lock()
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
TransportSendStrategy * strategy_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual ACE_Message_Block * release(void)
virtual ACE_Message_Block * duplicate(void) const
size_t size() const
Accessor for the current number of elements in the queue.
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
void insert_fragment(SequenceNumber sequence, SequenceNumber fragment, bool is_last_fragment, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
MessageBlockAllocator retained_mb_allocator_
std::pair< SequenceNumber, SequenceNumber > SequenceRange
DestinationMap destinations_
RemoveResult status() const
void resend_one(const BufferType &buffer)
Sequence number abstraction. Only allows positive 64-bit values.
virtual GUID_t subscription_id() const
Accessor for the subscription id, if the sample is sent to exactly one subscription.
bool resend(const SequenceRange &range, DisjointSequence *gaps=0)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void accept_visitor(VisitorType &visitor) const
static ACE_Message_Block * clone_mb(const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)
void remove_acked(SequenceNumber seq, BufferVec &removed)
MessageBlockAllocator replaced_mb_allocator_
virtual ~TransportSendBuffer()
DataBlockAllocator retained_db_allocator_
SingleSendBuffer(size_t capacity, size_t max_samples_per_packet)
DataBlockAllocator replaced_db_allocator_
void release_i(BufferMap::iterator buffer_iter)
The Internal API and Implementation of OpenDDS.
SequenceNumber minimum_sn_allowed_
Base wrapper class around a data/control sample to be sent.
void remove_i(BufferMap::iterator buffer_iter, BufferVec &removed)
typedef OPENDDS_VECTOR(BufferType) BufferVec
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)