00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemReceiveStrategy.h"
00009 #include "ShmemDataLink.h"
00010
00011 #include "dds/DCPS/transport/framework/TransportHeader.h"
00012
00013 #include <cstring>
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 ShmemReceiveStrategy::ShmemReceiveStrategy(ShmemDataLink* link)
00019 : link_(link)
00020 , current_data_(0)
00021 , partial_recv_remaining_(0)
00022 , partial_recv_ptr_(0)
00023 {
00024 }
00025
00026 void
00027 ShmemReceiveStrategy::read()
00028 {
00029 if (partial_recv_remaining_) {
00030 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00031 "resuming partial recv\n", link_));
00032 handle_dds_input(ACE_INVALID_HANDLE);
00033 return;
00034 }
00035
00036 if (bound_name_.empty()) {
00037 bound_name_ = "Write-" + link_->local_address();
00038 }
00039
00040 ShmemAllocator* alloc = link_->peer_allocator();
00041 void* mem = 0;
00042 if (-1 == alloc->find(bound_name_.c_str(), mem)) {
00043 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00044 "peer allocator not found, receive_bytes will close link\n",
00045 link_), 1);
00046 handle_dds_input(ACE_INVALID_HANDLE);
00047 return;
00048 }
00049
00050 if (!current_data_) {
00051 current_data_ = reinterpret_cast<ShmemData*>(mem);
00052 }
00053
00054 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE ||
00055 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00056 if (!start) {
00057 start = current_data_;
00058 } else if (start == current_data_) {
00059 return;
00060 }
00061 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00062 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1;
00063 }
00064 }
00065
00066 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00067 "reading at control block #%d\n",
00068 link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
00069
00070
00071 handle_dds_input(ACE_INVALID_HANDLE);
00072 }
00073
00074 ssize_t
00075 ShmemReceiveStrategy::receive_bytes(iovec iov[],
00076 int n,
00077 ACE_INET_Addr& ,
00078 ACE_HANDLE )
00079 {
00080 VDBG((LM_DEBUG,
00081 "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
00082
00083
00084 ShmemAllocator* alloc = link_->peer_allocator();
00085 void* mem;
00086 if (-1 == alloc->find(bound_name_.c_str(), mem)
00087 || current_data_->status_ != SHMEM_DATA_IN_USE) {
00088 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
00089 1);
00090 gracefully_disconnected_ = true;
00091 return 0;
00092 }
00093
00094 ssize_t total = 0;
00095
00096 const char* src_iter;
00097 char* dst_iter = 0;
00098 int i = 0;
00099 size_t remaining;
00100
00101 if (partial_recv_remaining_) {
00102 remaining = partial_recv_remaining_;
00103 src_iter = partial_recv_ptr_;
00104
00105 dst_iter = (char*)iov[0].iov_base;
00106
00107 } else {
00108 remaining = TransportHeader::get_length(current_data_->transport_header_);
00109 const size_t hdr_sz = sizeof(current_data_->transport_header_);
00110
00111 if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
00112 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00113 "receive buffer of length %d is too small\n",
00114 iov[0].iov_len), 0);
00115 errno = ENOBUFS;
00116 return -1;
00117 }
00118
00119 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00120 "header %@ payload %@ len %B\n", current_data_->transport_header_,
00121 (char*)current_data_->payload_, remaining));
00122 std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
00123 total += hdr_sz;
00124 src_iter = current_data_->payload_;
00125 if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
00126 dst_iter = (char*)iov[0].iov_base + hdr_sz;
00127 } else if (n > 1) {
00128 dst_iter = (char*)iov[1].iov_base;
00129 i = 1;
00130 }
00131 }
00132
00133 for (; i < n && remaining; ++i) {
00134 const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
00135 chunk = std::min(space, remaining);
00136
00137 #ifdef ACE_WIN32
00138 if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
00139 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00140 "shared memory pool couldn't be extended\n"), 0);
00141 errno = ENOMEM;
00142 return -1;
00143 }
00144 #endif
00145
00146 std::memcpy(dst_iter, src_iter, chunk);
00147 if (i < n - 1) {
00148 dst_iter = (char*)iov[i + 1].iov_base;
00149 }
00150 remaining -= chunk;
00151 total += chunk;
00152 src_iter += chunk;
00153 }
00154
00155 if (remaining) {
00156 partial_recv_remaining_ = remaining;
00157 partial_recv_ptr_ = src_iter;
00158 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00159 "receive was partial\n"));
00160 link_->signal_semaphore();
00161
00162 } else {
00163 partial_recv_remaining_ = 0;
00164 partial_recv_ptr_ = 0;
00165 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00166 "receive done\n"));
00167 current_data_->status_ = SHMEM_DATA_RECV_DONE;
00168 }
00169
00170 return total;
00171 }
00172
00173 void
00174 ShmemReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00175 const ACE_INET_Addr& )
00176 {
00177 switch (sample.header_.message_id_) {
00178
00179 case TRANSPORT_CONTROL:
00180 link_->control_received(sample);
00181 break;
00182
00183 default:
00184 link_->data_received(sample);
00185 }
00186 }
00187
00188 int
00189 ShmemReceiveStrategy::start_i()
00190 {
00191 return 0;
00192 }
00193
00194 void
00195 ShmemReceiveStrategy::stop_i()
00196 {
00197 }
00198
00199 }
00200 }