OpenDDS  Snapshot(2023/04/28-20:55)
DataLink.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
10 
11 #include "dds/DCPS/dcps_export.h"
12 #include "dds/DCPS/Definitions.h"
13 #include "dds/DCPS/RcObject.h"
14 #include "dds/DCPS/PoolAllocator.h"
16 #include "ReceiveListenerSetMap.h"
17 #include "SendResponseListener.h"
18 #include "TransportDefs.h"
19 #include "TransportSendStrategy.h"
21 #include "TransportStrategy.h"
22 #include "TransportStrategy_rch.h"
24 #include "TransportSendListener.h"
26 #include "QueueTaskBase_T.h"
28 #include "dds/DCPS/TimeTypes.h"
29 
30 #include "ace/Event_Handler.h"
31 #include "ace/Synch_Traits.h"
32 
33 #include <utility>
34 
35 #include <iosfwd> // For operator<<() diagnostic formatter.
36 
38 class ACE_SOCK;
40 
42 
43 namespace OpenDDS {
44 
45 namespace ICE {
46  class Endpoint;
47 }
48 
49 namespace DCPS {
50 
51 
52 class TransportQueueElement;
53 class ReceivedDataSample;
54 class DataSampleElement;
55 class ThreadPerConnectionSendTask;
56 class TransportClient;
57 class TransportImpl;
58 
59 typedef OPENDDS_MAP_CMP(GUID_t, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
60 
62 
63 /**
64  * This class manages the reservations based on the associations between datareader
65  * and datawriter. It basically delegates the samples to the send strategy for sending and
66  * delivers the samples received by the receive strategy to the listener.
67  *
68  * Notes about object ownership:
69  * 1) Own the send strategy object and receive strategy object.
70  * 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection
71  * is enabled.
72  */
74 : public virtual RcEventHandler {
75 
76  friend class DataLinkCleanupTask;
77 
78 public:
79 
83  LOST
84  };
85 
86  /// A DataLink object is always created by a TransportImpl object.
87  /// Thus, the TransportImpl object passed-in here is the object that
88  /// created this DataLink. The ability to specify a priority
89  /// for individual links is included for construction so its
90  /// value can be available for activating any threads.
91  DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback, bool is_active);
92  virtual ~DataLink();
93 
94  /// Reactor invokes this after being notified in schedule_stop or cancel_release
95  int handle_exception(ACE_HANDLE /* fd */);
96 
97  /// Allows DataLink::stop to be done on the reactor thread so that
98  /// this thread avoids possibly deadlocking trying to access reactor
99  /// to stop strategies or schedule timers
100  void schedule_stop(const MonotonicTimePoint& schedule_to_stop_at);
101  /// The stop method is used to stop the DataLink prior to shutdown.
102  void stop();
103 
104  /// The resume_send is used in the case of reconnection
105  /// on the subscriber's side.
106  void resume_send();
107 
108  /// Only called by our TransportImpl object.
109  ///
110  /// Return Codes: 0 means successful reservation made.
111  /// -1 means failure.
112  virtual int make_reservation(const GUID_t& remote_subscription_id,
113  const GUID_t& local_publication_id,
114  const TransportSendListener_wrch& send_listener,
115  bool reliable);
116 
117  /// Only called by our TransportImpl object.
118  ///
119  /// Return Codes: 0 means successful reservation made.
120  /// -1 means failure.
121  virtual int make_reservation(const GUID_t& remote_publication_id,
122  const GUID_t& local_subscription_id,
123  const TransportReceiveListener_wrch& receive_listener,
124  bool reliable);
125 
126  // ciju: Called by LinkSet with locks held
127  /// This will release reservations that were made by one of the
128  /// make_reservation() methods. All we know is that the supplied
129  /// GUID_t is considered to be a remote id. It could be a
130  /// remote subscriber or a remote publisher.
131  void release_reservations(GUID_t remote_id,
132  GUID_t local_id,
133  DataLinkSetMap& released_locals);
134 
135  void schedule_delayed_release();
136 
137  const TimeDuration& datalink_release_delay() const;
138 
139  /// Either send or receive listener for this local_id should be
140  /// removed from internal DataLink structures so it no longer
141  /// receives events.
142  void remove_listener(const GUID_t& local_id);
143 
144  // ciju: Called by LinkSet with locks held
145  /// Called by the TransportClient objects that reference this
146  /// DataLink. Used by the TransportClient to send a sample,
147  /// or to send a control message. These functions either give the
148  /// request to the ThreadPerConnectionSendTask when thread_per_connection
149  /// configuration is true or just simply delegate to the send strategy.
150  void send_start();
151  void send(TransportQueueElement* element);
152  void send_stop(GUID_t repoId);
153 
154  // ciju: Called by LinkSet with locks held
155  /// This method is essentially an "undo_send()" method. Its goal
156  /// is to remove all traces of the sample from this DataLink (if
157  /// the sample is even known to the DataLink).
158  virtual RemoveResult remove_sample(const DataSampleElement* sample);
159 
160  // ciju: Called by LinkSet with locks held
161  virtual void remove_all_msgs(const GUID_t& pub_id);
162 
163  /// This is called by our TransportReceiveStrategy object when it
164  /// has received a complete data sample. This method will cause
165  /// the appropriate TransportReceiveListener objects to be told
166  /// that data_received().
167  /// If readerId is not GUID_UNKNOWN, only the TransportReceiveListener
168  /// with that ID (if one exists) will receive the data.
169  int data_received(ReceivedDataSample& sample,
170  const GUID_t& readerId = GUID_UNKNOWN);
171 
172  /// Variation of data_received() that allows for excluding a subset of readers
173  /// by specifying which readers specifically should receive.
174  /// Any reader ID that does not appear in the include set will be skipped.
175  void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
176 
177  /// Obtain a unique identifier for this DataLink object.
178  DataLinkIdType id() const;
179 
180  /// Our TransportImpl will inform us if it is being shutdown()
181  /// by calling this method.
182  void transport_shutdown();
183 
184  /// Notify the datawriters and datareaders that the connection is
185  /// disconnected, lost, or reconnected. The datareader/datawriter
186  /// will notify the corresponding listener.
187  void notify(ConnectionNotice notice);
188 
189  /// Called before releasing the datalink or before shutdown to let
190  /// the concrete DataLink do anything necessary.
191  virtual void pre_stop_i();
192 
193  // Call-back from the concrete transport object.
194  // The connection has been broken. No locks are being held.
195  // Take a snapshot of current associations which will be removed
196  // by DataLinkCleanupTask.
197  void release_resources();
198 
199  // Used to inform the send strategy to clear all unsent samples upon
200  // backpressure timeout.
201  void terminate_send();
202  void terminate_send_if_suspended();
203 
204  /// This is called on publisher side to see if this link communicates
205  /// with the provided sub or by the subscriber side to see if this link
206  /// communicates with the provided pub
207  bool is_target(const GUID_t& remote_id);
208 
209  /// This is called by DataLinkCleanupTask thread to remove the associations
210  /// based on the snapshot in release_resources().
211  void clear_associations();
212 
213  int handle_timeout(const ACE_Time_Value& tv, const void* arg);
214  int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
215 
216  // Set the DiffServ codepoint of the socket. This is a stateless
217  // method and is here only because this is a convenient common
218  // location that can be reached by client code that needs to
219  // perform this behavior.
220  void set_dscp_codepoint(int cp, ACE_SOCK& socket);
221 
222  /// Accessors for the TRANSPORT_PRIORITY value associated with
223  /// this link.
224  Priority& transport_priority();
225  Priority transport_priority() const;
226 
227  bool& is_loopback();
228  bool is_loopback() const;
229 
230  bool& is_active();
231  bool is_active() const;
232 
233  bool cancel_release();
234 
235  /// This allows a subclass to easily create a transport control
236  /// sample to send via send_control.
237  ACE_Message_Block* create_control(char submessage_id,
239  Message_Block_Ptr data);
240 
241  /// This allows a subclass to send transport control samples over
242  /// this DataLink. This is useful for sending transport-specific
243  /// control messages between one or more endpoints under this
244  /// DataLink's control.
245  SendControlStatus send_control(const DataSampleHeader& header, Message_Block_Ptr data);
246 
247  /// For a given publication "pub_id", store the total number of corresponding
248  /// subscriptions in "n_subs" and given a set of subscriptions
249  /// (the "in" sequence), return the subset of the input set "in" which are
250  /// targets of this DataLink (see is_target()).
251  GUIDSeq* target_intersection(const GUID_t& pub_id, const GUIDSeq& in, size_t& n_subs);
252 
253  TransportImpl_rch impl() const;
254 
255  void default_listener(const TransportReceiveListener_wrch& trl);
256  TransportReceiveListener_wrch default_listener() const;
257 
259  typedef std::pair<TransportClient_wrch, GUID_t> OnStartCallback;
260 
261  bool add_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
262  void remove_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
263  void invoke_on_start_callbacks(bool success);
264  bool invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success);
265  void remove_startup_callbacks(const GUID_t& local, const GUID_t& remote);
266 
268  public:
269  Interceptor(ACE_Reactor* reactor, ACE_thread_t owner) : ReactorInterceptor(reactor, owner) {}
270  bool reactor_is_shut_down() const;
271  };
272 
274  public:
275  ImmediateStart(RcHandle<DataLink> link, WeakRcHandle<TransportClient> client, const GUID_t& remote) : link_(link), client_(client), remote_(remote) {}
276  void execute();
277  private:
281  };
282 
283  void set_scheduling_release(bool scheduling_release);
284 
285  virtual void send_final_acks (const GUID_t& readerid);
286 
288 
289  virtual bool is_leading(const GUID_t& /*writer*/,
290  const GUID_t& /*reader*/) const { return false; }
291 
292 
293 protected:
294 
295  /// This is how the subclass "announces" to this DataLink base class
296  /// that this DataLink has now been "connected" and should start
297  /// the supplied strategy objects. This start method is also
298  /// going to keep a "copy" of the references to the strategy objects.
299  /// Also note that it is acceptable to pass-in a NULL (0)
300  /// TransportReceiveStrategy*, but it is assumed that the
301  /// TransportSendStrategy* argument is not NULL.
302  ///
303  /// If the start() method fails to start either strategy, then a -1
304  /// is returned. Otherwise, a 0 is returned. In the failure case,
305  /// if one of the strategy objects was started successfully, then
306  /// it will be stopped before the start() method returns -1.
307  int start(const TransportSendStrategy_rch& send_strategy,
308  const TransportStrategy_rch& receive_strategy,
309  bool invoke_all = true);
310 
311  /// This announces the "stop" event to our subclass. The "stop"
312  /// event will occur when this DataLink is handling a
313  /// release_reservations() call and determines that it has just
314  /// released all of the remaining reservations on this DataLink.
315  /// The "stop" event will also occur when the TransportImpl
316  /// is being shutdown() - we call stop_i() from our
317  /// transport_shutdown() method to handle this case.
318  virtual void stop_i();
319 
320  /// Used to provide unique Ids to all DataLink objects.
321  static ACE_UINT64 get_next_datalink_id();
322 
323  /// The transport receive strategy object for this DataLink.
325 
327 
328  /// The implementation of the functions that accomplish the
329  /// sample or control message delivery. They just simply
330  /// delegate to the send strategy.
331  void send_start_i();
332  virtual void send_i(TransportQueueElement* element, bool relink = true);
333  void send_stop_i(GUID_t repoId);
334 
335  /// For a given local GUID_t (publication or subscription), return the list
336  /// of remote peer GUID_ts (subscriptions or publications) that this link
337  /// knows about due to make_reservation().
338  GUIDSeq* peer_ids(const GUID_t& local_id) const;
339 
340  void network_change() const;
341 
342  void replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const;
343 
344 private:
345 
346  /// Helper function to output the enum as a string to help debugging.
347  const char* connection_notice_as_str(ConnectionNotice notice);
348 
349  TransportSendListener_rch send_listener_for(const GUID_t& pub_id) const;
350  TransportReceiveListener_rch recv_listener_for(const GUID_t& sub_id) const;
351 
352  /// Save current sub and pub association maps for releasing and create
353  /// empty maps for new associations.
354  void prepare_release();
355 
356  virtual bool handle_send_request_ack(TransportQueueElement* element);
357 
358  /// Allow derived classes to provide an alternate "customized" queue element
359  /// for this DataLink (not shared with other links in the DataLinkSet).
361  TransportQueueElement* element)
362  {
363  return element;
364  }
365 
366  virtual void release_remote_i(const GUID_t& /*remote_id*/) {}
367  virtual void release_reservations_i(const GUID_t& /*remote_id*/,
368  const GUID_t& /*local_id*/) {}
369 
370  void data_received_i(ReceivedDataSample& sample,
371  const GUID_t& readerId,
372  const RepoIdSet& incl_excl,
374 
375  void notify_reactor();
376 
378 
379  /// Convenience function for diagnostic information.
380 #ifndef OPENDDS_SAFETY_PROFILE
381  friend OpenDDS_Dcps_Export
382  std::ostream& operator<<(std::ostream& str, const DataLink& value);
383 #endif
384 
385  /// A boolean indicating if the DataLink has been stopped. This
386  /// value is protected by the strategy_lock_.
387  bool stopped_;
389 
390  /// Map publication Id value to TransportSendListener.
391  typedef OPENDDS_MAP_CMP(GUID_t, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap;
392  IdToSendListenerMap send_listeners_;
393 
394  /// Map subscription Id value to TransportReceiveListener.
396  IdToRecvListenerMap recv_listeners_;
397 
398  /// If default_listener_ is not null and this DataLink receives a sample
399  /// from a publication GUID that's not in pub_map_, it will call
400  /// data_received() on the default_listener_.
402 
403  mutable LockType pub_sub_maps_lock_;
404 
406  AssocByRemote assoc_by_remote_;
407 
409  bool reliable_;
411  };
412 
414  AssocByLocal assoc_by_local_;
415 
416  /// A weak rchandle to the TransportImpl that created this DataLink.
418 
419  /// The id for this DataLink
421 
422  /// The task used to do the sending. This ThreadPerConnectionSendTask
423  /// object is created when the thread_per_connection configuration is
424  /// true. It is dedicated solely to this datalink.
426 
427  // Snapshot of associations taken when release_resources() is called.
428  AssocByLocal assoc_releasing_;
429 
430  /// TRANSPORT_PRIORITY value associated with the link.
432 
434 
435 protected:
436 
438 
439  /// The transport send strategy object for this DataLink.
441  LockType strategy_lock_;
442 
443  TransportSendStrategy_rch get_send_strategy();
444 
445  typedef OPENDDS_MAP_CMP(GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap;
446  typedef OPENDDS_MAP_CMP(GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap;
447  OnStartCallbackMap on_start_callbacks_;
448  typedef OPENDDS_MAP_CMP(GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap;
449  PendingOnStartsMap pending_on_starts_;
450 
451  /// Configurable delay in milliseconds after which the datalink
452  /// is released once all associations are removed.
454 
455  /// Allocators for data and message blocks used by transport
456  /// control samples when send_control is called.
459 
460  /// Is remote attached to same transport ?
462  /// Is pub or sub ?
464  bool started_;
465 
466  /// Listener for TransportSendControlElements created in send_control
468 
470 };
471 
472 } // namespace DCPS
473 } // namespace OpenDDS
474 
476 
477 #if defined (__ACE_INLINE__)
478 #include "DataLink.inl"
479 #endif /* __ACE_INLINE__ */
480 
481 #endif /* OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H */
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
IdToSendListenerMap send_listeners_
Definition: DataLink.h:392
Interceptor(ACE_Reactor *reactor, ACE_thread_t owner)
Definition: DataLink.h:269
virtual TransportQueueElement * customize_queue_element(TransportQueueElement *element)
Definition: DataLink.h:360
const LogLevel::Value value
Definition: debug.cpp:61
#define ACE_SYNCH_MUTEX
std::pair< TransportClient_wrch, GUID_t > OnStartCallback
Definition: DataLink.h:259
WeakRcHandle< TransportSendListener > TransportSendListener_wrch
Definition: DataLink.h:61
unsigned long ACE_Reactor_Mask
bool is_active_
Is pub or sub ?
Definition: DataLink.h:463
Interceptor interceptor_
Definition: DataLink.h:469
virtual bool is_leading(const GUID_t &, const GUID_t &) const
Definition: DataLink.h:289
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
LockType pub_sub_maps_lock_
Definition: DataLink.h:403
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
WeakRcHandle< TransportImpl > impl_
A weak rchandle to the TransportImpl that created this DataLink.
Definition: DataLink.h:417
GuidSet RepoIdSet
Definition: GuidUtils.h:113
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
AssocByLocal assoc_by_local_
Definition: DataLink.h:414
AssocByLocal assoc_releasing_
Definition: DataLink.h:428
ACE_UINT64 DataLinkIdType
Identifier type for DataLink objects.
TransportStrategy_rch receive_strategy_
The transport receive strategy object for this DataLink.
Definition: DataLink.h:324
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
RcHandle< DataLinkSet > DataLinkSet_rch
The type definition for the smart-pointer to the underlying type.
Definition: DataLinkSet.h:27
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
TransportSendStrategy_rch send_strategy_
The transport send strategy object for this DataLink.
Definition: DataLink.h:440
DWORD ACE_thread_t
TransportReceiveListener_wrch default_listener_
Definition: DataLink.h:401
PendingOnStartsMap pending_on_starts_
Definition: DataLink.h:449
ACE_SYNCH_MUTEX LockType
Definition: DataLink.h:377
#define OPENDDS_MAP_CMP(K, V, C)
#define ACE_END_VERSIONED_NAMESPACE_DECL
Active Object responsible for cleaning up DataLink resources.
Simple listener to discard response samples.
TimeDuration datalink_release_delay_
Definition: DataLink.h:453
virtual void release_remote_i(const GUID_t &)
Definition: DataLink.h:366
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint() const
Definition: DataLink.h:287
ImmediateStart(RcHandle< DataLink > link, WeakRcHandle< TransportClient > client, const GUID_t &remote)
Definition: DataLink.h:275
WeakRcHandle< TransportClient > client_
Definition: DataLink.h:279
unsigned long long ACE_UINT64
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
ssize_t send_i(ACE_HANDLE handle, const void *buf, size_t len)
unique_ptr< ThreadPerConnectionSendTask > thr_per_con_send_task_
Definition: DataLink.h:425
ACE_CDR::Long Priority
ACE_Guard< LockType > GuardType
Definition: DataLink.h:437
SendResponseListener send_response_listener_
Listener for TransportSendControlElements created in send_control.
Definition: DataLink.h:467
OnStartCallbackMap on_start_callbacks_
Definition: DataLink.h:447
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: DataLink.h:457
AssocByRemote assoc_by_remote_
Definition: DataLink.h:406
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
WeakRcHandle< TransportClient > TransportClient_wrch
Definition: DataLink.h:258
IdToRecvListenerMap recv_listeners_
Definition: DataLink.h:396
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461
virtual void release_reservations_i(const GUID_t &, const GUID_t &)
Definition: DataLink.h:367
ACE_UINT64 id_
The id for this DataLink.
Definition: DataLink.h:420
Execute the requests of sending a sample or control message.
Priority transport_priority_
TRANSPORT_PRIORITY value associated with the link.
Definition: DataLink.h:431
MonotonicTimePoint scheduled_to_stop_at_
Definition: DataLink.h:388
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
std::ostream & operator<<(std::ostream &stream, const GuidPair &guidp)
Definition: Checklist.h:146
Base wrapper class around a data/control sample to be sent.
unique_ptr< DataBlockAllocator > db_allocator_
Definition: DataLink.h:458
SendControlStatus
Return code type for send_control() operations.