ShmemReceiveStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemReceiveStrategy.h"
00009 #include "ShmemDataLink.h"
00010
00011 #include "dds/DCPS/transport/framework/TransportHeader.h"
00012
00013 #include <cstring>
00014
00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00016
00017 namespace OpenDDS {
00018 namespace DCPS {
00019
00020 ShmemReceiveStrategy::ShmemReceiveStrategy(ShmemDataLink* link)
00021 : link_(link)
00022 , current_data_(0)
00023 , partial_recv_remaining_(0)
00024 , partial_recv_ptr_(0)
00025 {
00026 }
00027
00028 void
00029 ShmemReceiveStrategy::read()
00030 {
00031 if (partial_recv_remaining_) {
00032 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00033 "resuming partial recv\n", link_));
00034 handle_dds_input(ACE_INVALID_HANDLE);
00035 return;
00036 }
00037
00038 if (bound_name_.empty()) {
00039 bound_name_ = "Write-" + link_->local_address();
00040 }
00041
00042 ShmemAllocator* alloc = link_->peer_allocator();
00043 void* mem = 0;
00044 if (-1 == alloc->find(bound_name_.c_str(), mem)) {
00045 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00046 "peer allocator not found, receive_bytes will close link\n",
00047 link_), 1);
00048 handle_dds_input(ACE_INVALID_HANDLE);
00049 return;
00050 }
00051
00052 if (!current_data_) {
00053 current_data_ = reinterpret_cast<ShmemData*>(mem);
00054 }
00055
00056 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE ||
00057 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00058 if (!start) {
00059 start = current_data_;
00060 } else if (start == current_data_) {
00061 return;
00062 }
00063 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00064 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1;
00065 }
00066 }
00067
00068 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00069 "reading at control block #%d\n",
00070 link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
00071
00072
00073 handle_dds_input(ACE_INVALID_HANDLE);
00074 }
00075
00076 ssize_t
00077 ShmemReceiveStrategy::receive_bytes(iovec iov[],
00078 int n,
00079 ACE_INET_Addr& ,
00080 ACE_HANDLE )
00081 {
00082 VDBG((LM_DEBUG,
00083 "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
00084
00085
00086 ShmemAllocator* alloc = link_->peer_allocator();
00087 void* mem;
00088 if (-1 == alloc->find(bound_name_.c_str(), mem)
00089 || current_data_->status_ != SHMEM_DATA_IN_USE) {
00090 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
00091 1);
00092 gracefully_disconnected_ = true;
00093 return 0;
00094 }
00095
00096 ssize_t total = 0;
00097
00098 const char* src_iter;
00099 char* dst_iter = 0;
00100 int i = 0;
00101 size_t remaining;
00102
00103 if (partial_recv_remaining_) {
00104 remaining = partial_recv_remaining_;
00105 src_iter = partial_recv_ptr_;
00106
00107 dst_iter = (char*)iov[0].iov_base;
00108
00109 } else {
00110 remaining = TransportHeader::get_length(current_data_->transport_header_);
00111 const size_t hdr_sz = sizeof(current_data_->transport_header_);
00112
00113 if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
00114 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00115 "receive buffer of length %d is too small\n",
00116 iov[0].iov_len), 0);
00117 errno = ENOBUFS;
00118 return -1;
00119 }
00120
00121 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00122 "header %@ payload %@ len %B\n", current_data_->transport_header_,
00123 (char*)current_data_->payload_, remaining));
00124 std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
00125 total += hdr_sz;
00126 src_iter = current_data_->payload_;
00127 if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
00128 dst_iter = (char*)iov[0].iov_base + hdr_sz;
00129 } else if (n > 1) {
00130 dst_iter = (char*)iov[1].iov_base;
00131 i = 1;
00132 }
00133 }
00134
00135 for (; i < n && remaining; ++i) {
00136 const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
00137 chunk = std::min(space, remaining);
00138
00139 #ifdef ACE_WIN32
00140 if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
00141 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00142 "shared memory pool couldn't be extended\n"), 0);
00143 errno = ENOMEM;
00144 return -1;
00145 }
00146 #endif
00147
00148 std::memcpy(dst_iter, src_iter, chunk);
00149 if (i < n - 1) {
00150 dst_iter = (char*)iov[i + 1].iov_base;
00151 }
00152 remaining -= chunk;
00153 total += chunk;
00154 src_iter += chunk;
00155 }
00156
00157 if (remaining) {
00158 partial_recv_remaining_ = remaining;
00159 partial_recv_ptr_ = src_iter;
00160 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00161 "receive was partial\n"));
00162 link_->signal_semaphore();
00163
00164 } else {
00165 partial_recv_remaining_ = 0;
00166 partial_recv_ptr_ = 0;
00167 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00168 "receive done\n"));
00169 current_data_->status_ = SHMEM_DATA_RECV_DONE;
00170 }
00171
00172 return total;
00173 }
00174
00175 void
00176 ShmemReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00177 const ACE_INET_Addr& )
00178 {
00179 switch (sample.header_.message_id_) {
00180
00181 case TRANSPORT_CONTROL:
00182 link_->control_received(sample);
00183 break;
00184
00185 default:
00186 link_->data_received(sample);
00187 }
00188 }
00189
00190 int
00191 ShmemReceiveStrategy::start_i()
00192 {
00193 return 0;
00194 }
00195
00196 void
00197 ShmemReceiveStrategy::stop_i()
00198 {
00199 }
00200
00201 }
00202 }
00203
00204 OPENDDS_END_VERSIONED_NAMESPACE_DECL