DataLink.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATALINK_H
00009 #define OPENDDS_DCPS_DATALINK_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "dds/DCPS/RcEventHandler.h"
00016 #include "ReceiveListenerSetMap.h"
00017 #include "SendResponseListener.h"
00018 #include "TransportDefs.h"
00019 #include "TransportSendStrategy.h"
00020 #include "TransportSendStrategy_rch.h"
00021 #include "TransportStrategy.h"
00022 #include "TransportStrategy_rch.h"
00023 #include "TransportSendControlElement.h"
00024 #include "TransportSendListener.h"
00025 #include "TransportReceiveListener.h"
00026 #include "dds/DCPS/transport/framework/QueueTaskBase_T.h"
00027 
00028 #include "ace/Event_Handler.h"
00029 #include "ace/Synch_Traits.h"
00030 
00031 #include <utility>
00032 
00033 #include <iosfwd> // For operator<<() diagnostic formatter.
00034 
00035 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00036 class ACE_SOCK;
00037 ACE_END_VERSIONED_NAMESPACE_DECL
00038 
00039 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00040 
00041 namespace OpenDDS {
00042 namespace DCPS {
00043 
00044 
00045 class  TransportQueueElement;
00046 class  ReceivedDataSample;
00047 class  DataSampleElement;
00048 class  ThreadPerConnectionSendTask;
00049 class  TransportClient;
00050 class TransportImpl;
00051 
00052 typedef OPENDDS_MAP_CMP(RepoId, DataLinkSet_rch, GUID_tKeyLessThan) DataLinkSetMap;
00053 
00054 typedef WeakRcHandle<TransportSendListener> TransportSendListener_wrch;
00055 
00056 /**
00057  * This class manages the reservations based on the associations between datareader
00058  * and datawriter. It basically delegate the samples to send strategy for sending and
00059  * deliver the samples received by receive strategy to the listener.
00060  *
00061  * Notes about object ownership:
00062  * 1) Own the send strategy object and receive strategy object.
00063  * 2) Own ThreadPerConnectionSendTask object which is used when thread_per_connection
00064  *    is enabled.
00065  */
00066 class OpenDDS_Dcps_Export DataLink
00067 : public RcEventHandler {
00068 
00069   friend class DataLinkCleanupTask;
00070 
00071 public:
00072 
00073   enum ConnectionNotice {
00074     DISCONNECTED,
00075     RECONNECTED,
00076     LOST
00077   };
00078 
00079   /// A DataLink object is always created by a TransportImpl object.
00080   /// Thus, the TransportImpl object passed-in here is the object that
00081   /// created this DataLink.  The ability to specify a priority
00082   /// for individual links is included for construction so its
00083   /// value can be available for activating any threads.
00084   DataLink(TransportImpl& impl, Priority priority, bool is_loopback, bool is_active);
00085   virtual ~DataLink();
00086 
00087   //Reactor invokes this after being notified in schedule_stop or cancel_release
00088   int handle_exception(ACE_HANDLE /* fd */);
00089 
00090   //Allows DataLink::stop to be done on the reactor thread so that
00091   //this thread avoids possibly deadlocking trying to access reactor
00092   //to stop strategies or schedule timers
00093   void schedule_stop(const ACE_Time_Value& schedule_to_stop_at);
00094   /// The stop method is used to stop the DataLink prior to shutdown.
00095   void stop();
00096 
00097   /// The resume_send is used in the case of reconnection
00098   /// on the subscriber's side.
00099   void resume_send();
00100 
00101   /// Only called by our TransportImpl object.
00102   ///
00103   /// Return Codes: 0 means successful reservation made.
00104   ///              -1 means failure.
00105   int make_reservation(const RepoId& remote_subscription_id,
00106                        const RepoId& local_publication_id,
00107                        const TransportSendListener_wrch& send_listener);
00108 
00109   /// Only called by our TransportImpl object.
00110   ///
00111   /// Return Codes: 0 means successful reservation made.
00112   ///              -1 means failure.
00113   int make_reservation(const RepoId& remote_publication_id,
00114                        const RepoId& local_subcription_id,
00115                        const TransportReceiveListener_wrch& receive_listener);
00116 
00117   // ciju: Called by LinkSet with locks held
00118   /// This will release reservations that were made by one of the
00119   /// make_reservation() methods.  All we know is that the supplied
00120   /// RepoId is considered to be a remote id.  It could be a
00121   /// remote subscriber or a remote publisher.
00122   void release_reservations(RepoId          remote_id,
00123                             RepoId          local_id,
00124                             DataLinkSetMap& released_locals);
00125 
00126   void schedule_delayed_release();
00127 
00128   const ACE_Time_Value& datalink_release_delay() const;
00129 
00130   /// Either send or receive listener for this local_id should be
00131   /// removed from internal DataLink structures so it no longer
00132   /// receives events.
00133   void remove_listener(const RepoId& local_id);
00134 
00135   // ciju: Called by LinkSet with locks held
00136   /// Called by the TransportClient objects that reference this
00137   /// DataLink.  Used by the TransportClient to send a sample,
00138   /// or to send a control message. These functions either give the
00139   /// request to the PerThreadConnectionSendTask when thread_per_connection
00140   /// configuration is true or just simply delegate to the send strategy.
00141   void send_start();
00142   void send(TransportQueueElement* element);
00143   void send_stop(RepoId repoId);
00144 
00145   // ciju: Called by LinkSet with locks held
00146   /// This method is essentially an "undo_send()" method.  It's goal
00147   /// is to remove all traces of the sample from this DataLink (if
00148   /// the sample is even known to the DataLink).
00149   virtual RemoveResult remove_sample(const DataSampleElement* sample,
00150                                      void* context);
00151 
00152   // ciju: Called by LinkSet with locks held
00153   void remove_all_msgs(RepoId pub_id);
00154 
00155   /// This is called by our TransportReceiveStrategy object when it
00156   /// has received a complete data sample.  This method will cause
00157   /// the appropriate TransportReceiveListener objects to be told
00158   /// that data_received().
00159   /// If readerId is not GUID_UNKNOWN, only the TransportReceiveListener
00160   /// with that ID (if one exists) will receive the data.
00161   int data_received(ReceivedDataSample& sample,
00162                     const RepoId& readerId = GUID_UNKNOWN);
00163 
00164   /// Varation of data_received() that allows for excluding a subset of readers
00165   /// by specifying which readers specifically should receive.
00166   /// Any reader ID that does not appear in the include set will be skipped.
00167   void data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl);
00168 
00169   /// Obtain a unique identifier for this DataLink object.
00170   DataLinkIdType id() const;
00171 
00172   /// Our TransportImpl will inform us if it is being shutdown()
00173   /// by calling this method.
00174   void transport_shutdown();
00175 
00176   /// Notify the datawriters and datareaders that the connection is
00177   /// disconnected, lost, or reconnected. The datareader/datawriter
00178   /// will notify the corresponding listener.
00179   void notify(ConnectionNotice notice);
00180 
00181   /// Called before release the datalink or before shutdown to let
00182   /// the concrete DataLink to do anything necessary.
00183   virtual void pre_stop_i();
00184 
00185   // Call-back from the concrete transport object.
00186   // The connection has been broken. No locks are being held.
00187   // Take a snapshot of current associations which will be removed
00188   // by DataLinkCleanupTask.
00189   bool release_resources();
00190 
00191   // Used by to inform the send strategy to clear all unsent samples upon
00192   // backpressure timed out.
00193   void terminate_send();
00194 
00195   /// This is called on publisher side to see if this link communicates
00196   /// with the provided sub.
00197   bool is_target(const RepoId& remote_sub_id);
00198 
00199   /// This is called by DataLinkCleanupTask thread to remove the associations
00200   /// based on the snapshot in release_resources().
00201   void clear_associations();
00202 
00203   int handle_timeout(const ACE_Time_Value& tv, const void* arg);
00204   int handle_close(ACE_HANDLE h, ACE_Reactor_Mask m);
00205 
00206   // Set the DiffServ codepoint of the socket.  This is a stateless
00207   // method and is here only because this is a convenient common
00208   // location that can be reached by client code that needs to
00209   // perform this behavior.
00210   void set_dscp_codepoint(int cp, ACE_SOCK& socket);
00211 
00212   /// Accessors for the TRANSPORT_PRIORITY value associated with
00213   /// this link.
00214   Priority& transport_priority();
00215   Priority  transport_priority() const;
00216 
00217   bool& is_loopback();
00218   bool  is_loopback() const;
00219 
00220   bool& is_active();
00221   bool  is_active() const;
00222 
00223   bool cancel_release();
00224 
00225   /// This allows a subclass to easily create a transport control
00226   /// sample to send via send_control.
00227   ACE_Message_Block* create_control(char submessage_id,
00228                                     DataSampleHeader& header,
00229                                     Message_Block_Ptr data);
00230 
00231   /// This allows a subclass to send transport control samples over
00232   /// this DataLink. This is useful for sending transport-specific
00233   /// control messages between one or more endpoints under this
00234   /// DataLink's control.
00235   SendControlStatus send_control(const DataSampleHeader& header, Message_Block_Ptr data);
00236 
00237   /// For a given publication "pub_id", store the total number of corresponding
00238   /// subscriptions in "n_subs" and given a set of subscriptions
00239   /// (the "in" sequence), return the subset of the input set "in" which are
00240   /// targets of this DataLink (see is_target()).
00241   GUIDSeq* target_intersection(const RepoId& pub_id, const GUIDSeq& in, size_t& n_subs);
00242 
00243   TransportImpl& impl() const;
00244 
00245   void default_listener(const TransportReceiveListener_wrch& trl);
00246   TransportReceiveListener_wrch default_listener() const;
00247 
00248   typedef WeakRcHandle<TransportClient> TransportClient_wrch;
00249   typedef std::pair<TransportClient_wrch, RepoId> OnStartCallback;
00250   bool add_on_start_callback(const TransportClient_wrch& client, const RepoId& remote);
00251   void remove_on_start_callback(const TransportClient_wrch& client, const RepoId& remote);
00252   void invoke_on_start_callbacks(bool success);
00253 
00254   void set_scheduling_release(bool scheduling_release);
00255 
00256   virtual void send_final_acks (const RepoId& readerid);
00257 
00258 protected:
00259 
00260   /// This is how the subclass "announces" to this DataLink base class
00261   /// that this DataLink has now been "connected" and should start
00262   /// the supplied strategy objects.  This start method is also
00263   /// going to keep a "copy" of the references to the strategy objects.
00264   /// Also note that it is acceptable to pass-in a NULL (0)
00265   /// TransportReceiveStrategy*, but it is assumed that the
00266   /// TransportSendStrategy* argument is not NULL.
00267   ///
00268   /// If the start() method fails to start either strategy, then a -1
00269   /// is returned.  Otherwise, a 0 is returned.  In the failure case,
00270   /// if one of the strategy objects was started successfully, then
00271   /// it will be stopped before the start() method returns -1.
00272   int start(const TransportSendStrategy_rch& send_strategy,
00273             const TransportStrategy_rch& receive_strategy);
00274 
00275   /// This announces the "stop" event to our subclass.  The "stop"
00276   /// event will occur when this DataLink is handling a
00277   /// release_reservations() call and determines that it has just
00278   /// released all of the remaining reservations on this DataLink.
00279   /// The "stop" event will also occur when the TransportImpl
00280   /// is being shutdown() - we call stop_i() from our
00281   /// transport_shutdown() method to handle this case.
00282   virtual void stop_i();
00283 
00284   /// Used to provide unique Ids to all DataLink methods.
00285   static ACE_UINT64 get_next_datalink_id();
00286 
00287   /// The transport receive strategy object for this DataLink.
00288   TransportStrategy_rch receive_strategy_;
00289 
00290   friend class ThreadPerConnectionSendTask;
00291 
00292   /// The implementation of the functions that accomplish the
00293   /// sample or control message delivery. IThey just simply
00294   /// delegate to the send strategy.
00295   void send_start_i();
00296   virtual void send_i(TransportQueueElement* element, bool relink = true);
00297   void send_stop_i(RepoId repoId);
00298 
00299   /// For a given local RepoId (publication or subscription), return the list
00300   /// of remote peer RepoIds (subscriptions or publications) that this link
00301   /// knows about due to make_reservation().
00302   GUIDSeq* peer_ids(const RepoId& local_id) const;
00303 
00304 private:
00305 
00306   /// Helper function to output the enum as a string to help debugging.
00307   const char* connection_notice_as_str(ConnectionNotice notice);
00308 
00309   TransportSendListener_rch send_listener_for(const RepoId& pub_id) const;
00310   TransportReceiveListener_rch recv_listener_for(const RepoId& sub_id) const;
00311 
00312   /// Save current sub and pub association maps for releasing and create
00313   /// empty maps for new associations.
00314   void prepare_release();
00315 
00316   virtual bool handle_send_request_ack(TransportQueueElement* element);
00317 
00318   /// Allow derived classes to provide an alternate "customized" queue element
00319   /// for this DataLink (not shared with other links in the DataLinkSet).
00320   virtual TransportQueueElement* customize_queue_element(
00321     TransportQueueElement* element)
00322   {
00323     return element;
00324   }
00325 
00326   virtual void release_remote_i(const RepoId& /*remote_id*/) {}
00327   virtual void release_reservations_i(const RepoId& /*remote_id*/,
00328                                       const RepoId& /*local_id*/) {}
00329 
00330   void data_received_i(ReceivedDataSample& sample,
00331                        const RepoId& readerId,
00332                        const RepoIdSet& incl_excl,
00333                        ReceiveListenerSet::ConstrainReceiveSet constrain);
00334 
00335   void notify_reactor();
00336 
00337   typedef ACE_SYNCH_MUTEX     LockType;
00338 
00339   /// Convenience function for diagnostic information.
00340 #ifndef OPENDDS_SAFETY_PROFILE
00341   friend OpenDDS_Dcps_Export
00342   std::ostream& operator<<(std::ostream& str, const DataLink& value);
00343 #endif
00344 
00345   /// A boolean indicating if the DataLink has been stopped. This
00346   /// value is protected by the strategy_lock_.
00347   bool stopped_;
00348   ACE_Time_Value scheduled_to_stop_at_;
00349 
00350   /// Map publication Id value to TransportSendListener.
00351   typedef OPENDDS_MAP_CMP(RepoId, TransportSendListener_wrch, GUID_tKeyLessThan) IdToSendListenerMap;
00352   IdToSendListenerMap send_listeners_;
00353 
00354   /// Map subscription Id value to TransportReceieveListener.
00355   typedef OPENDDS_MAP_CMP(RepoId, TransportReceiveListener_wrch, GUID_tKeyLessThan) IdToRecvListenerMap;
00356   IdToRecvListenerMap recv_listeners_;
00357 
00358   /// If default_listener_ is not null and this DataLink receives a sample
00359   /// from a publication GUID that's not in pub_map_, it will call
00360   /// data_received() on the default_listener_.
00361   TransportReceiveListener_wrch default_listener_;
00362 
00363   mutable LockType pub_sub_maps_lock_;
00364 
00365   typedef OPENDDS_MAP_CMP(RepoId, ReceiveListenerSet_rch, GUID_tKeyLessThan) AssocByRemote;
00366   AssocByRemote assoc_by_remote_;
00367 
00368   typedef OPENDDS_MAP_CMP(RepoId, RepoIdSet, GUID_tKeyLessThan) AssocByLocal;
00369   AssocByLocal assoc_by_local_;
00370 
00371   /// A (smart) pointer to the TransportImpl that created this DataLink.
00372   TransportImpl& impl_;
00373 
00374   /// The id for this DataLink
00375   ACE_UINT64 id_;
00376 
00377   /// The task used to do the sending. This ThreadPerConnectionSendTask
00378   /// object is created when the thread_per_connection configuration is
00379   /// true. It only dedicate to this datalink.
00380   unique_ptr<ThreadPerConnectionSendTask> thr_per_con_send_task_;
00381 
00382   // snapshot of associations when the release_resources() is called.
00383   AssocByLocal assoc_releasing_;
00384 
00385   /// TRANSPORT_PRIORITY value associated with the link.
00386   Priority transport_priority_;
00387 
00388   bool scheduling_release_;
00389 
00390 protected:
00391 
00392   typedef ACE_Guard<LockType> GuardType;
00393 
00394   /// The transport send strategy object for this DataLink.
00395   TransportSendStrategy_rch send_strategy_;
00396 
00397   LockType strategy_lock_;
00398   OPENDDS_VECTOR(OnStartCallback) on_start_callbacks_;
00399 
00400   /// Configurable delay in milliseconds that the datalink
00401   /// should be released after all associations are removed.
00402   ACE_Time_Value datalink_release_delay_;
00403 
00404   /// Allocators for data and message blocks used by transport
00405   /// control samples when send_control is called.
00406   unique_ptr<MessageBlockAllocator> mb_allocator_;
00407   unique_ptr<DataBlockAllocator> db_allocator_;
00408 
00409   /// Is remote attached to same transport ?
00410   bool is_loopback_;
00411   /// Is pub or sub ?
00412   bool is_active_;
00413   bool started_;
00414 
00415   /// Listener for TransportSendControlElements created in send_control
00416   SendResponseListener send_response_listener_;
00417 };
00418 
00419 } // namespace DCPS
00420 } // namespace OpenDDS
00421 
00422 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00423 
00424 #if defined (__ACE_INLINE__)
00425 #include "DataLink.inl"
00426 #endif /* __ACE_INLINE__ */
00427 
00428 #endif /* OPENDDS_DCPS_DATALINK_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1