MulticastSession.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_MULTICASTSESSION_H
00009 #define DCPS_MULTICASTSESSION_H
00010
00011 #include "Multicast_Export.h"
00012
00013 #include "MulticastDataLink.h"
00014 #include "MulticastTypes.h"
00015
00016 #include "ace/Message_Block.h"
00017 #include "ace/Synch_Traits.h"
00018
00019 #include "dds/DCPS/RcObject.h"
00020 #include "dds/DCPS/transport/framework/TransportHeader.h"
00021 #include "dds/DCPS/transport/framework/DataLinkWatchdog_T.h"
00022 #include "dds/DCPS/transport/framework/TransportReassembly.h"
00023 #include "dds/DCPS/RcEventHandler.h"
00024
00025 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00026 class ACE_Reactor;
00027 ACE_END_VERSIONED_NAMESPACE_DECL
00028
00029 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00030
00031 namespace OpenDDS {
00032 namespace DCPS {
00033
00034 class MulticastSession;
00035
00036 class OpenDDS_Multicast_Export SynWatchdog
00037 : public DataLinkWatchdog {
00038 public:
00039 explicit SynWatchdog(ACE_Reactor* reactor,
00040 ACE_thread_t owner,
00041 MulticastSession* session);
00042
00043 virtual bool reactor_is_shut_down() const;
00044
00045 protected:
00046 virtual ACE_Time_Value next_interval();
00047 virtual void on_interval(const void* arg);
00048
00049 virtual ACE_Time_Value next_timeout();
00050 virtual void on_timeout(const void* arg);
00051
00052 private:
00053 ~SynWatchdog() { }
00054 MulticastSession* session_;
00055 size_t retries_;
00056 };
00057
00058 class OpenDDS_Multicast_Export MulticastSession
00059 : public RcObject {
00060 public:
00061 virtual ~MulticastSession();
00062
00063 MulticastDataLink* link();
00064
00065 MulticastPeer remote_peer() const;
00066
00067 bool acked();
00068 void set_acked();
00069 virtual bool is_reliable() { return false;}
00070
00071 void syn_received(const Message_Block_Ptr& control);
00072 void send_syn();
00073
00074 void synack_received(const Message_Block_Ptr& control);
00075 void send_synack();
00076 virtual void send_naks() {}
00077
00078 virtual bool check_header(const TransportHeader& header) = 0;
00079 virtual void record_header_received(const TransportHeader& header) = 0;
00080 virtual bool ready_to_deliver(const TransportHeader& header,
00081 const ReceivedDataSample& data) = 0;
00082 virtual void release_remote(const RepoId& ) {};
00083
00084 virtual bool control_received(char submessage_id,
00085 const Message_Block_Ptr& control);
00086
00087 virtual bool start(bool active, bool acked) = 0;
00088 virtual void stop();
00089
00090 bool reassemble(ReceivedDataSample& data, const TransportHeader& header);
00091
00092 protected:
00093 MulticastDataLink* link_;
00094
00095 MulticastPeer remote_peer_;
00096
00097 MulticastSession(ACE_Reactor* reactor,
00098 ACE_thread_t owner,
00099 MulticastDataLink* link,
00100 MulticastPeer remote_peer);
00101
00102 void send_control(char submessage_id,
00103 Message_Block_Ptr data);
00104
00105 bool start_syn();
00106
00107 virtual void syn_hook(const SequenceNumber& ) {}
00108
00109 typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00110 Reverse_Lock_t reverse_start_lock_;
00111
00112 ACE_Thread_Mutex start_lock_;
00113 bool started_;
00114
00115
00116
00117
00118
00119
00120
00121 bool active_;
00122
00123 TransportReassembly reassembly_;
00124
00125 bool acked_;
00126
00127 private:
00128 ACE_Thread_Mutex ack_lock_;
00129 RcHandle<SynWatchdog> syn_watchdog_;
00130 };
00131
00132 }
00133 }
00134
00135 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00136
00137 #ifdef __ACE_INLINE__
00138 # include "MulticastSession.inl"
00139 #endif
00140
00141 #endif
00142