OpenDDS::DCPS::ReliableSession Class Reference

#include <ReliableSession.h>

Inheritance diagram for OpenDDS::DCPS::ReliableSession:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReliableSession:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ReliableSession (ACE_Reactor *reactor, ACE_thread_t owner, MulticastDataLink *link, MulticastPeer remote_peer)
 ~ReliableSession ()
virtual bool check_header (const TransportHeader &header)
virtual void record_header_received (const TransportHeader &header)
virtual bool ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data)
void deliver_held_data ()
virtual void release_remote (const RepoId &remote)
virtual bool control_received (char submessage_id, ACE_Message_Block *control)
void expire_naks ()
void send_naks ()
void nak_received (ACE_Message_Block *control)
void send_naks (DisjointSequence &found)
void nakack_received (ACE_Message_Block *control)
virtual void send_nakack (SequenceNumber low)
virtual bool start (bool active, bool acked)
virtual void stop ()
virtual bool is_reliable ()
virtual void syn_hook (const SequenceNumber &seq)

Private Types

typedef SequenceNumber TransportHeaderSN

Private Member Functions

typedef OPENDDS_MAP (ACE_Time_Value, SequenceNumber) NakRequestMap
 OPENDDS_MULTIMAP (TransportHeaderSN, ReceivedDataSample) held_
typedef OPENDDS_SET (SequenceRange) NakPeerSet

Private Attributes

NakWatchdognak_watchdog_
DisjointSequence nak_sequence_
NakRequestMap nak_requests_
ACE_Thread_Mutex held_lock_
NakPeerSet nak_peers_

Detailed Description

Definition at line 44 of file ReliableSession.h.


Member Typedef Documentation

typedef SequenceNumber OpenDDS::DCPS::ReliableSession::TransportHeaderSN [private]

Definition at line 89 of file ReliableSession.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ReliableSession::ReliableSession ( ACE_Reactor *  reactor,
ACE_thread_t  owner,
MulticastDataLink link,
MulticastPeer  remote_peer 
)

Definition at line 60 of file ReliableSession.cpp.

00064   : MulticastSession(reactor, owner, link, remote_peer),
00065     nak_watchdog_(new NakWatchdog (reactor, owner, this))
00066 {
00067 }

OpenDDS::DCPS::ReliableSession::~ReliableSession (  ) 

Definition at line 69 of file ReliableSession.cpp.

References OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::ReactorInterceptor::destroy(), nak_watchdog_, and OpenDDS::DCPS::ReactorInterceptor::wait().

00070 {
00071   nak_watchdog_->cancel();
00072   nak_watchdog_->wait();
00073   nak_watchdog_->destroy();
00074 }


Member Function Documentation

bool OpenDDS::DCPS::ReliableSession::check_header ( const TransportHeader header  )  [virtual]

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 83 of file ReliableSession.cpp.

References header, OpenDDS::DCPS::DisjointSequence::insert(), and nak_sequence_.

00084 {
00085   // Not from the remote peer for this session.
00086   if (this->remote_peer_ != header.source_) return false;
00087 
00088   // Active sessions don't need to track nak_sequence_
00089   if (this->active_) return true;
00090 
00091   // Update last seen sequence for remote peer; return false if we
00092   // have already seen this datagram to prevent duplicate delivery
00093   // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd
00094   return this->nak_sequence_.insert(SequenceRange(
00095       header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
00096       header.sequence_));
00097 }

bool OpenDDS::DCPS::ReliableSession::control_received ( char  submessage_id,
ACE_Message_Block *  control 
) [virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 217 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastSession::control_received(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MULTICAST_NAKACK, nak_received(), and nakack_received().

00219 {
00220   if (MulticastSession::control_received(submessage_id, control)) {
00221     return true; // base class handled message
00222   }
00223 
00224   switch (submessage_id) {
00225   case MULTICAST_NAK:
00226     nak_received(control);
00227     break;
00228 
00229   case MULTICAST_NAKACK:
00230     nakack_received(control);
00231     break;
00232 
00233   default:
00234     ACE_ERROR((LM_WARNING,
00235                ACE_TEXT("(%P|%t) WARNING: ")
00236                ACE_TEXT("ReliableSession::control_received: ")
00237                ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
00238                submessage_id));
00239     break;
00240   }
00241   return true;
00242 }

void OpenDDS::DCPS::ReliableSession::deliver_held_data (  ) 

Definition at line 169 of file ReliableSession.cpp.

References OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), held_lock_, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::OPENDDS_VECTOR(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by expire_naks(), nakack_received(), ready_to_deliver(), and record_header_received().

00170 {
00171   if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
00172 
00173   OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
00174   const SequenceNumber ca = nak_sequence_.cumulative_ack();
00175 
00176   {
00177     ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00178 
00179     typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
00180     const iter end = this->held_.upper_bound(ca);
00181     for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) {
00182       if (Transport_debug_level > 5) {
00183         GuidConverter writer(it->second.header_.publication_id_);
00184         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
00185                              ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
00186                              it->first.getValue(),
00187                              it->second.header_.sequence_.getValue(),
00188                              OPENDDS_STRING(writer).c_str()));
00189       }
00190       to_deliver.push_back(it->second);
00191       this->held_.erase(it++);
00192     }
00193   }
00194   for (size_t i = 0; i < to_deliver.size(); ++i) {
00195     this->link_->data_received(to_deliver.at(i));
00196   }
00197 }

void OpenDDS::DCPS::ReliableSession::expire_naks (  ) 

Definition at line 258 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastDataLink::config(), deliver_held_data(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::MulticastSession::link_, nak_requests_, and OpenDDS::DCPS::MulticastInst::nak_timeout_.

Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().

00259 {
00260   if (this->nak_requests_.empty()) return; // nothing to expire
00261 
00262   ACE_Time_Value deadline(ACE_OS::gettimeofday());
00263   deadline -= this->link_->config()->nak_timeout_;
00264 
00265   NakRequestMap::iterator first(this->nak_requests_.begin());
00266   NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
00267 
00268   if (first == last) return; // nothing to expire
00269 
00270   // Skip unrecoverable datagrams to
00271   // re-establish a baseline to detect future reception gaps.
00272   SequenceNumber lastSeq = (last == this->nak_requests_.end())
00273                          ? this->nak_requests_.rbegin()->second
00274                          : last->second;
00275 
00276   std::vector<SequenceRange> dropped;
00277   if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(),
00278                                                lastSeq), dropped)) {
00279 
00280     for (size_t i = 0; i < dropped.size(); ++i) {
00281       this->reassembly_.data_unavailable(dropped[i]);
00282     }
00283 
00284     ACE_ERROR((LM_WARNING,
00285                 ACE_TEXT("(%P|%t) WARNING: ")
00286                 ACE_TEXT("ReliableSession::expire_naks: ")
00287                 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
00288                 (unsigned int)(this->remote_peer_ >> 32),
00289                 (unsigned int) this->remote_peer_,
00290                 this->nak_sequence_.low().getValue(),
00291                 lastSeq.getValue()));
00292   }
00293 
00294   // Clear expired repair requests:
00295   this->nak_requests_.erase(first, last);
00296   deliver_held_data();
00297 }

virtual bool OpenDDS::DCPS::ReliableSession::is_reliable (  )  [inline, virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 76 of file ReliableSession.h.

00076 { return true;}

void OpenDDS::DCPS::ReliableSession::nak_received ( ACE_Message_Block *  control  ) 

Definition at line 518 of file ReliableSession.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SingleSendBuffer::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::SingleSendBuffer::low(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::SingleSendBuffer::resend(), OpenDDS::DCPS::MulticastDataLink::send_buffer(), and send_nakack().

Referenced by control_received().

00519 {
00520   if (!this->active_) return; // sub send naks, then doesn't receive them.
00521 
00522   const TransportHeader& header =
00523     this->link_->receive_strategy()->received_header();
00524 
00525   Serializer serializer(control, header.swap_bytes());
00526 
00527   MulticastPeer local_peer;
00528   CORBA::ULong size = 0;
00529   serializer >> local_peer; // sent as remote_peer
00530   serializer >> size;
00531 
00532   std::vector<SequenceRange> ranges;
00533 
00534   for (CORBA::ULong i = 0; i < size; ++i) {
00535     SequenceRange range;
00536     serializer >> range.first;
00537     serializer >> range.second;
00538     ranges.push_back(range);
00539   }
00540 
00541   // Ignore sample if not destined for us:
00542   if ((local_peer != this->link_->local_peer())        // Not to us.
00543     || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session.
00544 
00545   SingleSendBuffer* send_buffer = this->link_->send_buffer();
00546   // Broadcast a MULTICAST_NAKACK control sample before resending to suppress
00547   // repair requests for unrecoverable samples by providing a
00548   // new low-water mark for affected peers:
00549   if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) {
00550       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00551         ACE_DEBUG ((LM_DEBUG,
00552                     ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00553                     ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
00554                     (unsigned int)(this->link()->local_peer() >> 32),
00555                     (unsigned int) this->link()->local_peer(),
00556                     (unsigned int)(this->remote_peer_ >> 32),
00557                     (unsigned int) this->remote_peer_,
00558                     send_buffer->low().getValue()));
00559       }
00560     send_nakack(send_buffer->low());
00561   }
00562 
00563   for (CORBA::ULong i = 0; i < size; ++i) {
00564     bool ret = send_buffer->resend(ranges[i]);
00565     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00566       ACE_DEBUG ((LM_DEBUG,
00567                   ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00568                   ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
00569                   (unsigned int)(this->link()->local_peer() >> 32),
00570                   (unsigned int) this->link()->local_peer(),
00571                   (unsigned int)(this->remote_peer_ >> 32),
00572                   (unsigned int) this->remote_peer_,
00573                   ranges[i].first.getValue(), ranges[i].second.getValue(),
00574                   ret ? "SUCCESS" : "FAILED"));
00575     }
00576   }
00577 }

void OpenDDS::DCPS::ReliableSession::nakack_received ( ACE_Message_Block *  control  ) 

Definition at line 617 of file ReliableSession.cpp.

References OpenDDS::DCPS::DCPS_debug_level, deliver_held_data(), header, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, OpenDDS::DCPS::MulticastDataLink::local_peer(), nak_sequence_, and OpenDDS::DCPS::MulticastDataLink::receive_strategy().

Referenced by control_received().

00618 {
00619   if (this->active_) return; // pub send nakack, doesn't receive them.
00620 
00621   const TransportHeader& header =
00622     this->link_->receive_strategy()->received_header();
00623 
00624   // Not from the remote peer for this session.
00625   if (this->remote_peer_ != header.source_) return;
00626 
00627   Serializer serializer(control, header.swap_bytes());
00628 
00629   SequenceNumber low;
00630   serializer >> low;
00631 
00632   // MULTICAST_NAKACK control samples indicate data which cannot be
00633   // repaired by a remote peer; if any values were needed below
00634   // this value, then the sequence needs to be shifted:
00635   std::vector<SequenceRange> dropped;
00636   SequenceNumber range_low =  SequenceNumber();
00637   SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
00638 
00639   if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
00640 
00641     this->nak_sequence_.insert(range_low);
00642 
00643   } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
00644 
00645     for (size_t i = 0; i < dropped.size(); ++i) {
00646       this->reassembly_.data_unavailable(dropped[i]);
00647     }
00648 
00649     if (DCPS_debug_level > 0) {
00650       ACE_ERROR((LM_WARNING,
00651                 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
00652                 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
00653                 ACE_TEXT("- some ranges dropped.\n"),
00654                 (unsigned int)(this->link()->local_peer() >> 32),
00655                 (unsigned int) this->link()->local_peer(),
00656                 (unsigned int)(this->remote_peer_ >> 32),
00657                 (unsigned int) this->remote_peer_,
00658                 low.getValue()));
00659     }
00660   }
00661   deliver_held_data();
00662 }

typedef OpenDDS::DCPS::ReliableSession::OPENDDS_MAP ( ACE_Time_Value  ,
SequenceNumber   
) [private]

OpenDDS::DCPS::ReliableSession::OPENDDS_MULTIMAP ( TransportHeaderSN  ,
ReceivedDataSample   
) [private]

Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().

typedef OpenDDS::DCPS::ReliableSession::OPENDDS_SET ( SequenceRange   )  [private]

bool OpenDDS::DCPS::ReliableSession::ready_to_deliver ( const TransportHeader header,
const ReceivedDataSample data 
) [virtual]

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 115 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::ReceivedDataSample::header_, held_lock_, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::Transport_debug_level.

00117 {
00118   if (!acked()
00119       || nak_sequence_.disjoint()
00120       || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_)
00121       || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
00122       || (nak_sequence_.empty() && header.sequence_ > 1)) {
00123 
00124     if (Transport_debug_level > 5) {
00125       GuidConverter writer(data.header_.publication_id_);
00126       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00127                            ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
00128                            header.sequence_.getValue(),
00129                            data.header_.sequence_.getValue(),
00130                            OPENDDS_STRING(writer).c_str()));
00131     }
00132     {
00133       ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false);
00134 
00135       held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
00136 
00137       if (Transport_debug_level > 5) {
00138         OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
00139         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00140                              ACE_TEXT(" held_ data currently contains: %d samples\n"),
00141                              held_.size()));
00142         while (it != held_.end()) {
00143           GuidConverter writer(it->second.header_.publication_id_);
00144           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00145                                ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
00146                                it->first.getValue(),
00147                                it->second.header_.sequence_.getValue(),
00148                                OPENDDS_STRING(writer).c_str()));
00149           ++it;
00150         }
00151       }
00152     }
00153     deliver_held_data();
00154     return false;
00155   } else {
00156     if (Transport_debug_level > 5) {
00157       GuidConverter writer(data.header_.publication_id_);
00158       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00159                            ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
00160                            header.sequence_.getValue(),
00161                            data.header_.sequence_.getValue(),
00162                            OPENDDS_STRING(writer).c_str()));
00163     }
00164     return true;
00165   }
00166 }

void OpenDDS::DCPS::ReliableSession::record_header_received ( const TransportHeader header  )  [virtual]

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 100 of file ReliableSession.cpp.

References deliver_held_data(), header, OpenDDS::DCPS::DisjointSequence::insert(), and nak_sequence_.

00101 {
00102   // Not from the remote peer for this session.
00103   if (this->remote_peer_ != header.source_) return;
00104 
00105   // Active sessions don't need to track nak_sequence_
00106   if (this->active_) return;
00107 
00108   // Update nak sequence for seen sequence from remote peer
00109   this->nak_sequence_.insert(header.sequence_);
00110   deliver_held_data();
00111 }

void OpenDDS::DCPS::ReliableSession::release_remote ( const RepoId remote  )  [virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 200 of file ReliableSession.cpp.

References held_lock_, and OPENDDS_MULTIMAP().

00201 {
00202   ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00203   if (!held_.empty()) {
00204     OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
00205     while (it != held_.end()) {
00206       if (it->second.header_.publication_id_ == remote) {
00207         held_.erase(it++);
00208       } else {
00209         it++;
00210       }
00211     }
00212   }
00213 }

void OpenDDS::DCPS::ReliableSession::send_nakack ( SequenceNumber  low  )  [virtual]

Definition at line 665 of file ReliableSession.cpp.

References OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::MULTICAST_NAKACK, and OpenDDS::DCPS::MulticastSession::send_control().

Referenced by nak_received().

00666 {
00667   size_t len = sizeof(low.getValue());
00668 
00669   ACE_Message_Block* data;
00670   ACE_NEW(data, ACE_Message_Block(len));
00671 
00672   Serializer serializer(data);
00673 
00674   serializer << low;
00675   // Broadcast control sample to all peers:
00676   send_control(MULTICAST_NAKACK, data);
00677 }

void OpenDDS::DCPS::ReliableSession::send_naks ( DisjointSequence found  ) 

Definition at line 580 of file ReliableSession.cpp.

References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastSession::remote_peer_, and OpenDDS::DCPS::MulticastSession::send_control().

00581 {
00582   const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
00583 
00584   CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00585 
00586   size_t len = sizeof(this->remote_peer_)
00587              + sizeof(size)
00588              + size * 2 * sizeof(SequenceNumber);
00589 
00590   ACE_Message_Block* data;
00591   ACE_NEW(data, ACE_Message_Block(len));
00592 
00593   Serializer serializer(data);
00594 
00595   serializer << this->remote_peer_;
00596   serializer << size;
00597   for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00598        iter != ranges.end(); ++iter) {
00599     serializer << iter->first;
00600     serializer << iter->second;
00601     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00602       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
00603                             ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00604                             (unsigned int)(this->link()->local_peer() >> 32),
00605                             (unsigned int) this->link()->local_peer(),
00606                             (unsigned int)(this->remote_peer_ >> 32),
00607                             (unsigned int) this->remote_peer_,
00608                             iter->first.getValue(), iter->second.getValue()));
00609     }
00610   }
00611   // Send control sample to remote peer:
00612   send_control(MULTICAST_NAK, data);
00613 }

void OpenDDS::DCPS::ReliableSession::send_naks (  )  [virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 300 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastInst::nak_delay_intervals_, OpenDDS::DCPS::MulticastInst::nak_max_, nak_peers_, nak_requests_, nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, and OpenDDS::DCPS::MulticastSession::send_control().

Referenced by OpenDDS::DCPS::NakWatchdog::on_interval().

00301 {
00302   // Could get data samples before syn control message.
00303   // No use nak'ing until syn control message is received and session is acked.
00304   if (!this->acked()) {
00305     if (DCPS_debug_level > 5) {
00306       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00307                            ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
00308                            (unsigned int)(this->link()->local_peer() >> 32),
00309                            (unsigned int) this->link()->local_peer(),
00310                            (unsigned int)(this->remote_peer_ >> 32),
00311                            (unsigned int) this->remote_peer_));
00312     }
00313     return;
00314   }
00315 
00316   if (DCPS_debug_level > 5) {
00317     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00318                          ACE_TEXT("remote %#08x%08x nak request size %d \n"),
00319                          (unsigned int)(this->link()->local_peer() >> 32),
00320                          (unsigned int) this->link()->local_peer(),
00321                          (unsigned int)(this->remote_peer_ >> 32),
00322                          (unsigned int) this->remote_peer_,
00323                          this->nak_requests_.size()));
00324   }
00325 
00326   if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
00327     if (DCPS_debug_level > 5) {
00328       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00329                            ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"),
00330                            (unsigned int)(this->link()->local_peer() >> 32),
00331                            (unsigned int) this->link()->local_peer(),
00332                            (unsigned int)(this->remote_peer_ >> 32),
00333                            (unsigned int) this->remote_peer_));
00334     }
00335 
00336     if (DCPS_debug_level > 9) {
00337       const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00338       for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00339            iter != ranges.end(); ++iter) {
00340           ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
00341                      (unsigned int)(this->link()->local_peer() >> 32),
00342                      (unsigned int) this->link()->local_peer(),
00343                      (unsigned int)(this->remote_peer_ >> 32),
00344                      (unsigned int) this->remote_peer_,
00345                      iter->first.getValue(),
00346                      iter->second.getValue()));
00347       }
00348     }
00349     return;  // nothing to send
00350   }
00351 
00352   ACE_Time_Value now(ACE_OS::gettimeofday());
00353 
00354   // Record low-water mark for this interval; this value will
00355   // be used to reset the low-water mark in the event the remote
00356   // peer becomes unresponsive:
00357   if (this->nak_sequence_.low() > 1) {
00358     this->nak_requests_[now] = SequenceNumber();
00359   } else {
00360     this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
00361   }
00362 
00363   typedef std::vector<SequenceRange> RangeVector;
00364   RangeVector ignored;
00365 
00366   /// The range first - second will be skipped (no naks sent for it).
00367   SequenceNumber first;
00368   SequenceNumber second;
00369 
00370   NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
00371 
00372   if (this->nak_requests_.size() > 1) {
00373     // The sequences between rbegin - 1 and rbegin will not be ignored for naking.
00374     ++itr;
00375 
00376     size_t nak_delay_intervals = this->link()->config()->nak_delay_intervals_;
00377     size_t nak_max = this->link()->config()->nak_max_;
00378     size_t sz = this->nak_requests_.size();
00379 
00380     // Image i is the index of element in nak_requests_ in reverse order.
00381     // index 0 sequence is most recent high water mark.
00382     // e.g index , 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
00383     //  0 (rbegin) is always skipped because missing sample between 1 and 0 interval
00384     //  should always be naked.,
00385     //  if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11
00386     //  are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for
00387     //  naking due to nak_max.
00388     for (size_t i = 1; i < sz; ++i) {
00389       if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
00390         if (first != SequenceNumber()) {
00391           first = this->nak_requests_.begin()->second;
00392         }
00393         else {
00394           ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
00395         }
00396         break;
00397       }
00398 
00399       if (i % (nak_delay_intervals + 1) == 1) {
00400         second = itr->second;
00401       }
00402       if (second != SequenceNumber()) {
00403         first = itr->second;
00404       }
00405 
00406       if (i % (nak_delay_intervals + 1) == 0) {
00407         first = itr->second;
00408 
00409         if (first != SequenceNumber() && second != SequenceNumber()) {
00410           ignored.push_back(std::make_pair(first, second));
00411           first = SequenceNumber();
00412           second = SequenceNumber();
00413         }
00414       }
00415 
00416       ++itr;
00417     }
00418 
00419     if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
00420       ignored.push_back(std::make_pair(first, second));
00421     }
00422   }
00423 
00424   // Take a copy to facilitate temporary suppression:
00425   DisjointSequence received(this->nak_sequence_);
00426   if (DCPS_debug_level > 0) {
00427     received.dump();
00428   }
00429 
00430   size_t sz = ignored.size();
00431   for (size_t i = 0; i < sz; ++i) {
00432 
00433     if (ignored[i].second > received.cumulative_ack()) {
00434       SequenceNumber high = ignored[i].second;
00435       SequenceNumber low = ignored[i].first;
00436       if (low < received.cumulative_ack()) {
00437         low = received.cumulative_ack();
00438       }
00439 
00440       if (DCPS_debug_level > 5) {
00441         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00442           ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
00443           (unsigned int)(this->link()->local_peer() >> 32),
00444           (unsigned int) this->link()->local_peer(),
00445           (unsigned int)(this->remote_peer_ >> 32),
00446           (unsigned int) this->remote_peer_,
00447           low.getValue(), high.getValue()));
00448       }
00449 
00450       // Make contiguous between ignored sequences.
00451       received.insert(SequenceRange(low, high));
00452     }
00453   }
00454 
00455   for (NakPeerSet::iterator it(this->nak_peers_.begin());
00456        it != this->nak_peers_.end(); ++it) {
00457     // Update sequence to temporarily suppress repair requests for
00458     // ranges already requested by other peers for this interval:
00459     received.insert(*it);
00460   }
00461   bool sending_naks = false;
00462   if (received.low() > 1){
00463     //Special case: nak from beginning to make sure no missing sequence
00464     //number below the first received
00465     sending_naks = true;
00466     std::vector<SequenceRange> ranges;
00467     ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
00468 
00469     CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00470 
00471     size_t len = sizeof(this->remote_peer_)
00472                + sizeof(size)
00473                + size * 2 * sizeof(SequenceNumber);
00474 
00475     ACE_Message_Block* data;
00476     ACE_NEW(data, ACE_Message_Block(len));
00477 
00478     Serializer serializer(data);
00479 
00480     serializer << this->remote_peer_;
00481     serializer << size;
00482     for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00483          iter != ranges.end(); ++iter) {
00484       serializer << iter->first;
00485       serializer << iter->second;
00486       if (DCPS_debug_level > 0) {
00487         ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
00488                               ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00489                               (unsigned int)(this->link()->local_peer() >> 32),
00490                               (unsigned int) this->link()->local_peer(),
00491                               (unsigned int)(this->remote_peer_ >> 32),
00492                               (unsigned int) this->remote_peer_,
00493                               iter->first.getValue(), iter->second.getValue()));
00494       }
00495     }
00496     // Send control sample to remote peer:
00497     send_control(MULTICAST_NAK, data);
00498   }
00499   if (received.disjoint()) {
00500     sending_naks = true;
00501     send_naks(received);
00502   }
00503 
00504   if (!sending_naks && DCPS_debug_level > 5){
00505     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00506                          ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"),
00507                          (unsigned int)(this->link()->local_peer() >> 32),
00508                          (unsigned int) this->link()->local_peer(),
00509                          (unsigned int)(this->remote_peer_ >> 32),
00510                          (unsigned int) this->remote_peer_));
00511   }
00512 
00513   // Clear peer repair requests:
00514   this->nak_peers_.clear();
00515 }

bool OpenDDS::DCPS::ReliableSession::start ( bool  active,
bool  acked 
) [virtual]

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 680 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::MulticastSession::set_acked(), and OpenDDS::DCPS::MulticastSession::started_.

00681 {
00682   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
00683 
00684   if (this->started_) {
00685     return true;  // already started
00686   }
00687 
00688   this->active_  = active;
00689   {
00690     //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection
00691     ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
00692 
00693     // A watchdog timer is scheduled to periodically check for gaps in
00694     // received data. If a gap is discovered, MULTICAST_NAK control
00695     // samples will be sent to initiate repairs.
00696     // Only subscriber send naks so just schedule for sub role.
00697     if (!active) {
00698       if (acked) {
00699         this->set_acked();
00700       }
00701       if (!this->nak_watchdog_->schedule()) {
00702         ACE_ERROR_RETURN((LM_ERROR,
00703                           ACE_TEXT("(%P|%t) ERROR: ")
00704                           ACE_TEXT("ReliableSession::start: ")
00705                           ACE_TEXT("failed to schedule NAK watchdog!\n")),
00706                          false);
00707       }
00708     }
00709 
00710     // Active peers schedule a watchdog timer to initiate a 2-way
00711     // handshake to verify that passive endpoints can send/receive
00712     // data reliably. This process must be executed using the
00713     // transport reactor thread to prevent blocking.
00714     // Only publisher send syn so just schedule for pub role.
00715     if (active && !this->start_syn()) {
00716       this->nak_watchdog_->cancel();
00717       ACE_ERROR_RETURN((LM_ERROR,
00718                         ACE_TEXT("(%P|%t) ERROR: ")
00719                         ACE_TEXT("ReliableSession::start: ")
00720                         ACE_TEXT("failed to schedule SYN watchdog!\n")),
00721                        false);
00722     }
00723   } //Reacquire start_lock_ after releasing unlock_guard with release_start_lock_
00724 
00725   return this->started_ = true;
00726 }

void OpenDDS::DCPS::ReliableSession::stop (  )  [virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 729 of file ReliableSession.cpp.

References OpenDDS::DCPS::DataLinkWatchdog::cancel(), nak_watchdog_, and OpenDDS::DCPS::MulticastSession::stop().

00730 {
00731   MulticastSession::stop();
00732   this->nak_watchdog_->cancel();
00733 }

void OpenDDS::DCPS::ReliableSession::syn_hook ( const SequenceNumber seq  )  [virtual]

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 245 of file ReliableSession.cpp.

References OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, and OpenDDS::DCPS::DisjointSequence::reset().

00246 {
00247   const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00248   this->nak_sequence_.reset();
00249   this->nak_sequence_.insert(seq);
00250 
00251   for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00252        iter != ranges.end(); ++iter) {
00253     this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
00254   }
00255 }


Member Data Documentation

ACE_Thread_Mutex OpenDDS::DCPS::ReliableSession::held_lock_ [private]

Definition at line 88 of file ReliableSession.h.

Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().

NakPeerSet OpenDDS::DCPS::ReliableSession::nak_peers_ [private]

Definition at line 93 of file ReliableSession.h.

Referenced by send_naks().

NakRequestMap OpenDDS::DCPS::ReliableSession::nak_requests_ [private]

Definition at line 86 of file ReliableSession.h.

Referenced by expire_naks(), and send_naks().

DisjointSequence OpenDDS::DCPS::ReliableSession::nak_sequence_ [private]

Definition at line 83 of file ReliableSession.h.

Referenced by check_header(), deliver_held_data(), nakack_received(), ready_to_deliver(), record_header_received(), send_naks(), and syn_hook().

NakWatchdog* OpenDDS::DCPS::ReliableSession::nak_watchdog_ [private]

Definition at line 81 of file ReliableSession.h.

Referenced by stop(), and ~ReliableSession().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:35 2016 for OpenDDS by  doxygen 1.4.7