TransportQueueElement.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00009 #define OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/GuidUtils.h"
00014 #include "ace/Synch.h"
00015 #include "dds/DCPS/PoolAllocationBase.h"
00016 
00017 #include <utility>
00018 
00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00020 class ACE_Message_Block;
00021 ACE_END_VERSIONED_NAMESPACE_DECL
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 class DataSampleElement;
00027 
00028 class TransportQueueElement;
00029 typedef std::pair<TransportQueueElement*, TransportQueueElement*> ElementPair;
00030 
00031 /**
00032  * @class TransportQueueElement
00033  *
00034  * @brief Base wrapper class around a data/control sample to be sent.
00035  *
00036  * This class serves as the base class for different types of samples
00037  * that can be sent.  For example, there are data samples and control
00038  * samples.  A subclass of TransportQueueElement exists for each of
00039  * these types of samples.
00040  *
00041  * This class maintains a counter that, when decremented to 0, will
00042  * trigger some logic (defined in the subclass) that will "return
00043  * the loan" of the sample.  The sample is "loaned" to the transport
00044  * via a send() or send_control() call on the TransportClient.
00045  * This wrapper object will "return the loan" when all DataLinks have
00046  * "returned" their sub-loans.
00047  */
00048 class OpenDDS_Dcps_Export TransportQueueElement : public PoolAllocationBase {
00049 public:
00050 
00051   virtual ~TransportQueueElement();
00052 
00053   class OpenDDS_Dcps_Export MatchCriteria {
00054   protected:
00055     virtual ~MatchCriteria();
00056     MatchCriteria() {}
00057   public:
00058     virtual bool matches(const TransportQueueElement& candidate) const = 0;
00059     virtual bool unique() const = 0; // (only expect to match 1 element)
00060   private: // and unimplemented...
00061     MatchCriteria(const MatchCriteria&);
00062     MatchCriteria& operator=(const MatchCriteria&);
00063   };
00064 
00065   class OpenDDS_Dcps_Export MatchOnPubId : public MatchCriteria {
00066   public:
00067     explicit MatchOnPubId(const RepoId& id) : pub_id_(id) {}
00068     virtual ~MatchOnPubId();
00069     virtual bool matches(const TransportQueueElement& candidate) const;
00070     virtual bool unique() const { return false; }
00071   private:
00072     RepoId pub_id_;
00073   };
00074 
00075   class OpenDDS_Dcps_Export MatchOnDataPayload : public MatchCriteria {
00076   public:
00077     explicit MatchOnDataPayload(const char* data) : data_(data) {}
00078     virtual ~MatchOnDataPayload();
00079     virtual bool matches(const TransportQueueElement& candidate) const;
00080     virtual bool unique() const { return true; }
00081   private:
00082     const char* data_;
00083   };
00084 
00085   /// Invoked when the sample is dropped from a DataLink due to a
00086   /// remove_sample() call.
00087   /// The dropped_by_transport flag true indicates the data dropping is initiated
00088   /// by transport when the transport send strategy is in a MODE_TERMINATED.
00089   /// The dropped_by_transport flag false indicates the dropping is initiated
00090   /// by the remove_sample and data_dropped() is a result of remove_sample().
00091   /// The return value indicates if this element is released.
00092   bool data_dropped(bool dropped_by_transport = false);
00093 
00094   /// Invoked when the sample has been sent by a DataLink.
00095   /// The return value indicates if this element is released.
00096   bool data_delivered();
00097 
00098   /// Does the sample require an exclusive transport packet?
00099   virtual bool requires_exclusive_packet() const;
00100 
00101   /// Accessor for the publication id that sent the sample.
00102   virtual RepoId publication_id() const = 0;
00103 
00104   /// Accessor for the subscription id, if sent the sample is sent to 1 sub
00105   virtual RepoId subscription_id() const {
00106     return GUID_UNKNOWN;
00107   }
00108 
00109   virtual SequenceNumber sequence() const {
00110     return SequenceNumber::SEQUENCENUMBER_UNKNOWN();
00111   }
00112 
00113   /// The marshalled sample (sample header + sample data)
00114   virtual const ACE_Message_Block* msg() const = 0;
00115 
00116   /// The marshalled payload only (sample data)
00117   virtual const ACE_Message_Block* msg_payload() const = 0;
00118 
00119   /// Is the element a "control" sample from the specified pub_id?
00120   virtual bool is_control(RepoId pub_id) const;
00121 
00122   /// Is the listener get called ?
00123   bool released() const;
00124   void released(bool flag);
00125 
00126   /// Clone method with provided message block allocator and data block
00127   /// allocators.
00128   static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg,
00129                                      MessageBlockAllocator* mb_allocator,
00130                                      DataBlockAllocator* db_allocator);
00131 
00132   /// Is the sample created by the transport?
00133   virtual bool owned_by_transport() = 0;
00134 
00135   /// Create two TransportQueueElements representing the same data payload
00136   /// as the current TransportQueueElement, with the first one (including its
00137   /// DataSampleHeader) fitting in "size" bytes.  This method leaves the
00138   /// current TransportQueueElement alone (but can't be made const because
00139   /// the newly-created elements will need to invoke non-const methods on it).
00140   /// Each element in the pair will contain its own serialized modified
00141   /// DataSampleHeader.
00142   virtual ElementPair fragment(size_t size);
00143 
00144   /// Is this QueueElement the result of fragmentation?
00145   virtual bool is_fragment() const { return false; }
00146 
00147 protected:
00148 
00149   /// Ctor.  The initial_count is the number of DataLinks to which
00150   /// this TransportQueueElement will be sent.
00151   explicit TransportQueueElement(unsigned long initial_count);
00152 
00153   /// Invoked when the counter reaches 0.
00154   virtual void release_element(bool dropped_by_transport) = 0;
00155 
00156   /// May be used by subclass' implementation of release_element()
00157   /// to determine if any DataLinks dropped the data instead of
00158   /// delivering it.
00159   bool was_dropped() const;
00160 
00161 private:
00162 
00163   /// Common logic for data_dropped() and data_delivered().
00164   bool decision_made(bool dropped_by_transport);
00165   friend class TransportCustomizedElement;
00166 
00167   /// Counts the number of outstanding sub-loans.
00168   ACE_Atomic_Op<ACE_Thread_Mutex, unsigned long> sub_loan_count_;
00169 
00170   /// Flag flipped to true if any DataLink dropped the sample.
00171   bool dropped_;
00172 
00173   /// If the callback to DW is made.
00174   bool released_;
00175 };
00176 
00177 } // namespace DCPS
00178 } // namespace OpenDDS
00179 
00180 #if defined(__ACE_INLINE__)
00181 #include "TransportQueueElement.inl"
00182 #endif /* __ACE_INLINE__ */
00183 
00184 #endif  /* OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7