ShmemTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ShmemTransport.h"
00009 #include "ShmemInst.h"
00010 #include "ShmemSendStrategy.h"
00011 #include "ShmemReceiveStrategy.h"
00012 
00013 #include "dds/DCPS/AssociationData.h"
00014 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00015 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00016 
00017 #include "ace/Log_Msg.h"
00018 
00019 #include <sstream>
00020 #include <cstring>
00021 
00022 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00023 
00024 namespace OpenDDS {
00025 namespace DCPS {
00026 
00027 ShmemTransport::ShmemTransport(ShmemInst& inst)
00028   : TransportImpl(inst)
00029 {
00030   if (! (configure_i(inst) && open()) ) {
00031     throw Transport::UnableToCreate();
00032   }
00033 }
00034 
00035 ShmemInst&
00036 ShmemTransport::config() const
00037 {
00038   return static_cast<ShmemInst&>(TransportImpl::config());
00039 }
00040 
00041 ShmemDataLink_rch
00042 ShmemTransport::make_datalink(const std::string& remote_address)
00043 {
00044 
00045   ShmemDataLink_rch link = make_rch<ShmemDataLink>(ref(*this));
00046 
00047   // Open logical connection:
00048   if (link->open(remote_address))
00049     return link;
00050 
00051   ACE_ERROR((LM_ERROR,
00052                     ACE_TEXT("(%P|%t) ERROR: ")
00053                     ACE_TEXT("ShmemTransport::make_datalink: ")
00054                     ACE_TEXT("failed to open DataLink!\n")));
00055   return ShmemDataLink_rch();
00056 }
00057 
00058 TransportImpl::AcceptConnectResult
00059 ShmemTransport::connect_datalink(const RemoteTransport& remote,
00060                                  const ConnectionAttribs&,
00061                                  const TransportClient_rch&)
00062 {
00063   const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
00064   if (key.first != this->config().hostname()) {
00065     return AcceptConnectResult();
00066   }
00067   GuardType guard(links_lock_);
00068   ShmemDataLinkMap::iterator iter = links_.find(key.second);
00069   if (iter != links_.end()) {
00070     ShmemDataLink_rch link = iter->second;
00071     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00072               ACE_TEXT("link found.\n")), 2);
00073     return AcceptConnectResult(link);
00074   }
00075     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00076               ACE_TEXT("new link.\n")), 2);
00077   return AcceptConnectResult(add_datalink(key.second));
00078 }
00079 
00080 DataLink_rch
00081 ShmemTransport::add_datalink(const std::string& remote_address)
00082 {
00083   ShmemDataLink_rch link = make_datalink(remote_address);
00084   links_.insert(ShmemDataLinkMap::value_type(remote_address, link));
00085   return link;
00086 }
00087 
00088 TransportImpl::AcceptConnectResult
00089 ShmemTransport::accept_datalink(const RemoteTransport& remote,
00090                                 const ConnectionAttribs& attribs,
00091                                 const TransportClient_rch& client)
00092 {
00093   return connect_datalink(remote, attribs, client);
00094 }
00095 
00096 void
00097 ShmemTransport::stop_accepting_or_connecting(const TransportClient_wrch&, const RepoId&)
00098 {
00099   // no-op: accept and connect either complete or fail immediately
00100 }
00101 
00102 bool
00103 ShmemTransport::configure_i(ShmemInst& config)
00104 {
00105 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE
00106   ACE_UNUSED_ARG(config);
00107   ACE_ERROR_RETURN((LM_ERROR,
00108                     ACE_TEXT("(%P|%t) ERROR: ")
00109                     ACE_TEXT("ShmemTransport::configure_i: ")
00110                     ACE_TEXT("no platform support for shared memory!\n")),
00111                    false);
00112 #else
00113 
00114   ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts;
00115 # ifdef ACE_WIN32
00116   alloc_opts.max_size_ = config.pool_size_;
00117 # elif !defined ACE_LACKS_SYSV_SHMEM
00118   alloc_opts.base_addr_ = 0;
00119   alloc_opts.segment_size_ = config.pool_size_;
00120   alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
00121   alloc_opts.max_segments_ = 1;
00122 # endif
00123 
00124   alloc_.reset(
00125     new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(config.poolname().c_str()),
00126                        0 /*lock_name is optional*/, &alloc_opts));
00127 
00128   void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
00129   if (mem == 0) {
00130     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00131                       ACE_TEXT("ShmemTrasport::configure_i: failed to allocate")
00132                       ACE_TEXT(" space for semaphore in shared memory!\n")),
00133                      false);
00134   }
00135 
00136   ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00137   alloc_->bind("Semaphore", pSem);
00138 
00139   bool ok;
00140 # ifdef ACE_WIN32
00141   *pSem = ::CreateSemaphoreW(0 /*default security*/,
00142                              0 /*initial count*/,
00143                              0x7fffffff /*max count (ACE's default)*/,
00144                              0 /*no name*/);
00145   ACE_sema_t ace_sema = *pSem;
00146   ok = (*pSem != 0);
00147 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE
00148   ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/));
00149   ACE_sema_t ace_sema;
00150   std::memset(&ace_sema, 0, sizeof ace_sema);
00151   ace_sema.sema_ = pSem;
00152 #  if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION)
00153   ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
00154   ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
00155 #  endif
00156 # else
00157   ok = false;
00158   ACE_sema_t ace_sema;
00159 # endif
00160   if (!ok) {
00161     ACE_ERROR_RETURN((LM_ERROR,
00162                       ACE_TEXT("(%P|%t) ERROR: ")
00163                       ACE_TEXT("ShmemTransport::configure_i: ")
00164                       ACE_TEXT("could not create semaphore\n")),
00165                      false);
00166   }
00167 
00168   read_task_.reset(new ReadTask(this, ace_sema));
00169 
00170   VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n",
00171             this, config.poolname().c_str()), 1);
00172 
00173   return true;
00174 #endif
00175 }
00176 
00177 void
00178 ShmemTransport::shutdown_i()
00179 {
00180   // Shutdown reserved datalinks and release configuration:
00181   GuardType guard(links_lock_);
00182   if (read_task_) read_task_->stop();
00183 
00184   for (ShmemDataLinkMap::iterator it(links_.begin());
00185        it != links_.end(); ++it) {
00186     it->second->transport_shutdown();
00187   }
00188   links_.clear();
00189 
00190   read_task_.reset();
00191 
00192   if (alloc_) {
00193     void* mem = 0;
00194     alloc_->find("Semaphore", mem);
00195     ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00196 #ifdef ACE_WIN32
00197     ::CloseHandle(*pSem);
00198 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE
00199     ::sem_destroy(pSem);
00200 #else
00201     ACE_UNUSED_ARG(pSem);
00202 #endif
00203 
00204     alloc_->release(1 /*close*/);
00205     alloc_.reset();
00206   }
00207 }
00208 
00209 bool
00210 ShmemTransport::connection_info_i(TransportLocator& info) const
00211 {
00212   this->config().populate_locator(info);
00213   return true;
00214 }
00215 
00216 std::pair<std::string, std::string>
00217 ShmemTransport::blob_to_key(const TransportBLOB& blob)
00218 {
00219   const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
00220   const std::string host(c_str);
00221   const size_t host_len = host.size();
00222 
00223   const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
00224   return make_pair(host, pool);
00225 }
00226 
00227 void
00228 ShmemTransport::release_datalink(DataLink* link)
00229 {
00230   GuardType guard(links_lock_);
00231   for (ShmemDataLinkMap::iterator it(links_.begin());
00232        it != links_.end(); ++it) {
00233     // We are guaranteed to have exactly one matching DataLink
00234     // in the map; release any resources held and return.
00235     if (link == static_cast<DataLink*>(it->second.in())) {
00236       link->stop();
00237       links_.erase(it);
00238       return;
00239     }
00240   }
00241 }
00242 
00243 ShmemTransport::ReadTask::ReadTask(ShmemTransport* outer, ACE_sema_t semaphore)
00244   : outer_(outer)
00245   , semaphore_(semaphore)
00246   , stopped_(false)
00247 {
00248   activate();
00249 }
00250 
00251 int
00252 ShmemTransport::ReadTask::svc()
00253 {
00254   while (true) {
00255     ACE_OS::sema_wait(&semaphore_);
00256     if (stopped_) {
00257       return 0;
00258     }
00259     outer_->read_from_links();
00260   }
00261   return 1;
00262 }
00263 
00264 void
00265 ShmemTransport::ReadTask::stop()
00266 {
00267   stopped_ = true;
00268   ACE_OS::sema_post(&semaphore_);
00269   wait();
00270 }
00271 
00272 void
00273 ShmemTransport::read_from_links()
00274 {
00275   std::vector<ShmemDataLink_rch> dl_copies;
00276   {
00277     GuardType guard(links_lock_);
00278     typedef ShmemDataLinkMap::iterator iter_t;
00279     for (iter_t it = links_.begin(); it != links_.end(); ++it) {
00280       dl_copies.push_back(it->second);
00281     }
00282   }
00283 
00284   typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
00285   for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) {
00286     dl_it->in()->read();
00287   }
00288 }
00289 
00290 void
00291 ShmemTransport::signal_semaphore()
00292 {
00293   ACE_OS::sema_post(&read_task_->semaphore_);
00294 }
00295 
00296 std::string
00297 ShmemTransport::address()
00298 {
00299   return this->config().poolname();
00300 }
00301 
00302 } // namespace DCPS
00303 } // namespace OpenDDS
00304 
00305 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1