OpenDDS  Snapshot(2023/04/28-20:55)
TransportReassembly.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTREASSEMBLY_H
10 
11 #include "ReceivedDataSample.h"
12 
13 #include "dds/DCPS/dcps_export.h"
14 #include "dds/DCPS/Definitions.h"
16 #include "dds/DCPS/PoolAllocator.h"
17 #include "dds/DCPS/RcObject.h"
18 #include "dds/DCPS/TimeTypes.h"
19 
21 
22 namespace OpenDDS {
23 namespace DCPS {
24 
25 #pragma pack(push, 1)
26 
27 // A FragKey represents the identifier for an original (pre-fragmentation)
28 // message. Since DataSampleHeader sequence numbers are distinct for each
29 // "publication" (DataWriter), the partially-received messages need to be
30 // stored in a structure that's keyed off of both the GUID_t and the
31 // SequenceNumber.
32 struct FragKey {
// Builds a key from a publication GUID and a data sample sequence number.
33  FragKey(const GUID_t& pubId, const SequenceNumber& dataSampleSeq);
34 
// Ordering: publication_ is compared first using compare_ (in both
// directions); data_sample_seq_ decides only when neither publication_
// orders before the other.
35  bool operator<(const FragKey& rhs) const
36  {
37  if (compare_(this->publication_, rhs.publication_)) return true;
38  if (compare_(rhs.publication_, this->publication_)) return false;
39  return this->data_sample_seq_ < rhs.data_sample_seq_;
40  }
41 
// Equality: both publication_ and data_sample_seq_ must match.
42  bool operator==(const FragKey& other) const
43  {
44  return publication_ == other.publication_ && data_sample_seq_ == other.data_sample_seq_;
45  }
46 
// Inequality is defined as the negation of operator==.
47  bool operator!=(const FragKey& other) const
48  {
49  return !(*this == other);
50  }
51 
// NOTE(review): the declarations of compare_, publication_ and
// data_sample_seq_ (listing lines 52-54) are not shown in this listing.
55 };
56 
57 #pragma pack(pop)
58 
59 #if defined ACE_HAS_CPP11
// Only for C++11 builds: FragKeyHash is the hash type supplied to the
// unordered FragInfoMap typedef further down in this file.
// NOTE(review): presumably this macro declares that hash type for FragKey
// with the OpenDDS_Dcps_Export decoration -- confirm against the macro definition.
60  OPENDDS_OOAT_CUSTOM_HASH(FragKey, OpenDDS_Dcps_Export, FragKeyHash);
61 #endif
62 
// A pair of fragment numbers describing a range of fragments.
// NOTE(review): presumably (first, last) with both ends inclusive -- confirm in the .cpp.
63 typedef std::pair<FragmentNumber, FragmentNumber> FragmentRange;
64 
66 public:
/// Constructs the reassembler; 'timeout' defaults to TimeDuration(300).
/// NOTE(review): presumably the timeout bounds how long incomplete fragment
/// sets are kept before check_expirations() drops them -- confirm in the .cpp.
67  explicit TransportReassembly(const TimeDuration& timeout = TimeDuration(300));
68 
69  /// Called by TransportReceiveStrategy if the fragmentation header flag
70  /// is set. Returns true/false to indicate if data should be delivered to
71  /// the datalink. The 'data' argument may be modified by this method.
/// This overload identifies the fragment by a single transport sequence
/// number plus a flag saying whether it is the first fragment.
/// 'total_frags' is optional and defaults to 0.
/// NOTE(review): presumably 0 means "total fragment count unknown" -- confirm.
72  bool reassemble(const SequenceNumber& transportSeq, bool firstFrag,
73  ReceivedDataSample& data, ACE_UINT32 total_frags = 0);
74 
/// Overload that identifies the received fragment(s) by a FragmentRange
/// instead of a transport sequence number and first-fragment flag.
/// 'total_frags' is optional and defaults to 0.
75  bool reassemble(const FragmentRange& fragRange, ReceivedDataSample& data, ACE_UINT32 total_frags = 0);
76 
77  /// Called by TransportReceiveStrategy to indicate that we can
78  /// stop tracking partially-reassembled messages when we know the
79  /// remaining fragments are not expected to arrive.
/// This overload takes the range of transport sequence numbers that were dropped.
80  void data_unavailable(const FragmentRange& transportSeqDropped);
81 
/// Overload that names one message directly by its data sample sequence
/// number and the GUID of the publication it came from.
82  void data_unavailable(const SequenceNumber& dataSampleSeq,
83  const GUID_t& pub_id);
84 
85  /// Clears out "completed" sequence numbers in order to allow resends for
86  /// new associations to the same (given) publication id
/// @param pub_id GUID of the publication whose completed entries are cleared.
87  void clear_completed(const GUID_t& pub_id);
88 
89  /// Returns true if this object is storing fragments for the given
90  /// DataSampleHeader sequence number from the given publication.
91  bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id) const
92  {
// Convenience form: forwards to the three-argument overload and discards
// the total fragment count it reports through its reference parameter.
93  ACE_UINT32 ignored_total_frags;
94  return this->has_frags(seq, pub_id, ignored_total_frags);
95  }
96 
97  /// Returns true if this object is storing fragments for the given
98  /// DataSampleHeader sequence number from the given publication.
/// @param total_frags output reference.
/// NOTE(review): presumably receives the message's total fragment count
/// when fragments are found and is left untouched otherwise -- confirm in the .cpp.
99  bool has_frags(const SequenceNumber& seq, const GUID_t& pub_id, ACE_UINT32& total_frags) const;
100 
101  /// Populates bitmap for missing fragment sequence numbers and sets numBits
102  /// for the given message sequence and publisher ID.
/// @param bitmap caller-provided array of CORBA::Long to be filled in
/// @param length NOTE(review): presumably the number of elements in 'bitmap' -- confirm
/// @param numBits output: set by this method
103  /// @returns the base fragment sequence number for bit zero in the bitmap
104  CORBA::ULong get_gaps(const SequenceNumber& msg_seq, const GUID_t& pub_id,
105  CORBA::Long bitmap[], CORBA::ULong length,
106  CORBA::ULong& numBits) const;
107 
108 private:
109 
// Private helper that accepts both a fragment range and a first-fragment flag.
// NOTE(review): presumably the shared implementation behind both public
// reassemble() overloads -- confirm in the .cpp.
110  bool reassemble_i(const FragmentRange& fragRange, bool firstFrag,
111  ReceivedDataSample& data, ACE_UINT32 total_frags);
112 
113  // A FragSample represents a chunk of a partially-reassembled message.
114  // The frag_range_ range is the range of transport sequence numbers
115  // that were used to send the given chunk of data.
116  struct FragSample {
// Constructed from the fragment range and the received data for that chunk.
117  FragSample(const FragmentRange& fragRange,
118  const ReceivedDataSample& data);
119 
120  FragmentRange frag_range_;
// NOTE(review): listing line 121 (a further member declaration) is not
// shown in this listing.
122  };
123 
// Per-message reassembly state; FragInfo is the mapped type of FragInfoMap,
// which is keyed by FragKey.
124  struct FragInfo {
// List of received chunks, plus an index from a FragmentNumber to a
// position (iterator) in that list.
125  typedef OPENDDS_LIST(FragSample) FragSampleList;
126  typedef OPENDDS_MAP(FragmentNumber, FragSampleList::iterator) FragSampleListIterMap;
127 
// List of fragment ranges, plus an index from a FragmentNumber to a
// position (iterator) in that list.
// NOTE(review): presumably these are the ranges still missing -- confirm in the .cpp.
128  typedef OPENDDS_LIST(FragmentRange) FragGapList;
129  typedef OPENDDS_MAP(FragmentNumber, FragGapList::iterator) FragGapListIterMap;
130 
131  FragInfo();
// NOTE(review): parameter names suggest hf = "have first fragment",
// rl = sample list, tf = total fragment count -- confirm in the .cpp.
132  FragInfo(bool hf, const FragSampleList& rl, ACE_UINT32 tf, const MonotonicTimePoint& expiration);
133  FragInfo(const FragInfo& val);
134 
135  FragInfo& operator=(const FragInfo& rhs);
136 
// Adds the data for 'fragRange' to this message's state; 'data' is taken
// by non-const reference.
// NOTE(review): meaning of the bool result is not visible here -- confirm in the .cpp.
137  bool insert(const FragmentRange& fragRange, ReceivedDataSample& data);
138 
// NOTE(review): listing lines 139 and 145 (further member declarations)
// are not shown in this listing.
140  FragSampleList sample_list_;
141  FragSampleListIterMap sample_finder_;
142  FragGapList gap_list_;
143  FragGapListIterMap gap_finder_;
144  ACE_UINT32 total_frags_;
146  };
147 
149 
// FragInfoMap associates a FragKey with its FragInfo. C++11 builds use an
// unordered map hashed with FragKeyHash; other builds use an ordered map.
150 #ifdef ACE_HAS_CPP11
151  typedef OPENDDS_UNORDERED_MAP_CHASH(FragKey, FragInfo, FragKeyHash) FragInfoMap;
152 #else
153  typedef OPENDDS_MAP(FragKey, FragInfo) FragInfoMap;
154 #endif
155  FragInfoMap fragments_;
156 
// Each queue element pairs a MonotonicTimePoint with the FragKey it applies to.
// NOTE(review): presumably the time point is that key's expiration time -- confirm.
157  typedef std::pair<MonotonicTimePoint, FragKey> ElementType;
158  typedef OPENDDS_LIST(ElementType) ExpirationQueue;
159  ExpirationQueue expiration_queue_;
160 
// NOTE(review): the CompletedMap typedef (listing line 161) is not shown in
// this listing.
162  CompletedMap completed_;
163 
165 
// Takes the current monotonic time.
// NOTE(review): presumably drops entries whose expiration is at or before
// 'now' -- confirm in the .cpp.
166  void check_expirations(const MonotonicTimePoint& now);
167 };
168 
170 
171 }
172 }
173 
175 
176 #endif
ACE_CDR::Long Long
FragKey(const GUID_t &pubId, const SequenceNumber &dataSampleSeq)
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
RcHandle< TransportReassembly > TransportReassembly_rch
bool operator!=(const FragKey &other) const
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
ACE_CDR::ULong ULong
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Holds a data sample received by the transport.
SequenceNumber::Value FragmentNumber
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
SequenceNumber data_sample_seq_
Sequence number abstraction. Only allows positive 64 bit values.
static GUID_tKeyLessThan compare_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
std::pair< MonotonicTimePoint, FragKey > ElementType
bool operator<(const FragKey &rhs) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
typedef OPENDDS_LIST(SubsectionPair) KeyList
bool operator==(const FragKey &other) const