ShmemTransport.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemTransport.h"
00009 #include "ShmemInst.h"
00010 #include "ShmemSendStrategy.h"
00011 #include "ShmemReceiveStrategy.h"
00012
00013 #include "dds/DCPS/AssociationData.h"
00014 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00015 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00016
00017 #include "ace/Log_Msg.h"
00018
00019 #include <sstream>
00020 #include <cstring>
00021
00022 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00023
00024 namespace OpenDDS {
00025 namespace DCPS {
00026
00027 ShmemTransport::ShmemTransport(ShmemInst& inst)
00028 : TransportImpl(inst)
00029 {
00030 if (! (configure_i(inst) && open()) ) {
00031 throw Transport::UnableToCreate();
00032 }
00033 }
00034
00035 ShmemInst&
00036 ShmemTransport::config() const
00037 {
00038 return static_cast<ShmemInst&>(TransportImpl::config());
00039 }
00040
00041 ShmemDataLink_rch
00042 ShmemTransport::make_datalink(const std::string& remote_address)
00043 {
00044
00045 ShmemDataLink_rch link = make_rch<ShmemDataLink>(ref(*this));
00046
00047
00048 if (link->open(remote_address))
00049 return link;
00050
00051 ACE_ERROR((LM_ERROR,
00052 ACE_TEXT("(%P|%t) ERROR: ")
00053 ACE_TEXT("ShmemTransport::make_datalink: ")
00054 ACE_TEXT("failed to open DataLink!\n")));
00055 return ShmemDataLink_rch();
00056 }
00057
00058 TransportImpl::AcceptConnectResult
00059 ShmemTransport::connect_datalink(const RemoteTransport& remote,
00060 const ConnectionAttribs&,
00061 const TransportClient_rch&)
00062 {
00063 const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
00064 if (key.first != this->config().hostname()) {
00065 return AcceptConnectResult();
00066 }
00067 GuardType guard(links_lock_);
00068 ShmemDataLinkMap::iterator iter = links_.find(key.second);
00069 if (iter != links_.end()) {
00070 ShmemDataLink_rch link = iter->second;
00071 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00072 ACE_TEXT("link found.\n")), 2);
00073 return AcceptConnectResult(link);
00074 }
00075 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00076 ACE_TEXT("new link.\n")), 2);
00077 return AcceptConnectResult(add_datalink(key.second));
00078 }
00079
00080 DataLink_rch
00081 ShmemTransport::add_datalink(const std::string& remote_address)
00082 {
00083 ShmemDataLink_rch link = make_datalink(remote_address);
00084 links_.insert(ShmemDataLinkMap::value_type(remote_address, link));
00085 return link;
00086 }
00087
00088 TransportImpl::AcceptConnectResult
00089 ShmemTransport::accept_datalink(const RemoteTransport& remote,
00090 const ConnectionAttribs& attribs,
00091 const TransportClient_rch& client)
00092 {
00093 return connect_datalink(remote, attribs, client);
00094 }
00095
00096 void
00097 ShmemTransport::stop_accepting_or_connecting(const TransportClient_wrch&, const RepoId&)
00098 {
00099
00100 }
00101
00102 bool
00103 ShmemTransport::configure_i(ShmemInst& config)
00104 {
00105 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE
00106 ACE_UNUSED_ARG(config);
00107 ACE_ERROR_RETURN((LM_ERROR,
00108 ACE_TEXT("(%P|%t) ERROR: ")
00109 ACE_TEXT("ShmemTransport::configure_i: ")
00110 ACE_TEXT("no platform support for shared memory!\n")),
00111 false);
00112 #else
00113
00114 ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts;
00115 # ifdef ACE_WIN32
00116 alloc_opts.max_size_ = config.pool_size_;
00117 # elif !defined ACE_LACKS_SYSV_SHMEM
00118 alloc_opts.base_addr_ = 0;
00119 alloc_opts.segment_size_ = config.pool_size_;
00120 alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
00121 alloc_opts.max_segments_ = 1;
00122 # endif
00123
00124 alloc_.reset(
00125 new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(config.poolname().c_str()),
00126 0 , &alloc_opts));
00127
00128 void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
00129 if (mem == 0) {
00130 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00131 ACE_TEXT("ShmemTrasport::configure_i: failed to allocate")
00132 ACE_TEXT(" space for semaphore in shared memory!\n")),
00133 false);
00134 }
00135
00136 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00137 alloc_->bind("Semaphore", pSem);
00138
00139 bool ok;
00140 # ifdef ACE_WIN32
00141 *pSem = ::CreateSemaphoreW(0 ,
00142 0 ,
00143 0x7fffffff ,
00144 0 );
00145 ACE_sema_t ace_sema = *pSem;
00146 ok = (*pSem != 0);
00147 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE
00148 ok = (0 == ::sem_init(pSem, 1 , 0 ));
00149 ACE_sema_t ace_sema;
00150 std::memset(&ace_sema, 0, sizeof ace_sema);
00151 ace_sema.sema_ = pSem;
00152 # if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION)
00153 ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
00154 ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
00155 # endif
00156 # else
00157 ok = false;
00158 ACE_sema_t ace_sema;
00159 # endif
00160 if (!ok) {
00161 ACE_ERROR_RETURN((LM_ERROR,
00162 ACE_TEXT("(%P|%t) ERROR: ")
00163 ACE_TEXT("ShmemTransport::configure_i: ")
00164 ACE_TEXT("could not create semaphore\n")),
00165 false);
00166 }
00167
00168 read_task_.reset(new ReadTask(this, ace_sema));
00169
00170 VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n",
00171 this, config.poolname().c_str()), 1);
00172
00173 return true;
00174 #endif
00175 }
00176
00177 void
00178 ShmemTransport::shutdown_i()
00179 {
00180
00181 GuardType guard(links_lock_);
00182 if (read_task_) read_task_->stop();
00183
00184 for (ShmemDataLinkMap::iterator it(links_.begin());
00185 it != links_.end(); ++it) {
00186 it->second->transport_shutdown();
00187 }
00188 links_.clear();
00189
00190 read_task_.reset();
00191
00192 if (alloc_) {
00193 void* mem = 0;
00194 alloc_->find("Semaphore", mem);
00195 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00196 #ifdef ACE_WIN32
00197 ::CloseHandle(*pSem);
00198 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE
00199 ::sem_destroy(pSem);
00200 #else
00201 ACE_UNUSED_ARG(pSem);
00202 #endif
00203
00204 alloc_->release(1 );
00205 alloc_.reset();
00206 }
00207 }
00208
00209 bool
00210 ShmemTransport::connection_info_i(TransportLocator& info) const
00211 {
00212 this->config().populate_locator(info);
00213 return true;
00214 }
00215
00216 std::pair<std::string, std::string>
00217 ShmemTransport::blob_to_key(const TransportBLOB& blob)
00218 {
00219 const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
00220 const std::string host(c_str);
00221 const size_t host_len = host.size();
00222
00223 const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
00224 return make_pair(host, pool);
00225 }
00226
00227 void
00228 ShmemTransport::release_datalink(DataLink* link)
00229 {
00230 GuardType guard(links_lock_);
00231 for (ShmemDataLinkMap::iterator it(links_.begin());
00232 it != links_.end(); ++it) {
00233
00234
00235 if (link == static_cast<DataLink*>(it->second.in())) {
00236 link->stop();
00237 links_.erase(it);
00238 return;
00239 }
00240 }
00241 }
00242
00243 ShmemTransport::ReadTask::ReadTask(ShmemTransport* outer, ACE_sema_t semaphore)
00244 : outer_(outer)
00245 , semaphore_(semaphore)
00246 , stopped_(false)
00247 {
00248 activate();
00249 }
00250
00251 int
00252 ShmemTransport::ReadTask::svc()
00253 {
00254 while (true) {
00255 ACE_OS::sema_wait(&semaphore_);
00256 if (stopped_) {
00257 return 0;
00258 }
00259 outer_->read_from_links();
00260 }
00261 return 1;
00262 }
00263
00264 void
00265 ShmemTransport::ReadTask::stop()
00266 {
00267 stopped_ = true;
00268 ACE_OS::sema_post(&semaphore_);
00269 wait();
00270 }
00271
00272 void
00273 ShmemTransport::read_from_links()
00274 {
00275 std::vector<ShmemDataLink_rch> dl_copies;
00276 {
00277 GuardType guard(links_lock_);
00278 typedef ShmemDataLinkMap::iterator iter_t;
00279 for (iter_t it = links_.begin(); it != links_.end(); ++it) {
00280 dl_copies.push_back(it->second);
00281 }
00282 }
00283
00284 typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
00285 for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) {
00286 dl_it->in()->read();
00287 }
00288 }
00289
00290 void
00291 ShmemTransport::signal_semaphore()
00292 {
00293 ACE_OS::sema_post(&read_task_->semaphore_);
00294 }
00295
00296 std::string
00297 ShmemTransport::address()
00298 {
00299 return this->config().poolname();
00300 }
00301
00302 }
00303 }
00304
00305 OPENDDS_END_VERSIONED_NAMESPACE_DECL