00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportReassembly.h"
00010 #include "TransportDebug.h"
00011
00012 #include "dds/DCPS/GuidConverter.h"
00013 #include "dds/DCPS/DisjointSequence.h"
00014
00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 namespace OpenDDS {
00018 namespace DCPS {
00019
00020 TransportReassembly::FragKey::FragKey(const PublicationId& pubId,
00021 const SequenceNumber& dataSampleSeq)
00022 : publication_(pubId)
00023 , data_sample_seq_(dataSampleSeq)
00024 {
00025 }
00026
// Out-of-class definition of FragKey's static GUID_tKeyLessThan member.
// NOTE(review): presumably used by FragKey's ordering operator to compare
// publication ids -- that operator is not visible in this file; confirm
// against the header.
00027 GUID_tKeyLessThan TransportReassembly::FragKey::compare_;
00028
00029 TransportReassembly::FragRange::FragRange(const SequenceRange& seqRange,
00030 const ReceivedDataSample& data)
00031 : transport_seq_(seqRange)
00032 , rec_ds_(data)
00033 {
00034 }
00035
00036 namespace {
00037 inline void join_err(const char* detail)
00038 {
00039 ACE_ERROR((LM_ERROR,
00040 ACE_TEXT("(%P|%t) ERROR: TransportReassembly::insert() - ")
00041 ACE_TEXT("DataSampleHeaders could not be joined: %C\n"), detail));
00042 }
00043 }
00044
00045 bool
00046 TransportReassembly::insert(OPENDDS_LIST(FragRange)& flist,
00047 const SequenceRange& seqRange,
00048 ReceivedDataSample& data)
00049 {
00050 const SequenceNumber::Value prev = seqRange.first.getValue() - 1,
00051 next = seqRange.second.getValue() + 1;
00052
00053 for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) {
00054 FragRange& fr = *it;
00055 if (next < fr.transport_seq_.first.getValue()) {
00056
00057 flist.insert(it, FragRange(seqRange, data));
00058 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00059 "inserted on left\n"));
00060 return true;
00061
00062 } else if (next == fr.transport_seq_.first.getValue()) {
00063
00064 DataSampleHeader joined;
00065 if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
00066 join_err("left");
00067 return false;
00068 }
00069 fr.rec_ds_.header_ = joined;
00070 if (fr.rec_ds_.sample_ && data.sample_) {
00071 ACE_Message_Block* last;
00072 for (last = data.sample_.get(); last->cont(); last = last->cont()) ;
00073 last->cont(fr.rec_ds_.sample_.release());
00074 fr.rec_ds_.sample_.reset(data.sample_.release());
00075 }
00076 data.sample_.reset();
00077 fr.transport_seq_.first = seqRange.first;
00078 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00079 "combined on left\n"));
00080 return true;
00081
00082 } else if (prev == fr.transport_seq_.second.getValue()) {
00083
00084 if (!fr.rec_ds_.sample_) {
00085 fr.rec_ds_.header_.more_fragments_ = true;
00086 }
00087 DataSampleHeader joined;
00088 if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
00089 join_err("right");
00090 return false;
00091 }
00092 fr.rec_ds_.header_ = joined;
00093 if (fr.rec_ds_.sample_ && data.sample_) {
00094 ACE_Message_Block* last;
00095 for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ;
00096 last->cont(data.sample_.release());
00097 }
00098 else {
00099 fr.rec_ds_.sample_.reset();
00100 data.sample_.reset();
00101 }
00102
00103 fr.transport_seq_.second = seqRange.second;
00104 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00105 "combined on right\n"));
00106
00107
00108 if (++it != flist.end()) {
00109 if (next == it->transport_seq_.first.getValue()) {
00110 if (!fr.rec_ds_.sample_) {
00111 fr.rec_ds_.header_.more_fragments_ = true;
00112 }
00113 if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_,
00114 joined)) {
00115 join_err("combined next");
00116 return false;
00117 }
00118 fr.rec_ds_.header_ = joined;
00119 if (!it->rec_ds_.sample_) {
00120 fr.rec_ds_.sample_.reset();
00121 } else {
00122 if (!fr.rec_ds_.sample_) {
00123 ACE_ERROR((LM_ERROR,
00124 ACE_TEXT("(%P|%t) ERROR: ")
00125 ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ")
00126 ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n")));
00127 return false;
00128 }
00129 ACE_Message_Block* last;
00130 for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ;
00131 last->cont(it->rec_ds_.sample_.release());
00132 }
00133 fr.transport_seq_.second = it->transport_seq_.second;
00134 flist.erase(it);
00135 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00136 "coalesced on right\n"));
00137 }
00138 }
00139 return true;
00140 }
00141 }
00142
00143
00144 flist.push_back(FragRange(seqRange, data));
00145 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() "
00146 "inserted at end of list\n"));
00147 return true;
00148 }
00149
00150 bool
00151 TransportReassembly::has_frags(const SequenceNumber& seq,
00152 const RepoId& pub_id) const
00153 {
00154 return fragments_.count(FragKey(pub_id, seq));
00155 }
00156
00157 CORBA::ULong
00158 TransportReassembly::get_gaps(const SequenceNumber& seq, const RepoId& pub_id,
00159 CORBA::Long bitmap[], CORBA::ULong length,
00160 CORBA::ULong& numBits) const
00161 {
00162
00163
00164 const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
00165 if (iter == fragments_.end() || length == 0) {
00166
00167 return 0;
00168 }
00169
00170
00171
00172
00173
00174 const OPENDDS_LIST(FragRange)& flist = iter->second;
00175 const SequenceNumber& first = flist.front().transport_seq_.first;
00176 const CORBA::ULong base = (first == 1)
00177 ? flist.front().transport_seq_.second.getLow() + 1
00178 : 1;
00179
00180 if (first != 1) {
00181
00182
00183 DisjointSequence::fill_bitmap_range(0, first.getLow() - 2,
00184 bitmap, length, numBits);
00185 } else if (flist.size() == 1) {
00186
00187 DisjointSequence::fill_bitmap_range(0, 0,
00188 bitmap, length, numBits);
00189
00190
00191
00192 return base;
00193 }
00194
00195 typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator;
00196 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00197 const list_iterator it_next = ++list_iterator(it);
00198 if (it_next == flist.end()) {
00199 break;
00200 }
00201 const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base,
00202 high = it_next->transport_seq_.first.getLow() - 1 - base;
00203 DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits);
00204 }
00205
00206 return base;
00207 }
00208
00209 bool
00210 TransportReassembly::reassemble(const SequenceRange& seqRange,
00211 ReceivedDataSample& data)
00212 {
00213 return reassemble_i(seqRange, seqRange.first == 1, data);
00214 }
00215
00216 bool
00217 TransportReassembly::reassemble(const SequenceNumber& transportSeq,
00218 bool firstFrag,
00219 ReceivedDataSample& data)
00220 {
00221 return reassemble_i(SequenceRange(transportSeq, transportSeq),
00222 firstFrag, data);
00223 }
00224
00225 bool
00226 TransportReassembly::reassemble_i(const SequenceRange& seqRange,
00227 bool firstFrag,
00228 ReceivedDataSample& data)
00229 {
00230 if (Transport_debug_level > 5) {
00231 GuidConverter conv(data.header_.publication_id_);
00232 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00233 "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(),
00234 seqRange.second.getValue(), firstFrag ? 1 : 0,
00235 data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str()));
00236 }
00237
00238 const FragKey key(data.header_.publication_id_, data.header_.sequence_);
00239
00240 if (firstFrag) {
00241 have_first_.insert(key);
00242 }
00243
00244 FragMap::iterator iter = fragments_.find(key);
00245 if (iter == fragments_.end()) {
00246 fragments_[key].push_back(FragRange(seqRange, data));
00247
00248 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00249 "stored first frag, returning false (incomplete)\n"));
00250 return false;
00251 }
00252
00253 if (!insert(iter->second, seqRange, data)) {
00254
00255 return false;
00256 }
00257
00258
00259
00260
00261
00262 if (have_first_.count(key)
00263 && iter->second.size() == 1
00264 && !iter->second.front().rec_ds_.header_.more_fragments_) {
00265 swap(data, iter->second.front().rec_ds_);
00266 fragments_.erase(iter);
00267 have_first_.erase(key);
00268 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00269 "removed frag, returning %C\n", data.sample_ ? "true" : "false"));
00270 return data.sample_.get();
00271 }
00272
00273 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() "
00274 "returning false (incomplete)\n"));
00275 return false;
00276 }
00277
00278 void
00279 TransportReassembly::data_unavailable(const SequenceRange& dropped)
00280 {
00281 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::data_unavailable() "
00282 "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue()));
00283 typedef OPENDDS_LIST(FragRange)::iterator list_iterator;
00284
00285 for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end();
00286 ++iter) {
00287 const FragKey& key = iter->first;
00288 OPENDDS_LIST(FragRange)& flist = iter->second;
00289
00290 ReceivedDataSample dummy(0);
00291 dummy.header_.sequence_ = key.data_sample_seq_;
00292
00293
00294 const SequenceNumber::Value prev =
00295 flist.front().transport_seq_.first.getValue() - 1;
00296 if (dropped.second.getValue() == prev && !have_first_.count(key)) {
00297 have_first_.insert(key);
00298 dummy.header_.more_fragments_ = true;
00299 insert(flist, dropped, dummy);
00300 continue;
00301 }
00302
00303
00304 for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00305 list_iterator it_next = it;
00306 ++it_next;
00307 if (it_next == flist.end()) {
00308 break;
00309 }
00310 FragRange& fr1 = *it;
00311 FragRange& fr2 = *it_next;
00312 if (dropped.first > fr1.transport_seq_.second
00313 && dropped.second < fr2.transport_seq_.first) {
00314 dummy.header_.more_fragments_ = true;
00315 insert(flist, dropped, dummy);
00316 break;
00317 }
00318 }
00319
00320
00321 const SequenceNumber next =
00322 ++SequenceNumber(flist.back().transport_seq_.second);
00323 if (dropped.first == next) {
00324 flist.back().rec_ds_.header_.more_fragments_ = true;
00325 insert(flist, dropped, dummy);
00326 }
00327 }
00328 }
00329
00330 void
00331 TransportReassembly::data_unavailable(const SequenceNumber& dataSampleSeq,
00332 const RepoId& pub_id)
00333 {
00334 const FragKey key(pub_id, dataSampleSeq);
00335 fragments_.erase(key);
00336 have_first_.erase(key);
00337 }
00338
00339 }
00340 }
00341
00342 OPENDDS_END_VERSIONED_NAMESPACE_DECL