Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "ReceivedDataSample.h" 11 : 12 : #include <algorithm> 13 : 14 : #include <cstring> 15 : 16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 17 : namespace OpenDDS { 18 : namespace DCPS { 19 : 20 638 : ReceivedDataSample::ReceivedDataSample() 21 638 : : fragment_size_(0) 22 : { 23 638 : } 24 : 25 698 : ReceivedDataSample::ReceivedDataSample(const ACE_Message_Block& payload) 26 698 : : fragment_size_(0) 27 : { 28 698 : const ACE_Message_Block* amb = &payload; 29 : do { 30 700 : blocks_.push_back(MessageBlock(*amb)); 31 700 : amb = amb->cont(); 32 700 : } while (amb); 33 698 : } 34 : 35 752 : size_t ReceivedDataSample::data_length() const 36 : { 37 752 : size_t len = 0; 38 2028 : for (size_t i = 0; i < blocks_.size(); ++i) { 39 1276 : len += blocks_[i].len(); 40 : } 41 752 : return len; 42 : } 43 : 44 : namespace { 45 69 : ACE_Message_Block* make_mb(ACE_Allocator* mb_alloc) 46 : { 47 69 : if (!mb_alloc) { 48 68 : mb_alloc = ACE_Allocator::instance(); 49 : } 50 69 : ACE_Message_Block* mb = 0; 51 69 : ACE_NEW_MALLOC_RETURN(mb, 52 : (ACE_Message_Block*) mb_alloc->malloc(sizeof(ACE_Message_Block)), 53 : ACE_Message_Block(mb_alloc), 54 : 0); 55 69 : return mb; 56 : } 57 : } 58 : 59 26 : ACE_Message_Block* ReceivedDataSample::data(ACE_Allocator* mb_alloc) const 60 : { 61 26 : ACE_Message_Block* first = 0; 62 26 : ACE_Message_Block* last = 0; 63 95 : for (size_t i = 0; i < blocks_.size(); ++i) { 64 69 : const MessageBlock& element = blocks_[i]; 65 69 : ACE_Message_Block* const mb = make_mb(mb_alloc); 66 69 : mb->data_block(element.duplicate_data()); 67 69 : mb->rd_ptr(element.rd_ptr()); 68 69 : mb->wr_ptr(element.wr_ptr()); 69 69 : if (first) { 70 43 : last->cont(mb); 71 : } else { 72 26 : first = mb; 73 : } 74 69 : last = mb; 75 : } 76 26 : return first; 77 : } 78 : 79 1 : bool ReceivedDataSample::write_data(Serializer& ser) const 80 : { 81 3 : for (size_t i = 0; i < blocks_.size(); ++i) { 82 2 : const MessageBlock& element = blocks_[i]; 83 2 : const unsigned int len = static_cast<unsigned int>(element.len()); 84 2 : const char* const data = element.rd_ptr(); 85 2 : if (!ser.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(data), len)) { 86 0 : return false; 87 : } 88 : } 89 1 : return true; 90 : } 91 : 92 721 : DDS::OctetSeq ReceivedDataSample::copy_data() const 93 : { 94 721 : DDS::OctetSeq dst; 95 721 : dst.length(static_cast<unsigned int>(data_length())); 96 721 : unsigned char* out_iter = dst.get_buffer(); 97 1923 : for (size_t i = 0; i < blocks_.size(); ++i) { 98 1202 : const MessageBlock& element = blocks_[i]; 99 1202 : const unsigned int len = static_cast<unsigned int>(element.len()); 100 1202 : const char* const data = element.rd_ptr(); 101 1202 : std::memcpy(out_iter, data, len); 102 1202 : out_iter += len; 103 : } 104 721 : return dst; 105 0 : } 106 : 107 2 : unsigned char ReceivedDataSample::peek(size_t offset) const 108 : { 109 2 : size_t remain = offset; 110 3 : for (size_t i = 0; i < blocks_.size(); ++i) { 111 3 : const MessageBlock& element = blocks_[i]; 112 3 : const size_t len = element.len(); 113 3 : if (remain < len) { 114 2 : return element.rd_ptr()[remain]; 115 : } 116 1 : remain -= len; 117 : } 118 0 : return 0; 119 : } 120 : 121 : #ifdef ACE_HAS_CPP11 122 : #define OPENDDS_MOVE_OR_COPY std::move 123 : #else 124 : #define OPENDDS_MOVE_OR_COPY std::copy 125 : #endif 126 : 127 262 : void ReceivedDataSample::prepend(ReceivedDataSample& prefix) 128 : { 129 524 : OPENDDS_MOVE_OR_COPY(prefix.blocks_.begin(), prefix.blocks_.end(), 130 262 : std::inserter(blocks_, blocks_.begin())); 131 262 : prefix.clear(); 132 262 : } 133 : 134 270 : void ReceivedDataSample::append(ReceivedDataSample& suffix) 135 : { 136 270 : OPENDDS_MOVE_OR_COPY(suffix.blocks_.begin(), suffix.blocks_.end(), 137 270 : std::back_inserter(blocks_)); 138 270 : suffix.clear(); 139 270 : } 140 : 141 1 : void ReceivedDataSample::append(const char* data, size_t size) 142 : { 143 1 : blocks_.push_back(MessageBlock(size)); 144 1 : std::memcpy(blocks_.back().base(), data, size); 145 1 : blocks_.back().write(size); 146 1 : } 147 : 148 1 : void ReceivedDataSample::replace(const char* data, size_t size) 149 : { 150 1 : clear(); 151 1 : blocks_.push_back(MessageBlock(data, size)); 152 1 : } 153 : 154 : ReceivedDataSample 155 9 : ReceivedDataSample::get_fragment_range(FragmentNumber start_frag, FragmentNumber end_frag) 156 : { 157 9 : ReceivedDataSample result; 158 9 : result.header_ = header_; 159 : 160 9 : const size_t fsize = static_cast<size_t>(fragment_size_); 161 9 : const size_t start_offset = static_cast<size_t>(start_frag) * fsize; 162 9 : const size_t end_offset = end_frag == INVALID_FRAGMENT ? std::numeric_limits<size_t>::max() : static_cast<size_t>(end_frag + 1) * fsize - 1; 163 : 164 9 : size_t current_offset = 0; 165 : 166 9 : bool default_push = false; 167 : 168 24 : for (OPENDDS_VECTOR(MessageBlock)::iterator it = blocks_.begin(); current_offset < end_offset && it != blocks_.end(); ++it) { 169 15 : const size_t len = it->len(); 170 15 : if (default_push) { 171 5 : result.blocks_.push_back(*it); 172 5 : if (end_offset < current_offset + len) { 173 1 : result.blocks_.back().write(end_offset - current_offset - len); // this should be negative 174 : } 175 10 : } else if (start_offset < current_offset + len) { 176 9 : default_push = true; 177 9 : result.blocks_.push_back(*it); 178 9 : if (current_offset < start_offset) { 179 4 : result.blocks_.back().read(start_offset - current_offset); 180 : } 181 : } 182 15 : current_offset += len; 183 : } 184 : 185 9 : return result; 186 0 : } 187 : 188 : } 189 : } 190 : OPENDDS_END_VERSIONED_NAMESPACE_DECL