ReliableSession.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ReliableSession.h"
00009 
00010 #include "MulticastDataLink.h"
00011 #include "MulticastInst.h"
00012 #include "MulticastReceiveStrategy.h"
00013 
00014 #include "ace/Global_Macros.h"
00015 #include "ace/Time_Value.h"
00016 #include "ace/Truncate.h"
00017 
00018 #include "dds/DCPS/Serializer.h"
00019 #include "dds/DCPS/GuidConverter.h"
00020 
00021 #include <cstdlib>
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 NakWatchdog::NakWatchdog(ACE_Reactor* reactor,
00027                          ACE_thread_t owner,
00028                          ReliableSession* session)
00029   : DataLinkWatchdog(reactor, owner)
00030   , session_(session)
00031 {
00032 }
00033 
00034 ACE_Time_Value
00035 NakWatchdog::next_interval()
00036 {
00037   MulticastInst* config = this->session_->link()->config();
00038   ACE_Time_Value interval(config->nak_interval_);
00039 
00040   // Apply random backoff to minimize potential collisions:
00041   interval *= static_cast<double>(std::rand()) /
00042               static_cast<double>(RAND_MAX) + 1.0;
00043 
00044   return interval;
00045 }
00046 
00047 void
00048 NakWatchdog::on_interval(const void* /*arg*/)
00049 {
00050   // Expire outstanding repair requests that have not yet been
00051   // fulfilled; this prevents NAK implosions due to remote
00052   // peers becoming unresponsive:
00053   this->session_->expire_naks();
00054 
00055   // Initiate repairs by sending MULTICAST_NAK control samples
00056   // to remote peers from which we are missing data:
00057   this->session_->send_naks();
00058 }
00059 
00060 ReliableSession::ReliableSession(ACE_Reactor* reactor,
00061                                  ACE_thread_t owner,
00062                                  MulticastDataLink* link,
00063                                  MulticastPeer remote_peer)
00064   : MulticastSession(reactor, owner, link, remote_peer),
00065     nak_watchdog_(new NakWatchdog (reactor, owner, this))
00066 {
00067 }
00068 
00069 ReliableSession::~ReliableSession()
00070 {
00071   nak_watchdog_->cancel();
00072   nak_watchdog_->wait();
00073   nak_watchdog_->destroy();
00074 }
00075 
00076 bool
00077 NakWatchdog::reactor_is_shut_down() const
00078 {
00079   return session_->link()->transport()->is_shut_down();
00080 }
00081 
00082 bool
00083 ReliableSession::check_header(const TransportHeader& header)
00084 {
00085   // Not from the remote peer for this session.
00086   if (this->remote_peer_ != header.source_) return false;
00087 
00088   // Active sessions don't need to track nak_sequence_
00089   if (this->active_) return true;
00090 
00091   // Update last seen sequence for remote peer; return false if we
00092   // have already seen this datagram to prevent duplicate delivery
00093   // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd
00094   return this->nak_sequence_.insert(SequenceRange(
00095       header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
00096       header.sequence_));
00097 }
00098 
00099 void
00100 ReliableSession::record_header_received(const TransportHeader& header)
00101 {
00102   // Not from the remote peer for this session.
00103   if (this->remote_peer_ != header.source_) return;
00104 
00105   // Active sessions don't need to track nak_sequence_
00106   if (this->active_) return;
00107 
00108   // Update nak sequence for seen sequence from remote peer
00109   this->nak_sequence_.insert(header.sequence_);
00110   deliver_held_data();
00111 }
00112 
00113 
00114 bool
00115 ReliableSession::ready_to_deliver(const TransportHeader& header,
00116                                   const ReceivedDataSample& data)
00117 {
00118   if (!acked()
00119       || nak_sequence_.disjoint()
00120       || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_)
00121       || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
00122       || (nak_sequence_.empty() && header.sequence_ > 1)) {
00123 
00124     if (Transport_debug_level > 5) {
00125       GuidConverter writer(data.header_.publication_id_);
00126       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00127                            ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
00128                            header.sequence_.getValue(),
00129                            data.header_.sequence_.getValue(),
00130                            OPENDDS_STRING(writer).c_str()));
00131     }
00132     {
00133       ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false);
00134 
00135       held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
00136 
00137       if (Transport_debug_level > 5) {
00138         OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
00139         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00140                              ACE_TEXT(" held_ data currently contains: %d samples\n"),
00141                              held_.size()));
00142         while (it != held_.end()) {
00143           GuidConverter writer(it->second.header_.publication_id_);
00144           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00145                                ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
00146                                it->first.getValue(),
00147                                it->second.header_.sequence_.getValue(),
00148                                OPENDDS_STRING(writer).c_str()));
00149           ++it;
00150         }
00151       }
00152     }
00153     deliver_held_data();
00154     return false;
00155   } else {
00156     if (Transport_debug_level > 5) {
00157       GuidConverter writer(data.header_.publication_id_);
00158       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00159                            ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
00160                            header.sequence_.getValue(),
00161                            data.header_.sequence_.getValue(),
00162                            OPENDDS_STRING(writer).c_str()));
00163     }
00164     return true;
00165   }
00166 }
00167 
00168 void
00169 ReliableSession::deliver_held_data()
00170 {
00171   if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
00172 
00173   OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
00174   const SequenceNumber ca = nak_sequence_.cumulative_ack();
00175 
00176   {
00177     ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00178 
00179     typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
00180     const iter end = this->held_.upper_bound(ca);
00181     for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) {
00182       if (Transport_debug_level > 5) {
00183         GuidConverter writer(it->second.header_.publication_id_);
00184         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
00185                              ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
00186                              it->first.getValue(),
00187                              it->second.header_.sequence_.getValue(),
00188                              OPENDDS_STRING(writer).c_str()));
00189       }
00190       to_deliver.push_back(it->second);
00191       this->held_.erase(it++);
00192     }
00193   }
00194   for (size_t i = 0; i < to_deliver.size(); ++i) {
00195     this->link_->data_received(to_deliver.at(i));
00196   }
00197 }
00198 
00199 void
00200 ReliableSession::release_remote(const RepoId& remote)
00201 {
00202   ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00203   if (!held_.empty()) {
00204     OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
00205     while (it != held_.end()) {
00206       if (it->second.header_.publication_id_ == remote) {
00207         held_.erase(it++);
00208       } else {
00209         it++;
00210       }
00211     }
00212   }
00213 }
00214 
00215 
00216 bool
00217 ReliableSession::control_received(char submessage_id,
00218                                   ACE_Message_Block* control)
00219 {
00220   if (MulticastSession::control_received(submessage_id, control)) {
00221     return true; // base class handled message
00222   }
00223 
00224   switch (submessage_id) {
00225   case MULTICAST_NAK:
00226     nak_received(control);
00227     break;
00228 
00229   case MULTICAST_NAKACK:
00230     nakack_received(control);
00231     break;
00232 
00233   default:
00234     ACE_ERROR((LM_WARNING,
00235                ACE_TEXT("(%P|%t) WARNING: ")
00236                ACE_TEXT("ReliableSession::control_received: ")
00237                ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
00238                submessage_id));
00239     break;
00240   }
00241   return true;
00242 }
00243 
00244 void
00245 ReliableSession::syn_hook(const SequenceNumber& seq)
00246 {
00247   const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00248   this->nak_sequence_.reset();
00249   this->nak_sequence_.insert(seq);
00250 
00251   for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00252        iter != ranges.end(); ++iter) {
00253     this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
00254   }
00255 }
00256 
00257 void
00258 ReliableSession::expire_naks()
00259 {
00260   if (this->nak_requests_.empty()) return; // nothing to expire
00261 
00262   ACE_Time_Value deadline(ACE_OS::gettimeofday());
00263   deadline -= this->link_->config()->nak_timeout_;
00264 
00265   NakRequestMap::iterator first(this->nak_requests_.begin());
00266   NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
00267 
00268   if (first == last) return; // nothing to expire
00269 
00270   // Skip unrecoverable datagrams to
00271   // re-establish a baseline to detect future reception gaps.
00272   SequenceNumber lastSeq = (last == this->nak_requests_.end())
00273                          ? this->nak_requests_.rbegin()->second
00274                          : last->second;
00275 
00276   std::vector<SequenceRange> dropped;
00277   if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(),
00278                                                lastSeq), dropped)) {
00279 
00280     for (size_t i = 0; i < dropped.size(); ++i) {
00281       this->reassembly_.data_unavailable(dropped[i]);
00282     }
00283 
00284     ACE_ERROR((LM_WARNING,
00285                 ACE_TEXT("(%P|%t) WARNING: ")
00286                 ACE_TEXT("ReliableSession::expire_naks: ")
00287                 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
00288                 (unsigned int)(this->remote_peer_ >> 32),
00289                 (unsigned int) this->remote_peer_,
00290                 this->nak_sequence_.low().getValue(),
00291                 lastSeq.getValue()));
00292   }
00293 
00294   // Clear expired repair requests:
00295   this->nak_requests_.erase(first, last);
00296   deliver_held_data();
00297 }
00298 
00299 void
00300 ReliableSession::send_naks()
00301 {
00302   // Could get data samples before syn control message.
00303   // No use nak'ing until syn control message is received and session is acked.
00304   if (!this->acked()) {
00305     if (DCPS_debug_level > 5) {
00306       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00307                            ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
00308                            (unsigned int)(this->link()->local_peer() >> 32),
00309                            (unsigned int) this->link()->local_peer(),
00310                            (unsigned int)(this->remote_peer_ >> 32),
00311                            (unsigned int) this->remote_peer_));
00312     }
00313     return;
00314   }
00315 
00316   if (DCPS_debug_level > 5) {
00317     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00318                          ACE_TEXT("remote %#08x%08x nak request size %d \n"),
00319                          (unsigned int)(this->link()->local_peer() >> 32),
00320                          (unsigned int) this->link()->local_peer(),
00321                          (unsigned int)(this->remote_peer_ >> 32),
00322                          (unsigned int) this->remote_peer_,
00323                          this->nak_requests_.size()));
00324   }
00325 
00326   if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
00327     if (DCPS_debug_level > 5) {
00328       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00329                            ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"),
00330                            (unsigned int)(this->link()->local_peer() >> 32),
00331                            (unsigned int) this->link()->local_peer(),
00332                            (unsigned int)(this->remote_peer_ >> 32),
00333                            (unsigned int) this->remote_peer_));
00334     }
00335 
00336     if (DCPS_debug_level > 9) {
00337       const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00338       for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00339            iter != ranges.end(); ++iter) {
00340           ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
00341                      (unsigned int)(this->link()->local_peer() >> 32),
00342                      (unsigned int) this->link()->local_peer(),
00343                      (unsigned int)(this->remote_peer_ >> 32),
00344                      (unsigned int) this->remote_peer_,
00345                      iter->first.getValue(),
00346                      iter->second.getValue()));
00347       }
00348     }
00349     return;  // nothing to send
00350   }
00351 
00352   ACE_Time_Value now(ACE_OS::gettimeofday());
00353 
00354   // Record low-water mark for this interval; this value will
00355   // be used to reset the low-water mark in the event the remote
00356   // peer becomes unresponsive:
00357   if (this->nak_sequence_.low() > 1) {
00358     this->nak_requests_[now] = SequenceNumber();
00359   } else {
00360     this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
00361   }
00362 
00363   typedef std::vector<SequenceRange> RangeVector;
00364   RangeVector ignored;
00365 
00366   /// The range first - second will be skipped (no naks sent for it).
00367   SequenceNumber first;
00368   SequenceNumber second;
00369 
00370   NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
00371 
00372   if (this->nak_requests_.size() > 1) {
00373     // The sequences between rbegin - 1 and rbegin will not be ignored for naking.
00374     ++itr;
00375 
00376     size_t nak_delay_intervals = this->link()->config()->nak_delay_intervals_;
00377     size_t nak_max = this->link()->config()->nak_max_;
00378     size_t sz = this->nak_requests_.size();
00379 
00380     // Image i is the index of element in nak_requests_ in reverse order.
00381     // index 0 sequence is most recent high water mark.
00382     // e.g index , 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
00383     //  0 (rbegin) is always skipped because missing sample between 1 and 0 interval
00384     //  should always be naked.,
00385     //  if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11
00386     //  are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for
00387     //  naking due to nak_max.
00388     for (size_t i = 1; i < sz; ++i) {
00389       if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
00390         if (first != SequenceNumber()) {
00391           first = this->nak_requests_.begin()->second;
00392         }
00393         else {
00394           ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
00395         }
00396         break;
00397       }
00398 
00399       if (i % (nak_delay_intervals + 1) == 1) {
00400         second = itr->second;
00401       }
00402       if (second != SequenceNumber()) {
00403         first = itr->second;
00404       }
00405 
00406       if (i % (nak_delay_intervals + 1) == 0) {
00407         first = itr->second;
00408 
00409         if (first != SequenceNumber() && second != SequenceNumber()) {
00410           ignored.push_back(std::make_pair(first, second));
00411           first = SequenceNumber();
00412           second = SequenceNumber();
00413         }
00414       }
00415 
00416       ++itr;
00417     }
00418 
00419     if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
00420       ignored.push_back(std::make_pair(first, second));
00421     }
00422   }
00423 
00424   // Take a copy to facilitate temporary suppression:
00425   DisjointSequence received(this->nak_sequence_);
00426   if (DCPS_debug_level > 0) {
00427     received.dump();
00428   }
00429 
00430   size_t sz = ignored.size();
00431   for (size_t i = 0; i < sz; ++i) {
00432 
00433     if (ignored[i].second > received.cumulative_ack()) {
00434       SequenceNumber high = ignored[i].second;
00435       SequenceNumber low = ignored[i].first;
00436       if (low < received.cumulative_ack()) {
00437         low = received.cumulative_ack();
00438       }
00439 
00440       if (DCPS_debug_level > 5) {
00441         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00442           ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
00443           (unsigned int)(this->link()->local_peer() >> 32),
00444           (unsigned int) this->link()->local_peer(),
00445           (unsigned int)(this->remote_peer_ >> 32),
00446           (unsigned int) this->remote_peer_,
00447           low.getValue(), high.getValue()));
00448       }
00449 
00450       // Make contiguous between ignored sequences.
00451       received.insert(SequenceRange(low, high));
00452     }
00453   }
00454 
00455   for (NakPeerSet::iterator it(this->nak_peers_.begin());
00456        it != this->nak_peers_.end(); ++it) {
00457     // Update sequence to temporarily suppress repair requests for
00458     // ranges already requested by other peers for this interval:
00459     received.insert(*it);
00460   }
00461   bool sending_naks = false;
00462   if (received.low() > 1){
00463     //Special case: nak from beginning to make sure no missing sequence
00464     //number below the first received
00465     sending_naks = true;
00466     std::vector<SequenceRange> ranges;
00467     ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
00468 
00469     CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00470 
00471     size_t len = sizeof(this->remote_peer_)
00472                + sizeof(size)
00473                + size * 2 * sizeof(SequenceNumber);
00474 
00475     ACE_Message_Block* data;
00476     ACE_NEW(data, ACE_Message_Block(len));
00477 
00478     Serializer serializer(data);
00479 
00480     serializer << this->remote_peer_;
00481     serializer << size;
00482     for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00483          iter != ranges.end(); ++iter) {
00484       serializer << iter->first;
00485       serializer << iter->second;
00486       if (DCPS_debug_level > 0) {
00487         ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
00488                               ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00489                               (unsigned int)(this->link()->local_peer() >> 32),
00490                               (unsigned int) this->link()->local_peer(),
00491                               (unsigned int)(this->remote_peer_ >> 32),
00492                               (unsigned int) this->remote_peer_,
00493                               iter->first.getValue(), iter->second.getValue()));
00494       }
00495     }
00496     // Send control sample to remote peer:
00497     send_control(MULTICAST_NAK, data);
00498   }
00499   if (received.disjoint()) {
00500     sending_naks = true;
00501     send_naks(received);
00502   }
00503 
00504   if (!sending_naks && DCPS_debug_level > 5){
00505     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00506                          ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"),
00507                          (unsigned int)(this->link()->local_peer() >> 32),
00508                          (unsigned int) this->link()->local_peer(),
00509                          (unsigned int)(this->remote_peer_ >> 32),
00510                          (unsigned int) this->remote_peer_));
00511   }
00512 
00513   // Clear peer repair requests:
00514   this->nak_peers_.clear();
00515 }
00516 
00517 void
00518 ReliableSession::nak_received(ACE_Message_Block* control)
00519 {
00520   if (!this->active_) return; // sub send naks, then doesn't receive them.
00521 
00522   const TransportHeader& header =
00523     this->link_->receive_strategy()->received_header();
00524 
00525   Serializer serializer(control, header.swap_bytes());
00526 
00527   MulticastPeer local_peer;
00528   CORBA::ULong size = 0;
00529   serializer >> local_peer; // sent as remote_peer
00530   serializer >> size;
00531 
00532   std::vector<SequenceRange> ranges;
00533 
00534   for (CORBA::ULong i = 0; i < size; ++i) {
00535     SequenceRange range;
00536     serializer >> range.first;
00537     serializer >> range.second;
00538     ranges.push_back(range);
00539   }
00540 
00541   // Ignore sample if not destined for us:
00542   if ((local_peer != this->link_->local_peer())        // Not to us.
00543     || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session.
00544 
00545   SingleSendBuffer* send_buffer = this->link_->send_buffer();
00546   // Broadcast a MULTICAST_NAKACK control sample before resending to suppress
00547   // repair requests for unrecoverable samples by providing a
00548   // new low-water mark for affected peers:
00549   if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) {
00550       if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00551         ACE_DEBUG ((LM_DEBUG,
00552                     ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00553                     ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
00554                     (unsigned int)(this->link()->local_peer() >> 32),
00555                     (unsigned int) this->link()->local_peer(),
00556                     (unsigned int)(this->remote_peer_ >> 32),
00557                     (unsigned int) this->remote_peer_,
00558                     send_buffer->low().getValue()));
00559       }
00560     send_nakack(send_buffer->low());
00561   }
00562 
00563   for (CORBA::ULong i = 0; i < size; ++i) {
00564     bool ret = send_buffer->resend(ranges[i]);
00565     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00566       ACE_DEBUG ((LM_DEBUG,
00567                   ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00568                   ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
00569                   (unsigned int)(this->link()->local_peer() >> 32),
00570                   (unsigned int) this->link()->local_peer(),
00571                   (unsigned int)(this->remote_peer_ >> 32),
00572                   (unsigned int) this->remote_peer_,
00573                   ranges[i].first.getValue(), ranges[i].second.getValue(),
00574                   ret ? "SUCCESS" : "FAILED"));
00575     }
00576   }
00577 }
00578 
00579 void
00580 ReliableSession::send_naks(DisjointSequence& received)
00581 {
00582   const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
00583 
00584   CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00585 
00586   size_t len = sizeof(this->remote_peer_)
00587              + sizeof(size)
00588              + size * 2 * sizeof(SequenceNumber);
00589 
00590   ACE_Message_Block* data;
00591   ACE_NEW(data, ACE_Message_Block(len));
00592 
00593   Serializer serializer(data);
00594 
00595   serializer << this->remote_peer_;
00596   serializer << size;
00597   for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00598        iter != ranges.end(); ++iter) {
00599     serializer << iter->first;
00600     serializer << iter->second;
00601     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00602       ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
00603                             ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00604                             (unsigned int)(this->link()->local_peer() >> 32),
00605                             (unsigned int) this->link()->local_peer(),
00606                             (unsigned int)(this->remote_peer_ >> 32),
00607                             (unsigned int) this->remote_peer_,
00608                             iter->first.getValue(), iter->second.getValue()));
00609     }
00610   }
00611   // Send control sample to remote peer:
00612   send_control(MULTICAST_NAK, data);
00613 }
00614 
00615 
00616 void
00617 ReliableSession::nakack_received(ACE_Message_Block* control)
00618 {
00619   if (this->active_) return; // pub send nakack, doesn't receive them.
00620 
00621   const TransportHeader& header =
00622     this->link_->receive_strategy()->received_header();
00623 
00624   // Not from the remote peer for this session.
00625   if (this->remote_peer_ != header.source_) return;
00626 
00627   Serializer serializer(control, header.swap_bytes());
00628 
00629   SequenceNumber low;
00630   serializer >> low;
00631 
00632   // MULTICAST_NAKACK control samples indicate data which cannot be
00633   // repaired by a remote peer; if any values were needed below
00634   // this value, then the sequence needs to be shifted:
00635   std::vector<SequenceRange> dropped;
00636   SequenceNumber range_low =  SequenceNumber();
00637   SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
00638 
00639   if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
00640 
00641     this->nak_sequence_.insert(range_low);
00642 
00643   } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
00644 
00645     for (size_t i = 0; i < dropped.size(); ++i) {
00646       this->reassembly_.data_unavailable(dropped[i]);
00647     }
00648 
00649     if (DCPS_debug_level > 0) {
00650       ACE_ERROR((LM_WARNING,
00651                 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
00652                 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
00653                 ACE_TEXT("- some ranges dropped.\n"),
00654                 (unsigned int)(this->link()->local_peer() >> 32),
00655                 (unsigned int) this->link()->local_peer(),
00656                 (unsigned int)(this->remote_peer_ >> 32),
00657                 (unsigned int) this->remote_peer_,
00658                 low.getValue()));
00659     }
00660   }
00661   deliver_held_data();
00662 }
00663 
00664 void
00665 ReliableSession::send_nakack(SequenceNumber low)
00666 {
00667   size_t len = sizeof(low.getValue());
00668 
00669   ACE_Message_Block* data;
00670   ACE_NEW(data, ACE_Message_Block(len));
00671 
00672   Serializer serializer(data);
00673 
00674   serializer << low;
00675   // Broadcast control sample to all peers:
00676   send_control(MULTICAST_NAKACK, data);
00677 }
00678 
00679 bool
00680 ReliableSession::start(bool active, bool acked)
00681 {
00682   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
00683 
00684   if (this->started_) {
00685     return true;  // already started
00686   }
00687 
00688   this->active_  = active;
00689   {
00690     //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection
00691     ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
00692 
00693     // A watchdog timer is scheduled to periodically check for gaps in
00694     // received data. If a gap is discovered, MULTICAST_NAK control
00695     // samples will be sent to initiate repairs.
00696     // Only subscriber send naks so just schedule for sub role.
00697     if (!active) {
00698       if (acked) {
00699         this->set_acked();
00700       }
00701       if (!this->nak_watchdog_->schedule()) {
00702         ACE_ERROR_RETURN((LM_ERROR,
00703                           ACE_TEXT("(%P|%t) ERROR: ")
00704                           ACE_TEXT("ReliableSession::start: ")
00705                           ACE_TEXT("failed to schedule NAK watchdog!\n")),
00706                          false);
00707       }
00708     }
00709 
00710     // Active peers schedule a watchdog timer to initiate a 2-way
00711     // handshake to verify that passive endpoints can send/receive
00712     // data reliably. This process must be executed using the
00713     // transport reactor thread to prevent blocking.
00714     // Only publisher send syn so just schedule for pub role.
00715     if (active && !this->start_syn()) {
00716       this->nak_watchdog_->cancel();
00717       ACE_ERROR_RETURN((LM_ERROR,
00718                         ACE_TEXT("(%P|%t) ERROR: ")
00719                         ACE_TEXT("ReliableSession::start: ")
00720                         ACE_TEXT("failed to schedule SYN watchdog!\n")),
00721                        false);
00722     }
00723   } //Reacquire start_lock_ after releasing unlock_guard with release_start_lock_
00724 
00725   return this->started_ = true;
00726 }
00727 
00728 void
00729 ReliableSession::stop()
00730 {
00731   MulticastSession::stop();
00732   this->nak_watchdog_->cancel();
00733 }
00734 
00735 } // namespace DCPS
00736 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7