00001
00002
00003
00004
00005
00006
00007
00008 #ifndef DCPS_MULTICASTSESSION_H
00009 #define DCPS_MULTICASTSESSION_H
00010
00011 #include "Multicast_Export.h"
00012
00013 #include "MulticastDataLink.h"
00014 #include "MulticastTypes.h"
00015
00016 #include "ace/Message_Block.h"
00017 #include "ace/Synch_Traits.h"
00018
00019 #include "dds/DCPS/RcObject_T.h"
00020 #include "dds/DCPS/transport/framework/TransportHeader.h"
00021 #include "dds/DCPS/transport/framework/DataLinkWatchdog_T.h"
00022 #include "dds/DCPS/transport/framework/TransportReassembly.h"
00023
00024 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00025 class ACE_Reactor;
00026 ACE_END_VERSIONED_NAMESPACE_DECL
00027
00028 namespace OpenDDS {
00029 namespace DCPS {
00030
00031 class MulticastSession;
00032
00033 class OpenDDS_Multicast_Export SynWatchdog
00034 : public DataLinkWatchdog {
00035 public:
00036 explicit SynWatchdog(ACE_Reactor* reactor,
00037 ACE_thread_t owner,
00038 MulticastSession* session);
00039
00040 virtual bool reactor_is_shut_down() const;
00041
00042 protected:
00043 virtual ACE_Time_Value next_interval();
00044 virtual void on_interval(const void* arg);
00045
00046 virtual ACE_Time_Value next_timeout();
00047 virtual void on_timeout(const void* arg);
00048
00049 private:
00050 ~SynWatchdog() { }
00051 MulticastSession* session_;
00052 size_t retries_;
00053 };
00054
00055 class OpenDDS_Multicast_Export MulticastSession
00056 : public RcObject<ACE_SYNCH_MUTEX> {
00057 public:
00058 virtual ~MulticastSession();
00059
00060 MulticastDataLink* link();
00061
00062 MulticastPeer remote_peer() const;
00063
00064 bool acked();
00065 void set_acked();
00066 virtual bool is_reliable() { return false;}
00067
00068 void syn_received(ACE_Message_Block* control);
00069 void send_syn();
00070
00071 void synack_received(ACE_Message_Block* control);
00072 void send_synack();
00073 virtual void send_naks() {}
00074
00075 virtual bool check_header(const TransportHeader& header) = 0;
00076 virtual void record_header_received(const TransportHeader& header) = 0;
00077 virtual bool ready_to_deliver(const TransportHeader& header,
00078 const ReceivedDataSample& data) = 0;
00079 virtual void release_remote(const RepoId& ) {};
00080
00081 virtual bool control_received(char submessage_id,
00082 ACE_Message_Block* control);
00083
00084 virtual bool start(bool active, bool acked) = 0;
00085 virtual void stop();
00086
00087 bool reassemble(ReceivedDataSample& data, const TransportHeader& header);
00088
00089 protected:
00090 MulticastDataLink* link_;
00091
00092 MulticastPeer remote_peer_;
00093
00094 MulticastSession(ACE_Reactor* reactor,
00095 ACE_thread_t owner,
00096 MulticastDataLink* link,
00097 MulticastPeer remote_peer);
00098
00099 void send_control(char submessage_id,
00100 ACE_Message_Block* data);
00101
00102 bool start_syn();
00103
00104 virtual void syn_hook(const SequenceNumber& ) {}
00105
00106 typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00107 Reverse_Lock_t reverse_start_lock_;
00108
00109 ACE_Thread_Mutex start_lock_;
00110 bool started_;
00111
00112
00113
00114
00115
00116
00117
00118 bool active_;
00119
00120 TransportReassembly reassembly_;
00121
00122 bool acked_;
00123
00124 private:
00125 ACE_Thread_Mutex ack_lock_;
00126 SynWatchdog* syn_watchdog_;
00127 };
00128
00129 }
00130 }
00131
00132 #ifdef __ACE_INLINE__
00133 # include "MulticastSession.inl"
00134 #endif
00135
00136 #endif