00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTIMPL_H 00009 #define OPENDDS_DCPS_TRANSPORTIMPL_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/RcObject_T.h" 00013 #include "dds/DdsDcpsInfoUtilsC.h" 00014 #include "dds/DdsDcpsSubscriptionC.h" 00015 #include "dds/DdsDcpsPublicationC.h" 00016 #include "dds/DCPS/PoolAllocator.h" 00017 #include "TransportDefs.h" 00018 #include "TransportInst.h" 00019 #include "TransportInst_rch.h" 00020 #include "TransportReactorTask.h" 00021 #include "TransportReactorTask_rch.h" 00022 #include "DataLinkCleanupTask.h" 00023 #include "ace/Synch.h" 00024 #include "dds/DCPS/PoolAllocator.h" 00025 #include "dds/DCPS/DiscoveryListener.h" 00026 00027 namespace OpenDDS { 00028 namespace DCPS { 00029 00030 class TransportClient; 00031 class TransportReceiveListener; 00032 class DataLink; 00033 class Monitor; 00034 struct AssociationData; 00035 00036 /** The TransportImpl class includes the abstract methods that must be implemented 00037 * by any implementation to provide data delivery service to the DCPS implementation. 00038 * This includes methods to send data, received data, configure the operation, and 00039 * manage associations and datalinks between local and remote objects of the implementation. 00040 * 00041 * Notes about object ownership: 00042 * 1)Has longer lifetime than the publisher and subscriber objects. The publishers 00043 * and subscribers are owned by the DomainParticipant and transport factory shutdown 00044 * is always after DomainParticipant factory shutdown. 00045 * 2)The concrete transport object owns the datalink objects. 00046 * 3)Own a DataLinkCleanup object. 00047 * 4)Reference to TransportInst object and TransportReactorTask object owned 00048 * by TransportRegistry. 00049 * 5)During transport shutdown, if this object does not have ownership of an object 00050 * but has a references via smart pointer then the reference should be freed; 00051 * if this object has ownership of task objects then the tasks should be closed. 00052 */ 00053 class OpenDDS_Dcps_Export TransportImpl : public RcObject<ACE_SYNCH_MUTEX> { 00054 public: 00055 00056 virtual ~TransportImpl(); 00057 00058 /// Remove any pending_release mappings. 00059 virtual void unbind_link(DataLink* link); 00060 00061 /// Callback from the DataLink to clean up any associated resources. 00062 /// This usually is done when the DataLink is lost. The call is made with 00063 /// no transport/DCPS locks held. 00064 bool release_link_resources(DataLink* link); 00065 00066 /// Expose the configuration information so others can see what 00067 /// we can do. 00068 TransportInst* config() const; 00069 00070 /// Called by our connection_info() method to allow the concrete 00071 /// TransportImpl subclass to do the dirty work since it really 00072 /// is the one that knows how to populate the supplied 00073 /// TransportLocator object. 00074 virtual bool connection_info_i(TransportLocator& local_info) const = 0; 00075 00076 virtual void register_for_reader(const RepoId& /*participant*/, 00077 const RepoId& /*writerid*/, 00078 const RepoId& /*readerid*/, 00079 const TransportLocatorSeq& /*locators*/, 00080 OpenDDS::DCPS::DiscoveryListener* /*listener*/) { } 00081 00082 virtual void unregister_for_reader(const RepoId& /*participant*/, 00083 const RepoId& /*writerid*/, 00084 const RepoId& /*readerid*/) { } 00085 00086 virtual void register_for_writer(const RepoId& /*participant*/, 00087 const RepoId& /*readerid*/, 00088 const RepoId& /*writerid*/, 00089 const TransportLocatorSeq& /*locators*/, 00090 DiscoveryListener* /*listener*/) { } 00091 00092 virtual void unregister_for_writer(const RepoId& /*participant*/, 00093 const RepoId& /*readerid*/, 00094 const RepoId& /*writerid*/) { } 00095 00096 /// Interface to the transport's reactor for scheduling timers. 00097 ACE_Reactor_Timer_Interface* timer() const; 00098 00099 ACE_Reactor* reactor() const; 00100 ACE_thread_t reactor_owner() const; 00101 bool is_shut_down() const; 00102 00103 /// Create the reactor task using sync send or optionally async send 00104 /// by parameter on supported Windows platforms only. 00105 void create_reactor_task(bool useAsyncSend = false); 00106 00107 /// Diagnostic aid. 00108 void dump(); 00109 OPENDDS_STRING dump_to_str(); 00110 00111 void report(); 00112 00113 struct ConnectionAttribs { 00114 RepoId local_id_; 00115 Priority priority_; 00116 bool local_reliable_, local_durable_; 00117 }; 00118 00119 struct RemoteTransport { 00120 RepoId repo_id_; 00121 TransportBLOB blob_; 00122 Priority publication_transport_priority_; 00123 bool reliable_, durable_; 00124 }; 00125 00126 struct AcceptConnectResult { 00127 enum Status { ACR_SUCCESS, ACR_FAILED }; 00128 explicit AcceptConnectResult(Status ok = ACR_FAILED) 00129 : success_(ok == ACR_SUCCESS), link_(0) {} 00130 explicit AcceptConnectResult(DataLink* link) 00131 : success_(link), link_(link) {} 00132 /// If false, the accept or connect has failed and link_ is ignored. 00133 bool success_; 00134 /// If success_ is true, link_ may either be null or have a valid DataLink. 00135 /// If link_ is null the DataLink is not ready for use, and 00136 /// TransportClient::use_datalink() is called later. 00137 DataLink_rch link_; 00138 }; 00139 00140 protected: 00141 TransportImpl(); 00142 00143 bool configure(TransportInst* config); 00144 00145 /// connect_datalink() is called from TransportClient to initiate an 00146 /// association as the active peer. A DataLink may be returned if 00147 /// one is already connected and ready to use, otherwise 00148 /// initiate a connection to the passive side and return from this 00149 /// method. Upon completion of the physical connection, the 00150 /// transport calls back to TransportClient::use_datalink(). 00151 virtual AcceptConnectResult connect_datalink(const RemoteTransport& remote, 00152 const ConnectionAttribs& attribs, 00153 TransportClient* client) = 0; 00154 00155 /// accept_datalink() is called from TransportClient to initiate an 00156 /// association as the passive peer. A DataLink may be returned if 00157 /// one is already connected and ready to use, otherwise 00158 /// passively wait for a physical connection from the active 00159 /// side (either in the form of a connection event or handshaking 00160 /// message). Upon completion of the physical connection, the 00161 /// transport calls back to TransportClient::use_datalink(). 00162 virtual AcceptConnectResult accept_datalink(const RemoteTransport& remote, 00163 const ConnectionAttribs& attribs, 00164 TransportClient* client) = 0; 00165 00166 /// stop_accepting_or_connecting() is called from TransportClient 00167 /// to terminate the accepting process begun by accept_datalink() 00168 /// or connect_datalink(). This allows the TransportImpl to clean 00169 /// up any resources associated with this pending connection. 00170 /// The TransportClient* passed in to accept or connect is not 00171 /// valid after this method is called. 00172 virtual void stop_accepting_or_connecting(TransportClient* client, 00173 const RepoId& remote_id) = 0; 00174 00175 /// Concrete subclass gets a shot at the config object. The subclass 00176 /// will likely downcast the TransportInst object to a 00177 /// subclass type that it expects/requires. 00178 virtual bool configure_i(TransportInst* config) = 0; 00179 00180 /// Called during the shutdown() method in order to give the 00181 /// concrete TransportImpl subclass a chance to do something when 00182 /// the shutdown "event" occurs. 00183 virtual void shutdown_i() = 0; 00184 00185 /// Called before transport is shutdown to let the 00186 /// concrete transport to do anything necessary. 00187 virtual void pre_shutdown_i(); 00188 00189 /// Accessor to obtain a "copy" of the reference to the reactor task. 00190 /// Caller is responsible for the "copy" of the reference that is 00191 /// returned. 00192 TransportReactorTask* reactor_task(); 00193 00194 OPENDDS_MULTIMAP(TransportClient*, DataLink_rch) pending_connections_; 00195 void add_pending_connection(TransportClient* client, DataLink* link); 00196 00197 private: 00198 /// We have a few friends in the transport framework so that they 00199 /// can access our private methods. We do this to avoid pollution 00200 /// of our public interface with internal framework methods. 00201 friend class TransportInst; 00202 friend class TransportClient; 00203 friend class DataLink; 00204 /// Called by the TransportRegistry when this TransportImpl object 00205 /// is released while the TransportRegistry is handling a release() 00206 /// "event". 00207 void shutdown(); 00208 00209 /// The DataLink itself calls this method when it thinks it is 00210 /// no longer used for any associations. This occurs during 00211 /// a "remove associations" operation being performed by some 00212 /// TransportClient that uses this TransportImpl. The 00213 /// TransportClient is known to have acquired our reservation_lock_, 00214 /// so there won't be any reserve_datalink() calls being made from 00215 /// any other threads while we perform this release. 00216 virtual void release_datalink(DataLink* link) = 0; 00217 00218 void attach_client(TransportClient* client); 00219 void detach_client(TransportClient* client); 00220 virtual void pre_detach(TransportClient*) {} 00221 00222 DataLink* find_connect_i(const RepoId& local_id, 00223 const AssociationData& remote_association, 00224 const ConnectionAttribs& attribs, 00225 bool active, bool connect); 00226 00227 public: 00228 /// Called by our friends, the TransportClient, and the DataLink. 00229 /// Since this TransportImpl can be attached to many TransportClient 00230 /// objects, and each TransportClient object could be "running" in 00231 /// a separate thread, we need to protect all of the "reservation" 00232 /// methods with a lock. The protocol is that a client of ours 00233 /// must "acquire" our reservation_lock_ before it can proceed to 00234 /// call any methods that affect the DataLink reservations. It 00235 /// should release the reservation_lock_ as soon as it is done. 00236 int acquire(); 00237 int tryacquire(); 00238 int release(); 00239 int remove(); 00240 00241 virtual OPENDDS_STRING transport_type() const = 0; 00242 00243 /// Called by our friend, the TransportClient. 00244 /// Accessor for the TransportInterfaceInfo. Accepts a reference 00245 /// to a TransportInterfaceInfo object that will be "populated" 00246 /// with this TransportImpl's connection information (ie, how 00247 /// another process would connect to this TransportImpl). 00248 bool connection_info(TransportLocator& local_info) const; 00249 00250 typedef ACE_SYNCH_MUTEX LockType; 00251 typedef ACE_Guard<LockType> GuardType; 00252 00253 /// Lock to protect the config_ and reactor_task_ data members. 00254 mutable LockType lock_; 00255 00256 OPENDDS_SET(TransportClient*) clients_; 00257 00258 /// A reference (via a smart pointer) to the TransportInst 00259 /// object that was supplied to us during our configure() method. 00260 TransportInst_rch config_; 00261 00262 /// The reactor (task) object - may not even be used if the concrete 00263 /// subclass (of TransportImpl) doesn't require a reactor. 00264 TransportReactorTask_rch reactor_task_; 00265 00266 /// smart ptr to the associated DL cleanup task 00267 DataLinkCleanupTask dl_clean_task_; 00268 00269 /// Monitor object for this entity 00270 Monitor* monitor_; 00271 00272 protected: 00273 /// Id of the last link established. 00274 std::size_t last_link_; 00275 bool is_shut_down_; 00276 }; 00277 00278 } // namespace DCPS 00279 } // namespace OpenDDS 00280 00281 #if defined (__ACE_INLINE__) 00282 #include "TransportImpl.inl" 00283 #endif /* __ACE_INLINE__ */ 00284 00285 #endif /* OPENDDS_DCPS_TRANSPORTIMPL_H */