TransportClient.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORT_CLIENT_H
00009 #define OPENDDS_DCPS_TRANSPORT_CLIENT_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "TransportConfig_rch.h"
00013 #include "TransportImpl.h"
00014 #include "DataLinkSet.h"
00015 
00016 #include "dds/DCPS/AssociationData.h"
00017 #include "dds/DCPS/ReactorInterceptor.h"
00018 #include "dds/DCPS/Service_Participant.h"
00019 #include "dds/DCPS/PoolAllocator.h"
00020 #include "dds/DCPS/PoolAllocationBase.h"
00021 #include "dds/DCPS/DiscoveryListener.h"
00022 
00023 #include "ace/Time_Value.h"
00024 #include "ace/Event_Handler.h"
00025 #include "ace/Reverse_Lock_T.h"
00026 
00027 // Forward definition of a test-friendly class in the global name space
00028 class DDS_TEST;
00029 
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 
00033 class EntityImpl;
00034 class TransportInst;
00035 class AssocationInfo;
00036 class ReaderIdSeq;
00037 class WriterIdSeq;
00038 class SendStateDataSampleList;
00039 class SendStateDataSampleListIterator;
00040 
00041 /**
00042  * @brief Mix-in class for DDS entities which directly use the transport layer.
00043  *
00044  * DataReaderImpl and DataWriterImpl are TransportClients.  The TransportClient
00045  * class manages the TransportImpl objects that represent the available
00046  * communication mechanisms and the DataLink objects that represent the
00047  * currently active communication channels to peers.
00048  */
00049 class OpenDDS_Dcps_Export TransportClient {
00050 public:
00051   // Used by TransportImpl to complete associate() processing:
00052   void use_datalink(const RepoId& remote_id, const DataLink_rch& link);
00053 
00054   // values for flags parameter of transport_assoc_done():
00055   enum { ASSOC_OK = 1, ASSOC_ACTIVE = 2 };
00056 
00057 protected:
00058   TransportClient();
00059   virtual ~TransportClient();
00060 
00061 
00062   // Local setup:
00063 
00064   void enable_transport(bool reliable, bool durable);
00065   void enable_transport_using_config(bool reliable, bool durable,
00066                                      const TransportConfig_rch& tc);
00067 
00068   bool swap_bytes() const { return swap_bytes_; }
00069   bool cdr_encapsulation() const { return cdr_encapsulation_; }
00070   const TransportLocatorSeq& connection_info() const { return conn_info_; }
00071 
00072   // Managing associations to remote peers:
00073 
00074   bool associate(const AssociationData& peer, bool active);
00075   void disassociate(const RepoId& peerId);
00076   void stop_associating();
00077   void stop_associating(const GUID_t* repos, CORBA::ULong length);
00078   void send_final_acks();
00079 
00080   // Discovery:
00081   void register_for_reader(const RepoId& participant,
00082                            const RepoId& writerid,
00083                            const RepoId& readerid,
00084                            const TransportLocatorSeq& locators,
00085                            OpenDDS::DCPS::DiscoveryListener* listener);
00086 
00087   void unregister_for_reader(const RepoId& participant,
00088                              const RepoId& writerid,
00089                              const RepoId& readerid);
00090 
00091   void register_for_writer(const RepoId& participant,
00092                            const RepoId& readerid,
00093                            const RepoId& writerid,
00094                            const TransportLocatorSeq& locators,
00095                            DiscoveryListener* listener);
00096 
00097   void unregister_for_writer(const RepoId& participant,
00098                              const RepoId& readerid,
00099                              const RepoId& writerid);
00100 
00101   // Data transfer:
00102 
00103   bool send_response(const RepoId& peer,
00104                      const DataSampleHeader& header,
00105                      ACE_Message_Block* payload); // [DR]
00106 
00107   void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id = 0);
00108 
00109   SendControlStatus send_w_control(SendStateDataSampleList send_list,
00110                                    const DataSampleHeader& header,
00111                                    ACE_Message_Block* msg,
00112                                    const RepoId& destination);
00113 
00114   SendControlStatus send_control(const DataSampleHeader& header,
00115                                  ACE_Message_Block* msg);
00116 
00117   SendControlStatus send_control_to(const DataSampleHeader& header,
00118                                     ACE_Message_Block* msg,
00119                                     const RepoId& destination);
00120 
00121   bool remove_sample(const DataSampleElement* sample);
00122   bool remove_all_msgs();
00123 
00124   virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00125 
00126   void on_notification_of_connection_deletion(const RepoId& peerId);
00127 
00128 private:
00129 
00130   // Implemented by derived classes (DataReaderImpl/DataWriterImpl)
00131   virtual bool check_transport_qos(const TransportInst& inst) = 0;
00132   virtual const RepoId& get_repo_id() const = 0;
00133   virtual DDS::DomainId_t domain_id() const = 0;
00134   virtual Priority get_priority_value(const AssociationData& data) const = 0;
00135   virtual void transport_assoc_done(int /*flags*/, const RepoId& /*remote*/) {}
00136 
00137   // transport_detached() is called from TransportImpl when it shuts down
00138   friend class TransportImpl;
00139   void transport_detached(TransportImpl* which);
00140 
00141   // helpers
00142   typedef ACE_Guard<ACE_Thread_Mutex> Guard;
00143   void use_datalink_i(const RepoId& remote_id,
00144                       const DataLink_rch& link,
00145                       Guard& guard);
00146   TransportSendListener* get_send_listener();
00147   TransportReceiveListener* get_receive_listener();
00148 
00149   //helper for initiating connection, called by PendingAssoc objects
00150   //allows PendingAssoc to temporarily release lock_ to allow
00151   //TransportImpl to access Reactor if needed
00152   bool initiate_connect_i(TransportImpl::AcceptConnectResult& result,
00153                           const TransportImpl_rch impl,
00154                           const TransportImpl::RemoteTransport& remote,
00155                           const TransportImpl::ConnectionAttribs& attribs_,
00156                           Guard& guard);
00157 
00158   void send_i(SendStateDataSampleList send_list, ACE_UINT64 transaction_id);
00159 
00160   // A class, normally provided by an unit test, who needs access to a client's
00161   // privates.
00162   friend class ::DDS_TEST;
00163 
00164   typedef OPENDDS_MAP_CMP(RepoId, DataLink_rch, GUID_tKeyLessThan) DataLinkIndex;
00165   typedef OPENDDS_VECTOR(TransportImpl_rch) ImplsType;
00166 
00167   struct PendingAssoc : ACE_Event_Handler, public PoolAllocationBase {
00168     bool active_, removed_;
00169     ImplsType impls_;
00170     CORBA::ULong blob_index_;
00171     AssociationData data_;
00172     TransportImpl::ConnectionAttribs attribs_;
00173 
00174     PendingAssoc()
00175       : active_(false), removed_(false), blob_index_(0)
00176     {}
00177 
00178     bool initiate_connect(TransportClient* tc, Guard& guard);
00179     int handle_timeout(const ACE_Time_Value& time, const void* arg);
00180   };
00181 
00182   typedef OPENDDS_MAP_CMP(RepoId, PendingAssoc*, GUID_tKeyLessThan) PendingMap;
00183 
00184   class PendingAssocTimer : public ReactorInterceptor {
00185   public:
00186     PendingAssocTimer(ACE_Reactor* reactor,
00187                       ACE_thread_t owner)
00188       : ReactorInterceptor(reactor, owner)
00189     { }
00190 
00191     void schedule_timer(TransportClient* transport_client, PendingAssoc* pend)
00192     {
00193       ScheduleCommand c(this, transport_client, pend);
00194       execute_or_enqueue(c);
00195     }
00196 
00197     void cancel_timer(TransportClient* transport_client, PendingAssoc* pend)
00198     {
00199       CancelCommand c(this, transport_client, pend);
00200       execute_or_enqueue(c);
00201     }
00202 
00203     void delete_pending_assoc(PendingAssoc* pend)
00204     {
00205       DeleteCommand c(pend);
00206       // Always defer.
00207       enqueue(c);
00208     }
00209 
00210     virtual bool reactor_is_shut_down() const
00211     {
00212       return TheServiceParticipant->is_shut_down();
00213     }
00214 
00215   private:
00216     ~PendingAssocTimer()
00217     { }
00218 
00219     class CommandBase : public Command {
00220     public:
00221       CommandBase(PendingAssocTimer* timer,
00222                   TransportClient* transport_client,
00223                   PendingAssoc* assoc)
00224         : timer_ (timer)
00225         , transport_client_ (transport_client)
00226         , assoc_ (assoc)
00227       { }
00228     protected:
00229       PendingAssocTimer* timer_;
00230       TransportClient* transport_client_;
00231       PendingAssoc* assoc_;
00232     };
00233     struct ScheduleCommand : public CommandBase {
00234       ScheduleCommand(PendingAssocTimer* timer,
00235                       TransportClient* transport_client,
00236                       PendingAssoc* assoc)
00237         : CommandBase (timer, transport_client, assoc)
00238       { }
00239       virtual void execute()
00240       {
00241         if (timer_->reactor()) {
00242           timer_->reactor()->schedule_timer(assoc_, transport_client_, transport_client_->passive_connect_duration_);
00243         }
00244       }
00245     };
00246     struct CancelCommand : public CommandBase {
00247       CancelCommand(PendingAssocTimer* timer,
00248                     TransportClient* transport_client,
00249                     PendingAssoc* assoc)
00250         : CommandBase (timer, transport_client, assoc)
00251       { }
00252       virtual void execute()
00253       {
00254         if (timer_->reactor()) {
00255           timer_->reactor()->cancel_timer(assoc_);
00256         }
00257       }
00258     };
00259     struct DeleteCommand : public CommandBase {
00260       DeleteCommand(PendingAssoc* assoc)
00261         : CommandBase (0, 0, assoc)
00262       { }
00263       virtual void execute()
00264       {
00265         delete assoc_;
00266       }
00267     };
00268   };
00269   PendingAssocTimer* pending_assoc_timer_;
00270 
00271   // Associated Impls and DataLinks:
00272 
00273   ImplsType impls_;
00274   PendingMap pending_;
00275   DataLinkSet links_;
00276   DataLinkIndex links_waiting_for_on_deleted_callback_;
00277 
00278   /// These are the links being used during the call to send(). This is made a member of the
00279   /// class to minimize allocation/deallocations of the data link set.
00280   DataLinkSet send_links_;
00281 
00282   DataLinkIndex data_link_index_;
00283 
00284   // Used to allow sends to completed as a transaction and block
00285   // multi-threaded writers from proceeding to send data
00286   // on two thread simultaneously, which could cause out-of-order data.
00287   ACE_Thread_Mutex send_transaction_lock_;
00288   ACE_UINT64 expected_transaction_id_;
00289   ACE_UINT64 max_transaction_id_seen_;
00290 
00291   //max_transaction_tail_ will always be the tail of the
00292   //max transaction that has been observed or 0 if this is
00293   //the first transaction or a transaction after the expected
00294   //value was met and thus reset to 0 indicating the samples were
00295   //sent up to max_transaction_id_
00296   DataSampleElement* max_transaction_tail_;
00297 
00298   // Configuration details:
00299 
00300   bool swap_bytes_, cdr_encapsulation_, reliable_, durable_;
00301 
00302   ACE_Time_Value passive_connect_duration_;
00303 
00304   TransportLocatorSeq conn_info_;
00305 
00306   //Seems to protect accesses to impls_, pending_, links_, data_link_index_
00307   ACE_Thread_Mutex lock_;
00308 
00309   typedef ACE_Reverse_Lock<ACE_Thread_Mutex> Reverse_Lock_t;
00310   Reverse_Lock_t reverse_lock_;
00311 
00312   RepoId repo_id_;
00313 };
00314 
00315 }
00316 }
00317 
00318 #endif

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7