ShmemReceiveStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ShmemReceiveStrategy.h"
00009 #include "ShmemDataLink.h"
00010 
00011 #include "dds/DCPS/transport/framework/TransportHeader.h"
00012 
00013 #include <cstring>
00014 
00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 namespace OpenDDS {
00018 namespace DCPS {
00019 
00020 ShmemReceiveStrategy::ShmemReceiveStrategy(ShmemDataLink* link)
00021   : link_(link)
00022   , current_data_(0)
00023   , partial_recv_remaining_(0)
00024   , partial_recv_ptr_(0)
00025 {
00026 }
00027 
00028 void
00029 ShmemReceiveStrategy::read()
00030 {
00031   if (partial_recv_remaining_) {
00032     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00033           "resuming partial recv\n", link_));
00034     handle_dds_input(ACE_INVALID_HANDLE);
00035     return;
00036   }
00037 
00038   if (bound_name_.empty()) {
00039     bound_name_ = "Write-" + link_->local_address();
00040   }
00041 
00042   ShmemAllocator* alloc = link_->peer_allocator();
00043   void* mem = 0;
00044   if (-1 == alloc->find(bound_name_.c_str(), mem)) {
00045     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00046               "peer allocator not found, receive_bytes will close link\n",
00047               link_), 1);
00048     handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg.
00049     return;
00050   }
00051 
00052   if (!current_data_) {
00053     current_data_ = reinterpret_cast<ShmemData*>(mem);
00054   }
00055 
00056   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE ||
00057          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00058     if (!start) {
00059       start = current_data_;
00060     } else if (start == current_data_) {
00061       return; // none found => don't call handle_dds_input()
00062     }
00063     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00064       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00065     }
00066   }
00067 
00068   VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00069         "reading at control block #%d\n",
00070         link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
00071   // If we get this far, current_data_ points to the first SHMEM_DATA_IN_USE.
00072   // handle_dds_input() will call our receive_bytes() to get the data.
00073   handle_dds_input(ACE_INVALID_HANDLE);
00074 }
00075 
00076 ssize_t
00077 ShmemReceiveStrategy::receive_bytes(iovec iov[],
00078                                     int n,
00079                                     ACE_INET_Addr& /*remote_address*/,
00080                                     ACE_HANDLE /*fd*/)
00081 {
00082   VDBG((LM_DEBUG,
00083         "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
00084 
00085   // check that the writer's shared memory is still available
00086   ShmemAllocator* alloc = link_->peer_allocator();
00087   void* mem;
00088   if (-1 == alloc->find(bound_name_.c_str(), mem)
00089       || current_data_->status_ != SHMEM_DATA_IN_USE) {
00090     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
00091              1);
00092     gracefully_disconnected_ = true; // do not attempt reconnect via relink()
00093     return 0; // close "connection"
00094   }
00095 
00096   ssize_t total = 0;
00097 
00098   const char* src_iter;
00099   char* dst_iter = 0;
00100   int i = 0; // current iovec index in iov[]
00101   size_t remaining;
00102 
00103   if (partial_recv_remaining_) {
00104     remaining = partial_recv_remaining_;
00105     src_iter = partial_recv_ptr_;
00106     // iov_base is void* on POSIX but char* on Win32, we'll have to cast:
00107     dst_iter = (char*)iov[0].iov_base;
00108 
00109   } else {
00110     remaining = TransportHeader::get_length(current_data_->transport_header_);
00111     const size_t hdr_sz = sizeof(current_data_->transport_header_);
00112     // BUFFER_LOW_WATER in the framework ensures a large enough buffer
00113     if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
00114       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00115                 "receive buffer of length %d is too small\n",
00116                 iov[0].iov_len), 0);
00117       errno = ENOBUFS;
00118       return -1;
00119     }
00120 
00121     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00122           "header %@ payload %@ len %B\n", current_data_->transport_header_,
00123           (char*)current_data_->payload_, remaining));
00124     std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
00125     total += hdr_sz;
00126     src_iter = current_data_->payload_;
00127     if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
00128       dst_iter = (char*)iov[0].iov_base + hdr_sz;
00129     } else if (n > 1) {
00130       dst_iter = (char*)iov[1].iov_base;
00131       i = 1;
00132     }
00133   }
00134 
00135   for (; i < n && remaining; ++i) {
00136     const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
00137       chunk = std::min(space, remaining);
00138 
00139 #ifdef ACE_WIN32
00140     if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
00141       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00142                 "shared memory pool couldn't be extended\n"), 0);
00143       errno = ENOMEM;
00144       return -1;
00145     }
00146 #endif
00147 
00148     std::memcpy(dst_iter, src_iter, chunk);
00149     if (i < n - 1) {
00150       dst_iter = (char*)iov[i + 1].iov_base;
00151     }
00152     remaining -= chunk;
00153     total += chunk;
00154     src_iter += chunk;
00155   }
00156 
00157   if (remaining) {
00158     partial_recv_remaining_ = remaining;
00159     partial_recv_ptr_ = src_iter;
00160     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00161           "receive was partial\n"));
00162     link_->signal_semaphore();
00163 
00164   } else {
00165     partial_recv_remaining_ = 0;
00166     partial_recv_ptr_ = 0;
00167     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00168           "receive done\n"));
00169     current_data_->status_ = SHMEM_DATA_RECV_DONE;
00170   }
00171 
00172   return total;
00173 }
00174 
00175 void
00176 ShmemReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00177                                      const ACE_INET_Addr& /*remote_address*/)
00178 {
00179   switch (sample.header_.message_id_) {
00180 
00181   case TRANSPORT_CONTROL:
00182     link_->control_received(sample);
00183     break;
00184 
00185   default:
00186     link_->data_received(sample);
00187   }
00188 }
00189 
00190 int
00191 ShmemReceiveStrategy::start_i()
00192 {
00193   return 0;
00194 }
00195 
00196 void
00197 ShmemReceiveStrategy::stop_i()
00198 {
00199 }
00200 
00201 } // namespace DCPS
00202 } // namespace OpenDDS
00203 
00204 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1