TransportImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTIMPL_H
00009 #define OPENDDS_DCPS_TRANSPORTIMPL_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/RcObject.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014 #include "dds/DdsDcpsSubscriptionC.h"
00015 #include "dds/DdsDcpsPublicationC.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017 #include "TransportDefs.h"
00018 #include "TransportInst.h"
00019 #include "TransportReactorTask.h"
00020 #include "TransportReactorTask_rch.h"
00021 #include "DataLinkCleanupTask.h"
00022 #include "dds/DCPS/PoolAllocator.h"
00023 #include "dds/DCPS/DiscoveryListener.h"
00024 
00025 #if defined(OPENDDS_SECURITY)
00026 #include "dds/DdsSecurityCoreC.h"
00027 #endif
00028 
00029 #include "ace/Synch_Traits.h"
00030 
00031 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00032 
00033 namespace OpenDDS {
00034 namespace DCPS {
00035 
00036 class TransportClient;
00037 class TransportReceiveListener;
00038 class DataLink;
00039 class TransportInst;
00040 class Monitor;
00041 struct AssociationData;
00042 typedef RcHandle<TransportClient> TransportClient_rch;
00043 typedef WeakRcHandle<TransportClient> TransportClient_wrch;
00044 
00045 /** The TransportImpl class includes the abstract methods that must be implemented
00046 *   by any implementation to provide data delivery service to the DCPS implementation.
00047 *   This includes methods to send data, received data, configure the operation, and
00048 *   manage associations and datalinks between local and remote objects of the implementation.
00049 *
00050 *   Notes about object ownership:
00051 *   1)Has longer lifetime than the publisher and subscriber objects. The publishers
00052 *     and subscribers are owned by the DomainParticipant and transport factory shutdown
00053 *     is always after DomainParticipant factory shutdown.
00054 *   2)The concrete transport object owns the datalink objects.
00055 *   3)Own  a DataLinkCleanup object.
00056 *   4)Reference to TransportInst object and TransportReactorTask object owned
00057 *     by TransportRegistry.
00058 *   5)During transport shutdown, if this object does not have ownership of an object
00059 *     but has a references via smart pointer then the reference should be freed;
00060 *     if this object has ownership of task objects then the tasks should be closed.
00061 */
00062 class OpenDDS_Dcps_Export TransportImpl : public RcObject {
00063 public:
00064 
00065   virtual ~TransportImpl();
00066 
00067   /// Remove any pending_release mappings.
00068   virtual void unbind_link(DataLink* link);
00069 
00070   /// Callback from the DataLink to clean up any associated resources.
00071   /// This usually is done when the DataLink is lost. The call is made with
00072   /// no transport/DCPS locks held.
00073   bool release_link_resources(DataLink* link);
00074 
00075   /// Expose the configuration information so others can see what
00076   /// we can do.
00077   TransportInst& config() const;
00078 
00079   /// Called by our connection_info() method to allow the concrete
00080   /// TransportImpl subclass to do the dirty work since it really
00081   /// is the one that knows how to populate the supplied
00082   /// TransportLocator object.
00083   virtual bool connection_info_i(TransportLocator& local_info) const = 0;
00084 
00085   virtual void register_for_reader(const RepoId& /*participant*/,
00086                                    const RepoId& /*writerid*/,
00087                                    const RepoId& /*readerid*/,
00088                                    const TransportLocatorSeq& /*locators*/,
00089                                    OpenDDS::DCPS::DiscoveryListener* /*listener*/) { }
00090 
00091   virtual void unregister_for_reader(const RepoId& /*participant*/,
00092                                      const RepoId& /*writerid*/,
00093                                      const RepoId& /*readerid*/) { }
00094 
00095   virtual void register_for_writer(const RepoId& /*participant*/,
00096                                    const RepoId& /*readerid*/,
00097                                    const RepoId& /*writerid*/,
00098                                    const TransportLocatorSeq& /*locators*/,
00099                                    DiscoveryListener* /*listener*/) { }
00100 
00101   virtual void unregister_for_writer(const RepoId& /*participant*/,
00102                                      const RepoId& /*readerid*/,
00103                                      const RepoId& /*writerid*/) { }
00104 
00105   /// Interface to the transport's reactor for scheduling timers.
00106   ACE_Reactor_Timer_Interface* timer() const;
00107 
00108   ACE_Reactor* reactor() const;
00109   ACE_thread_t reactor_owner() const;
00110   bool is_shut_down() const;
00111 
00112   /// Create the reactor task using sync send or optionally async send
00113   /// by parameter on supported Windows platforms only.
00114   void create_reactor_task(bool useAsyncSend = false);
00115 
00116   /// Diagnostic aid.
00117   void dump();
00118   OPENDDS_STRING dump_to_str();
00119 
00120   void report();
00121 
00122   struct ConnectionAttribs {
00123     RepoId local_id_;
00124     Priority priority_;
00125     bool local_reliable_, local_durable_;
00126 
00127     ConnectionAttribs()
00128       : local_id_(GUID_UNKNOWN)
00129       , priority_(0)
00130       , local_reliable_(false)
00131       , local_durable_(false)
00132     {}
00133   };
00134 
00135   struct RemoteTransport {
00136     RepoId repo_id_;
00137     TransportBLOB blob_;
00138     Priority publication_transport_priority_;
00139     bool reliable_, durable_;
00140   };
00141 
00142   struct AcceptConnectResult {
00143     enum Status { ACR_SUCCESS, ACR_FAILED };
00144     explicit AcceptConnectResult(Status ok = ACR_FAILED)
00145       : success_(ok == ACR_SUCCESS), link_() {}
00146     AcceptConnectResult(const DataLink_rch& link)
00147       : success_(link), link_(link) {}
00148     /// If false, the accept or connect has failed and link_ is ignored.
00149     bool success_;
00150     /// If success_ is true, link_ may either be null or have a valid DataLink.
00151     /// If link_ is null the DataLink is not ready for use, and
00152     /// TransportClient::use_datalink() is called later.
00153     DataLink_rch link_;
00154   };
00155 
00156 protected:
00157   TransportImpl(TransportInst& config);
00158 
00159   bool open();
00160 
00161   /// connect_datalink() is called from TransportClient to initiate an
00162   /// association as the active peer.  A DataLink may be returned if
00163   /// one is already connected and ready to use, otherwise
00164   /// initiate a connection to the passive side and return from this
00165   /// method.  Upon completion of the physical connection, the
00166   /// transport calls back to TransportClient::use_datalink().
00167   virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote,
00168                                                const ConnectionAttribs& attribs,
00169                                                const TransportClient_rch& client) = 0;
00170 
00171   /// accept_datalink() is called from TransportClient to initiate an
00172   /// association as the passive peer.  A DataLink may be returned if
00173   /// one is already connected and ready to use, otherwise
00174   /// passively wait for a physical connection from the active
00175   /// side (either in the form of a connection event or handshaking
00176   /// message).  Upon completion of the physical connection, the
00177   /// transport calls back to TransportClient::use_datalink().
00178   virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote,
00179                                               const ConnectionAttribs& attribs,
00180                                               const TransportClient_rch& client) = 0;
00181 
00182   /// stop_accepting_or_connecting() is called from TransportClient
00183   /// to terminate the accepting process begun by accept_datalink()
00184   /// or connect_datalink().  This allows the TransportImpl to clean
00185   /// up any resources associated with this pending connection.
00186   /// The TransportClient* passed in to accept or connect is not
00187   /// valid after this method is called.
00188   virtual void stop_accepting_or_connecting(const TransportClient_wrch& client,
00189                                             const RepoId& remote_id) = 0;
00190 
00191 
00192   /// Called during the shutdown() method in order to give the
00193   /// concrete TransportImpl subclass a chance to do something when
00194   /// the shutdown "event" occurs.
00195   virtual void shutdown_i() = 0;
00196 
00197   /// Accessor to obtain a "copy" of the reference to the reactor task.
00198   /// Caller is responsible for the "copy" of the reference that is
00199   /// returned.
00200   TransportReactorTask_rch reactor_task();
00201 
00202   typedef OPENDDS_MULTIMAP(TransportClient_wrch, DataLink_rch) PendConnMap;
00203   PendConnMap pending_connections_;
00204   void add_pending_connection(const TransportClient_rch& client, DataLink_rch link);
00205   void shutdown();
00206 
00207 private:
00208   /// We have a few friends in the transport framework so that they
00209   /// can access our private methods.  We do this to avoid pollution
00210   /// of our public interface with internal framework methods.
00211   friend class TransportInst;
00212   friend class TransportClient;
00213   friend class DataLink;
00214   /// Called by the TransportRegistry when this TransportImpl object
00215   /// is released while the TransportRegistry is handling a release()
00216   /// "event".
00217 
00218   /// The DataLink itself calls this method when it thinks it is
00219   /// no longer used for any associations.  This occurs during
00220   /// a "remove associations" operation being performed by some
00221   /// TransportClient that uses this TransportImpl.  The
00222   /// TransportClient is known to have acquired our reservation_lock_,
00223   /// so there won't be any reserve_datalink() calls being made from
00224   /// any other threads while we perform this release.
00225   virtual void release_datalink(DataLink* link) = 0;
00226 
00227   DataLink* find_connect_i(const RepoId& local_id,
00228                            const AssociationData& remote_association,
00229                            const ConnectionAttribs& attribs,
00230                            bool active, bool connect);
00231 
00232 #if defined(OPENDDS_SECURITY)
00233   virtual void local_crypto_handle(DDS::Security::ParticipantCryptoHandle) {}
00234 #endif
00235 
00236 public:
00237   /// Called by our friends, the TransportClient, and the DataLink.
00238   /// Since this TransportImpl can be attached to many TransportClient
00239   /// objects, and each TransportClient object could be "running" in
00240   /// a separate thread, we need to protect all of the "reservation"
00241   /// methods with a lock.  The protocol is that a client of ours
00242   /// must "acquire" our reservation_lock_ before it can proceed to
00243   /// call any methods that affect the DataLink reservations.  It
00244   /// should release the reservation_lock_ as soon as it is done.
00245   int acquire();
00246   int tryacquire();
00247   int release();
00248   int remove();
00249 
00250   virtual OPENDDS_STRING transport_type() const = 0;
00251 
00252   /// Called by our friend, the TransportClient.
00253   /// Accessor for the TransportInterfaceInfo.  Accepts a reference
00254   /// to a TransportInterfaceInfo object that will be "populated"
00255   /// with this TransportImpl's connection information (ie, how
00256   /// another process would connect to this TransportImpl).
00257   bool connection_info(TransportLocator& local_info) const;
00258 
00259   typedef ACE_SYNCH_MUTEX     LockType;
00260   typedef ACE_Guard<LockType> GuardType;
00261 
00262   /// Lock to protect the config_ and reactor_task_ data members.
00263   mutable LockType lock_;
00264 
00265   /// A reference to the TransportInst
00266   /// object that was supplied to us during our configure() method.
00267   TransportInst& config_;
00268 
00269   /// The reactor (task) object - may not even be used if the concrete
00270   /// subclass (of TransportImpl) doesn't require a reactor.
00271   TransportReactorTask_rch reactor_task_;
00272 
00273   /// smart ptr to the associated DL cleanup task
00274   DataLinkCleanupTask dl_clean_task_;
00275 
00276   /// Monitor object for this entity
00277   Monitor* monitor_;
00278 
00279 protected:
00280   /// Id of the last link established.
00281   std::size_t last_link_;
00282   bool is_shut_down_;
00283 };
00284 
00285 } // namespace DCPS
00286 } // namespace OpenDDS
00287 
00288 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00289 
00290 #if defined (__ACE_INLINE__)
00291 #include "TransportImpl.inl"
00292 #endif /* __ACE_INLINE__ */
00293 
00294 #endif  /* OPENDDS_DCPS_TRANSPORTIMPL_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1