ShmemTransport.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "ShmemTransport.h"
00009 #include "ShmemInst.h"
00010 #include "ShmemSendStrategy.h"
00011 #include "ShmemReceiveStrategy.h"
00012 
00013 #include "dds/DCPS/AssociationData.h"
00014 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00015 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00016 
00017 #include "ace/Log_Msg.h"
00018 
00019 #include <sstream>
00020 #include <cstring>
00021 
00022 namespace OpenDDS {
00023 namespace DCPS {
00024 
00025 ShmemTransport::ShmemTransport(const TransportInst_rch& inst)
00026   : alloc_(0)
00027   , read_task_(0)
00028 {
00029   if (!inst.is_nil()) {
00030     if (!configure(inst.in())) {
00031       throw Transport::UnableToCreate();
00032     }
00033   }
00034 }
00035 
00036 ShmemDataLink*
00037 ShmemTransport::make_datalink(const std::string& remote_address)
00038 {
00039   ShmemDataLink_rch link;
00040   ACE_NEW_RETURN(link, ShmemDataLink(this), 0);
00041 
00042   if (link.is_nil()) {
00043     ACE_ERROR_RETURN((LM_ERROR,
00044                       ACE_TEXT("(%P|%t) ERROR: ")
00045                       ACE_TEXT("ShmemTransport::make_datalink: ")
00046                       ACE_TEXT("failed to create DataLink!\n")),
00047                      0);
00048   }
00049 
00050   link->configure(config_i_.in());
00051 
00052   // Assign send strategy:
00053   ShmemSendStrategy* send_strategy;
00054   ACE_NEW_RETURN(send_strategy, ShmemSendStrategy(link.in()), 0);
00055   link->send_strategy(send_strategy);
00056 
00057   // Assign receive strategy:
00058   ShmemReceiveStrategy* recv_strategy;
00059   ACE_NEW_RETURN(recv_strategy, ShmemReceiveStrategy(link.in()), 0);
00060   link->receive_strategy(recv_strategy);
00061 
00062   // Open logical connection:
00063   if (!link->open(remote_address)) {
00064     ACE_ERROR_RETURN((LM_ERROR,
00065                       ACE_TEXT("(%P|%t) ERROR: ")
00066                       ACE_TEXT("ShmemTransport::make_datalink: ")
00067                       ACE_TEXT("failed to open DataLink!\n")),
00068                      0);
00069   }
00070 
00071   return link._retn();
00072 }
00073 
00074 TransportImpl::AcceptConnectResult
00075 ShmemTransport::connect_datalink(const RemoteTransport& remote,
00076                                  const ConnectionAttribs&,
00077                                  TransportClient*)
00078 {
00079   const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
00080   if (key.first != this->config_i_->hostname()) {
00081     return AcceptConnectResult();
00082   }
00083   GuardType guard(links_lock_);
00084   ShmemDataLinkMap::iterator iter = links_.find(key.second);
00085   if (iter != links_.end()) {
00086     ShmemDataLink_rch link = iter->second;
00087     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00088               ACE_TEXT("link found.\n")), 2);
00089     return AcceptConnectResult(link._retn());
00090   }
00091     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00092               ACE_TEXT("new link.\n")), 2);
00093   return AcceptConnectResult(add_datalink(key.second));
00094 }
00095 
00096 DataLink*
00097 ShmemTransport::add_datalink(const std::string& remote_address)
00098 {
00099   ShmemDataLink_rch link = make_datalink(remote_address);
00100   links_.insert(ShmemDataLinkMap::value_type(remote_address, link));
00101   return link._retn();
00102 }
00103 
00104 TransportImpl::AcceptConnectResult
00105 ShmemTransport::accept_datalink(const RemoteTransport& remote,
00106                                 const ConnectionAttribs& attribs,
00107                                 TransportClient* client)
00108 {
00109   return connect_datalink(remote, attribs, client);
00110 }
00111 
00112 void
00113 ShmemTransport::stop_accepting_or_connecting(TransportClient*, const RepoId&)
00114 {
00115   // no-op: accept and connect either complete or fail immediately
00116 }
00117 
00118 bool
00119 ShmemTransport::configure_i(TransportInst* config)
00120 {
00121 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE
00122   ACE_UNUSED_ARG(config);
00123   ACE_ERROR_RETURN((LM_ERROR,
00124                     ACE_TEXT("(%P|%t) ERROR: ")
00125                     ACE_TEXT("ShmemTransport::configure_i: ")
00126                     ACE_TEXT("no platform support for shared memory!\n")),
00127                    false);
00128 #else
00129   config_i_ = dynamic_cast<ShmemInst*>(config);
00130   if (config_i_ == 0) {
00131     ACE_ERROR_RETURN((LM_ERROR,
00132                       ACE_TEXT("(%P|%t) ERROR: ")
00133                       ACE_TEXT("ShmemTransport::configure_i: ")
00134                       ACE_TEXT("invalid configuration!\n")),
00135                      false);
00136   }
00137   config_i_->_add_ref();
00138 
00139   ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts;
00140 # ifdef ACE_WIN32
00141   alloc_opts.max_size_ = config_i_->pool_size_;
00142 # elif !defined ACE_LACKS_SYSV_SHMEM
00143   alloc_opts.base_addr_ = 0;
00144   alloc_opts.segment_size_ = config_i_->pool_size_;
00145   alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
00146   alloc_opts.max_segments_ = 1;
00147 # endif
00148 
00149   alloc_ =
00150     new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(this->config_i_->poolname().c_str()),
00151                        0 /*lock_name is optional*/, &alloc_opts);
00152 
00153   void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
00154   if (mem == 0) {
00155     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00156                       ACE_TEXT("ShmemTrasport::configure_i: failed to allocate")
00157                       ACE_TEXT(" space for semaphore in shared memory!\n")),
00158                      false);
00159   }
00160 
00161   ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00162   alloc_->bind("Semaphore", pSem);
00163 
00164   bool ok;
00165 # ifdef ACE_WIN32
00166   *pSem = ::CreateSemaphoreW(0 /*default security*/,
00167                              0 /*initial count*/,
00168                              0x7fffffff /*max count (ACE's default)*/,
00169                              0 /*no name*/);
00170   ACE_sema_t ace_sema = *pSem;
00171   ok = (*pSem != 0);
00172 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE
00173   ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/));
00174   ACE_sema_t ace_sema;
00175   std::memset(&ace_sema, 0, sizeof ace_sema);
00176   ace_sema.sema_ = pSem;
00177 #  if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION)
00178   ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
00179   ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
00180 #  endif
00181 # else
00182   ok = false;
00183   ACE_sema_t ace_sema;
00184 # endif
00185   if (!ok) {
00186     ACE_ERROR_RETURN((LM_ERROR,
00187                       ACE_TEXT("(%P|%t) ERROR: ")
00188                       ACE_TEXT("ShmemTransport::configure_i: ")
00189                       ACE_TEXT("could not create semaphore\n")),
00190                      false);
00191   }
00192 
00193   read_task_ = new ReadTask(this, ace_sema);
00194 
00195   VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n",
00196             this, this->config_i_->poolname().c_str()), 1);
00197 
00198   return true;
00199 #endif
00200 }
00201 
00202 void
00203 ShmemTransport::shutdown_i()
00204 {
00205   // Shutdown reserved datalinks and release configuration:
00206   GuardType guard(links_lock_);
00207   for (ShmemDataLinkMap::iterator it(links_.begin());
00208        it != links_.end(); ++it) {
00209     it->second->transport_shutdown();
00210   }
00211   links_.clear();
00212 
00213   if (read_task_) read_task_->stop();
00214   delete read_task_;
00215   read_task_ = 0;
00216 
00217   void* mem = 0;
00218   alloc_->find("Semaphore", mem);
00219   ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00220 #ifdef ACE_WIN32
00221   ::CloseHandle(*pSem);
00222 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE
00223   ::sem_destroy(pSem);
00224 #else
00225   ACE_UNUSED_ARG(pSem);
00226 #endif
00227 
00228   alloc_->release(1 /*close*/);
00229   delete alloc_;
00230   alloc_ = 0;
00231 
00232   config_i_ = 0;
00233 }
00234 
00235 bool
00236 ShmemTransport::connection_info_i(TransportLocator& info) const
00237 {
00238   this->config_i_->populate_locator(info);
00239   return true;
00240 }
00241 
00242 std::pair<std::string, std::string>
00243 ShmemTransport::blob_to_key(const TransportBLOB& blob)
00244 {
00245   const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
00246   const std::string host(c_str);
00247   const size_t host_len = host.size();
00248 
00249   const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
00250   return make_pair(host, pool);
00251 }
00252 
00253 void
00254 ShmemTransport::release_datalink(DataLink* link)
00255 {
00256   GuardType guard(links_lock_);
00257   for (ShmemDataLinkMap::iterator it(links_.begin());
00258        it != links_.end(); ++it) {
00259     // We are guaranteed to have exactly one matching DataLink
00260     // in the map; release any resources held and return.
00261     if (link == static_cast<DataLink*>(it->second.in())) {
00262       link->stop();
00263       links_.erase(it);
00264       return;
00265     }
00266   }
00267 }
00268 
00269 ShmemTransport::ReadTask::ReadTask(ShmemTransport* outer, ACE_sema_t semaphore)
00270   : outer_(outer)
00271   , semaphore_(semaphore)
00272   , stopped_(false)
00273 {
00274   activate();
00275 }
00276 
00277 int
00278 ShmemTransport::ReadTask::svc()
00279 {
00280   while (true) {
00281     ACE_OS::sema_wait(&semaphore_);
00282     if (stopped_) {
00283       return 0;
00284     }
00285     outer_->read_from_links();
00286   }
00287   return 1;
00288 }
00289 
00290 void
00291 ShmemTransport::ReadTask::stop()
00292 {
00293   stopped_ = true;
00294   ACE_OS::sema_post(&semaphore_);
00295   wait();
00296 }
00297 
00298 void
00299 ShmemTransport::read_from_links()
00300 {
00301   std::vector<ShmemDataLink_rch> dl_copies;
00302   {
00303     GuardType guard(links_lock_);
00304     typedef ShmemDataLinkMap::iterator iter_t;
00305     for (iter_t it = links_.begin(); it != links_.end(); ++it) {
00306       ShmemDataLink_rch link = it->second;
00307       dl_copies.push_back(link);
00308     }
00309   }
00310 
00311   typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
00312   for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) {
00313     dl_it->in()->read();
00314   }
00315 }
00316 
00317 void
00318 ShmemTransport::signal_semaphore()
00319 {
00320   ACE_OS::sema_post(&read_task_->semaphore_);
00321 }
00322 
00323 std::string
00324 ShmemTransport::address()
00325 {
00326   return this->config_i_->poolname();
00327 }
00328 
00329 } // namespace DCPS
00330 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7