TransportQueueElement.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00009 #define OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/GuidUtils.h"
00014 #include "dds/DCPS/PoolAllocationBase.h"
00015 #include "dds/DCPS/SequenceNumber.h"
00016 
00017 #include <utility>
00018 
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020 class ACE_Message_Block;
00021 ACE_END_VERSIONED_NAMESPACE_DECL
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 class DataSampleElement;
00029 
00030 class TransportQueueElement;
00031 typedef std::pair<TransportQueueElement*, TransportQueueElement*> ElementPair;
00032 
00033 /**
00034  * @class TransportQueueElement
00035  *
00036  * @brief Base wrapper class around a data/control sample to be sent.
00037  *
00038  * This class serves as the base class for different types of samples
00039  * that can be sent.  For example, there are data samples and control
00040  * samples.  A subclass of TransportQueueElement exists for each of
00041  * these types of samples.
00042  *
00043  * This class maintains a counter that, when decremented to 0, will
00044  * trigger some logic (defined in the subclass) that will "return
00045  * the loan" of the sample.  The sample is "loaned" to the transport
00046  * via a send() or send_control() call on the TransportClient.
00047  * This wrapper object will "return the loan" when all DataLinks have
00048  * "returned" their sub-loans.
00049  */
00050 class OpenDDS_Dcps_Export TransportQueueElement : public PoolAllocationBase {
00051 public:
00052 
00053   virtual ~TransportQueueElement();
00054 
00055   class OpenDDS_Dcps_Export MatchCriteria {
00056   protected:
00057     virtual ~MatchCriteria();
00058     MatchCriteria() {}
00059   public:
00060     virtual bool matches(const TransportQueueElement& candidate) const = 0;
00061     virtual bool unique() const = 0; // (only expect to match 1 element)
00062   private: // and unimplemented...
00063     MatchCriteria(const MatchCriteria&);
00064     MatchCriteria& operator=(const MatchCriteria&);
00065   };
00066 
00067   class OpenDDS_Dcps_Export MatchOnPubId : public MatchCriteria {
00068   public:
00069     explicit MatchOnPubId(const RepoId& id) : pub_id_(id) {}
00070     virtual ~MatchOnPubId();
00071     virtual bool matches(const TransportQueueElement& candidate) const;
00072     virtual bool unique() const { return false; }
00073   private:
00074     RepoId pub_id_;
00075   };
00076 
00077   class OpenDDS_Dcps_Export MatchOnDataPayload : public MatchCriteria {
00078   public:
00079     explicit MatchOnDataPayload(const char* data) : data_(data) {}
00080     virtual ~MatchOnDataPayload();
00081     virtual bool matches(const TransportQueueElement& candidate) const;
00082     virtual bool unique() const { return true; }
00083   private:
00084     const char* data_;
00085   };
00086 
00087   /// Invoked when the sample is dropped from a DataLink due to a
00088   /// remove_sample() call.
00089   /// The dropped_by_transport flag true indicates the data dropping is initiated
00090   /// by transport when the transport send strategy is in a MODE_TERMINATED.
00091   /// The dropped_by_transport flag false indicates the dropping is initiated
00092   /// by the remove_sample and data_dropped() is a result of remove_sample().
00093   /// The return value indicates if this element is released.
00094   bool data_dropped(bool dropped_by_transport = false);
00095 
00096   /// Invoked when the sample has been sent by a DataLink.
00097   /// The return value indicates if this element is released.
00098   bool data_delivered();
00099 
00100   /// Does the sample require an exclusive transport packet?
00101   virtual bool requires_exclusive_packet() const;
00102 
00103   /// Accessor for the publication id that sent the sample.
00104   virtual RepoId publication_id() const = 0;
00105 
00106   /// Accessor for the subscription id, if sent the sample is sent to 1 sub
00107   virtual RepoId subscription_id() const {
00108     return GUID_UNKNOWN;
00109   }
00110 
00111   virtual SequenceNumber sequence() const {
00112     return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
00113   }
00114 
00115   /// The marshalled sample (sample header + sample data)
00116   virtual const ACE_Message_Block* msg() const = 0;
00117 
00118   /// The marshalled payload only (sample data)
00119   virtual const ACE_Message_Block* msg_payload() const = 0;
00120 
00121   /// Is the element a "control" sample from the specified pub_id?
00122   virtual bool is_control(RepoId pub_id) const;
00123 
00124   /// Is the listener get called ?
00125   bool released() const;
00126   void released(bool flag);
00127 
00128   /// Clone method with provided message block allocator and data block
00129   /// allocators.
00130   static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg,
00131                                      MessageBlockAllocator* mb_allocator,
00132                                      DataBlockAllocator* db_allocator);
00133 
00134   /// Is the sample created by the transport?
00135   virtual bool owned_by_transport() = 0;
00136 
00137   /// Create two TransportQueueElements representing the same data payload
00138   /// as the current TransportQueueElement, with the first one (including its
00139   /// DataSampleHeader) fitting in "size" bytes.  This method leaves the
00140   /// current TransportQueueElement alone (but can't be made const because
00141   /// the newly-created elements will need to invoke non-const methods on it).
00142   /// Each element in the pair will contain its own serialized modified
00143   /// DataSampleHeader.
00144   virtual ElementPair fragment(size_t size);
00145 
00146   /// Is this QueueElement the result of fragmentation?
00147   virtual bool is_fragment() const { return false; }
00148 
00149   virtual bool is_request_ack() const { return false; }
00150 
00151 protected:
00152 
00153   /// Ctor.  The initial_count is the number of DataLinks to which
00154   /// this TransportQueueElement will be sent.
00155   explicit TransportQueueElement(unsigned long initial_count);
00156 
00157   /// Invoked when the counter reaches 0.
00158   virtual void release_element(bool dropped_by_transport) = 0;
00159 
00160   /// May be used by subclass' implementation of release_element()
00161   /// to determine if any DataLinks dropped the data instead of
00162   /// delivering it.
00163   bool was_dropped() const;
00164 
00165 private:
00166 
00167   /// Common logic for data_dropped() and data_delivered().
00168   bool decision_made(bool dropped_by_transport);
00169   friend class TransportCustomizedElement;
00170 
00171   /// Counts the number of outstanding sub-loans.
00172   ACE_Atomic_Op<ACE_Thread_Mutex, unsigned long> sub_loan_count_;
00173 
00174   /// Flag flipped to true if any DataLink dropped the sample.
00175   bool dropped_;
00176 
00177   /// If the callback to DW is made.
00178   bool released_;
00179 };
00180 
00181 } // namespace DCPS
00182 } // namespace OpenDDS
00183 
00184 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00185 
00186 #if defined(__ACE_INLINE__)
00187 #include "TransportQueueElement.inl"
00188 #endif /* __ACE_INLINE__ */
00189 
00190 #endif  /* OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1