OpenDDS::DCPS::ShmemSendStrategy Class Reference

#include <ShmemSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::ShmemSendStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemSendStrategy:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 ShmemSendStrategy (ShmemDataLink *link)
virtual bool start_i ()
 Let the subclass start.
virtual void stop_i ()
 Let the subclass stop.

Protected Member Functions

virtual ssize_t send_bytes_i (const iovec iov[], int n)

Private Attributes

ShmemDataLink * link_
std::string bound_name_
ACE_sema_t peer_semaphore_
ShmemData * current_data_
const size_t datalink_control_size_

Detailed Description

Definition at line 29 of file ShmemSendStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ShmemSendStrategy::ShmemSendStrategy ( ShmemDataLink *  link  ) 

Definition at line 21 of file ShmemSendStrategy.cpp.

References memset(), and peer_semaphore_.

00022   : TransportSendStrategy(0, link->impl(),  // base-class init from the link's impl()
00023                           0,  // synch_resource
00024                           link->transport_priority(),
00025                           make_rch<NullSynchStrategy>())
00026   , link_(link)  // kept for use by start_i() and send_bytes_i()
00027   , current_data_(0)  // stays null until send_bytes_i() selects a control block
00028   , datalink_control_size_(link->impl().config().datalink_control_size_)  // size in bytes of the control area, read from link->impl().config()
00029 {
00030 #ifdef ACE_HAS_POSIX_SEM
00031   memset(&peer_semaphore_, 0, sizeof(peer_semaphore_));  // zero the semaphore struct; start_i() fills in its fields
00032 #endif
00033 }

Here is the call graph for this function:


Member Function Documentation

ssize_t OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
) [protected, virtual]

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 77 of file ShmemSendStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::DCPS_PROTOCOL, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::free(), iovec::iov_base, iovec::iov_len, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::malloc(), OpenDDS::DCPS::ShmemData::payload_, peer_semaphore_, ACE_OS::sema_post(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_IN_USE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::TransportSendStrategy::start(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.

00078 {  // Copies iov[1..n-1] into one pool allocation, records it in a free control block, then posts peer_semaphore_. Returns header + payload byte count, or -1 on error.
00079   const size_t hdr_sz = sizeof(current_data_->transport_header_);  // sizeof only: current_data_ is not dereferenced here
00080   if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {  // iov[0] must be exactly one marshaled transport header
00081     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00082               "expecting iov[0] of size %B, got %B\n",
00083               link_, hdr_sz, iov[0].iov_len), 0);
00084     return -1;
00085   }
00086   // see TransportHeader::valid(), but this is for the marshaled form
00087   if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00088                   sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00089     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00090               "expecting iov[0] to contain the transport header\n", link_), 0);
00091     return -1;
00092   }
00093 
00094   //FUTURE: use the ShmemTransport object to see if we already have the
00095   //        same payload data available in the pool (from other DataLinks),
00096   //        and if so, add a refcount to the start of the "from_pool" allocation
00097 
00098   size_t pool_alloc_size = 0;  // total payload bytes, i.e. everything after iov[0]
00099   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00100     pool_alloc_size += iov[i].iov_len;
00101   }
00102 
00103   ShmemAllocator* alloc = link_->local_allocator();
00104   void* from_pool = alloc->malloc(pool_alloc_size);  // single buffer for the concatenated payload
00105   if (from_pool == 0) {
00106     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00107               "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00108     errno = ENOMEM;
00109     return -1;
00110   }
00111 
00112   char* payload = reinterpret_cast<char*>(from_pool);
00113   char* iter = payload;  // write cursor into the payload buffer
00114   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00115     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00116     iter += iov[i].iov_len;
00117   }
00118 
00119   void* mem = 0;
00120   alloc->find(bound_name_.c_str(), mem);  // look up the control array bound in start_i(). NOTE(review): result is not checked; mem is dereferenced below even if it is still 0
00121 
00122   for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);  // pass 1: visit every control block up to the END_OF_ALLOC sentinel
00123        iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00124     if (iter->status_ == SHMEM_DATA_RECV_DONE) {  // presumably set by the receiving side when it is done with the block -- confirm
00125       alloc->free(iter->payload_);
00126       // This will eventually be refcounted so instead of a free(), the previous
00127       // statement would decrement the refcount and check for 0 before free().
00128       // See the 'FUTURE' comment above.
00129       iter->status_ = SHMEM_DATA_FREE;  // block can be reused by pass 2 below
00130       VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00131                 "releasing control block #%d\n", link_,
00132                 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00133     }
00134   }
00135 
00136   if (!current_data_) {  // first send: start at the beginning of the control array
00137     current_data_ = reinterpret_cast<ShmemData*>(mem);
00138   }
00139 
00140   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||  // pass 2: advance current_data_ past busy blocks, wrapping at the sentinel
00141          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00142     if (!start) {
00143       start = current_data_;  // remember the first busy block seen
00144     } else if (start == current_data_) {  // wrapped all the way around without finding a usable block
00145       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00146                 "space for control\n", link_), 0);
00147       return -1;  // NOTE(review): from_pool is not freed on this path
00148     }
00149     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {  // next slot is the sentinel, so wrap to the first block
00150       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00151     }
00152   }
00153 
00154   if (current_data_->status_ == SHMEM_DATA_FREE) {
00155     VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00156           "writing at control block #%d header %@ payload %@ len %B\n",
00157           link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00158           current_data_->transport_header_, payload, pool_alloc_size));
00159     std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00160                 sizeof(current_data_->transport_header_));
00161     current_data_->payload_ = payload;
00162     current_data_->status_ = SHMEM_DATA_IN_USE;  // status_ is assigned after the header and payload_ fields
00163   } else {  // loop stopped on a status that is neither busy nor FREE (e.g. the sentinel)
00164     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00165               "failed to find space for control\n", link_), 0);
00166     return -1;  // NOTE(review): from_pool is not freed on this path
00167   }
00168 
00169   ACE_OS::sema_post(&peer_semaphore_);  // post the semaphore obtained in start_i(). NOTE(review): return value is ignored
00170 
00171   return pool_alloc_size + iov[0].iov_len;  // payload bytes plus the header length
00172 }

Here is the call graph for this function:

bool OpenDDS::DCPS::ShmemSendStrategy::start_i (  )  [virtual]

Let the subclass start.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 36 of file ShmemSendStrategy.cpp.

References ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::bind(), bound_name_, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::calloc(), datalink_control_size_, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), link_, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_address(), OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_pid(), peer_semaphore_, OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, and VDBG_LVL.

00037 {  // Allocates and binds the control-block array, then sets up peer_semaphore_ from the peer allocator's "Semaphore" entry. Returns false only when the control allocation fails.
00038   bound_name_ = "Write-" + link_->peer_address();  // name the control array is bound under; send_bytes_i() looks it up by this name
00039   ShmemAllocator* alloc = link_->local_allocator();
00040 
00041   const size_t n_elems = datalink_control_size_ / sizeof(ShmemData),  // number of whole ShmemData slots that fit
00042     extra = datalink_control_size_ % sizeof(ShmemData);  // leftover bytes after the last whole slot
00043 
00044   void* mem = alloc->calloc(datalink_control_size_);  // presumably zero-filled by calloc -- confirm against the allocator
00045   if (mem == 0) {
00046     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00047               "to allocate %B bytes for control\n", link_, datalink_control_size_), 0);
00048     return false;
00049   }
00050 
00051   ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00052   data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =  // sentinel goes in the partial trailing slot if it has at least sizeof(int) bytes, else in the last whole slot
00053     SHMEM_DATA_END_OF_ALLOC;  // NOTE(review): assumes status_ lies within the first sizeof(int) bytes of ShmemData -- confirm; n_elems - 1 wraps if n_elems is 0
00054   alloc->bind(bound_name_.c_str(), mem);  // NOTE(review): bind() result is not checked
00055 
00056   ShmemAllocator* peer = link_->peer_allocator();
00057   peer->find("Semaphore", mem);  // mem is reused as the out-parameter. NOTE(review): find() result is not checked
00058   ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00059 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00060   HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
00061                                  link_->peer_pid());
00062   ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00063                     0 /*dwDesiredAccess -- ignored*/,
00064                     false /*bInheritHandle*/,
00065                     DUPLICATE_SAME_ACCESS /*dwOptions*/);
00066   ::CloseHandle(srcProc);  // NOTE(review): OpenProcess and DuplicateHandle results are not checked before this point
00067 #elif defined ACE_HAS_POSIX_SEM
00068   peer_semaphore_.sema_ = sem;  // use the object found in the peer allocator directly
00069   peer_semaphore_.name_ = 0;
00070 #else
00071   ACE_UNUSED_ARG(sem);  // neither branch above applies: peer_semaphore_ is left untouched
00072 #endif
00073   return true;
00074 }

Here is the call graph for this function:

void OpenDDS::DCPS::ShmemSendStrategy::stop_i (  )  [virtual]

Let the subclass stop.

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 175 of file ShmemSendStrategy.cpp.

References peer_semaphore_.

00176 {  // Closes the peer_semaphore_ handle on Windows (non-WinCE) builds; does nothing on other builds.
00177 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00178   ::CloseHandle(peer_semaphore_);  // handle was produced by DuplicateHandle in start_i()
00179 #endif
00180 }


Member Data Documentation

std::string OpenDDS::DCPS::ShmemSendStrategy::bound_name_ [private]

Definition at line 42 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

ShmemData * OpenDDS::DCPS::ShmemSendStrategy::current_data_ [private]

Definition at line 44 of file ShmemSendStrategy.h.

Referenced by send_bytes_i().

const size_t OpenDDS::DCPS::ShmemSendStrategy::datalink_control_size_ [private]

Definition at line 45 of file ShmemSendStrategy.h.

Referenced by start_i().

ShmemDataLink * OpenDDS::DCPS::ShmemSendStrategy::link_ [private]

Definition at line 41 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

ACE_sema_t OpenDDS::DCPS::ShmemSendStrategy::peer_semaphore_ [private]

Definition at line 43 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), ShmemSendStrategy(), start_i(), and stop_i().


The documentation for this class was generated from the following files:

  * ShmemSendStrategy.h
  * ShmemSendStrategy.cpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1