Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
9 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
10 :
11 : #include "dds/DCPS/dcps_export.h"
12 : #include "dds/DCPS/Definitions.h"
13 : #include "dds/DCPS/RcObject.h"
14 : #include "dds/DCPS/PoolAllocator.h"
15 : #include "dds/DCPS/RcEventHandler.h"
16 : #include "ReceiveListenerSetMap.h"
17 : #include "SendResponseListener.h"
18 : #include "TransportDefs.h"
19 : #include "TransportSendStrategy.h"
20 : #include "TransportSendStrategy_rch.h"
21 : #include "TransportStrategy.h"
22 : #include "TransportStrategy_rch.h"
23 : #include "TransportSendControlElement.h"
24 : #include "TransportSendListener.h"
25 : #include "TransportReceiveListener.h"
26 : #include "QueueTaskBase_T.h"
27 : #include "dds/DCPS/ReactorInterceptor.h"
28 : #include "dds/DCPS/TimeTypes.h"
29 :
30 : #include "ace/Event_Handler.h"
31 : #include "ace/Synch_Traits.h"
32 :
33 : #include <utility>
34 :
35 : #include <iosfwd> // For operator<<() diagnostic formatter.
36 :
37 : ACE_BEGIN_VERSIONED_NAMESPACE_DECL
38 : class ACE_SOCK;
39 : ACE_END_VERSIONED_NAMESPACE_DECL
40 :
41 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
42 :
43 : namespace OpenDDS {
44 :
45 : namespace ICE {
46 : class Endpoint;
47 : }
48 :
49 : namespace DCPS {
50 :
51 :
52 : class TransportQueueElement;
53 : class ReceivedDataSample;
54 : class DataSampleElement;
55 : class ThreadPerConnectionSendTask;
56 : class TransportClient;
57 : class TransportImpl;
58 :
59 : typedef OPENDDS_MAP_CMP(GUID_t, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap; ///< Keyed by GUID_t, mapped type DataLinkSet_rch, comparator GUID_tKeyLessThan.
60 :
61 : typedef WeakRcHandle<TransportSendListener> TransportSendListener_wrch; ///< WeakRcHandle to a TransportSendListener.
62 :
63 : /**
64 : * This class manages the reservations based on the associations between datareader
65 : * and datawriter. It basically delegates samples to the send strategy for sending and
66 : * delivers the samples received by the receive strategy to the listener.
67 : *
68 : * Notes about object ownership:
69 : * 1) Own the send strategy object and receive strategy object.
70 : * 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection
71 : * is enabled.
72 : */
73 : class OpenDDS_Dcps_Export DataLink
74 : : public virtual RcEventHandler {
75 :
76 : friend class DataLinkCleanupTask;
77 :
78 : public:
79 :
80 : enum ConnectionNotice {
81 : DISCONNECTED,
82 : RECONNECTED,
83 : LOST
84 : };
85 :
86 : /// A DataLink object is always created by a TransportImpl object.
87 : /// Thus, the TransportImpl object passed-in here is the object that
88 : /// created this DataLink. The ability to specify a priority
89 : /// for individual links is included for construction so its
90 : /// value can be available for activating any threads.
91 : DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback, bool is_active);
92 : virtual ~DataLink();
93 :
94 : /// Reactor invokes this after being notified in schedule_stop or cancel_release
95 : int handle_exception(ACE_HANDLE /* fd */);
96 :
97 : /// Allows DataLink::stop to be done on the reactor thread so that
98 : /// this thread avoids possibly deadlocking trying to access reactor
99 : /// to stop strategies or schedule timers
100 : void schedule_stop(const MonotonicTimePoint& schedule_to_stop_at);
101 : /// The stop method is used to stop the DataLink prior to shutdown.
102 : void stop();
103 :
104 : /// The resume_send is used in the case of reconnection
105 : /// on the subscriber's side.
106 : void resume_send();
107 :
108 : /// Only called by our TransportImpl object.
109 : ///
110 : /// Return Codes: 0 means successful reservation made.
111 : /// -1 means failure.
112 : virtual int make_reservation(const GUID_t& remote_subscription_id,
113 : const GUID_t& local_publication_id,
114 : const TransportSendListener_wrch& send_listener,
115 : bool reliable);
116 :
117 : /// Only called by our TransportImpl object.
118 : ///
119 : /// Return Codes: 0 means successful reservation made.
120 : /// -1 means failure.
121 : virtual int make_reservation(const GUID_t& remote_publication_id,
122 : const GUID_t& local_subscription_id,
123 : const TransportReceiveListener_wrch& receive_listener,
124 : bool reliable);
125 :
126 : // ciju: Called by LinkSet with locks held
127 : /// This will release reservations that were made by one of the
128 : /// make_reservation() methods. All we know is that the supplied
129 : /// GUID_t is considered to be a remote id. It could be a
130 : /// remote subscriber or a remote publisher.
131 : void release_reservations(GUID_t remote_id,
132 : GUID_t local_id,
133 : DataLinkSetMap& released_locals);
134 :
135 : void schedule_delayed_release();
136 :
137 : const TimeDuration& datalink_release_delay() const;
138 :
139 : /// Either send or receive listener for this local_id should be
140 : /// removed from internal DataLink structures so it no longer
141 : /// receives events.
142 : void remove_listener(const GUID_t& local_id);
143 :
144 : // ciju: Called by LinkSet with locks held
145 : /// Called by the TransportClient objects that reference this
146 : /// DataLink. Used by the TransportClient to send a sample,
147 : /// or to send a control message. These functions either give the
148 : /// request to the ThreadPerConnectionSendTask when thread_per_connection
149 : /// configuration is true or just simply delegate to the send strategy.
150 : void send_start();
151 : void send(TransportQueueElement* element);
152 : void send_stop(GUID_t repoId);
153 :
154 : // ciju: Called by LinkSet with locks held
155 : /// This method is essentially an "undo_send()" method. Its goal
156 : /// is to remove all traces of the sample from this DataLink (if
157 : /// the sample is even known to the DataLink).
158 : virtual RemoveResult remove_sample(const DataSampleElement* sample);
159 :
160 : // ciju: Called by LinkSet with locks held
161 : virtual void remove_all_msgs(const GUID_t& pub_id);
162 :
163 : /// This is called by our TransportReceiveStrategy object when it
164 : /// has received a complete data sample. This method will cause
165 : /// the appropriate TransportReceiveListener objects to be told
166 : /// that data_received().
167 : /// If readerId is not GUID_UNKNOWN, only the TransportReceiveListener
168 : /// with that ID (if one exists) will receive the data.
169 : int data_received(ReceivedDataSample& sample,
170 : const GUID_t& readerId = GUID_UNKNOWN);
171 :
172 : /// Variation of data_received() that allows for excluding a subset of readers
173 : /// by specifying which readers specifically should receive.
174 : /// Any reader ID that does not appear in the include set will be skipped.
175 : void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
176 :
177 : /// Obtain a unique identifier for this DataLink object.
178 : DataLinkIdType id() const;
179 :
180 : /// Our TransportImpl will inform us if it is being shutdown()
181 : /// by calling this method.
182 : void transport_shutdown();
183 :
184 : /// Notify the datawriters and datareaders that the connection is
185 : /// disconnected, lost, or reconnected. The datareader/datawriter
186 : /// will notify the corresponding listener.
187 : void notify(ConnectionNotice notice);
188 :
189 : /// Called before releasing the datalink or before shutdown to let
190 : /// the concrete DataLink do anything necessary.
191 : virtual void pre_stop_i();
192 :
193 : // Call-back from the concrete transport object.
194 : // The connection has been broken. No locks are being held.
195 : // Take a snapshot of current associations which will be removed
196 : // by DataLinkCleanupTask.
197 : void release_resources();
198 :
199 : // Used to inform the send strategy to clear all unsent samples upon
200 : // backpressure timeout.
201 : void terminate_send();
202 : void terminate_send_if_suspended();
203 :
204 : /// This is called on publisher side to see if this link communicates
205 : /// with the provided sub or by the subscriber side to see if this link
206 : /// communicates with the provided pub
207 : bool is_target(const GUID_t& remote_id);
208 :
209 : /// This is called by DataLinkCleanupTask thread to remove the associations
210 : /// based on the snapshot in release_resources().
211 : void clear_associations();
212 :
213 : int handle_timeout(const ACE_Time_Value& tv, const void* arg);
214 : int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
215 :
216 : // Set the DiffServ codepoint of the socket. This is a stateless
217 : // method and is here only because this is a convenient common
218 : // location that can be reached by client code that needs to
219 : // perform this behavior.
220 : void set_dscp_codepoint(int cp, ACE_SOCK& socket);
221 :
222 : /// Accessors for the TRANSPORT_PRIORITY value associated with
223 : /// this link.
224 : Priority& transport_priority();
225 : Priority transport_priority() const;
226 :
227 : bool& is_loopback();
228 : bool is_loopback() const;
229 :
230 : bool& is_active();
231 : bool is_active() const;
232 :
233 : bool cancel_release();
234 :
235 : /// This allows a subclass to easily create a transport control
236 : /// sample to send via send_control.
237 : ACE_Message_Block* create_control(char submessage_id,
238 : DataSampleHeader& header,
239 : Message_Block_Ptr data);
240 :
241 : /// This allows a subclass to send transport control samples over
242 : /// this DataLink. This is useful for sending transport-specific
243 : /// control messages between one or more endpoints under this
244 : /// DataLink's control.
245 : SendControlStatus send_control(const DataSampleHeader& header, Message_Block_Ptr data);
246 :
247 : /// For a given publication "pub_id", store the total number of corresponding
248 : /// subscriptions in "n_subs" and given a set of subscriptions
249 : /// (the "in" sequence), return the subset of the input set "in" which are
250 : /// targets of this DataLink (see is_target()).
251 : GUIDSeq* target_intersection(const GUID_t& pub_id, const GUIDSeq& in, size_t& n_subs);
252 :
253 : TransportImpl_rch impl() const;
254 :
255 : void default_listener(const TransportReceiveListener_wrch& trl);
256 : TransportReceiveListener_wrch default_listener() const;
257 :
258 : typedef WeakRcHandle<TransportClient> TransportClient_wrch;
259 : typedef std::pair<TransportClient_wrch, GUID_t> OnStartCallback;
260 :
261 : bool add_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
262 : void remove_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
263 : void invoke_on_start_callbacks(bool success);
264 : bool invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success);
265 : void remove_startup_callbacks(const GUID_t& local, const GUID_t& remote);
266 :
267 : class Interceptor : public ReactorInterceptor {
268 : public:
269 0 : Interceptor(ACE_Reactor* reactor, ACE_thread_t owner) : ReactorInterceptor(reactor, owner) {}
270 : bool reactor_is_shut_down() const;
271 : };
272 :
273 : class ImmediateStart : public virtual ReactorInterceptor::Command {
274 : public:
275 0 : ImmediateStart(RcHandle<DataLink> link, WeakRcHandle<TransportClient> client, const GUID_t& remote) : link_(link), client_(client), remote_(remote) {}
276 : void execute();
277 : private:
278 : RcHandle<DataLink> link_;
279 : WeakRcHandle<TransportClient> client_;
280 : GUID_t remote_;
281 : };
282 :
283 : void set_scheduling_release(bool scheduling_release);
284 :
285 : virtual void send_final_acks (const GUID_t& readerid);
286 :
287 0 : virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint() const { return WeakRcHandle<ICE::Endpoint>(); }
288 :
289 0 : virtual bool is_leading(const GUID_t& /*writer*/,
290 0 : const GUID_t& /*reader*/) const { return false; }
291 :
292 :
293 : protected:
294 :
295 : /// This is how the subclass "announces" to this DataLink base class
296 : /// that this DataLink has now been "connected" and should start
297 : /// the supplied strategy objects. This start method is also
298 : /// going to keep a "copy" of the references to the strategy objects.
299 : /// Also note that it is acceptable to pass-in a NULL (0)
300 : /// TransportReceiveStrategy*, but it is assumed that the
301 : /// TransportSendStrategy* argument is not NULL.
302 : ///
303 : /// If the start() method fails to start either strategy, then a -1
304 : /// is returned. Otherwise, a 0 is returned. In the failure case,
305 : /// if one of the strategy objects was started successfully, then
306 : /// it will be stopped before the start() method returns -1.
307 : int start(const TransportSendStrategy_rch& send_strategy,
308 : const TransportStrategy_rch& receive_strategy,
309 : bool invoke_all = true);
310 :
311 : /// This announces the "stop" event to our subclass. The "stop"
312 : /// event will occur when this DataLink is handling a
313 : /// release_reservations() call and determines that it has just
314 : /// released all of the remaining reservations on this DataLink.
315 : /// The "stop" event will also occur when the TransportImpl
316 : /// is being shutdown() - we call stop_i() from our
317 : /// transport_shutdown() method to handle this case.
318 : virtual void stop_i();
319 :
320 : /// Used to provide unique Ids to all DataLink objects.
321 : static ACE_UINT64 get_next_datalink_id();
322 :
323 : /// The transport receive strategy object for this DataLink.
324 : TransportStrategy_rch receive_strategy_;
325 :
326 : friend class ThreadPerConnectionSendTask;
327 :
328 : /// The implementation of the functions that accomplish the
329 : /// sample or control message delivery. They just simply
330 : /// delegate to the send strategy.
331 : void send_start_i();
332 : virtual void send_i(TransportQueueElement* element, bool relink = true);
333 : void send_stop_i(GUID_t repoId);
334 :
335 : /// For a given local GUID_t (publication or subscription), return the list
336 : /// of remote peer GUID_ts (subscriptions or publications) that this link
337 : /// knows about due to make_reservation().
338 : GUIDSeq* peer_ids(const GUID_t& local_id) const;
339 :
340 : void network_change() const;
341 :
342 : void replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const;
343 :
344 : private:
345 :
346 : /// Helper function to output the enum as a string to help debugging.
347 : const char* connection_notice_as_str(ConnectionNotice notice);
348 :
349 : TransportSendListener_rch send_listener_for(const GUID_t& pub_id) const;
350 : TransportReceiveListener_rch recv_listener_for(const GUID_t& sub_id) const;
351 :
352 : /// Save current sub and pub association maps for releasing and create
353 : /// empty maps for new associations.
354 : void prepare_release();
355 :
356 : virtual bool handle_send_request_ack(TransportQueueElement* element);
357 :
358 : /// Allow derived classes to provide an alternate "customized" queue element
359 : /// for this DataLink (not shared with other links in the DataLinkSet).
360 0 : virtual TransportQueueElement* customize_queue_element(
361 : TransportQueueElement* element)
362 : {
363 0 : return element;
364 : }
365 :
366 0 : virtual void release_remote_i(const GUID_t& /*remote_id*/) {}
367 0 : virtual void release_reservations_i(const GUID_t& /*remote_id*/,
368 0 : const GUID_t& /*local_id*/) {}
369 :
370 : void data_received_i(ReceivedDataSample& sample,
371 : const GUID_t& readerId,
372 : const RepoIdSet& incl_excl,
373 : ReceiveListenerSet::ConstrainReceiveSet constrain);
374 :
375 : void notify_reactor();
376 :
377 : typedef ACE_SYNCH_MUTEX LockType;
378 :
379 : /// Convenience function for diagnostic information.
380 : #ifndef OPENDDS_SAFETY_PROFILE
381 : friend OpenDDS_Dcps_Export
382 : std::ostream& operator<<(std::ostream& str, const DataLink& value);
383 : #endif
384 :
385 : /// A boolean indicating if the DataLink has been stopped. This
386 : /// value is protected by the strategy_lock_.
387 : bool stopped_;
388 : MonotonicTimePoint scheduled_to_stop_at_;
389 :
390 : /// Map publication Id value to TransportSendListener.
391 : typedef OPENDDS_MAP_CMP(GUID_t, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap;
392 : IdToSendListenerMap send_listeners_;
393 :
394 : /// Map subscription Id value to TransportReceiveListener.
395 : typedef OPENDDS_MAP_CMP(GUID_t, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap;
396 : IdToRecvListenerMap recv_listeners_;
397 :
398 : /// If default_listener_ is not null and this DataLink receives a sample
399 : /// from a publication GUID that's not in pub_map_, it will call
400 : /// data_received() on the default_listener_. NOTE(review): no member named pub_map_ is declared in this class; presumably assoc_by_remote_ is meant -- confirm.
401 : TransportReceiveListener_wrch default_listener_;
402 :
403 : mutable LockType pub_sub_maps_lock_;
404 :
405 : typedef OPENDDS_MAP_CMP(GUID_t, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
406 : AssocByRemote assoc_by_remote_;
407 :
408 : struct LocalAssociationInfo {
409 : bool reliable_;
410 : RepoIdSet associated_;
411 : };
412 :
413 : typedef OPENDDS_MAP_CMP(GUID_t, LocalAssociationInfo, GUID_tKeyLessThan) AssocByLocal;
414 : AssocByLocal assoc_by_local_;
415 :
416 : /// A weak rchandle to the TransportImpl that created this DataLink.
417 : WeakRcHandle<TransportImpl> impl_;
418 :
419 : /// The id for this DataLink
420 : ACE_UINT64 id_;
421 :
422 : /// The task used to do the sending. This ThreadPerConnectionSendTask
423 : /// object is created when the thread_per_connection configuration is
424 : /// true. It is dedicated to this datalink only.
425 : unique_ptr<ThreadPerConnectionSendTask> thr_per_con_send_task_;
426 :
427 : // snapshot of associations when the release_resources() is called.
428 : AssocByLocal assoc_releasing_;
429 :
430 : /// TRANSPORT_PRIORITY value associated with the link.
431 : Priority transport_priority_;
432 :
433 : bool scheduling_release_;
434 :
435 : protected:
436 :
437 : typedef ACE_Guard<LockType> GuardType;
438 :
439 : /// The transport send strategy object for this DataLink.
440 : TransportSendStrategy_rch send_strategy_;
441 : LockType strategy_lock_;
442 :
443 : TransportSendStrategy_rch get_send_strategy();
444 :
445 : typedef OPENDDS_MAP_CMP(GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap;
446 : typedef OPENDDS_MAP_CMP(GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap;
447 : OnStartCallbackMap on_start_callbacks_;
448 : typedef OPENDDS_MAP_CMP(GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap;
449 : PendingOnStartsMap pending_on_starts_;
450 :
451 : /// Configurable delay in milliseconds that the datalink
452 : /// should be released after all associations are removed.
453 : TimeDuration datalink_release_delay_;
454 :
455 : /// Allocators for data and message blocks used by transport
456 : /// control samples when send_control is called.
457 : unique_ptr<MessageBlockAllocator> mb_allocator_;
458 : unique_ptr<DataBlockAllocator> db_allocator_;
459 :
460 : /// Is remote attached to same transport ?
461 : bool is_loopback_;
462 : /// Is pub or sub ?
463 : bool is_active_;
464 : bool started_;
465 :
466 : /// Listener for TransportSendControlElements created in send_control
467 : SendResponseListener send_response_listener_;
468 :
469 : Interceptor interceptor_;
470 : };
471 :
472 : } // namespace DCPS
473 : } // namespace OpenDDS
474 :
475 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
476 :
477 : #if defined (__ACE_INLINE__)
478 : #include "DataLink.inl"
479 : #endif /* __ACE_INLINE__ */
480 :
481 : #endif /* OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H */
|