00001
00002
00003
00004
00005
00006
00007
00008 #include "BestEffortSession.h"
00009
00010 namespace OpenDDS {
00011 namespace DCPS {
00012
00013 BestEffortSession::BestEffortSession(ACE_Reactor* reactor,
00014 ACE_thread_t owner,
00015 MulticastDataLink* link,
00016 MulticastPeer remote_peer)
00017 : MulticastSession(reactor, owner, link, remote_peer)
00018 , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00019 {
00020 }
00021
00022 bool
00023 BestEffortSession::check_header(const TransportHeader& header)
00024 {
00025 if (header.sequence_ != this->expected_ &&
00026 expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00027 VDBG_LVL((LM_WARNING,
00028 ACE_TEXT("(%P|%t) WARNING: BestEffortSession::check_header ")
00029 ACE_TEXT("expected %q received %q\n"),
00030 this->expected_.getValue(), header.sequence_.getValue()), 2);
00031 if (header.sequence_ > this->expected_) {
00032 SequenceRange range(this->expected_, header.sequence_.previous());
00033 this->reassembly_.data_unavailable(range);
00034 }
00035 }
00036
00037 this->expected_ = header.sequence_;
00038 ++this->expected_;
00039
00040
00041
00042 return true;
00043 }
00044
00045 void
00046 BestEffortSession::record_header_received(const TransportHeader& header)
00047 {
00048 if (this->remote_peer_ != header.source_) return;
00049
00050 check_header(header);
00051 }
00052
00053 bool
00054 BestEffortSession::ready_to_deliver(const TransportHeader& header,
00055 const ReceivedDataSample& )
00056 {
00057 if (expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
00058 && header.sequence_ == expected_.previous()) {
00059 return true;
00060 }
00061 return false;
00062 }
00063
00064 bool
00065 BestEffortSession::start(bool active, bool )
00066 {
00067 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX,
00068 guard,
00069 this->start_lock_,
00070 false);
00071
00072 if (this->started_) return true;
00073
00074 this->active_ = active;
00075
00076 if (active && !this->start_syn()) {
00077 ACE_ERROR_RETURN((LM_ERROR,
00078 ACE_TEXT("(%P|%t) ERROR: ")
00079 ACE_TEXT("BestEffortSession::start: ")
00080 ACE_TEXT("failed to schedule SYN watchdog!\n")),
00081 false);
00082 }
00083
00084 return this->started_ = true;
00085 }
00086
00087
00088 }
00089 }