LCOV - code coverage report
Current view: top level - DCPS/transport/framework - TransportReassembly.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 5 5 100.0 %
Date: 2023-04-30 01:32:43 Functions: 3 3 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H
       9             : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H
      10             : 
      11             : #include "ReceivedDataSample.h"
      12             : 
      13             : #include "dds/DCPS/dcps_export.h"
      14             : #include "dds/DCPS/Definitions.h"
      15             : #include "dds/DCPS/DisjointSequence.h"
      16             : #include "dds/DCPS/PoolAllocator.h"
      17             : #include "dds/DCPS/RcObject.h"
      18             : #include "dds/DCPS/TimeTypes.h"
      19             : 
      20             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      21             : 
      22             : namespace OpenDDS {
      23             : namespace DCPS {
      24             : 
      25             : #pragma pack(push, 1)
      26             : 
      27             : // A FragKey represents the identifier for an original (pre-fragmentation)
      28             : // message.  Since DataSampleHeader sequence numbers are distinct for each
      29             : // "publication" (DataWriter), the partially-received messages need to be
      30             : // stored in a structure that's keyed off of both the GUID_t and the
      31             : // SequenceNumber.
      32             : struct FragKey {
      33             :   FragKey(const GUID_t& pubId, const SequenceNumber& dataSampleSeq);
      34             : 
      35             :   bool operator<(const FragKey& rhs) const
      36             :   {
      37             :     if (compare_(this->publication_, rhs.publication_)) return true;
      38             :     if (compare_(rhs.publication_, this->publication_)) return false;
      39             :     return this->data_sample_seq_ < rhs.data_sample_seq_;
      40             :   }
      41             : 
      42         576 :   bool operator==(const FragKey& other) const
      43             :   {
      44         576 :     return publication_ == other.publication_ && data_sample_seq_ == other.data_sample_seq_;
      45             :   }
      46             : 
      47             :   bool operator!=(const FragKey& other) const
      48             :   {
      49             :     return !(*this == other);
      50             :   }
      51             : 
      52             :   static GUID_tKeyLessThan compare_;
      53             :   GUID_t publication_;
      54             :   SequenceNumber data_sample_seq_;
      55             : };
      56             : 
      57             : #pragma pack(pop)
      58             : 
      59             : #if defined ACE_HAS_CPP11
      60        1031 :   OPENDDS_OOAT_CUSTOM_HASH(FragKey, OpenDDS_Dcps_Export, FragKeyHash);
      61             : #endif
      62             : 
      63             : typedef std::pair<FragmentNumber, FragmentNumber> FragmentRange;
      64             : 
      65             : class OpenDDS_Dcps_Export TransportReassembly : public virtual RcObject {
      66             : public:
      67             :   explicit TransportReassembly(const TimeDuration& timeout = TimeDuration(300));
      68             : 
      69             :   /// Called by TransportReceiveStrategy if the fragmentation header flag
      70             :   /// is set.  Returns true/false to indicate if data should be delivered to
      71             :   /// the datalink.  The 'data' argument may be modified by this method.
      72             :   bool reassemble(const SequenceNumber& transportSeq, bool firstFrag,
      73             :                   ReceivedDataSample& data, ACE_UINT32 total_frags = 0);
      74             : 
      75             :   bool reassemble(const FragmentRange& fragRange, ReceivedDataSample& data, ACE_UINT32 total_frags = 0);
      76             : 
      77             :   /// Called by TransportReceiveStrategy to indicate that we can
      78             :   /// stop tracking partially-reassembled messages when we know the
      79             :   /// remaining fragments are not expected to arrive.
      80             :   void data_unavailable(const FragmentRange& transportSeqDropped);
      81             : 
      82             :   void data_unavailable(const SequenceNumber& dataSampleSeq,
      83             :                         const GUID_t& pub_id);
      84             : 
      85             :   /// Clears out "completed" sequence numbers in order to allow resends for
      86             :   /// new associations to the same (given) publication id
      87             :   void clear_completed(const GUID_t& pub_id);
      88             : 
      89             :   /// Returns true if this object is storing fragments for the given
      90             :   /// DataSampleHeader sequence number from the given publication.
      91           3 :   bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id) const
      92             :   {
      93             :     ACE_UINT32 total_frags;
      94           6 :     return has_frags(seq, pub_id, total_frags);
      95             :   }
      96             : 
      97             :   /// Returns true if this object is storing fragments for the given
      98             :   /// DataSampleHeader sequence number from the given publication.
      99             :   bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id, ACE_UINT32& total_frags) const;
     100             : 
     101             :   /// Populates bitmap for missing fragment sequence numbers and set numBits
     102             :   /// for the given message sequence and publisher ID.
     103             :   /// @returns the base fragment sequence number for bit zero in the bitmap
     104             :   CORBA::ULong get_gaps(const SequenceNumber& msg_seq, const GUID_t& pub_id,
     105             :                         CORBA::Long bitmap[], CORBA::ULong length,
     106             :                         CORBA::ULong& numBits) const;
     107             : 
     108             : private:
     109             : 
     110             :   bool reassemble_i(const FragmentRange& fragRange, bool firstFrag,
     111             :                     ReceivedDataSample& data, ACE_UINT32 total_frags);
     112             : 
     113             :   // A FragSample represents a chunk of a partially-reassembled message.
     114             :   // The frag_range_ range is the range of transport sequence numbers
     115             :   // that were used to send the given chunk of data.
     116             :   struct FragSample {
     117             :     FragSample(const FragmentRange& fragRange,
     118             :               const ReceivedDataSample& data);
     119             : 
     120             :     FragmentRange frag_range_;
     121             :     ReceivedDataSample rec_ds_;
     122             :   };
     123             : 
     124             :   struct FragInfo {
     125             :     typedef OPENDDS_LIST(FragSample) FragSampleList;
     126             :     typedef OPENDDS_MAP(FragmentNumber, FragSampleList::iterator) FragSampleListIterMap;
     127             : 
     128             :     typedef OPENDDS_LIST(FragmentRange) FragGapList;
     129             :     typedef OPENDDS_MAP(FragmentNumber, FragGapList::iterator) FragGapListIterMap;
     130             : 
     131             :     FragInfo();
     132             :     FragInfo(bool hf, const FragSampleList& rl, ACE_UINT32 tf, const MonotonicTimePoint& expiration);
     133             :     FragInfo(const FragInfo& val);
     134             : 
     135             :     FragInfo& operator=(const FragInfo& rhs);
     136             : 
     137             :     bool insert(const FragmentRange& fragRange, ReceivedDataSample& data);
     138             : 
     139             :     bool have_first_;
     140             :     FragSampleList sample_list_;
     141             :     FragSampleListIterMap sample_finder_;
     142             :     FragGapList gap_list_;
     143             :     FragGapListIterMap gap_finder_;
     144             :     ACE_UINT32 total_frags_;
     145             :     MonotonicTimePoint expiration_;
     146             :   };
     147             : 
     148             :   mutable ACE_Thread_Mutex mutex_;
     149             : 
     150             : #ifdef ACE_HAS_CPP11
     151             :   typedef OPENDDS_UNORDERED_MAP_CHASH(FragKey, FragInfo, FragKeyHash) FragInfoMap;
     152             : #else
     153             :   typedef OPENDDS_MAP(FragKey, FragInfo) FragInfoMap;
     154             : #endif
     155             :   FragInfoMap fragments_;
     156             : 
     157             :   typedef std::pair<MonotonicTimePoint, FragKey> ElementType;
     158             :   typedef OPENDDS_LIST(ElementType) ExpirationQueue;
     159             :   ExpirationQueue expiration_queue_;
     160             : 
     161             :   typedef OPENDDS_MAP_CMP(GUID_t, DisjointSequence, GUID_tKeyLessThan) CompletedMap;
     162             :   CompletedMap completed_;
     163             : 
     164             :   TimeDuration timeout_;
     165             : 
     166             :   void check_expirations(const MonotonicTimePoint& now);
     167             : };
     168             : 
     169             : typedef RcHandle<TransportReassembly> TransportReassembly_rch;
     170             : 
     171             : }
     172             : }
     173             : 
     174             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     175             : 
     176             : #endif

Generated by: LCOV version 1.16