OpenDDS  Snapshot(2023/04/28-20:55)
TransportClient.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
10 
11 #include "TransportConfig_rch.h"
12 #include "TransportImpl.h"
13 #include "DataLinkSet.h"
14 
15 #include <dds/DCPS/dcps_export.h>
19 #include <dds/DCPS/PoolAllocator.h>
24 
25 #include <ace/Time_Value.h>
26 #include <ace/Event_Handler.h>
27 #include <ace/Reverse_Lock_T.h>
28 
29 // Forward declaration of a test-friendly class in the global namespace
30 class DDS_TEST;
31 
33 
34 namespace OpenDDS {
35 namespace DCPS {
36 
37 class SendStateDataSampleList;
38 
39 /**
40  * @brief Mix-in class for DDS entities which directly use the transport layer.
41  *
42  * DataReaderImpl and DataWriterImpl are TransportClients. The TransportClient
43  * class manages the TransportImpl objects that represent the available
44  * communication mechanisms and the DataLink objects that represent the
45  * currently active communication channels to peers.
46  */
48  : public virtual RcObject
49 {
50 public:
51  // Used by TransportImpl to complete associate() processing:
52  void use_datalink(const GUID_t& remote_id, const DataLink_rch& link);
53 
54  // values for flags parameter of transport_assoc_done():
55  enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
57  virtual ~TransportClient();
58 
59 
60  // Local setup:
61 
62  void enable_transport(bool reliable, bool durable);
63  void enable_transport_using_config(bool reliable, bool durable,
64  const TransportConfig_rch& tc);
65 
66  bool swap_bytes() const { return swap_bytes_; }
67  bool cdr_encapsulation() const { return cdr_encapsulation_; }
68  const TransportLocatorSeq& connection_info() const { return conn_info_; }
69  void populate_connection_info();
70  bool is_reliable() const { return reliable_; }
71 
72  // Managing associations to remote peers:
73 
74  bool associate(const AssociationData& peer, bool active);
75  void disassociate(const GUID_t& peerId);
76  void stop_associating();
77  void stop_associating(const GUID_t* repos, CORBA::ULong length);
78  void send_final_acks();
79  void transport_stop();
80 
81  // Discovery:
// NOTE(review): this declaration is cut off after `locators,` -- listing
// line 86 is absent from this copy. The parallel register_for_writer()
// declaration ends with `DiscoveryListener* listener);`, so presumably the
// same trailing parameter belongs here; confirm against the original header.
82  void register_for_reader(const GUID_t& participant,
83  const GUID_t& writerid,
84  const GUID_t& readerid,
85  const TransportLocatorSeq& locators,
87 
88  void unregister_for_reader(const GUID_t& participant,
89  const GUID_t& writerid,
90  const GUID_t& readerid);
91 
92  void register_for_writer(const GUID_t& participant,
93  const GUID_t& readerid,
94  const GUID_t& writerid,
95  const TransportLocatorSeq& locators,
96  DiscoveryListener* listener);
97 
98  void unregister_for_writer(const GUID_t& participant,
99  const GUID_t& readerid,
100  const GUID_t& writerid);
101 
102  void update_locators(const GUID_t& remote,
103  const TransportLocatorSeq& locators);
104 
105  WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
106 
107  // Data transfer:
108 
109  bool send_response(const GUID_t& peer,
110  const DataSampleHeader& header,
111  Message_Block_Ptr payload); // [DR]
112 
113  void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
114 
115  SendControlStatus send_w_control(SendStateDataSampleList send_list,
116  const DataSampleHeader& header,
117  Message_Block_Ptr msg,
118  const GUID_t& destination);
119 
120  SendControlStatus send_control(const DataSampleHeader& header,
121  Message_Block_Ptr msg);
122 
123  SendControlStatus send_control_to(const DataSampleHeader& header,
124  Message_Block_Ptr msg,
125  const GUID_t& destination);
126 
127  bool remove_sample(const DataSampleElement* sample);
128  bool remove_all_msgs();
129 
130  virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
131  virtual GUID_t get_guid() const = 0;
133 
134  void terminate_send_if_suspended();
135 
136  bool associated_with(const GUID_t& remote) const;
137  bool pending_association_with(const GUID_t& remote) const;
138 
/// Returns repo_id_ by value.
/// NOTE(review): listing line 141 is absent from this copy, so a statement
/// may be missing between the opening brace and the return -- confirm
/// against the original header before relying on this body.
139  GUID_t repo_id() const
140  {
142  return repo_id_;
143  }
144 
145  void data_acked(const GUID_t& remote);
146 
147  bool is_leading(const GUID_t& reader_id) const;
148 
149 protected:
150  void cdr_encapsulation(bool encap)
151  {
152  cdr_encapsulation_ = encap;
153  }
154 
155 private:
156 
157  // Implemented by derived classes (DataReaderImpl/DataWriterImpl)
// Pure virtual hook that receives a TransportInst and returns bool. The
// meaning of the result is not shown in this header -- presumably "this
// transport instance is acceptable for the entity's QoS" (confirm).
158  virtual bool check_transport_qos(const TransportInst& inst) = 0;
// Pure virtual: the derived class reports a DDS::DomainId_t.
159  virtual DDS::DomainId_t domain_id() const = 0;
// Pure virtual: the derived class supplies the Priority for the given
// AssociationData.
160  virtual Priority get_priority_value(const AssociationData& data) const = 0;
// Optional hook with an empty default body. The int argument carries the
// ASSOC_OK / ASSOC_ACTIVE flag values declared in the public enum.
161  virtual void transport_assoc_done(int /*flags*/, const GUID_t& /*remote*/) {}
163 
164 
165 
166 #if defined(OPENDDS_SECURITY)
168  {
169  return DDS::HANDLE_NIL;
170  }
171 #endif
172 
173  // helpers
175  void use_datalink_i(const GUID_t& remote_id,
176  const DataLink_rch& link,
177  Guard& guard);
178  TransportSendListener_rch get_send_listener();
179  TransportReceiveListener_rch get_receive_listener();
180 
181  //helper for initiating connection, called by PendingAssoc objects
182  //allows PendingAssoc to temporarily release lock_ to allow
183  //TransportImpl to access Reactor if needed
184  bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
185  TransportImpl_rch impl,
186  const TransportImpl::RemoteTransport& remote,
187  const TransportImpl::ConnectionAttribs& attribs_,
188  Guard& guard);
189 
190  void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
191 
192  // A class, normally provided by a unit test, that needs access to a client's
193  // privates.
194  friend class ::DDS_TEST;
195 
196  typedef OPENDDS_MAP_CMP(GUID_t, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
197  typedef OPENDDS_VECTOR(WeakRcHandle<TransportImpl>) ImplsType;
198 
202  bool active_, scheduled_;
203  ImplsType impls_;
208 
210  : active_(false)
211  , scheduled_(false)
212  , blob_index_(0)
213  , client_(tc_rch)
214  {}
215 
216  void reset_client();
217  bool safe_to_remove();
218  bool initiate_connect(TransportClient* tc, Guard& guard);
219  int handle_timeout(const ACE_Time_Value& time, const void* arg);
220  };
221 
223 
224  typedef OPENDDS_MAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap;
225  typedef OPENDDS_MULTIMAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PrevPendingMap;
226 
227  void clean_prev_pending();
228 
230  public:
232  ACE_thread_t owner)
233  : ReactorInterceptor(reactor, owner)
234  , timer_id_(-1)
235  { }
236 
237  void schedule_timer(TransportClient_rch transport_client, const PendingAssoc_rch& pend)
238  {
239  execute_or_enqueue(make_rch<ScheduleCommand>(this, transport_client, pend));
240  }
241 
242  ReactorInterceptor::CommandPtr cancel_timer(const PendingAssoc_rch& pend)
243  {
244  return execute_or_enqueue(make_rch<CancelCommand>(this, pend));
245  }
246 
247  void set_id(long id) { timer_id_ = id; }
248  long get_id() const { return timer_id_; }
249 
250  virtual bool reactor_is_shut_down() const
251  {
252  return TheServiceParticipant->is_shut_down();
253  }
254 
255  private:
257  { }
258 
// Shared base for the timer commands that follow: the constructor stores
// the owning timer in timer_ and the PendingAssoc the command acts on in
// assoc_, both of which the derived execute() implementations use.
// NOTE(review): listing lines 261 and 267 are absent from this copy; the
// first line of the constructor signature and the timer_ member
// declaration are presumably what is missing -- confirm against the
// original header.
259  class CommandBase : public Command {
260  public:
262  const PendingAssoc_rch& assoc)
263  : timer_ (timer)
264  , assoc_ (assoc)
265  { }
266  protected:
268  PendingAssoc_rch assoc_;
269  };
270  struct ScheduleCommand : public CommandBase {
272  TransportClient_rch transport_client,
273  const PendingAssoc_rch& assoc)
274  : CommandBase (timer, assoc)
275  , transport_client_ (transport_client)
276  { }
277  virtual void execute()
278  {
279  if (timer_->reactor()) {
280  TransportClient_rch client = transport_client_.lock();
281  if (client) {
282  ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
283  assoc_->scheduled_ = true;
284  long id = timer_->reactor()->schedule_timer(assoc_.in(),
285  client.in(),
286  client->passive_connect_duration_.value());
287  if (id != -1) {
288  timer_->set_id(id);
289  }
290  }
291  }
292  }
294  };
295  struct CancelCommand : public CommandBase {
297  const PendingAssoc_rch& assoc)
298  : CommandBase (timer, assoc)
299  { }
300  virtual void execute()
301  {
302  if (timer_->reactor() && timer_->get_id()) {
303  ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
304  timer_->reactor()->cancel_timer(timer_->get_id());
305  timer_->set_id(-1);
306  assoc_->scheduled_ = false;
307  }
308  }
309  };
310  long timer_id_;
311  };
313 
314  // Associated Impls and DataLinks:
315 
317  ImplsType impls_;
318  PendingMap pending_;
319  PrevPendingMap prev_pending_;
321 
322  DataLinkIndex data_link_index_;
323 
324  // Used to allow sends to complete as a transaction and to block
325  // multi-threaded writers from proceeding to send data
326  // on two threads simultaneously, which could cause out-of-order data.
330 
331  //max_transaction_tail_ will always be the tail of the
332  //max transaction that has been observed or 0 if this is
333  //the first transaction or a transaction after the expected
334  //value was met and thus reset to 0 indicating the samples were
335  //sent up to max_transaction_id_
337 
338  // Configuration details:
339 
340  bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
341 
343 
345 
346  /// Seems to protect accesses to impls_, pending_, links_, data_link_index_
348 
349  Reverse_Lock_t reverse_lock_;
350 
352 };
353 
355 
356 }
357 }
358 
360 
361 #endif
CommandBase(PendingAssocTimer *timer, const PendingAssoc_rch &assoc)
PendingAssocTimer(ACE_Reactor *reactor, ACE_thread_t owner)
WeakRcHandle< TransportClient > client_
virtual SequenceNumber get_max_sn() const
const InstanceHandle_t HANDLE_NIL
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
virtual RcHandle< BitSubscriber > get_builtin_subscriber_proxy() const
ReactorInterceptor::CommandPtr cancel_timer(const PendingAssoc_rch &pend)
virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
time_t time(time_t *tloc=0)
const TransportLocatorSeq & connection_info() const
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
void schedule_timer(TransportClient_rch transport_client, const PendingAssoc_rch &pend)
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
ACE_Thread_Mutex lock_
Seems to protect accesses to impls_, pending_, links_, data_link_index_.
sequence< TransportLocator > TransportLocatorSeq
ACE_Guard< ACE_Thread_Mutex > lock_
TransportImpl::ConnectionAttribs attribs_
DataSampleElement * max_transaction_tail_
DOMAINID_TYPE_NATIVE DomainId_t
#define OPENDDS_MULTIMAP_CMP(K, T, C)
ACE_CDR::ULong ULong
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
RcHandle< TransportClient > TransportClient_rch
DWORD ACE_thread_t
RcHandle< PendingAssocTimer > pending_assoc_timer_
TransportLocatorSeq conn_info_
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
ScheduleCommand(PendingAssocTimer *timer, TransportClient_rch transport_client, const PendingAssoc_rch &assoc)
virtual void transport_assoc_done(int, const GUID_t &)
ACE_Guard< ACE_Thread_Mutex > Guard
RcHandle< PendingAssoc > PendingAssoc_rch
Mix-in class for DDS entities which directly use the transport layer.
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
unsigned long long ACE_UINT64
ssize_t send_i(ACE_HANDLE handle, const void *buf, size_t len)
ACE_CDR::Long Priority
Sequence number abstraction. Only allows positive 64 bit values.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
CancelCommand(PendingAssocTimer *timer, const PendingAssoc_rch &assoc)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
PendingAssoc(RcHandle< TransportClient > tc_rch)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
ACE_Thread_Mutex send_transaction_lock_
SendControlStatus
Return code type for send_control() operations.