TcpConnection.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_TCPCONNECTION_H
00009 #define OPENDDS_TCPCONNECTION_H
00010 
00011 #include "TcpInst_rch.h"
00012 #ifdef __BORLANDC__
00013 #  include "TcpDataLink.h"
00014 #endif
00015 #include "TcpDataLink_rch.h"
00016 #include "TcpConnection_rch.h"
00017 #include "TcpSendStrategy_rch.h"
00018 #include "TcpReceiveStrategy_rch.h"
00019 #include "TcpReconnectTask.h"
00020 #include "TcpTransport_rch.h"
00021 
00022 #include "dds/DCPS/RcObject_T.h"
00023 #include "dds/DCPS/transport/framework/TransportDefs.h"
00024 
00025 #include "ace/SOCK_Stream.h"
00026 #include "ace/Svc_Handler.h"
00027 #include "ace/INET_Addr.h"
00028 #include "ace/Synch.h"
00029 
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 
00033 class TcpConnection
00034   : public ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>
00035   , public RcObject<ACE_SYNCH_MUTEX> {
00036 public:
00037 
00038   /// States are used during reconnecting.
00039   enum ReconnectState {
00040     INIT_STATE,
00041     LOST_STATE,
00042     RECONNECTED_STATE,
00043     PASSIVE_WAITING_STATE,
00044     PASSIVE_TIMEOUT_CALLED_STATE
00045   };
00046 
00047   /// Passive side constructor (acceptor)
00048   TcpConnection();
00049 
00050   /// Active side constructor (connector)
00051   TcpConnection(const ACE_INET_Addr& remote_address,
00052                 Priority priority,
00053                 const TcpInst_rch& config);
00054 
00055   virtual ~TcpConnection();
00056 
00057   std::size_t& id();
00058 
00059   /// Protocol setup (handshake) on the active side.
00060   /// The local address is sent to the remote (passive) side to
00061   /// identify ourselves to the remote side.
00062   int active_open();
00063 
00064   /// This will be called by the DataLink (that "owns" us) when
00065   /// the TcpTransport has been told to shutdown(), or when
00066   /// the DataLink finds itself no longer needed, and is
00067   /// "self-releasing".
00068   void disconnect();
00069 
00070   // Note that the acceptor or connector that calls the open() method will pass
00071   // itself in as a void*.
00072   virtual int open(void* arg);
00073 
00074   void set_receive_strategy(TcpReceiveStrategy* receive_strategy);
00075 
00076   void remove_receive_strategy();
00077 
00078   /// Give a "copy" of the TcpSendStrategy object to this
00079   /// connection object.
00080   void set_send_strategy(TcpSendStrategy* send_strategy);
00081 
00082   void remove_send_strategy();
00083 
00084   /// We pass this "event" along to the receive_strategy.
00085   virtual int handle_input(ACE_HANDLE);
00086 
00087   /// Handle back pressure when sending.
00088   virtual int handle_output(ACE_HANDLE);
00089 
00090   virtual int close(u_long);
00091   virtual int handle_close(ACE_HANDLE, ACE_Reactor_Mask);
00092 
00093   void set_sock_options(TcpInst* tcp_config);
00094 
00095   int reconnect(bool on_new_association = false);
00096 
00097   /// Return true if the object represents the connector side, otherwise
00098   /// it's the acceptor side. The acceptor/connector role is not changed
00099   /// when re-establishing the connection.
00100   bool is_connector() const;
00101 
00102   /// Return true if connection is connected.
00103   bool is_connected() const;
00104 
00105   void transfer(TcpConnection* connection);
00106 
00107   int handle_timeout(const ACE_Time_Value &tv, const void *arg);
00108 
00109   /// Cache the reference to the datalink object for lost connection
00110   /// callbacks.
00111   void set_datalink(TcpDataLink* link);
00112 
00113   void notify_lost_on_backpressure_timeout();
00114 
00115   ACE_INET_Addr get_remote_address();
00116 
00117   /// Reconnect initiated by send strategy
00118   void relink_from_send(bool do_suspend);
00119 
00120   /// Reconnect initiated by receive strategy
00121   void relink_from_recv(bool do_suspend);
00122 
00123   /// Called by the reconnect task to inform us that the
00124   /// link & any associated data can be torn down.
00125   /// This call is done with no DCPS/transport locks held.
00126   bool tear_link();
00127 
00128   void shutdown();
00129 
00130   /// Access TRANSPORT_PRIORITY.value policy value if set.
00131   Priority& transport_priority();
00132   Priority  transport_priority() const;
00133 
00134   OPENDDS_POOL_ALLOCATION_FWD
00135 
00136 private:
00137 
00138   /// Attempt an active connection establishment to the remote address.
00139   /// The local address is sent to the remote (passive) side to
00140   /// identify ourselves to the remote side.
00141   /// Note this method is not thread protected. The caller need acquire
00142   /// the reconnect_lock_ before calling this function.
00143   int active_establishment(bool initiate_connect = true);
00144 
00145   int active_reconnect_i();
00146   int passive_reconnect_i();
00147   int active_reconnect_on_new_association();
00148 
00149   /// During the connection setup phase, the passive side sets passive_setup_,
00150   /// redirecting handle_input() events here (there is no recv strategy yet).
00151   int handle_setup_input(ACE_HANDLE h);
00152 
00153   typedef ACE_SYNCH_MUTEX     LockType;
00154   typedef ACE_Guard<LockType> GuardType;
00155 
00156   /// Lock to avoid the reconnect() called multiple times when
00157   /// both send() and recv() fail.
00158   LockType  reconnect_lock_;
00159 
00160   /// Flag indicates if connected or disconneted. It's set to true
00161   /// when actively connecting or passively acepting succeeds and set
00162   /// to false whenever the peer stream is closed.
00163   ACE_Atomic_Op<ACE_SYNCH_MUTEX, bool>  connected_;
00164 
00165   /// Flag indicate this connection object is the connector or acceptor.
00166   bool is_connector_;
00167 
00168   /// Reference to the receiving strategy.
00169   TcpReceiveStrategy_rch receive_strategy_;
00170 
00171   /// Reference to the send strategy.
00172   TcpSendStrategy_rch send_strategy_;
00173 
00174   /// Remote address.
00175   ACE_INET_Addr remote_address_;
00176 
00177   /// Local address.
00178   ACE_INET_Addr local_address_;
00179 
00180   /// The configuration used by this connection.
00181   TcpInst_rch tcp_config_;
00182 
00183   /// Datalink object which is needed for connection lost callback.
00184   TcpDataLink_rch link_;
00185 
00186   /// The id of the scheduled timer. The timer is scheduled to check if the connection
00187   /// is re-established during the passive_reconnect_duration_. This id controls
00188   /// that the timer is just scheduled once when there are multiple threads detect
00189   /// the lost connection.
00190   int passive_reconnect_timer_id_;
00191 
00192   /// The task to do the reconnecting.
00193   /// TODO: We might need reuse the PerConnectionSynch thread
00194   /// to do the reconnecting or create the reconnect task when
00195   /// we need reconnect.
00196   TcpReconnectTask reconnect_task_;
00197 
00198   /// The state indicates each step of the reconnecting.
00199   ReconnectState reconnect_state_;
00200 
00201   /// Last time the connection is re-established.
00202   ACE_Time_Value last_reconnect_attempted_;
00203 
00204   /// TRANSPORT_PRIORITY.value policy value.
00205   Priority transport_priority_;
00206 
00207   /// shutdown flag
00208   bool shutdown_;
00209 
00210   bool passive_setup_;
00211   ACE_Message_Block passive_setup_buffer_;
00212   TcpTransport_rch transport_during_setup_;
00213 
00214   /// Small unique identifying value.
00215   std::size_t id_;
00216 };
00217 
00218 } // namespace DCPS
00219 } // namespace OpenDDS
00220 
00221 #if defined (__ACE_INLINE__)
00222 #include "TcpConnection.inl"
00223 #endif /* __ACE_INLINE__ */
00224 
00225 #endif  /* OPENDDS_TCPCONNECTION_H */

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7