DataLink.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATALINK_H
00009 #define OPENDDS_DCPS_DATALINK_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject_T.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "ReceiveListenerSetMap.h"
00016 #include "SendResponseListener.h"
00017 #include "TransportDefs.h"
00018 #include "TransportImpl_rch.h"
00019 #include "TransportSendStrategy.h"
00020 #include "TransportSendStrategy_rch.h"
00021 #include "TransportStrategy.h"
00022 #include "TransportStrategy_rch.h"
00023 #include "TransportSendControlElement.h"
00024 #include "TransportSendListener.h"
00025 #include "TransportReceiveListener.h"
00026 #include "dds/DCPS/transport/framework/QueueTaskBase_T.h"
00027 
00028 #include "ace/Synch.h"
00029 #include "ace/Event_Handler.h"
00030 
00031 #include <utility>
00032 
00033 #include <iosfwd> // For operator<<() diagnostic formatter.
00034 // Forward declaration of ACE_SOCK, which DataLink::set_dscp_codepoint() takes by reference.
00035 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00036 class ACE_SOCK;
00037 ACE_END_VERSIONED_NAMESPACE_DECL
00038 
00039 namespace OpenDDS {
00040 namespace DCPS {
00041 // Forward declarations for types that this header only uses through pointers or references.
00042 class  TransportReceiveListener;
00043 class  TransportSendListener;
00044 class  TransportQueueElement;
00045 class  ReceivedDataSample;
00046 class  DataSampleElement;
00047 class  ThreadPerConnectionSendTask;
00048 class  TransportClient;
00049 /// Container type produced by OPENDDS_MAP_CMP: RepoId keys, DataLinkSet_rch values, GUID_tKeyLessThan comparator (presumably a std::map-like type; confirm against the macro definition).
00050 typedef OPENDDS_MAP_CMP(RepoId, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
00051 
00052 /**
00053  * This class manages the reservations based on the associations between datareaders
00054  * and datawriters. It delegates samples to the send strategy for sending and
00055  * delivers the samples received by the receive strategy to the listener.
00056  *
00057  * Notes about object ownership:
00058  * 1) Own the send strategy object and receive strategy object.
00059  * 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection
00060  *    is enabled.
00061  */
00062 class OpenDDS_Dcps_Export DataLink
00063   : public RcObject<ACE_SYNCH_MUTEX>,
00064     public ACE_Event_Handler {
00065   // Manages reservations between readers and writers, hands outgoing samples to the send strategy, and delivers received samples to listeners.
00066   friend class DataLinkCleanupTask;
00067 
00068 public:
00069   /// Kinds of connection state change that notify() reports to datawriters and datareaders.
00070   enum ConnectionNotice {
00071     DISCONNECTED,
00072     RECONNECTED,
00073     LOST
00074   };
00075 
00076   /// A DataLink object is always created by a TransportImpl object.
00077   /// Thus, the TransportImpl object passed-in here is the object that
00078   /// created this DataLink.  The ability to specify a priority
00079   /// for individual links is included for construction so its
00080   /// value can be available for activating any threads.
00081   DataLink(TransportImpl* impl, Priority priority, bool is_loopback, bool is_active);
00082   virtual ~DataLink();
00083 
00084   /// The reactor invokes this after being notified in schedule_stop() or cancel_release().
00085   int handle_exception(ACE_HANDLE /* fd */);
00086 
00087   /// Allows DataLink::stop() to be done on the reactor thread so that
00088   /// the calling thread avoids possibly deadlocking while trying to access
00089   /// the reactor to stop strategies or schedule timers.
00090   void schedule_stop(ACE_Time_Value& schedule_to_stop_at);
00091   /// The stop method is used to stop the DataLink prior to shutdown.
00092   void stop();
00093 
00094   /// The resume_send is used in the case of reconnection
00095   /// on the subscriber's side.
00096   void resume_send();
00097 
00098   /// Only called by our TransportImpl object.
00099   ///
00100   /// Return Codes: 0 means successful reservation made.
00101   ///              -1 means failure.
00102   int make_reservation(const RepoId& remote_subscription_id,
00103                        const RepoId& local_publication_id,
00104                        TransportSendListener* send_listener);
00105 
00106   /// Only called by our TransportImpl object.
00107   ///
00108   /// Return Codes: 0 means successful reservation made.
00109   ///              -1 means failure.
00110   int make_reservation(const RepoId& remote_publication_id,
00111                        const RepoId& local_subcription_id,
00112                        TransportReceiveListener* receive_listener);
00113 
00114   // ciju: Called by LinkSet with locks held
00115   /// This will release reservations that were made by one of the
00116   /// make_reservation() methods.  All we know is that the supplied
00117   /// RepoId is considered to be a remote id.  It could be a
00118   /// remote subscriber or a remote publisher.
00119   void release_reservations(RepoId          remote_id,
00120                             RepoId          local_id,
00121                             DataLinkSetMap& released_locals);
00122   // NOTE(review): presumably arranges for this link to be released once datalink_release_delay_ has elapsed; confirm in the implementation.
00123   void schedule_delayed_release();
00124   /// Read-only accessor; presumably returns datalink_release_delay_ (TODO confirm in DataLink.inl/.cpp).
00125   const ACE_Time_Value& datalink_release_delay() const;
00126 
00127   /// Either send or receive listener for this local_id should be
00128   /// removed from internal DataLink structures so it no longer
00129   /// receives events.
00130   void remove_listener(const RepoId& local_id);
00131 
00132   // ciju: Called by LinkSet with locks held
00133   /// Called by the TransportClient objects that reference this
00134   /// DataLink.  Used by the TransportClient to send a sample,
00135   /// or to send a control message. These functions either give the
00136   /// request to the ThreadPerConnectionSendTask when the thread_per_connection
00137   /// configuration is true or simply delegate to the send strategy.
00138   void send_start();
00139   void send(TransportQueueElement* element);
00140   void send_stop(RepoId repoId);
00141 
00142   // ciju: Called by LinkSet with locks held
00143   /// This method is essentially an "undo_send()" method.  Its goal
00144   /// is to remove all traces of the sample from this DataLink (if
00145   /// the sample is even known to the DataLink).
00146   RemoveResult remove_sample(const DataSampleElement* sample);
00147 
00148   // ciju: Called by LinkSet with locks held
00149   void remove_all_msgs(RepoId pub_id);
00150 
00151   /// This is called by our TransportReceiveStrategy object when it
00152   /// has received a complete data sample.  This method will cause
00153   /// the appropriate TransportReceiveListener objects to be told
00154   /// that data_received().
00155   /// If readerId is not GUID_UNKNOWN, only the TransportReceiveListener
00156   /// with that ID (if one exists) will receive the data.
00157   int data_received(ReceivedDataSample& sample,
00158                     const RepoId& readerId = GUID_UNKNOWN);
00159 
00160   /// Variation of data_received() that allows for excluding a subset of readers
00161   /// by specifying which readers specifically should receive.
00162   /// Any reader ID that does not appear in the include set will be skipped.
00163   void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
00164 
00165   /// Obtain a unique identifier for this DataLink object.
00166   DataLinkIdType id() const;
00167 
00168   /// Our TransportImpl will inform us if it is being shutdown()
00169   /// by calling this method.
00170   void transport_shutdown();
00171 
00172   /// Notify the datawriters and datareaders that the connection is
00173   /// disconnected, lost, or reconnected. The datareader/datawriter
00174   /// will notify the corresponding listener.
00175   void notify(ConnectionNotice notice);
00176 
00177   void notify_connection_deleted();
00178   virtual bool issues_on_deleted_callback() const;
00179 
00180   /// Called before releasing the datalink or before shutdown to let
00181   /// the concrete DataLink do anything necessary.
00182   virtual void pre_stop_i();
00183 
00184   // Call-back from the concrete transport object.
00185   // The connection has been broken. No locks are being held.
00186   // Take a snapshot of current associations which will be removed
00187   // by DataLinkCleanupTask.
00188   bool release_resources();
00189 
00190   // Used to inform the send strategy to clear all unsent samples when
00191   // backpressure has timed out.
00192   void terminate_send();
00193 
00194   /// This is called on publisher side to see if this link communicates
00195   /// with the provided sub.
00196   bool is_target(const RepoId& remote_sub_id);
00197 
00198   /// This is called by DataLinkCleanupTask thread to remove the associations
00199   /// based on the snapshot in release_resources().
00200   void clear_associations();
00201   // Timer and close callbacks; the signatures match ACE_Event_Handler's hooks, so these are presumably overrides of them.
00202   int handle_timeout(const ACE_Time_Value& tv, const void* arg);
00203   int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
00204 
00205   // Set the DiffServ codepoint of the socket.  This is a stateless
00206   // method and is here only because this is a convenient common
00207   // location that can be reached by client code that needs to
00208   // perform this behavior.
00209   void set_dscp_codepoint(int cp, ACE_SOCK& socket);
00210 
00211   /// Accessors for the TRANSPORT_PRIORITY value associated with
00212   /// this link.
00213   Priority& transport_priority();
00214   Priority  transport_priority() const;
00215 
00216   bool& is_loopback();
00217   bool  is_loopback() const;
00218 
00219   bool& is_active();
00220   bool  is_active() const;
00221 
00222   bool cancel_release();
00223 
00224   /// This allows a subclass to easily create a transport control
00225   /// sample to send via send_control.
00226   ACE_Message_Block* create_control(char submessage_id,
00227                                     DataSampleHeader& header,
00228                                     ACE_Message_Block* data);
00229 
00230   /// This allows a subclass to send transport control samples over
00231   /// this DataLink. This is useful for sending transport-specific
00232   /// control messages between one or more endpoints under this
00233   /// DataLink's control.
00234   SendControlStatus send_control(const DataSampleHeader& header, ACE_Message_Block* data);
00235 
00236   /// For a given publication "pub_id", store the total number of corresponding
00237   /// subscriptions in "n_subs" and given a set of subscriptions
00238   /// (the "in" sequence), return the subset of the input set "in" which are
00239   /// targets of this DataLink (see is_target()).
00240   GUIDSeq* target_intersection(const RepoId& pub_id, const GUIDSeq& in, size_t& n_subs);
00241 
00242   TransportImpl_rch impl() const;
00243 
00244   void default_listener(TransportReceiveListener* trl);
00245   TransportReceiveListener* default_listener() const;
00246   /// An on-start callback is identified by a (TransportClient*, remote RepoId) pair; presumably kept in on_start_callbacks_ (confirm).
00247   typedef std::pair<TransportClient*, RepoId> OnStartCallback;
00248   bool add_on_start_callback(TransportClient* client, const RepoId& remote);
00249   void remove_on_start_callback(TransportClient* client, const RepoId& remote);
00250   void invoke_on_start_callbacks(bool success);
00251 
00252   void set_scheduling_release(bool scheduling_release);
00253 
00254   virtual void send_final_acks (const RepoId& readerid);
00255 
00256 protected:
00257 
00258   /// This is how the subclass "announces" to this DataLink base class
00259   /// that this DataLink has now been "connected" and should start
00260   /// the supplied strategy objects.  This start method is also
00261   /// going to keep a "copy" of the references to the strategy objects.
00262   /// Also note that it is acceptable to pass-in a NULL (0)
00263   /// TransportReceiveStrategy*, but it is assumed that the
00264   /// TransportSendStrategy* argument is not NULL.
00265   ///
00266   /// If the start() method fails to start either strategy, then a -1
00267   /// is returned.  Otherwise, a 0 is returned.  In the failure case,
00268   /// if one of the strategy objects was started successfully, then
00269   /// it will be stopped before the start() method returns -1.
00270   int start(const TransportSendStrategy_rch& send_strategy,
00271             const TransportStrategy_rch& receive_strategy);
00272 
00273   /// This announces the "stop" event to our subclass.  The "stop"
00274   /// event will occur when this DataLink is handling a
00275   /// release_reservations() call and determines that it has just
00276   /// released all of the remaining reservations on this DataLink.
00277   /// The "stop" event will also occur when the TransportImpl
00278   /// is being shutdown() - we call stop_i() from our
00279   /// transport_shutdown() method to handle this case.
00280   virtual void stop_i();
00281 
00282   /// Used to provide unique ids to DataLink objects.
00283   static ACE_UINT64 get_next_datalink_id();
00284 
00285   /// The transport receive strategy object for this DataLink.
00286   TransportStrategy_rch receive_strategy_;
00287 
00288   friend class ThreadPerConnectionSendTask;
00289 
00290   /// The implementation of the functions that accomplish the
00291   /// sample or control message delivery. They simply
00292   /// delegate to the send strategy.
00293   void send_start_i();
00294   virtual void send_i(TransportQueueElement* element, bool relink = true);
00295   void send_stop_i(RepoId repoId);
00296 
00297   /// For a given local RepoId (publication or subscription), return the list
00298   /// of remote peer RepoIds (subscriptions or publications) that this link
00299   /// knows about due to make_reservation().
00300   GUIDSeq* peer_ids(const RepoId& local_id) const;
00301 
00302 private:
00303 
00304   /// Helper function to output the enum as a string to help debugging.
00305   const char* connection_notice_as_str(ConnectionNotice notice);
00306 
00307   TransportSendListener* send_listener_for(const RepoId& pub_id) const;
00308   TransportReceiveListener* recv_listener_for(const RepoId& sub_id) const;
00309 
00310   /// Save current sub and pub association maps for releasing and create
00311   /// empty maps for new associations.
00312   void prepare_release();
00313 
00314   /// Allow derived classes to provide an alternate "customized" queue element
00315   /// for this DataLink (not shared with other links in the DataLinkSet). The default implementation returns the element unchanged.
00316   virtual TransportQueueElement* customize_queue_element(
00317     TransportQueueElement* element)
00318   {
00319     return element;
00320   }
00321   /// Hooks for derived classes; the default implementations below do nothing.
00322   virtual void release_remote_i(const RepoId& /*remote_id*/) {}
00323   virtual void release_reservations_i(const RepoId& /*remote_id*/,
00324                                       const RepoId& /*local_id*/) {}
00325   /// Internal helper; presumably the shared implementation behind data_received() and data_received_include() (TODO confirm).
00326   void data_received_i(ReceivedDataSample& sample,
00327                        const RepoId& readerId,
00328                        const RepoIdSet& incl_excl,
00329                        ReceiveListenerSet::ConstrainReceiveSet constrain);
00330   /// Mutex type used for the lock members declared in this class.
00331   typedef ACE_SYNCH_MUTEX     LockType;
00332 
00333   /// Convenience function for diagnostic information.
00334 #ifndef OPENDDS_SAFETY_PROFILE
00335   friend OpenDDS_Dcps_Export
00336   std::ostream& operator<<(std::ostream& str, const DataLink& value);
00337 #endif
00338 
00339   /// A boolean indicating if the DataLink has been stopped. This
00340   /// value is protected by the strategy_lock_.
00341   bool stopped_;
00342   ACE_Time_Value scheduled_to_stop_at_;
00343 
00344   /// Map publication Id value to TransportSendListener.
00345   typedef OPENDDS_MAP_CMP(RepoId, TransportSendListener*, GUID_tKeyLessThan) IdToSendListenerMap;
00346   IdToSendListenerMap send_listeners_;
00347 
00348   /// Map subscription Id value to TransportReceiveListener.
00349   typedef OPENDDS_MAP_CMP(RepoId, TransportReceiveListener*, GUID_tKeyLessThan) IdToRecvListenerMap;
00350   IdToRecvListenerMap recv_listeners_;
00351 
00352   /// If default_listener_ is not null and this DataLink receives a sample
00353   /// from a publication GUID that's not in pub_map_, it will call
00354   /// data_received() on the default_listener_. NOTE(review): no member named pub_map_ is declared in this class; it presumably refers to the association maps below (confirm).
00355   TransportReceiveListener* default_listener_;
00356   // NOTE(review): presumably guards the listener and association maps declared around it; confirm in the implementation.
00357   mutable LockType pub_sub_maps_lock_;
00358   /// Associations keyed by RepoId, each mapped to a ReceiveListenerSet_rch; by its name the key is the remote id.
00359   typedef OPENDDS_MAP_CMP(RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
00360   AssocByRemote assoc_by_remote_;
00361   /// Associations keyed by RepoId, each mapped to a RepoIdSet; by its name the key is the local id.
00362   typedef OPENDDS_MAP_CMP(RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal;
00363   AssocByLocal assoc_by_local_;
00364 
00365   mutable LockType released_assoc_by_local_lock_;
00366   AssocByLocal released_assoc_by_local_;
00367 
00368   /// A (smart) pointer to the TransportImpl that created this DataLink.
00369   TransportImpl_rch impl_;
00370 
00371   /// The id for this DataLink
00372   ACE_UINT64 id_;
00373 
00374   /// The task used to do the sending. This ThreadPerConnectionSendTask
00375   /// object is created when the thread_per_connection configuration is
00376   /// true. It is dedicated solely to this datalink.
00377   ThreadPerConnectionSendTask* thr_per_con_send_task_;
00378 
00379   /// Snapshot of associations taken when release_resources() is called.
00380   AssocByLocal assoc_releasing_;
00381 
00382   /// TRANSPORT_PRIORITY value associated with the link.
00383   Priority transport_priority_;
00384   // NOTE(review): presumably the flag written by set_scheduling_release(); confirm in the implementation.
00385   bool scheduling_release_;
00386 
00387 protected:
00388   /// ACE_Guard specialization for LockType.
00389   typedef ACE_Guard<LockType> GuardType;
00390 
00391   /// The transport send strategy object for this DataLink.
00392   TransportSendStrategy_rch send_strategy_;
00393   /// Lock that protects stopped_ (as stated in the stopped_ comment above).
00394   LockType strategy_lock_;
00395   OPENDDS_VECTOR(OnStartCallback) on_start_callbacks_;
00396 
00397   /// Configurable delay in milliseconds that the datalink
00398   /// should be released after all associations are removed.
00399   ACE_Time_Value datalink_release_delay_;
00400 
00401   /// Allocator for TransportSendControlElements created when
00402   /// send_control is called.
00403   TransportSendControlElementAllocator* send_control_allocator_;
00404 
00405   /// Allocators for data and message blocks used by transport
00406   /// control samples when send_control is called.
00407   MessageBlockAllocator* mb_allocator_;
00408   DataBlockAllocator* db_allocator_;
00409 
00410   /// Is remote attached to same transport ?
00411   bool is_loopback_;
00412   /// Is pub or sub ?
00413   bool is_active_;
00414   bool started_;
00415 
00416   /// Listener for TransportSendControlElements created in send_control
00417   SendResponseListener send_response_listener_;
00418 };
00419 
00420 } // namespace DCPS
00421 } // namespace OpenDDS
00422 
00423 #if defined (__ACE_INLINE__)
00424 #include "DataLink.inl"
00425 #endif /* __ACE_INLINE__ */
00426 
00427 #endif /* OPENDDS_DCPS_DATALINK_H */

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7