00001
00002
00003
00004
00005
00006
00007
00008 #include "ReliableSession.h"
00009
00010 #include "MulticastDataLink.h"
00011 #include "MulticastInst.h"
00012 #include "MulticastReceiveStrategy.h"
00013
00014 #include "ace/Global_Macros.h"
00015 #include "ace/Time_Value.h"
00016 #include "ace/Truncate.h"
00017
00018 #include "dds/DCPS/Serializer.h"
00019 #include "dds/DCPS/GuidConverter.h"
00020
00021 #include <cstdlib>
00022
00023 namespace OpenDDS {
00024 namespace DCPS {
00025
00026 NakWatchdog::NakWatchdog(ACE_Reactor* reactor,
00027 ACE_thread_t owner,
00028 ReliableSession* session)
00029 : DataLinkWatchdog(reactor, owner)
00030 , session_(session)
00031 {
00032 }
00033
00034 ACE_Time_Value
00035 NakWatchdog::next_interval()
00036 {
00037 MulticastInst* config = this->session_->link()->config();
00038 ACE_Time_Value interval(config->nak_interval_);
00039
00040
00041 interval *= static_cast<double>(std::rand()) /
00042 static_cast<double>(RAND_MAX) + 1.0;
00043
00044 return interval;
00045 }
00046
00047 void
00048 NakWatchdog::on_interval(const void* )
00049 {
00050
00051
00052
00053 this->session_->expire_naks();
00054
00055
00056
00057 this->session_->send_naks();
00058 }
00059
00060 ReliableSession::ReliableSession(ACE_Reactor* reactor,
00061 ACE_thread_t owner,
00062 MulticastDataLink* link,
00063 MulticastPeer remote_peer)
00064 : MulticastSession(reactor, owner, link, remote_peer),
00065 nak_watchdog_(new NakWatchdog (reactor, owner, this))
00066 {
00067 }
00068
00069 ReliableSession::~ReliableSession()
00070 {
00071 nak_watchdog_->cancel();
00072 nak_watchdog_->wait();
00073 nak_watchdog_->destroy();
00074 }
00075
00076 bool
00077 NakWatchdog::reactor_is_shut_down() const
00078 {
00079 return session_->link()->transport()->is_shut_down();
00080 }
00081
00082 bool
00083 ReliableSession::check_header(const TransportHeader& header)
00084 {
00085
00086 if (this->remote_peer_ != header.source_) return false;
00087
00088
00089 if (this->active_) return true;
00090
00091
00092
00093
00094 return this->nak_sequence_.insert(SequenceRange(
00095 header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
00096 header.sequence_));
00097 }
00098
00099 void
00100 ReliableSession::record_header_received(const TransportHeader& header)
00101 {
00102
00103 if (this->remote_peer_ != header.source_) return;
00104
00105
00106 if (this->active_) return;
00107
00108
00109 this->nak_sequence_.insert(header.sequence_);
00110 deliver_held_data();
00111 }
00112
00113
00114 bool
00115 ReliableSession::ready_to_deliver(const TransportHeader& header,
00116 const ReceivedDataSample& data)
00117 {
00118 if (!acked()
00119 || nak_sequence_.disjoint()
00120 || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_)
00121 || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
00122 || (nak_sequence_.empty() && header.sequence_ > 1)) {
00123
00124 if (Transport_debug_level > 5) {
00125 GuidConverter writer(data.header_.publication_id_);
00126 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00127 ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
00128 header.sequence_.getValue(),
00129 data.header_.sequence_.getValue(),
00130 OPENDDS_STRING(writer).c_str()));
00131 }
00132 {
00133 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false);
00134
00135 held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
00136
00137 if (Transport_debug_level > 5) {
00138 OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
00139 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00140 ACE_TEXT(" held_ data currently contains: %d samples\n"),
00141 held_.size()));
00142 while (it != held_.end()) {
00143 GuidConverter writer(it->second.header_.publication_id_);
00144 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00145 ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
00146 it->first.getValue(),
00147 it->second.header_.sequence_.getValue(),
00148 OPENDDS_STRING(writer).c_str()));
00149 ++it;
00150 }
00151 }
00152 }
00153 deliver_held_data();
00154 return false;
00155 } else {
00156 if (Transport_debug_level > 5) {
00157 GuidConverter writer(data.header_.publication_id_);
00158 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00159 ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
00160 header.sequence_.getValue(),
00161 data.header_.sequence_.getValue(),
00162 OPENDDS_STRING(writer).c_str()));
00163 }
00164 return true;
00165 }
00166 }
00167
00168 void
00169 ReliableSession::deliver_held_data()
00170 {
00171 if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
00172
00173 OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
00174 const SequenceNumber ca = nak_sequence_.cumulative_ack();
00175
00176 {
00177 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00178
00179 typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
00180 const iter end = this->held_.upper_bound(ca);
00181 for (iter it = this->held_.begin(); it != end; ) {
00182 if (Transport_debug_level > 5) {
00183 GuidConverter writer(it->second.header_.publication_id_);
00184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
00185 ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
00186 it->first.getValue(),
00187 it->second.header_.sequence_.getValue(),
00188 OPENDDS_STRING(writer).c_str()));
00189 }
00190 to_deliver.push_back(it->second);
00191 this->held_.erase(it++);
00192 }
00193 }
00194 for (size_t i = 0; i < to_deliver.size(); ++i) {
00195 this->link_->data_received(to_deliver.at(i));
00196 }
00197 }
00198
00199 void
00200 ReliableSession::release_remote(const RepoId& remote)
00201 {
00202 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00203 if (!held_.empty()) {
00204 OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
00205 while (it != held_.end()) {
00206 if (it->second.header_.publication_id_ == remote) {
00207 held_.erase(it++);
00208 } else {
00209 it++;
00210 }
00211 }
00212 }
00213 }
00214
00215
00216 bool
00217 ReliableSession::control_received(char submessage_id,
00218 ACE_Message_Block* control)
00219 {
00220 if (MulticastSession::control_received(submessage_id, control)) {
00221 return true;
00222 }
00223
00224 switch (submessage_id) {
00225 case MULTICAST_NAK:
00226 nak_received(control);
00227 break;
00228
00229 case MULTICAST_NAKACK:
00230 nakack_received(control);
00231 break;
00232
00233 default:
00234 ACE_ERROR((LM_WARNING,
00235 ACE_TEXT("(%P|%t) WARNING: ")
00236 ACE_TEXT("ReliableSession::control_received: ")
00237 ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
00238 submessage_id));
00239 break;
00240 }
00241 return true;
00242 }
00243
00244 void
00245 ReliableSession::syn_hook(const SequenceNumber& seq)
00246 {
00247 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00248 this->nak_sequence_.reset();
00249 this->nak_sequence_.insert(seq);
00250
00251 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00252 iter != ranges.end(); ++iter) {
00253 this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
00254 }
00255 }
00256
00257 void
00258 ReliableSession::expire_naks()
00259 {
00260 if (this->nak_requests_.empty()) return;
00261
00262 ACE_Time_Value deadline(ACE_OS::gettimeofday());
00263 deadline -= this->link_->config()->nak_timeout_;
00264
00265 NakRequestMap::iterator first(this->nak_requests_.begin());
00266 NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
00267
00268 if (first == last) return;
00269
00270
00271
00272 SequenceNumber lastSeq = (last == this->nak_requests_.end())
00273 ? this->nak_requests_.rbegin()->second
00274 : last->second;
00275
00276 std::vector<SequenceRange> dropped;
00277 if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(),
00278 lastSeq), dropped)) {
00279
00280 for (size_t i = 0; i < dropped.size(); ++i) {
00281 this->reassembly_.data_unavailable(dropped[i]);
00282 }
00283
00284 ACE_ERROR((LM_WARNING,
00285 ACE_TEXT("(%P|%t) WARNING: ")
00286 ACE_TEXT("ReliableSession::expire_naks: ")
00287 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
00288 (unsigned int)(this->remote_peer_ >> 32),
00289 (unsigned int) this->remote_peer_,
00290 this->nak_sequence_.low().getValue(),
00291 lastSeq.getValue()));
00292 }
00293
00294
00295 this->nak_requests_.erase(first, last);
00296 deliver_held_data();
00297 }
00298
00299 void
00300 ReliableSession::send_naks()
00301 {
00302
00303
00304 if (!this->acked()) {
00305 if (DCPS_debug_level > 5) {
00306 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00307 ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
00308 (unsigned int)(this->link()->local_peer() >> 32),
00309 (unsigned int) this->link()->local_peer(),
00310 (unsigned int)(this->remote_peer_ >> 32),
00311 (unsigned int) this->remote_peer_));
00312 }
00313 return;
00314 }
00315
00316 if (DCPS_debug_level > 5) {
00317 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00318 ACE_TEXT("remote %#08x%08x nak request size %d \n"),
00319 (unsigned int)(this->link()->local_peer() >> 32),
00320 (unsigned int) this->link()->local_peer(),
00321 (unsigned int)(this->remote_peer_ >> 32),
00322 (unsigned int) this->remote_peer_,
00323 this->nak_requests_.size()));
00324 }
00325
00326 if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
00327 if (DCPS_debug_level > 5) {
00328 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00329 ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"),
00330 (unsigned int)(this->link()->local_peer() >> 32),
00331 (unsigned int) this->link()->local_peer(),
00332 (unsigned int)(this->remote_peer_ >> 32),
00333 (unsigned int) this->remote_peer_));
00334 }
00335
00336 if (DCPS_debug_level > 9) {
00337 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00338 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00339 iter != ranges.end(); ++iter) {
00340 ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
00341 (unsigned int)(this->link()->local_peer() >> 32),
00342 (unsigned int) this->link()->local_peer(),
00343 (unsigned int)(this->remote_peer_ >> 32),
00344 (unsigned int) this->remote_peer_,
00345 iter->first.getValue(),
00346 iter->second.getValue()));
00347 }
00348 }
00349 return;
00350 }
00351
00352 ACE_Time_Value now(ACE_OS::gettimeofday());
00353
00354
00355
00356
00357 if (this->nak_sequence_.low() > 1) {
00358 this->nak_requests_[now] = SequenceNumber();
00359 } else {
00360 this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
00361 }
00362
00363 typedef std::vector<SequenceRange> RangeVector;
00364 RangeVector ignored;
00365
00366
00367 SequenceNumber first;
00368 SequenceNumber second;
00369
00370 NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
00371
00372 if (this->nak_requests_.size() > 1) {
00373
00374 ++itr;
00375
00376 size_t nak_delay_intervals = this->link()->config()->nak_delay_intervals_;
00377 size_t nak_max = this->link()->config()->nak_max_;
00378 size_t sz = this->nak_requests_.size();
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 for (size_t i = 1; i < sz; ++i) {
00389 if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
00390 if (first != SequenceNumber()) {
00391 first = this->nak_requests_.begin()->second;
00392 }
00393 else {
00394 ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
00395 }
00396 break;
00397 }
00398
00399 if (i % (nak_delay_intervals + 1) == 1) {
00400 second = itr->second;
00401 }
00402 if (second != SequenceNumber()) {
00403 first = itr->second;
00404 }
00405
00406 if (i % (nak_delay_intervals + 1) == 0) {
00407 first = itr->second;
00408
00409 if (first != SequenceNumber() && second != SequenceNumber()) {
00410 ignored.push_back(std::make_pair(first, second));
00411 first = SequenceNumber();
00412 second = SequenceNumber();
00413 }
00414 }
00415
00416 ++itr;
00417 }
00418
00419 if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
00420 ignored.push_back(std::make_pair(first, second));
00421 }
00422 }
00423
00424
00425 DisjointSequence received(this->nak_sequence_);
00426 if (DCPS_debug_level > 0) {
00427 received.dump();
00428 }
00429
00430 size_t sz = ignored.size();
00431 for (size_t i = 0; i < sz; ++i) {
00432
00433 if (ignored[i].second > received.cumulative_ack()) {
00434 SequenceNumber high = ignored[i].second;
00435 SequenceNumber low = ignored[i].first;
00436 if (low < received.cumulative_ack()) {
00437 low = received.cumulative_ack();
00438 }
00439
00440 if (DCPS_debug_level > 5) {
00441 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00442 ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
00443 (unsigned int)(this->link()->local_peer() >> 32),
00444 (unsigned int) this->link()->local_peer(),
00445 (unsigned int)(this->remote_peer_ >> 32),
00446 (unsigned int) this->remote_peer_,
00447 low.getValue(), high.getValue()));
00448 }
00449
00450
00451 received.insert(SequenceRange(low, high));
00452 }
00453 }
00454
00455 for (NakPeerSet::iterator it(this->nak_peers_.begin());
00456 it != this->nak_peers_.end(); ++it) {
00457
00458
00459 received.insert(*it);
00460 }
00461 bool sending_naks = false;
00462 if (received.low() > 1){
00463
00464
00465 sending_naks = true;
00466 std::vector<SequenceRange> ranges;
00467 ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
00468
00469 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00470
00471 size_t len = sizeof(this->remote_peer_)
00472 + sizeof(size)
00473 + size * 2 * sizeof(SequenceNumber);
00474
00475 ACE_Message_Block* data;
00476 ACE_NEW(data, ACE_Message_Block(len));
00477
00478 Serializer serializer(data);
00479
00480 serializer << this->remote_peer_;
00481 serializer << size;
00482 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00483 iter != ranges.end(); ++iter) {
00484 serializer << iter->first;
00485 serializer << iter->second;
00486 if (DCPS_debug_level > 0) {
00487 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
00488 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00489 (unsigned int)(this->link()->local_peer() >> 32),
00490 (unsigned int) this->link()->local_peer(),
00491 (unsigned int)(this->remote_peer_ >> 32),
00492 (unsigned int) this->remote_peer_,
00493 iter->first.getValue(), iter->second.getValue()));
00494 }
00495 }
00496
00497 send_control(MULTICAST_NAK, data);
00498 }
00499 if (received.disjoint()) {
00500 sending_naks = true;
00501 send_naks(received);
00502 }
00503
00504 if (!sending_naks && DCPS_debug_level > 5){
00505 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00506 ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"),
00507 (unsigned int)(this->link()->local_peer() >> 32),
00508 (unsigned int) this->link()->local_peer(),
00509 (unsigned int)(this->remote_peer_ >> 32),
00510 (unsigned int) this->remote_peer_));
00511 }
00512
00513
00514 this->nak_peers_.clear();
00515 }
00516
00517 void
00518 ReliableSession::nak_received(ACE_Message_Block* control)
00519 {
00520 if (!this->active_) return;
00521
00522 const TransportHeader& header =
00523 this->link_->receive_strategy()->received_header();
00524
00525 Serializer serializer(control, header.swap_bytes());
00526
00527 MulticastPeer local_peer;
00528 CORBA::ULong size = 0;
00529 serializer >> local_peer;
00530 serializer >> size;
00531
00532 std::vector<SequenceRange> ranges;
00533
00534 for (CORBA::ULong i = 0; i < size; ++i) {
00535 SequenceRange range;
00536 serializer >> range.first;
00537 serializer >> range.second;
00538 ranges.push_back(range);
00539 }
00540
00541
00542 if ((local_peer != this->link_->local_peer())
00543 || (this->remote_peer_ != header.source_)) return;
00544
00545 SingleSendBuffer* send_buffer = this->link_->send_buffer();
00546
00547
00548
00549 if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) {
00550 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00551 ACE_DEBUG ((LM_DEBUG,
00552 ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00553 ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
00554 (unsigned int)(this->link()->local_peer() >> 32),
00555 (unsigned int) this->link()->local_peer(),
00556 (unsigned int)(this->remote_peer_ >> 32),
00557 (unsigned int) this->remote_peer_,
00558 send_buffer->low().getValue()));
00559 }
00560 send_nakack(send_buffer->low());
00561 }
00562
00563 for (CORBA::ULong i = 0; i < size; ++i) {
00564 bool ret = send_buffer->resend(ranges[i]);
00565 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00566 ACE_DEBUG ((LM_DEBUG,
00567 ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00568 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
00569 (unsigned int)(this->link()->local_peer() >> 32),
00570 (unsigned int) this->link()->local_peer(),
00571 (unsigned int)(this->remote_peer_ >> 32),
00572 (unsigned int) this->remote_peer_,
00573 ranges[i].first.getValue(), ranges[i].second.getValue(),
00574 ret ? "SUCCESS" : "FAILED"));
00575 }
00576 }
00577 }
00578
00579 void
00580 ReliableSession::send_naks(DisjointSequence& received)
00581 {
00582 const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
00583
00584 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00585
00586 size_t len = sizeof(this->remote_peer_)
00587 + sizeof(size)
00588 + size * 2 * sizeof(SequenceNumber);
00589
00590 ACE_Message_Block* data;
00591 ACE_NEW(data, ACE_Message_Block(len));
00592
00593 Serializer serializer(data);
00594
00595 serializer << this->remote_peer_;
00596 serializer << size;
00597 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00598 iter != ranges.end(); ++iter) {
00599 serializer << iter->first;
00600 serializer << iter->second;
00601 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00602 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
00603 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00604 (unsigned int)(this->link()->local_peer() >> 32),
00605 (unsigned int) this->link()->local_peer(),
00606 (unsigned int)(this->remote_peer_ >> 32),
00607 (unsigned int) this->remote_peer_,
00608 iter->first.getValue(), iter->second.getValue()));
00609 }
00610 }
00611
00612 send_control(MULTICAST_NAK, data);
00613 }
00614
00615
00616 void
00617 ReliableSession::nakack_received(ACE_Message_Block* control)
00618 {
00619 if (this->active_) return;
00620
00621 const TransportHeader& header =
00622 this->link_->receive_strategy()->received_header();
00623
00624
00625 if (this->remote_peer_ != header.source_) return;
00626
00627 Serializer serializer(control, header.swap_bytes());
00628
00629 SequenceNumber low;
00630 serializer >> low;
00631
00632
00633
00634
00635 std::vector<SequenceRange> dropped;
00636 SequenceNumber range_low = SequenceNumber();
00637 SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
00638
00639 if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
00640
00641 this->nak_sequence_.insert(range_low);
00642
00643 } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
00644
00645 for (size_t i = 0; i < dropped.size(); ++i) {
00646 this->reassembly_.data_unavailable(dropped[i]);
00647 }
00648
00649 if (DCPS_debug_level > 0) {
00650 ACE_ERROR((LM_WARNING,
00651 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
00652 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
00653 ACE_TEXT("- some ranges dropped.\n"),
00654 (unsigned int)(this->link()->local_peer() >> 32),
00655 (unsigned int) this->link()->local_peer(),
00656 (unsigned int)(this->remote_peer_ >> 32),
00657 (unsigned int) this->remote_peer_,
00658 low.getValue()));
00659 }
00660 }
00661 deliver_held_data();
00662 }
00663
00664 void
00665 ReliableSession::send_nakack(SequenceNumber low)
00666 {
00667 size_t len = sizeof(low.getValue());
00668
00669 ACE_Message_Block* data;
00670 ACE_NEW(data, ACE_Message_Block(len));
00671
00672 Serializer serializer(data);
00673
00674 serializer << low;
00675
00676 send_control(MULTICAST_NAKACK, data);
00677 }
00678
00679 bool
00680 ReliableSession::start(bool active, bool acked)
00681 {
00682 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
00683
00684 if (this->started_) {
00685 return true;
00686 }
00687
00688 this->active_ = active;
00689 {
00690
00691 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
00692
00693
00694
00695
00696
00697 if (!active) {
00698 if (acked) {
00699 this->set_acked();
00700 }
00701 if (!this->nak_watchdog_->schedule()) {
00702 ACE_ERROR_RETURN((LM_ERROR,
00703 ACE_TEXT("(%P|%t) ERROR: ")
00704 ACE_TEXT("ReliableSession::start: ")
00705 ACE_TEXT("failed to schedule NAK watchdog!\n")),
00706 false);
00707 }
00708 }
00709
00710
00711
00712
00713
00714
00715 if (active && !this->start_syn()) {
00716 this->nak_watchdog_->cancel();
00717 ACE_ERROR_RETURN((LM_ERROR,
00718 ACE_TEXT("(%P|%t) ERROR: ")
00719 ACE_TEXT("ReliableSession::start: ")
00720 ACE_TEXT("failed to schedule SYN watchdog!\n")),
00721 false);
00722 }
00723 }
00724
00725 return this->started_ = true;
00726 }
00727
00728 void
00729 ReliableSession::stop()
00730 {
00731 MulticastSession::stop();
00732 this->nak_watchdog_->cancel();
00733 }
00734
00735 }
00736 }