23 , data_sample_seq_(dataSampleSeq)
31 : frag_range_(fragRange)
44 ACE_UINT32& total_frags)
const 49 total_frags = iter->second.total_frags_;
87 ? flist.front().frag_range_.second + 1
95 bitmap, length, numBits, bits_added);
96 }
else if (flist.size() == 1) {
98 if (iter->second.total_frags_ == 0) {
102 const size_t rlimit =
static_cast<size_t>(flist.back().frag_range_.second - 1);
107 bitmap, length, numBits, bits_added);
116 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
117 const list_iterator it_next = ++list_iterator(it);
118 if (it_next == flist.end()) {
122 high = static_cast<CORBA::ULong>(it_next->frag_range_.first - 1 - base);
133 ACE_UINT32 total_frags)
136 return reassemble_i(fragRange, fragRange.first == 1, data, total_frags);
143 ACE_UINT32 total_frags)
147 firstFrag, data, total_frags);
154 ACE_UINT32 total_frags)
159 "tseq %q-%q first %d dseq %q pub %C\n", fragRange.first,
160 fragRange.second, firstFrag ? 1 : 0,
168 FragInfoMap::iterator iter =
fragments_.find(key);
173 finfo =
FragInfo(firstFrag, FragInfo::FragSampleList(), total_frags, expiration);
174 finfo.
insert(fragRange, data);
180 "stored first frag, returning false (incomplete) with %B fragments\n",
191 iter->second.have_first_ =
true;
193 if (iter->second.total_frags_ < total_frags) {
194 iter->second.total_frags_ = total_frags;
196 iter->second.expiration_ = expiration;
199 if (!iter->second.insert(fragRange, data)) {
208 if (iter->second.have_first_
209 && iter->second.sample_list_.size() == 1
210 && !iter->second.sample_list_.front().rec_ds_.header_.more_fragments_) {
211 std::swap(data, iter->second.sample_list_.front().rec_ds_);
216 "removed frag, returning %C with %B fragments\n",
222 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::reassemble_i: " 223 "returning false (incomplete)\n"));
231 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::data_unavailable(): " 232 "dropped %q-%q\n", dropped.first, dropped.second));
246 if (dropped.second == prev && !finfo.
have_first_) {
249 finfo.
insert(dropped, dummy);
254 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
255 list_iterator it_next = it;
257 if (it_next == flist.end()) {
265 finfo.
insert(dropped, dummy);
272 if (dropped.first == next) {
273 flist.back().rec_ds_.header_.more_fragments_ =
true;
274 finfo.
insert(dropped, dummy);
287 "removed leaving %B fragments\n",
fragments_.size()));
297 if (iter->second.expiration_ <= now) {
301 "purge expired leaving %B fragments\n",
fragments_.size()));
304 expiration_queue_.push_back(std::make_pair(iter->second.expiration_, iter->first));
361 inline void join_err(
const char* detail)
364 ACE_TEXT(
"(%P|%t) ERROR: TransportReassembly::FragInfo::insert: ")
365 ACE_TEXT(
"DataSampleHeaders could not be joined: %C\n"), detail));
372 const FragmentNumber prev = fragRange.first - 1, next = fragRange.second + 1;
380 if (start->frag_range_.second != prev && start !=
sample_list_.begin()) {
393 for (FragSampleList::iterator it = start; it !=
sample_list_.end(); ++it) {
399 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) inserted %q-%q on the left of %q-%q\n", sn, fragRange.first, fragRange.second, fr.
frag_range_.first, fr.
frag_range_.second));
416 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the left\n", sn, fragRange.first, fragRange.second, fr.
frag_range_.first, fr.
frag_range_.second));
420 }
else if (fragRange.first < fr.
frag_range_.first) {
422 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q into %q-%q and %q-%q and recursively inserting both\n", sn, fragRange.first, fragRange.second, fragRange.first, fr.
frag_range_.first - 1, fr.
frag_range_.first, fragRange.second));
432 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q in order to recursively insert %q-%q\n", sn, fragRange.first, fragRange.second, fr.
frag_range_.second + 1, fragRange.second));
455 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the right, removing and recursingly inserting\n", sn, fragRange.first, fragRange.second, fr.
frag_range_.first, fr.
frag_range_.second));
463 return insert(range, copy);
466 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) duplicate fragment range %q-%q, dropping\n", sn, fragRange.first, fragRange.second));
474 VDBG((
LM_DEBUG,
"(%P|%t) TransportReassembly::insert: (SN: %q) inserting %q-%q at the end of the fragment buffer list\n", sn, fragRange.first, fragRange.second));
DataSampleHeader header_
The demarshalled sample header.
void swap(MessageBlock &lhs, MessageBlock &rhs)
FragInfo & operator=(const FragInfo &rhs)
FragKey(const GUID_t &pubId, const SequenceNumber &dataSampleSeq)
bool has_data() const
true if at least one Data Block is stored (even if it has 0 useable bytes)
OpenDDS_Dcps_Export TransportDebug transport_debug
ReceivedDataSample get_fragment_range(FragmentNumber start_frag, FragmentNumber end_frag=INVALID_FRAGMENT)
ReceivedDataSample rec_ds_
TransportReassembly(const TimeDuration &timeout=TimeDuration(300))
const char * c_str() const
FragmentRange frag_range_
FragSampleList sample_list_
ExpirationQueue expiration_queue_
void check_expirations(const MonotonicTimePoint &now)
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
static TimePoint_T< MonotonicClock > now()
typedef OPENDDS_LIST(ElementType) ExpirationQueue
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
MonotonicTimePoint expiration_
void data_unavailable(const FragmentRange &transportSeqDropped)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Holds a data sample received by the transport.
SequenceNumber::Value FragmentNumber
bool more_fragments_
The current "Data Sample" needs reassembly before further processing.
bool log_fragment_storage
FragSampleListIterMap sample_finder_
static bool join(const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result)
void clear_completed(const GUID_t &pub_id)
SequenceNumber data_sample_seq_
Sequence number abstraction. Only allows positive 64 bit values.
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
static GUID_tKeyLessThan compare_
FragSample(const FragmentRange &fragRange, const ReceivedDataSample &data)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
CORBA::ULong get_gaps(const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const
bool insert(const FragmentRange &fragRange, ReceivedDataSample &data)
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing...
void prepend(ReceivedDataSample &prefix)
Update this ReceivedDataSample's data payload to include the prefix's data payload before any existin...
The Internal API and Implementation of OpenDDS.
FragGapListIterMap gap_finder_
bool reassemble_i(const FragmentRange &fragRange, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags)