OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::TransportReassembly Class Reference

#include <TransportReassembly.h>

Inheritance diagram for OpenDDS::DCPS::TransportReassembly:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReassembly:
Collaboration graph
[legend]

Classes

struct  FragInfo
 
struct  FragSample
 

Public Member Functions

 TransportReassembly (const TimeDuration &timeout=TimeDuration(300))
 
bool reassemble (const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
 
bool reassemble (const FragmentRange &fragRange, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
 
void data_unavailable (const FragmentRange &transportSeqDropped)
 
void data_unavailable (const SequenceNumber &dataSampleSeq, const GUID_t &pub_id)
 
void clear_completed (const GUID_t &pub_id)
 
bool has_frags (const SequenceNumber &seq, const GUID_t &pub_id) const
 
bool has_frags (const SequenceNumber &seq, const GUID_t &pub_id, ACE_UINT32 &total_frags) const
 
CORBA::ULong get_gaps (const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Private Types

typedef std::pair< MonotonicTimePoint, FragKeyElementType
 

Private Member Functions

bool reassemble_i (const FragmentRange &fragRange, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags)
 
typedef OPENDDS_MAP (FragKey, FragInfo) FragInfoMap
 
typedef OPENDDS_LIST (ElementType) ExpirationQueue
 
typedef OPENDDS_MAP_CMP (GUID_t, DisjointSequence, GUID_tKeyLessThan) CompletedMap
 
void check_expirations (const MonotonicTimePoint &now)
 

Private Attributes

ACE_Thread_Mutex mutex_
 
FragInfoMap fragments_
 
ExpirationQueue expiration_queue_
 
CompletedMap completed_
 
TimeDuration timeout_
 

Additional Inherited Members

- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 65 of file TransportReassembly.h.

Member Typedef Documentation

◆ ElementType

Definition at line 157 of file TransportReassembly.h.

Constructor & Destructor Documentation

◆ TransportReassembly()

OpenDDS::DCPS::TransportReassembly::TransportReassembly ( const TimeDuration timeout = TimeDuration(300))
explicit

Definition at line 36 of file TransportReassembly.cpp.

37  : timeout_(timeout)
38 {
39 }

Member Function Documentation

◆ check_expirations()

void OpenDDS::DCPS::TransportReassembly::check_expirations ( const MonotonicTimePoint now)
private

Definition at line 291 of file TransportReassembly.cpp.

References ACE_DEBUG, expiration_queue_, fragments_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_fragment_storage, OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

Referenced by reassemble_i().

292 {
293  while (!expiration_queue_.empty() && expiration_queue_.front().first <= now) {
294  FragInfoMap::iterator iter = fragments_.find(expiration_queue_.front().second);
295  if (iter != fragments_.end()) {
296  // FragInfo::expiration_ may have changed after insertion into expiration_queue_
297  if (iter->second.expiration_ <= now) {
298  fragments_.erase(iter);
300  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::check_expirations: "
301  "purge expired leaving %B fragments\n", fragments_.size()));
302  }
303  } else {
304  expiration_queue_.push_back(std::make_pair(iter->second.expiration_, iter->first));
305  }
306  }
307  expiration_queue_.pop_front();
308  }
309 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25

◆ clear_completed()

void OpenDDS::DCPS::TransportReassembly::clear_completed ( const GUID_t pub_id)

Clears out "completed" sequence numbers in order to allow resends for new associations to the same (given) publication id

Definition at line 56 of file TransportReassembly.cpp.

References completed_, and mutex_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::clear_completed_fragments().

◆ data_unavailable() [1/2]

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const FragmentRange transportSeqDropped)

Called by TransportReceiveStrategy to indicate that we can stop tracking partially-reassembled messages when we know the remaining fragments are not expected to arrive.

Definition at line 228 of file TransportReassembly.cpp.

References OpenDDS::DCPS::FragKey::data_sample_seq_, OpenDDS::DCPS::TransportReassembly::FragSample::frag_range_, fragments_, OpenDDS::DCPS::TransportReassembly::FragInfo::have_first_, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::TransportReassembly::FragInfo::insert(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::more_fragments_, mutex_, OPENDDS_LIST(), OpenDDS::DCPS::TransportReassembly::FragInfo::sample_list_, OpenDDS::DCPS::DataSampleHeader::sequence_, and VDBG.

Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::ReliableSession::nakack_received(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments().

229 {
231  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable(): "
232  "dropped %q-%q\n", dropped.first, dropped.second));
233  typedef OPENDDS_LIST(FragSample)::iterator list_iterator;
234 
235  for (FragInfoMap::iterator iter = fragments_.begin(); iter != fragments_.end();
236  ++iter) {
237  const FragKey& key = iter->first;
238  FragInfo& finfo = iter->second;
239  FragInfo::FragSampleList& flist = finfo.sample_list_;
240 
241  ReceivedDataSample dummy;
242  dummy.header_.sequence_ = key.data_sample_seq_;
243 
244  // check if we should expand the front element (only if !have_first)
245  const FragmentNumber prev = flist.front().frag_range_.first - 1;
246  if (dropped.second == prev && !finfo.have_first_) {
247  finfo.have_first_ = true;
248  dummy.header_.more_fragments_ = true;
249  finfo.insert(dropped, dummy);
250  continue;
251  }
252 
253  // find a gap between list elements where "dropped" fits
254  for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
255  list_iterator it_next = it;
256  ++it_next;
257  if (it_next == flist.end()) {
258  break;
259  }
260  FragSample& fr1 = *it;
261  FragSample& fr2 = *it_next;
262  if (dropped.first > fr1.frag_range_.second
263  && dropped.second < fr2.frag_range_.first) {
264  dummy.header_.more_fragments_ = true;
265  finfo.insert(dropped, dummy);
266  break;
267  }
268  }
269 
270  // check if we should expand the last element
271  const FragmentNumber next = flist.back().frag_range_.second + 1;
272  if (dropped.first == next) {
273  flist.back().rec_ds_.header_.more_fragments_ = true;
274  finfo.insert(dropped, dummy);
275  }
276  }
277 }
sequence< octet > key
SequenceNumber::Value FragmentNumber
typedef OPENDDS_LIST(ElementType) ExpirationQueue
#define VDBG(DBG_ARGS)

◆ data_unavailable() [2/2]

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const SequenceNumber dataSampleSeq,
const GUID_t pub_id 
)

Definition at line 280 of file TransportReassembly.cpp.

References ACE_DEBUG, fragments_, LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_fragment_storage, mutex_, OpenDDS::DCPS::transport_debug, and OpenDDS::DCPS::Transport_debug_level.

282 {
284  if (fragments_.erase(FragKey(pub_id, dataSampleSeq)) &&
286  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable: "
287  "removed leaving %B fragments\n", fragments_.size()));
288  }
289 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25

◆ get_gaps()

CORBA::ULong OpenDDS::DCPS::TransportReassembly::get_gaps ( const SequenceNumber msg_seq,
const GUID_t pub_id,
CORBA::Long  bitmap[],
CORBA::ULong  length,
CORBA::ULong numBits 
) const

Populates bitmap for missing fragment sequence numbers and set numBits for the given message sequence and publisher ID.

Returns
the base fragment sequence number for bit zero in the bitmap

Definition at line 63 of file TransportReassembly.cpp.

References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), fragments_, mutex_, and OPENDDS_LIST().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments().

66 {
68  // length is number of (allocated) words in bitmap, max of 8
69  // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
70  if (length == 0) {
71  return 0;
72  }
73 
74  const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
75  if (iter == fragments_.end()) {
76  // Nothing missing
77  return 0;
78  }
79 
80  // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
81  // low 32 bits of the 64-bit generalized sequence numbers in
82  // FragSample::frag_range_.
83 
84  const OPENDDS_LIST(FragSample)& flist = iter->second.sample_list_;
85  const FragmentNumber first = flist.front().frag_range_.first;
86  const CORBA::ULong base = static_cast<CORBA::ULong>((first == 1)
87  ? flist.front().frag_range_.second + 1
88  : 1);
89 
90  if (first != 1) {
91  // Represent the "gap" before the first list element.
92  // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
93  ACE_CDR::ULong bits_added = 0;
94  DisjointSequence::fill_bitmap_range(0, static_cast<CORBA::ULong>(first - 2),
95  bitmap, length, numBits, bits_added);
96  } else if (flist.size() == 1) {
97  // No gaps, but we know there are (at least 1) more_fragments
98  if (iter->second.total_frags_ == 0) {
99  ACE_CDR::ULong bits_added = 0;
100  DisjointSequence::fill_bitmap_range(0, 0, bitmap, length, numBits, bits_added);
101  } else {
102  const size_t rlimit = static_cast<size_t>(flist.back().frag_range_.second - 1);
103  const CORBA::ULong ulimit = static_cast<CORBA::ULong>(iter->second.total_frags_ - (base < rlimit ? rlimit : base));
104  ACE_CDR::ULong bits_added = 0;
106  ulimit,
107  bitmap, length, numBits, bits_added);
108  }
109  // NOTE: this could send a nack for fragments that are in flight
110  // need to defer setting bitmap till heartbeat extending logic
111  // in RtpsUdpDataLink::generate_nack_frags
112  return base;
113  }
114 
115  typedef OPENDDS_LIST(FragSample)::const_iterator list_iterator;
116  for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
117  const list_iterator it_next = ++list_iterator(it);
118  if (it_next == flist.end()) {
119  break;
120  }
121  const CORBA::ULong low = static_cast<CORBA::ULong>(it->frag_range_.second + 1 - base),
122  high = static_cast<CORBA::ULong>(it_next->frag_range_.first - 1 - base);
123  ACE_CDR::ULong bits_added = 0;
124  DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits, bits_added);
125  }
126 
127  return base;
128 }
SequenceNumber::Value FragmentNumber
typedef OPENDDS_LIST(ElementType) ExpirationQueue
ACE_CDR::ULong ULong
ACE_UINT32 ULong
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.

◆ has_frags() [1/2]

bool OpenDDS::DCPS::TransportReassembly::has_frags ( const SequenceNumber seq,
const GUID_t pub_id 
) const
inline

Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.

Definition at line 91 of file TransportReassembly.h.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap().

92  {
93  ACE_UINT32 total_frags;
94  return has_frags(seq, pub_id, total_frags);
95  }
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const

◆ has_frags() [2/2]

bool OpenDDS::DCPS::TransportReassembly::has_frags ( const SequenceNumber seq,
const GUID_t pub_id,
ACE_UINT32 &  total_frags 
) const

Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.

Definition at line 42 of file TransportReassembly.cpp.

References fragments_, and mutex_.

45 {
47  const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
48  if (iter != fragments_.end()) {
49  total_frags = iter->second.total_frags_;
50  return true;
51  }
52  return false;
53 }

◆ OPENDDS_LIST()

typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_LIST ( ElementType  )
private

Referenced by data_unavailable(), and get_gaps().

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP ( FragKey  ,
FragInfo   
)
private

◆ OPENDDS_MAP_CMP()

typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP_CMP ( GUID_t  ,
DisjointSequence  ,
GUID_tKeyLessThan   
)
private

◆ reassemble() [1/2]

bool OpenDDS::DCPS::TransportReassembly::reassemble ( const SequenceNumber transportSeq,
bool  firstFrag,
ReceivedDataSample data,
ACE_UINT32  total_frags = 0 
)

Called by TransportReceiveStrategy if the fragmentation header flag is set. Returns true/false to indicate if data should be delivered to the datalink. The 'data' argument may be modified by this method.

Definition at line 140 of file TransportReassembly.cpp.

References OpenDDS::DCPS::SequenceNumber::getValue(), mutex_, and reassemble_i().

Referenced by OpenDDS::DCPS::MulticastSession::reassemble(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble_i().

144 {
146  return reassemble_i(FragmentRange(transportSeq.getValue(), transportSeq.getValue()),
147  firstFrag, data, total_frags);
148 }
bool reassemble_i(const FragmentRange &fragRange, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags)
std::pair< FragmentNumber, FragmentNumber > FragmentRange

◆ reassemble() [2/2]

bool OpenDDS::DCPS::TransportReassembly::reassemble ( const FragmentRange fragRange,
ReceivedDataSample data,
ACE_UINT32  total_frags = 0 
)

Definition at line 131 of file TransportReassembly.cpp.

References mutex_, and reassemble_i().

134 {
136  return reassemble_i(fragRange, fragRange.first == 1, data, total_frags);
137 }
bool reassemble_i(const FragmentRange &fragRange, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags)

◆ reassemble_i()

bool OpenDDS::DCPS::TransportReassembly::reassemble_i ( const FragmentRange fragRange,
bool  firstFrag,
ReceivedDataSample data,
ACE_UINT32  total_frags 
)
private

Definition at line 151 of file TransportReassembly.cpp.

References ACE_DEBUG, OpenDDS::DCPS::LogGuid::c_str(), check_expirations(), OpenDDS::DCPS::ReceivedDataSample::clear(), completed_, OpenDDS::DCPS::FragKey::data_sample_seq_, expiration_queue_, fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::has_data(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::TransportReassembly::FragInfo::insert(), LM_DEBUG, OpenDDS::DCPS::TransportDebug::log_fragment_storage, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::FragKey::publication_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::swap(), timeout_, OpenDDS::DCPS::transport_debug, OpenDDS::DCPS::Transport_debug_level, and VDBG.

Referenced by reassemble().

155 {
156  if (Transport_debug_level > 5) {
157  LogGuid logger(data.header_.publication_id_);
158  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
159  "tseq %q-%q first %d dseq %q pub %C\n", fragRange.first,
160  fragRange.second, firstFrag ? 1 : 0,
161  data.header_.sequence_.getValue(), logger.c_str()));
162  }
163 
165  check_expirations(now);
166 
167  const FragKey key(data.header_.publication_id_, data.header_.sequence_);
168  FragInfoMap::iterator iter = fragments_.find(key);
169  const MonotonicTimePoint expiration = now + timeout_;
170 
171  if (iter == fragments_.end()) {
172  FragInfo& finfo = fragments_[key];
173  finfo = FragInfo(firstFrag, FragInfo::FragSampleList(), total_frags, expiration);
174  finfo.insert(fragRange, data);
175  expiration_queue_.push_back(std::make_pair(expiration, key));
176  data.clear();
177  // since this is the first fragment we've seen, it can't possibly be done
179  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
180  "stored first frag, returning false (incomplete) with %B fragments\n",
181  fragments_.size()));
182  }
183  return false;
184  } else {
185  const CompletedMap::const_iterator citer = completed_.find(key.publication_);
186  if (citer != completed_.end() && citer->second.contains(key.data_sample_seq_)) {
187  // already completed, not storing or delivering this message
188  return false;
189  }
190  if (firstFrag) {
191  iter->second.have_first_ = true;
192  }
193  if (iter->second.total_frags_ < total_frags) {
194  iter->second.total_frags_ = total_frags;
195  }
196  iter->second.expiration_ = expiration;
197  }
198 
199  if (!iter->second.insert(fragRange, data)) {
200  // error condition, already logged by insert()
201  return false;
202  }
203 
204  // We can deliver data if all three of these conditions are met:
205  // 1. we've seen the "first fragment" flag [first frag is here]
206  // 2. all fragments have been coalesced [no gaps in the seq numbers]
207  // 3. the "more fragments" flag is not set [last frag is here]
208  if (iter->second.have_first_
209  && iter->second.sample_list_.size() == 1
210  && !iter->second.sample_list_.front().rec_ds_.header_.more_fragments_) {
211  std::swap(data, iter->second.sample_list_.front().rec_ds_);
212  fragments_.erase(iter);
213  completed_[key.publication_].insert(key.data_sample_seq_);
215  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
216  "removed frag, returning %C with %B fragments\n",
217  data.has_data() ? "true (complete)" : "false (incomplete)", fragments_.size()));
218  }
219  return data.has_data(); // could be false if we had data_unavailable()
220  }
221 
222  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
223  "returning false (incomplete)\n"));
224  return false;
225 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
sequence< octet > key
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define VDBG(DBG_ARGS)
void check_expirations(const MonotonicTimePoint &now)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void swap(MessageBlock &lhs, MessageBlock &rhs)

Member Data Documentation

◆ completed_

CompletedMap OpenDDS::DCPS::TransportReassembly::completed_
private

Definition at line 162 of file TransportReassembly.h.

Referenced by clear_completed(), and reassemble_i().

◆ expiration_queue_

ExpirationQueue OpenDDS::DCPS::TransportReassembly::expiration_queue_
private

Definition at line 159 of file TransportReassembly.h.

Referenced by check_expirations(), and reassemble_i().

◆ fragments_

FragInfoMap OpenDDS::DCPS::TransportReassembly::fragments_
private

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::TransportReassembly::mutex_
mutableprivate

◆ timeout_

TimeDuration OpenDDS::DCPS::TransportReassembly::timeout_
private

Definition at line 164 of file TransportReassembly.h.

Referenced by reassemble_i().


The documentation for this class was generated from the following files: