#include <TransportSendBuffer.h>
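Inheritance diagram for OpenDDS::DCPS::SingleSendBuffer (diagram not reproduced in this text; the class derives from OpenDDS::DCPS::TransportSendBuffer, whose constructor it invokes and whose insert() and retain_all() it implements):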
Definition at line 69 of file TransportSendBuffer.h.
OpenDDS::DCPS::SingleSendBuffer::SingleSendBuffer(size_t capacity, size_t max_samples_per_packet)
Definition at line 42 of file TransportSendBuffer.cpp.
00044 : TransportSendBuffer(capacity), 00045 n_chunks_(capacity * max_samples_per_packet), 00046 retained_allocator_(this->n_chunks_), 00047 retained_mb_allocator_(this->n_chunks_ * 2), 00048 retained_db_allocator_(this->n_chunks_ * 2), 00049 replaced_allocator_(this->n_chunks_), 00050 replaced_mb_allocator_(this->n_chunks_ * 2), 00051 replaced_db_allocator_(this->n_chunks_ * 2) 00052 { 00053 }
OpenDDS::DCPS::SingleSendBuffer::~SingleSendBuffer()
Definition at line 55 of file TransportSendBuffer.cpp.
References release_all().
00056 { 00057 release_all(); 00058 }
void OpenDDS::DCPS::SingleSendBuffer::check_capacity() [private]
Definition at line 265 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::TransportSendBuffer::capacity_, release(), OpenDDS::DCPS::Transport_debug_level, and UNLIMITED.
Referenced by insert(), and insert_fragment().
00266 { 00267 if (this->capacity_ == SingleSendBuffer::UNLIMITED) { 00268 return; 00269 } 00270 // Age off oldest sample if we are at capacity: 00271 if (this->buffers_.size() == this->capacity_) { 00272 BufferMap::iterator it(this->buffers_.begin()); 00273 if (it == this->buffers_.end()) return; 00274 00275 if (Transport_debug_level > 5) { 00276 ACE_DEBUG((LM_DEBUG, 00277 ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ") 00278 ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"), 00279 it->first.getValue(), 00280 it->second.first, it->second.second 00281 )); 00282 } 00283 00284 release(it); 00285 } 00286 }
ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::contains(const SequenceNumber& seq) const
Definition at line 53 of file TransportSendBuffer.inl.
References buffers_.
00054 { 00055 return this->buffers_.count(seq); 00056 }
ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::empty() const
Definition at line 47 of file TransportSendBuffer.inl.
References buffers_.
Referenced by OpenDDS::DCPS::ReliableSession::nak_received().
00048 { 00049 return this->buffers_.empty(); 00050 }
ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::high() const
Definition at line 40 of file TransportSendBuffer.inl.
References buffers_.
Referenced by resend_i().
00041 { 00042 if (this->buffers_.empty()) throw std::exception(); 00043 return this->buffers_.rbegin()->first; 00044 }
void OpenDDS::DCPS::SingleSendBuffer::insert(SequenceNumber sequence, TransportSendStrategy::QueueType* queue, ACE_Message_Block* chain) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 196 of file TransportSendBuffer.cpp.
References buffers_, check_capacity(), OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), and OpenDDS::DCPS::Transport_debug_level.
00199 { 00200 check_capacity(); 00201 00202 BufferType& buffer = this->buffers_[sequence]; 00203 insert_buffer(buffer, queue, chain); 00204 00205 if (Transport_debug_level > 5) { 00206 ACE_DEBUG((LM_DEBUG, 00207 ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ") 00208 ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"), 00209 sequence.getValue(), 00210 buffer.first, buffer.second 00211 )); 00212 } 00213 }
void OpenDDS::DCPS::SingleSendBuffer::insert_buffer(BufferType& buffer, TransportSendStrategy::QueueType* queue, ACE_Message_Block* chain) [private]
Definition at line 216 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::TransportQueueElement::clone_mb(), and OpenDDS::DCPS::BasicQueue< T >::size().
Referenced by insert(), and insert_fragment().
00219 { 00220 // Copy sample's TransportQueueElements: 00221 TransportSendStrategy::QueueType*& elems = buffer.first; 00222 ACE_NEW(elems, TransportSendStrategy::QueueType(queue->size(), 1)); 00223 00224 CopyChainVisitor visitor(*elems, 00225 &this->retained_allocator_, 00226 &this->retained_mb_allocator_, 00227 &this->retained_db_allocator_); 00228 queue->accept_visitor(visitor); 00229 00230 // Copy sample's message/data block descriptors: 00231 ACE_Message_Block*& data = buffer.second; 00232 data = TransportQueueElement::clone_mb(chain, 00233 &this->retained_mb_allocator_, 00234 &this->retained_db_allocator_); 00235 }
void OpenDDS::DCPS::SingleSendBuffer::insert_fragment(SequenceNumber sequence, SequenceNumber fragment, TransportSendStrategy::QueueType* queue, ACE_Message_Block* chain)
Definition at line 238 of file TransportSendBuffer.cpp.
References buffers_, check_capacity(), fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), and OpenDDS::DCPS::Transport_debug_level.
00242 { 00243 check_capacity(); 00244 00245 // Insert into buffers_ so that the overall capacity is maintained 00246 // The entry in buffers_ with two null pointers indicates that the 00247 // actual data is stored in fragments_[sequence]. 00248 buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0), 00249 static_cast<ACE_Message_Block*>(0)); 00250 00251 BufferType& buffer = fragments_[sequence][fragment]; 00252 insert_buffer(buffer, queue, chain); 00253 00254 if (Transport_debug_level > 5) { 00255 ACE_DEBUG((LM_DEBUG, 00256 ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ") 00257 ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"), 00258 sequence.getValue(), fragment.getValue(), 00259 buffer.first, buffer.second 00260 )); 00261 } 00262 }
ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::low() const
Definition at line 33 of file TransportSendBuffer.inl.
References buffers_.
Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), and resend_i().
00034 { 00035 if (this->buffers_.empty()) throw std::exception(); 00036 return this->buffers_.begin()->first; 00037 }
ACE_INLINE size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks() const
Definition at line 27 of file TransportSendBuffer.inl.
References n_chunks_.
00028 { 00029 return this->n_chunks_; 00030 }
typedef OPENDDS_MAP(SequenceNumber, BufferMap) OpenDDS::DCPS::SingleSendBuffer::FragmentMap [private]
typedef OPENDDS_MAP(SequenceNumber, BufferType) OpenDDS::DCPS::SingleSendBuffer::BufferMap
void OpenDDS::DCPS::SingleSendBuffer::release(BufferMap::iterator buffer_iter)
Definition at line 91 of file TransportSendBuffer.cpp.
References buffers_, fragments_, and OpenDDS::DCPS::Transport_debug_level.
Referenced by check_capacity(), release_acked(), release_all(), and retain_all().
00092 { 00093 BufferType& buffer(buffer_iter->second); 00094 if (Transport_debug_level > 5) { 00095 ACE_DEBUG((LM_DEBUG, 00096 ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ") 00097 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"), 00098 buffer.first, buffer.second 00099 )); 00100 } 00101 00102 if (buffer.first && buffer.second) { 00103 // not a fragment 00104 RemoveAllVisitor visitor; 00105 buffer.first->accept_remove_visitor(visitor); 00106 delete buffer.first; 00107 00108 buffer.second->release(); 00109 buffer.second = 0; 00110 00111 } else { 00112 // data actually stored in fragments_ 00113 const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first); 00114 if (fm_it != fragments_.end()) { 00115 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00116 bm_it != fm_it->second.end(); ++bm_it) { 00117 RemoveAllVisitor visitor; 00118 bm_it->second.first->accept_remove_visitor(visitor); 00119 delete bm_it->second.first; 00120 00121 bm_it->second.second->release(); 00122 bm_it->second.second = 0; 00123 } 00124 fragments_.erase(fm_it); 00125 } 00126 } 00127 00128 buffers_.erase(buffer_iter); 00129 }
void OpenDDS::DCPS::SingleSendBuffer::release_acked(SequenceNumber seq)
Definition at line 70 of file TransportSendBuffer.cpp.
References buffers_, release(), and OpenDDS::DCPS::Transport_debug_level.
00070 { 00071 BufferMap::iterator buffer_iter = buffers_.begin(); 00072 BufferType& buffer(buffer_iter->second); 00073 00074 if (Transport_debug_level > 5) { 00075 ACE_DEBUG((LM_DEBUG, 00076 ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ") 00077 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"), 00078 buffer.first, buffer.second 00079 )); 00080 } 00081 while (buffer_iter != buffers_.end()) { 00082 if (buffer_iter->first == seq) { 00083 release(buffer_iter); 00084 return; 00085 } 00086 ++buffer_iter; 00087 } 00088 }
void OpenDDS::DCPS::SingleSendBuffer::release_all()
Definition at line 61 of file TransportSendBuffer.cpp.
References buffers_, and release().
Referenced by ~SingleSendBuffer().
00062 { 00063 for (BufferMap::iterator it(this->buffers_.begin()); 00064 it != this->buffers_.end();) { 00065 release(it++); 00066 } 00067 }
bool OpenDDS::DCPS::SingleSendBuffer::resend(const SequenceRange& range, DisjointSequence* gaps = 0)
Definition at line 289 of file TransportSendBuffer.cpp.
References resend_i(), and OpenDDS::DCPS::TransportSendBuffer::strategy_lock().
Referenced by OpenDDS::DCPS::ReliableSession::nak_received().
00290 { 00291 ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false); 00292 return resend_i(range, gaps); 00293 }
void OpenDDS::DCPS::SingleSendBuffer::resend_fragments_i(const SequenceNumber& sequence, const DisjointSequence& fragments)
Definition at line 337 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::DisjointSequence::empty(), fragments_, OpenDDS::DCPS::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::SequenceNumber::ZERO().
00339 { 00340 if (fragments_.empty() || requested_frags.empty()) { 00341 return; 00342 } 00343 const BufferMap& buffers = fragments_[seq]; 00344 const OPENDDS_VECTOR(SequenceRange) psr = 00345 requested_frags.present_sequence_ranges(); 00346 SequenceNumber sent = SequenceNumber::ZERO(); 00347 for (size_t i = 0; i < psr.size(); ++i) { 00348 BufferMap::const_iterator it = buffers.lower_bound(psr[i].first); 00349 if (it == buffers.end()) { 00350 return; 00351 } 00352 BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second); 00353 while (true) { 00354 if (sent < it->first) { 00355 resend_one(it->second); 00356 sent = it->first; 00357 } 00358 if (it == it2) { 00359 break; 00360 } 00361 ++it; 00362 } 00363 } 00364 }
bool OpenDDS::DCPS::SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps = 0)
Definition at line 296 of file TransportSendBuffer.cpp.
References fragments_, high(), low(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by resend(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().
00297 { 00298 //Special case, nak to make sure it has all history 00299 const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first; 00300 00301 for (SequenceNumber sequence(range.first); 00302 sequence <= range.second; ++sequence) { 00303 // Re-send requested sample if still buffered; missing samples 00304 // will be scored against the given DisjointSequence: 00305 BufferMap::iterator it(this->buffers_.find(sequence)); 00306 if (it == this->buffers_.end()) { 00307 if (gaps) { 00308 gaps->insert(sequence); 00309 } 00310 } else { 00311 if (Transport_debug_level > 5) { 00312 ACE_DEBUG((LM_DEBUG, 00313 ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ") 00314 ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"), 00315 sequence.getValue(), 00316 it->second.first, 00317 it->second.second)); 00318 } 00319 if (it->second.first && it->second.second) { 00320 resend_one(it->second); 00321 } else { 00322 const FragmentMap::iterator fm_it = fragments_.find(it->first); 00323 if (fm_it != fragments_.end()) { 00324 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00325 bm_it != fm_it->second.end(); ++bm_it) { 00326 resend_one(bm_it->second); 00327 } 00328 } 00329 } 00330 } 00331 } 00332 // Have we resent all requested data? 00333 return lowForAllResent >= low() && range.second <= high(); 00334 }
void OpenDDS::DCPS::SingleSendBuffer::retain_all(RepoId pub_id) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 132 of file TransportSendBuffer.cpp.
References buffers_, fragments_, OPENDDS_STRING, release(), OpenDDS::DCPS::REMOVE_ERROR, retain_buffer(), and OpenDDS::DCPS::Transport_debug_level.
00133 { 00134 if (Transport_debug_level > 5) { 00135 GuidConverter converter(pub_id); 00136 ACE_DEBUG((LM_DEBUG, 00137 ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ") 00138 ACE_TEXT("copying out blocks for publication: %C\n"), 00139 OPENDDS_STRING(converter).c_str() 00140 )); 00141 } 00142 for (BufferMap::iterator it(this->buffers_.begin()); 00143 it != this->buffers_.end();) { 00144 if (it->second.first && it->second.second) { 00145 if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) { 00146 GuidConverter converter(pub_id); 00147 ACE_ERROR((LM_WARNING, 00148 ACE_TEXT("(%P|%t) WARNING: ") 00149 ACE_TEXT("SingleSendBuffer::retain_all: ") 00150 ACE_TEXT("failed to retain data from publication: %C!\n"), 00151 OPENDDS_STRING(converter).c_str())); 00152 release(it++); 00153 } else { 00154 ++it; 00155 } 00156 00157 } else { 00158 const FragmentMap::iterator fm_it = fragments_.find(it->first); 00159 if (fm_it != fragments_.end()) { 00160 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00161 bm_it != fm_it->second.end();) { 00162 if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) { 00163 GuidConverter converter(pub_id); 00164 ACE_ERROR((LM_WARNING, 00165 ACE_TEXT("(%P|%t) WARNING: ") 00166 ACE_TEXT("SingleSendBuffer::retain_all: failed to ") 00167 ACE_TEXT("retain fragment data from publication: %C!\n"), 00168 OPENDDS_STRING(converter).c_str())); 00169 release(bm_it++); 00170 } else { 00171 ++bm_it; 00172 } 00173 } 00174 } 00175 ++it; 00176 } 00177 } 00178 }
RemoveResult OpenDDS::DCPS::SingleSendBuffer::retain_buffer(const RepoId& pub_id, BufferType& buffer) [private]
Definition at line 181 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::PacketRemoveVisitor::status().
Referenced by retain_all().
00182 { 00183 TransportQueueElement::MatchOnPubId match(pub_id); 00184 PacketRemoveVisitor visitor(match, 00185 buffer.second, 00186 buffer.second, 00187 this->replaced_allocator_, 00188 this->replaced_mb_allocator_, 00189 this->replaced_db_allocator_); 00190 00191 buffer.first->accept_replace_visitor(visitor); 00192 return visitor.status(); 00193 }
BufferMap OpenDDS::DCPS::SingleSendBuffer::buffers_ [private] |
Definition at line 123 of file TransportSendBuffer.h.
Referenced by contains(), empty(), high(), insert(), insert_fragment(), low(), release(), release_acked(), release_all(), and retain_all().
FragmentMap OpenDDS::DCPS::SingleSendBuffer::fragments_ [private] |
Definition at line 126 of file TransportSendBuffer.h.
Referenced by insert_fragment(), release(), resend_fragments_i(), resend_i(), and retain_all().
size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks_ [private] |
Definition at line 119 of file TransportSendBuffer.h.
Definition at line 121 of file TransportSendBuffer.h.
Definition at line 120 of file TransportSendBuffer.h.
Definition at line 116 of file TransportSendBuffer.h.
Definition at line 118 of file TransportSendBuffer.h.
Definition at line 117 of file TransportSendBuffer.h.
const size_t OpenDDS::DCPS::SingleSendBuffer::UNLIMITED = 0 [static] |
Definition at line 73 of file TransportSendBuffer.h.
Referenced by check_capacity(), and OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert().