00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "TransportSendBuffer.h"
00011 #include "CopyChainVisitor.h"
00012 #include "PacketRemoveVisitor.h"
00013 #include "RemoveAllVisitor.h"
00014
00015 #include "dds/DCPS/DataSampleHeader.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017
00018 #include "ace/Log_Msg.h"
00019
00020 #include "dds/DCPS/GuidConverter.h"
00021
00022 #ifndef __ACE_INLINE__
00023 # include "TransportSendBuffer.inl"
00024 #endif
00025
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031
00032 TransportSendBuffer::~TransportSendBuffer()
00033 {
00034 }
00035
00036 void
00037 TransportSendBuffer::resend_one(const BufferType& buffer)
00038 {
00039 int bp = 0;
00040 this->strategy_->do_send_packet(buffer.second, bp);
00041 }
00042
00043
00044
00045
00046 const size_t SingleSendBuffer::UNLIMITED = 0;
00047
00048 SingleSendBuffer::SingleSendBuffer(size_t capacity,
00049 size_t max_samples_per_packet)
00050 : TransportSendBuffer(capacity),
00051 n_chunks_(capacity * max_samples_per_packet),
00052 retained_mb_allocator_(this->n_chunks_ * 2),
00053 retained_db_allocator_(this->n_chunks_ * 2),
00054 replaced_mb_allocator_(this->n_chunks_ * 2),
00055 replaced_db_allocator_(this->n_chunks_ * 2)
00056 {
00057 }
00058
00059 SingleSendBuffer::~SingleSendBuffer()
00060 {
00061 release_all();
00062 }
00063
00064 void
00065 SingleSendBuffer::release_all()
00066 {
00067 for (BufferMap::iterator it(this->buffers_.begin());
00068 it != this->buffers_.end();) {
00069 release(it++);
00070 }
00071 }
00072
00073 void
00074 SingleSendBuffer::release_acked(SequenceNumber seq) {
00075 BufferMap::iterator buffer_iter = buffers_.begin();
00076 BufferType& buffer(buffer_iter->second);
00077
00078 if (Transport_debug_level > 5) {
00079 ACE_DEBUG((LM_DEBUG,
00080 ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00081 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00082 buffer.first, buffer.second
00083 ));
00084 }
00085 while (buffer_iter != buffers_.end()) {
00086 if (buffer_iter->first == seq) {
00087 release(buffer_iter);
00088 return;
00089 }
00090 ++buffer_iter;
00091 }
00092 }
00093
00094 void
00095 SingleSendBuffer::release(BufferMap::iterator buffer_iter)
00096 {
00097 BufferType& buffer(buffer_iter->second);
00098 if (Transport_debug_level > 5) {
00099 ACE_DEBUG((LM_DEBUG,
00100 ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00101 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00102 buffer.first, buffer.second
00103 ));
00104 }
00105
00106 if (buffer.first && buffer.second) {
00107
00108 RemoveAllVisitor visitor;
00109 buffer.first->accept_remove_visitor(visitor);
00110 delete buffer.first;
00111
00112 buffer.second->release();
00113 buffer.second = 0;
00114
00115 } else {
00116
00117 const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00118 if (fm_it != fragments_.end()) {
00119 for (BufferMap::iterator bm_it = fm_it->second.begin();
00120 bm_it != fm_it->second.end(); ++bm_it) {
00121 RemoveAllVisitor visitor;
00122 bm_it->second.first->accept_remove_visitor(visitor);
00123 delete bm_it->second.first;
00124
00125 bm_it->second.second->release();
00126 bm_it->second.second = 0;
00127 }
00128 fragments_.erase(fm_it);
00129 }
00130 }
00131
00132 buffers_.erase(buffer_iter);
00133 }
00134
00135 void
00136 SingleSendBuffer::retain_all(RepoId pub_id)
00137 {
00138 if (Transport_debug_level > 5) {
00139 GuidConverter converter(pub_id);
00140 ACE_DEBUG((LM_DEBUG,
00141 ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00142 ACE_TEXT("copying out blocks for publication: %C\n"),
00143 OPENDDS_STRING(converter).c_str()
00144 ));
00145 }
00146 for (BufferMap::iterator it(this->buffers_.begin());
00147 it != this->buffers_.end();) {
00148 if (it->second.first && it->second.second) {
00149 if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00150 GuidConverter converter(pub_id);
00151 ACE_ERROR((LM_WARNING,
00152 ACE_TEXT("(%P|%t) WARNING: ")
00153 ACE_TEXT("SingleSendBuffer::retain_all: ")
00154 ACE_TEXT("failed to retain data from publication: %C!\n"),
00155 OPENDDS_STRING(converter).c_str()));
00156 release(it++);
00157 } else {
00158 ++it;
00159 }
00160
00161 } else {
00162 const FragmentMap::iterator fm_it = fragments_.find(it->first);
00163 if (fm_it != fragments_.end()) {
00164 for (BufferMap::iterator bm_it = fm_it->second.begin();
00165 bm_it != fm_it->second.end();) {
00166 if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00167 GuidConverter converter(pub_id);
00168 ACE_ERROR((LM_WARNING,
00169 ACE_TEXT("(%P|%t) WARNING: ")
00170 ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00171 ACE_TEXT("retain fragment data from publication: %C!\n"),
00172 OPENDDS_STRING(converter).c_str()));
00173 release(bm_it++);
00174 } else {
00175 ++bm_it;
00176 }
00177 }
00178 }
00179 ++it;
00180 }
00181 }
00182 }
00183
00184 RemoveResult
00185 SingleSendBuffer::retain_buffer(const RepoId& pub_id, BufferType& buffer)
00186 {
00187 TransportQueueElement::MatchOnPubId match(pub_id);
00188 PacketRemoveVisitor visitor(match,
00189 buffer.second,
00190 buffer.second,
00191 this->replaced_mb_allocator_,
00192 this->replaced_db_allocator_);
00193
00194 buffer.first->accept_replace_visitor(visitor);
00195 return visitor.status();
00196 }
00197
00198 void
00199 SingleSendBuffer::insert(SequenceNumber sequence,
00200 TransportSendStrategy::QueueType* queue,
00201 ACE_Message_Block* chain)
00202 {
00203 check_capacity();
00204
00205 BufferType& buffer = this->buffers_[sequence];
00206 insert_buffer(buffer, queue, chain);
00207
00208 if (Transport_debug_level > 5) {
00209 ACE_DEBUG((LM_DEBUG,
00210 ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00211 ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00212 sequence.getValue(),
00213 buffer.first, buffer.second
00214 ));
00215 }
00216
00217 if (queue && queue->size() == 1) {
00218 const TransportQueueElement* elt = queue->peek();
00219 const RepoId subId = elt->subscription_id();
00220 const ACE_Message_Block* msg = elt->msg();
00221 if (msg && subId != GUID_UNKNOWN &&
00222 !DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, msg)) {
00223 destinations_[sequence] = subId;
00224 }
00225 }
00226 }
00227
00228 void
00229 SingleSendBuffer::insert_buffer(BufferType& buffer,
00230 TransportSendStrategy::QueueType* queue,
00231 ACE_Message_Block* chain)
00232 {
00233
00234 TransportSendStrategy::QueueType*& elems = buffer.first;
00235 ACE_NEW(elems, TransportSendStrategy::QueueType());
00236
00237 CopyChainVisitor visitor(*elems,
00238 &this->retained_mb_allocator_,
00239 &this->retained_db_allocator_);
00240 queue->accept_visitor(visitor);
00241
00242
00243 ACE_Message_Block*& data = buffer.second;
00244 data = TransportQueueElement::clone_mb(chain,
00245 &this->retained_mb_allocator_,
00246 &this->retained_db_allocator_);
00247 }
00248
00249 void
00250 SingleSendBuffer::insert_fragment(SequenceNumber sequence,
00251 SequenceNumber fragment,
00252 TransportSendStrategy::QueueType* queue,
00253 ACE_Message_Block* chain)
00254 {
00255 check_capacity();
00256
00257
00258
00259
00260 buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00261 static_cast<ACE_Message_Block*>(0));
00262
00263 BufferType& buffer = fragments_[sequence][fragment];
00264 insert_buffer(buffer, queue, chain);
00265
00266 if (Transport_debug_level > 5) {
00267 ACE_DEBUG((LM_DEBUG,
00268 ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00269 ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00270 sequence.getValue(), fragment.getValue(),
00271 buffer.first, buffer.second
00272 ));
00273 }
00274 }
00275
00276 void
00277 SingleSendBuffer::check_capacity()
00278 {
00279 if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00280 return;
00281 }
00282
00283 if (this->buffers_.size() == this->capacity_) {
00284 BufferMap::iterator it(this->buffers_.begin());
00285 if (it == this->buffers_.end()) return;
00286
00287 if (Transport_debug_level > 5) {
00288 ACE_DEBUG((LM_DEBUG,
00289 ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00290 ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00291 it->first.getValue(),
00292 it->second.first, it->second.second
00293 ));
00294 }
00295
00296 destinations_.erase(it->first);
00297 release(it);
00298 }
00299 }
00300
00301 bool
00302 SingleSendBuffer::resend(const SequenceRange& range, DisjointSequence* gaps)
00303 {
00304 ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00305 return resend_i(range, gaps);
00306 }
00307
00308 bool
00309 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps)
00310 {
00311 return resend_i(range, gaps, GUID_UNKNOWN);
00312 }
00313
00314 bool
00315 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps,
00316 const RepoId& destination)
00317 {
00318
00319 const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00320 const bool has_dest = destination != GUID_UNKNOWN;
00321
00322 for (SequenceNumber sequence(range.first);
00323 sequence <= range.second; ++sequence) {
00324
00325
00326 BufferMap::iterator it(buffers_.find(sequence));
00327 DestinationMap::iterator dest_data;
00328 if (has_dest) {
00329 dest_data = destinations_.find(sequence);
00330 }
00331 if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() ||
00332 dest_data->second != destination))) {
00333 if (gaps) {
00334 gaps->insert(sequence);
00335 }
00336 } else {
00337 if (Transport_debug_level > 5) {
00338 ACE_DEBUG((LM_DEBUG,
00339 ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00340 ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00341 sequence.getValue(),
00342 it->second.first,
00343 it->second.second));
00344 }
00345 if (it->second.first && it->second.second) {
00346 resend_one(it->second);
00347 } else {
00348 const FragmentMap::iterator fm_it = fragments_.find(it->first);
00349 if (fm_it != fragments_.end()) {
00350 for (BufferMap::iterator bm_it = fm_it->second.begin();
00351 bm_it != fm_it->second.end(); ++bm_it) {
00352 resend_one(bm_it->second);
00353 }
00354 }
00355 }
00356 }
00357 }
00358
00359 return lowForAllResent >= low() && range.second <= high();
00360 }
00361
00362 void
00363 SingleSendBuffer::resend_fragments_i(const SequenceNumber& seq,
00364 const DisjointSequence& requested_frags)
00365 {
00366 if (fragments_.empty() || requested_frags.empty()) {
00367 return;
00368 }
00369 const BufferMap& buffers = fragments_[seq];
00370 const OPENDDS_VECTOR(SequenceRange) psr =
00371 requested_frags.present_sequence_ranges();
00372 SequenceNumber sent = SequenceNumber::ZERO();
00373 for (size_t i = 0; i < psr.size(); ++i) {
00374 BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00375 if (it == buffers.end()) {
00376 return;
00377 }
00378 BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00379 while (true) {
00380 if (sent < it->first) {
00381 resend_one(it->second);
00382 sent = it->first;
00383 }
00384 if (it == it2) {
00385 break;
00386 }
00387 ++it;
00388 }
00389 }
00390 }
00391
00392 }
00393 }
00394
00395 OPENDDS_END_VERSIONED_NAMESPACE_DECL