OpenDDS  Snapshot(2023/04/28-20:55)
TransportInst.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTINST_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTINST_H
10 
11 #include <ace/config.h>
12 #if !defined (ACE_LACKS_PRAGMA_ONCE)
13 #pragma once
14 #endif
15 
16 #include "TransportDefs.h"
17 #include "TransportImpl_rch.h"
18 #include "TransportImpl.h"
19 
20 #include <dds/DCPS/dcps_export.h>
21 #include <dds/DCPS/RcObject.h>
22 #include <dds/DCPS/PoolAllocator.h>
26 #include <dds/DCPS/TimeDuration.h>
27 
28 #include <dds/DdsDcpsInfoUtilsC.h>
29 #include <dds/OpenddsDcpsExtC.h>
30 
31 #include <ace/Synch_Traits.h>
32 
37 
39 
40 namespace OpenDDS {
41 
42 namespace ICE {
43  class Endpoint;
44 }
45 
46 namespace DCPS {
47 
48 /**
49  * @class TransportInst
50  *
51  * @brief Base class to hold configuration settings for TransportImpls.
52  *
53  * Each transport implementation will need to define a concrete
54  * subclass of the TransportInst class. The base
55  * class (TransportInst) contains configuration settings that
56  * are common to all (or most) concrete transport implementations.
57  * The concrete transport implementation defines any configuration
58  * settings that it requires within its concrete subclass of this
59  * TransportInst base class.
60  *
61  * The TransportInst object is supplied to the
62  * TransportImpl::configure() method.
63  */
64 class OpenDDS_Dcps_Export TransportInst : public virtual RcObject {
65 public:
66 
67  static const long DEFAULT_DATALINK_RELEASE_DELAY = 10000;
68  static const size_t DEFAULT_DATALINK_CONTROL_CHUNKS = 32u;
69 
70  const OPENDDS_STRING& name() const { return name_; }
71 
72  /// Overwrite the default configurations with the configuration from the
73  /// given section in the ACE_Configuration_Heap object.
74  virtual int load(ACE_Configuration_Heap& cf,
76 
77  /// Diagnostic aid.
78  void dump() const;
79  virtual OPENDDS_STRING dump_to_str() const;
80 
81  /// Format name of transport configuration parameter for use in
82  /// conjunction with dump(std::ostream& os).
83  static OPENDDS_STRING formatNameForDump(const char* name);
84 
86 
87  /// Number of pre-created link (list) objects per pool for the
88  /// "send queue" of each DataLink.
90 
91  /// Initial number of pre-allocated pools of link (list) objects
92  /// for the "send queue" of each DataLink.
94 
95  /// Max size (in bytes) of a packet (packet header + sample(s))
96  ACE_UINT32 max_packet_size_;
97 
98  /// Max number of samples that should ever be in a single packet.
100 
101  /// Optimum size (in bytes) of a packet (packet header + sample(s)).
103 
104  /// Flag for whether a new thread is needed for connection to
105  /// send without backpressure.
107 
108  /// Delay in milliseconds before the datalink is released after all
109  /// associations are removed. The default value is 10 seconds.
111 
112  /// The number of chunks used to size allocators for transport control
113  /// samples. The default value is 32.
115 
116  /// Maximum time to store incoming fragments of incomplete data samples.
117  /// The expiration time is relative to the last received fragment.
119 
120  /// Preallocated chunks in allocator for message blocks.
121  /// Default (0) is to use built-in constants in TransportReceiveStrategy
123 
124  /// Preallocated chunks in allocator for data blocks and data buffers.
125  /// Default (0) is to use built-in constants in TransportReceiveStrategy
127 
128  /// Does the transport as configured support RELIABLE_RELIABILITY_QOS?
129  virtual bool is_reliable() const = 0;
130 
131  /// Does the transport require a CDR-encapsulated data payload?
132  virtual bool requires_cdr_encapsulation() const { return false; }
133 
134  /// Populate a transport locator sequence. Return the number of "locators."
135  virtual size_t populate_locator(OpenDDS::DCPS::TransportLocator& trans_info, ConnectionInfoFlags flags) const = 0;
136 
137  DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
138  void rtps_relay_only_now(bool flag);
139  void use_rtps_relay_now(bool flag);
140  void use_ice_now(bool flag);
141 
142  virtual void update_locators(const GUID_t& /*remote_id*/,
143  const TransportLocatorSeq& /*locators*/) {}
144 
145  virtual void get_last_recv_locator(const GUID_t& /*remote_id*/,
146  TransportLocator& /*locators*/) {}
147 
148  virtual void rtps_relay_address_change() {}
149 
150  ReactorTask_rch reactor_task();
151  EventDispatcher_rch event_dispatcher();
152 
153  /**
154  * @{
155  * The RtpsUdpSendStrategy can be configured to simulate a lossy
156  * connection by probabilistically not sending RTPS messages. This
157  * capability is enabled by setting the flag to true.
158  *
159  * The coefficients m and b correspond to a linear model whose
160  * argument is the length of the RTPS message. The probability of
161  * dropping the message is p = m * message_length + b. When sending
162  * a message, a random number is selected from [0.0, 1.0]. If the
163  * random number is less than the drop probability, the message is
164  * dropped.
165  *
166  * The flag and coefficient can be changed dynamically.
167  *
168  * Examples
169  *
170  * m = 0 and b = .5 - the probability of dropping a message is .5
171  * regardless of message length.
172  *
173  * m = .001 and b = .2 - the probability of dropping a 200-byte
174  * message is .4. In this example, all messages longer than 800
175  * bytes will be dropped.
176  */
177  void drop_messages(bool flag)
178  {
179  ACE_GUARD(ACE_Thread_Mutex, g, config_lock_);
180  drop_messages_ = flag;
181  }
182 
183  void drop_messages_m(double m)
184  {
185  ACE_GUARD(ACE_Thread_Mutex, g, config_lock_);
186  drop_messages_m_ = m;
187  }
188 
189  void drop_messages_b(double b)
190  {
191  ACE_GUARD(ACE_Thread_Mutex, g, config_lock_);
192  drop_messages_b_ = b;
193  }
194 
195  bool should_drop(ssize_t length) const;
196 
197  bool should_drop(const iovec iov[], int n, ssize_t& length) const
198  {
199  length = 0;
200  for (int i = 0; i < n; ++i) {
201  length += iov[i].iov_len;
202  }
203  return should_drop(length);
204  }
205  /**@}*/
206 
207  void count_messages(bool flag)
208  {
209  ACE_GUARD(ACE_Thread_Mutex, g, config_lock_);
210  count_messages_ = flag;
211  }
212 
213  bool count_messages() const
214  {
215  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, config_lock_, false);
216  return count_messages_;
217  }
218 
220 
221 protected:
222 
223  TransportInst(const char* type,
224  const OPENDDS_STRING& name);
225 
226  virtual ~TransportInst();
227 
228  void set_port_in_addr_string(OPENDDS_STRING& addr_str, u_short port_number);
229 
232 
233 private:
234 
235  /// Adjust the configuration values, issuing a warning for each adjusted
236  /// value.
237  void adjust_config_value();
238 
239  friend class TransportRegistry;
240  void shutdown();
241 
242  friend class TransportClient;
243  protected:
244  TransportImpl_rch get_or_create_impl();
245  TransportImpl_rch get_impl();
246  private:
247  virtual TransportImpl_rch new_impl() = 0;
248 
250 
252 
256 
258 
260 };
261 
262 } // namespace DCPS
263 } // namespace OpenDDS
264 
266 
267 #if defined(__ACE_INLINE__)
268 #include "TransportInst.inl"
269 #endif /* __ACE_INLINE__ */
270 
271 #endif
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
static int load
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void append_transport_statistics(TransportStatisticsSequence &)
#define ACE_SYNCH_MUTEX
ACE_UINT32 optimum_packet_size_
Optimum size (in bytes) of a packet (packet header + sample(s)).
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
const OPENDDS_STRING name_
virtual void get_last_recv_locator(const GUID_t &, TransportLocator &)
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
const OPENDDS_STRING & name() const
Definition: TransportInst.h:70
sequence< TransportLocator > TransportLocatorSeq
int ssize_t
ACE_Thread_Mutex config_lock_
virtual void rtps_relay_address_change()
size_t max_samples_per_packet_
Max number of samples that should ever be in a single packet.
Definition: TransportInst.h:99
#define OPENDDS_STRING
const OPENDDS_STRING transport_type_
Definition: TransportInst.h:85
TimeDuration fragment_reassembly_timeout_
sequence< TransportStatistics > TransportStatisticsSequence
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void count_messages(bool flag)
#define ACE_END_VERSIONED_NAMESPACE_DECL
ACE_UINT32 max_packet_size_
Max size (in bytes) of a packet (packet header + sample(s))
Definition: TransportInst.h:96
const char *const name
Definition: debug.cpp:60
Mix-in class for DDS entities which directly use the transport layer.
virtual bool requires_cdr_encapsulation() const
Does the transport require a CDR-encapsulated data payload?
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int shutdown(ACE_HANDLE handle, int how)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
size_t ConnectionInfoFlags
bool should_drop(const iovec iov[], int n, ssize_t &length) const