Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTIMPL_H
7 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTIMPL_H
8 :
9 : #include "TransportDefs.h"
10 : #include "TransportInst_rch.h"
11 : #include "TransportInst.h"
12 : #include "DataLinkCleanupTask.h"
13 :
14 : #include <dds/DCPS/dcps_export.h>
15 : #include <dds/DCPS/RcObject.h>
16 : #include <dds/DCPS/PoolAllocator.h>
17 : #include <dds/DCPS/ReactorTask.h>
18 : #include <dds/DCPS/ReactorTask_rch.h>
19 : #include <dds/DCPS/PoolAllocator.h>
20 : #include <dds/DCPS/DiscoveryListener.h>
21 : #include <dds/DCPS/EventDispatcher.h>
22 : #include <dds/DCPS/AtomicBool.h>
23 :
24 : #include <dds/OpenddsDcpsExtC.h>
25 : #include <dds/DdsDcpsSubscriptionC.h>
26 : #include <dds/DdsDcpsPublicationC.h>
27 : #ifdef OPENDDS_SECURITY
28 : # include <dds/DdsSecurityCoreC.h>
29 : #endif
30 :
31 : #include <ace/Synch_Traits.h>
32 :
33 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
34 :
35 : namespace OpenDDS {
36 : namespace DCPS {
37 :
38 : class TransportClient;
39 : class TransportReceiveListener;
40 : class DataLink;
41 : class TransportInst;
42 : class Monitor;
43 : struct AssociationData;
44 : typedef RcHandle<TransportClient> TransportClient_rch;
45 : typedef WeakRcHandle<TransportClient> TransportClient_wrch;
46 :
47 : /** The TransportImpl class includes the abstract methods that must be implemented
48 : * by any implementation to provide data delivery service to the DCPS implementation.
49 : * This includes methods to send data, receive data, configure the operation, and
50 : * manage associations and datalinks between local and remote objects of the implementation.
51 : *
52 : * Notes about object ownership:
53 : * 1)Has longer lifetime than the publisher and subscriber objects. The publishers
54 : * and subscribers are owned by the DomainParticipant and transport factory shutdown
55 : * is always after DomainParticipant factory shutdown.
56 : * 2)The concrete transport object owns the datalink objects.
57 : * 3)Own a DataLinkCleanup object.
58 : * 4)Reference to TransportInst object and TransportReactorTask object owned
59 : * by TransportRegistry.
60 : * 5)During transport shutdown, if this object does not have ownership of an object
61 : * but has a reference via smart pointer then the reference should be freed;
62 : * if this object has ownership of task objects then the tasks should be closed.
63 : */
64 : class OpenDDS_Dcps_Export TransportImpl : public virtual RcObject {
65 : public:
66 :
67 : virtual ~TransportImpl();
68 :
69 : /// Remove any pending_release mappings.
70 : virtual void unbind_link(DataLink* link);
71 :
72 : /// Callback from the DataLink to clean up any associated resources.
73 : /// This usually is done when the DataLink is lost. The call is made with
74 : /// no transport/DCPS locks held.
75 : bool release_link_resources(DataLink* link);
76 :
77 : /// Expose the configuration information so others can see what
78 : /// we can do.
79 : TransportInst_rch config() const;
80 :
81 : /// Called by our connection_info() method to allow the concrete
82 : /// TransportImpl subclass to do the dirty work since it really
83 : /// is the one that knows how to populate the supplied
84 : /// TransportLocator object.
85 : virtual bool connection_info_i(TransportLocator& local_info, ConnectionInfoFlags flags) const = 0;
86 :
87 0 : virtual void register_for_reader(const GUID_t& /*participant*/,
88 : const GUID_t& /*writerid*/,
89 : const GUID_t& /*readerid*/,
90 : const TransportLocatorSeq& /*locators*/,
91 0 : OpenDDS::DCPS::DiscoveryListener* /*listener*/) { } // Default is a no-op: every argument is ignored; virtual so a concrete transport can override.
92 :
93 0 : virtual void unregister_for_reader(const GUID_t& /*participant*/,
94 : const GUID_t& /*writerid*/,
95 0 : const GUID_t& /*readerid*/) { } // Default is a no-op: every argument is ignored; virtual so a concrete transport can override.
96 :
97 0 : virtual void register_for_writer(const GUID_t& /*participant*/,
98 : const GUID_t& /*readerid*/,
99 : const GUID_t& /*writerid*/,
100 : const TransportLocatorSeq& /*locators*/,
101 0 : DiscoveryListener* /*listener*/) { } // Default is a no-op: every argument is ignored; virtual so a concrete transport can override.
102 :
103 0 : virtual void unregister_for_writer(const GUID_t& /*participant*/,
104 : const GUID_t& /*readerid*/,
105 0 : const GUID_t& /*writerid*/) { } // Default is a no-op: every argument is ignored; virtual so a concrete transport can override.
106 :
107 0 : virtual void update_locators(const GUID_t& /*remote*/,
108 0 : const TransportLocatorSeq& /*locators*/) { } // Default is a no-op: both arguments are ignored; virtual so a concrete transport can override.
109 :
110 0 : virtual void get_last_recv_locator(const GUID_t& /*remote_id*/,
111 0 : TransportLocator& /*locator*/) {} // Default is a no-op: the supplied TransportLocator is left untouched.
112 :
113 0 : virtual void rtps_relay_address_change() {} // Default is a no-op; virtual so a concrete transport can override.
114 0 : virtual void append_transport_statistics(TransportStatisticsSequence& /*seq*/) {} // Default is a no-op: the sequence is left unmodified.
115 :
116 : /// Interface to the transport's reactor for scheduling timers.
117 : ACE_Reactor_Timer_Interface* timer() const;
118 :
119 : ACE_Reactor* reactor() const;
120 : ACE_thread_t reactor_owner() const;
121 : bool is_shut_down() const;
122 :
123 : /// Create the reactor task using sync send or optionally async send
124 : /// by parameter on supported Windows platforms only.
125 : void create_reactor_task(bool useAsyncSend = false, const OPENDDS_STRING& name = "");
126 :
127 : /// Diagnostic aid.
128 : void dump();
129 : OPENDDS_STRING dump_to_str();
130 :
131 : void report();
132 :
133 : struct ConnectionAttribs { // Attribute bundle taken by connect_datalink(), accept_datalink() and find_connect_i().
134 : GUID_t local_id_;
135 : Priority priority_;
136 : bool local_reliable_, local_durable_;
137 : SequenceNumber max_sn_;
138 :
139 0 : ConnectionAttribs() // Defaults: GUID_UNKNOWN id, priority 0, not reliable, not durable, unknown sequence number.
140 0 : : local_id_(GUID_UNKNOWN)
141 0 : , priority_(0)
142 0 : , local_reliable_(false)
143 0 : , local_durable_(false)
144 0 : , max_sn_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
145 0 : {}
146 : };
147 :
148 : struct RemoteTransport { // Remote-side description taken by connect_datalink() and accept_datalink(); no constructor is declared here.
149 : GUID_t repo_id_;
150 : TransportBLOB blob_;
151 : TransportBLOB discovery_blob_;
152 : MonotonicTime_t participant_discovered_at_;
153 : ACE_CDR::ULong context_;
154 : Priority publication_transport_priority_;
155 : bool reliable_, durable_;
156 : };
157 :
158 : struct AcceptConnectResult { // Return type of connect_datalink() and accept_datalink().
159 : enum Status { ACR_SUCCESS, ACR_FAILED };
160 0 : explicit AcceptConnectResult(Status ok = ACR_FAILED) // Result with a default-constructed link_; reports failure unless ACR_SUCCESS is passed.
161 0 : : success_(ok == ACR_SUCCESS), link_() {}
162 : AcceptConnectResult(const DataLink_rch& link) // Not explicit, so a DataLink_rch converts implicitly; success_ takes the handle's boolean value (presumably true when non-null).
163 : : success_(link), link_(link) {}
164 : /// If false, the accept or connect has failed and link_ is ignored.
165 : bool success_;
166 : /// If success_ is true, link_ may either be null or have a valid DataLink.
167 : /// If link_ is null the DataLink is not ready for use, and
168 : /// TransportClient::use_datalink() is called later.
169 : DataLink_rch link_;
170 : };
171 :
172 0 : virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint() { return WeakRcHandle<ICE::Endpoint>(); } // Default returns a default-constructed weak handle.
173 0 : virtual void rtps_relay_only_now(bool /*flag*/) {} // Default is a no-op: the flag is ignored.
174 0 : virtual void use_rtps_relay_now(bool /*flag*/) {} // Default is a no-op: the flag is ignored.
175 0 : virtual void use_ice_now(bool /*flag*/) {} // Default is a no-op: the flag is ignored.
176 :
177 : /// Accessor to obtain a "copy" of the reference to the reactor task.
178 : /// Caller is responsible for the "copy" of the reference that is
179 : /// returned.
180 : ReactorTask_rch reactor_task();
181 :
182 0 : EventDispatcher_rch event_dispatcher() { return event_dispatcher_; } // Returns a copy of the event_dispatcher_ handle.
183 :
184 : protected:
185 : TransportImpl(TransportInst_rch config);
186 :
187 : bool open();
188 :
189 : /// connect_datalink() is called from TransportClient to initiate an
190 : /// association as the active peer. A DataLink may be returned if
191 : /// one is already connected and ready to use, otherwise
192 : /// initiate a connection to the passive side and return from this
193 : /// method. Upon completion of the physical connection, the
194 : /// transport calls back to TransportClient::use_datalink().
195 : virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote,
196 : const ConnectionAttribs& attribs,
197 : const TransportClient_rch& client) = 0;
198 :
199 : /// accept_datalink() is called from TransportClient to initiate an
200 : /// association as the passive peer. A DataLink may be returned if
201 : /// one is already connected and ready to use, otherwise
202 : /// passively wait for a physical connection from the active
203 : /// side (either in the form of a connection event or handshaking
204 : /// message). Upon completion of the physical connection, the
205 : /// transport calls back to TransportClient::use_datalink().
206 : virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote,
207 : const ConnectionAttribs& attribs,
208 : const TransportClient_rch& client) = 0;
209 :
210 : /// stop_accepting_or_connecting() is called from TransportClient
211 : /// to terminate the accepting process begun by accept_datalink()
212 : /// or connect_datalink(). This allows the TransportImpl to clean
213 : /// up any resources associated with this pending connection.
214 : /// The TransportClient* passed in to accept or connect is not
215 : /// valid after this method is called.
216 : virtual void stop_accepting_or_connecting(const TransportClient_wrch& client,
217 : const GUID_t& remote_id,
218 : bool disassociate,
219 : bool association_failed) = 0;
220 :
221 :
222 : /// Called during the shutdown() method in order to give the
223 : /// concrete TransportImpl subclass a chance to do something when
224 : /// the shutdown "event" occurs.
225 : virtual void shutdown_i() = 0;
226 :
227 : typedef ACE_SYNCH_MUTEX LockType;
228 : typedef ACE_Guard<LockType> GuardType;
229 :
230 : /// Lock to protect the pending_connections_ data member
231 : mutable LockType pending_connections_lock_;
232 :
233 : typedef OPENDDS_MULTIMAP(TransportClient_wrch, DataLink_rch) PendConnMap;
234 : PendConnMap pending_connections_;
235 : void add_pending_connection(const TransportClient_rch& client, DataLink_rch link);
236 : void shutdown();
237 :
238 : private:
239 : /// We have a few friends in the transport framework so that they
240 : /// can access our private methods. We do this to avoid pollution
241 : /// of our public interface with internal framework methods.
242 : friend class TransportInst;
243 : friend class TransportClient;
244 : friend class DataLink;
245 : /// Called by the TransportRegistry when this TransportImpl object
246 : /// is released while the TransportRegistry is handling a release()
247 : /// "event".
248 :
249 : /// The DataLink itself calls this method when it thinks it is
250 : /// no longer used for any associations. This occurs during
251 : /// a "remove associations" operation being performed by some
252 : /// TransportClient that uses this TransportImpl. The
253 : /// TransportClient is known to have acquired our reservation_lock_,
254 : /// so there won't be any reserve_datalink() calls being made from
255 : /// any other threads while we perform this release.
256 : virtual void release_datalink(DataLink* link) = 0;
257 :
258 0 : virtual void client_stop(const GUID_t&) {} // Default is a no-op: the GUID argument is ignored; virtual so a concrete transport can override.
259 :
260 : DataLink* find_connect_i(const GUID_t& local_id,
261 : const AssociationData& remote_association,
262 : const ConnectionAttribs& attribs,
263 : bool active, bool connect);
264 :
265 : #if defined(OPENDDS_SECURITY)
266 0 : virtual void local_crypto_handle(DDS::Security::ParticipantCryptoHandle) {} // Default is a no-op: the handle is ignored; only compiled when OPENDDS_SECURITY is defined.
267 : #endif
268 :
269 : public:
270 : /// Called by our friends, the TransportClient, and the DataLink.
271 : /// Since this TransportImpl can be attached to many TransportClient
272 : /// objects, and each TransportClient object could be "running" in
273 : /// a separate thread, we need to protect all of the "reservation"
274 : /// methods with a lock. The protocol is that a client of ours
275 : /// must "acquire" our reservation_lock_ before it can proceed to
276 : /// call any methods that affect the DataLink reservations. It
277 : /// should release the reservation_lock_ as soon as it is done.
278 : int acquire();
279 : int tryacquire();
280 : int release();
281 : int remove();
282 :
283 : virtual OPENDDS_STRING transport_type() const = 0;
284 :
285 : /// Called by our friend, the TransportClient.
286 : /// Accessor for the connection information. Accepts a reference
287 : /// to a TransportLocator object that will be "populated"
288 : /// with this TransportImpl's connection information (i.e., how
289 : /// another process would connect to this TransportImpl).
290 : bool connection_info(TransportLocator& local_info, ConnectionInfoFlags flags) const;
291 :
292 : /// Lock to protect the config_ and reactor_task_ data members.
293 : mutable LockType lock_;
294 :
295 : /// A weak reference to the TransportInst
296 : /// object that was supplied to us during our configure() method.
297 : WeakRcHandle<TransportInst> config_;
298 :
299 : /// The reactor (task) object - may not even be used if the concrete
300 : /// subclass (of TransportImpl) doesn't require a reactor.
301 : ReactorTask_rch reactor_task_;
302 :
302 : struct DoClear : EventBase { // Event that calls clear_associations() on a DataLink if it can still be locked when the event runs.
303 0 : explicit DoClear(RcHandle<DataLink> link) : link_(link) {} // Keeps only a weak handle to the link.
304 0 : void handle_event()
305 : {
306 0 : DataLink_rch link = link_.lock(); // Try to obtain a strong handle from the weak one.
307 0 : if (link) { // Skip the call when no strong handle could be obtained.
308 0 : link->clear_associations();
309 : }
310 0 : }
311 : WeakRcHandle<DataLink> link_;
312 : };
314 :
315 : /// Smart pointer to the event dispatcher returned by event_dispatcher().
316 : EventDispatcher_rch event_dispatcher_;
317 :
318 : /// Monitor object for this entity
319 : unique_ptr<Monitor> monitor_;
320 :
321 : protected:
322 : /// Flag recording whether this transport has been shut down; see is_shut_down().
323 : AtomicBool is_shut_down_;
324 : };
325 :
326 : } // namespace DCPS
327 : } // namespace OpenDDS
328 :
329 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
330 :
331 : #if defined (__ACE_INLINE__)
332 : #include "TransportImpl.inl"
333 : #endif /* __ACE_INLINE__ */
334 :
335 : #endif /* OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTIMPL_H */
|