TransportClient.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORT_CLIENT_H
00009 #define OPENDDS_DCPS_TRANSPORT_CLIENT_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "TransportConfig_rch.h"
00013 #include "TransportImpl.h"
00014 #include "DataLinkSet.h"
00015 
00016 #include "dds/DCPS/AssociationData.h"
00017 #include "dds/DCPS/ReactorInterceptor.h"
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "dds/DCPS/PoolAllocationBase.h"
00021 #include "dds/DCPS/DiscoveryListener.h"
00022 #include "dds/DCPS/RcEventHandler.h"
00023 
00024 #include "ace/Time_Value.h"
00025 #include "ace/Event_Handler.h"
00026 #include "ace/Reverse_Lock_T.h"
00027 
00028 // Forward definition of a test-friendly class in the global name space
00029 class DDS_TEST;
00030 
00031 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00032 
00033 namespace OpenDDS {
00034 namespace DCPS {
00035 
00036 class EntityImpl;
00037 class TransportInst;
00038 class AssocationInfo;
00039 class ReaderIdSeq;
00040 class WriterIdSeq;
00041 class SendStateDataSampleList;
00042 class SendStateDataSampleListIterator;
00043 
00044 /**
00045  * @brief Mix-in class for DDS entities which directly use the transport layer.
00046  *
00047  * DataReaderImpl and DataWriterImpl are TransportClients.  The TransportClient
00048  * class manages the TransportImpl objects that represent the available
00049  * communication mechanisms and the DataLink objects that represent the
00050  * currently active communication channels to peers.
00051  */
00052 class OpenDDS_Dcps_Export TransportClient
00053   : public virtual RcObject
00054 {
00055 public:
00056   // Used by TransportImpl to complete associate() processing:
00057   void use_datalink(const RepoId& remote_id, const DataLink_rch& link);
00058 
00059   // values for flags parameter of transport_assoc_done():
00060   enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
00061   TransportClient();
00062   virtual ~TransportClient();
00063 
00064 
00065   // Local setup:
00066 
00067   void enable_transport(bool reliable, bool durable);
00068   void enable_transport_using_config(bool reliable, bool durable,
00069                                      const TransportConfig_rch& tc);
00070 
00071   bool swap_bytes() const { return swap_bytes_; }
00072   bool cdr_encapsulation() const { return cdr_encapsulation_; }
00073   const TransportLocatorSeq& connection_info() const { return conn_info_; }
00074 
00075   // Managing associations to remote peers:
00076 
00077   bool associate(const AssociationData& peer, bool active);
00078   void disassociate(const RepoId& peerId);
00079   void stop_associating();
00080   void stop_associating(const GUID_t* repos, CORBA::ULong length);
00081   void send_final_acks();
00082 
00083   // Discovery:
00084   void register_for_reader(const RepoId& participant,
00085                            const RepoId& writerid,
00086                            const RepoId& readerid,
00087                            const TransportLocatorSeq& locators,
00088                            OpenDDS::DCPS::DiscoveryListener* listener);
00089 
00090   void unregister_for_reader(const RepoId& participant,
00091                              const RepoId& writerid,
00092                              const RepoId& readerid);
00093 
00094   void register_for_writer(const RepoId& participant,
00095                            const RepoId& readerid,
00096                            const RepoId& writerid,
00097                            const TransportLocatorSeq& locators,
00098                            DiscoveryListener* listener);
00099 
00100   void unregister_for_writer(const RepoId& participant,
00101                              const RepoId& readerid,
00102                              const RepoId& writerid);
00103 
00104   // Data transfer:
00105 
00106   bool send_response(const RepoId& peer,
00107                      const DataSampleHeader& header,
00108                      Message_Block_Ptr payload); // [DR]
00109 
00110   void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
00111 
00112   SendControlStatus send_w_control(SendStateDataSampleList send_list,
00113                                    const DataSampleHeader& header,
00114                                    Message_Block_Ptr msg,
00115                                    const RepoId& destination);
00116 
00117   SendControlStatus send_control(const DataSampleHeader& header,
00118                                  Message_Block_Ptr msg);
00119 
00120   SendControlStatus send_control_to(const DataSampleHeader& header,
00121                                     Message_Block_Ptr msg,
00122                                     const RepoId& destination);
00123 
00124   bool remove_sample(const DataSampleElement* sample);
00125   bool remove_all_msgs();
00126 
00127   virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00128 
00129 private:
00130 
00131   // Implemented by derived classes (DataReaderImpl/DataWriterImpl)
00132   virtual bool check_transport_qos(const TransportInst& inst) = 0;
00133   virtual const RepoId& get_repo_id() const = 0;
00134   virtual DDS::DomainId_t domain_id() const = 0;
00135   virtual Priority get_priority_value(const AssociationData& data) const = 0;
00136   virtual void transport_assoc_done(int /*flags*/, const RepoId& /*remote*/) {}
00137 
00138 #if defined(OPENDDS_SECURITY)
00139   virtual DDS::Security::ParticipantCryptoHandle get_crypto_handle() const
00140   {
00141     return DDS::HANDLE_NIL;
00142   }
00143 #endif
00144 
00145   // helpers
00146   typedef ACE_Guard<ACE_Thread_Mutex> Guard;
00147   void use_datalink_i(const RepoId& remote_id,
00148                       const DataLink_rch& link,
00149                       Guard& guard);
00150   TransportSendListener_rch get_send_listener();
00151   TransportReceiveListener_rch get_receive_listener();
00152 
00153   //helper for initiating connection, called by PendingAssoc objects
00154   //allows PendingAssoc to temporarily release lock_ to allow
00155   //TransportImpl to access Reactor if needed
00156   bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00157                           TransportImpl* impl,
00158                           const TransportImpl::RemoteTransport& remote,
00159                           const TransportImpl::ConnectionAttribs& attribs_,
00160                           Guard& guard);
00161 
00162   void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
00163 
00164   // A class, normally provided by an unit test, who needs access to a client's
00165   // privates.
00166   friend class ::DDS_TEST;
00167 
00168   typedef OPENDDS_MAP_CMP(RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
00169   typedef OPENDDS_VECTOR(TransportImpl*) ImplsType;
00170 
00171   struct PendingAssoc : RcEventHandler {
00172     bool active_, removed_;
00173     ImplsType impls_;
00174     CORBA::ULong blob_index_;
00175     AssociationData data_;
00176     TransportImpl::ConnectionAttribs attribs_;
00177 
00178     PendingAssoc()
00179       : active_(false)
00180       , removed_(false)
00181       , blob_index_(0)
00182     {}
00183 
00184     bool initiate_connect(TransportClient* tc, Guard& guard);
00185     int handle_timeout(const ACE_Time_Value& time, const void* arg);
00186   };
00187 
00188   typedef RcHandle<PendingAssoc> PendingAssoc_rch;
00189 
00190   typedef OPENDDS_MAP_CMP(RepoId, PendingAssoc_rch, GUID_tKeyLessThan) PendingMap;
00191 
00192   class PendingAssocTimer : public ReactorInterceptor {
00193   public:
00194     PendingAssocTimer(ACE_Reactor* reactor,
00195                       ACE_thread_t owner)
00196       : ReactorInterceptor(reactor, owner)
00197     { }
00198 
00199     void schedule_timer(TransportClient* transport_client, const PendingAssoc_rch& pend)
00200     {
00201       ScheduleCommand c(this, transport_client, pend);
00202       execute_or_enqueue(c);
00203     }
00204 
00205     void cancel_timer(TransportClient* transport_client, const PendingAssoc_rch& pend)
00206     {
00207       CancelCommand c(this, transport_client, pend);
00208       execute_or_enqueue(c);
00209     }
00210 
00211     virtual bool reactor_is_shut_down() const
00212     {
00213       return TheServiceParticipant->is_shut_down();
00214     }
00215 
00216   private:
00217     ~PendingAssocTimer()
00218     { }
00219 
00220     class CommandBase : public Command {
00221     public:
00222       CommandBase(PendingAssocTimer* timer,
00223                   TransportClient* transport_client,
00224                   const PendingAssoc_rch& assoc)
00225         : timer_ (timer)
00226         , transport_client_ (transport_client)
00227         , assoc_ (assoc)
00228       { }
00229     protected:
00230       PendingAssocTimer* timer_;
00231       TransportClient* transport_client_;
00232       PendingAssoc_rch assoc_;
00233     };
00234     struct ScheduleCommand : public CommandBase {
00235       ScheduleCommand(PendingAssocTimer* timer,
00236                       TransportClient* transport_client,
00237                       const PendingAssoc_rch& assoc)
00238         : CommandBase (timer, transport_client, assoc)
00239       { }
00240       virtual void execute()
00241       {
00242         if (timer_->reactor()) {
00243           timer_->reactor()->schedule_timer(assoc_.in(),
00244                                             transport_client_,
00245                                             transport_client_->passive_connect_duration_);
00246         }
00247       }
00248     };
00249     struct CancelCommand : public CommandBase {
00250       CancelCommand(PendingAssocTimer* timer,
00251                     TransportClient* transport_client,
00252                     const PendingAssoc_rch& assoc)
00253         : CommandBase (timer, transport_client, assoc)
00254       { }
00255       virtual void execute()
00256       {
00257         if (timer_->reactor()) {
00258           timer_->reactor()->cancel_timer(assoc_.in());
00259         }
00260       }
00261     };
00262   };
00263   RcHandle<PendingAssocTimer> pending_assoc_timer_;
00264 
00265   // Associated Impls and DataLinks:
00266 
00267   ImplsType impls_;
00268   PendingMap pending_;
00269   DataLinkSet links_;
00270 
00271   /// These are the links being used during the call to send(). This is made a member of the
00272   /// class to minimize allocation/deallocations of the data link set.
00273   DataLinkSet send_links_;
00274 
00275   DataLinkIndex data_link_index_;
00276 
00277   // Used to allow sends to completed as a transaction and block
00278   // multi-threaded writers from proceeding to send data
00279   // on two thread simultaneously, which could cause out-of-order data.
00280   ACE_Thread_Mutex send_transaction_lock_;
00281   ACE_UINT64 expected_transaction_id_;
00282   ACE_UINT64 max_transaction_id_seen_;
00283 
00284   //max_transaction_tail_ will always be the tail of the
00285   //max transaction that has been observed or 0 if this is
00286   //the first transaction or a transaction after the expected
00287   //value was met and thus reset to 0 indicating the samples were
00288   //sent up to max_transaction_id_
00289   DataSampleElement* max_transaction_tail_;
00290 
00291   // Configuration details:
00292 
00293   bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
00294 
00295   ACE_Time_Value passive_connect_duration_;
00296 
00297   TransportLocatorSeq conn_info_;
00298 
00299   /// Seems to protect accesses to impls_, pending_, links_, data_link_index_
00300   ACE_Thread_Mutex lock_;
00301 
00302   typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00303   Reverse_Lock_t reverse_lock_;
00304 
00305   RepoId repo_id_;
00306 };
00307 
00308 typedef RcHandle<TransportClient> TransportClient_rch;
00309 
00310 }
00311 }
00312 
00313 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00314 
00315 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1