OpenDDS  Snapshot(2023/04/28-20:55)
ShmemTransport.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "ShmemTransport.h"
7 
8 #include "ShmemInst.h"
9 #include "ShmemSendStrategy.h"
10 #include "ShmemReceiveStrategy.h"
11 
12 #include <dds/DCPS/debug.h>
17 
18 #include <ace/Log_Msg.h>
19 
20 #include <sstream>
21 #include <cstring>
22 
24 
25 namespace OpenDDS {
26 namespace DCPS {
27 
29  : TransportImpl(inst)
30 {
31  if (!(configure_i(inst) && open())) {
33  }
34 }
35 
38 {
40 }
41 
43 ShmemTransport::make_datalink(const std::string& remote_address)
44 {
45  ShmemDataLink_rch link = make_rch<ShmemDataLink>(rchandle_from(this));
46 
47  if (!link->open(remote_address)) {
48  if (log_level >= LogLevel::Error) {
49  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ShmemTransport::make_datalink: "
50  "failed to open DataLink!\n"));
51  }
52  return ShmemDataLink_rch();
53  }
54 
55  if (!links_.insert(ShmemDataLinkMap::value_type(remote_address, link)).second) {
56  if (log_level >= LogLevel::Error) {
57  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ShmemTransport::make_datalink: "
58  "there is an existing link for %C!\n", remote_address.c_str()));
59  }
60  return ShmemDataLink_rch();
61  }
62 
63  return link;
64 }
65 
67  const char* caller, const RemoteTransport& remote)
68 {
69  const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
70  ShmemInst_rch cfg = config();
71  if (!cfg || key.first != cfg->hostname()) {
72  if (log_level >= LogLevel::Error) {
73  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ShmemTransport::get_or_make_datalink: "
74  "%C link %C:%C not found, hostname %C.\n",
75  caller, key.first.c_str(), key.second.c_str(),
76  cfg ? cfg->hostname().c_str() : "(no config)"));
77  }
78  return ShmemDataLink_rch();
79  }
80 
81  GuardType guard(links_lock_);
82  ShmemDataLinkMap::iterator iter = links_.find(key.second);
83  const bool make = iter == links_.end();
84  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemTransport::get_or_make_datalink: %C using %C link %C:%C\n",
85  caller, make ? "new" : "existing", key.first.c_str(), key.second.c_str()), 2);
86  return make ? make_datalink(key.second) : iter->second;
87 }
88 
91  const ConnectionAttribs&,
92  const TransportClient_rch& client)
93 {
94  ShmemDataLink_rch link = get_or_make_datalink("connect_datalink", remote);
95  if (!link) {
96  return AcceptConnectResult();
97  }
98  // Wait for invoke_on_start_callbacks to actually start a writer. This is done
99  // when the writer gets an association message from the reader in the writer
100  // case of ShmemDataLink::request_ack_received.
101  link->add_on_start_callback(client, remote.repo_id_);
102  add_pending_connection(client, link);
104 }
105 
108  const ConnectionAttribs& /*attribs*/,
109  const TransportClient_rch& /*client*/)
110 {
111  return AcceptConnectResult(get_or_make_datalink("accept_datalink", remote));
112 }
113 
115  const TransportClient_wrch& client, const GUID_t& remote_id,
116  bool /*disassociate*/, bool /*association_failed*/)
117 {
118  ACE_GUARD(ACE_Thread_Mutex, links_guard, links_lock_);
120  typedef PendConnMap::iterator Iter;
121  const std::pair<Iter, Iter> range = pending_connections_.equal_range(client);
122  for (Iter iter = range.first; iter != range.second; ++iter) {
124  TransportClient_rch tc = client.lock();
125  if (tc) {
126  sdl->stop_resend_association_msgs(tc->get_guid(), remote_id);
127  }
128  }
129  pending_connections_.erase(range.first, range.second);
130 }
131 
132 bool
134 {
135  if (!config) {
136  return false;
137  }
138 
139  create_reactor_task(false, "ShmemTransport" + config->name());
140 
141 #ifdef OPENDDS_SHMEM_UNSUPPORTED
142  ACE_UNUSED_ARG(config);
143  if (log_level >= LogLevel::Error) {
144  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ShmemTransport::configure_i: "
145  "no platform support for shared memory!\n"));
146  }
147  return false;
148 #else /* OPENDDS_SHMEM_UNSUPPORTED */
149 
151 # if defined OPENDDS_SHMEM_WINDOWS
152  alloc_opts.max_size_ = config->pool_size_;
153 # elif defined OPENDDS_SHMEM_UNIX
154  alloc_opts.base_addr_ = 0;
155  alloc_opts.segment_size_ = config->pool_size_;
156  alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
157  alloc_opts.max_segments_ = 1;
158 # endif /* OPENDDS_SHMEM_WINDOWS */
159 
160  alloc_.reset(
161  new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(config->poolname().c_str()),
162  0 /*lock_name is optional*/, &alloc_opts));
163 
164  void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
165  if (mem == 0) {
166  if (log_level >= LogLevel::Error) {
167  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ShmemTrasport::configure_i: failed to allocate"
168  " space for semaphore in shared memory!\n"));
169  }
170  return false;
171  }
172 
173  ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
174  alloc_->bind("Semaphore", pSem);
175 
176  bool ok;
177 # if defined OPENDDS_SHMEM_WINDOWS
178  *pSem = ::CreateSemaphoreW(0 /*default security*/,
179  0 /*initial count*/,
180  0x7fffffff /*max count (ACE's default)*/,
181  0 /*no name*/);
182  ACE_sema_t ace_sema = *pSem;
183  ok = (*pSem != 0);
184 # elif defined OPENDDS_SHMEM_UNIX
185  ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/));
186  ACE_sema_t ace_sema;
187  std::memset(&ace_sema, 0, sizeof ace_sema);
188  ace_sema.sema_ = pSem;
189 # ifdef OPENDDS_SHMEM_UNIX_EMULATE_SEM_TIMEOUT
190  ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
191  ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
192 # endif
193 # endif /* OPENDDS_SHMEM_WINDOWS */
194  if (!ok) {
195  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
196  ACE_TEXT("ShmemTransport::configure_i: ")
197  ACE_TEXT("could not create semaphore\n")),
198  false);
199  }
200 
201  read_task_.reset(new ReadTask(this, ace_sema));
202 
203  VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemTransport %@ configured with address %C\n",
204  this, config->poolname().c_str()), 1);
205 
206  return true;
207 #endif /* OPENDDS_SHMEM_UNSUPPORTED */
208 }
209 
210 void
212 {
213  if (read_task_) {
214  read_task_->stop();
215  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());
216  read_task_->wait();
217  }
218 
219  // Shutdown reserved datalinks and release configuration:
220  GuardType guard(links_lock_);
221  for (ShmemDataLinkMap::iterator it(links_.begin());
222  it != links_.end(); ++it) {
223  it->second->transport_shutdown();
224  }
225  links_.clear();
226 
227  read_task_.reset();
228 
229  if (alloc_) {
230 #ifndef OPENDDS_SHMEM_UNSUPPORTED
231  void* mem = 0;
232  alloc_->find("Semaphore", mem);
233  ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
234 # if defined OPENDDS_SHMEM_WINDOWS
235  ::CloseHandle(*pSem);
236 # elif defined OPENDDS_SHMEM_UNIX
237  ::sem_destroy(pSem);
238 # endif /* OPENDDS_SHMEM_WINDOWS */
239 #endif /* OPENDDS_SHMEM_UNSUPPORTED */
240 
241  alloc_->release(1 /*close*/);
242  alloc_.reset();
243  }
244 }
245 
246 bool
248 {
249  ShmemInst_rch cfg = config();
250  if (cfg) {
251  cfg->populate_locator(info, flags);
252  return true;
253  }
254  return false;
255 }
256 
257 std::pair<std::string, std::string>
259 {
260  const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
261  const std::string host(c_str);
262  const size_t host_len = host.size();
263 
264  const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
265  return make_pair(host, pool);
266 }
267 
268 void
270 {
271  GuardType guard(links_lock_);
272  for (ShmemDataLinkMap::iterator it(links_.begin());
273  it != links_.end(); ++it) {
274  // We are guaranteed to have exactly one matching DataLink
275  // in the map; release any resources held and return.
276  if (link == static_cast<DataLink*>(it->second.in())) {
277  link->stop();
278  links_.erase(it);
279  return;
280  }
281  }
282 }
283 
285  : outer_(outer)
286  , semaphore_(semaphore)
287  , stopped_(false)
288 {
289  activate();
290 }
291 
292 int
294 {
295  ThreadStatusManager::Start s(TheServiceParticipant->get_thread_status_manager(), "ShmemTransport");
296 
297  while (!stopped_) {
299  if (stopped_) {
300  return 0;
301  }
303  }
304  return 0;
305 }
306 
307 void
309 {
310  if (stopped_) {
311  return;
312  }
313  stopped_ = true;
315  ThreadStatusManager::Sleeper s(TheServiceParticipant->get_thread_status_manager());;
316  wait();
317 }
318 
319 void
321 {
322  if (stopped_) {
323  return;
324  }
326 }
327 
328 void
330 {
331  std::vector<ShmemDataLink_rch> dl_copies;
332  {
333  GuardType guard(links_lock_);
334  typedef ShmemDataLinkMap::iterator iter_t;
335  for (iter_t it = links_.begin(); it != links_.end(); ++it) {
336  dl_copies.push_back(it->second);
337  }
338  }
339 
340  typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
341  for (dl_iter_t dl_it = dl_copies.begin(); !is_shut_down() && dl_it != dl_copies.end(); ++dl_it) {
342  dl_it->in()->read();
343  }
344 }
345 
346 void
348 {
349  if (is_shut_down()) {
350  return;
351  }
352  read_task_->signal_semaphore();
353 }
354 
355 std::string
357 {
358  ShmemInst_rch cfg = config();
359  return cfg ? cfg->poolname() : std::string();
360 }
361 
362 } // namespace DCPS
363 } // namespace OpenDDS
364 
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
ACE_MEM_POOL_OPTIONS MEMORY_POOL_OPTIONS
ShmemDataLink_rch get_or_make_datalink(const char *caller, const RemoteTransport &remote)
ShmemTransport(const ShmemInst_rch &inst)
RcHandle< ShmemDataLink > ShmemDataLink_rch
unique_ptr< ShmemAllocator > alloc_
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
sequence< octet > key
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
int ShmemSharedSemaphore
std::pair< std::string, std::string > blob_to_key(const TransportBLOB &blob)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
LM_DEBUG
ShmemDataLink_rch make_datalink(const std::string &remote_address)
Create the DataLink object and start it.
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
virtual void release_datalink(DataLink *link)
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
virtual int wait(void)
ACE_TEXT("TCP_Factory")
ReadTask(ShmemTransport *outer, ACE_sema_t semaphore)
bool configure_i(const ShmemInst_rch &config)
OpenDDS_Dcps_Export LogLevel log_level
void stop()
The stop method is used to stop the DataLink prior to shutdown.
Definition: DataLink.cpp:355
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int sema_post(ACE_sema_t *s)
int sema_wait(ACE_sema_t *s)
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > lock() const
Definition: RcObject.h:188
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define TheServiceParticipant
DDS::OctetSeq TransportBLOB
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ShmemInst_rch config() const
unique_ptr< ReadTask > read_task_
size_t ConnectionInfoFlags
TransportInst_rch config() const