OpenDDS::DCPS::ShmemSendStrategy Class Reference

#include <ShmemSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::ShmemSendStrategy:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemSendStrategy:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ShmemSendStrategy (ShmemDataLink *link)
virtual bool start_i ()
 Let the subclass start.
virtual void stop_i ()
 Let the subclass stop.

Protected Member Functions

virtual ssize_t send_bytes_i (const iovec iov[], int n)

Private Attributes

ShmemDataLink * link_
std::string bound_name_
ACE_sema_t peer_semaphore_
ShmemData * current_data_

Detailed Description

Definition at line 25 of file ShmemSendStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ShmemSendStrategy::ShmemSendStrategy ( ShmemDataLink * link  )  [explicit]

Definition at line 19 of file ShmemSendStrategy.cpp.

References peer_semaphore_.

00020   : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00021                           0,  // synch_resource
00022                           link->transport_priority(),
00023                           new NullSynchStrategy)
00024   , link_(link)
00025   , current_data_(0)
00026 {
00027 #ifdef ACE_HAS_POSIX_SEM
00028   peer_semaphore_.name_ = 0;
00029   peer_semaphore_.sema_ = 0;
00030 #endif
00031 }


Member Function Documentation

ssize_t OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i ( const iovec  iov[], int  n ) [protected, virtual]

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 76 of file ShmemSendStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::DCPS_PROTOCOL, link_, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::ShmemData::transport_header_, and VDBG_LVL.

00077 {
00078   const size_t hdr_sz = sizeof(current_data_->transport_header_);
00079   if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
00080     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00081               "expecting iov[0] of size %B, got %B\n",
00082               link_, hdr_sz, iov[0].iov_len), 0);
00083     return -1;
00084   }
00085   // see TransportHeader::valid(), but this is for the marshaled form
00086   if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00087                   sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00088     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00089               "expecting iov[0] to contain the transport header\n", link_), 0);
00090     return -1;
00091   }
00092 
00093   //FUTURE: use the ShmemTransport object to see if we already have the
00094   //        same payload data available in the pool (from other DataLinks),
00095   //        and if so, add a refcount to the start of the "from_pool" allocation
00096 
00097   size_t pool_alloc_size = 0;
00098   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00099     pool_alloc_size += iov[i].iov_len;
00100   }
00101 
00102   ShmemAllocator* alloc = link_->local_allocator();
00103   void* from_pool = alloc->malloc(pool_alloc_size);
00104   if (from_pool == 0) {
00105     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00106               "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00107     errno = ENOMEM;
00108     return -1;
00109   }
00110 
00111   char* payload = reinterpret_cast<char*>(from_pool);
00112   char* iter = payload;
00113   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00114     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00115     iter += iov[i].iov_len;
00116   }
00117 
00118   void* mem = 0;
00119   alloc->find(bound_name_.c_str(), mem);
00120 
00121   for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
00122        iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00123     if (iter->status_ == SHMEM_DATA_RECV_DONE) {
00124       alloc->free(iter->payload_);
00125       // This will eventually be refcounted so instead of a free(), the previous
00126       // statement would decrement the refcount and check for 0 before free().
00127       // See the 'FUTURE' comment above.
00128       iter->status_ = SHMEM_DATA_FREE;
00129       VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00130                 "releasing control block #%d\n", link_,
00131                 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00132     }
00133   }
00134 
00135   if (!current_data_) {
00136     current_data_ = reinterpret_cast<ShmemData*>(mem);
00137   }
00138 
00139   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||
00140          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00141     if (!start) {
00142       start = current_data_;
00143     } else if (start == current_data_) {
00144       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00145                 "space for control\n", link_), 0);
00146       return -1;
00147     }
00148     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00149       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00150     }
00151   }
00152 
00153   if (current_data_->status_ == SHMEM_DATA_FREE) {
00154     VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00155           "writing at control block #%d header %@ payload %@ len %B\n",
00156           link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00157           current_data_->transport_header_, payload, pool_alloc_size));
00158     std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00159                 sizeof(current_data_->transport_header_));
00160     current_data_->payload_ = payload;
00161     current_data_->status_ = SHMEM_DATA_IN_USE;
00162   } else {
00163     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00164               "failed to find space for control\n", link_), 0);
00165     return -1;
00166   }
00167 
00168   ACE_OS::sema_post(&peer_semaphore_);
00169 
00170   return pool_alloc_size + iov[0].iov_len;
00171 }

bool OpenDDS::DCPS::ShmemSendStrategy::start_i (  )  [virtual]

Let the subclass start.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 34 of file ShmemSendStrategy.cpp.

References bound_name_, OpenDDS::DCPS::ShmemDataLink::config(), OpenDDS::DCPS::ShmemInst::datalink_control_size_, link_, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_address(), OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_pid(), peer_semaphore_, OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, and VDBG_LVL.

00035 {
00036   bound_name_ = "Write-" + link_->peer_address();
00037   ShmemAllocator* alloc = link_->local_allocator();
00038 
00039   const size_t n_bytes = link_->config()->datalink_control_size_,
00040     n_elems = n_bytes / sizeof(ShmemData),
00041     extra = n_bytes % sizeof(ShmemData);
00042 
00043   void* mem = alloc->calloc(n_bytes);
00044   if (mem == 0) {
00045     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00046               "to allocate %B bytes for control\n", link_, n_bytes), 0);
00047     return false;
00048   }
00049 
00050   ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00051   data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =
00052     SHMEM_DATA_END_OF_ALLOC;
00053   alloc->bind(bound_name_.c_str(), mem);
00054 
00055   ShmemAllocator* peer = link_->peer_allocator();
00056   peer->find("Semaphore", mem);
00057   ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00058 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00059   HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
00060                                  link_->peer_pid());
00061   ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00062                     0 /*dwDesiredAccess -- ignored*/,
00063                     false /*bInheritHandle*/,
00064                     DUPLICATE_SAME_ACCESS /*dwOptions*/);
00065   ::CloseHandle(srcProc);
00066 #elif defined ACE_HAS_POSIX_SEM
00067   peer_semaphore_.sema_ = sem;
00068   peer_semaphore_.name_ = 0;
00069 #else
00070   ACE_UNUSED_ARG(sem);
00071 #endif
00072   return true;
00073 }

void OpenDDS::DCPS::ShmemSendStrategy::stop_i (  )  [virtual]

Let the subclass stop.

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 174 of file ShmemSendStrategy.cpp.

References peer_semaphore_.

00175 {
00176 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00177   ::CloseHandle(peer_semaphore_);
00178 #endif
00179 }


Member Data Documentation

std::string OpenDDS::DCPS::ShmemSendStrategy::bound_name_ [private]

Definition at line 38 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

ShmemData* OpenDDS::DCPS::ShmemSendStrategy::current_data_ [private]

Definition at line 40 of file ShmemSendStrategy.h.

Referenced by send_bytes_i().

ShmemDataLink* OpenDDS::DCPS::ShmemSendStrategy::link_ [private]

Definition at line 37 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

ACE_sema_t OpenDDS::DCPS::ShmemSendStrategy::peer_semaphore_ [private]

Definition at line 39 of file ShmemSendStrategy.h.

Referenced by ShmemSendStrategy(), start_i(), and stop_i().


The documentation for this class was generated from the following files:

  * ShmemSendStrategy.h
  * ShmemSendStrategy.cpp

Generated on Fri Feb 12 20:06:38 2016 for OpenDDS by  doxygen 1.4.7