LCOV - code coverage report
Current view: top level - DCPS/transport/framework - DataLink.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 10 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 7 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
       9             : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_DATALINK_H
      10             : 
      11             : #include "dds/DCPS/dcps_export.h"
      12             : #include "dds/DCPS/Definitions.h"
      13             : #include "dds/DCPS/RcObject.h"
      14             : #include "dds/DCPS/PoolAllocator.h"
      15             : #include "dds/DCPS/RcEventHandler.h"
      16             : #include "ReceiveListenerSetMap.h"
      17             : #include "SendResponseListener.h"
      18             : #include "TransportDefs.h"
      19             : #include "TransportSendStrategy.h"
      20             : #include "TransportSendStrategy_rch.h"
      21             : #include "TransportStrategy.h"
      22             : #include "TransportStrategy_rch.h"
      23             : #include "TransportSendControlElement.h"
      24             : #include "TransportSendListener.h"
      25             : #include "TransportReceiveListener.h"
      26             : #include "QueueTaskBase_T.h"
      27             : #include "dds/DCPS/ReactorInterceptor.h"
      28             : #include "dds/DCPS/TimeTypes.h"
      29             : 
      30             : #include "ace/Event_Handler.h"
      31             : #include "ace/Synch_Traits.h"
      32             : 
      33             : #include <utility>
      34             : 
      35             : #include <iosfwd> // For operator<<() diagnostic formatter.
      36             : 
      37             : ACE_BEGIN_VERSIONED_NAMESPACE_DECL
      38             : class ACE_SOCK;
      39             : ACE_END_VERSIONED_NAMESPACE_DECL
      40             : 
      41             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      42             : 
      43             : namespace OpenDDS {
      44             : 
      45             : namespace ICE {
      46             :   class Endpoint;
      47             : }
      48             : 
      49             : namespace DCPS {
      50             : 
      51             : 
      52             : class TransportQueueElement;
      53             : class ReceivedDataSample;
      54             : class DataSampleElement;
      55             : class ThreadPerConnectionSendTask;
      56             : class TransportClient;
      57             : class TransportImpl;
      58             : 
      59             : typedef OPENDDS_MAP_CMP(GUID_t, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
      60             : 
      61             : typedef WeakRcHandle<TransportSendListener> TransportSendListener_wrch;
      62             : 
      63             : /**
      64             :  * This class manages the reservations based on the associations between datareader
      65             :  * and datawriter. It basically delegate the samples to send strategy for sending and
      66             :  * deliver the samples received by receive strategy to the listener.
      67             :  *
      68             :  * Notes about object ownership:
      69             :  * 1) Own the send strategy object and receive strategy object.
      70             :  * 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection
      71             :  *    is enabled.
      72             :  */
      73             : class OpenDDS_Dcps_Export DataLink
      74             : : public virtual RcEventHandler {
      75             : 
      76             :   friend class DataLinkCleanupTask;
      77             : 
      78             : public:
      79             : 
      80             :   enum ConnectionNotice {
      81             :     DISCONNECTED,
      82             :     RECONNECTED,
      83             :     LOST
      84             :   };
      85             : 
      86             :   /// A DataLink object is always created by a TransportImpl object.
      87             :   /// Thus, the TransportImpl object passed-in here is the object that
      88             :   /// created this DataLink.  The ability to specify a priority
      89             :   /// for individual links is included for construction so its
      90             :   /// value can be available for activating any threads.
      91             :   DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback, bool is_active);
      92             :   virtual ~DataLink();
      93             : 
      94             :   /// Reactor invokes this after being notified in schedule_stop or cancel_release
      95             :   int handle_exception(ACE_HANDLE /* fd */);
      96             : 
      97             :   /// Allows DataLink::stop to be done on the reactor thread so that
      98             :   /// this thread avoids possibly deadlocking trying to access reactor
      99             :   /// to stop strategies or schedule timers
     100             :   void schedule_stop(const MonotonicTimePoint& schedule_to_stop_at);
     101             :   /// The stop method is used to stop the DataLink prior to shutdown.
     102             :   void stop();
     103             : 
     104             :   /// The resume_send is used in the case of reconnection
     105             :   /// on the subscriber's side.
     106             :   void resume_send();
     107             : 
     108             :   /// Only called by our TransportImpl object.
     109             :   ///
     110             :   /// Return Codes: 0 means successful reservation made.
     111             :   ///              -1 means failure.
     112             :   virtual int make_reservation(const GUID_t& remote_subscription_id,
     113             :                                const GUID_t& local_publication_id,
     114             :                                const TransportSendListener_wrch& send_listener,
     115             :                                bool reliable);
     116             : 
     117             :   /// Only called by our TransportImpl object.
     118             :   ///
     119             :   /// Return Codes: 0 means successful reservation made.
     120             :   ///              -1 means failure.
     121             :   virtual int make_reservation(const GUID_t& remote_publication_id,
     122             :                                const GUID_t& local_subscription_id,
     123             :                                const TransportReceiveListener_wrch& receive_listener,
     124             :                                bool reliable);
     125             : 
     126             :   // ciju: Called by LinkSet with locks held
     127             :   /// This will release reservations that were made by one of the
     128             :   /// make_reservation() methods.  All we know is that the supplied
     129             :   /// GUID_t is considered to be a remote id.  It could be a
     130             :   /// remote subscriber or a remote publisher.
     131             :   void release_reservations(GUID_t          remote_id,
     132             :                             GUID_t          local_id,
     133             :                             DataLinkSetMap& released_locals);
     134             : 
     135             :   void schedule_delayed_release();
     136             : 
     137             :   const TimeDuration& datalink_release_delay() const;
     138             : 
     139             :   /// Either send or receive listener for this local_id should be
     140             :   /// removed from internal DataLink structures so it no longer
     141             :   /// receives events.
     142             :   void remove_listener(const GUID_t& local_id);
     143             : 
     144             :   // ciju: Called by LinkSet with locks held
     145             :   /// Called by the TransportClient objects that reference this
     146             :   /// DataLink.  Used by the TransportClient to send a sample,
     147             :   /// or to send a control message. These functions either give the
     148             :   /// request to the PerThreadConnectionSendTask when thread_per_connection
     149             :   /// configuration is true or just simply delegate to the send strategy.
     150             :   void send_start();
     151             :   void send(TransportQueueElement* element);
     152             :   void send_stop(GUID_t repoId);
     153             : 
     154             :   // ciju: Called by LinkSet with locks held
     155             :   /// This method is essentially an "undo_send()" method.  It's goal
     156             :   /// is to remove all traces of the sample from this DataLink (if
     157             :   /// the sample is even known to the DataLink).
     158             :   virtual RemoveResult remove_sample(const DataSampleElement* sample);
     159             : 
     160             :   // ciju: Called by LinkSet with locks held
     161             :   virtual void remove_all_msgs(const GUID_t& pub_id);
     162             : 
     163             :   /// This is called by our TransportReceiveStrategy object when it
     164             :   /// has received a complete data sample.  This method will cause
     165             :   /// the appropriate TransportReceiveListener objects to be told
     166             :   /// that data_received().
     167             :   /// If readerId is not GUID_UNKNOWN, only the TransportReceiveListener
     168             :   /// with that ID (if one exists) will receive the data.
     169             :   int data_received(ReceivedDataSample& sample,
     170             :                     const GUID_t& readerId = GUID_UNKNOWN);
     171             : 
     172             :   /// Varation of data_received() that allows for excluding a subset of readers
     173             :   /// by specifying which readers specifically should receive.
     174             :   /// Any reader ID that does not appear in the include set will be skipped.
     175             :   void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
     176             : 
     177             :   /// Obtain a unique identifier for this DataLink object.
     178             :   DataLinkIdType id() const;
     179             : 
     180             :   /// Our TransportImpl will inform us if it is being shutdown()
     181             :   /// by calling this method.
     182             :   void transport_shutdown();
     183             : 
     184             :   /// Notify the datawriters and datareaders that the connection is
     185             :   /// disconnected, lost, or reconnected. The datareader/datawriter
     186             :   /// will notify the corresponding listener.
     187             :   void notify(ConnectionNotice notice);
     188             : 
     189             :   /// Called before release the datalink or before shutdown to let
     190             :   /// the concrete DataLink to do anything necessary.
     191             :   virtual void pre_stop_i();
     192             : 
     193             :   // Call-back from the concrete transport object.
     194             :   // The connection has been broken. No locks are being held.
     195             :   // Take a snapshot of current associations which will be removed
     196             :   // by DataLinkCleanupTask.
     197             :   void release_resources();
     198             : 
     199             :   // Used by to inform the send strategy to clear all unsent samples upon
     200             :   // backpressure timed out.
     201             :   void terminate_send();
     202             :   void terminate_send_if_suspended();
     203             : 
     204             :   /// This is called on publisher side to see if this link communicates
     205             :   /// with the provided sub or by the subscriber side to see if this link
     206             :   /// communicates with the provided pub
     207             :   bool is_target(const GUID_t& remote_id);
     208             : 
     209             :   /// This is called by DataLinkCleanupTask thread to remove the associations
     210             :   /// based on the snapshot in release_resources().
     211             :   void clear_associations();
     212             : 
     213             :   int handle_timeout(const ACE_Time_Value& tv, const void* arg);
     214             :   int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
     215             : 
     216             :   // Set the DiffServ codepoint of the socket.  This is a stateless
     217             :   // method and is here only because this is a convenient common
     218             :   // location that can be reached by client code that needs to
     219             :   // perform this behavior.
     220             :   void set_dscp_codepoint(int cp, ACE_SOCK& socket);
     221             : 
     222             :   /// Accessors for the TRANSPORT_PRIORITY value associated with
     223             :   /// this link.
     224             :   Priority& transport_priority();
     225             :   Priority  transport_priority() const;
     226             : 
     227             :   bool& is_loopback();
     228             :   bool  is_loopback() const;
     229             : 
     230             :   bool& is_active();
     231             :   bool  is_active() const;
     232             : 
     233             :   bool cancel_release();
     234             : 
     235             :   /// This allows a subclass to easily create a transport control
     236             :   /// sample to send via send_control.
     237             :   ACE_Message_Block* create_control(char submessage_id,
     238             :                                     DataSampleHeader& header,
     239             :                                     Message_Block_Ptr data);
     240             : 
     241             :   /// This allows a subclass to send transport control samples over
     242             :   /// this DataLink. This is useful for sending transport-specific
     243             :   /// control messages between one or more endpoints under this
     244             :   /// DataLink's control.
     245             :   SendControlStatus send_control(const DataSampleHeader& header, Message_Block_Ptr data);
     246             : 
     247             :   /// For a given publication "pub_id", store the total number of corresponding
     248             :   /// subscriptions in "n_subs" and given a set of subscriptions
     249             :   /// (the "in" sequence), return the subset of the input set "in" which are
     250             :   /// targets of this DataLink (see is_target()).
     251             :   GUIDSeq* target_intersection(const GUID_t& pub_id, const GUIDSeq& in, size_t& n_subs);
     252             : 
     253             :   TransportImpl_rch impl() const;
     254             : 
     255             :   void default_listener(const TransportReceiveListener_wrch& trl);
     256             :   TransportReceiveListener_wrch default_listener() const;
     257             : 
     258             :   typedef WeakRcHandle<TransportClient> TransportClient_wrch;
     259             :   typedef std::pair<TransportClient_wrch, GUID_t> OnStartCallback;
     260             : 
     261             :   bool add_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
     262             :   void remove_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote);
     263             :   void invoke_on_start_callbacks(bool success);
     264             :   bool invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success);
     265             :   void remove_startup_callbacks(const GUID_t& local, const GUID_t& remote);
     266             : 
     267             :   class Interceptor : public ReactorInterceptor {
     268             :   public:
     269           0 :     Interceptor(ACE_Reactor* reactor, ACE_thread_t owner) : ReactorInterceptor(reactor, owner) {}
     270             :     bool reactor_is_shut_down() const;
     271             :   };
     272             : 
     273             :   class ImmediateStart : public virtual ReactorInterceptor::Command {
     274             :   public:
     275           0 :     ImmediateStart(RcHandle<DataLink> link, WeakRcHandle<TransportClient> client, const GUID_t& remote) : link_(link), client_(client), remote_(remote) {}
     276             :     void execute();
     277             :   private:
     278             :     RcHandle<DataLink> link_;
     279             :     WeakRcHandle<TransportClient> client_;
     280             :     GUID_t remote_;
     281             :   };
     282             : 
     283             :   void set_scheduling_release(bool scheduling_release);
     284             : 
     285             :   virtual void send_final_acks (const GUID_t& readerid);
     286             : 
     287           0 :   virtual WeakRcHandle<ICE::Endpoint> get_ice_endpoint() const { return WeakRcHandle<ICE::Endpoint>(); }
     288             : 
     289           0 :   virtual bool is_leading(const GUID_t& /*writer*/,
     290           0 :                           const GUID_t& /*reader*/) const { return false; }
     291             : 
     292             : 
     293             : protected:
     294             : 
     295             :   /// This is how the subclass "announces" to this DataLink base class
     296             :   /// that this DataLink has now been "connected" and should start
     297             :   /// the supplied strategy objects.  This start method is also
     298             :   /// going to keep a "copy" of the references to the strategy objects.
     299             :   /// Also note that it is acceptable to pass-in a NULL (0)
     300             :   /// TransportReceiveStrategy*, but it is assumed that the
     301             :   /// TransportSendStrategy* argument is not NULL.
     302             :   ///
     303             :   /// If the start() method fails to start either strategy, then a -1
     304             :   /// is returned.  Otherwise, a 0 is returned.  In the failure case,
     305             :   /// if one of the strategy objects was started successfully, then
     306             :   /// it will be stopped before the start() method returns -1.
     307             :   int start(const TransportSendStrategy_rch& send_strategy,
     308             :             const TransportStrategy_rch& receive_strategy,
     309             :             bool invoke_all = true);
     310             : 
     311             :   /// This announces the "stop" event to our subclass.  The "stop"
     312             :   /// event will occur when this DataLink is handling a
     313             :   /// release_reservations() call and determines that it has just
     314             :   /// released all of the remaining reservations on this DataLink.
     315             :   /// The "stop" event will also occur when the TransportImpl
     316             :   /// is being shutdown() - we call stop_i() from our
     317             :   /// transport_shutdown() method to handle this case.
     318             :   virtual void stop_i();
     319             : 
     320             :   /// Used to provide unique Ids to all DataLink methods.
     321             :   static ACE_UINT64 get_next_datalink_id();
     322             : 
     323             :   /// The transport receive strategy object for this DataLink.
     324             :   TransportStrategy_rch receive_strategy_;
     325             : 
     326             :   friend class ThreadPerConnectionSendTask;
     327             : 
     328             :   /// The implementation of the functions that accomplish the
     329             :   /// sample or control message delivery. They just simply
     330             :   /// delegate to the send strategy.
     331             :   void send_start_i();
     332             :   virtual void send_i(TransportQueueElement* element, bool relink = true);
     333             :   void send_stop_i(GUID_t repoId);
     334             : 
     335             :   /// For a given local GUID_t (publication or subscription), return the list
     336             :   /// of remote peer GUID_ts (subscriptions or publications) that this link
     337             :   /// knows about due to make_reservation().
     338             :   GUIDSeq* peer_ids(const GUID_t& local_id) const;
     339             : 
     340             :   void network_change() const;
     341             : 
     342             :   void replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const;
     343             : 
     344             : private:
     345             : 
     346             :   /// Helper function to output the enum as a string to help debugging.
     347             :   const char* connection_notice_as_str(ConnectionNotice notice);
     348             : 
     349             :   TransportSendListener_rch send_listener_for(const GUID_t& pub_id) const;
     350             :   TransportReceiveListener_rch recv_listener_for(const GUID_t& sub_id) const;
     351             : 
     352             :   /// Save current sub and pub association maps for releasing and create
     353             :   /// empty maps for new associations.
     354             :   void prepare_release();
     355             : 
     356             :   virtual bool handle_send_request_ack(TransportQueueElement* element);
     357             : 
     358             :   /// Allow derived classes to provide an alternate "customized" queue element
     359             :   /// for this DataLink (not shared with other links in the DataLinkSet).
     360           0 :   virtual TransportQueueElement* customize_queue_element(
     361             :     TransportQueueElement* element)
     362             :   {
     363           0 :     return element;
     364             :   }
     365             : 
     366           0 :   virtual void release_remote_i(const GUID_t& /*remote_id*/) {}
     367           0 :   virtual void release_reservations_i(const GUID_t& /*remote_id*/,
     368           0 :                                       const GUID_t& /*local_id*/) {}
     369             : 
     370             :   void data_received_i(ReceivedDataSample& sample,
     371             :                        const GUID_t& readerId,
     372             :                        const RepoIdSet& incl_excl,
     373             :                        ReceiveListenerSet::ConstrainReceiveSet constrain);
     374             : 
     375             :   void notify_reactor();
     376             : 
     377             :   typedef ACE_SYNCH_MUTEX     LockType;
     378             : 
     379             :   /// Convenience function for diagnostic information.
     380             : #ifndef OPENDDS_SAFETY_PROFILE
     381             :   friend OpenDDS_Dcps_Export
     382             :   std::ostream& operator<<(std::ostream& str, const DataLink& value);
     383             : #endif
     384             : 
     385             :   /// A boolean indicating if the DataLink has been stopped. This
     386             :   /// value is protected by the strategy_lock_.
     387             :   bool stopped_;
     388             :   MonotonicTimePoint scheduled_to_stop_at_;
     389             : 
     390             :   /// Map publication Id value to TransportSendListener.
     391             :   typedef OPENDDS_MAP_CMP(GUID_t, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap;
     392             :   IdToSendListenerMap send_listeners_;
     393             : 
     394             :   /// Map subscription Id value to TransportReceieveListener.
     395             :   typedef OPENDDS_MAP_CMP(GUID_t, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap;
     396             :   IdToRecvListenerMap recv_listeners_;
     397             : 
     398             :   /// If default_listener_ is not null and this DataLink receives a sample
     399             :   /// from a publication GUID that's not in pub_map_, it will call
     400             :   /// data_received() on the default_listener_.
     401             :   TransportReceiveListener_wrch default_listener_;
     402             : 
     403             :   mutable LockType pub_sub_maps_lock_;
     404             : 
     405             :   typedef OPENDDS_MAP_CMP(GUID_t, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
     406             :   AssocByRemote assoc_by_remote_;
     407             : 
     408             :   struct LocalAssociationInfo {
     409             :     bool reliable_;
     410             :     RepoIdSet associated_;
     411             :   };
     412             : 
     413             :   typedef OPENDDS_MAP_CMP(GUID_t, LocalAssociationInfo, GUID_tKeyLessThan) AssocByLocal;
     414             :   AssocByLocal assoc_by_local_;
     415             : 
     416             :   /// A weak rchandle to the TransportImpl that created this DataLink.
     417             :   WeakRcHandle<TransportImpl> impl_;
     418             : 
     419             :   /// The id for this DataLink
     420             :   ACE_UINT64 id_;
     421             : 
     422             :   /// The task used to do the sending. This ThreadPerConnectionSendTask
     423             :   /// object is created when the thread_per_connection configuration is
     424             :   /// true. It only dedicate to this datalink.
     425             :   unique_ptr<ThreadPerConnectionSendTask> thr_per_con_send_task_;
     426             : 
     427             :   // snapshot of associations when the release_resources() is called.
     428             :   AssocByLocal assoc_releasing_;
     429             : 
     430             :   /// TRANSPORT_PRIORITY value associated with the link.
     431             :   Priority transport_priority_;
     432             : 
     433             :   bool scheduling_release_;
     434             : 
     435             : protected:
     436             : 
     437             :   typedef ACE_Guard<LockType> GuardType;
     438             : 
     439             :   /// The transport send strategy object for this DataLink.
     440             :   TransportSendStrategy_rch send_strategy_;
     441             :   LockType strategy_lock_;
     442             : 
     443             :   TransportSendStrategy_rch get_send_strategy();
     444             : 
     445             :   typedef OPENDDS_MAP_CMP(GUID_t, TransportClient_wrch, GUID_tKeyLessThan) RepoToClientMap;
     446             :   typedef OPENDDS_MAP_CMP(GUID_t, RepoToClientMap, GUID_tKeyLessThan) OnStartCallbackMap;
     447             :   OnStartCallbackMap on_start_callbacks_;
     448             :   typedef OPENDDS_MAP_CMP(GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingOnStartsMap;
     449             :   PendingOnStartsMap pending_on_starts_;
     450             : 
     451             :   /// Configurable delay in milliseconds that the datalink
     452             :   /// should be released after all associations are removed.
     453             :   TimeDuration datalink_release_delay_;
     454             : 
     455             :   /// Allocators for data and message blocks used by transport
     456             :   /// control samples when send_control is called.
     457             :   unique_ptr<MessageBlockAllocator> mb_allocator_;
     458             :   unique_ptr<DataBlockAllocator> db_allocator_;
     459             : 
     460             :   /// Is remote attached to same transport ?
     461             :   bool is_loopback_;
     462             :   /// Is pub or sub ?
     463             :   bool is_active_;
     464             :   bool started_;
     465             : 
     466             :   /// Listener for TransportSendControlElements created in send_control
     467             :   SendResponseListener send_response_listener_;
     468             : 
     469             :   Interceptor interceptor_;
     470             : };
     471             : 
     472             : } // namespace DCPS
     473             : } // namespace OpenDDS
     474             : 
     475             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     476             : 
     477             : #if defined (__ACE_INLINE__)
     478             : #include "DataLink.inl"
     479             : #endif /* __ACE_INLINE__ */
     480             : 
     481             : #endif /* OPENDDS_DCPS_DATALINK_H */

Generated by: LCOV version 1.16