MulticastSession.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef DCPS_MULTICASTSESSION_H
00009 #define DCPS_MULTICASTSESSION_H
00010 
00011 #include "Multicast_Export.h"
00012 
00013 #include "MulticastDataLink.h"
00014 #include "MulticastTypes.h"
00015 
00016 #include "ace/Message_Block.h"
00017 #include "ace/Synch_Traits.h"
00018 
00019 #include "dds/DCPS/RcObject_T.h"
00020 #include "dds/DCPS/transport/framework/TransportHeader.h"
00021 #include "dds/DCPS/transport/framework/DataLinkWatchdog_T.h"
00022 #include "dds/DCPS/transport/framework/TransportReassembly.h"
00023 
00024 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00025 class ACE_Reactor;  // forward declaration: this header only names ACE_Reactor through pointer parameters
00026 ACE_END_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 class MulticastSession;  // forward declaration: SynWatchdog below refers to MulticastSession* before the class is defined
00032 
00033 class OpenDDS_Multicast_Export SynWatchdog
00034   : public DataLinkWatchdog {
        // Watchdog bound to one MulticastSession. It derives from
        // DataLinkWatchdog, declares interval/timeout hooks, and keeps a
        // retry counter.
        // NOTE(review): judging by the name, this presumably paces SYN
        // (re)transmission during the session handshake -- confirm against
        // the implementation file.
00035 public:
        // Builds a watchdog for the given session.
        //   reactor, owner - NOTE(review): presumably forwarded to the
        //                    DataLinkWatchdog base -- confirm.
        //   session        - session this watchdog acts on; NOTE(review):
        //                    presumably stored in session_ -- confirm.
00036   explicit SynWatchdog(ACE_Reactor* reactor,
00037                        ACE_thread_t owner,
00038                        MulticastSession* session);
00039 
        // Reports whether the reactor has been shut down.
        // NOTE(review): how this is determined is not visible in this
        // header -- see the implementation.
00040   virtual bool reactor_is_shut_down() const;
00041 
00042 protected:
        // Hooks for the periodic ("interval") side of the watchdog.
        // NOTE(review): presumably overrides of DataLinkWatchdog virtuals;
        // the meaning of `arg` is defined by the base class -- confirm.
00043   virtual ACE_Time_Value next_interval();
00044   virtual void on_interval(const void* arg);
00045 
        // Hooks for the one-shot ("timeout") side of the watchdog; same
        // caveats as for the interval hooks.
00046   virtual ACE_Time_Value next_timeout();
00047   virtual void on_timeout(const void* arg);
00048 
00049 private:
        // The destructor is private, so code outside this class cannot
        // destroy a SynWatchdog directly or create one with automatic
        // storage. Its body is empty, so session_ is not released here.
        // NOTE(review): destruction presumably goes through a mechanism of
        // the base class -- confirm in DataLinkWatchdog_T.h.
00050   ~SynWatchdog() { }
        // Session passed to the constructor (raw pointer).
00051   MulticastSession* session_;
        // Retry counter. NOTE(review): presumably counts interval firings
        // since the watchdog was scheduled -- confirm.
00052   size_t retries_;
00053 };
00054 
00055 class OpenDDS_Multicast_Export MulticastSession
00056   : public RcObject<ACE_SYNCH_MUTEX> {
        // Abstract base for a session between a MulticastDataLink and one
        // remote peer. It is reference counted (RcObject) and cannot be
        // instantiated directly: check_header(), record_header_received(),
        // ready_to_deliver() and start() are pure virtual, and the
        // constructor is protected.
00057 public:
00058   virtual ~MulticastSession();
00059 
        // Accessor for the owning data link.
        // NOTE(review): presumably returns link_ -- confirm in the .inl/.cpp.
00060   MulticastDataLink* link();
00061 
        // Accessor for the remote peer identifier.
        // NOTE(review): presumably returns remote_peer_ -- confirm.
00062   MulticastPeer remote_peer() const;
00063 
        // Query / set the "acked" state of the session.
        // NOTE(review): presumably read and written under ack_lock_ -- confirm.
00064   bool acked();
00065   void set_acked();
        // Default: a session is not reliable. Derived classes may override.
00066   virtual bool is_reliable() { return false;}
00067 
        // SYN handshake: handling of a received SYN and sending of a SYN.
00068   void syn_received(ACE_Message_Block* control);
00069   void send_syn();
00070 
        // SYNACK handshake: handling of a received SYNACK and sending of one.
00071   void synack_received(ACE_Message_Block* control);
00072   void send_synack();
        // Default: no-op. Derived classes may override to send NAKs.
00073   virtual void send_naks() {}
00074 
        // Per-header hooks that every concrete session must implement.
        // NOTE(review): exact return-value semantics are defined by the
        // implementations and callers, not by this header.
00075   virtual bool check_header(const TransportHeader& header) = 0;
00076   virtual void record_header_received(const TransportHeader& header) = 0;
00077   virtual bool ready_to_deliver(const TransportHeader& header,
00078                                 const ReceivedDataSample& data) = 0;
        // Default: no-op; the argument is ignored.
00079   virtual void release_remote(const RepoId& /*remote*/) {};
00080 
        // Handles a received control submessage identified by submessage_id.
        // NOTE(review): the meaning of the bool result is not visible here.
00081   virtual bool control_received(char submessage_id,
00082                                 ACE_Message_Block* control);
00083 
        // Starts the session; must be implemented by derived classes.
        // NOTE(review): `active` presumably selects the role recorded in
        // active_, and `acked` the initial acked state -- confirm.
00084   virtual bool start(bool active, bool acked) = 0;
        // Stops the session; derived classes may extend.
00085   virtual void stop();
00086 
        // Reassembly entry point for a received sample.
        // NOTE(review): presumably delegates to reassembly_ -- confirm.
00087   bool reassemble(ReceivedDataSample& data, const TransportHeader& header);
00088 
00089 protected:
        // Data link this session belongs to (raw pointer).
00090   MulticastDataLink* link_;
00091 
        // Identifier of the remote peer of this session.
00092   MulticastPeer remote_peer_;
00093 
        // Protected constructor: only derived classes can create sessions.
        // NOTE(review): reactor/owner are presumably used to construct the
        // SynWatchdog held in syn_watchdog_ -- confirm.
00094   MulticastSession(ACE_Reactor* reactor,
00095                    ACE_thread_t owner,
00096                    MulticastDataLink* link,
00097                    MulticastPeer remote_peer);
00098 
        // Sends a control submessage with the given id and payload.
00099   void send_control(char submessage_id,
00100                     ACE_Message_Block* data);
00101 
        // Starts the SYN phase. NOTE(review): presumably schedules
        // syn_watchdog_; the meaning of the bool result is not visible here.
00102   bool start_syn();
00103 
        // Extension point taking a sequence number; default is a no-op.
        // NOTE(review): presumably invoked from syn_received() -- confirm.
00104   virtual void syn_hook(const SequenceNumber& /*seq*/) {}
00105 
00106   typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
        // Declared before start_lock_, so it is constructed first (members
        // are initialized in declaration order).
        // NOTE(review): if this wraps start_lock_, the mutex is not yet
        // constructed at that point -- confirm the constructor only binds a
        // reference and does not use the lock.
00107   Reverse_Lock_t reverse_start_lock_;
00108 
00109   ACE_Thread_Mutex start_lock_;
        // NOTE(review): presumably set by start()/cleared by stop() under
        // start_lock_ -- confirm.
00110   bool started_;
00111 
00112   // A session serves either a publisher or a subscriber; the
00113   // implementation does not support one session acting as both.
00114   //
00115   // Control messages are split accordingly:
00116   //  - subscribers only receive syn, send synack, send naks, receive nakack;
00117   //  - publishers only send syn, receive synack, receive naks, send nakack.
00118   bool active_;
00119 
        // Reassembly state for this session.
00120   TransportReassembly reassembly_;
00121 
        // Acked flag; see acked()/set_acked().
00122   bool acked_;
00123 
00124 private:
        // NOTE(review): presumably guards acked_ -- confirm.
00125   ACE_Thread_Mutex ack_lock_;
        // Watchdog for the SYN phase (raw pointer).
        // NOTE(review): ownership and lifetime are not visible in this header.
00126   SynWatchdog* syn_watchdog_;
00127 };
00128 
00129 } // namespace DCPS
00130 } // namespace OpenDDS
00131 
00132 #ifdef __ACE_INLINE__
00133 # include "MulticastSession.inl"
00134 #endif  /* __ACE_INLINE__ */
00135 
00136 #endif  /* DCPS_MULTICASTSESSION_H */

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7