BestEffortSession.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "BestEffortSession.h"
00009
00010 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00011
00012 namespace OpenDDS {
00013 namespace DCPS {
00014
00015 BestEffortSession::BestEffortSession(ACE_Reactor* reactor,
00016 ACE_thread_t owner,
00017 MulticastDataLink* link,
00018 MulticastPeer remote_peer)
00019 : MulticastSession(reactor, owner, link, remote_peer)
00020 , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00021 {
00022 }
00023
00024 bool
00025 BestEffortSession::check_header(const TransportHeader& header)
00026 {
00027 if (header.sequence_ != this->expected_ &&
00028 expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00029 VDBG_LVL((LM_WARNING,
00030 ACE_TEXT("(%P|%t) WARNING: BestEffortSession::check_header ")
00031 ACE_TEXT("expected %q received %q\n"),
00032 this->expected_.getValue(), header.sequence_.getValue()), 2);
00033 if (header.sequence_ > this->expected_) {
00034 SequenceRange range(this->expected_, header.sequence_.previous());
00035 this->reassembly_.data_unavailable(range);
00036 }
00037 }
00038
00039 this->expected_ = header.sequence_;
00040 ++this->expected_;
00041
00042
00043
00044 return true;
00045 }
00046
00047 void
00048 BestEffortSession::record_header_received(const TransportHeader& header)
00049 {
00050 if (this->remote_peer_ != header.source_) return;
00051
00052 check_header(header);
00053 }
00054
00055 bool
00056 BestEffortSession::ready_to_deliver(const TransportHeader& header,
00057 const ReceivedDataSample& )
00058 {
00059 if (expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
00060 && header.sequence_ == expected_.previous()) {
00061 return true;
00062 }
00063 return false;
00064 }
00065
00066 bool
00067 BestEffortSession::start(bool active, bool )
00068 {
00069 ACE_GUARD_RETURN(ACE_SYNCH_MUTEX,
00070 guard,
00071 this->start_lock_,
00072 false);
00073
00074 if (this->started_) return true;
00075
00076 this->active_ = active;
00077
00078 if (active && !this->start_syn()) {
00079 ACE_ERROR_RETURN((LM_ERROR,
00080 ACE_TEXT("(%P|%t) ERROR: ")
00081 ACE_TEXT("BestEffortSession::start: ")
00082 ACE_TEXT("failed to schedule SYN watchdog!\n")),
00083 false);
00084 }
00085
00086 return this->started_ = true;
00087 }
00088
00089
00090 }
00091 }
00092
00093 OPENDDS_END_VERSIONED_NAMESPACE_DECL