OpenDDS::DCPS::ShmemTransport Class Reference

#include <ShmemTransport.h>

Inheritance diagram for OpenDDS::DCPS::ShmemTransport:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemTransport:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ShmemTransport (const TransportInst_rch &inst)
ShmemAllocatoralloc ()
std::string address ()
void signal_semaphore ()

Protected Member Functions

virtual AcceptConnectResult connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual AcceptConnectResult accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client)
virtual void stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id)
virtual bool configure_i (TransportInst *config)
virtual void shutdown_i ()
virtual bool connection_info_i (TransportLocator &info) const
virtual void release_datalink (DataLink *link)
virtual std::string transport_type () const

Private Types

typedef ACE_Thread_Mutex LockType
typedef ACE_Guard< LockTypeGuardType

Private Member Functions

DataLinkadd_datalink (const std::string &remote_address)
 Create a new link (using make_datalink) and add it to the map.
ShmemDataLinkmake_datalink (const std::string &remote_address)
 Create the DataLink object and start it.
std::pair< std::string, std::string > blob_to_key (const TransportBLOB &blob)
void read_from_links ()
typedef OPENDDS_MAP (std::string, ShmemDataLink_rch) ShmemDataLinkMap

Private Attributes

RcHandle< ShmemInstconfig_i_
LockType links_lock_
ShmemDataLinkMap links_
ShmemAllocatoralloc_
OpenDDS::DCPS::ShmemTransport::ReadTaskread_task_

Classes

struct  ReadTask

Detailed Description

Definition at line 26 of file ShmemTransport.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::ShmemTransport::GuardType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 72 of file ShmemTransport.h.

typedef ACE_Thread_Mutex OpenDDS::DCPS::ShmemTransport::LockType [private]

Reimplemented from OpenDDS::DCPS::TransportImpl.

Definition at line 71 of file ShmemTransport.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ShmemTransport::ShmemTransport ( const TransportInst_rch inst  )  [explicit]

Definition at line 25 of file ShmemTransport.cpp.

References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

00026   : alloc_(0)
00027   , read_task_(0)
00028 {
00029   if (!inst.is_nil()) {
00030     if (!configure(inst.in())) {
00031       throw Transport::UnableToCreate();
00032     }
00033   }
00034 }


Member Function Documentation

TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::accept_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient client 
) [protected, virtual]

Definition at line 105 of file ShmemTransport.cpp.

References connect_datalink().

00108 {
00109   return connect_datalink(remote, attribs, client);
00110 }

DataLink * OpenDDS::DCPS::ShmemTransport::add_datalink ( const std::string &  remote_address  )  [private]

Create a new link (using make_datalink) and add it to the map.

Definition at line 97 of file ShmemTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), links_, and make_datalink().

Referenced by connect_datalink().

00098 {
00099   ShmemDataLink_rch link = make_datalink(remote_address);
00100   links_.insert(ShmemDataLinkMap::value_type(remote_address, link));
00101   return link._retn();
00102 }

std::string OpenDDS::DCPS::ShmemTransport::address (  ) 

Definition at line 324 of file ShmemTransport.cpp.

References config_i_.

00325 {
00326   return this->config_i_->poolname();
00327 }

ShmemAllocator* OpenDDS::DCPS::ShmemTransport::alloc (  )  [inline]

Definition at line 31 of file ShmemTransport.h.

00031 { return alloc_; }

std::pair< std::string, std::string > OpenDDS::DCPS::ShmemTransport::blob_to_key ( const TransportBLOB blob  )  [private]

Definition at line 243 of file ShmemTransport.cpp.

Referenced by connect_datalink().

00244 {
00245   const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
00246   const std::string host(c_str);
00247   const size_t host_len = host.size();
00248 
00249   const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
00250   return make_pair(host, pool);
00251 }

bool OpenDDS::DCPS::ShmemTransport::configure_i ( TransportInst config  )  [protected, virtual]

Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 119 of file ShmemTransport.cpp.

References alloc_, OpenDDS::DCPS::TransportImpl::config(), config_i_, read_task_, and VDBG_LVL.

00120 {
00121 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE
00122   ACE_UNUSED_ARG(config);
00123   ACE_ERROR_RETURN((LM_ERROR,
00124                     ACE_TEXT("(%P|%t) ERROR: ")
00125                     ACE_TEXT("ShmemTransport::configure_i: ")
00126                     ACE_TEXT("no platform support for shared memory!\n")),
00127                    false);
00128 #else
00129   config_i_ = dynamic_cast<ShmemInst*>(config);
00130   if (config_i_ == 0) {
00131     ACE_ERROR_RETURN((LM_ERROR,
00132                       ACE_TEXT("(%P|%t) ERROR: ")
00133                       ACE_TEXT("ShmemTransport::configure_i: ")
00134                       ACE_TEXT("invalid configuration!\n")),
00135                      false);
00136   }
00137   config_i_->_add_ref();
00138 
00139   ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts;
00140 # ifdef ACE_WIN32
00141   alloc_opts.max_size_ = config_i_->pool_size_;
00142 # elif !defined ACE_LACKS_SYSV_SHMEM
00143   alloc_opts.base_addr_ = 0;
00144   alloc_opts.segment_size_ = config_i_->pool_size_;
00145   alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
00146   alloc_opts.max_segments_ = 1;
00147 # endif
00148 
00149   alloc_ =
00150     new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(this->config_i_->poolname().c_str()),
00151                        0 /*lock_name is optional*/, &alloc_opts);
00152 
00153   void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
00154   if (mem == 0) {
00155     ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00156                       ACE_TEXT("ShmemTrasport::configure_i: failed to allocate")
00157                       ACE_TEXT(" space for semaphore in shared memory!\n")),
00158                      false);
00159   }
00160 
00161   ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00162   alloc_->bind("Semaphore", pSem);
00163 
00164   bool ok;
00165 # ifdef ACE_WIN32
00166   *pSem = ::CreateSemaphoreW(0 /*default security*/,
00167                              0 /*initial count*/,
00168                              0x7fffffff /*max count (ACE's default)*/,
00169                              0 /*no name*/);
00170   ACE_sema_t ace_sema = *pSem;
00171   ok = (*pSem != 0);
00172 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE
00173   ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/));
00174   ACE_sema_t ace_sema;
00175   std::memset(&ace_sema, 0, sizeof ace_sema);
00176   ace_sema.sema_ = pSem;
00177 #  if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION)
00178   ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
00179   ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
00180 #  endif
00181 # else
00182   ok = false;
00183   ACE_sema_t ace_sema;
00184 # endif
00185   if (!ok) {
00186     ACE_ERROR_RETURN((LM_ERROR,
00187                       ACE_TEXT("(%P|%t) ERROR: ")
00188                       ACE_TEXT("ShmemTransport::configure_i: ")
00189                       ACE_TEXT("could not create semaphore\n")),
00190                      false);
00191   }
00192 
00193   read_task_ = new ReadTask(this, ace_sema);
00194 
00195   VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n",
00196             this, this->config_i_->poolname().c_str()), 1);
00197 
00198   return true;
00199 #endif
00200 }

TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::connect_datalink ( const RemoteTransport &  remote,
const ConnectionAttribs &  attribs,
TransportClient client 
) [protected, virtual]

Definition at line 75 of file ShmemTransport.cpp.

References add_datalink(), blob_to_key(), links_, links_lock_, and VDBG_LVL.

Referenced by accept_datalink().

00078 {
00079   const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
00080   if (key.first != this->config_i_->hostname()) {
00081     return AcceptConnectResult();
00082   }
00083   GuardType guard(links_lock_);
00084   ShmemDataLinkMap::iterator iter = links_.find(key.second);
00085   if (iter != links_.end()) {
00086     ShmemDataLink_rch link = iter->second;
00087     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00088               ACE_TEXT("link found.\n")), 2);
00089     return AcceptConnectResult(link._retn());
00090   }
00091     VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00092               ACE_TEXT("new link.\n")), 2);
00093   return AcceptConnectResult(add_datalink(key.second));
00094 }

bool OpenDDS::DCPS::ShmemTransport::connection_info_i ( TransportLocator info  )  const [protected, virtual]

Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 236 of file ShmemTransport.cpp.

References config_i_.

00237 {
00238   this->config_i_->populate_locator(info);
00239   return true;
00240 }

ShmemDataLink * OpenDDS::DCPS::ShmemTransport::make_datalink ( const std::string &  remote_address  )  [private]

Create the DataLink object and start it.

Definition at line 37 of file ShmemTransport.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), config_i_, OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().

Referenced by add_datalink().

00038 {
00039   ShmemDataLink_rch link;
00040   ACE_NEW_RETURN(link, ShmemDataLink(this), 0);
00041 
00042   if (link.is_nil()) {
00043     ACE_ERROR_RETURN((LM_ERROR,
00044                       ACE_TEXT("(%P|%t) ERROR: ")
00045                       ACE_TEXT("ShmemTransport::make_datalink: ")
00046                       ACE_TEXT("failed to create DataLink!\n")),
00047                      0);
00048   }
00049 
00050   link->configure(config_i_.in());
00051 
00052   // Assign send strategy:
00053   ShmemSendStrategy* send_strategy;
00054   ACE_NEW_RETURN(send_strategy, ShmemSendStrategy(link.in()), 0);
00055   link->send_strategy(send_strategy);
00056 
00057   // Assign receive strategy:
00058   ShmemReceiveStrategy* recv_strategy;
00059   ACE_NEW_RETURN(recv_strategy, ShmemReceiveStrategy(link.in()), 0);
00060   link->receive_strategy(recv_strategy);
00061 
00062   // Open logical connection:
00063   if (!link->open(remote_address)) {
00064     ACE_ERROR_RETURN((LM_ERROR,
00065                       ACE_TEXT("(%P|%t) ERROR: ")
00066                       ACE_TEXT("ShmemTransport::make_datalink: ")
00067                       ACE_TEXT("failed to open DataLink!\n")),
00068                      0);
00069   }
00070 
00071   return link._retn();
00072 }

typedef OpenDDS::DCPS::ShmemTransport::OPENDDS_MAP ( std::string  ,
ShmemDataLink_rch   
) [private]

Map of fully associated DataLinks for this transport. Protected by links_lock_.

void OpenDDS::DCPS::ShmemTransport::read_from_links (  )  [private]

Definition at line 299 of file ShmemTransport.cpp.

References links_, and links_lock_.

Referenced by OpenDDS::DCPS::ShmemTransport::ReadTask::svc().

00300 {
00301   std::vector<ShmemDataLink_rch> dl_copies;
00302   {
00303     GuardType guard(links_lock_);
00304     typedef ShmemDataLinkMap::iterator iter_t;
00305     for (iter_t it = links_.begin(); it != links_.end(); ++it) {
00306       ShmemDataLink_rch link = it->second;
00307       dl_copies.push_back(link);
00308     }
00309   }
00310 
00311   typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
00312   for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) {
00313     dl_it->in()->read();
00314   }
00315 }

void OpenDDS::DCPS::ShmemTransport::release_datalink ( DataLink link  )  [protected, virtual]

The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 254 of file ShmemTransport.cpp.

References links_, links_lock_, and OpenDDS::DCPS::DataLink::stop().

00255 {
00256   GuardType guard(links_lock_);
00257   for (ShmemDataLinkMap::iterator it(links_.begin());
00258        it != links_.end(); ++it) {
00259     // We are guaranteed to have exactly one matching DataLink
00260     // in the map; release any resources held and return.
00261     if (link == static_cast<DataLink*>(it->second.in())) {
00262       link->stop();
00263       links_.erase(it);
00264       return;
00265     }
00266   }
00267 }

void OpenDDS::DCPS::ShmemTransport::shutdown_i (  )  [protected, virtual]

Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 203 of file ShmemTransport.cpp.

References alloc_, config_i_, links_, links_lock_, read_task_, and OpenDDS::DCPS::ShmemTransport::ReadTask::stop().

00204 {
00205   // Shutdown reserved datalinks and release configuration:
00206   GuardType guard(links_lock_);
00207   for (ShmemDataLinkMap::iterator it(links_.begin());
00208        it != links_.end(); ++it) {
00209     it->second->transport_shutdown();
00210   }
00211   links_.clear();
00212 
00213   if (read_task_) read_task_->stop();
00214   delete read_task_;
00215   read_task_ = 0;
00216 
00217   void* mem = 0;
00218   alloc_->find("Semaphore", mem);
00219   ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00220 #ifdef ACE_WIN32
00221   ::CloseHandle(*pSem);
00222 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE
00223   ::sem_destroy(pSem);
00224 #else
00225   ACE_UNUSED_ARG(pSem);
00226 #endif
00227 
00228   alloc_->release(1 /*close*/);
00229   delete alloc_;
00230   alloc_ = 0;
00231 
00232   config_i_ = 0;
00233 }

void OpenDDS::DCPS::ShmemTransport::signal_semaphore (  ) 

Definition at line 318 of file ShmemTransport.cpp.

References read_task_, and OpenDDS::DCPS::ShmemTransport::ReadTask::semaphore_.

00319 {
00320   ACE_OS::sema_post(&read_task_->semaphore_);
00321 }

void OpenDDS::DCPS::ShmemTransport::stop_accepting_or_connecting ( TransportClient client,
const RepoId remote_id 
) [protected, virtual]

stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 113 of file ShmemTransport.cpp.

00114 {
00115   // no-op: accept and connect either complete or fail immediately
00116 }

virtual std::string OpenDDS::DCPS::ShmemTransport::transport_type (  )  const [inline, protected, virtual]

Implements OpenDDS::DCPS::TransportImpl.

Definition at line 55 of file ShmemTransport.h.

00055 { return "shmem"; }


Member Data Documentation

ShmemAllocator* OpenDDS::DCPS::ShmemTransport::alloc_ [private]

Definition at line 81 of file ShmemTransport.h.

Referenced by configure_i(), and shutdown_i().

RcHandle<ShmemInst> OpenDDS::DCPS::ShmemTransport::config_i_ [private]

Definition at line 69 of file ShmemTransport.h.

Referenced by address(), configure_i(), connection_info_i(), make_datalink(), and shutdown_i().

ShmemDataLinkMap OpenDDS::DCPS::ShmemTransport::links_ [private]

Definition at line 79 of file ShmemTransport.h.

Referenced by add_datalink(), connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().

LockType OpenDDS::DCPS::ShmemTransport::links_lock_ [private]

Definition at line 74 of file ShmemTransport.h.

Referenced by connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().

OpenDDS::DCPS::ShmemTransport::ReadTask* OpenDDS::DCPS::ShmemTransport::read_task_ [private]

Referenced by configure_i(), shutdown_i(), and signal_semaphore().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:38 2016 for OpenDDS by  doxygen 1.4.7