TransportImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTIMPL_H
00009 #define OPENDDS_DCPS_TRANSPORTIMPL_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/RcObject_T.h"
00013 #include "dds/DdsDcpsInfoUtilsC.h"
00014 #include "dds/DdsDcpsSubscriptionC.h"
00015 #include "dds/DdsDcpsPublicationC.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017 #include "TransportDefs.h"
00018 #include "TransportInst.h"
00019 #include "TransportInst_rch.h"
00020 #include "TransportReactorTask.h"
00021 #include "TransportReactorTask_rch.h"
00022 #include "DataLinkCleanupTask.h"
00023 #include "ace/Synch.h"
00024 #include "dds/DCPS/PoolAllocator.h"
00025 #include "dds/DCPS/DiscoveryListener.h"
00026 
00027 namespace OpenDDS {
00028 namespace DCPS {
00029 
00030 class TransportClient;
00031 class TransportReceiveListener;
00032 class DataLink;
00033 class Monitor;
00034 struct AssociationData;
00035 
00036 /** The TransportImpl class includes the abstract methods that must be implemented
00037 *   by any implementation to provide data delivery service to the DCPS implementation.
00038 *   This includes methods to send data, received data, configure the operation, and
00039 *   manage associations and datalinks between local and remote objects of the implementation.
00040 *
00041 *   Notes about object ownership:
00042 *   1)Has longer lifetime than the publisher and subscriber objects. The publishers
00043 *     and subscribers are owned by the DomainParticipant and transport factory shutdown
00044 *     is always after DomainParticipant factory shutdown.
00045 *   2)The concrete transport object owns the datalink objects.
00046 *   3)Own  a DataLinkCleanup object.
00047 *   4)Reference to TransportInst object and TransportReactorTask object owned
00048 *     by TransportRegistry.
00049 *   5)During transport shutdown, if this object does not have ownership of an object
00050 *     but has a references via smart pointer then the reference should be freed;
00051 *     if this object has ownership of task objects then the tasks should be closed.
00052 */
00053 class OpenDDS_Dcps_Export TransportImpl : public RcObject<ACE_SYNCH_MUTEX> {
00054 public:
00055 
00056   virtual ~TransportImpl();
00057 
00058   /// Remove any pending_release mappings.
00059   virtual void unbind_link(DataLink* link);
00060 
00061   /// Callback from the DataLink to clean up any associated resources.
00062   /// This usually is done when the DataLink is lost. The call is made with
00063   /// no transport/DCPS locks held.
00064   bool release_link_resources(DataLink* link);
00065 
00066   /// Expose the configuration information so others can see what
00067   /// we can do.
00068   TransportInst* config() const;
00069 
00070   /// Called by our connection_info() method to allow the concrete
00071   /// TransportImpl subclass to do the dirty work since it really
00072   /// is the one that knows how to populate the supplied
00073   /// TransportLocator object.
00074   virtual bool connection_info_i(TransportLocator& local_info) const = 0;
00075 
00076   virtual void register_for_reader(const RepoId& /*participant*/,
00077                                    const RepoId& /*writerid*/,
00078                                    const RepoId& /*readerid*/,
00079                                    const TransportLocatorSeq& /*locators*/,
00080                                    OpenDDS::DCPS::DiscoveryListener* /*listener*/) { }
00081 
00082   virtual void unregister_for_reader(const RepoId& /*participant*/,
00083                                      const RepoId& /*writerid*/,
00084                                      const RepoId& /*readerid*/) { }
00085 
00086   virtual void register_for_writer(const RepoId& /*participant*/,
00087                                    const RepoId& /*readerid*/,
00088                                    const RepoId& /*writerid*/,
00089                                    const TransportLocatorSeq& /*locators*/,
00090                                    DiscoveryListener* /*listener*/) { }
00091 
00092   virtual void unregister_for_writer(const RepoId& /*participant*/,
00093                                      const RepoId& /*readerid*/,
00094                                      const RepoId& /*writerid*/) { }
00095 
00096   /// Interface to the transport's reactor for scheduling timers.
00097   ACE_Reactor_Timer_Interface* timer() const;
00098 
00099   ACE_Reactor* reactor() const;
00100   ACE_thread_t reactor_owner() const;
00101   bool is_shut_down() const;
00102 
00103   /// Create the reactor task using sync send or optionally async send
00104   /// by parameter on supported Windows platforms only.
00105   void create_reactor_task(bool useAsyncSend = false);
00106 
00107   /// Diagnostic aid.
00108   void dump();
00109   OPENDDS_STRING dump_to_str();
00110 
00111   void report();
00112 
00113   struct ConnectionAttribs {
00114     RepoId local_id_;
00115     Priority priority_;
00116     bool local_reliable_, local_durable_;
00117   };
00118 
00119   struct RemoteTransport {
00120     RepoId repo_id_;
00121     TransportBLOB blob_;
00122     Priority publication_transport_priority_;
00123     bool reliable_, durable_;
00124   };
00125 
00126   struct AcceptConnectResult {
00127     enum Status { ACR_SUCCESS, ACR_FAILED };
00128     explicit AcceptConnectResult(Status ok = ACR_FAILED)
00129       : success_(ok == ACR_SUCCESS), link_(0) {}
00130     explicit AcceptConnectResult(DataLink* link)
00131       : success_(link), link_(link) {}
00132     /// If false, the accept or connect has failed and link_ is ignored.
00133     bool success_;
00134     /// If success_ is true, link_ may either be null or have a valid DataLink.
00135     /// If link_ is null the DataLink is not ready for use, and
00136     /// TransportClient::use_datalink() is called later.
00137     DataLink_rch link_;
00138   };
00139 
00140 protected:
00141   TransportImpl();
00142 
00143   bool configure(TransportInst* config);
00144 
00145   /// connect_datalink() is called from TransportClient to initiate an
00146   /// association as the active peer.  A DataLink may be returned if
00147   /// one is already connected and ready to use, otherwise
00148   /// initiate a connection to the passive side and return from this
00149   /// method.  Upon completion of the physical connection, the
00150   /// transport calls back to TransportClient::use_datalink().
00151   virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote,
00152                                                const ConnectionAttribs& attribs,
00153                                                TransportClient* client) = 0;
00154 
00155   /// accept_datalink() is called from TransportClient to initiate an
00156   /// association as the passive peer.  A DataLink may be returned if
00157   /// one is already connected and ready to use, otherwise
00158   /// passively wait for a physical connection from the active
00159   /// side (either in the form of a connection event or handshaking
00160   /// message).  Upon completion of the physical connection, the
00161   /// transport calls back to TransportClient::use_datalink().
00162   virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote,
00163                                               const ConnectionAttribs& attribs,
00164                                               TransportClient* client) = 0;
00165 
00166   /// stop_accepting_or_connecting() is called from TransportClient
00167   /// to terminate the accepting process begun by accept_datalink()
00168   /// or connect_datalink().  This allows the TransportImpl to clean
00169   /// up any resources associated with this pending connection.
00170   /// The TransportClient* passed in to accept or connect is not
00171   /// valid after this method is called.
00172   virtual void stop_accepting_or_connecting(TransportClient* client,
00173                                             const RepoId& remote_id) = 0;
00174 
00175   /// Concrete subclass gets a shot at the config object.  The subclass
00176   /// will likely downcast the TransportInst object to a
00177   /// subclass type that it expects/requires.
00178   virtual bool configure_i(TransportInst* config) = 0;
00179 
00180   /// Called during the shutdown() method in order to give the
00181   /// concrete TransportImpl subclass a chance to do something when
00182   /// the shutdown "event" occurs.
00183   virtual void shutdown_i() = 0;
00184 
00185   /// Called before transport is shutdown to let the
00186   /// concrete transport to do anything necessary.
00187   virtual void pre_shutdown_i();
00188 
00189   /// Accessor to obtain a "copy" of the reference to the reactor task.
00190   /// Caller is responsible for the "copy" of the reference that is
00191   /// returned.
00192   TransportReactorTask* reactor_task();
00193 
00194   OPENDDS_MULTIMAP(TransportClient*, DataLink_rch) pending_connections_;
00195   void add_pending_connection(TransportClient* client, DataLink* link);
00196 
00197 private:
00198   /// We have a few friends in the transport framework so that they
00199   /// can access our private methods.  We do this to avoid pollution
00200   /// of our public interface with internal framework methods.
00201   friend class TransportInst;
00202   friend class TransportClient;
00203   friend class DataLink;
00204   /// Called by the TransportRegistry when this TransportImpl object
00205   /// is released while the TransportRegistry is handling a release()
00206   /// "event".
00207   void shutdown();
00208 
00209   /// The DataLink itself calls this method when it thinks it is
00210   /// no longer used for any associations.  This occurs during
00211   /// a "remove associations" operation being performed by some
00212   /// TransportClient that uses this TransportImpl.  The
00213   /// TransportClient is known to have acquired our reservation_lock_,
00214   /// so there won't be any reserve_datalink() calls being made from
00215   /// any other threads while we perform this release.
00216   virtual void release_datalink(DataLink* link) = 0;
00217 
00218   void attach_client(TransportClient* client);
00219   void detach_client(TransportClient* client);
00220   virtual void pre_detach(TransportClient*) {}
00221 
00222   DataLink* find_connect_i(const RepoId& local_id,
00223                            const AssociationData& remote_association,
00224                            const ConnectionAttribs& attribs,
00225                            bool active, bool connect);
00226 
00227 public:
00228   /// Called by our friends, the TransportClient, and the DataLink.
00229   /// Since this TransportImpl can be attached to many TransportClient
00230   /// objects, and each TransportClient object could be "running" in
00231   /// a separate thread, we need to protect all of the "reservation"
00232   /// methods with a lock.  The protocol is that a client of ours
00233   /// must "acquire" our reservation_lock_ before it can proceed to
00234   /// call any methods that affect the DataLink reservations.  It
00235   /// should release the reservation_lock_ as soon as it is done.
00236   int acquire();
00237   int tryacquire();
00238   int release();
00239   int remove();
00240 
00241   virtual OPENDDS_STRING transport_type() const = 0;
00242 
00243   /// Called by our friend, the TransportClient.
00244   /// Accessor for the TransportInterfaceInfo.  Accepts a reference
00245   /// to a TransportInterfaceInfo object that will be "populated"
00246   /// with this TransportImpl's connection information (ie, how
00247   /// another process would connect to this TransportImpl).
00248   bool connection_info(TransportLocator& local_info) const;
00249 
00250   typedef ACE_SYNCH_MUTEX     LockType;
00251   typedef ACE_Guard<LockType> GuardType;
00252 
00253   /// Lock to protect the config_ and reactor_task_ data members.
00254   mutable LockType lock_;
00255 
00256   OPENDDS_SET(TransportClient*) clients_;
00257 
00258   /// A reference (via a smart pointer) to the TransportInst
00259   /// object that was supplied to us during our configure() method.
00260   TransportInst_rch config_;
00261 
00262   /// The reactor (task) object - may not even be used if the concrete
00263   /// subclass (of TransportImpl) doesn't require a reactor.
00264   TransportReactorTask_rch reactor_task_;
00265 
00266   /// smart ptr to the associated DL cleanup task
00267   DataLinkCleanupTask dl_clean_task_;
00268 
00269   /// Monitor object for this entity
00270   Monitor* monitor_;
00271 
00272 protected:
00273   /// Id of the last link established.
00274   std::size_t last_link_;
00275   bool is_shut_down_;
00276 };
00277 
00278 } // namespace DCPS
00279 } // namespace OpenDDS
00280 
00281 #if defined (__ACE_INLINE__)
00282 #include "TransportImpl.inl"
00283 #endif /* __ACE_INLINE__ */
00284 
00285 #endif  /* OPENDDS_DCPS_TRANSPORTIMPL_H */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7