00001
00002
00003
00004
00005
00006
00007
00008 #include "ReliableSession.h"
00009
00010 #include "MulticastDataLink.h"
00011 #include "MulticastInst.h"
00012 #include "MulticastReceiveStrategy.h"
00013
00014 #include "ace/Global_Macros.h"
00015 #include "ace/Time_Value.h"
00016 #include "ace/Truncate.h"
00017
00018 #include "dds/DCPS/Serializer.h"
00019 #include "dds/DCPS/GuidConverter.h"
00020
00021 #include <cstdlib>
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028 NakWatchdog::NakWatchdog(ACE_Reactor* reactor,
00029 ACE_thread_t owner,
00030 ReliableSession* session)
00031 : DataLinkWatchdog(reactor, owner)
00032 , session_(session)
00033 {
00034 }
00035
00036 ACE_Time_Value
00037 NakWatchdog::next_interval()
00038 {
00039 ACE_Time_Value interval(this->session_->link()->config().nak_interval_);
00040
00041
00042 interval *= static_cast<double>(std::rand()) /
00043 static_cast<double>(RAND_MAX) + 1.0;
00044
00045 return interval;
00046 }
00047
00048 void
00049 NakWatchdog::on_interval(const void* )
00050 {
00051
00052
00053
00054 this->session_->expire_naks();
00055
00056
00057
00058 this->session_->send_naks();
00059 }
00060
00061 ReliableSession::ReliableSession(ACE_Reactor* reactor,
00062 ACE_thread_t owner,
00063 MulticastDataLink* link,
00064 MulticastPeer remote_peer)
00065 : MulticastSession(reactor, owner, link, remote_peer),
00066 nak_watchdog_(make_rch<NakWatchdog> (reactor, owner, this))
00067 {
00068 }
00069
00070 ReliableSession::~ReliableSession()
00071 {
00072 nak_watchdog_->cancel();
00073 nak_watchdog_->wait();
00074 }
00075
00076 bool
00077 NakWatchdog::reactor_is_shut_down() const
00078 {
00079 return session_->link()->transport().is_shut_down();
00080 }
00081
00082 bool
00083 ReliableSession::check_header(const TransportHeader& header)
00084 {
00085
00086 if (this->remote_peer_ != header.source_) return false;
00087
00088
00089 if (this->active_) return true;
00090
00091
00092
00093
00094 return this->nak_sequence_.insert(SequenceRange(
00095 header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
00096 header.sequence_));
00097 }
00098
00099 void
00100 ReliableSession::record_header_received(const TransportHeader& header)
00101 {
00102
00103 if (this->remote_peer_ != header.source_) return;
00104
00105
00106 if (this->active_) return;
00107
00108
00109 this->nak_sequence_.insert(header.sequence_);
00110 deliver_held_data();
00111 }
00112
00113
00114 bool
00115 ReliableSession::ready_to_deliver(const TransportHeader& header,
00116 const ReceivedDataSample& data)
00117 {
00118 if (!acked()
00119 || nak_sequence_.disjoint()
00120 || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_)
00121 || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
00122 || (nak_sequence_.empty() && header.sequence_ > 1)) {
00123
00124 if (Transport_debug_level > 5) {
00125 GuidConverter writer(data.header_.publication_id_);
00126 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00127 ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
00128 header.sequence_.getValue(),
00129 data.header_.sequence_.getValue(),
00130 OPENDDS_STRING(writer).c_str()));
00131 }
00132 {
00133 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, held_lock_, false);
00134
00135 held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
00136
00137 if (Transport_debug_level > 5) {
00138 OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
00139 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00140 ACE_TEXT(" held_ data currently contains: %d samples\n"),
00141 held_.size()));
00142 while (it != held_.end()) {
00143 GuidConverter writer(it->second.header_.publication_id_);
00144 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00145 ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
00146 it->first.getValue(),
00147 it->second.header_.sequence_.getValue(),
00148 OPENDDS_STRING(writer).c_str()));
00149 ++it;
00150 }
00151 }
00152 }
00153 deliver_held_data();
00154 return false;
00155 } else {
00156 if (Transport_debug_level > 5) {
00157 GuidConverter writer(data.header_.publication_id_);
00158 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
00159 ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
00160 header.sequence_.getValue(),
00161 data.header_.sequence_.getValue(),
00162 OPENDDS_STRING(writer).c_str()));
00163 }
00164 return true;
00165 }
00166 }
00167
00168 void
00169 ReliableSession::deliver_held_data()
00170 {
00171 if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
00172
00173 OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
00174 const SequenceNumber ca = nak_sequence_.cumulative_ack();
00175
00176 {
00177 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00178
00179 typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
00180 const iter end = this->held_.upper_bound(ca);
00181 for (iter it = this->held_.begin(); it != end; ) {
00182 if (Transport_debug_level > 5) {
00183 GuidConverter writer(it->second.header_.publication_id_);
00184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
00185 ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
00186 it->first.getValue(),
00187 it->second.header_.sequence_.getValue(),
00188 OPENDDS_STRING(writer).c_str()));
00189 }
00190 to_deliver.push_back(it->second);
00191 this->held_.erase(it++);
00192 }
00193 }
00194 for (size_t i = 0; i < to_deliver.size(); ++i) {
00195 this->link_->data_received(to_deliver.at(i));
00196 }
00197 }
00198
00199 void
00200 ReliableSession::release_remote(const RepoId& remote)
00201 {
00202 ACE_GUARD(ACE_Thread_Mutex, guard, held_lock_);
00203 if (!held_.empty()) {
00204 OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
00205 while (it != held_.end()) {
00206 if (it->second.header_.publication_id_ == remote) {
00207 held_.erase(it++);
00208 } else {
00209 it++;
00210 }
00211 }
00212 }
00213 }
00214
00215
00216 bool
00217 ReliableSession::control_received(char submessage_id,
00218 const Message_Block_Ptr& control)
00219 {
00220 if (MulticastSession::control_received(submessage_id, control)) {
00221 return true;
00222 }
00223
00224 switch (submessage_id) {
00225 case MULTICAST_NAK:
00226 nak_received(control);
00227 break;
00228
00229 case MULTICAST_NAKACK:
00230 nakack_received(control);
00231 break;
00232
00233 default:
00234 ACE_ERROR((LM_WARNING,
00235 ACE_TEXT("(%P|%t) WARNING: ")
00236 ACE_TEXT("ReliableSession::control_received: ")
00237 ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
00238 submessage_id));
00239 break;
00240 }
00241 return true;
00242 }
00243
00244 void
00245 ReliableSession::syn_hook(const SequenceNumber& seq)
00246 {
00247 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00248 this->nak_sequence_.reset();
00249 this->nak_sequence_.insert(seq);
00250
00251 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00252 iter != ranges.end(); ++iter) {
00253 this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
00254 }
00255 }
00256
00257 void
00258 ReliableSession::expire_naks()
00259 {
00260 if (this->nak_requests_.empty()) return;
00261
00262 ACE_Time_Value deadline(ACE_OS::gettimeofday());
00263 deadline -= this->link_->config().nak_timeout_;
00264
00265 NakRequestMap::iterator first(this->nak_requests_.begin());
00266 NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
00267
00268 if (first == last) return;
00269
00270
00271
00272 SequenceNumber lastSeq = (last == this->nak_requests_.end())
00273 ? this->nak_requests_.rbegin()->second
00274 : last->second;
00275
00276 std::vector<SequenceRange> dropped;
00277 if (this->nak_sequence_.insert(SequenceRange(this->nak_sequence_.low(),
00278 lastSeq), dropped)) {
00279
00280 for (size_t i = 0; i < dropped.size(); ++i) {
00281 this->reassembly_.data_unavailable(dropped[i]);
00282 }
00283
00284 ACE_ERROR((LM_WARNING,
00285 ACE_TEXT("(%P|%t) WARNING: ")
00286 ACE_TEXT("ReliableSession::expire_naks: ")
00287 ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
00288 (unsigned int)(this->remote_peer_ >> 32),
00289 (unsigned int) this->remote_peer_,
00290 this->nak_sequence_.low().getValue(),
00291 lastSeq.getValue()));
00292 }
00293
00294
00295 this->nak_requests_.erase(first, last);
00296 deliver_held_data();
00297 }
00298
00299 void
00300 ReliableSession::send_naks()
00301 {
00302
00303
00304 if (!this->acked()) {
00305 if (DCPS_debug_level > 5) {
00306 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00307 ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
00308 (unsigned int)(this->link()->local_peer() >> 32),
00309 (unsigned int) this->link()->local_peer(),
00310 (unsigned int)(this->remote_peer_ >> 32),
00311 (unsigned int) this->remote_peer_));
00312 }
00313 return;
00314 }
00315
00316 if (DCPS_debug_level > 5) {
00317 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00318 ACE_TEXT("remote %#08x%08x nak request size %d \n"),
00319 (unsigned int)(this->link()->local_peer() >> 32),
00320 (unsigned int) this->link()->local_peer(),
00321 (unsigned int)(this->remote_peer_ >> 32),
00322 (unsigned int) this->remote_peer_,
00323 this->nak_requests_.size()));
00324 }
00325
00326 if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
00327 if (DCPS_debug_level > 5) {
00328 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00329 ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks \n"),
00330 (unsigned int)(this->link()->local_peer() >> 32),
00331 (unsigned int) this->link()->local_peer(),
00332 (unsigned int)(this->remote_peer_ >> 32),
00333 (unsigned int) this->remote_peer_));
00334 }
00335
00336 if (DCPS_debug_level > 9) {
00337 const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
00338 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00339 iter != ranges.end(); ++iter) {
00340 ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
00341 (unsigned int)(this->link()->local_peer() >> 32),
00342 (unsigned int) this->link()->local_peer(),
00343 (unsigned int)(this->remote_peer_ >> 32),
00344 (unsigned int) this->remote_peer_,
00345 iter->first.getValue(),
00346 iter->second.getValue()));
00347 }
00348 }
00349 return;
00350 }
00351
00352 ACE_Time_Value now(ACE_OS::gettimeofday());
00353
00354
00355
00356
00357 if (this->nak_sequence_.low() > 1) {
00358 this->nak_requests_[now] = SequenceNumber();
00359 } else {
00360 this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
00361 }
00362
00363 typedef std::vector<SequenceRange> RangeVector;
00364 RangeVector ignored;
00365
00366
00367 SequenceNumber first;
00368 SequenceNumber second;
00369
00370 NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
00371
00372 if (this->nak_requests_.size() > 1) {
00373
00374 ++itr;
00375
00376 size_t nak_delay_intervals = this->link()->config().nak_delay_intervals_;
00377 size_t nak_max = this->link()->config().nak_max_;
00378 size_t sz = this->nak_requests_.size();
00379
00380
00381
00382
00383
00384
00385
00386
00387
00388 for (size_t i = 1; i < sz; ++i) {
00389 if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
00390 if (first != SequenceNumber()) {
00391 first = this->nak_requests_.begin()->second;
00392 }
00393 else {
00394 ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
00395 }
00396 break;
00397 }
00398
00399 if (i % (nak_delay_intervals + 1) == 1) {
00400 second = itr->second;
00401 }
00402 if (second != SequenceNumber()) {
00403 first = itr->second;
00404 }
00405
00406 if (i % (nak_delay_intervals + 1) == 0) {
00407 first = itr->second;
00408
00409 if (first != SequenceNumber() && second != SequenceNumber()) {
00410 ignored.push_back(std::make_pair(first, second));
00411 first = SequenceNumber();
00412 second = SequenceNumber();
00413 }
00414 }
00415
00416 ++itr;
00417 }
00418
00419 if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
00420 ignored.push_back(std::make_pair(first, second));
00421 }
00422 }
00423
00424
00425 DisjointSequence received(this->nak_sequence_);
00426 if (DCPS_debug_level > 0) {
00427 received.dump();
00428 }
00429
00430 size_t sz = ignored.size();
00431 for (size_t i = 0; i < sz; ++i) {
00432
00433 if (ignored[i].second > received.cumulative_ack()) {
00434 SequenceNumber high = ignored[i].second;
00435 SequenceNumber low = ignored[i].first;
00436 if (low < received.cumulative_ack()) {
00437 low = received.cumulative_ack();
00438 }
00439
00440 if (DCPS_debug_level > 5) {
00441 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00442 ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
00443 (unsigned int)(this->link()->local_peer() >> 32),
00444 (unsigned int) this->link()->local_peer(),
00445 (unsigned int)(this->remote_peer_ >> 32),
00446 (unsigned int) this->remote_peer_,
00447 low.getValue(), high.getValue()));
00448 }
00449
00450
00451 received.insert(SequenceRange(low, high));
00452 }
00453 }
00454
00455 for (NakPeerSet::iterator it(this->nak_peers_.begin());
00456 it != this->nak_peers_.end(); ++it) {
00457
00458
00459 received.insert(*it);
00460 }
00461 bool sending_naks = false;
00462 if (received.low() > 1){
00463
00464
00465 sending_naks = true;
00466 std::vector<SequenceRange> ranges;
00467 ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
00468
00469 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00470
00471 size_t len = sizeof(this->remote_peer_)
00472 + sizeof(size)
00473 + size * 2 * sizeof(SequenceNumber);
00474
00475 Message_Block_Ptr data(new ACE_Message_Block(len));
00476
00477 Serializer serializer(data.get());
00478
00479 serializer << this->remote_peer_;
00480 serializer << size;
00481 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00482 iter != ranges.end(); ++iter) {
00483 serializer << iter->first;
00484 serializer << iter->second;
00485 if (DCPS_debug_level > 0) {
00486 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
00487 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00488 (unsigned int)(this->link()->local_peer() >> 32),
00489 (unsigned int) this->link()->local_peer(),
00490 (unsigned int)(this->remote_peer_ >> 32),
00491 (unsigned int) this->remote_peer_,
00492 iter->first.getValue(), iter->second.getValue()));
00493 }
00494 }
00495
00496 send_control(MULTICAST_NAK, move(data));
00497 }
00498 if (received.disjoint()) {
00499 sending_naks = true;
00500 send_naks(received);
00501 }
00502
00503 if (!sending_naks && DCPS_debug_level > 5){
00504 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
00505 ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks \n"),
00506 (unsigned int)(this->link()->local_peer() >> 32),
00507 (unsigned int) this->link()->local_peer(),
00508 (unsigned int)(this->remote_peer_ >> 32),
00509 (unsigned int) this->remote_peer_));
00510 }
00511
00512
00513 this->nak_peers_.clear();
00514 }
00515
00516 void
00517 ReliableSession::nak_received(const Message_Block_Ptr& control)
00518 {
00519 if (!this->active_) return;
00520
00521 const TransportHeader& header =
00522 this->link_->receive_strategy()->received_header();
00523
00524 Serializer serializer(control.get(), header.swap_bytes());
00525
00526 MulticastPeer local_peer;
00527 CORBA::ULong size = 0;
00528 serializer >> local_peer;
00529 serializer >> size;
00530
00531 std::vector<SequenceRange> ranges;
00532
00533 for (CORBA::ULong i = 0; i < size; ++i) {
00534 SequenceRange range;
00535 serializer >> range.first;
00536 serializer >> range.second;
00537 ranges.push_back(range);
00538 }
00539
00540
00541 if ((local_peer != this->link_->local_peer())
00542 || (this->remote_peer_ != header.source_)) return;
00543
00544 SingleSendBuffer* send_buffer = this->link_->send_buffer();
00545
00546
00547
00548 if (!send_buffer->empty() && send_buffer->low() > ranges.begin()->first) {
00549 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00550 ACE_DEBUG ((LM_DEBUG,
00551 ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00552 ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
00553 (unsigned int)(this->link()->local_peer() >> 32),
00554 (unsigned int) this->link()->local_peer(),
00555 (unsigned int)(this->remote_peer_ >> 32),
00556 (unsigned int) this->remote_peer_,
00557 send_buffer->low().getValue()));
00558 }
00559 send_nakack(send_buffer->low());
00560 }
00561
00562 for (CORBA::ULong i = 0; i < size; ++i) {
00563 bool ret = send_buffer->resend(ranges[i]);
00564 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00565 ACE_DEBUG ((LM_DEBUG,
00566 ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
00567 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
00568 (unsigned int)(this->link()->local_peer() >> 32),
00569 (unsigned int) this->link()->local_peer(),
00570 (unsigned int)(this->remote_peer_ >> 32),
00571 (unsigned int) this->remote_peer_,
00572 ranges[i].first.getValue(), ranges[i].second.getValue(),
00573 ret ? "SUCCESS" : "FAILED"));
00574 }
00575 }
00576 }
00577
00578 void
00579 ReliableSession::send_naks(DisjointSequence& received)
00580 {
00581 const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
00582
00583 CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
00584
00585 size_t len = sizeof(this->remote_peer_)
00586 + sizeof(size)
00587 + size * 2 * sizeof(SequenceNumber);
00588
00589 Message_Block_Ptr data(new ACE_Message_Block(len));
00590
00591 Serializer serializer(data.get());
00592
00593 serializer << this->remote_peer_;
00594 serializer << size;
00595 for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
00596 iter != ranges.end(); ++iter) {
00597 serializer << iter->first;
00598 serializer << iter->second;
00599 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00600 ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
00601 ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
00602 (unsigned int)(this->link()->local_peer() >> 32),
00603 (unsigned int) this->link()->local_peer(),
00604 (unsigned int)(this->remote_peer_ >> 32),
00605 (unsigned int) this->remote_peer_,
00606 iter->first.getValue(), iter->second.getValue()));
00607 }
00608 }
00609
00610 send_control(MULTICAST_NAK, move(data));
00611 }
00612
00613
00614 void
00615 ReliableSession::nakack_received(const Message_Block_Ptr& control)
00616 {
00617 if (this->active_) return;
00618
00619 const TransportHeader& header =
00620 this->link_->receive_strategy()->received_header();
00621
00622
00623 if (this->remote_peer_ != header.source_) return;
00624
00625 Serializer serializer(control.get(), header.swap_bytes());
00626
00627 SequenceNumber low;
00628 serializer >> low;
00629
00630
00631
00632
00633 std::vector<SequenceRange> dropped;
00634 SequenceNumber range_low = SequenceNumber();
00635 SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
00636
00637 if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
00638
00639 this->nak_sequence_.insert(range_low);
00640
00641 } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
00642
00643 for (size_t i = 0; i < dropped.size(); ++i) {
00644 this->reassembly_.data_unavailable(dropped[i]);
00645 }
00646
00647 if (DCPS_debug_level > 0) {
00648 ACE_ERROR((LM_WARNING,
00649 ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
00650 ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
00651 ACE_TEXT("- some ranges dropped.\n"),
00652 (unsigned int)(this->link()->local_peer() >> 32),
00653 (unsigned int) this->link()->local_peer(),
00654 (unsigned int)(this->remote_peer_ >> 32),
00655 (unsigned int) this->remote_peer_,
00656 low.getValue()));
00657 }
00658 }
00659 deliver_held_data();
00660 }
00661
00662 void
00663 ReliableSession::send_nakack(SequenceNumber low)
00664 {
00665 size_t len = sizeof(low.getValue());
00666
00667 Message_Block_Ptr data(new ACE_Message_Block(len));
00668
00669 Serializer serializer(data.get());
00670
00671 serializer << low;
00672
00673 send_control(MULTICAST_NAKACK, move(data));
00674 }
00675
00676 bool
00677 ReliableSession::start(bool active, bool acked)
00678 {
00679 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
00680
00681 if (this->started_) {
00682 return true;
00683 }
00684
00685 this->active_ = active;
00686 {
00687
00688 ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
00689
00690
00691
00692
00693
00694 if (!active) {
00695 if (acked) {
00696 this->set_acked();
00697 }
00698 if (!this->nak_watchdog_->schedule()) {
00699 ACE_ERROR_RETURN((LM_ERROR,
00700 ACE_TEXT("(%P|%t) ERROR: ")
00701 ACE_TEXT("ReliableSession::start: ")
00702 ACE_TEXT("failed to schedule NAK watchdog!\n")),
00703 false);
00704 }
00705 }
00706
00707
00708
00709
00710
00711
00712 if (active && !this->start_syn()) {
00713 this->nak_watchdog_->cancel();
00714 ACE_ERROR_RETURN((LM_ERROR,
00715 ACE_TEXT("(%P|%t) ERROR: ")
00716 ACE_TEXT("ReliableSession::start: ")
00717 ACE_TEXT("failed to schedule SYN watchdog!\n")),
00718 false);
00719 }
00720 }
00721
00722 return this->started_ = true;
00723 }
00724
00725 void
00726 ReliableSession::stop()
00727 {
00728 MulticastSession::stop();
00729 this->nak_watchdog_->cancel();
00730 }
00731
00732 }
00733 }
00734
00735 OPENDDS_END_VERSIONED_NAMESPACE_DECL