#include <ReliableSession.h>
Definition at line 47 of file ReliableSession.h.
typedef SequenceNumber OpenDDS::DCPS::ReliableSession::TransportHeaderSN [private] |
Definition at line 92 of file ReliableSession.h.
OpenDDS::DCPS::ReliableSession::ReliableSession | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
MulticastDataLink * | link, | |||
MulticastPeer | remote_peer | |||
) |
Definition at line 61 of file ReliableSession.cpp.
00065 : MulticastSession(reactor, owner, link, remote_peer), 00066 nak_watchdog_(make_rch<NakWatchdog> (reactor, owner, this)) 00067 { 00068 }
OpenDDS::DCPS::ReliableSession::~ReliableSession | ( | ) |
Definition at line 70 of file ReliableSession.cpp.
References nak_watchdog_.
00071 { 00072 nak_watchdog_->cancel(); 00073 nak_watchdog_->wait(); 00074 }
bool OpenDDS::DCPS::ReliableSession::check_header | ( | const TransportHeader & | header | ) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 83 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::sequence_, and OpenDDS::DCPS::TransportHeader::source_.
00084 { 00085 // Not from the remote peer for this session. 00086 if (this->remote_peer_ != header.source_) return false; 00087 00088 // Active sessions don't need to track nak_sequence_ 00089 if (this->active_) return true; 00090 00091 // Update last seen sequence for remote peer; return false if we 00092 // have already seen this datagram to prevent duplicate delivery 00093 // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd 00094 return this->nak_sequence_.insert(SequenceRange( 00095 header.sequence_ == 2 ? SequenceNumber() : header.sequence_, 00096 header.sequence_)); 00097 }
bool OpenDDS::DCPS::ReliableSession::control_received | ( | char | submessage_id, | |
const Message_Block_Ptr & | control | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 217 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::control_received(), LM_WARNING, OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MULTICAST_NAKACK, nak_received(), and nakack_received().
00219 { 00220 if (MulticastSession::control_received(submessage_id, control)) { 00221 return true; // base class handled message 00222 } 00223 00224 switch (submessage_id) { 00225 case MULTICAST_NAK: 00226 nak_received(control); 00227 break; 00228 00229 case MULTICAST_NAKACK: 00230 nakack_received(control); 00231 break; 00232 00233 default: 00234 ACE_ERROR((LM_WARNING, 00235 ACE_TEXT("(%P|%t) WARNING: ") 00236 ACE_TEXT("ReliableSession::control_received: ") 00237 ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"), 00238 submessage_id)); 00239 break; 00240 } 00241 return true; 00242 }
void OpenDDS::DCPS::ReliableSession::deliver_held_data | ( | ) |
Definition at line 169 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DisjointSequence::empty(), held_lock_, OpenDDS::DCPS::MulticastSession::link_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by expire_naks(), nakack_received(), ready_to_deliver(), and record_header_received().
00170 { 00171 if (nak_sequence_.empty() || nak_sequence_.low() > 1) return; 00172 00173 OPENDDS_VECTOR(ReceivedDataSample) to_deliver; 00174 const SequenceNumber ca = nak_sequence_.cumulative_ack(); 00175 00176 { 00177 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_); 00178 00179 typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter; 00180 const iter end = this->held_.upper_bound(ca); 00181 for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) { 00182 if (Transport_debug_level > 5) { 00183 GuidConverter writer(it->second.header_.publication_id_); 00184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -") 00185 ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"), 00186 it->first.getValue(), 00187 it->second.header_.sequence_.getValue(), 00188 OPENDDS_STRING(writer).c_str())); 00189 } 00190 to_deliver.push_back(it->second); 00191 this->held_.erase(it++); 00192 } 00193 } 00194 for (size_t i = 0; i < to_deliver.size(); ++i) { 00195 this->link_->data_received(to_deliver.at(i)); 00196 } 00197 }
void OpenDDS::DCPS::ReliableSession::expire_naks | ( | ) |
Definition at line 258 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::TransportReassembly::data_unavailable(), deliver_held_data(), ACE_OS::gettimeofday(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link_, LM_WARNING, OpenDDS::DCPS::DisjointSequence::low(), nak_requests_, nak_sequence_, OpenDDS::DCPS::MulticastInst::nak_timeout_, OpenDDS::DCPS::MulticastSession::reassembly_, and OpenDDS::DCPS::MulticastSession::remote_peer_.
Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().
00259 { 00260 if (this->nak_requests_.empty()) return; // nothing to expire 00261 00262 ACE_Time_Value deadline(ACE_OS::gettimeofday()); 00263 deadline -= this->link_->config().nak_timeout_; 00264 00265 NakRequestMap::iterator first(this->nak_requests_.begin()); 00266 NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline)); 00267 00268 if (first == last) return; // nothing to expire 00269 00270 // Skip unrecoverable datagrams to 00271 // re-establish a baseline to detect future reception gaps. 00272 SequenceNumber lastSeq = (last == this->nak_requests_.end()) 00273 ? this->nak_requests_.rbegin()->second 00274 : last->second; 00275 00276 std::vector<SequenceRange> dropped; 00277 if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(), 00278 lastSeq), dropped)) { 00279 00280 for (size_t i = 0; i < dropped.size(); ++i) { 00281 this->reassembly_.data_unavailable(dropped[i]); 00282 } 00283 00284 ACE_ERROR((LM_WARNING, 00285 ACE_TEXT("(%P|%t) WARNING: ") 00286 ACE_TEXT("ReliableSession::expire_naks: ") 00287 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"), 00288 (unsigned int)(this->remote_peer_ >> 32), 00289 (unsigned int) this->remote_peer_, 00290 this->nak_sequence_.low().getValue(), 00291 lastSeq.getValue())); 00292 } 00293 00294 // Clear expired repair requests: 00295 this->nak_requests_.erase(first, last); 00296 deliver_held_data(); 00297 }
virtual bool OpenDDS::DCPS::ReliableSession::is_reliable | ( | ) | [inline, virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 79 of file ReliableSession.h.
void OpenDDS::DCPS::ReliableSession::nak_received | ( | const Message_Block_Ptr & | control | ) |
Definition at line 517 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SingleSendBuffer::empty(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::SingleSendBuffer::low(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::SingleSendBuffer::resend(), OpenDDS::DCPS::MulticastDataLink::send_buffer(), send_nakack(), size, OpenDDS::DCPS::TransportHeader::source_, and OpenDDS::DCPS::TransportHeader::swap_bytes().
Referenced by control_received().
00518 { 00519 if (!this->active_) return; // sub send naks, then doesn't receive them. 00520 00521 const TransportHeader& header = 00522 this->link_->receive_strategy()->received_header(); 00523 00524 Serializer serializer(control.get(), header.swap_bytes()); 00525 00526 MulticastPeer local_peer; 00527 CORBA::ULong size = 0; 00528 serializer >> local_peer; // sent as remote_peer 00529 serializer >> size; 00530 00531 std::vector<SequenceRange> ranges; 00532 00533 for (CORBA::ULong i = 0; i < size; ++i) { 00534 SequenceRange range; 00535 serializer >> range.first; 00536 serializer >> range.second; 00537 ranges.push_back(range); 00538 } 00539 00540 // Ignore sample if not destined for us: 00541 if ((local_peer != this->link_->local_peer()) // Not to us. 00542 || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session. 00543 00544 SingleSendBuffer* send_buffer = this->link_->send_buffer(); 00545 // Broadcast a MULTICAST_NAKACK control sample before resending to suppress 00546 // repair requests for unrecoverable samples by providing a 00547 // new low-water mark for affected peers: 00548 if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) { 00549 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00550 ACE_DEBUG ((LM_DEBUG, 00551 ACE_TEXT ("(%P|%t) ReliableSession::nak_received") 00552 ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"), 00553 (unsigned int)(this->link()->local_peer() >> 32), 00554 (unsigned int) this->link()->local_peer(), 00555 (unsigned int)(this->remote_peer_ >> 32), 00556 (unsigned int) this->remote_peer_, 00557 send_buffer->low().getValue())); 00558 } 00559 send_nakack(send_buffer->low()); 00560 } 00561 00562 for (CORBA::ULong i = 0; i < size; ++i) { 00563 bool ret = send_buffer->resend(ranges[i]); 00564 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00565 ACE_DEBUG ((LM_DEBUG, 00566 ACE_TEXT ("(%P|%t) ReliableSession::nak_received") 00567 ACE_TEXT (" local 
%#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"), 00568 (unsigned int)(this->link()->local_peer() >> 32), 00569 (unsigned int) this->link()->local_peer(), 00570 (unsigned int)(this->remote_peer_ >> 32), 00571 (unsigned int) this->remote_peer_, 00572 ranges[i].first.getValue(), ranges[i].second.getValue(), 00573 ret ? "SUCCESS" : "FAILED")); 00574 } 00575 } 00576 }
void OpenDDS::DCPS::ReliableSession::nakack_received | ( | const Message_Block_Ptr & | control | ) |
Definition at line 615 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::TransportReassembly::data_unavailable(), OpenDDS::DCPS::DCPS_debug_level, deliver_held_data(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, LM_WARNING, nak_sequence_, OpenDDS::DCPS::MulticastSession::reassembly_, OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::source_, and OpenDDS::DCPS::TransportHeader::swap_bytes().
Referenced by control_received().
00616 { 00617 if (this->active_) return; // pub send nakack, doesn't receive them. 00618 00619 const TransportHeader& header = 00620 this->link_->receive_strategy()->received_header(); 00621 00622 // Not from the remote peer for this session. 00623 if (this->remote_peer_ != header.source_) return; 00624 00625 Serializer serializer(control.get(), header.swap_bytes()); 00626 00627 SequenceNumber low; 00628 serializer >> low; 00629 00630 // MULTICAST_NAKACK control samples indicate data which cannot be 00631 // repaired by a remote peer; if any values were needed below 00632 // this value, then the sequence needs to be shifted: 00633 std::vector<SequenceRange> dropped; 00634 SequenceNumber range_low = SequenceNumber(); 00635 SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous(); 00636 00637 if (range_low == SequenceNumber() && range_high == SequenceNumber()) { 00638 00639 this->nak_sequence_.insert(range_low); 00640 00641 } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) { 00642 00643 for (size_t i = 0; i < dropped.size(); ++i) { 00644 this->reassembly_.data_unavailable(dropped[i]); 00645 } 00646 00647 if (DCPS_debug_level > 0) { 00648 ACE_ERROR((LM_WARNING, 00649 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ") 00650 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ") 00651 ACE_TEXT("- some ranges dropped.\n"), 00652 (unsigned int)(this->link()->local_peer() >> 32), 00653 (unsigned int) this->link()->local_peer(), 00654 (unsigned int)(this->remote_peer_ >> 32), 00655 (unsigned int) this->remote_peer_, 00656 low.getValue())); 00657 } 00658 } 00659 deliver_held_data(); 00660 }
typedef OpenDDS::DCPS::ReliableSession::OPENDDS_MAP | ( | ACE_Time_Value | , | |
SequenceNumber | ||||
) | [private] |
OpenDDS::DCPS::ReliableSession::OPENDDS_MULTIMAP | ( | TransportHeaderSN | , | |
ReceivedDataSample | ||||
) | [private] |
Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().
typedef OpenDDS::DCPS::ReliableSession::OPENDDS_SET | ( | SequenceRange | ) | [private] |
bool OpenDDS::DCPS::ReliableSession::ready_to_deliver | ( | const TransportHeader & | header, | |
const ReceivedDataSample & | data | |||
) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 115 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, held_lock_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::TransportHeader::sequence_, and OpenDDS::DCPS::Transport_debug_level.
00117 { 00118 if (!acked() 00119 || nak_sequence_.disjoint() 00120 || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_) 00121 || (!nak_sequence_.empty() && nak_sequence_.low() > 1) 00122 || (nak_sequence_.empty() && header.sequence_ > 1)) { 00123 00124 if (Transport_debug_level > 5) { 00125 GuidConverter writer(data.header_.publication_id_); 00126 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00127 ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"), 00128 header.sequence_.getValue(), 00129 data.header_.sequence_.getValue(), 00130 OPENDDS_STRING(writer).c_str())); 00131 } 00132 { 00133 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false); 00134 00135 held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data)); 00136 00137 if (Transport_debug_level > 5) { 00138 OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin(); 00139 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00140 ACE_TEXT(" held_ data currently contains: %d samples\n"), 00141 held_.size())); 00142 while (it != held_.end()) { 00143 GuidConverter writer(it->second.header_.publication_id_); 00144 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00145 ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"), 00146 it->first.getValue(), 00147 it->second.header_.sequence_.getValue(), 00148 OPENDDS_STRING(writer).c_str())); 00149 ++it; 00150 } 00151 } 00152 } 00153 deliver_held_data(); 00154 return false; 00155 } else { 00156 if (Transport_debug_level > 5) { 00157 GuidConverter writer(data.header_.publication_id_); 00158 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00159 ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"), 00160 header.sequence_.getValue(), 00161 data.header_.sequence_.getValue(), 00162 OPENDDS_STRING(writer).c_str())); 
00163 } 00164 return true; 00165 } 00166 }
void OpenDDS::DCPS::ReliableSession::record_header_received | ( | const TransportHeader & | header | ) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 100 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastSession::active_, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::sequence_, and OpenDDS::DCPS::TransportHeader::source_.
00101 { 00102 // Not from the remote peer for this session. 00103 if (this->remote_peer_ != header.source_) return; 00104 00105 // Active sessions don't need to track nak_sequence_ 00106 if (this->active_) return; 00107 00108 // Update nak sequence for seen sequence from remote peer 00109 this->nak_sequence_.insert(header.sequence_); 00110 deliver_held_data(); 00111 }
void OpenDDS::DCPS::ReliableSession::release_remote | ( | const RepoId & | remote | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 200 of file ReliableSession.cpp.
References held_lock_, and OPENDDS_MULTIMAP().
00201 { 00202 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_); 00203 if (!held_.empty()) { 00204 OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin(); 00205 while (it != held_.end()) { 00206 if (it->second.header_.publication_id_ == remote) { 00207 held_.erase(it++); 00208 } else { 00209 it++; 00210 } 00211 } 00212 } 00213 }
void OpenDDS::DCPS::ReliableSession::send_nakack | ( | SequenceNumber | low | ) | [virtual] |
Definition at line 663 of file ReliableSession.cpp.
References OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), len, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAKACK, and OpenDDS::DCPS::MulticastSession::send_control().
Referenced by nak_received().
00664 { 00665 size_t len = sizeof(low.getValue()); 00666 00667 Message_Block_Ptr data(new ACE_Message_Block(len)); 00668 00669 Serializer serializer(data.get()); 00670 00671 serializer << low; 00672 // Broadcast control sample to all peers: 00673 send_control(MULTICAST_NAKACK, move(data)); 00674 }
void OpenDDS::DCPS::ReliableSession::send_naks | ( | DisjointSequence & | found | ) |
Definition at line 579 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, len, OpenDDS::DCPS::MulticastSession::link(), LM_DEBUG, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::MulticastSession::send_control(), size, and ACE_Utils::truncate_cast().
00580 { 00581 const std::vector<SequenceRange> ranges(received.missing_sequence_ranges()); 00582 00583 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size()); 00584 00585 size_t len = sizeof(this->remote_peer_) 00586 + sizeof(size) 00587 + size * 2 * sizeof(SequenceNumber); 00588 00589 Message_Block_Ptr data(new ACE_Message_Block(len)); 00590 00591 Serializer serializer(data.get()); 00592 00593 serializer << this->remote_peer_; 00594 serializer << size; 00595 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00596 iter != ranges.end(); ++iter) { 00597 serializer << iter->first; 00598 serializer << iter->second; 00599 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00600 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ") 00601 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"), 00602 (unsigned int)(this->link()->local_peer() >> 32), 00603 (unsigned int) this->link()->local_peer(), 00604 (unsigned int)(this->remote_peer_ >> 32), 00605 (unsigned int) this->remote_peer_, 00606 iter->first.getValue(), iter->second.getValue())); 00607 } 00608 } 00609 // Send control sample to remote peer: 00610 send_control(MULTICAST_NAK, move(data)); 00611 }
void OpenDDS::DCPS::ReliableSession::send_naks | ( | ) | [virtual] |
The range first - second will be skipped (no naks sent for it).
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 300 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::dump(), ACE_OS::gettimeofday(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), len, OpenDDS::DCPS::MulticastSession::link(), LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastInst::nak_delay_intervals_, OpenDDS::DCPS::MulticastInst::nak_max_, nak_peers_, nak_requests_, nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::MulticastSession::send_control(), size, and ACE_Utils::truncate_cast().
Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().
00301 { 00302 // Could get data samples before syn control message. 00303 // No use nak'ing until syn control message is received and session is acked. 00304 if (!this->acked()) { 00305 if (DCPS_debug_level > 5) { 00306 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00307 ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"), 00308 (unsigned int)(this->link()->local_peer() >> 32), 00309 (unsigned int) this->link()->local_peer(), 00310 (unsigned int)(this->remote_peer_ >> 32), 00311 (unsigned int) this->remote_peer_)); 00312 } 00313 return; 00314 } 00315 00316 if (DCPS_debug_level > 5) { 00317 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00318 ACE_TEXT("remote %#08x%08x nak request size %d \n"), 00319 (unsigned int)(this->link()->local_peer() >> 32), 00320 (unsigned int) this->link()->local_peer(), 00321 (unsigned int)(this->remote_peer_ >> 32), 00322 (unsigned int) this->remote_peer_, 00323 this->nak_requests_.size())); 00324 } 00325 00326 if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) { 00327 if (DCPS_debug_level > 5) { 00328 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00329 ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"), 00330 (unsigned int)(this->link()->local_peer() >> 32), 00331 (unsigned int) this->link()->local_peer(), 00332 (unsigned int)(this->remote_peer_ >> 32), 00333 (unsigned int) this->remote_peer_)); 00334 } 00335 00336 if (DCPS_debug_level > 9) { 00337 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges()); 00338 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00339 iter != ranges.end(); ++iter) { 00340 ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n", 00341 (unsigned int)(this->link()->local_peer() >> 32), 00342 (unsigned int) 
this->link()->local_peer(), 00343 (unsigned int)(this->remote_peer_ >> 32), 00344 (unsigned int) this->remote_peer_, 00345 iter->first.getValue(), 00346 iter->second.getValue())); 00347 } 00348 } 00349 return; // nothing to send 00350 } 00351 00352 ACE_Time_Value now(ACE_OS::gettimeofday()); 00353 00354 // Record low-water mark for this interval; this value will 00355 // be used to reset the low-water mark in the event the remote 00356 // peer becomes unresponsive: 00357 if (this->nak_sequence_.low() > 1) { 00358 this->nak_requests_[now] = SequenceNumber(); 00359 } else { 00360 this->nak_requests_[now] = this->nak_sequence_.cumulative_ack(); 00361 } 00362 00363 typedef std::vector<SequenceRange> RangeVector; 00364 RangeVector ignored; 00365 00366 /// The range first - second will be skipped (no naks sent for it). 00367 SequenceNumber first; 00368 SequenceNumber second; 00369 00370 NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin()); 00371 00372 if (this->nak_requests_.size() > 1) { 00373 // The sequences between rbegin - 1 and rbegin will not be ignored for naking. 00374 ++itr; 00375 00376 size_t nak_delay_intervals = this->link()->config().nak_delay_intervals_; 00377 size_t nak_max = this->link()->config().nak_max_; 00378 size_t sz = this->nak_requests_.size(); 00379 00380 // Image i is the index of element in nak_requests_ in reverse order. 00381 // index 0 sequence is most recent high water mark. 00382 // e.g index , 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 00383 // 0 (rbegin) is always skipped because missing sample between 1 and 0 interval 00384 // should always be naked., 00385 // if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11 00386 // are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for 00387 // naking due to nak_max. 
00388 for (size_t i = 1; i < sz; ++i) { 00389 if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) { 00390 if (first != SequenceNumber()) { 00391 first = this->nak_requests_.begin()->second; 00392 } 00393 else { 00394 ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second)); 00395 } 00396 break; 00397 } 00398 00399 if (i % (nak_delay_intervals + 1) == 1) { 00400 second = itr->second; 00401 } 00402 if (second != SequenceNumber()) { 00403 first = itr->second; 00404 } 00405 00406 if (i % (nak_delay_intervals + 1) == 0) { 00407 first = itr->second; 00408 00409 if (first != SequenceNumber() && second != SequenceNumber()) { 00410 ignored.push_back(std::make_pair(first, second)); 00411 first = SequenceNumber(); 00412 second = SequenceNumber(); 00413 } 00414 } 00415 00416 ++itr; 00417 } 00418 00419 if (first != SequenceNumber() && second != SequenceNumber() && first != second) { 00420 ignored.push_back(std::make_pair(first, second)); 00421 } 00422 } 00423 00424 // Take a copy to facilitate temporary suppression: 00425 DisjointSequence received(this->nak_sequence_); 00426 if (DCPS_debug_level > 0) { 00427 received.dump(); 00428 } 00429 00430 size_t sz = ignored.size(); 00431 for (size_t i = 0; i < sz; ++i) { 00432 00433 if (ignored[i].second > received.cumulative_ack()) { 00434 SequenceNumber high = ignored[i].second; 00435 SequenceNumber low = ignored[i].first; 00436 if (low < received.cumulative_ack()) { 00437 low = received.cumulative_ack(); 00438 } 00439 00440 if (DCPS_debug_level > 5) { 00441 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00442 ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"), 00443 (unsigned int)(this->link()->local_peer() >> 32), 00444 (unsigned int) this->link()->local_peer(), 00445 (unsigned int)(this->remote_peer_ >> 32), 00446 (unsigned int) this->remote_peer_, 00447 low.getValue(), high.getValue())); 00448 } 00449 00450 // Make contiguous between ignored sequences. 
00451 received.insert(SequenceRange(low, high)); 00452 } 00453 } 00454 00455 for (NakPeerSet::iterator it(this->nak_peers_.begin()); 00456 it != this->nak_peers_.end(); ++it) { 00457 // Update sequence to temporarily suppress repair requests for 00458 // ranges already requested by other peers for this interval: 00459 received.insert(*it); 00460 } 00461 bool sending_naks = false; 00462 if (received.low() > 1){ 00463 //Special case: nak from beginning to make sure no missing sequence 00464 //number below the first received 00465 sending_naks = true; 00466 std::vector<SequenceRange> ranges; 00467 ranges.push_back(SequenceRange(SequenceNumber(), received.low())); 00468 00469 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size()); 00470 00471 size_t len = sizeof(this->remote_peer_) 00472 + sizeof(size) 00473 + size * 2 * sizeof(SequenceNumber); 00474 00475 Message_Block_Ptr data(new ACE_Message_Block(len)); 00476 00477 Serializer serializer(data.get()); 00478 00479 serializer << this->remote_peer_; 00480 serializer << size; 00481 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00482 iter != ranges.end(); ++iter) { 00483 serializer << iter->first; 00484 serializer << iter->second; 00485 if (DCPS_debug_level > 0) { 00486 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ") 00487 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"), 00488 (unsigned int)(this->link()->local_peer() >> 32), 00489 (unsigned int) this->link()->local_peer(), 00490 (unsigned int)(this->remote_peer_ >> 32), 00491 (unsigned int) this->remote_peer_, 00492 iter->first.getValue(), iter->second.getValue())); 00493 } 00494 } 00495 // Send control sample to remote peer: 00496 send_control(MULTICAST_NAK, move(data)); 00497 } 00498 if (received.disjoint()) { 00499 sending_naks = true; 00500 send_naks(received); 00501 } 00502 00503 if (!sending_naks && DCPS_debug_level > 5){ 00504 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) 
ReliableSession::send_naks local %#08x%08x ") 00505 ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"), 00506 (unsigned int)(this->link()->local_peer() >> 32), 00507 (unsigned int) this->link()->local_peer(), 00508 (unsigned int)(this->remote_peer_ >> 32), 00509 (unsigned int) this->remote_peer_)); 00510 } 00511 00512 // Clear peer repair requests: 00513 this->nak_peers_.clear(); 00514 }
bool OpenDDS::DCPS::ReliableSession::start | ( | bool | active, | |
bool | acked | |||
) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 677 of file ReliableSession.cpp.
References ACE_TEXT(), OpenDDS::DCPS::MulticastSession::active_, LM_ERROR, nak_watchdog_, OpenDDS::DCPS::MulticastSession::reverse_start_lock_, OpenDDS::DCPS::MulticastSession::set_acked(), OpenDDS::DCPS::MulticastSession::start_lock_, OpenDDS::DCPS::MulticastSession::start_syn(), and OpenDDS::DCPS::MulticastSession::started_.
00678 { 00679 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false); 00680 00681 if (this->started_) { 00682 return true; // already started 00683 } 00684 00685 this->active_ = active; 00686 { 00687 //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection 00688 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false); 00689 00690 // A watchdog timer is scheduled to periodically check for gaps in 00691 // received data. If a gap is discovered, MULTICAST_NAK control 00692 // samples will be sent to initiate repairs. 00693 // Only subscriber send naks so just schedule for sub role. 00694 if (!active) { 00695 if (acked) { 00696 this->set_acked(); 00697 } 00698 if (!this->nak_watchdog_->schedule()) { 00699 ACE_ERROR_RETURN((LM_ERROR, 00700 ACE_TEXT("(%P|%t) ERROR: ") 00701 ACE_TEXT("ReliableSession::start: ") 00702 ACE_TEXT("failed to schedule NAK watchdog!\n")), 00703 false); 00704 } 00705 } 00706 00707 // Active peers schedule a watchdog timer to initiate a 2-way 00708 // handshake to verify that passive endpoints can send/receive 00709 // data reliably. This process must be executed using the 00710 // transport reactor thread to prevent blocking. 00711 // Only publisher send syn so just schedule for pub role. 00712 if (active && !this->start_syn()) { 00713 this->nak_watchdog_->cancel(); 00714 ACE_ERROR_RETURN((LM_ERROR, 00715 ACE_TEXT("(%P|%t) ERROR: ") 00716 ACE_TEXT("ReliableSession::start: ") 00717 ACE_TEXT("failed to schedule SYN watchdog!\n")), 00718 false); 00719 } 00720 } //Reacquire start_lock_ after releasing unlock_guard with release_start_lock_ 00721 00722 return this->started_ = true; 00723 }
void OpenDDS::DCPS::ReliableSession::stop | ( | void | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 726 of file ReliableSession.cpp.
References nak_watchdog_.
00727 { 00728 MulticastSession::stop(); 00729 this->nak_watchdog_->cancel(); 00730 }
void OpenDDS::DCPS::ReliableSession::syn_hook | ( | const SequenceNumber & | seq | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 245 of file ReliableSession.cpp.
References OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, and OpenDDS::DCPS::DisjointSequence::reset().
00246 { 00247 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges()); 00248 this->nak_sequence_.reset(); 00249 this->nak_sequence_.insert(seq); 00250 00251 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00252 iter != ranges.end(); ++iter) { 00253 this->nak_sequence_.insert(SequenceRange(iter->first, iter->second)); 00254 } 00255 }
ACE_Thread_Mutex OpenDDS::DCPS::ReliableSession::held_lock_ |
Definition at line 91 of file ReliableSession.h.
Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().
NakPeerSet OpenDDS::DCPS::ReliableSession::nak_peers_ [private] |
Definition at line 96 of file ReliableSession.h.
Referenced by send_naks().
NakRequestMap OpenDDS::DCPS::ReliableSession::nak_requests_ [private] |
Definition at line 89 of file ReliableSession.h.
Referenced by expire_naks(), and send_naks().
DisjointSequence OpenDDS::DCPS::ReliableSession::nak_sequence_ |
Definition at line 86 of file ReliableSession.h.
Referenced by check_header(), deliver_held_data(), expire_naks(), nakack_received(), ready_to_deliver(), record_header_received(), send_naks(), and syn_hook().
OpenDDS::DCPS::ReliableSession::nak_watchdog_ |
Definition at line 84 of file ReliableSession.h.
Referenced by start(), stop(), and ~ReliableSession().