47 if (!link->open(remote_address)) {
50 "failed to open DataLink!\n"));
55 if (!
links_.insert(ShmemDataLinkMap::value_type(remote_address, link)).second) {
58 "there is an existing link for %C!\n", remote_address.c_str()));
71 if (!cfg || key.first != cfg->hostname()) {
74 "%C link %C:%C not found, hostname %C.\n",
75 caller, key.first.c_str(), key.second.c_str(),
76 cfg ? cfg->hostname().c_str() :
"(no config)"));
82 ShmemDataLinkMap::iterator iter =
links_.find(key.second);
83 const bool make = iter ==
links_.end();
84 VDBG_LVL((
LM_DEBUG,
"(%P|%t) ShmemTransport::get_or_make_datalink: %C using %C link %C:%C\n",
85 caller, make ?
"new" :
"existing", key.first.c_str(), key.second.c_str()), 2);
101 link->add_on_start_callback(client, remote.
repo_id_);
120 typedef PendConnMap::iterator Iter;
122 for (Iter iter = range.first; iter != range.second; ++iter) {
126 sdl->stop_resend_association_msgs(tc->get_guid(), remote_id);
141 #ifdef OPENDDS_SHMEM_UNSUPPORTED 142 ACE_UNUSED_ARG(config);
145 "no platform support for shared memory!\n"));
151 # if defined OPENDDS_SHMEM_WINDOWS 152 alloc_opts.max_size_ = config->pool_size_;
153 # elif defined OPENDDS_SHMEM_UNIX 154 alloc_opts.base_addr_ = 0;
155 alloc_opts.segment_size_ = config->pool_size_;
156 alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
157 alloc_opts.max_segments_ = 1;
167 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: ShmemTrasport::configure_i: failed to allocate" 168 " space for semaphore in shared memory!\n"));
174 alloc_->bind(
"Semaphore", pSem);
177 # if defined OPENDDS_SHMEM_WINDOWS 178 *pSem = ::CreateSemaphoreW(0 ,
182 ACE_sema_t ace_sema = *pSem;
184 # elif defined OPENDDS_SHMEM_UNIX 185 ok = (0 == ::sem_init(pSem, 1 , 0 ));
187 std::memset(&ace_sema, 0,
sizeof ace_sema);
188 ace_sema.sema_ = pSem;
189 # ifdef OPENDDS_SHMEM_UNIX_EMULATE_SEM_TIMEOUT 190 ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
191 ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
196 ACE_TEXT(
"ShmemTransport::configure_i: ")
197 ACE_TEXT(
"could not create semaphore\n")),
203 VDBG_LVL((
LM_DEBUG,
"(%P|%t) ShmemTransport %@ configured with address %C\n",
204 this, config->poolname().c_str()), 1);
221 for (ShmemDataLinkMap::iterator it(
links_.begin());
222 it !=
links_.end(); ++it) {
223 it->second->transport_shutdown();
230 #ifndef OPENDDS_SHMEM_UNSUPPORTED 232 alloc_->find(
"Semaphore", mem);
234 # if defined OPENDDS_SHMEM_WINDOWS 235 ::CloseHandle(*pSem);
236 # elif defined OPENDDS_SHMEM_UNIX 251 cfg->populate_locator(info, flags);
257 std::pair<std::string, std::string>
260 const char*
const c_str =
reinterpret_cast<const char*
>(blob.get_buffer());
261 const std::string host(c_str);
262 const size_t host_len = host.size();
264 const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
265 return make_pair(host, pool);
272 for (ShmemDataLinkMap::iterator it(
links_.begin());
273 it !=
links_.end(); ++it) {
276 if (link == static_cast<DataLink*>(it->second.in())) {
286 , semaphore_(semaphore)
331 std::vector<ShmemDataLink_rch> dl_copies;
334 typedef ShmemDataLinkMap::iterator iter_t;
335 for (iter_t it =
links_.begin(); it !=
links_.end(); ++it) {
336 dl_copies.push_back(it->second);
340 typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
341 for (dl_iter_t dl_it = dl_copies.begin(); !
is_shut_down() && dl_it != dl_copies.end(); ++dl_it) {
359 return cfg ? cfg->poolname() : std::string();
RcHandle< T > rchandle_from(T *pointer)
PendConnMap pending_connections_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
ACE_MEM_POOL_OPTIONS MEMORY_POOL_OPTIONS
ShmemDataLink_rch get_or_make_datalink(const char *caller, const RemoteTransport &remote)
ShmemTransport(const ShmemInst_rch &inst)
RcHandle< ShmemDataLink > ShmemDataLink_rch
unique_ptr< ShmemAllocator > alloc_
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
virtual void shutdown_i()
ACE_Malloc_T< ShmemPool, ACE_Process_Mutex, ACE_PI_Control_Block > ShmemAllocator
std::pair< std::string, std::string > blob_to_key(const TransportBLOB &blob)
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
ShmemDataLink_rch make_datalink(const std::string &remote_address)
Create the DataLink object and start it.
bool is_shut_down() const
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
virtual void release_datalink(DataLink *link)
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
ReadTask(ShmemTransport *outer, ACE_sema_t semaphore)
bool configure_i(const ShmemInst_rch &config)
OpenDDS_Dcps_Export LogLevel log_level
void stop()
The stop method is used to stop the DataLink prior to shutdown.
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int sema_post(ACE_sema_t *s)
int sema_wait(ACE_sema_t *s)
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > lock() const
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
#define TheServiceParticipant
DDS::OctetSeq TransportBLOB
The Internal API and Implementation of OpenDDS.
ShmemInst_rch config() const
unique_ptr< ReadTask > read_task_
size_t ConnectionInfoFlags
TransportInst_rch config() const