00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_RELIABLESESSION_H
00009 #define DCPS_RELIABLESESSION_H
00010
00011 #include "Multicast_Export.h"
00012
00013 #include "MulticastSession.h"
00014 #include "MulticastTypes.h"
00015
00016 #include "ace/Synch_Traits.h"
00017
00018 #include "dds/DCPS/DisjointSequence.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023
00024 class ReliableSession;
00025
00026 class OpenDDS_Multicast_Export NakWatchdog
00027 : public DataLinkWatchdog {
00028 public:
00029 explicit NakWatchdog(ACE_Reactor* reactor,
00030 ACE_thread_t owner,
00031 ReliableSession* session);
00032
00033 virtual bool reactor_is_shut_down() const;
00034
00035 protected:
00036 virtual ACE_Time_Value next_interval();
00037 virtual void on_interval(const void* arg);
00038
00039 private:
00040 ~NakWatchdog() { }
00041 ReliableSession* session_;
00042 };
00043
00044 class OpenDDS_Multicast_Export ReliableSession
00045 : public MulticastSession {
00046 public:
00047 ReliableSession(ACE_Reactor* reactor,
00048 ACE_thread_t owner,
00049 MulticastDataLink* link,
00050 MulticastPeer remote_peer);
00051
00052 ~ReliableSession();
00053
00054 virtual bool check_header(const TransportHeader& header);
00055 virtual void record_header_received(const TransportHeader& header);
00056
00057 virtual bool ready_to_deliver(const TransportHeader& header,
00058 const ReceivedDataSample& data);
00059 void deliver_held_data();
00060 virtual void release_remote(const RepoId& remote);
00061
00062 virtual bool control_received(char submessage_id,
00063 ACE_Message_Block* control);
00064
00065 void expire_naks();
00066 void send_naks();
00067
00068 void nak_received(ACE_Message_Block* control);
00069 void send_naks(DisjointSequence& found);
00070
00071 void nakack_received(ACE_Message_Block* control);
00072 virtual void send_nakack(SequenceNumber low);
00073
00074 virtual bool start(bool active, bool acked);
00075 virtual void stop();
00076 virtual bool is_reliable() { return true;}
00077
00078 virtual void syn_hook(const SequenceNumber& seq);
00079
00080 private:
00081 NakWatchdog* nak_watchdog_;
00082
00083 DisjointSequence nak_sequence_;
00084
00085 typedef OPENDDS_MAP(ACE_Time_Value, SequenceNumber) NakRequestMap;
00086 NakRequestMap nak_requests_;
00087
00088 ACE_Thread_Mutex held_lock_;
00089 typedef SequenceNumber TransportHeaderSN;
00090 OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_;
00091
00092 typedef OPENDDS_SET(SequenceRange) NakPeerSet;
00093 NakPeerSet nak_peers_;
00094 };
00095
00096 }
00097 }
00098
00099 #endif