OpenDDS  Snapshot(2023/04/28-20:55)
TransportImpl.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTIMPL_H
7 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTIMPL_H
8 
9 #include "TransportDefs.h"
10 #include "TransportInst_rch.h"
11 #include "TransportInst.h"
12 #include "DataLinkCleanupTask.h"
13 
14 #include <dds/DCPS/dcps_export.h>
15 #include <dds/DCPS/RcObject.h>
16 #include <dds/DCPS/PoolAllocator.h>
17 #include <dds/DCPS/ReactorTask.h>
19 #include <dds/DCPS/PoolAllocator.h>
22 #include <dds/DCPS/AtomicBool.h>
23 
24 #include <dds/OpenddsDcpsExtC.h>
25 #include <dds/DdsDcpsSubscriptionC.h>
26 #include <dds/DdsDcpsPublicationC.h>
27 #ifdef OPENDDS_SECURITY
28 # include <dds/DdsSecurityCoreC.h>
29 #endif
30 
31 #include <ace/Synch_Traits.h>
32 
34 
35 namespace OpenDDS {
36 namespace DCPS {
37 
38 class TransportClient;
39 class TransportReceiveListener;
40 class DataLink;
41 class TransportInst;
42 class Monitor;
43 struct AssociationData;
44 typedef RcHandle<TransportClient> TransportClient_rch;
46 
47 /** The TransportImpl class includes the abstract methods that must be implemented
48 * by any implementation to provide data delivery service to the DCPS implementation.
49 * This includes methods to send data, received data, configure the operation, and
50 * manage associations and datalinks between local and remote objects of the implementation.
51 *
52 * Notes about object ownership:
53 * 1)Has longer lifetime than the publisher and subscriber objects. The publishers
54 * and subscribers are owned by the DomainParticipant and transport factory shutdown
55 * is always after DomainParticipant factory shutdown.
56 * 2)The concrete transport object owns the datalink objects.
57 * 3)Own a DataLinkCleanup object.
58 * 4)Reference to TransportInst object and TransportReactorTask object owned
59 * by TransportRegistry.
60 * 5)During transport shutdown, if this object does not have ownership of an object
61 * but has a references via smart pointer then the reference should be freed;
62 * if this object has ownership of task objects then the tasks should be closed.
63 */
64 class OpenDDS_Dcps_Export TransportImpl : public virtual RcObject {
65 public:
66 
67  virtual ~TransportImpl();
68 
69  /// Remove any pending_release mappings.
70  virtual void unbind_link(DataLink* link);
71 
72  /// Callback from the DataLink to clean up any associated resources.
73  /// This usually is done when the DataLink is lost. The call is made with
74  /// no transport/DCPS locks held.
75  bool release_link_resources(DataLink* link);
76 
77  /// Expose the configuration information so others can see what
78  /// we can do.
79  TransportInst_rch config() const;
80 
81  /// Called by our connection_info() method to allow the concrete
82  /// TransportImpl subclass to do the dirty work since it really
83  /// is the one that knows how to populate the supplied
84  /// TransportLocator object.
85  virtual bool connection_info_i(TransportLocator& local_info, ConnectionInfoFlags flags) const = 0;
86 
87  virtual void register_for_reader(const GUID_t& /*participant*/,
88  const GUID_t& /*writerid*/,
89  const GUID_t& /*readerid*/,
90  const TransportLocatorSeq& /*locators*/,
91  OpenDDS::DCPS::DiscoveryListener* /*listener*/) { }
92 
93  virtual void unregister_for_reader(const GUID_t& /*participant*/,
94  const GUID_t& /*writerid*/,
95  const GUID_t& /*readerid*/) { }
96 
97  virtual void register_for_writer(const GUID_t& /*participant*/,
98  const GUID_t& /*readerid*/,
99  const GUID_t& /*writerid*/,
100  const TransportLocatorSeq& /*locators*/,
101  DiscoveryListener* /*listener*/) { }
102 
103  virtual void unregister_for_writer(const GUID_t& /*participant*/,
104  const GUID_t& /*readerid*/,
105  const GUID_t& /*writerid*/) { }
106 
107  virtual void update_locators(const GUID_t& /*remote*/,
108  const TransportLocatorSeq& /*locators*/) { }
109 
110  virtual void get_last_recv_locator(const GUID_t& /*remote_id*/,
111  TransportLocator& /*locators*/) {}
112 
113  virtual void rtps_relay_address_change() {}
115 
116  /// Interface to the transport's reactor for scheduling timers.
117  ACE_Reactor_Timer_Interface* timer() const;
118 
119  ACE_Reactor* reactor() const;
120  ACE_thread_t reactor_owner() const;
121  bool is_shut_down() const;
122 
123  /// Create the reactor task using sync send or optionally async send
124  /// by parameter on supported Windows platforms only.
125  void create_reactor_task(bool useAsyncSend = false, const OPENDDS_STRING& name = "");
126 
127  /// Diagnostic aid.
128  void dump();
129  OPENDDS_STRING dump_to_str();
130 
131  void report();
132 
136  bool local_reliable_, local_durable_;
138 
140  : local_id_(GUID_UNKNOWN)
141  , priority_(0)
142  , local_reliable_(false)
143  , local_durable_(false)
145  {}
146  };
147 
155  bool reliable_, durable_;
156  };
157 
159  enum Status { ACR_SUCCESS, ACR_FAILED };
160  explicit AcceptConnectResult(Status ok = ACR_FAILED)
161  : success_(ok == ACR_SUCCESS), link_() {}
163  : success_(link), link_(link) {}
164  /// If false, the accept or connect has failed and link_ is ignored.
165  bool success_;
166  /// If success_ is true, link_ may either be null or have a valid DataLink.
167  /// If link_ is null the DataLink is not ready for use, and
168  /// TransportClient::use_datalink() is called later.
170  };
171 
173  virtual void rtps_relay_only_now(bool /*flag*/) {}
174  virtual void use_rtps_relay_now(bool /*flag*/) {}
175  virtual void use_ice_now(bool /*flag*/) {}
176 
177  /// Accessor to obtain a "copy" of the reference to the reactor task.
178  /// Caller is responsible for the "copy" of the reference that is
179  /// returned.
180  ReactorTask_rch reactor_task();
181 
182  EventDispatcher_rch event_dispatcher() { return event_dispatcher_; }
183 
184 protected:
186 
187  bool open();
188 
189  /// connect_datalink() is called from TransportClient to initiate an
190  /// association as the active peer. A DataLink may be returned if
191  /// one is already connected and ready to use, otherwise
192  /// initiate a connection to the passive side and return from this
193  /// method. Upon completion of the physical connection, the
194  /// transport calls back to TransportClient::use_datalink().
195  virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote,
196  const ConnectionAttribs& attribs,
197  const TransportClient_rch& client) = 0;
198 
199  /// accept_datalink() is called from TransportClient to initiate an
200  /// association as the passive peer. A DataLink may be returned if
201  /// one is already connected and ready to use, otherwise
202  /// passively wait for a physical connection from the active
203  /// side (either in the form of a connection event or handshaking
204  /// message). Upon completion of the physical connection, the
205  /// transport calls back to TransportClient::use_datalink().
206  virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote,
207  const ConnectionAttribs& attribs,
208  const TransportClient_rch& client) = 0;
209 
210  /// stop_accepting_or_connecting() is called from TransportClient
211  /// to terminate the accepting process begun by accept_datalink()
212  /// or connect_datalink(). This allows the TransportImpl to clean
213  /// up any resources associated with this pending connection.
214  /// The TransportClient* passed in to accept or connect is not
215  /// valid after this method is called.
216  virtual void stop_accepting_or_connecting(const TransportClient_wrch& client,
217  const GUID_t& remote_id,
218  bool disassociate,
219  bool association_failed) = 0;
220 
221 
222  /// Called during the shutdown() method in order to give the
223  /// concrete TransportImpl subclass a chance to do something when
224  /// the shutdown "event" occurs.
225  virtual void shutdown_i() = 0;
226 
229 
230  /// Lock to protect the pending_connections_ data member
231  mutable LockType pending_connections_lock_;
232 
233  typedef OPENDDS_MULTIMAP(TransportClient_wrch, DataLink_rch) PendConnMap;
234  PendConnMap pending_connections_;
235  void add_pending_connection(const TransportClient_rch& client, DataLink_rch link);
236  void shutdown();
237 
238 private:
239  /// We have a few friends in the transport framework so that they
240  /// can access our private methods. We do this to avoid pollution
241  /// of our public interface with internal framework methods.
242  friend class TransportInst;
243  friend class TransportClient;
244  friend class DataLink;
245  /// Called by the TransportRegistry when this TransportImpl object
246  /// is released while the TransportRegistry is handling a release()
247  /// "event".
248 
249  /// The DataLink itself calls this method when it thinks it is
250  /// no longer used for any associations. This occurs during
251  /// a "remove associations" operation being performed by some
252  /// TransportClient that uses this TransportImpl. The
253  /// TransportClient is known to have acquired our reservation_lock_,
254  /// so there won't be any reserve_datalink() calls being made from
255  /// any other threads while we perform this release.
256  virtual void release_datalink(DataLink* link) = 0;
257 
258  virtual void client_stop(const GUID_t&) {}
259 
260  DataLink* find_connect_i(const GUID_t& local_id,
261  const AssociationData& remote_association,
262  const ConnectionAttribs& attribs,
263  bool active, bool connect);
264 
265 #if defined(OPENDDS_SECURITY)
267 #endif
268 
269 public:
270  /// Called by our friends, the TransportClient, and the DataLink.
271  /// Since this TransportImpl can be attached to many TransportClient
272  /// objects, and each TransportClient object could be "running" in
273  /// a separate thread, we need to protect all of the "reservation"
274  /// methods with a lock. The protocol is that a client of ours
275  /// must "acquire" our reservation_lock_ before it can proceed to
276  /// call any methods that affect the DataLink reservations. It
277  /// should release the reservation_lock_ as soon as it is done.
278  int acquire();
279  int tryacquire();
280  int release();
281  int remove();
282 
283  virtual OPENDDS_STRING transport_type() const = 0;
284 
285  /// Called by our friend, the TransportClient.
286  /// Accessor for the TransportInterfaceInfo. Accepts a reference
287  /// to a TransportInterfaceInfo object that will be "populated"
288  /// with this TransportImpl's connection information (ie, how
289  /// another process would connect to this TransportImpl).
290  bool connection_info(TransportLocator& local_info, ConnectionInfoFlags flags) const;
291 
292  /// Lock to protect the config_ and reactor_task_ data members.
293  mutable LockType lock_;
294 
295  /// A reference to the TransportInst
296  /// object that was supplied to us during our configure() method.
298 
299  /// The reactor (task) object - may not even be used if the concrete
300  /// subclass (of TransportImpl) doesn't require a reactor.
302 
303  struct DoClear : EventBase {
304  explicit DoClear(RcHandle<DataLink> link) : link_(link) {}
306  {
307  DataLink_rch link = link_.lock();
308  if (link) {
309  link->clear_associations();
310  }
311  }
313  };
314 
315  /// smart ptr to the associated DL cleanup task
317 
318  /// Monitor object for this entity
320 
321 protected:
322  /// Id of the last link established.
324 };
325 
326 } // namespace DCPS
327 } // namespace OpenDDS
328 
330 
331 #if defined (__ACE_INLINE__)
332 #include "TransportImpl.inl"
333 #endif /* __ACE_INLINE__ */
334 
335 #endif /* OPENDDS_DCPS_TRANSPORTIMPL_H */
WeakRcHandle< DataLink > link_
virtual void append_transport_statistics(TransportStatisticsSequence &)
EventDispatcher_rch event_dispatcher_
smart ptr to the associated DL cleanup task
#define ACE_SYNCH_MUTEX
void release(T x)
Base class to hold configuration settings for TransportImpls.
Definition: TransportInst.h:64
virtual void rtps_relay_only_now(bool)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
virtual void use_rtps_relay_now(bool)
virtual void use_ice_now(bool)
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
void handle_event()
Called when the event is dispatched by an EventDispatcher.
virtual void get_last_recv_locator(const GUID_t &, TransportLocator &)
virtual void client_stop(const GUID_t &)
#define OPENDDS_MULTIMAP(K, T)
DoClear(RcHandle< DataLink > link)
sequence< TransportLocator > TransportLocatorSeq
virtual void local_crypto_handle(DDS::Security::ParticipantCryptoHandle)
virtual void rtps_relay_address_change()
LockType lock_
Lock to protect the config_ and reactor_task_ data members.
#define OPENDDS_STRING
ACE_Guard< LockType > GuardType
ACE_HANDLE open(const char *filename, int mode, mode_t perms=ACE_DEFAULT_OPEN_PERMS, LPSECURITY_ATTRIBUTES sa=0)
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
Definition: TransportImpl.h:97
WeakRcHandle< TransportClient > TransportClient_wrch
Definition: TransportImpl.h:45
sequence< TransportStatistics > TransportStatisticsSequence
RcHandle< TransportClient > TransportClient_rch
DWORD ACE_thread_t
EventDispatcher_rch event_dispatcher()
ACE_UINT32 ULong
int connect(ACE_HANDLE handle, struct sockaddr *addr, int addrlen)
const char *const name
Definition: debug.cpp:60
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
Mix-in class for DDS entities which directly use the transport layer.
ACE_CDR::Long Priority
virtual void unregister_for_reader(const GUID_t &, const GUID_t &, const GUID_t &)
Definition: TransportImpl.h:93
virtual void register_for_reader(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, OpenDDS::DCPS::DiscoveryListener *)
Definition: TransportImpl.h:87
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
WeakRcHandle< TransportInst > config_
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
int shutdown(ACE_HANDLE handle, int how)
bool success_
If false, the accept or connect has failed and link_ is ignored.
DDS::OctetSeq TransportBLOB
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
AtomicBool is_shut_down_
Id of the last link established.
size_t ConnectionInfoFlags