ReliableSession.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_RELIABLESESSION_H
00009 #define DCPS_RELIABLESESSION_H
00010 
00011 #include "Multicast_Export.h"
00012 
00013 #include "MulticastSession.h"
00014 #include "MulticastTypes.h"
00015 
00016 #include "ace/Synch_Traits.h"
00017 
00018 #include "dds/DCPS/DisjointSequence.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 class ReliableSession;
00025 
00026 class OpenDDS_Multicast_Export NakWatchdog  // DataLinkWatchdog subclass bound to one ReliableSession (see session_).
00027   : public DataLinkWatchdog {
00028 public:  // NOTE(review): constructor arguments are presumably forwarded to DataLinkWatchdog and session_ - confirm in the .cpp.
00029   explicit NakWatchdog(ACE_Reactor* reactor,
00030                        ACE_thread_t owner,
00031                        ReliableSession* session);
00032 
00033   virtual bool reactor_is_shut_down() const;  // Defined outside this header; presumably reports reactor shutdown state - verify.
00034 
00035 protected:  // NOTE(review): these look like the timer hooks declared by DataLinkWatchdog - confirm against the base class.
00036   virtual ACE_Time_Value next_interval();  // Presumably the delay before the next on_interval call - verify.
00037   virtual void on_interval(const void* arg);  // Presumably invoked when the interval elapses; meaning of arg is not shown here.
00038 
00039 private:
00040   ~NakWatchdog() { }  // Private, empty destructor: code outside this class cannot destroy a NakWatchdog directly.
00041   ReliableSession* session_;  // Raw pointer to the associated session; ownership and lifetime are not visible in this header.
00042 };
00043 
00044 class OpenDDS_Multicast_Export ReliableSession  // MulticastSession subclass that reports itself as reliable and declares NAK / NAKACK handling.
00045   : public MulticastSession {
00046 public:  // NOTE(review): constructor arguments are presumably passed on to MulticastSession - confirm in the .cpp.
00047   ReliableSession(ACE_Reactor* reactor,
00048                   ACE_thread_t owner,
00049                   MulticastDataLink* link,
00050                   MulticastPeer remote_peer);
00051 
00052   ~ReliableSession();
00053   // Transport header hooks, defined outside this header. NOTE(review): presumably track received sequence numbers - verify.
00054   virtual bool check_header(const TransportHeader& header);
00055   virtual void record_header_received(const TransportHeader& header);
00056   // Delivery gating for received samples. NOTE(review): presumably uses held_ under held_lock_ - confirm in the .cpp.
00057   virtual bool ready_to_deliver(const TransportHeader& header,
00058                                 const ReceivedDataSample& data);
00059   void deliver_held_data();
00060   virtual void release_remote(const RepoId& remote);
00061   // NOTE(review): presumably dispatches a control submessage by submessage_id; meaning of the bool result is not shown here.
00062   virtual bool control_received(char submessage_id,
00063                                 ACE_Message_Block* control);
00064   // NAK bookkeeping. NOTE(review): presumably operates on nak_requests_ and nak_sequence_ - confirm in the .cpp.
00065   void expire_naks();
00066   void send_naks();
00067 
00068   void nak_received(ACE_Message_Block* control);
00069   void send_naks(DisjointSequence& found);  // Overload; found is a non-const reference, so the callee may modify it.
00070 
00071   void nakack_received(ACE_Message_Block* control);
00072   virtual void send_nakack(SequenceNumber low);
00073   // Session lifecycle and reliability flag.
00074   virtual bool start(bool active, bool acked);
00075   virtual void stop();
00076   virtual bool is_reliable() { return true;}  // Always returns true for this class.
00077   // NOTE(review): name suggests a hook run during SYN handling with seq - confirm against MulticastSession.
00078   virtual void syn_hook(const SequenceNumber& seq);
00079 
00080 private:
00081   NakWatchdog* nak_watchdog_;  // Raw pointer; creation and release are not visible in this header.
00082 
00083   DisjointSequence nak_sequence_;  // NOTE(review): presumably tracks received or missing sequence numbers - verify.
00084 
00085   typedef OPENDDS_MAP(ACE_Time_Value, SequenceNumber) NakRequestMap;  // Map keyed by ACE_Time_Value with SequenceNumber values.
00086   NakRequestMap nak_requests_;
00087 
00088   ACE_Thread_Mutex held_lock_;  // NOTE(review): presumably guards held_ - confirm in the .cpp.
00089   typedef SequenceNumber TransportHeaderSN;  // Alias naming the key type of held_.
00090   OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_;  // Multimap from TransportHeaderSN to ReceivedDataSample.
00091 
00092   typedef OPENDDS_SET(SequenceRange) NakPeerSet;  // NOTE(review): despite the name, elements are SequenceRange values - confirm intent.
00093   NakPeerSet nak_peers_;
00094 };
00095 
00096 } // namespace DCPS
00097 } // namespace OpenDDS
00098 
00099 #endif  /* DCPS_RELIABLESESSION_H */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7