OpenDDS  Snapshot(2023/04/28-20:55)
ShmemSendStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "ShmemSendStrategy.h"
9 #include "ShmemDataLink.h"
10 #include "ShmemInst.h"
11 
13 
14 #include <cstring>
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
// Constructor: member-initializer list and body.
// NOTE(review): the signature line (listing line 21) and listing lines 25 and
// 31 are absent from this extract, so the last base-class argument and the
// statement inside the #ifdef below cannot be seen here.
//
// Visible behavior: forwards link->impl() and link->transport_priority() to
// the TransportSendStrategy base, remembers the link pointer, starts with no
// current control block (current_data_ == 0), and caches the control-area
// size read from link->config()->datalink_control_size_.
22  : TransportSendStrategy(0, link->impl(),
23  0, // synch_resource
24  link->transport_priority(),
26  , link_(link)
27  , current_data_(0)
28  , datalink_control_size_(link->config()->datalink_control_size_)
29 {
30 #ifdef OPENDDS_SHMEM_UNIX
// NOTE(review): the Unix-only statement (listing line 31) is missing here;
// presumably it zero-fills peer_semaphore_ -- confirm against the real file.
32 #endif
33 }
34 
// Start hook for the shared-memory send strategy.
// NOTE(review): the function-name line (listing line 36) is missing from this
// extract, as are listing lines 39 and 56, where `alloc` and `peer` are
// presumably declared from the link's local/peer allocators -- confirm.
//
// Visible behavior:
//  - builds bound_name_ as "Write-" + the link's peer address;
//  - calloc()s datalink_control_size_ bytes for the control area and treats
//    it as an array of ShmemData;
//  - writes the EndOfAlloc sentinel into the last usable slot and binds the
//    area under bound_name_;
//  - looks up "Semaphore" through `peer` and stores a platform-specific
//    handle/pointer to it in peer_semaphore_.
// Returns false when the control area cannot be allocated, true otherwise.
35 bool
37 {
38  bound_name_ = "Write-" + link_->peer_address();
40 
41  const size_t n_elems = datalink_control_size_ / sizeof(ShmemData),
42  extra = datalink_control_size_ % sizeof(ShmemData);
43 
44  void* mem = 0;
45  if (alloc == 0 || (mem = alloc->calloc(datalink_control_size_)) == 0) {
46  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
47  "to allocate %B bytes for control\n", link_, datalink_control_size_), 0);
48  return false;
49  }
50 
51  ShmemData* data = reinterpret_cast<ShmemData*>(mem);
// The sentinel goes into the leftover bytes past the last whole element when
// at least sizeof(int) bytes remain; otherwise the last whole element is
// sacrificed to hold it.
// NOTE(review): assumes status_ sits at the start of ShmemData and fits in
// sizeof(int) -- confirm. Also, if datalink_control_size_ is smaller than
// sizeof(ShmemData), n_elems is 0 and (n_elems - 1) wraps around as size_t.
52  const size_t limit = (extra >= sizeof(int)) ? n_elems : (n_elems - 1);
53  data[limit].status_ = ShmemData::EndOfAlloc;
54  alloc->bind(bound_name_.c_str(), mem);
55 
// NOTE(review): the result of find() is not checked; if the lookup fails,
// `mem` may still point at the control area allocated above and would then be
// reinterpreted as a semaphore below. No null check of `peer` is visible.
57  peer->find("Semaphore", mem);
58  ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
59 #if defined OPENDDS_SHMEM_WINDOWS
// Windows: duplicate the peer process's semaphore handle into this process.
// NOTE(review): OpenProcess/DuplicateHandle results are not checked.
60  HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
61  link_->peer_pid());
62  ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
63  0 /*dwDesiredAccess -- ignored*/,
64  false /*bInheritHandle*/,
65  DUPLICATE_SAME_ACCESS /*dwOptions*/);
66  ::CloseHandle(srcProc);
67 #elif defined OPENDDS_SHMEM_UNIX
// Unix: point directly at the semaphore found through the peer allocator.
68  peer_semaphore_.sema_ = sem;
69  peer_semaphore_.name_ = 0;
70 #else
71  ACE_UNUSED_ARG(sem);
72 #endif
73  return true;
74 }
75 
// Sends one transport packet through the shared-memory control area.
//
// @param iov  iov[0] must be exactly the marshaled transport header
//             (sizeof(transport_header_) bytes, starting with
//             TransportHeader::DCPS_PROTOCOL); iov[1..n-1] are payload pieces.
// @param n    number of entries in iov.
// @return     payload byte count plus iov[0].iov_len on success; -1 on
//             failure (errno is set to ENOMEM when the payload allocation
//             fails and ENOENT when the control segment cannot be found).
//
// NOTE(review): listing lines 103, 145-146, 148, 159, 165, 167 and 174 are
// missing from this extract (the declaration of `alloc`, the header of the
// free-slot search loop, and a few statements), so the comments below only
// describe what is visible.
// NOTE(review): iov[0] is read without first checking that n >= 1.
76 ssize_t
77 ShmemSendStrategy::send_bytes_i(const iovec iov[], int n)
78 {
79  const size_t hdr_sz = sizeof(current_data_->transport_header_);
80  if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
81  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
82  "expecting iov[0] of size %B, got %B\n",
83  link_, hdr_sz, iov[0].iov_len), 0);
84  return -1;
85  }
86  // see TransportHeader::valid(), but this is for the marshaled form
87  if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
88  sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
89  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
90  "expecting iov[0] to contain the transport header\n", link_), 0);
91  return -1;
92  }
93 
94  //FUTURE: use the ShmemTransport object to see if we already have the
95  // same payload data available in the pool (from other DataLinks),
96  // and if so, add a refcount to the start of the "from_pool" allocation
97 
// Total payload size: every iovec entry except the header in iov[0].
98  size_t pool_alloc_size = 0;
99  for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
100  pool_alloc_size += iov[i].iov_len;
101  }
102 
104  void* from_pool = 0;
105  if (alloc == 0 || (from_pool = alloc->malloc(pool_alloc_size)) == 0) {
106  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
107  "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
108  errno = ENOMEM;
109  return -1;
110  }
111 
// Flatten the payload pieces into the single allocation.
112  char* payload = reinterpret_cast<char*>(from_pool);
113  char* iter = payload;
114  for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
115  std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
116  iter += iov[i].iov_len;
117  }
118 
// Locate the control area that was bound under bound_name_.
// NOTE(review): from this point on, none of the visible `return -1` paths
// releases `from_pool`; this looks like a leak of the payload allocation on
// error -- confirm against the complete source.
119  void* mem = 0;
120  if (-1 == alloc->find(bound_name_.c_str(), mem) || mem == 0) {
121  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
122  "to find control segment with bound name %C\n", link_, bound_name_.c_str()), 0);
123  errno = ENOENT;
124  return -1;
125  }
126 
// Reclaim pass: walk every control block up to the EndOfAlloc sentinel, free
// the payload of each block marked RecvDone, and mark that block Free.
// Note that this loop's `iter` shadows the char* `iter` declared above.
// NOTE(review): the debug message formats a pointer difference with %d.
127  for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
128  iter->status_ != ShmemData::EndOfAlloc; ++iter) {
129  if (iter->status_ == ShmemData::RecvDone) {
130  alloc->free(iter->payload_);
131  // This will eventually be refcounted so instead of a free(), the previous
132  // statement would decrement the refcount and check for 0 before free().
133  // See the 'FUTURE' comment above.
134  iter->status_ = ShmemData::Free;
135  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
136  "releasing control block #%d\n", link_,
137  iter - reinterpret_cast<ShmemData*>(mem)), 5);
138  }
139  }
140 
// First send: begin at the first control block.
141  if (!current_data_) {
142  current_data_ = reinterpret_cast<ShmemData*>(mem);
143  }
144 
// NOTE(review): the declaration of `start` and the loop header (listing lines
// 145-146) plus the first branch body (148) are missing. What is visible:
// reaching `start` again is reported as "out of space for control", and when
// the next block is the EndOfAlloc sentinel the cursor is moved to one before
// the first block so that the loop increment lands on the first block.
147  if (!start) {
149  } else if (start == current_data_) {
150  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
151  "space for control\n", link_), 0);
152  return -1;
153  }
154  if (current_data_[1].status_ == ShmemData::EndOfAlloc) {
155  current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
156  }
157  }
158 
// NOTE(review): the opening `if` of this if/else (listing line 159) and
// listing lines 165 and 167 are missing. The visible branch logs the chosen
// control block, copies iov[0] into its transport_header_ and stores the
// payload pointer in it; the else branch fails with -1.
160  VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
161  "writing at control block #%d header %@ payload %@ len %B\n",
162  link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
163  current_data_->transport_header_, payload, pool_alloc_size));
164  std::memcpy(current_data_->transport_header_, iov[0].iov_base,
166  current_data_->payload_ = payload;
168  } else {
169  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
170  "failed to find space for control\n", link_), 0);
171  return -1;
172  }
173 
// NOTE(review): listing line 174 is missing; presumably it posts
// peer_semaphore_ to wake the receiver -- confirm.
175 
// Report header plus payload bytes as sent.
176  return pool_alloc_size + iov[0].iov_len;
177 }
178 
// Stop hook for the shared-memory send strategy.
// NOTE(review): the function-name line (listing line 180) is missing from
// this extract.
// On Windows builds it closes the peer_semaphore_ handle; on other platforms
// the body is empty.
179 void
181 {
182 #ifdef OPENDDS_SHMEM_WINDOWS
183  ::CloseHandle(peer_semaphore_);
184 #endif
185 }
186 
187 } // namespace DCPS
188 } // namespace OpenDDS
189 
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
int find(const char *name, void *&pointer)
virtual bool start_i()
Let the subclass start.
ShmemAllocator * local_allocator()
int ssize_t
int ShmemSharedSemaphore
ShmemAllocator * peer_allocator()
static const ACE_CDR::Octet DCPS_PROTOCOL[6]
void free(void *ptr)
int bind(const char *name, void *pointer, int duplicates=0)
LM_DEBUG
virtual ssize_t send_bytes_i(const iovec iov[], int n)
virtual void stop_i()
Let the subclass stop.
#define VDBG(DBG_ARGS)
void * malloc(size_t nbytes)
char transport_header_[TRANSPORT_HDR_SERIALIZED_SZ]
Definition: ShmemDataLink.h:46
ACE_Based_Pointer_Basic< char > payload_
Definition: ShmemDataLink.h:47
void * memset(void *s, int c, size_t len)
void * calloc(size_t nbytes, char initial_value='\0')
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int sema_post(ACE_sema_t *s)
ShmemSendStrategy(ShmemDataLink *link)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28