OpenDDS  Snapshot(2023/04/28-20:55)
Public Member Functions | Protected Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ShmemSendStrategy Class Reference

#include <ShmemSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::ShmemSendStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemSendStrategy:
Collaboration graph
[legend]

Public Member Functions

 ShmemSendStrategy (ShmemDataLink *link)
 
virtual bool start_i ()
 Let the subclass start. More...
 
virtual void stop_i ()
 Let the subclass stop. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
virtual ~TransportSendStrategy ()
 
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer. More...
 
int start ()
 
void stop ()
 
void send_start ()
 
void send (TransportQueueElement *element, bool relink=true)
 
void send_stop (GUID_t repoId)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
virtual WorkOutcome perform_work ()
 
virtual void relink (bool do_suspend=true)
 
void suspend_send ()
 
void resume_send ()
 
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue. More...
 
virtual void terminate_send_if_suspended ()
 
void link_released (bool flag)
 
bool isDirectMode ()
 
virtual ACE_HANDLE get_handle ()
 
void deliver_ack_request (TransportQueueElement *element)
 
bool fragmentation_helper (TransportQueueElement *original_element, TqeVector &elements_to_send)
 
void clear (SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
 
SendMode mode () const
 Access the current sending mode. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
virtual ~ThreadSynchWorker ()
 
virtual void schedule_output ()
 Indicate that queued data is available to be sent. More...
 
std::size_t id () const
 DataLink reference value for diagnostics. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

virtual ssize_t send_bytes_i (const iovec iov[], int n)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
 TransportSendStrategy (std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
 
virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
 
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
 
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header. More...
 
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet. More...
 
TransportQueueElement * current_packet_first_element () const
 
virtual size_t max_message_size () const
 
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag. More...
 
virtual void add_delayed_notification (TransportQueueElement *element)
 
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
 
virtual Security::SecurityConfig_rch security_config () const
 
virtual RemoveResult do_remove_sample (const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
 Implement framework chain visitations to remove a sample. More...
 
ThreadSynch * synch () const
 
void set_header_source (ACE_INT64 source)
 
- Protected Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
 ThreadSynchWorker (std::size_t id=0)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Attributes

ShmemDataLink * link_
 
std::string bound_name_
 
ACE_sema_t peer_semaphore_
 
ShmemData * current_data_
 
const size_t datalink_control_size_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportSendStrategy
enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}
 
typedef BasicQueue< TransportQueueElement > QueueType
 
- Public Types inherited from OpenDDS::DCPS::ThreadSynchWorker
enum  WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE }
 
- Static Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportSendStrategy
static const size_t UDP_MAX_MESSAGE_SIZE = 65466
 

Detailed Description

Definition at line 29 of file ShmemSendStrategy.h.

Constructor & Destructor Documentation

◆ ShmemSendStrategy()

OpenDDS::DCPS::ShmemSendStrategy::ShmemSendStrategy ( ShmemDataLink * link)

Definition at line 21 of file ShmemSendStrategy.cpp.

References memset(), and peer_semaphore_.

22  : TransportSendStrategy(0, link->impl(),
23  0, // synch_resource
24  link->transport_priority(),
25  make_rch<NullSynchStrategy>())
26  , link_(link)
27  , current_data_(0)
28  , datalink_control_size_(link->config()->datalink_control_size_)
29 {
30 #ifdef OPENDDS_SHMEM_UNIX
31  memset(&peer_semaphore_, 0, sizeof(peer_semaphore_));
32 #endif
33 }
TransportSendStrategy(std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
void * memset(void *s, int c, size_t len)

Member Function Documentation

◆ send_bytes_i()

ssize_t OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
)
protected virtual

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 77 of file ShmemSendStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::DCPS_PROTOCOL, OpenDDS::DCPS::ShmemData::EndOfAlloc, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::free(), OpenDDS::DCPS::ShmemData::Free, OpenDDS::DCPS::ShmemData::InUse, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::malloc(), OpenDDS::DCPS::ShmemData::payload_, peer_semaphore_, OpenDDS::DCPS::ShmemData::RecvDone, ACE_OS::sema_post(), OpenDDS::DCPS::TransportSendStrategy::start(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.

78 {
79  const size_t hdr_sz = sizeof(current_data_->transport_header_);
80  if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
81  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
82  "expecting iov[0] of size %B, got %B\n",
83  link_, hdr_sz, iov[0].iov_len), 0);
84  return -1;
85  }
86  // see TransportHeader::valid(), but this is for the marshaled form
87  if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
88  sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
89  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
90  "expecting iov[0] to contain the transport header\n", link_), 0);
91  return -1;
92  }
93 
94  //FUTURE: use the ShmemTransport object to see if we already have the
95  // same payload data available in the pool (from other DataLinks),
96  // and if so, add a refcount to the start of the "from_pool" allocation
97 
98  size_t pool_alloc_size = 0;
99  for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
100  pool_alloc_size += iov[i].iov_len;
101  }
102 
103  ShmemAllocator* alloc = link_->local_allocator();
104  void* from_pool = 0;
105  if (alloc == 0 || (from_pool = alloc->malloc(pool_alloc_size)) == 0) {
106  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
107  "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
108  errno = ENOMEM;
109  return -1;
110  }
111 
112  char* payload = reinterpret_cast<char*>(from_pool);
113  char* iter = payload;
114  for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
115  std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
116  iter += iov[i].iov_len;
117  }
118 
119  void* mem = 0;
120  if (-1 == alloc->find(bound_name_.c_str(), mem) || mem == 0) {
121  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
122  "to find control segment with bound name %C\n", link_, bound_name_.c_str()), 0);
123  errno = ENOENT;
124  return -1;
125  }
126 
127  for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
128  iter->status_ != ShmemData::EndOfAlloc; ++iter) {
129  if (iter->status_ == ShmemData::RecvDone) {
130  alloc->free(iter->payload_);
131  // This will eventually be refcounted so instead of a free(), the previous
132  // statement would decrement the refcount and check for 0 before free().
133  // See the 'FUTURE' comment above.
134  iter->status_ = ShmemData::Free;
135  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
136  "releasing control block #%d\n", link_,
137  iter - reinterpret_cast<ShmemData*>(mem)), 5);
138  }
139  }
140 
141  if (!current_data_) {
142  current_data_ = reinterpret_cast<ShmemData*>(mem);
143  }
144 
145  for (ShmemData* start = 0; current_data_->status_ == ShmemData::InUse ||
146  current_data_->status_ == ShmemData::RecvDone; ++current_data_) {
147  if (!start) {
148  start = current_data_;
149  } else if (start == current_data_) {
150  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
151  "space for control\n", link_), 0);
152  return -1;
153  }
154  if (current_data_[1].status_ == ShmemData::EndOfAlloc) {
155  current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
156  }
157  }
158 
159  if (current_data_->status_ == ShmemData::Free) {
160  VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
161  "writing at control block #%d header %@ payload %@ len %B\n",
162  link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
163  current_data_->transport_header_, payload, pool_alloc_size));
164  std::memcpy(current_data_->transport_header_, iov[0].iov_base,
165  sizeof(current_data_->transport_header_));
166  current_data_->payload_ = payload;
167  current_data_->status_ = ShmemData::InUse;
168  } else {
169  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
170  "failed to find space for control\n", link_), 0);
171  return -1;
172  }
173 
174  ACE_OS::sema_post(&peer_semaphore_);
175 
176  return pool_alloc_size + iov[0].iov_len;
177 }
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
ShmemAllocator * local_allocator()
static const ACE_CDR::Octet DCPS_PROTOCOL[6]
#define VDBG(DBG_ARGS)
char transport_header_[TRANSPORT_HDR_SERIALIZED_SZ]
Definition: ShmemDataLink.h:46
ACE_Based_Pointer_Basic< char > payload_
Definition: ShmemDataLink.h:47
#define VDBG_LVL(DBG_ARGS, LEVEL)
int sema_post(ACE_sema_t *s)

◆ start_i()

bool OpenDDS::DCPS::ShmemSendStrategy::start_i ( )
virtual

Let the subclass start.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 36 of file ShmemSendStrategy.cpp.

References ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::bind(), bound_name_, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::calloc(), datalink_control_size_, OpenDDS::DCPS::ShmemData::EndOfAlloc, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), link_, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_address(), OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_pid(), peer_semaphore_, OpenDDS::DCPS::ShmemData::status_, and VDBG_LVL.

37 {
38  bound_name_ = "Write-" + link_->peer_address();
39  ShmemAllocator* alloc = link_->local_allocator();
40 
41  const size_t n_elems = datalink_control_size_ / sizeof(ShmemData),
42  extra = datalink_control_size_ % sizeof(ShmemData);
43 
44  void* mem = 0;
45  if (alloc == 0 || (mem = alloc->calloc(datalink_control_size_)) == 0) {
46  VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
47  "to allocate %B bytes for control\n", link_, datalink_control_size_), 0);
48  return false;
49  }
50 
51  ShmemData* data = reinterpret_cast<ShmemData*>(mem);
52  const size_t limit = (extra >= sizeof(int)) ? n_elems : (n_elems - 1);
53  data[limit].status_ = ShmemData::EndOfAlloc;
54  alloc->bind(bound_name_.c_str(), mem);
55 
56  ShmemAllocator* peer = link_->peer_allocator();
57  peer->find("Semaphore", mem);
58  ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
59 #if defined OPENDDS_SHMEM_WINDOWS
60  HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
61  link_->peer_pid());
62  ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
63  0 /*dwDesiredAccess -- ignored*/,
64  false /*bInheritHandle*/,
65  DUPLICATE_SAME_ACCESS /*dwOptions*/);
66  ::CloseHandle(srcProc);
67 #elif defined OPENDDS_SHMEM_UNIX
68  peer_semaphore_.sema_ = sem;
69  peer_semaphore_.name_ = 0;
70 #else
71  ACE_UNUSED_ARG(sem);
72 #endif
73  return true;
74 }
int find(const char *name, void *&pointer)
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
ShmemAllocator * local_allocator()
int ShmemSharedSemaphore
ShmemAllocator * peer_allocator()
#define VDBG_LVL(DBG_ARGS, LEVEL)

◆ stop_i()

void OpenDDS::DCPS::ShmemSendStrategy::stop_i ( )
virtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 180 of file ShmemSendStrategy.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and peer_semaphore_.

181 {
182 #ifdef OPENDDS_SHMEM_WINDOWS
183  ::CloseHandle(peer_semaphore_);
184 #endif
185 }

Member Data Documentation

◆ bound_name_

std::string OpenDDS::DCPS::ShmemSendStrategy::bound_name_
private

Definition at line 42 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

◆ current_data_

ShmemData* OpenDDS::DCPS::ShmemSendStrategy::current_data_
private

Definition at line 44 of file ShmemSendStrategy.h.

Referenced by send_bytes_i().

◆ datalink_control_size_

const size_t OpenDDS::DCPS::ShmemSendStrategy::datalink_control_size_
private

Definition at line 45 of file ShmemSendStrategy.h.

Referenced by start_i().

◆ link_

ShmemDataLink* OpenDDS::DCPS::ShmemSendStrategy::link_
private

Definition at line 41 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), and start_i().

◆ peer_semaphore_

ACE_sema_t OpenDDS::DCPS::ShmemSendStrategy::peer_semaphore_
private

Definition at line 43 of file ShmemSendStrategy.h.

Referenced by send_bytes_i(), ShmemSendStrategy(), start_i(), and stop_i().


The documentation for this class was generated from the following files: