OpenDDS  Snapshot(2023/04/28-20:55)
TcpDataLink.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_TRANSPORT_TCP_TCPDATALINK_H
7 #define OPENDDS_DCPS_TRANSPORT_TCP_TCPDATALINK_H
8 
9 #include "TcpConnection_rch.h"
10 #include "TcpTransport.h"
11 
12 #include <dds/DCPS/AtomicBool.h>
14 
15 #include <ace/INET_Addr.h>
16 
18 
19 namespace OpenDDS {
20 namespace DCPS {
21 
22 class TcpSendStrategy;
23 class TcpReceiveStrategy;
24 
25 class TcpDataLink : public DataLink {
26 public:
27 
28  TcpDataLink(const TcpTransport_rch& transport_impl,
31  bool is_loopback,
32  bool is_active);
33  virtual ~TcpDataLink();
34 
35  /// Accessor for the remote address.
36  const ACE_INET_Addr& remote_address() const;
37 
38  /// Called when an established connection object is available
39  /// for this TcpDataLink. Called by the TcpTransport's
40  /// connect_datalink() method.
41  int connect(const TcpConnection_rch& connection,
44 
45  int reuse_existing_connection(const TcpConnection_rch& connection);
46  int reconnect(const TcpConnection_rch& connection);
47 
49 
50  bool check_active_client(const GUID_t& local_id);
51 
52  void client_stop(const GUID_t& local_id);
53 
54  virtual void pre_stop_i();
55 
56  /// Set release pending flag.
57  void set_release_pending(bool flag);
58  /// Get release pending flag.
59  bool is_release_pending() const;
60 
61  void ack_received(const ReceivedDataSample& sample);
62  void request_ack_received(const ReceivedDataSample& sample);
64 
67 
68  int make_reservation(const GUID_t& remote_subscription_id,
69  const GUID_t& local_publication_id,
70  const TransportSendListener_wrch& send_listener,
71  bool reliable);
72 
73  int make_reservation(const GUID_t& remote_publication_id,
74  const GUID_t& local_subscription_id,
75  const TransportReceiveListener_wrch& receive_listener,
76  bool reliable);
77 
79 
80 protected:
81 
82  /// Called when the DataLink is self-releasing because all of its
83  /// reservations have been released, or when the TransportImpl is
84  /// handling a shutdown() call.
85  virtual void stop_i();
86 
87  virtual void send_i(TransportQueueElement* element, bool relink = true);
88  virtual void send_stop_i(GUID_t repoId);
89 
90 private:
93  void send_association_msg(const GUID_t& local, const GUID_t& remote);
94 
99  typedef OPENDDS_VECTOR(TransportQueueElement*) PendingRequestAcks;
101  PendingRequestAcks pending_request_acks_;
102  typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) RepoIdSetType;
103  RepoIdSetType stopped_clients_;
105 };
106 
107 } // namespace DCPS
108 } // namespace OpenDDS
109 
111 
112 #if defined (__ACE_INLINE__)
113 #include "TcpDataLink.inl"
114 #endif /* __ACE_INLINE__ */
115 
116 #endif /* OPENDDS_TCPDATALINK_H */
bool handle_send_request_ack(TransportQueueElement *element)
WeakRcHandle< TcpConnection > connection_
Definition: TcpDataLink.h:96
#define ACE_SYNCH_MUTEX
ACE_INET_Addr remote_address_
Definition: TcpDataLink.h:95
void request_ack_received(const ReceivedDataSample &sample)
ACE_Thread_Mutex stopped_clients_mutex_
Definition: TcpDataLink.h:104
typedef OPENDDS_VECTOR(TransportQueueElement *) PendingRequestAcks
PendingRequestAcks pending_request_acks_
Definition: TcpDataLink.h:101
virtual void send_i(TransportQueueElement *element, bool relink=true)
Definition: TcpDataLink.cpp:68
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22
typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) RepoIdSetType
void ack_received(const ReceivedDataSample &sample)
TcpDataLink(const TcpTransport_rch &transport_impl, const ACE_INET_Addr &remote_address, Priority priority, bool is_loopback, bool is_active)
Definition: TcpDataLink.cpp:30
void send_association_msg(const GUID_t &local, const GUID_t &remote)
virtual void send_stop_i(GUID_t repoId)
Definition: TcpDataLink.cpp:79
int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
int connect(const TcpConnection_rch &connection, const RcHandle< TcpSendStrategy > &send_strategy, const RcHandle< TcpReceiveStrategy > &receive_strategy)
int reconnect(const TcpConnection_rch &connection)
TcpSendStrategy_rch send_strategy()
Holds a data sample received by the transport.
AtomicBool release_is_pending_
Definition: TcpDataLink.h:98
TcpReceiveStrategy_rch receive_strategy()
bool check_active_client(const GUID_t &local_id)
Definition: TcpDataLink.cpp:88
virtual int priority(void) const
void set_release_pending(bool flag)
Set release pending flag.
RepoIdSetType stopped_clients_
Definition: TcpDataLink.h:103
ACE_SYNCH_MUTEX pending_request_acks_lock_
Definition: TcpDataLink.h:100
void client_stop(const GUID_t &local_id)
Definition: TcpDataLink.cpp:95
ACE_CDR::Long Priority
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool is_release_pending() const
Get release pending flag.
int reuse_existing_connection(const TcpConnection_rch &connection)
const ACE_INET_Addr & remote_address() const
Accessor for the remote address.
Definition: TcpDataLink.inl:15
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Base wrapper class around a data/control sample to be sent.