OpenDDS  Snapshot(2023/04/28-20:55)
ShmemReceiveStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "ShmemReceiveStrategy.h"
9 
10 #include "ShmemDataLink.h"
11 #include "ShmemInst.h"
12 #include "ShmemTransport.h"
13 
15 
16 #include <cstring>
17 
19 
20 namespace OpenDDS {
21 namespace DCPS {
22 
24  : TransportReceiveStrategy<>(link->config())
25  , link_(link)
26  , current_data_(0)
27  , partial_recv_remaining_(0)
28  , partial_recv_ptr_(0)
29 {
30 }
31 
32 void
34 {
36  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
37  "resuming partial recv\n", link_));
38  handle_dds_input(ACE_INVALID_HANDLE);
39  return;
40  }
41 
42  if (bound_name_.empty()) {
43  bound_name_ = "Write-" + link_->local_address();
44  }
45 
47  void* mem = 0;
48  if (alloc == 0 || -1 == alloc->find(bound_name_.c_str(), mem)) {
49  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
50  "peer allocator not found, receive_bytes will close link\n",
51  link_), 1);
52  handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg.
53  return;
54  }
55 
56  if (!current_data_) {
57  current_data_ = reinterpret_cast<ShmemData*>(mem);
58  }
59 
62  if (!start) {
64  } else if (start == current_data_) {
65  return; // none found => don't call handle_dds_input()
66  }
67  if (current_data_[1].status_ == ShmemData::EndOfAlloc) {
68  current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
69  }
70  }
71 
72  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
73  "reading at control block #%d\n",
74  link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
75  // If we get this far, current_data_ points to the first ShmemData::DataInUse.
76  // handle_dds_input() will call our receive_bytes() to get the data.
77  handle_dds_input(ACE_INVALID_HANDLE);
78 }
79 
80 ssize_t
82  int n,
83  ACE_INET_Addr& /*remote_address*/,
84  ACE_HANDLE /*fd*/,
85  bool& /*stop*/)
86 {
87  VDBG((LM_DEBUG,
88  "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
89 
90  // check that the writer's shared memory is still available
92  void* mem;
93  if (!alloc || -1 == alloc->find(bound_name_.c_str(), mem) || !current_data_
95  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
96  1);
97  gracefully_disconnected_ = true; // do not attempt reconnect via relink()
98  return 0; // close "connection"
99  }
100 
101  ssize_t total = 0;
102 
103  const char* src_iter;
104  char* dst_iter = 0;
105  int i = 0; // current iovec index in iov[]
106  size_t remaining;
107 
109  remaining = partial_recv_remaining_;
110  src_iter = partial_recv_ptr_;
111  // iov_base is void* on POSIX but char* on Win32, we'll have to cast:
112  dst_iter = (char*)iov[0].iov_base;
113 
114  } else {
116  const size_t hdr_sz = sizeof(current_data_->transport_header_);
117  // BUFFER_LOW_WATER in the framework ensures a large enough buffer
118  if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
119  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
120  "receive buffer of length %d is too small\n",
121  iov[0].iov_len), 0);
122  errno = ENOBUFS;
123  return -1;
124  }
125 
126  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
127  "header %@ payload %@ len %B\n", current_data_->transport_header_,
128  (char*)current_data_->payload_, remaining));
129  std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
130  total += hdr_sz;
131  src_iter = current_data_->payload_;
132  if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
133  dst_iter = (char*)iov[0].iov_base + hdr_sz;
134  } else if (n > 1) {
135  dst_iter = (char*)iov[1].iov_base;
136  i = 1;
137  }
138  }
139 
140  for (; i < n && remaining; ++i) {
141  const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
142  chunk = std::min(space, remaining);
143 
144 #ifdef OPENDDS_SHMEM_WINDOWS
145  if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
146  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
147  "shared memory pool couldn't be extended\n"), 0);
148  errno = ENOMEM;
149  return -1;
150  }
151 #endif
152 
153  std::memcpy(dst_iter, src_iter, chunk);
154  if (i < n - 1) {
155  dst_iter = (char*)iov[i + 1].iov_base;
156  }
157  remaining -= chunk;
158  total += chunk;
159  src_iter += chunk;
160  }
161 
162  if (remaining) {
163  partial_recv_remaining_ = remaining;
164  partial_recv_ptr_ = src_iter;
165  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
166  "receive was partial\n"));
168 
169  } else {
171  partial_recv_ptr_ = 0;
172  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
173  "receive done\n"));
175  }
176 
177  return total;
178 }
179 
180 void
182  const ACE_INET_Addr& /*remote_address*/)
183 {
184  switch (sample.header_.message_id_) {
185 
186  case REQUEST_ACK:
187  link_->request_ack_received(sample);
188  break;
189 
190  case TRANSPORT_CONTROL:
191  link_->control_received(sample);
192  break;
193 
194  default:
195  link_->data_received(sample);
196  }
197 }
198 
199 int
201 {
202  return 0;
203 }
204 
205 void
207 {
208 }
209 
210 } // namespace DCPS
211 } // namespace OpenDDS
212 
DataSampleHeader header_
The demarshalled sample header.
virtual void stop_i()
Let the subclass stop.
char message_id_
The enum MessageId.
static ACE_UINT32 get_length(const char *marshaled_transport_header)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
int find(const char *name, void *&pointer)
int ssize_t
ShmemAllocator * peer_allocator()
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.
LM_DEBUG
#define VDBG(DBG_ARGS)
Holds a data sample received by the transport.
void control_received(ReceivedDataSample &sample)
char transport_header_[TRANSPORT_HDR_SERIALIZED_SZ]
Definition: ShmemDataLink.h:46
ACE_Based_Pointer_Basic< char > payload_
Definition: ShmemDataLink.h:47
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
virtual int start_i()
Let the subclass start.
void request_ack_received(ReceivedDataSample &sample)
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ENOBUFS
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
MEMORY_POOL & memory_pool(void)