#include <TransportSendBuffer.h>
Implementation of TransportSendBuffer that manages data for a single domain of SequenceNumbers -- for a given SingleSendBuffer object, the sequence numbers passed to insert() must be generated from the same place.
Definition at line 72 of file TransportSendBuffer.h.
/// Construct a send buffer.
///
/// Passes @a capacity to the TransportSendBuffer base, stores
/// capacity * max_samples_per_packet in n_chunks_, and constructs each of the
/// four retained/replaced message-block and data-block allocators with
/// n_chunks_ * 2.
///
/// NOTE(review): the allocator initializers read n_chunks_, so this relies on
/// n_chunks_ being declared before the allocators in the class -- confirm
/// against TransportSendBuffer.h.
///
/// Definition at line 48 of file TransportSendBuffer.cpp.
OpenDDS::DCPS::SingleSendBuffer::SingleSendBuffer(size_t capacity,
                                                  size_t max_samples_per_packet)
  : TransportSendBuffer(capacity)
  , n_chunks_(capacity * max_samples_per_packet)
  , retained_mb_allocator_(n_chunks_ * 2)
  , retained_db_allocator_(n_chunks_ * 2)
  , replaced_mb_allocator_(n_chunks_ * 2)
  , replaced_db_allocator_(n_chunks_ * 2)
{
}
/// Destructor: releases every buffer still held by calling release_all().
///
/// Definition at line 59 of file TransportSendBuffer.cpp.
/// References release_all().
OpenDDS::DCPS::SingleSendBuffer::~SingleSendBuffer()
{
  this->release_all();
}
void OpenDDS::DCPS::SingleSendBuffer::check_capacity | ( | ) | [private] |
Definition at line 277 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, OpenDDS::DCPS::TransportSendBuffer::capacity_, destinations_, LM_DEBUG, release(), OpenDDS::DCPS::Transport_debug_level, and UNLIMITED.
Referenced by insert(), and insert_fragment().
00278 { 00279 if (this->capacity_ == SingleSendBuffer::UNLIMITED) { 00280 return; 00281 } 00282 // Age off oldest sample if we are at capacity: 00283 if (this->buffers_.size() == this->capacity_) { 00284 BufferMap::iterator it(this->buffers_.begin()); 00285 if (it == this->buffers_.end()) return; 00286 00287 if (Transport_debug_level > 5) { 00288 ACE_DEBUG((LM_DEBUG, 00289 ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ") 00290 ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"), 00291 it->first.getValue(), 00292 it->second.first, it->second.second 00293 )); 00294 } 00295 00296 destinations_.erase(it->first); 00297 release(it); 00298 } 00299 }
/// @return true when buffers_ has an entry keyed by @a seq.
///
/// Definition at line 55 of file TransportSendBuffer.inl.
/// References buffers_.
ACE_INLINE bool
OpenDDS::DCPS::SingleSendBuffer::contains(const SequenceNumber& seq) const
{
  return this->buffers_.find(seq) != this->buffers_.end();
}
/// @return true when no sequence number is currently buffered.
///
/// Definition at line 49 of file TransportSendBuffer.inl.
/// References buffers_.
/// Referenced by OpenDDS::DCPS::ReliableSession::nak_received().
ACE_INLINE bool
OpenDDS::DCPS::SingleSendBuffer::empty(void) const
{
  const bool nothing_buffered = buffers_.empty();
  return nothing_buffered;
}
/// @return the highest sequence number present in buffers_.
/// @throws std::exception when buffers_ is empty.
///
/// Definition at line 42 of file TransportSendBuffer.inl.
/// References buffers_.
/// Referenced by resend_i().
ACE_INLINE OpenDDS::DCPS::SequenceNumber
OpenDDS::DCPS::SingleSendBuffer::high() const
{
  const BufferMap::const_reverse_iterator newest = this->buffers_.rbegin();
  if (newest == this->buffers_.rend()) {
    throw std::exception();
  }
  return newest->first;
}
void OpenDDS::DCPS::SingleSendBuffer::insert | ( | SequenceNumber | sequence, | |
TransportSendStrategy::QueueType * | queue, | |||
ACE_Message_Block * | chain | |||
) | [virtual] |
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 199 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, check_capacity(), destinations_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, insert_buffer(), LM_DEBUG, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and OpenDDS::DCPS::Transport_debug_level.
00202 { 00203 check_capacity(); 00204 00205 BufferType& buffer = this->buffers_[sequence]; 00206 insert_buffer(buffer, queue, chain); 00207 00208 if (Transport_debug_level > 5) { 00209 ACE_DEBUG((LM_DEBUG, 00210 ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ") 00211 ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"), 00212 sequence.getValue(), 00213 buffer.first, buffer.second 00214 )); 00215 } 00216 00217 if (queue && queue->size() == 1) { 00218 const TransportQueueElement* elt = queue->peek(); 00219 const RepoId subId = elt->subscription_id(); 00220 const ACE_Message_Block* msg = elt->msg(); 00221 if (msg && subId != GUID_UNKNOWN && 00222 !DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, msg)) { 00223 destinations_[sequence] = subId; 00224 } 00225 } 00226 }
void OpenDDS::DCPS::SingleSendBuffer::insert_buffer | ( | BufferType & | buffer, | |
TransportSendStrategy::QueueType * | queue, | |||
ACE_Message_Block * | chain | |||
) | [private] |
Definition at line 229 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::TransportQueueElement::clone_mb(), retained_db_allocator_, and retained_mb_allocator_.
Referenced by insert(), and insert_fragment().
00232 { 00233 // Copy sample's TransportQueueElements: 00234 TransportSendStrategy::QueueType*& elems = buffer.first; 00235 ACE_NEW(elems, TransportSendStrategy::QueueType()); 00236 00237 CopyChainVisitor visitor(*elems, 00238 &this->retained_mb_allocator_, 00239 &this->retained_db_allocator_); 00240 queue->accept_visitor(visitor); 00241 00242 // Copy sample's message/data block descriptors: 00243 ACE_Message_Block*& data = buffer.second; 00244 data = TransportQueueElement::clone_mb(chain, 00245 &this->retained_mb_allocator_, 00246 &this->retained_db_allocator_); 00247 }
void OpenDDS::DCPS::SingleSendBuffer::insert_fragment | ( | SequenceNumber | sequence, | |
SequenceNumber | fragment, | |||
TransportSendStrategy::QueueType * | queue, | |||
ACE_Message_Block * | chain | |||
) |
Definition at line 250 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, check_capacity(), fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.
00254 { 00255 check_capacity(); 00256 00257 // Insert into buffers_ so that the overall capacity is maintained 00258 // The entry in buffers_ with two null pointers indicates that the 00259 // actual data is stored in fragments_[sequence]. 00260 buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0), 00261 static_cast<ACE_Message_Block*>(0)); 00262 00263 BufferType& buffer = fragments_[sequence][fragment]; 00264 insert_buffer(buffer, queue, chain); 00265 00266 if (Transport_debug_level > 5) { 00267 ACE_DEBUG((LM_DEBUG, 00268 ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ") 00269 ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"), 00270 sequence.getValue(), fragment.getValue(), 00271 buffer.first, buffer.second 00272 )); 00273 } 00274 }
/// @return the lowest sequence number present in buffers_.
/// @throws std::exception when buffers_ is empty.
///
/// Definition at line 35 of file TransportSendBuffer.inl.
/// References buffers_.
/// Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), and resend_i().
ACE_INLINE OpenDDS::DCPS::SequenceNumber
OpenDDS::DCPS::SingleSendBuffer::low() const
{
  const BufferMap::const_iterator oldest = this->buffers_.begin();
  if (oldest == this->buffers_.end()) {
    throw std::exception();
  }
  return oldest->first;
}
/// @return the stored n_chunks_ value (set by the constructor to
///         capacity * max_samples_per_packet).
///
/// Definition at line 29 of file TransportSendBuffer.inl.
/// References n_chunks_.
ACE_INLINE size_t
OpenDDS::DCPS::SingleSendBuffer::n_chunks() const
{
  return n_chunks_;
}
typedef OPENDDS_MAP(SequenceNumber, RepoId) OpenDDS::DCPS::SingleSendBuffer::DestinationMap [private]
typedef OPENDDS_MAP(SequenceNumber, BufferMap) OpenDDS::DCPS::SingleSendBuffer::FragmentMap [private]
typedef OPENDDS_MAP(SequenceNumber, BufferType) OpenDDS::DCPS::SingleSendBuffer::BufferMap
void OpenDDS::DCPS::SingleSendBuffer::release | ( | BufferMap::iterator | buffer_iter | ) |
Definition at line 95 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, fragments_, LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.
Referenced by check_capacity(), release_acked(), release_all(), and retain_all().
00096 { 00097 BufferType& buffer(buffer_iter->second); 00098 if (Transport_debug_level > 5) { 00099 ACE_DEBUG((LM_DEBUG, 00100 ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ") 00101 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"), 00102 buffer.first, buffer.second 00103 )); 00104 } 00105 00106 if (buffer.first && buffer.second) { 00107 // not a fragment 00108 RemoveAllVisitor visitor; 00109 buffer.first->accept_remove_visitor(visitor); 00110 delete buffer.first; 00111 00112 buffer.second->release(); 00113 buffer.second = 0; 00114 00115 } else { 00116 // data actually stored in fragments_ 00117 const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first); 00118 if (fm_it != fragments_.end()) { 00119 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00120 bm_it != fm_it->second.end(); ++bm_it) { 00121 RemoveAllVisitor visitor; 00122 bm_it->second.first->accept_remove_visitor(visitor); 00123 delete bm_it->second.first; 00124 00125 bm_it->second.second->release(); 00126 bm_it->second.second = 0; 00127 } 00128 fragments_.erase(fm_it); 00129 } 00130 } 00131 00132 buffers_.erase(buffer_iter); 00133 }
void OpenDDS::DCPS::SingleSendBuffer::release_acked | ( | SequenceNumber | seq | ) |
Definition at line 74 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, LM_DEBUG, release(), and OpenDDS::DCPS::Transport_debug_level.
00074 { 00075 BufferMap::iterator buffer_iter = buffers_.begin(); 00076 BufferType& buffer(buffer_iter->second); 00077 00078 if (Transport_debug_level > 5) { 00079 ACE_DEBUG((LM_DEBUG, 00080 ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ") 00081 ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"), 00082 buffer.first, buffer.second 00083 )); 00084 } 00085 while (buffer_iter != buffers_.end()) { 00086 if (buffer_iter->first == seq) { 00087 release(buffer_iter); 00088 return; 00089 } 00090 ++buffer_iter; 00091 } 00092 }
void OpenDDS::DCPS::SingleSendBuffer::release_all | ( | ) |
Definition at line 65 of file TransportSendBuffer.cpp.
References buffers_, and release().
Referenced by ~SingleSendBuffer().
00066 { 00067 for (BufferMap::iterator it(this->buffers_.begin()); 00068 it != this->buffers_.end();) { 00069 release(it++); 00070 } 00071 }
bool OpenDDS::DCPS::SingleSendBuffer::resend | ( | const SequenceRange & | range, | |
DisjointSequence * | gaps = 0 | |||
) |
Definition at line 302 of file TransportSendBuffer.cpp.
References resend_i(), and OpenDDS::DCPS::TransportSendBuffer::strategy_lock().
Referenced by OpenDDS::DCPS::ReliableSession::nak_received().
00303 { 00304 ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false); 00305 return resend_i(range, gaps); 00306 }
void OpenDDS::DCPS::SingleSendBuffer::resend_fragments_i | ( | const SequenceNumber & | sequence, | |
const DisjointSequence & | fragments | |||
) |
Definition at line 363 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::DisjointSequence::empty(), fragments_, OpenDDS::DCPS::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::SequenceNumber::ZERO().
00365 { 00366 if (fragments_.empty() || requested_frags.empty()) { 00367 return; 00368 } 00369 const BufferMap& buffers = fragments_[seq]; 00370 const OPENDDS_VECTOR(SequenceRange) psr = 00371 requested_frags.present_sequence_ranges(); 00372 SequenceNumber sent = SequenceNumber::ZERO(); 00373 for (size_t i = 0; i < psr.size(); ++i) { 00374 BufferMap::const_iterator it = buffers.lower_bound(psr[i].first); 00375 if (it == buffers.end()) { 00376 return; 00377 } 00378 BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second); 00379 while (true) { 00380 if (sent < it->first) { 00381 resend_one(it->second); 00382 sent = it->first; 00383 } 00384 if (it == it2) { 00385 break; 00386 } 00387 ++it; 00388 } 00389 } 00390 }
bool OpenDDS::DCPS::SingleSendBuffer::resend_i | ( | const SequenceRange & | range, | |
DisjointSequence * | gaps, | |||
const RepoId & | destination | |||
) |
Definition at line 315 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, destinations_, fragments_, OpenDDS::DCPS::GUID_UNKNOWN, high(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, low(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::Transport_debug_level.
00317 { 00318 //Special case, nak to make sure it has all history 00319 const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first; 00320 const bool has_dest = destination != GUID_UNKNOWN; 00321 00322 for (SequenceNumber sequence(range.first); 00323 sequence <= range.second; ++sequence) { 00324 // Re-send requested sample if still buffered; missing samples 00325 // will be scored against the given DisjointSequence: 00326 BufferMap::iterator it(buffers_.find(sequence)); 00327 DestinationMap::iterator dest_data; 00328 if (has_dest) { 00329 dest_data = destinations_.find(sequence); 00330 } 00331 if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() || 00332 dest_data->second != destination))) { 00333 if (gaps) { 00334 gaps->insert(sequence); 00335 } 00336 } else { 00337 if (Transport_debug_level > 5) { 00338 ACE_DEBUG((LM_DEBUG, 00339 ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ") 00340 ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"), 00341 sequence.getValue(), 00342 it->second.first, 00343 it->second.second)); 00344 } 00345 if (it->second.first && it->second.second) { 00346 resend_one(it->second); 00347 } else { 00348 const FragmentMap::iterator fm_it = fragments_.find(it->first); 00349 if (fm_it != fragments_.end()) { 00350 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00351 bm_it != fm_it->second.end(); ++bm_it) { 00352 resend_one(bm_it->second); 00353 } 00354 } 00355 } 00356 } 00357 } 00358 // Have we resent all requested data? 00359 return lowForAllResent >= low() && range.second <= high(); 00360 }
bool OpenDDS::DCPS::SingleSendBuffer::resend_i | ( | const SequenceRange & | range, | |
DisjointSequence * | gaps = 0 | |||
) |
Definition at line 309 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN.
Referenced by resend(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().
00310 { 00311 return resend_i(range, gaps, GUID_UNKNOWN); 00312 }
void OpenDDS::DCPS::SingleSendBuffer::retain_all | ( | RepoId | pub_id | ) | [virtual] |
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 136 of file TransportSendBuffer.cpp.
References ACE_TEXT(), buffers_, fragments_, LM_DEBUG, LM_WARNING, OPENDDS_STRING, release(), OpenDDS::DCPS::REMOVE_ERROR, retain_buffer(), and OpenDDS::DCPS::Transport_debug_level.
00137 { 00138 if (Transport_debug_level > 5) { 00139 GuidConverter converter(pub_id); 00140 ACE_DEBUG((LM_DEBUG, 00141 ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ") 00142 ACE_TEXT("copying out blocks for publication: %C\n"), 00143 OPENDDS_STRING(converter).c_str() 00144 )); 00145 } 00146 for (BufferMap::iterator it(this->buffers_.begin()); 00147 it != this->buffers_.end();) { 00148 if (it->second.first && it->second.second) { 00149 if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) { 00150 GuidConverter converter(pub_id); 00151 ACE_ERROR((LM_WARNING, 00152 ACE_TEXT("(%P|%t) WARNING: ") 00153 ACE_TEXT("SingleSendBuffer::retain_all: ") 00154 ACE_TEXT("failed to retain data from publication: %C!\n"), 00155 OPENDDS_STRING(converter).c_str())); 00156 release(it++); 00157 } else { 00158 ++it; 00159 } 00160 00161 } else { 00162 const FragmentMap::iterator fm_it = fragments_.find(it->first); 00163 if (fm_it != fragments_.end()) { 00164 for (BufferMap::iterator bm_it = fm_it->second.begin(); 00165 bm_it != fm_it->second.end();) { 00166 if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) { 00167 GuidConverter converter(pub_id); 00168 ACE_ERROR((LM_WARNING, 00169 ACE_TEXT("(%P|%t) WARNING: ") 00170 ACE_TEXT("SingleSendBuffer::retain_all: failed to ") 00171 ACE_TEXT("retain fragment data from publication: %C!\n"), 00172 OPENDDS_STRING(converter).c_str())); 00173 release(bm_it++); 00174 } else { 00175 ++bm_it; 00176 } 00177 } 00178 } 00179 ++it; 00180 } 00181 } 00182 }
RemoveResult OpenDDS::DCPS::SingleSendBuffer::retain_buffer | ( | const RepoId & | pub_id, | |
BufferType & | buffer | |||
) | [private] |
Definition at line 185 of file TransportSendBuffer.cpp.
References OpenDDS::DCPS::PacketRemoveVisitor::status().
Referenced by retain_all().
00186 { 00187 TransportQueueElement::MatchOnPubId match(pub_id); 00188 PacketRemoveVisitor visitor(match, 00189 buffer.second, 00190 buffer.second, 00191 this->replaced_mb_allocator_, 00192 this->replaced_db_allocator_); 00193 00194 buffer.first->accept_replace_visitor(visitor); 00195 return visitor.status(); 00196 }
BufferMap OpenDDS::DCPS::SingleSendBuffer::buffers_ [private] |
Definition at line 126 of file TransportSendBuffer.h.
Referenced by check_capacity(), contains(), empty(), high(), insert(), insert_fragment(), low(), release(), release_acked(), release_all(), resend_i(), and retain_all().
DestinationMap OpenDDS::DCPS::SingleSendBuffer::destinations_ [private] |
Definition at line 132 of file TransportSendBuffer.h.
Referenced by check_capacity(), insert(), and resend_i().
FragmentMap OpenDDS::DCPS::SingleSendBuffer::fragments_ [private] |
Definition at line 129 of file TransportSendBuffer.h.
Referenced by insert_fragment(), release(), resend_fragments_i(), resend_i(), and retain_all().
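OpenDDS::DCPS::SingleSendBuffer::n_chunks_
Definition at line 119 of file TransportSendBuffer.h.
Referenced by n_chunks().
OpenDDS::DCPS::SingleSendBuffer::replaced_db_allocator_
Definition at line 124 of file TransportSendBuffer.h.
OpenDDS::DCPS::SingleSendBuffer::replaced_mb_allocator_
Definition at line 123 of file TransportSendBuffer.h.
OpenDDS::DCPS::SingleSendBuffer::retained_db_allocator_
Definition at line 122 of file TransportSendBuffer.h.
Referenced by insert_buffer().
OpenDDS::DCPS::SingleSendBuffer::retained_mb_allocator_
Definition at line 121 of file TransportSendBuffer.h.
Referenced by insert_buffer().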
const size_t OpenDDS::DCPS::SingleSendBuffer::UNLIMITED = 0 [static] |
Definition at line 76 of file TransportSendBuffer.h.
Referenced by check_capacity(), and OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert().