#include <ReliableSession.h>
Inheritance diagram for OpenDDS::DCPS::ReliableSession:
Public Member Functions | |
ReliableSession (ACE_Reactor *reactor, ACE_thread_t owner, MulticastDataLink *link, MulticastPeer remote_peer) | |
~ReliableSession () | |
virtual bool | check_header (const TransportHeader &header) |
virtual void | record_header_received (const TransportHeader &header) |
virtual bool | ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data) |
void | deliver_held_data () |
virtual void | release_remote (const RepoId &remote) |
virtual bool | control_received (char submessage_id, ACE_Message_Block *control) |
void | expire_naks () |
void | send_naks () |
void | nak_received (ACE_Message_Block *control) |
void | send_naks (DisjointSequence &found) |
void | nakack_received (ACE_Message_Block *control) |
virtual void | send_nakack (SequenceNumber low) |
virtual bool | start (bool active, bool acked) |
virtual void | stop () |
virtual bool | is_reliable () |
virtual void | syn_hook (const SequenceNumber &seq) |
Private Types | |
typedef SequenceNumber | TransportHeaderSN |
Private Member Functions | |
typedef | OPENDDS_MAP (ACE_Time_Value, SequenceNumber) NakRequestMap |
OPENDDS_MULTIMAP (TransportHeaderSN, ReceivedDataSample) held_ | |
typedef | OPENDDS_SET (SequenceRange) NakPeerSet |
Private Attributes | |
NakWatchdog * | nak_watchdog_ |
DisjointSequence | nak_sequence_ |
NakRequestMap | nak_requests_ |
ACE_Thread_Mutex | held_lock_ |
NakPeerSet | nak_peers_ |
Definition at line 44 of file ReliableSession.h.
typedef SequenceNumber OpenDDS::DCPS::ReliableSession::TransportHeaderSN [private] |
Definition at line 89 of file ReliableSession.h.
OpenDDS::DCPS::ReliableSession::ReliableSession | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
MulticastDataLink * | link, | |||
MulticastPeer | remote_peer | |||
) |
Definition at line 60 of file ReliableSession.cpp.
00064 : MulticastSession(reactor, owner, link, remote_peer), 00065 nak_watchdog_(new NakWatchdog (reactor, owner, this)) 00066 { 00067 }
OpenDDS::DCPS::ReliableSession::~ReliableSession | ( | ) |
Definition at line 69 of file ReliableSession.cpp.
References OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::ReactorInterceptor::destroy(), nak_watchdog_, and OpenDDS::DCPS::ReactorInterceptor::wait().
00070 { 00071 nak_watchdog_->cancel(); 00072 nak_watchdog_->wait(); 00073 nak_watchdog_->destroy(); 00074 }
bool OpenDDS::DCPS::ReliableSession::check_header | ( | const TransportHeader & | header | ) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 83 of file ReliableSession.cpp.
References header, OpenDDS::DCPS::DisjointSequence::insert(), and nak_sequence_.
00084 { 00085 // Not from the remote peer for this session. 00086 if (this->remote_peer_ != header.source_) return false; 00087 00088 // Active sessions don't need to track nak_sequence_ 00089 if (this->active_) return true; 00090 00091 // Update last seen sequence for remote peer; return false if we 00092 // have already seen this datagram to prevent duplicate delivery 00093 // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd 00094 return this->nak_sequence_.insert(SequenceRange( 00095 header.sequence_ == 2 ? SequenceNumber() : header.sequence_, 00096 header.sequence_)); 00097 }
bool OpenDDS::DCPS::ReliableSession::control_received | ( | char | submessage_id, | |
ACE_Message_Block * | control | |||
) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 217 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastSession::control_received(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MULTICAST_NAKACK, nak_received(), and nakack_received().
00219 { 00220 if (MulticastSession::control_received(submessage_id, control)) { 00221 return true; // base class handled message 00222 } 00223 00224 switch (submessage_id) { 00225 case MULTICAST_NAK: 00226 nak_received(control); 00227 break; 00228 00229 case MULTICAST_NAKACK: 00230 nakack_received(control); 00231 break; 00232 00233 default: 00234 ACE_ERROR((LM_WARNING, 00235 ACE_TEXT("(%P|%t) WARNING: ") 00236 ACE_TEXT("ReliableSession::control_received: ") 00237 ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"), 00238 submessage_id)); 00239 break; 00240 } 00241 return true; 00242 }
void OpenDDS::DCPS::ReliableSession::deliver_held_data | ( | ) |
Definition at line 169 of file ReliableSession.cpp.
References OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), held_lock_, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by expire_naks(), nakack_received(), ready_to_deliver(), and record_header_received().
00170 { 00171 if (nak_sequence_.empty() || nak_sequence_.low() > 1) return; 00172 00173 OPENDDS_VECTOR(ReceivedDataSample) to_deliver; 00174 const SequenceNumber ca = nak_sequence_.cumulative_ack(); 00175 00176 { 00177 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_); 00178 00179 typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter; 00180 const iter end = this->held_.upper_bound(ca); 00181 for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) { 00182 if (Transport_debug_level > 5) { 00183 GuidConverter writer(it->second.header_.publication_id_); 00184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -") 00185 ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"), 00186 it->first.getValue(), 00187 it->second.header_.sequence_.getValue(), 00188 OPENDDS_STRING(writer).c_str())); 00189 } 00190 to_deliver.push_back(it->second); 00191 this->held_.erase(it++); 00192 } 00193 } 00194 for (size_t i = 0; i < to_deliver.size(); ++i) { 00195 this->link_->data_received(to_deliver.at(i)); 00196 } 00197 }
void OpenDDS::DCPS::ReliableSession::expire_naks | ( | ) |
Definition at line 258 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastDataLink::config(), deliver_held_data(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::MulticastSession::link_, nak_requests_, and OpenDDS::DCPS::MulticastInst::nak_timeout_.
Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().
00259 { 00260 if (this->nak_requests_.empty()) return; // nothing to expire 00261 00262 ACE_Time_Value deadline(ACE_OS::gettimeofday()); 00263 deadline -= this->link_->config()->nak_timeout_; 00264 00265 NakRequestMap::iterator first(this->nak_requests_.begin()); 00266 NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline)); 00267 00268 if (first == last) return; // nothing to expire 00269 00270 // Skip unrecoverable datagrams to 00271 // re-establish a baseline to detect future reception gaps. 00272 SequenceNumber lastSeq = (last == this->nak_requests_.end()) 00273 ? this->nak_requests_.rbegin()->second 00274 : last->second; 00275 00276 std::vector<SequenceRange> dropped; 00277 if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(), 00278 lastSeq), dropped)) { 00279 00280 for (size_t i = 0; i < dropped.size(); ++i) { 00281 this->reassembly_.data_unavailable(dropped[i]); 00282 } 00283 00284 ACE_ERROR((LM_WARNING, 00285 ACE_TEXT("(%P|%t) WARNING: ") 00286 ACE_TEXT("ReliableSession::expire_naks: ") 00287 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"), 00288 (unsigned int)(this->remote_peer_ >> 32), 00289 (unsigned int) this->remote_peer_, 00290 this->nak_sequence_.low().getValue(), 00291 lastSeq.getValue())); 00292 } 00293 00294 // Clear expired repair requests: 00295 this->nak_requests_.erase(first, last); 00296 deliver_held_data(); 00297 }
virtual bool OpenDDS::DCPS::ReliableSession::is_reliable | ( | ) | [inline, virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 76 of file ReliableSession.h.
void OpenDDS::DCPS::ReliableSession::nak_received | ( | ACE_Message_Block * | control | ) |
Definition at line 518 of file ReliableSession.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SingleSendBuffer::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::SingleSendBuffer::low(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::SingleSendBuffer::resend(), OpenDDS::DCPS::MulticastDataLink::send_buffer(), and send_nakack().
Referenced by control_received().
00519 { 00520 if (!this->active_) return; // sub send naks, then doesn't receive them. 00521 00522 const TransportHeader& header = 00523 this->link_->receive_strategy()->received_header(); 00524 00525 Serializer serializer(control, header.swap_bytes()); 00526 00527 MulticastPeer local_peer; 00528 CORBA::ULong size = 0; 00529 serializer >> local_peer; // sent as remote_peer 00530 serializer >> size; 00531 00532 std::vector<SequenceRange> ranges; 00533 00534 for (CORBA::ULong i = 0; i < size; ++i) { 00535 SequenceRange range; 00536 serializer >> range.first; 00537 serializer >> range.second; 00538 ranges.push_back(range); 00539 } 00540 00541 // Ignore sample if not destined for us: 00542 if ((local_peer != this->link_->local_peer()) // Not to us. 00543 || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session. 00544 00545 SingleSendBuffer* send_buffer = this->link_->send_buffer(); 00546 // Broadcast a MULTICAST_NAKACK control sample before resending to suppress 00547 // repair requests for unrecoverable samples by providing a 00548 // new low-water mark for affected peers: 00549 if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) { 00550 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00551 ACE_DEBUG ((LM_DEBUG, 00552 ACE_TEXT ("(%P|%t) ReliableSession::nak_received") 00553 ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"), 00554 (unsigned int)(this->link()->local_peer() >> 32), 00555 (unsigned int) this->link()->local_peer(), 00556 (unsigned int)(this->remote_peer_ >> 32), 00557 (unsigned int) this->remote_peer_, 00558 send_buffer->low().getValue())); 00559 } 00560 send_nakack(send_buffer->low()); 00561 } 00562 00563 for (CORBA::ULong i = 0; i < size; ++i) { 00564 bool ret = send_buffer->resend(ranges[i]); 00565 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00566 ACE_DEBUG ((LM_DEBUG, 00567 ACE_TEXT ("(%P|%t) ReliableSession::nak_received") 00568 ACE_TEXT (" local %#08x%08x 
remote %#08x%08x [%q - %q] resend result %C\n"), 00569 (unsigned int)(this->link()->local_peer() >> 32), 00570 (unsigned int) this->link()->local_peer(), 00571 (unsigned int)(this->remote_peer_ >> 32), 00572 (unsigned int) this->remote_peer_, 00573 ranges[i].first.getValue(), ranges[i].second.getValue(), 00574 ret ? "SUCCESS" : "FAILED")); 00575 } 00576 } 00577 }
void OpenDDS::DCPS::ReliableSession::nakack_received | ( | ACE_Message_Block * | control | ) |
Definition at line 617 of file ReliableSession.cpp.
References OpenDDS::DCPS::DCPS_debug_level, deliver_held_data(), header, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), nak_sequence_, and OpenDDS::DCPS::MulticastDataLink::receive_strategy().
Referenced by control_received().
00618 { 00619 if (this->active_) return; // pub send nakack, doesn't receive them. 00620 00621 const TransportHeader& header = 00622 this->link_->receive_strategy()->received_header(); 00623 00624 // Not from the remote peer for this session. 00625 if (this->remote_peer_ != header.source_) return; 00626 00627 Serializer serializer(control, header.swap_bytes()); 00628 00629 SequenceNumber low; 00630 serializer >> low; 00631 00632 // MULTICAST_NAKACK control samples indicate data which cannot be 00633 // repaired by a remote peer; if any values were needed below 00634 // this value, then the sequence needs to be shifted: 00635 std::vector<SequenceRange> dropped; 00636 SequenceNumber range_low = SequenceNumber(); 00637 SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous(); 00638 00639 if (range_low == SequenceNumber() && range_high == SequenceNumber()) { 00640 00641 this->nak_sequence_.insert(range_low); 00642 00643 } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) { 00644 00645 for (size_t i = 0; i < dropped.size(); ++i) { 00646 this->reassembly_.data_unavailable(dropped[i]); 00647 } 00648 00649 if (DCPS_debug_level > 0) { 00650 ACE_ERROR((LM_WARNING, 00651 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ") 00652 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ") 00653 ACE_TEXT("- some ranges dropped.\n"), 00654 (unsigned int)(this->link()->local_peer() >> 32), 00655 (unsigned int) this->link()->local_peer(), 00656 (unsigned int)(this->remote_peer_ >> 32), 00657 (unsigned int) this->remote_peer_, 00658 low.getValue())); 00659 } 00660 } 00661 deliver_held_data(); 00662 }
typedef OpenDDS::DCPS::ReliableSession::OPENDDS_MAP | ( | ACE_Time_Value | , | |
SequenceNumber | ||||
) | [private] |
OpenDDS::DCPS::ReliableSession::OPENDDS_MULTIMAP | ( | TransportHeaderSN | , | |
ReceivedDataSample | ||||
) | [private] |
Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().
typedef OpenDDS::DCPS::ReliableSession::OPENDDS_SET | ( | SequenceRange | ) | [private] |
bool OpenDDS::DCPS::ReliableSession::ready_to_deliver | ( | const TransportHeader & | header, | |
const ReceivedDataSample & | data | |||
) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 115 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::ReceivedDataSample::header_, held_lock_, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::Transport_debug_level.
00117 { 00118 if (!acked() 00119 || nak_sequence_.disjoint() 00120 || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_) 00121 || (!nak_sequence_.empty() && nak_sequence_.low() > 1) 00122 || (nak_sequence_.empty() && header.sequence_ > 1)) { 00123 00124 if (Transport_debug_level > 5) { 00125 GuidConverter writer(data.header_.publication_id_); 00126 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00127 ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"), 00128 header.sequence_.getValue(), 00129 data.header_.sequence_.getValue(), 00130 OPENDDS_STRING(writer).c_str())); 00131 } 00132 { 00133 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false); 00134 00135 held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data)); 00136 00137 if (Transport_debug_level > 5) { 00138 OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin(); 00139 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00140 ACE_TEXT(" held_ data currently contains: %d samples\n"), 00141 held_.size())); 00142 while (it != held_.end()) { 00143 GuidConverter writer(it->second.header_.publication_id_); 00144 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00145 ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"), 00146 it->first.getValue(), 00147 it->second.header_.sequence_.getValue(), 00148 OPENDDS_STRING(writer).c_str())); 00149 ++it; 00150 } 00151 } 00152 } 00153 deliver_held_data(); 00154 return false; 00155 } else { 00156 if (Transport_debug_level > 5) { 00157 GuidConverter writer(data.header_.publication_id_); 00158 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -") 00159 ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"), 00160 header.sequence_.getValue(), 00161 data.header_.sequence_.getValue(), 00162 OPENDDS_STRING(writer).c_str())); 
00163 } 00164 return true; 00165 } 00166 }
void OpenDDS::DCPS::ReliableSession::record_header_received | ( | const TransportHeader & | header | ) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 100 of file ReliableSession.cpp.
References deliver_held_data(), header, OpenDDS::DCPS::DisjointSequence::insert(), and nak_sequence_.
00101 { 00102 // Not from the remote peer for this session. 00103 if (this->remote_peer_ != header.source_) return; 00104 00105 // Active sessions don't need to track nak_sequence_ 00106 if (this->active_) return; 00107 00108 // Update nak sequence for seen sequence from remote peer 00109 this->nak_sequence_.insert(header.sequence_); 00110 deliver_held_data(); 00111 }
void OpenDDS::DCPS::ReliableSession::release_remote | ( | const RepoId & | remote | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 200 of file ReliableSession.cpp.
References held_lock_, and OPENDDS_MULTIMAP().
00201 { 00202 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_); 00203 if (!held_.empty()) { 00204 OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin(); 00205 while (it != held_.end()) { 00206 if (it->second.header_.publication_id_ == remote) { 00207 held_.erase(it++); 00208 } else { 00209 it++; 00210 } 00211 } 00212 } 00213 }
void OpenDDS::DCPS::ReliableSession::send_nakack | ( | SequenceNumber | low | ) | [virtual] |
Definition at line 665 of file ReliableSession.cpp.
References OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::MULTICAST_NAKACK, and OpenDDS::DCPS::MulticastSession::send_control().
Referenced by nak_received().
00666 { 00667 size_t len = sizeof(low.getValue()); 00668 00669 ACE_Message_Block* data; 00670 ACE_NEW(data, ACE_Message_Block(len)); 00671 00672 Serializer serializer(data); 00673 00674 serializer << low; 00675 // Broadcast control sample to all peers: 00676 send_control(MULTICAST_NAKACK, data); 00677 }
void OpenDDS::DCPS::ReliableSession::send_naks | ( | DisjointSequence & | found | ) |
Definition at line 580 of file ReliableSession.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastSession::remote_peer_, and OpenDDS::DCPS::MulticastSession::send_control().
00581 { 00582 const std::vector<SequenceRange> ranges(received.missing_sequence_ranges()); 00583 00584 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size()); 00585 00586 size_t len = sizeof(this->remote_peer_) 00587 + sizeof(size) 00588 + size * 2 * sizeof(SequenceNumber); 00589 00590 ACE_Message_Block* data; 00591 ACE_NEW(data, ACE_Message_Block(len)); 00592 00593 Serializer serializer(data); 00594 00595 serializer << this->remote_peer_; 00596 serializer << size; 00597 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00598 iter != ranges.end(); ++iter) { 00599 serializer << iter->first; 00600 serializer << iter->second; 00601 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00602 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ") 00603 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"), 00604 (unsigned int)(this->link()->local_peer() >> 32), 00605 (unsigned int) this->link()->local_peer(), 00606 (unsigned int)(this->remote_peer_ >> 32), 00607 (unsigned int) this->remote_peer_, 00608 iter->first.getValue(), iter->second.getValue())); 00609 } 00610 } 00611 // Send control sample to remote peer: 00612 send_control(MULTICAST_NAK, data); 00613 }
void OpenDDS::DCPS::ReliableSession::send_naks | ( | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 300 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastInst::nak_delay_intervals_, OpenDDS::DCPS::MulticastInst::nak_max_, nak_peers_, nak_requests_, nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, and OpenDDS::DCPS::MulticastSession::send_control().
Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().
00301 { 00302 // Could get data samples before syn control message. 00303 // No use nak'ing until syn control message is received and session is acked. 00304 if (!this->acked()) { 00305 if (DCPS_debug_level > 5) { 00306 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00307 ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"), 00308 (unsigned int)(this->link()->local_peer() >> 32), 00309 (unsigned int) this->link()->local_peer(), 00310 (unsigned int)(this->remote_peer_ >> 32), 00311 (unsigned int) this->remote_peer_)); 00312 } 00313 return; 00314 } 00315 00316 if (DCPS_debug_level > 5) { 00317 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00318 ACE_TEXT("remote %#08x%08x nak request size %d \n"), 00319 (unsigned int)(this->link()->local_peer() >> 32), 00320 (unsigned int) this->link()->local_peer(), 00321 (unsigned int)(this->remote_peer_ >> 32), 00322 (unsigned int) this->remote_peer_, 00323 this->nak_requests_.size())); 00324 } 00325 00326 if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) { 00327 if (DCPS_debug_level > 5) { 00328 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00329 ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"), 00330 (unsigned int)(this->link()->local_peer() >> 32), 00331 (unsigned int) this->link()->local_peer(), 00332 (unsigned int)(this->remote_peer_ >> 32), 00333 (unsigned int) this->remote_peer_)); 00334 } 00335 00336 if (DCPS_debug_level > 9) { 00337 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges()); 00338 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00339 iter != ranges.end(); ++iter) { 00340 ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n", 00341 (unsigned int)(this->link()->local_peer() >> 32), 00342 (unsigned int) 
this->link()->local_peer(), 00343 (unsigned int)(this->remote_peer_ >> 32), 00344 (unsigned int) this->remote_peer_, 00345 iter->first.getValue(), 00346 iter->second.getValue())); 00347 } 00348 } 00349 return; // nothing to send 00350 } 00351 00352 ACE_Time_Value now(ACE_OS::gettimeofday()); 00353 00354 // Record low-water mark for this interval; this value will 00355 // be used to reset the low-water mark in the event the remote 00356 // peer becomes unresponsive: 00357 if (this->nak_sequence_.low() > 1) { 00358 this->nak_requests_[now] = SequenceNumber(); 00359 } else { 00360 this->nak_requests_[now] = this->nak_sequence_.cumulative_ack(); 00361 } 00362 00363 typedef std::vector<SequenceRange> RangeVector; 00364 RangeVector ignored; 00365 00366 /// The range first - second will be skipped (no naks sent for it). 00367 SequenceNumber first; 00368 SequenceNumber second; 00369 00370 NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin()); 00371 00372 if (this->nak_requests_.size() > 1) { 00373 // The sequences between rbegin - 1 and rbegin will not be ignored for naking. 00374 ++itr; 00375 00376 size_t nak_delay_intervals = this->link()->config()->nak_delay_intervals_; 00377 size_t nak_max = this->link()->config()->nak_max_; 00378 size_t sz = this->nak_requests_.size(); 00379 00380 // Image i is the index of element in nak_requests_ in reverse order. 00381 // index 0 sequence is most recent high water mark. 00382 // e.g index , 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0 00383 // 0 (rbegin) is always skipped because missing sample between 1 and 0 interval 00384 // should always be naked., 00385 // if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11 00386 // are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for 00387 // naking due to nak_max. 
00388 for (size_t i = 1; i < sz; ++i) { 00389 if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) { 00390 if (first != SequenceNumber()) { 00391 first = this->nak_requests_.begin()->second; 00392 } 00393 else { 00394 ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second)); 00395 } 00396 break; 00397 } 00398 00399 if (i % (nak_delay_intervals + 1) == 1) { 00400 second = itr->second; 00401 } 00402 if (second != SequenceNumber()) { 00403 first = itr->second; 00404 } 00405 00406 if (i % (nak_delay_intervals + 1) == 0) { 00407 first = itr->second; 00408 00409 if (first != SequenceNumber() && second != SequenceNumber()) { 00410 ignored.push_back(std::make_pair(first, second)); 00411 first = SequenceNumber(); 00412 second = SequenceNumber(); 00413 } 00414 } 00415 00416 ++itr; 00417 } 00418 00419 if (first != SequenceNumber() && second != SequenceNumber() && first != second) { 00420 ignored.push_back(std::make_pair(first, second)); 00421 } 00422 } 00423 00424 // Take a copy to facilitate temporary suppression: 00425 DisjointSequence received(this->nak_sequence_); 00426 if (DCPS_debug_level > 0) { 00427 received.dump(); 00428 } 00429 00430 size_t sz = ignored.size(); 00431 for (size_t i = 0; i < sz; ++i) { 00432 00433 if (ignored[i].second > received.cumulative_ack()) { 00434 SequenceNumber high = ignored[i].second; 00435 SequenceNumber low = ignored[i].first; 00436 if (low < received.cumulative_ack()) { 00437 low = received.cumulative_ack(); 00438 } 00439 00440 if (DCPS_debug_level > 5) { 00441 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ") 00442 ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"), 00443 (unsigned int)(this->link()->local_peer() >> 32), 00444 (unsigned int) this->link()->local_peer(), 00445 (unsigned int)(this->remote_peer_ >> 32), 00446 (unsigned int) this->remote_peer_, 00447 low.getValue(), high.getValue())); 00448 } 00449 00450 // Make contiguous between ignored sequences. 
00451 received.insert(SequenceRange(low, high)); 00452 } 00453 } 00454 00455 for (NakPeerSet::iterator it(this->nak_peers_.begin()); 00456 it != this->nak_peers_.end(); ++it) { 00457 // Update sequence to temporarily suppress repair requests for 00458 // ranges already requested by other peers for this interval: 00459 received.insert(*it); 00460 } 00461 bool sending_naks = false; 00462 if (received.low() > 1){ 00463 //Special case: nak from beginning to make sure no missing sequence 00464 //number below the first received 00465 sending_naks = true; 00466 std::vector<SequenceRange> ranges; 00467 ranges.push_back(SequenceRange(SequenceNumber(), received.low())); 00468 00469 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size()); 00470 00471 size_t len = sizeof(this->remote_peer_) 00472 + sizeof(size) 00473 + size * 2 * sizeof(SequenceNumber); 00474 00475 ACE_Message_Block* data; 00476 ACE_NEW(data, ACE_Message_Block(len)); 00477 00478 Serializer serializer(data); 00479 00480 serializer << this->remote_peer_; 00481 serializer << size; 00482 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00483 iter != ranges.end(); ++iter) { 00484 serializer << iter->first; 00485 serializer << iter->second; 00486 if (DCPS_debug_level > 0) { 00487 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ") 00488 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"), 00489 (unsigned int)(this->link()->local_peer() >> 32), 00490 (unsigned int) this->link()->local_peer(), 00491 (unsigned int)(this->remote_peer_ >> 32), 00492 (unsigned int) this->remote_peer_, 00493 iter->first.getValue(), iter->second.getValue())); 00494 } 00495 } 00496 // Send control sample to remote peer: 00497 send_control(MULTICAST_NAK, data); 00498 } 00499 if (received.disjoint()) { 00500 sending_naks = true; 00501 send_naks(received); 00502 } 00503 00504 if (!sending_naks && DCPS_debug_level > 5){ 00505 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) 
ReliableSession::send_naks local %#08x%08x ") 00506 ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"), 00507 (unsigned int)(this->link()->local_peer() >> 32), 00508 (unsigned int) this->link()->local_peer(), 00509 (unsigned int)(this->remote_peer_ >> 32), 00510 (unsigned int) this->remote_peer_)); 00511 } 00512 00513 // Clear peer repair requests: 00514 this->nak_peers_.clear(); 00515 }
bool OpenDDS::DCPS::ReliableSession::start | ( | bool | active, | |
bool | acked | |||
) | [virtual] |
Implements OpenDDS::DCPS::MulticastSession.
Definition at line 680 of file ReliableSession.cpp.
References OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::MulticastSession::set_acked(), and OpenDDS::DCPS::MulticastSession::started_.
00681 { 00682 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false); 00683 00684 if (this->started_) { 00685 return true; // already started 00686 } 00687 00688 this->active_ = active; 00689 { 00690 //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection 00691 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false); 00692 00693 // A watchdog timer is scheduled to periodically check for gaps in 00694 // received data. If a gap is discovered, MULTICAST_NAK control 00695 // samples will be sent to initiate repairs. 00696 // Only subscriber send naks so just schedule for sub role. 00697 if (!active) { 00698 if (acked) { 00699 this->set_acked(); 00700 } 00701 if (!this->nak_watchdog_->schedule()) { 00702 ACE_ERROR_RETURN((LM_ERROR, 00703 ACE_TEXT("(%P|%t) ERROR: ") 00704 ACE_TEXT("ReliableSession::start: ") 00705 ACE_TEXT("failed to schedule NAK watchdog!\n")), 00706 false); 00707 } 00708 } 00709 00710 // Active peers schedule a watchdog timer to initiate a 2-way 00711 // handshake to verify that passive endpoints can send/receive 00712 // data reliably. This process must be executed using the 00713 // transport reactor thread to prevent blocking. 00714 // Only publisher send syn so just schedule for pub role. 00715 if (active && !this->start_syn()) { 00716 this->nak_watchdog_->cancel(); 00717 ACE_ERROR_RETURN((LM_ERROR, 00718 ACE_TEXT("(%P|%t) ERROR: ") 00719 ACE_TEXT("ReliableSession::start: ") 00720 ACE_TEXT("failed to schedule SYN watchdog!\n")), 00721 false); 00722 } 00723 } //Reacquire start_lock_ after releasing unlock_guard with release_start_lock_ 00724 00725 return this->started_ = true; 00726 }
void OpenDDS::DCPS::ReliableSession::stop | ( | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 729 of file ReliableSession.cpp.
References OpenDDS::DCPS::DataLinkWatchdog::cancel(), nak_watchdog_, and OpenDDS::DCPS::MulticastSession::stop().
00730 { 00731 MulticastSession::stop(); 00732 this->nak_watchdog_->cancel(); 00733 }
void OpenDDS::DCPS::ReliableSession::syn_hook | ( | const SequenceNumber & | seq | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::MulticastSession.
Definition at line 245 of file ReliableSession.cpp.
References OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, and OpenDDS::DCPS::DisjointSequence::reset().
00246 { 00247 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges()); 00248 this->nak_sequence_.reset(); 00249 this->nak_sequence_.insert(seq); 00250 00251 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin(); 00252 iter != ranges.end(); ++iter) { 00253 this->nak_sequence_.insert(SequenceRange(iter->first, iter->second)); 00254 } 00255 }
ACE_Thread_Mutex OpenDDS::DCPS::ReliableSession::held_lock_ [private] |
Definition at line 88 of file ReliableSession.h.
Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().
NakPeerSet OpenDDS::DCPS::ReliableSession::nak_peers_ [private] |
NakRequestMap OpenDDS::DCPS::ReliableSession::nak_requests_ [private] |
Definition at line 83 of file ReliableSession.h.
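Referenced by expire_naks(), and send_naks().
DisjointSequence OpenDDS::DCPS::ReliableSession::nak_sequence_ [private] |
Referenced by check_header(), deliver_held_data(), nakack_received(), ready_to_deliver(), record_header_received(), send_naks(), and syn_hook().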