ShmemReceiveStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ShmemReceiveStrategy.h"
00009 #include "ShmemDataLink.h"
00010 
00011 #include "dds/DCPS/transport/framework/TransportHeader.h"
00012 
00013 #include <cstring>
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 ShmemReceiveStrategy::ShmemReceiveStrategy(ShmemDataLink* link)
00019   : link_(link)
00020   , current_data_(0)
00021   , partial_recv_remaining_(0)
00022   , partial_recv_ptr_(0)
00023 {
00024 }
00025 
00026 void
00027 ShmemReceiveStrategy::read()
00028 {
00029   if (partial_recv_remaining_) {
00030     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00031           "resuming partial recv\n", link_));
00032     handle_dds_input(ACE_INVALID_HANDLE);
00033     return;
00034   }
00035 
00036   if (bound_name_.empty()) {
00037     bound_name_ = "Write-" + link_->local_address();
00038   }
00039 
00040   ShmemAllocator* alloc = link_->peer_allocator();
00041   void* mem = 0;
00042   if (-1 == alloc->find(bound_name_.c_str(), mem)) {
00043     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00044               "peer allocator not found, receive_bytes will close link\n",
00045               link_), 1);
00046     handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg.
00047     return;
00048   }
00049 
00050   if (!current_data_) {
00051     current_data_ = reinterpret_cast<ShmemData*>(mem);
00052   }
00053 
00054   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE ||
00055          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00056     if (!start) {
00057       start = current_data_;
00058     } else if (start == current_data_) {
00059       return; // none found => don't call handle_dds_input()
00060     }
00061     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00062       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00063     }
00064   }
00065 
00066   VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00067         "reading at control block #%d\n",
00068         link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
00069   // If we get this far, current_data_ points to the first SHMEM_DATA_IN_USE.
00070   // handle_dds_input() will call our receive_bytes() to get the data.
00071   handle_dds_input(ACE_INVALID_HANDLE);
00072 }
00073 
00074 ssize_t
00075 ShmemReceiveStrategy::receive_bytes(iovec iov[],
00076                                     int n,
00077                                     ACE_INET_Addr& /*remote_address*/,
00078                                     ACE_HANDLE /*fd*/)
00079 {
00080   VDBG((LM_DEBUG,
00081         "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
00082 
00083   // check that the writer's shared memory is still available
00084   ShmemAllocator* alloc = link_->peer_allocator();
00085   void* mem;
00086   if (-1 == alloc->find(bound_name_.c_str(), mem)
00087       || current_data_->status_ != SHMEM_DATA_IN_USE) {
00088     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
00089              1);
00090     gracefully_disconnected_ = true; // do not attempt reconnect via relink()
00091     return 0; // close "connection"
00092   }
00093 
00094   ssize_t total = 0;
00095 
00096   const char* src_iter;
00097   char* dst_iter = 0;
00098   int i = 0; // current iovec index in iov[]
00099   size_t remaining;
00100 
00101   if (partial_recv_remaining_) {
00102     remaining = partial_recv_remaining_;
00103     src_iter = partial_recv_ptr_;
00104     // iov_base is void* on POSIX but char* on Win32, we'll have to cast:
00105     dst_iter = (char*)iov[0].iov_base;
00106 
00107   } else {
00108     remaining = TransportHeader::get_length(current_data_->transport_header_);
00109     const size_t hdr_sz = sizeof(current_data_->transport_header_);
00110     // BUFFER_LOW_WATER in the framework ensures a large enough buffer
00111     if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
00112       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00113                 "receive buffer of length %d is too small\n",
00114                 iov[0].iov_len), 0);
00115       errno = ENOBUFS;
00116       return -1;
00117     }
00118 
00119     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00120           "header %@ payload %@ len %B\n", current_data_->transport_header_,
00121           (char*)current_data_->payload_, remaining));
00122     std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
00123     total += hdr_sz;
00124     src_iter = current_data_->payload_;
00125     if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
00126       dst_iter = (char*)iov[0].iov_base + hdr_sz;
00127     } else if (n > 1) {
00128       dst_iter = (char*)iov[1].iov_base;
00129       i = 1;
00130     }
00131   }
00132 
00133   for (; i < n && remaining; ++i) {
00134     const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
00135       chunk = std::min(space, remaining);
00136 
00137 #ifdef ACE_WIN32
00138     if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
00139       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00140                 "shared memory pool couldn't be extended\n"), 0);
00141       errno = ENOMEM;
00142       return -1;
00143     }
00144 #endif
00145 
00146     std::memcpy(dst_iter, src_iter, chunk);
00147     if (i < n - 1) {
00148       dst_iter = (char*)iov[i + 1].iov_base;
00149     }
00150     remaining -= chunk;
00151     total += chunk;
00152     src_iter += chunk;
00153   }
00154 
00155   if (remaining) {
00156     partial_recv_remaining_ = remaining;
00157     partial_recv_ptr_ = src_iter;
00158     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00159           "receive was partial\n"));
00160     link_->signal_semaphore();
00161 
00162   } else {
00163     partial_recv_remaining_ = 0;
00164     partial_recv_ptr_ = 0;
00165     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00166           "receive done\n"));
00167     current_data_->status_ = SHMEM_DATA_RECV_DONE;
00168   }
00169 
00170   return total;
00171 }
00172 
00173 void
00174 ShmemReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00175                                      const ACE_INET_Addr& /*remote_address*/)
00176 {
00177   switch (sample.header_.message_id_) {
00178 
00179   case TRANSPORT_CONTROL:
00180     link_->control_received(sample);
00181     break;
00182 
00183   default:
00184     link_->data_received(sample);
00185   }
00186 }
00187 
00188 int
00189 ShmemReceiveStrategy::start_i()
00190 {
00191   return 0;
00192 }
00193 
00194 void
00195 ShmemReceiveStrategy::stop_i()
00196 {
00197 }
00198 
00199 } // namespace DCPS
00200 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7