00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "TransportSendBuffer.h"
00011 #include "CopyChainVisitor.h"
00012 #include "PacketRemoveVisitor.h"
00013 #include "RemoveAllVisitor.h"
00014
00015 #include "dds/DCPS/DisjointSequence.h"
00016
00017 #include "ace/Log_Msg.h"
00018
00019 #include "dds/DCPS/GuidConverter.h"
00020
00021 #ifndef __ACE_INLINE__
00022 # include "TransportSendBuffer.inl"
00023 #endif
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028 TransportSendBuffer::~TransportSendBuffer()
00029 {
00030 }
00031
00032 void
00033 TransportSendBuffer::resend_one(const BufferType& buffer)
00034 {
00035 int bp = 0;
00036 this->strategy_->do_send_packet(buffer.second, bp);
00037 }
00038
00039
00040
00041
00042 SingleSendBuffer::SingleSendBuffer(size_t capacity,
00043 size_t max_samples_per_packet)
00044 : TransportSendBuffer(capacity),
00045 n_chunks_(capacity * max_samples_per_packet),
00046 retained_allocator_(this->n_chunks_),
00047 retained_mb_allocator_(this->n_chunks_ * 2),
00048 retained_db_allocator_(this->n_chunks_ * 2),
00049 replaced_allocator_(this->n_chunks_),
00050 replaced_mb_allocator_(this->n_chunks_ * 2),
00051 replaced_db_allocator_(this->n_chunks_ * 2)
00052 {
00053 }
00054
00055 SingleSendBuffer::~SingleSendBuffer()
00056 {
00057 release_all();
00058 }
00059
00060 void
00061 SingleSendBuffer::release_all()
00062 {
00063 for (BufferMap::iterator it(this->buffers_.begin());
00064 it != this->buffers_.end();) {
00065 release(it++);
00066 }
00067 }
00068
00069 void
00070 SingleSendBuffer::release_acked(SequenceNumber seq) {
00071 BufferMap::iterator buffer_iter = buffers_.begin();
00072 BufferType& buffer(buffer_iter->second);
00073
00074 if (Transport_debug_level > 5) {
00075 ACE_DEBUG((LM_DEBUG,
00076 ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00077 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00078 buffer.first, buffer.second
00079 ));
00080 }
00081 while (buffer_iter != buffers_.end()) {
00082 if (buffer_iter->first == seq) {
00083 release(buffer_iter);
00084 return;
00085 }
00086 ++buffer_iter;
00087 }
00088 }
00089
00090 void
00091 SingleSendBuffer::release(BufferMap::iterator buffer_iter)
00092 {
00093 BufferType& buffer(buffer_iter->second);
00094 if (Transport_debug_level > 5) {
00095 ACE_DEBUG((LM_DEBUG,
00096 ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00097 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00098 buffer.first, buffer.second
00099 ));
00100 }
00101
00102 if (buffer.first && buffer.second) {
00103
00104 RemoveAllVisitor visitor;
00105 buffer.first->accept_remove_visitor(visitor);
00106 delete buffer.first;
00107
00108 buffer.second->release();
00109 buffer.second = 0;
00110
00111 } else {
00112
00113 const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00114 if (fm_it != fragments_.end()) {
00115 for (BufferMap::iterator bm_it = fm_it->second.begin();
00116 bm_it != fm_it->second.end(); ++bm_it) {
00117 RemoveAllVisitor visitor;
00118 bm_it->second.first->accept_remove_visitor(visitor);
00119 delete bm_it->second.first;
00120
00121 bm_it->second.second->release();
00122 bm_it->second.second = 0;
00123 }
00124 fragments_.erase(fm_it);
00125 }
00126 }
00127
00128 buffers_.erase(buffer_iter);
00129 }
00130
00131 void
00132 SingleSendBuffer::retain_all(RepoId pub_id)
00133 {
00134 if (Transport_debug_level > 5) {
00135 GuidConverter converter(pub_id);
00136 ACE_DEBUG((LM_DEBUG,
00137 ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00138 ACE_TEXT("copying out blocks for publication: %C\n"),
00139 OPENDDS_STRING(converter).c_str()
00140 ));
00141 }
00142 for (BufferMap::iterator it(this->buffers_.begin());
00143 it != this->buffers_.end();) {
00144 if (it->second.first && it->second.second) {
00145 if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00146 GuidConverter converter(pub_id);
00147 ACE_ERROR((LM_WARNING,
00148 ACE_TEXT("(%P|%t) WARNING: ")
00149 ACE_TEXT("SingleSendBuffer::retain_all: ")
00150 ACE_TEXT("failed to retain data from publication: %C!\n"),
00151 OPENDDS_STRING(converter).c_str()));
00152 release(it++);
00153 } else {
00154 ++it;
00155 }
00156
00157 } else {
00158 const FragmentMap::iterator fm_it = fragments_.find(it->first);
00159 if (fm_it != fragments_.end()) {
00160 for (BufferMap::iterator bm_it = fm_it->second.begin();
00161 bm_it != fm_it->second.end();) {
00162 if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00163 GuidConverter converter(pub_id);
00164 ACE_ERROR((LM_WARNING,
00165 ACE_TEXT("(%P|%t) WARNING: ")
00166 ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00167 ACE_TEXT("retain fragment data from publication: %C!\n"),
00168 OPENDDS_STRING(converter).c_str()));
00169 release(bm_it++);
00170 } else {
00171 ++bm_it;
00172 }
00173 }
00174 }
00175 ++it;
00176 }
00177 }
00178 }
00179
00180 RemoveResult
00181 SingleSendBuffer::retain_buffer(const RepoId& pub_id, BufferType& buffer)
00182 {
00183 TransportQueueElement::MatchOnPubId match(pub_id);
00184 PacketRemoveVisitor visitor(match,
00185 buffer.second,
00186 buffer.second,
00187 this->replaced_allocator_,
00188 this->replaced_mb_allocator_,
00189 this->replaced_db_allocator_);
00190
00191 buffer.first->accept_replace_visitor(visitor);
00192 return visitor.status();
00193 }
00194
00195 void
00196 SingleSendBuffer::insert(SequenceNumber sequence,
00197 TransportSendStrategy::QueueType* queue,
00198 ACE_Message_Block* chain)
00199 {
00200 check_capacity();
00201
00202 BufferType& buffer = this->buffers_[sequence];
00203 insert_buffer(buffer, queue, chain);
00204
00205 if (Transport_debug_level > 5) {
00206 ACE_DEBUG((LM_DEBUG,
00207 ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00208 ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00209 sequence.getValue(),
00210 buffer.first, buffer.second
00211 ));
00212 }
00213 }
00214
00215 void
00216 SingleSendBuffer::insert_buffer(BufferType& buffer,
00217 TransportSendStrategy::QueueType* queue,
00218 ACE_Message_Block* chain)
00219 {
00220
00221 TransportSendStrategy::QueueType*& elems = buffer.first;
00222 ACE_NEW(elems, TransportSendStrategy::QueueType(queue->size(), 1));
00223
00224 CopyChainVisitor visitor(*elems,
00225 &this->retained_allocator_,
00226 &this->retained_mb_allocator_,
00227 &this->retained_db_allocator_);
00228 queue->accept_visitor(visitor);
00229
00230
00231 ACE_Message_Block*& data = buffer.second;
00232 data = TransportQueueElement::clone_mb(chain,
00233 &this->retained_mb_allocator_,
00234 &this->retained_db_allocator_);
00235 }
00236
00237 void
00238 SingleSendBuffer::insert_fragment(SequenceNumber sequence,
00239 SequenceNumber fragment,
00240 TransportSendStrategy::QueueType* queue,
00241 ACE_Message_Block* chain)
00242 {
00243 check_capacity();
00244
00245
00246
00247
00248 buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00249 static_cast<ACE_Message_Block*>(0));
00250
00251 BufferType& buffer = fragments_[sequence][fragment];
00252 insert_buffer(buffer, queue, chain);
00253
00254 if (Transport_debug_level > 5) {
00255 ACE_DEBUG((LM_DEBUG,
00256 ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00257 ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00258 sequence.getValue(), fragment.getValue(),
00259 buffer.first, buffer.second
00260 ));
00261 }
00262 }
00263
00264 void
00265 SingleSendBuffer::check_capacity()
00266 {
00267 if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00268 return;
00269 }
00270
00271 if (this->buffers_.size() == this->capacity_) {
00272 BufferMap::iterator it(this->buffers_.begin());
00273 if (it == this->buffers_.end()) return;
00274
00275 if (Transport_debug_level > 5) {
00276 ACE_DEBUG((LM_DEBUG,
00277 ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00278 ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00279 it->first.getValue(),
00280 it->second.first, it->second.second
00281 ));
00282 }
00283
00284 release(it);
00285 }
00286 }
00287
00288 bool
00289 SingleSendBuffer::resend(const SequenceRange& range, DisjointSequence* gaps)
00290 {
00291 ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00292 return resend_i(range, gaps);
00293 }
00294
00295 bool
00296 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps)
00297 {
00298
00299 const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00300
00301 for (SequenceNumber sequence(range.first);
00302 sequence <= range.second; ++sequence) {
00303
00304
00305 BufferMap::iterator it(this->buffers_.find(sequence));
00306 if (it == this->buffers_.end()) {
00307 if (gaps) {
00308 gaps->insert(sequence);
00309 }
00310 } else {
00311 if (Transport_debug_level > 5) {
00312 ACE_DEBUG((LM_DEBUG,
00313 ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00314 ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00315 sequence.getValue(),
00316 it->second.first,
00317 it->second.second));
00318 }
00319 if (it->second.first && it->second.second) {
00320 resend_one(it->second);
00321 } else {
00322 const FragmentMap::iterator fm_it = fragments_.find(it->first);
00323 if (fm_it != fragments_.end()) {
00324 for (BufferMap::iterator bm_it = fm_it->second.begin();
00325 bm_it != fm_it->second.end(); ++bm_it) {
00326 resend_one(bm_it->second);
00327 }
00328 }
00329 }
00330 }
00331 }
00332
00333 return lowForAllResent >= low() && range.second <= high();
00334 }
00335
00336 void
00337 SingleSendBuffer::resend_fragments_i(const SequenceNumber& seq,
00338 const DisjointSequence& requested_frags)
00339 {
00340 if (fragments_.empty() || requested_frags.empty()) {
00341 return;
00342 }
00343 const BufferMap& buffers = fragments_[seq];
00344 const OPENDDS_VECTOR(SequenceRange) psr =
00345 requested_frags.present_sequence_ranges();
00346 SequenceNumber sent = SequenceNumber::ZERO();
00347 for (size_t i = 0; i < psr.size(); ++i) {
00348 BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00349 if (it == buffers.end()) {
00350 return;
00351 }
00352 BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00353 while (true) {
00354 if (sent < it->first) {
00355 resend_one(it->second);
00356 sent = it->first;
00357 }
00358 if (it == it2) {
00359 break;
00360 }
00361 ++it;
00362 }
00363 }
00364 }
00365
00366 }
00367 }