Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H 9 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H 10 : 11 : #include "ReceivedDataSample.h" 12 : 13 : #include "dds/DCPS/dcps_export.h" 14 : #include "dds/DCPS/Definitions.h" 15 : #include "dds/DCPS/DisjointSequence.h" 16 : #include "dds/DCPS/PoolAllocator.h" 17 : #include "dds/DCPS/RcObject.h" 18 : #include "dds/DCPS/TimeTypes.h" 19 : 20 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 21 : 22 : namespace OpenDDS { 23 : namespace DCPS { 24 : 25 : #pragma pack(push, 1) 26 : 27 : // A FragKey represents the identifier for an original (pre-fragmentation) 28 : // message. Since DataSampleHeader sequence numbers are distinct for each 29 : // "publication" (DataWriter), the partially-received messages need to be 30 : // stored in a structure that's keyed off of both the GUID_t and the 31 : // SequenceNumber. 32 : struct FragKey { 33 : FragKey(const GUID_t& pubId, const SequenceNumber& dataSampleSeq); 34 : 35 : bool operator<(const FragKey& rhs) const 36 : { 37 : if (compare_(this->publication_, rhs.publication_)) return true; 38 : if (compare_(rhs.publication_, this->publication_)) return false; 39 : return this->data_sample_seq_ < rhs.data_sample_seq_; 40 : } 41 : 42 576 : bool operator==(const FragKey& other) const 43 : { 44 576 : return publication_ == other.publication_ && data_sample_seq_ == other.data_sample_seq_; 45 : } 46 : 47 : bool operator!=(const FragKey& other) const 48 : { 49 : return !(*this == other); 50 : } 51 : 52 : static GUID_tKeyLessThan compare_; 53 : GUID_t publication_; 54 : SequenceNumber data_sample_seq_; 55 : }; 56 : 57 : #pragma pack(pop) 58 : 59 : #if defined ACE_HAS_CPP11 60 1031 : OPENDDS_OOAT_CUSTOM_HASH(FragKey, OpenDDS_Dcps_Export, FragKeyHash); 61 : #endif 62 : 63 : typedef std::pair<FragmentNumber, FragmentNumber> FragmentRange; 64 : 65 : class OpenDDS_Dcps_Export TransportReassembly : public virtual RcObject { 66 : public: 67 : explicit TransportReassembly(const TimeDuration& timeout = TimeDuration(300)); 68 : 69 : /// Called by TransportReceiveStrategy if the fragmentation header flag 70 : /// is set. Returns true/false to indicate if data should be delivered to 71 : /// the datalink. The 'data' argument may be modified by this method. 72 : bool reassemble(const SequenceNumber& transportSeq, bool firstFrag, 73 : ReceivedDataSample& data, ACE_UINT32 total_frags = 0); 74 : 75 : bool reassemble(const FragmentRange& fragRange, ReceivedDataSample& data, ACE_UINT32 total_frags = 0); 76 : 77 : /// Called by TransportReceiveStrategy to indicate that we can 78 : /// stop tracking partially-reassembled messages when we know the 79 : /// remaining fragments are not expected to arrive. 80 : void data_unavailable(const FragmentRange& transportSeqDropped); 81 : 82 : void data_unavailable(const SequenceNumber& dataSampleSeq, 83 : const GUID_t& pub_id); 84 : 85 : /// Clears out "completed" sequence numbers in order to allow resends for 86 : /// new associations to the same (given) publication id 87 : void clear_completed(const GUID_t& pub_id); 88 : 89 : /// Returns true if this object is storing fragments for the given 90 : /// DataSampleHeader sequence number from the given publication. 91 3 : bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id) const 92 : { 93 : ACE_UINT32 total_frags; 94 6 : return has_frags(seq, pub_id, total_frags); 95 : } 96 : 97 : /// Returns true if this object is storing fragments for the given 98 : /// DataSampleHeader sequence number from the given publication. 99 : bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id, ACE_UINT32& total_frags) const; 100 : 101 : /// Populates bitmap for missing fragment sequence numbers and set numBits 102 : /// for the given message sequence and publisher ID. 103 : /// @returns the base fragment sequence number for bit zero in the bitmap 104 : CORBA::ULong get_gaps(const SequenceNumber& msg_seq, const GUID_t& pub_id, 105 : CORBA::Long bitmap[], CORBA::ULong length, 106 : CORBA::ULong& numBits) const; 107 : 108 : private: 109 : 110 : bool reassemble_i(const FragmentRange& fragRange, bool firstFrag, 111 : ReceivedDataSample& data, ACE_UINT32 total_frags); 112 : 113 : // A FragSample represents a chunk of a partially-reassembled message. 114 : // The frag_range_ range is the range of transport sequence numbers 115 : // that were used to send the given chunk of data. 116 : struct FragSample { 117 : FragSample(const FragmentRange& fragRange, 118 : const ReceivedDataSample& data); 119 : 120 : FragmentRange frag_range_; 121 : ReceivedDataSample rec_ds_; 122 : }; 123 : 124 : struct FragInfo { 125 : typedef OPENDDS_LIST(FragSample) FragSampleList; 126 : typedef OPENDDS_MAP(FragmentNumber, FragSampleList::iterator) FragSampleListIterMap; 127 : 128 : typedef OPENDDS_LIST(FragmentRange) FragGapList; 129 : typedef OPENDDS_MAP(FragmentNumber, FragGapList::iterator) FragGapListIterMap; 130 : 131 : FragInfo(); 132 : FragInfo(bool hf, const FragSampleList& rl, ACE_UINT32 tf, const MonotonicTimePoint& expiration); 133 : FragInfo(const FragInfo& val); 134 : 135 : FragInfo& operator=(const FragInfo& rhs); 136 : 137 : bool insert(const FragmentRange& fragRange, ReceivedDataSample& data); 138 : 139 : bool have_first_; 140 : FragSampleList sample_list_; 141 : FragSampleListIterMap sample_finder_; 142 : FragGapList gap_list_; 143 : FragGapListIterMap gap_finder_; 144 : ACE_UINT32 total_frags_; 145 : MonotonicTimePoint expiration_; 146 : }; 147 : 148 : mutable ACE_Thread_Mutex mutex_; 149 : 150 : #ifdef ACE_HAS_CPP11 151 : typedef OPENDDS_UNORDERED_MAP_CHASH(FragKey, FragInfo, FragKeyHash) FragInfoMap; 152 : #else 153 : typedef OPENDDS_MAP(FragKey, FragInfo) FragInfoMap; 154 : #endif 155 : FragInfoMap fragments_; 156 : 157 : typedef std::pair<MonotonicTimePoint, FragKey> ElementType; 158 : typedef OPENDDS_LIST(ElementType) ExpirationQueue; 159 : ExpirationQueue expiration_queue_; 160 : 161 : typedef OPENDDS_MAP_CMP(GUID_t, DisjointSequence, GUID_tKeyLessThan) CompletedMap; 162 : CompletedMap completed_; 163 : 164 : TimeDuration timeout_; 165 : 166 : void check_expirations(const MonotonicTimePoint& now); 167 : }; 168 : 169 : typedef RcHandle<TransportReassembly> TransportReassembly_rch; 170 : 171 : } 172 : } 173 : 174 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 175 : 176 : #endif