OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ShmemReceiveStrategy Class Reference

#include <ShmemReceiveStrategy.h>

Inheritance diagram for OpenDDS::DCPS::ShmemReceiveStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemReceiveStrategy:
Collaboration graph
[legend]

Public Member Functions

 ShmemReceiveStrategy (ShmemDataLink *link)
 
void read ()
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
virtual ~TransportReceiveStrategy ()
 
int start ()
 
void stop ()
 
int handle_dds_input (ACE_HANDLE fd)
 
virtual void relink (bool do_suspend=true)
 
const TransportHeaderreceived_header () const
 
TransportHeaderreceived_header ()
 
const DataSampleHeaderreceived_sample_header () const
 
DataSampleHeaderreceived_sample_header ()
 
ACE_Message_Blockto_msgblock (const ReceivedDataSample &sample)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportStrategy
virtual ~TransportStrategy ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
 Only our subclass knows how to do this. More...
 
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered. More...
 
virtual int start_i ()
 Let the subclass start. More...
 
virtual void stop_i ()
 Let the subclass stop. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
 TransportReceiveStrategy (const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
 
virtual bool check_header (const TransportHeader &header)
 Check the transport header for suitability. More...
 
virtual bool check_header (const DataSampleHeader &header)
 Check the data sample header for suitability. More...
 
virtual void begin_transport_header_processing ()
 Begin Current Transport Header Processing. More...
 
virtual void end_transport_header_processing ()
 End Current Transport Header Processing. More...
 
virtual void finish_message ()
 
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them. More...
 
void reset ()
 
size_t pdu_remaining () const
 
size_t successor_index (size_t index) const
 Manage an index into the receive buffer array. More...
 
void update_buffer_index (bool &done)
 
virtual bool reassemble (ReceivedDataSample &data)
 
 OPENDDS_VECTOR (ACE_Message_Block *) receive_buffers_
 Set of receive buffers in use. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Attributes

ShmemDataLinklink_
 
std::string bound_name_
 
ShmemDatacurrent_data_
 
size_t partial_recv_remaining_
 
const char * partial_recv_ptr_
 
ACE_Thread_Mutex mutex_
 

Additional Inherited Members

- Static Public Attributes inherited from OpenDDS::DCPS::TransportReceiveConstants
static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS
 
static const size_t BUFFER_LOW_WATER = 4096
 
static const size_t MESSAGE_BLOCKS = 1000
 
static const size_t DATA_BLOCKS = 100
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportReceiveStrategy<>
bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received. More...
 
size_t receive_sample_remaining_
 Bytes remaining in the current DataSample. More...
 
TransportHeader receive_transport_header_
 Current receive TransportHeader. More...
 
TransportMessageBlockAllocator mb_allocator_
 
TransportDataBlockAllocator db_allocator_
 
TransportDataAllocator data_allocator_
 
ACE_Lock_Adapter< ACE_SYNCH_MUTEXreceive_lock_
 Locking strategy for the allocators. More...
 
size_t buffer_index_
 Current receive buffer index in use. More...
 
DataSampleHeader data_sample_header_
 Current data sample header. More...
 
ACE_Message_Blockpayload_
 
bool good_pdu_
 
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet. More...
 

Detailed Description

Definition at line 25 of file ShmemReceiveStrategy.h.

Constructor & Destructor Documentation

◆ ShmemReceiveStrategy()

OpenDDS::DCPS::ShmemReceiveStrategy::ShmemReceiveStrategy ( ShmemDataLink link)
explicit

Member Function Documentation

◆ deliver_sample()

void OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
)
protectedvirtual

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 181 of file ShmemReceiveStrategy.cpp.

References OpenDDS::DCPS::ShmemDataLink::control_received(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, link_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::REQUEST_ACK, OpenDDS::DCPS::ShmemDataLink::request_ack_received(), and OpenDDS::DCPS::TRANSPORT_CONTROL.

183 {
184  switch (sample.header_.message_id_) {
185 
186  case REQUEST_ACK:
187  link_->request_ack_received(sample);
188  break;
189 
190  case TRANSPORT_CONTROL:
191  link_->control_received(sample);
192  break;
193 
194  default:
195  link_->data_received(sample);
196  }
197 }
void request_ack_received(ReceivedDataSample &sample)
void control_received(ReceivedDataSample &sample)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690

◆ read()

void OpenDDS::DCPS::ShmemReceiveStrategy::read ( void  )

Definition at line 33 of file ShmemReceiveStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::ShmemData::EndOfAlloc, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), OpenDDS::DCPS::ShmemData::Free, OpenDDS::DCPS::TransportReceiveStrategy<>::handle_dds_input(), link_, LM_DEBUG, OpenDDS::DCPS::ShmemDataLink::local_address(), partial_recv_remaining_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemData::RecvDone, OpenDDS::DCPS::TransportReceiveStrategy<>::start(), OpenDDS::DCPS::ShmemData::status_, VDBG, and VDBG_LVL.

34 {
36  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
37  "resuming partial recv\n", link_));
38  handle_dds_input(ACE_INVALID_HANDLE);
39  return;
40  }
41 
42  if (bound_name_.empty()) {
43  bound_name_ = "Write-" + link_->local_address();
44  }
45 
47  void* mem = 0;
48  if (alloc == 0 || -1 == alloc->find(bound_name_.c_str(), mem)) {
49  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
50  "peer allocator not found, receive_bytes will close link\n",
51  link_), 1);
52  handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg.
53  return;
54  }
55 
56  if (!current_data_) {
57  current_data_ = reinterpret_cast<ShmemData*>(mem);
58  }
59 
60  for (ShmemData* start = 0; current_data_->status_ == ShmemData::Free ||
62  if (!start) {
64  } else if (start == current_data_) {
65  return; // none found => don't call handle_dds_input()
66  }
67  if (current_data_[1].status_ == ShmemData::EndOfAlloc) {
68  current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
69  }
70  }
71 
72  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
73  "reading at control block #%d\n",
74  link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
75  // If we get this far, current_data_ points to the first ShmemData::DataInUse.
76  // handle_dds_input() will call our receive_bytes() to get the data.
77  handle_dds_input(ACE_INVALID_HANDLE);
78 }
ShmemAllocator * peer_allocator()
#define VDBG(DBG_ARGS)
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ receive_bytes()

ssize_t OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr remote_address,
ACE_HANDLE  fd,
bool &  stop 
)
protectedvirtual

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 81 of file ShmemReceiveStrategy.cpp.

References bound_name_, current_data_, ENOBUFS, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), OpenDDS::DCPS::TransportHeader::get_length(), OpenDDS::DCPS::TransportReceiveStrategy<>::gracefully_disconnected_, OpenDDS::DCPS::ShmemData::InUse, link_, LM_DEBUG, LM_ERROR, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::memory_pool(), partial_recv_ptr_, partial_recv_remaining_, OpenDDS::DCPS::ShmemData::payload_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemData::RecvDone, OpenDDS::DCPS::ShmemDataLink::signal_semaphore(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.

86 {
87  VDBG((LM_DEBUG,
88  "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
89 
90  // check that the writer's shared memory is still available
92  void* mem;
93  if (!alloc || -1 == alloc->find(bound_name_.c_str(), mem) || !current_data_
95  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
96  1);
97  gracefully_disconnected_ = true; // do not attempt reconnect via relink()
98  return 0; // close "connection"
99  }
100 
101  ssize_t total = 0;
102 
103  const char* src_iter;
104  char* dst_iter = 0;
105  int i = 0; // current iovec index in iov[]
106  size_t remaining;
107 
109  remaining = partial_recv_remaining_;
110  src_iter = partial_recv_ptr_;
111  // iov_base is void* on POSIX but char* on Win32, we'll have to cast:
112  dst_iter = (char*)iov[0].iov_base;
113 
114  } else {
116  const size_t hdr_sz = sizeof(current_data_->transport_header_);
117  // BUFFER_LOW_WATER in the framework ensures a large enough buffer
118  if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
119  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
120  "receive buffer of length %d is too small\n",
121  iov[0].iov_len), 0);
122  errno = ENOBUFS;
123  return -1;
124  }
125 
126  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
127  "header %@ payload %@ len %B\n", current_data_->transport_header_,
128  (char*)current_data_->payload_, remaining));
129  std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
130  total += hdr_sz;
131  src_iter = current_data_->payload_;
132  if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
133  dst_iter = (char*)iov[0].iov_base + hdr_sz;
134  } else if (n > 1) {
135  dst_iter = (char*)iov[1].iov_base;
136  i = 1;
137  }
138  }
139 
140  for (; i < n && remaining; ++i) {
141  const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
142  chunk = std::min(space, remaining);
143 
144 #ifdef OPENDDS_SHMEM_WINDOWS
145  if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
146  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
147  "shared memory pool couldn't be extended\n"), 0);
148  errno = ENOMEM;
149  return -1;
150  }
151 #endif
152 
153  std::memcpy(dst_iter, src_iter, chunk);
154  if (i < n - 1) {
155  dst_iter = (char*)iov[i + 1].iov_base;
156  }
157  remaining -= chunk;
158  total += chunk;
159  src_iter += chunk;
160  }
161 
162  if (remaining) {
163  partial_recv_remaining_ = remaining;
164  partial_recv_ptr_ = src_iter;
165  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
166  "receive was partial\n"));
168 
169  } else {
171  partial_recv_ptr_ = 0;
172  VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
173  "receive done\n"));
175  }
176 
177  return total;
178 }
ShmemAllocator * peer_allocator()
int ssize_t
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
#define VDBG(DBG_ARGS)
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
static ACE_UINT32 get_length(const char *marshaled_transport_header)
#define VDBG_LVL(DBG_ARGS, LEVEL)
char transport_header_[TRANSPORT_HDR_SERIALIZED_SZ]
Definition: ShmemDataLink.h:46
ACE_Based_Pointer_Basic< char > payload_
Definition: ShmemDataLink.h:47
#define ENOBUFS

◆ start_i()

int OpenDDS::DCPS::ShmemReceiveStrategy::start_i ( )
protectedvirtual

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 200 of file ShmemReceiveStrategy.cpp.

201 {
202  return 0;
203 }

◆ stop_i()

void OpenDDS::DCPS::ShmemReceiveStrategy::stop_i ( )
protectedvirtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy<>.

Definition at line 206 of file ShmemReceiveStrategy.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL.

207 {
208 }

Member Data Documentation

◆ bound_name_

std::string OpenDDS::DCPS::ShmemReceiveStrategy::bound_name_
private

Definition at line 47 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().

◆ current_data_

ShmemData* OpenDDS::DCPS::ShmemReceiveStrategy::current_data_
private

Definition at line 48 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().

◆ link_

ShmemDataLink* OpenDDS::DCPS::ShmemReceiveStrategy::link_
private

Definition at line 46 of file ShmemReceiveStrategy.h.

Referenced by deliver_sample(), read(), and receive_bytes().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::ShmemReceiveStrategy::mutex_
private

Definition at line 51 of file ShmemReceiveStrategy.h.

◆ partial_recv_ptr_

const char* OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_ptr_
private

Definition at line 50 of file ShmemReceiveStrategy.h.

Referenced by receive_bytes().

◆ partial_recv_remaining_

size_t OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_remaining_
private

Definition at line 49 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().


The documentation for this class was generated from the following files: