LCOV - code coverage report
Current view: top level - DCPS/transport/framework - TransportClient.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 6 69 8.7 %
Date: 2023-04-30 01:32:43 Functions: 3 23 13.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
       9             : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTCLIENT_H
      10             : 
      11             : #include "TransportConfig_rch.h"
      12             : #include "TransportImpl.h"
      13             : #include "DataLinkSet.h"
      14             : 
      15             : #include <dds/DCPS/dcps_export.h>
      16             : #include <dds/DCPS/AssociationData.h>
      17             : #include <dds/DCPS/ReactorInterceptor.h>
      18             : #include <dds/DCPS/Service_Participant.h>
      19             : #include <dds/DCPS/PoolAllocator.h>
      20             : #include <dds/DCPS/PoolAllocationBase.h>
      21             : #include <dds/DCPS/DiscoveryListener.h>
      22             : #include <dds/DCPS/RcEventHandler.h>
      23             : #include <dds/DCPS/BuiltInTopicUtils.h>
      24             : 
      25             : #include <ace/Time_Value.h>
      26             : #include <ace/Event_Handler.h>
      27             : #include <ace/Reverse_Lock_T.h>
      28             : 
      29             : // Forward declaration of a test-friendly class in the global namespace
      30             : class DDS_TEST;
      31             : 
      32             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      33             : 
      34             : namespace OpenDDS {
      35             : namespace DCPS {
      36             : 
      37             : class SendStateDataSampleList;
      38             : 
      39             : /**
      40             :  * @brief Mix-in class for DDS entities which directly use the transport layer.
      41             :  *
      42             :  * DataReaderImpl and DataWriterImpl are TransportClients.  The TransportClient
      43             :  * class manages the TransportImpl objects that represent the available
      44             :  * communication mechanisms and the DataLink objects that represent the
      45             :  * currently active communication channels to peers.
      46             :  */
      47             : class OpenDDS_Dcps_Export TransportClient
      48             :   : public virtual RcObject
      49             : {
      50             : public:
      51             :   // Used by TransportImpl to complete associate() processing:
      52             :   void use_datalink(const GUID_t& remote_id, const DataLink_rch& link);
      53             : 
      54             :   // values for flags parameter of transport_assoc_done():
      55             :   enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
      56             :   TransportClient();
      57             :   virtual ~TransportClient();
      58             : 
      59             : 
      60             :   // Local setup:
      61             : 
      62             :   void enable_transport(bool reliable, bool durable);
      63             :   void enable_transport_using_config(bool reliable, bool durable,
      64             :                                      const TransportConfig_rch& tc);
      65             : 
      66           0 :   bool swap_bytes() const { return swap_bytes_; }
      67           0 :   bool cdr_encapsulation() const { return cdr_encapsulation_; }
      68           0 :   const TransportLocatorSeq& connection_info() const { return conn_info_; }
      69             :   void populate_connection_info();
      70             :   bool is_reliable() const { return reliable_; }
      71             : 
      72             :   // Managing associations to remote peers:
      73             : 
      74             :   bool associate(const AssociationData& peer, bool active);
      75             :   void disassociate(const GUID_t& peerId);
      76             :   void stop_associating();
      77             :   void stop_associating(const GUID_t* repos, CORBA::ULong length);
      78             :   void send_final_acks();
      79             :   void transport_stop();
      80             : 
      81             :   // Discovery:
      82             :   void register_for_reader(const GUID_t& participant,
      83             :                            const GUID_t& writerid,
      84             :                            const GUID_t& readerid,
      85             :                            const TransportLocatorSeq& locators,
      86             :                            OpenDDS::DCPS::DiscoveryListener* listener);
      87             : 
      88             :   void unregister_for_reader(const GUID_t& participant,
      89             :                              const GUID_t& writerid,
      90             :                              const GUID_t& readerid);
      91             : 
      92             :   void register_for_writer(const GUID_t& participant,
      93             :                            const GUID_t& readerid,
      94             :                            const GUID_t& writerid,
      95             :                            const TransportLocatorSeq& locators,
      96             :                            DiscoveryListener* listener);
      97             : 
      98             :   void unregister_for_writer(const GUID_t& participant,
      99             :                              const GUID_t& readerid,
     100             :                              const GUID_t& writerid);
     101             : 
     102             :   void update_locators(const GUID_t& remote,
     103             :                        const TransportLocatorSeq& locators);
     104             : 
     105             :   WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
     106             : 
     107             :   // Data transfer:
     108             : 
     109             :   bool send_response(const GUID_t& peer,
     110             :                      const DataSampleHeader& header,
     111             :                      Message_Block_Ptr payload); // [DR]
     112             : 
     113             :   void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
     114             : 
     115             :   SendControlStatus send_w_control(SendStateDataSampleList send_list,
     116             :                                    const DataSampleHeader& header,
     117             :                                    Message_Block_Ptr msg,
     118             :                                    const GUID_t& destination);
     119             : 
     120             :   SendControlStatus send_control(const DataSampleHeader& header,
     121             :                                  Message_Block_Ptr msg);
     122             : 
     123             :   SendControlStatus send_control_to(const DataSampleHeader& header,
     124             :                                     Message_Block_Ptr msg,
     125             :                                     const GUID_t& destination);
     126             : 
     127             :   bool remove_sample(const DataSampleElement* sample);
     128             :   bool remove_all_msgs();
     129             : 
     130             :   virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
     131             :   virtual GUID_t get_guid() const = 0;
     132           0 :   virtual RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const { return RcHandle<BitSubscriber>(); }
     133             : 
     134             :   void terminate_send_if_suspended();
     135             : 
     136             :   bool associated_with(const GUID_t& remote) const;
     137             :   bool pending_association_with(const GUID_t& remote) const;
     138             : 
     139           0 :   GUID_t repo_id() const
     140             :   {
     141           0 :     ACE_Guard<ACE_Thread_Mutex> guard(lock_);
     142           0 :     return repo_id_;
     143           0 :   }
     144             : 
     145             :   void data_acked(const GUID_t& remote);
     146             : 
     147             :   bool is_leading(const GUID_t& reader_id) const;
     148             : 
     149             : protected:
     150           0 :   void cdr_encapsulation(bool encap)
     151             :   {
     152           0 :     cdr_encapsulation_ = encap;
     153           0 :   }
     154             : 
     155             : private:
     156             : 
     157             :   // Implemented by derived classes (DataReaderImpl/DataWriterImpl)
     158             :   virtual bool check_transport_qos(const TransportInst& inst) = 0;
     159             :   virtual DDS::DomainId_t domain_id() const = 0;
     160             :   virtual Priority get_priority_value(const AssociationData& data) const = 0;
     161           0 :   virtual void transport_assoc_done(int /*flags*/, const GUID_t& /*remote*/) {}
     162           0 :   virtual SequenceNumber get_max_sn() const { return SequenceNumber::SEQUENCENUMBER_UNKNOWN(); };
     163             : 
     164             : 
     165             : 
     166             : #if defined(OPENDDS_SECURITY)
     167           0 :   virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
     168             :   {
     169           0 :     return DDS::HANDLE_NIL;
     170             :   }
     171             : #endif
     172             : 
     173             :   // helpers
     174             :   typedef ACE_Guard<ACE_Thread_Mutex> Guard;
     175             :   void use_datalink_i(const GUID_t& remote_id,
     176             :                       const DataLink_rch& link,
     177             :                       Guard& guard);
     178             :   TransportSendListener_rch get_send_listener();
     179             :   TransportReceiveListener_rch get_receive_listener();
     180             : 
     181             :   //helper for initiating connection, called by PendingAssoc objects
     182             :   //allows PendingAssoc to temporarily release lock_ to allow
     183             :   //TransportImpl to access Reactor if needed
     184             :   bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
     185             :                           TransportImpl_rch impl,
     186             :                           const TransportImpl::RemoteTransport& remote,
     187             :                           const TransportImpl::ConnectionAttribs& attribs_,
     188             :                           Guard& guard);
     189             : 
     190             :   void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
     191             : 
     192             :   // A class, normally provided by an unit test, who needs access to a client's
     193             :   // privates.
     194             :   friend class ::DDS_TEST;
     195             : 
     196             :   typedef OPENDDS_MAP_CMP(GUID_t, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
     197             :   typedef OPENDDS_VECTOR(WeakRcHandle<TransportImpl>) ImplsType;
     198             : 
     199             :   typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
     200             :   struct PendingAssoc : RcEventHandler {
     201             :     ACE_Thread_Mutex mutex_;
     202             :     bool active_, scheduled_;
     203             :     ImplsType impls_;
     204             :     CORBA::ULong blob_index_;
     205             :     AssociationData data_;
     206             :     TransportImpl::ConnectionAttribs attribs_;
     207             :     WeakRcHandle<TransportClient> client_;
     208             : 
     209           0 :     explicit PendingAssoc(RcHandle<TransportClient> tc_rch)
     210           0 :       : active_(false)
     211           0 :       , scheduled_(false)
     212           0 :       , blob_index_(0)
     213           0 :       , client_(tc_rch)
     214           0 :     {}
     215             : 
     216             :     void reset_client();
     217             :     bool safe_to_remove();
     218             :     bool initiate_connect(TransportClient* tc, Guard& guard);
     219             :     int handle_timeout(const ACE_Time_Value& time, const void* arg);
     220             :   };
     221             : 
     222             :   typedef RcHandle<PendingAssoc> PendingAssoc_rch;
     223             : 
     224             :   typedef OPENDDS_MAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap;
     225             :   typedef OPENDDS_MULTIMAP_CMP(GUID_t, PendingAssoc_rch, GUID_tKeyLessThan) PrevPendingMap;
     226             : 
     227             :   void clean_prev_pending();
     228             : 
     229             :   class PendingAssocTimer : public ReactorInterceptor {
     230             :   public:
     231           3 :     PendingAssocTimer(ACE_Reactor* reactor,
     232             :                       ACE_thread_t owner)
     233           3 :       : ReactorInterceptor(reactor, owner)
     234           3 :       , timer_id_(-1)
     235           3 :     { }
     236             : 
     237           0 :     void schedule_timer(TransportClient_rch transport_client, const PendingAssoc_rch& pend)
     238             :     {
     239           0 :       execute_or_enqueue(make_rch<ScheduleCommand>(this, transport_client, pend));
     240           0 :     }
     241             : 
     242           0 :     ReactorInterceptor::CommandPtr cancel_timer(const PendingAssoc_rch& pend)
     243             :     {
     244           0 :       return execute_or_enqueue(make_rch<CancelCommand>(this, pend));
     245             :     }
     246             : 
     247           0 :     void set_id(long id) { timer_id_ = id; }
     248           0 :     long get_id() const { return timer_id_; }
     249             : 
     250           0 :     virtual bool reactor_is_shut_down() const
     251             :     {
     252           0 :       return TheServiceParticipant->is_shut_down();
     253             :     }
     254             : 
     255             :   private:
     256           6 :     ~PendingAssocTimer()
     257           6 :     { }
     258             : 
     259             :     class CommandBase : public Command {
     260             :     public:
     261           0 :       CommandBase(PendingAssocTimer* timer,
     262             :                   const PendingAssoc_rch& assoc)
     263           0 :         : timer_ (timer)
     264           0 :         , assoc_ (assoc)
     265           0 :       { }
     266             :     protected:
     267             :       PendingAssocTimer* timer_;
     268             :       PendingAssoc_rch assoc_;
     269             :     };
     270             :     struct ScheduleCommand : public CommandBase {
     271           0 :       ScheduleCommand(PendingAssocTimer* timer,
     272             :                       TransportClient_rch transport_client,
     273             :                       const PendingAssoc_rch& assoc)
     274           0 :         : CommandBase (timer, assoc)
     275           0 :         , transport_client_ (transport_client)
     276           0 :       { }
     277           0 :       virtual void execute()
     278             :       {
     279           0 :         if (timer_->reactor()) {
     280           0 :           TransportClient_rch client = transport_client_.lock();
     281           0 :           if (client) {
     282           0 :             ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
     283           0 :             assoc_->scheduled_ = true;
     284           0 :             long id = timer_->reactor()->schedule_timer(assoc_.in(),
     285           0 :                                                         client.in(),
     286           0 :                                                         client->passive_connect_duration_.value());
     287           0 :             if (id != -1) {
     288           0 :               timer_->set_id(id);
     289             :             }
     290           0 :           }
     291           0 :         }
     292           0 :       }
     293             :       WeakRcHandle<TransportClient> transport_client_;
     294             :     };
     295             :     struct CancelCommand : public CommandBase {
     296           0 :       CancelCommand(PendingAssocTimer* timer,
     297             :                     const PendingAssoc_rch& assoc)
     298           0 :         : CommandBase (timer, assoc)
     299           0 :       { }
     300           0 :       virtual void execute()
     301             :       {
     302           0 :         if (timer_->reactor() && timer_->get_id()) {
     303           0 :           ACE_Guard<ACE_Thread_Mutex> guard(assoc_->mutex_);
     304           0 :           timer_->reactor()->cancel_timer(timer_->get_id());
     305           0 :           timer_->set_id(-1);
     306           0 :           assoc_->scheduled_ = false;
     307           0 :         }
     308           0 :       }
     309             :     };
     310             :     long timer_id_;
     311             :   };
     312             :   RcHandle<PendingAssocTimer> pending_assoc_timer_;
     313             : 
     314             :   // Associated Impls and DataLinks:
     315             : 
     316             :   TransportConfig_rch config_;
     317             :   ImplsType impls_;
     318             :   PendingMap pending_;
     319             :   PrevPendingMap prev_pending_;
     320             :   DataLinkSet links_;
     321             : 
     322             :   DataLinkIndex data_link_index_;
     323             : 
     324             :   // Used to allow sends to completed as a transaction and block
     325             :   // multi-threaded writers from proceeding to send data
     326             :   // on two thread simultaneously, which could cause out-of-order data.
     327             :   ACE_Thread_Mutex send_transaction_lock_;
     328             :   ACE_UINT64 expected_transaction_id_;
     329             :   ACE_UINT64 max_transaction_id_seen_;
     330             : 
     331             :   //max_transaction_tail_ will always be the tail of the
     332             :   //max transaction that has been observed or 0 if this is
     333             :   //the first transaction or a transaction after the expected
     334             :   //value was met and thus reset to 0 indicating the samples were
     335             :   //sent up to max_transaction_id_
     336             :   DataSampleElement* max_transaction_tail_;
     337             : 
     338             :   // Configuration details:
     339             : 
     340             :   bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
     341             : 
     342             :   TimeDuration passive_connect_duration_;
     343             : 
     344             :   TransportLocatorSeq conn_info_;
     345             : 
     346             :   /// Seems to protect accesses to impls_, pending_, links_, data_link_index_
     347             :   mutable ACE_Thread_Mutex lock_;
     348             : 
     349             :   Reverse_Lock_t reverse_lock_;
     350             : 
     351             :   GUID_t repo_id_;
     352             : };
     353             : 
     354             : typedef RcHandle<TransportClient> TransportClient_rch;
     355             : 
     356             : }
     357             : }
     358             : 
     359             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     360             : 
     361             : #endif

Generated by: LCOV version 1.16