00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportReassembly.h"
00010 #include "TransportDebug.h"
00011
00012 #include "dds/DCPS/GuidConverter.h"
00013 #include "dds/DCPS/DisjointSequence.h"
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 TransportReassembly::FragKey::FragKey(const PublicationId& pubId,
00019 const SequenceNumber& dataSampleSeq)
00020 : publication_(pubId)
00021 , data_sample_seq_(dataSampleSeq)
00022 {
00023 }
00024
00025 GUID_tKeyLessThan TransportReassembly::FragKey::compare_;
00026
00027 TransportReassembly::FragRange::FragRange(const SequenceRange& seqRange,
00028 const ReceivedDataSample& data)
00029 : transport_seq_(seqRange)
00030 , rec_ds_(data)
00031 {
00032 }
00033
00034 namespace {
00035 inline void join_err(const char* detail)
00036 {
00037 ACE_ERROR((LM_ERROR,
00038 ACE_TEXT("(%P|%t) ERROR: TransportReassembly::insert() - ")
00039 ACE_TEXT("DataSampleHeaders could not be joined: %C\n"), detail));
00040 }
00041 }
00042
00043 bool
00044 TransportReassembly::insert(OPENDDS_LIST(FragRange)& flist,
00045 const SequenceRange& seqRange,
00046 ReceivedDataSample& data)
00047 {
00048 const SequenceNumber::Value prev = seqRange.first.getValue() - 1,
00049 next = seqRange.second.getValue() + 1;
00050
00051 for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) {
00052 FragRange& fr = *it;
00053 if (next < fr.transport_seq_.first.getValue()) {
00054
00055 flist.insert(it, FragRange(seqRange, data));
00056 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00057 "inserted on left\n"));
00058 return true;
00059
00060 } else if (next == fr.transport_seq_.first.getValue()) {
00061
00062 DataSampleHeader joined;
00063 if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
00064 join_err("left");
00065 return false;
00066 }
00067 fr.rec_ds_.header_ = joined;
00068 if (fr.rec_ds_.sample_ && !data.sample_) {
00069 ACE_Message_Block::release(fr.rec_ds_.sample_);
00070 fr.rec_ds_.sample_ = 0;
00071 } else if (!fr.rec_ds_.sample_) {
00072 ACE_Message_Block::release(data.sample_);
00073 } else {
00074 ACE_Message_Block* last;
00075 for (last = data.sample_; last->cont(); last = last->cont()) ;
00076 last->cont(fr.rec_ds_.sample_);
00077 fr.rec_ds_.sample_ = data.sample_;
00078 }
00079 data.sample_ = 0;
00080 fr.transport_seq_.first = seqRange.first;
00081 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00082 "combined on left\n"));
00083 return true;
00084
00085 } else if (prev == fr.transport_seq_.second.getValue()) {
00086
00087 if (!fr.rec_ds_.sample_) {
00088 fr.rec_ds_.header_.more_fragments_ = true;
00089 }
00090 DataSampleHeader joined;
00091 if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
00092 join_err("right");
00093 return false;
00094 }
00095 fr.rec_ds_.header_ = joined;
00096 if (fr.rec_ds_.sample_ && !data.sample_) {
00097 ACE_Message_Block::release(fr.rec_ds_.sample_);
00098 fr.rec_ds_.sample_ = 0;
00099 } else if (!fr.rec_ds_.sample_) {
00100 ACE_Message_Block::release(data.sample_);
00101 } else {
00102 ACE_Message_Block* last;
00103 for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00104 last->cont(data.sample_);
00105 }
00106 data.sample_ = 0;
00107 fr.transport_seq_.second = seqRange.second;
00108 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00109 "combined on right\n"));
00110
00111
00112 if (++it != flist.end()) {
00113 if (next == it->transport_seq_.first.getValue()) {
00114 if (!fr.rec_ds_.sample_) {
00115 fr.rec_ds_.header_.more_fragments_ = true;
00116 }
00117 if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_,
00118 joined)) {
00119 join_err("combined next");
00120 return false;
00121 }
00122 fr.rec_ds_.header_ = joined;
00123 if (!it->rec_ds_.sample_) {
00124 ACE_Message_Block::release(fr.rec_ds_.sample_);
00125 fr.rec_ds_.sample_ = 0;
00126 } else {
00127 if (!fr.rec_ds_.sample_) {
00128 ACE_ERROR((LM_ERROR,
00129 ACE_TEXT("(%P|%t) ERROR: ")
00130 ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ")
00131 ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n")));
00132 return false;
00133 }
00134 ACE_Message_Block* last;
00135 for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00136 last->cont(it->rec_ds_.sample_);
00137 it->rec_ds_.sample_ = 0;
00138 }
00139 fr.transport_seq_.second = it->transport_seq_.second;
00140 flist.erase(it);
00141 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00142 "coalesced on right\n"));
00143 }
00144 }
00145 return true;
00146 }
00147 }
00148
00149
00150 flist.push_back(FragRange(seqRange, data));
00151 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00152 "inserted at end of list\n"));
00153 return true;
00154 }
00155
00156 bool
00157 TransportReassembly::has_frags(const SequenceNumber& seq,
00158 const RepoId& pub_id) const
00159 {
00160 return fragments_.count(FragKey(pub_id, seq));
00161 }
00162
00163 CORBA::ULong
00164 TransportReassembly::get_gaps(const SequenceNumber& seq, const RepoId& pub_id,
00165 CORBA::Long bitmap[], CORBA::ULong length,
00166 CORBA::ULong& numBits) const
00167 {
00168
00169
00170 const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
00171 if (iter == fragments_.end() || length == 0) {
00172
00173 return 0;
00174 }
00175
00176
00177
00178
00179
00180 const OPENDDS_LIST(FragRange)& flist = iter->second;
00181 const SequenceNumber& first = flist.front().transport_seq_.first;
00182 const CORBA::ULong base = (first == 1)
00183 ? flist.front().transport_seq_.second.getLow() + 1
00184 : 1;
00185
00186 if (first != 1) {
00187
00188
00189 DisjointSequence::fill_bitmap_range(0, first.getLow() - 2,
00190 bitmap, length, numBits);
00191 } else if (flist.size() == 1) {
00192
00193 DisjointSequence::fill_bitmap_range(0, 0,
00194 bitmap, length, numBits);
00195
00196
00197
00198 return base;
00199 }
00200
00201 typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator;
00202 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00203 const list_iterator it_next = ++list_iterator(it);
00204 if (it_next == flist.end()) {
00205 break;
00206 }
00207 const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base,
00208 high = it_next->transport_seq_.first.getLow() - 1 - base;
00209 DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits);
00210 }
00211
00212 return base;
00213 }
00214
00215 bool
00216 TransportReassembly::reassemble(const SequenceRange& seqRange,
00217 ReceivedDataSample& data)
00218 {
00219 return reassemble_i(seqRange, seqRange.first == 1, data);
00220 }
00221
00222 bool
00223 TransportReassembly::reassemble(const SequenceNumber& transportSeq,
00224 bool firstFrag,
00225 ReceivedDataSample& data)
00226 {
00227 return reassemble_i(SequenceRange(transportSeq, transportSeq),
00228 firstFrag, data);
00229 }
00230
00231 bool
00232 TransportReassembly::reassemble_i(const SequenceRange& seqRange,
00233 bool firstFrag,
00234 ReceivedDataSample& data)
00235 {
00236 if (Transport_debug_level > 5) {
00237 GuidConverter conv(data.header_.publication_id_);
00238 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00239 "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(),
00240 seqRange.second.getValue(), firstFrag ? 1 : 0,
00241 data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str()));
00242 }
00243
00244 const FragKey key(data.header_.publication_id_, data.header_.sequence_);
00245
00246 if (firstFrag) {
00247 have_first_.insert(key);
00248 }
00249
00250 FragMap::iterator iter = fragments_.find(key);
00251 if (iter == fragments_.end()) {
00252 fragments_[key].push_back(FragRange(seqRange, data));
00253
00254 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00255 "stored first frag, returning false (incomplete)\n"));
00256 return false;
00257 }
00258
00259 if (!insert(iter->second, seqRange, data)) {
00260
00261 return false;
00262 }
00263
00264
00265
00266
00267
00268 if (have_first_.count(key)
00269 && iter->second.size() == 1
00270 && !iter->second.front().rec_ds_.header_.more_fragments_) {
00271 swap(data, iter->second.front().rec_ds_);
00272 fragments_.erase(iter);
00273 have_first_.erase(key);
00274 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00275 "removed frag, returning %C\n", data.sample_ ? "true" : "false"));
00276 return data.sample_;
00277 }
00278
00279 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00280 "returning false (incomplete)\n"));
00281 return false;
00282 }
00283
00284 void
00285 TransportReassembly::data_unavailable(const SequenceRange& dropped)
00286 {
00287 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::data_unavailable() "
00288 "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue()));
00289 typedef OPENDDS_LIST(FragRange)::iterator list_iterator;
00290
00291 for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end();
00292 ++iter) {
00293 const FragKey& key = iter->first;
00294 OPENDDS_LIST(FragRange)& flist = iter->second;
00295
00296 ReceivedDataSample dummy(0);
00297 dummy.header_.sequence_ = key.data_sample_seq_;
00298
00299
00300 const SequenceNumber::Value prev =
00301 flist.front().transport_seq_.first.getValue() - 1;
00302 if (dropped.second.getValue() == prev && !have_first_.count(key)) {
00303 have_first_.insert(key);
00304 dummy.header_.more_fragments_ = true;
00305 insert(flist, dropped, dummy);
00306 continue;
00307 }
00308
00309
00310 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00311 list_iterator it_next = it;
00312 ++it_next;
00313 if (it_next == flist.end()) {
00314 break;
00315 }
00316 FragRange& fr1 = *it;
00317 FragRange& fr2 = *it_next;
00318 if (dropped.first > fr1.transport_seq_.second
00319 && dropped.second < fr2.transport_seq_.first) {
00320 dummy.header_.more_fragments_ = true;
00321 insert(flist, dropped, dummy);
00322 break;
00323 }
00324 }
00325
00326
00327 const SequenceNumber next =
00328 ++SequenceNumber(flist.back().transport_seq_.second);
00329 if (dropped.first == next) {
00330 flist.back().rec_ds_.header_.more_fragments_ = true;
00331 insert(flist, dropped, dummy);
00332 }
00333 }
00334 }
00335
00336 void
00337 TransportReassembly::data_unavailable(const SequenceNumber& dataSampleSeq,
00338 const RepoId& pub_id)
00339 {
00340 const FragKey key(pub_id, dataSampleSeq);
00341 fragments_.erase(key);
00342 have_first_.erase(key);
00343 }
00344
00345 }
00346 }