ShmemSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ShmemSendStrategy.h"
00009 #include "ShmemDataLink.h"
00010 #include "ShmemInst.h"
00011 
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013 
00014 #include <cstring>
00015 
00016 namespace OpenDDS {
00017 namespace DCPS {
00018 
00019 ShmemSendStrategy::ShmemSendStrategy(ShmemDataLink* link)
00020   : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00021                           0,  // synch_resource
00022                           link->transport_priority(),
00023                           new NullSynchStrategy)
00024   , link_(link)
00025   , current_data_(0)
00026 {
00027 #ifdef ACE_HAS_POSIX_SEM
00028   peer_semaphore_.name_ = 0;
00029   peer_semaphore_.sema_ = 0;
00030 #endif
00031 }
00032 
00033 bool
00034 ShmemSendStrategy::start_i()
00035 {
00036   bound_name_ = "Write-" + link_->peer_address();
00037   ShmemAllocator* alloc = link_->local_allocator();
00038 
00039   const size_t n_bytes = link_->config()->datalink_control_size_,
00040     n_elems = n_bytes / sizeof(ShmemData),
00041     extra = n_bytes % sizeof(ShmemData);
00042 
00043   void* mem = alloc->calloc(n_bytes);
00044   if (mem == 0) {
00045     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00046               "to allocate %B bytes for control\n", link_, n_bytes), 0);
00047     return false;
00048   }
00049 
00050   ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00051   data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =
00052     SHMEM_DATA_END_OF_ALLOC;
00053   alloc->bind(bound_name_.c_str(), mem);
00054 
00055   ShmemAllocator* peer = link_->peer_allocator();
00056   peer->find("Semaphore", mem);
00057   ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00058 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00059   HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
00060                                  link_->peer_pid());
00061   ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00062                     0 /*dwDesiredAccess -- ignored*/,
00063                     false /*bInheritHandle*/,
00064                     DUPLICATE_SAME_ACCESS /*dwOptions*/);
00065   ::CloseHandle(srcProc);
00066 #elif defined ACE_HAS_POSIX_SEM
00067   peer_semaphore_.sema_ = sem;
00068   peer_semaphore_.name_ = 0;
00069 #else
00070   ACE_UNUSED_ARG(sem);
00071 #endif
00072   return true;
00073 }
00074 
00075 ssize_t
00076 ShmemSendStrategy::send_bytes_i(const iovec iov[], int n)
00077 {
00078   const size_t hdr_sz = sizeof(current_data_->transport_header_);
00079   if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
00080     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00081               "expecting iov[0] of size %B, got %B\n",
00082               link_, hdr_sz, iov[0].iov_len), 0);
00083     return -1;
00084   }
00085   // see TransportHeader::valid(), but this is for the marshaled form
00086   if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00087                   sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00088     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00089               "expecting iov[0] to contain the transport header\n", link_), 0);
00090     return -1;
00091   }
00092 
00093   //FUTURE: use the ShmemTransport object to see if we already have the
00094   //        same payload data available in the pool (from other DataLinks),
00095   //        and if so, add a refcount to the start of the "from_pool" allocation
00096 
00097   size_t pool_alloc_size = 0;
00098   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00099     pool_alloc_size += iov[i].iov_len;
00100   }
00101 
00102   ShmemAllocator* alloc = link_->local_allocator();
00103   void* from_pool = alloc->malloc(pool_alloc_size);
00104   if (from_pool == 0) {
00105     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00106               "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00107     errno = ENOMEM;
00108     return -1;
00109   }
00110 
00111   char* payload = reinterpret_cast<char*>(from_pool);
00112   char* iter = payload;
00113   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00114     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00115     iter += iov[i].iov_len;
00116   }
00117 
00118   void* mem = 0;
00119   alloc->find(bound_name_.c_str(), mem);
00120 
00121   for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
00122        iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00123     if (iter->status_ == SHMEM_DATA_RECV_DONE) {
00124       alloc->free(iter->payload_);
00125       // This will eventually be refcounted so instead of a free(), the previous
00126       // statement would decrement the refcount and check for 0 before free().
00127       // See the 'FUTURE' comment above.
00128       iter->status_ = SHMEM_DATA_FREE;
00129       VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00130                 "releasing control block #%d\n", link_,
00131                 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00132     }
00133   }
00134 
00135   if (!current_data_) {
00136     current_data_ = reinterpret_cast<ShmemData*>(mem);
00137   }
00138 
00139   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||
00140          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00141     if (!start) {
00142       start = current_data_;
00143     } else if (start == current_data_) {
00144       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00145                 "space for control\n", link_), 0);
00146       return -1;
00147     }
00148     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00149       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00150     }
00151   }
00152 
00153   if (current_data_->status_ == SHMEM_DATA_FREE) {
00154     VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00155           "writing at control block #%d header %@ payload %@ len %B\n",
00156           link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00157           current_data_->transport_header_, payload, pool_alloc_size));
00158     std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00159                 sizeof(current_data_->transport_header_));
00160     current_data_->payload_ = payload;
00161     current_data_->status_ = SHMEM_DATA_IN_USE;
00162   } else {
00163     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00164               "failed to find space for control\n", link_), 0);
00165     return -1;
00166   }
00167 
00168   ACE_OS::sema_post(&peer_semaphore_);
00169 
00170   return pool_alloc_size + iov[0].iov_len;
00171 }
00172 
00173 void
00174 ShmemSendStrategy::stop_i()
00175 {
00176 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00177   ::CloseHandle(peer_semaphore_);
00178 #endif
00179 }
00180 
00181 } // namespace DCPS
00182 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7