#include <ShmemTransport.h>
Classes | |
struct | ReadTask |
Public Member Functions | |
ShmemTransport (ShmemInst &inst) | |
ShmemAllocator * | alloc () |
std::string | address () |
void | signal_semaphore () |
ShmemInst & | config () const |
Protected Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client) |
virtual void | stop_accepting_or_connecting (const TransportClient_wrch &client, const RepoId &remote_id) |
bool | configure_i (ShmemInst &config) |
virtual void | shutdown_i () |
virtual bool | connection_info_i (TransportLocator &info) const |
virtual void | release_datalink (DataLink *link) |
virtual std::string | transport_type () const |
Private Types | |
typedef ACE_Thread_Mutex | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
DataLink_rch | add_datalink (const std::string &remote_address) |
Create a new link (using make_datalink) and add it to the map. | |
ShmemDataLink_rch | make_datalink (const std::string &remote_address) |
Create the DataLink object and start it. | |
std::pair< std::string, std::string > | blob_to_key (const TransportBLOB &blob) |
void | read_from_links () |
typedef | OPENDDS_MAP (std::string, ShmemDataLink_rch) ShmemDataLinkMap |
Private Attributes | |
LockType | links_lock_ |
ShmemDataLinkMap | links_ |
unique_ptr< ShmemAllocator > | alloc_ |
unique_ptr< ReadTask > | read_task_ |
Definition at line 28 of file ShmemTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::ShmemTransport::GuardType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 74 of file ShmemTransport.h.
typedef ACE_Thread_Mutex OpenDDS::DCPS::ShmemTransport::LockType [private] |
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 73 of file ShmemTransport.h.
OpenDDS::DCPS::ShmemTransport::ShmemTransport | ( | ShmemInst & | inst | ) | [explicit] |
Definition at line 27 of file ShmemTransport.cpp.
References configure_i(), and OpenDDS::DCPS::TransportImpl::open().
00028 : TransportImpl(inst) 00029 { 00030 if (! (configure_i(inst) && open()) ) { 00031 throw Transport::UnableToCreate(); 00032 } 00033 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
accept_datalink() is called from TransportClient to initiate an association as the passive peer. A DataLink may be returned if one is already connected and ready to use, otherwise passively wait for a physical connection from the active side (either in the form of a connection event or handshaking message). Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 89 of file ShmemTransport.cpp.
References connect_datalink().
00092 { 00093 return connect_datalink(remote, attribs, client); 00094 }
DataLink_rch OpenDDS::DCPS::ShmemTransport::add_datalink | ( | const std::string & | remote_address | ) | [private] |
Create a new link (using make_datalink) and add it to the map.
Definition at line 81 of file ShmemTransport.cpp.
References links_, and make_datalink().
Referenced by connect_datalink().
00082 { 00083 ShmemDataLink_rch link = make_datalink(remote_address); 00084 links_.insert(ShmemDataLinkMap::value_type(remote_address, link)); 00085 return link; 00086 }
std::string OpenDDS::DCPS::ShmemTransport::address | ( | void | ) |
Definition at line 297 of file ShmemTransport.cpp.
References config(), and OpenDDS::DCPS::ShmemInst::poolname().
Referenced by OpenDDS::DCPS::ShmemDataLink::local_address().
00298 { 00299 return this->config().poolname(); 00300 }
ShmemAllocator* OpenDDS::DCPS::ShmemTransport::alloc | ( | void | ) | [inline] |
Definition at line 33 of file ShmemTransport.h.
Referenced by OpenDDS::DCPS::ShmemDataLink::local_allocator().
00033 { return alloc_.get(); }
std::pair< std::string, std::string > OpenDDS::DCPS::ShmemTransport::blob_to_key | ( | const TransportBLOB & | blob | ) | [private] |
Definition at line 217 of file ShmemTransport.cpp.
Referenced by connect_datalink().
00218 { 00219 const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer()); 00220 const std::string host(c_str); 00221 const size_t host_len = host.size(); 00222 00223 const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1); 00224 return make_pair(host, pool); 00225 }
ShmemInst & OpenDDS::DCPS::ShmemTransport::config | ( | ) | const |
Expose the configuration information so others can see what we can do.
Reimplemented from OpenDDS::DCPS::TransportImpl.
Definition at line 36 of file ShmemTransport.cpp.
Referenced by address(), and connection_info_i().
00037 { 00038 return static_cast<ShmemInst&>(TransportImpl::config()); 00039 }
bool OpenDDS::DCPS::ShmemTransport::configure_i | ( | ShmemInst & | config | ) | [protected] |
Definition at line 103 of file ShmemTransport.cpp.
References ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, alloc_, ACE_sema_t::count_nonzero_, LM_ERROR, LM_INFO, ACE_sema_t::lock_, OpenDDS::DCPS::ShmemInst::pool_size_, OpenDDS::DCPS::ShmemInst::poolname(), read_task_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), ACE_sema_t::sema_, and VDBG_LVL.
Referenced by ShmemTransport().
00104 { 00105 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE 00106 ACE_UNUSED_ARG(config); 00107 ACE_ERROR_RETURN((LM_ERROR, 00108 ACE_TEXT("(%P|%t) ERROR: ") 00109 ACE_TEXT("ShmemTransport::configure_i: ") 00110 ACE_TEXT("no platform support for shared memory!\n")), 00111 false); 00112 #else 00113 00114 ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts; 00115 # ifdef ACE_WIN32 00116 alloc_opts.max_size_ = config.pool_size_; 00117 # elif !defined ACE_LACKS_SYSV_SHMEM 00118 alloc_opts.base_addr_ = 0; 00119 alloc_opts.segment_size_ = config.pool_size_; 00120 alloc_opts.minimum_bytes_ = alloc_opts.segment_size_; 00121 alloc_opts.max_segments_ = 1; 00122 # endif 00123 00124 alloc_.reset( 00125 new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(config.poolname().c_str()), 00126 0 /*lock_name is optional*/, &alloc_opts)); 00127 00128 void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore)); 00129 if (mem == 0) { 00130 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00131 ACE_TEXT("ShmemTrasport::configure_i: failed to allocate") 00132 ACE_TEXT(" space for semaphore in shared memory!\n")), 00133 false); 00134 } 00135 00136 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem); 00137 alloc_->bind("Semaphore", pSem); 00138 00139 bool ok; 00140 # ifdef ACE_WIN32 00141 *pSem = ::CreateSemaphoreW(0 /*default security*/, 00142 0 /*initial count*/, 00143 0x7fffffff /*max count (ACE's default)*/, 00144 0 /*no name*/); 00145 ACE_sema_t ace_sema = *pSem; 00146 ok = (*pSem != 0); 00147 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE 00148 ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/)); 00149 ACE_sema_t ace_sema; 00150 std::memset(&ace_sema, 0, sizeof ace_sema); 00151 ace_sema.sema_ = pSem; 00152 # if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION) 00153 ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER; 00154 ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER; 00155 # endif 00156 # else 00157 ok = false; 00158 ACE_sema_t ace_sema; 00159 # endif 00160 if (!ok) { 00161 ACE_ERROR_RETURN((LM_ERROR, 00162 ACE_TEXT("(%P|%t) ERROR: ") 00163 ACE_TEXT("ShmemTransport::configure_i: ") 00164 ACE_TEXT("could not create semaphore\n")), 00165 false); 00166 } 00167 00168 read_task_.reset(new ReadTask(this, ace_sema)); 00169 00170 VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n", 00171 this, config.poolname().c_str()), 1); 00172 00173 return true; 00174 #endif 00175 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
const TransportClient_rch & | client | |||
) | [protected, virtual] |
connect_datalink() is called from TransportClient to initiate an association as the active peer. A DataLink may be returned if one is already connected and ready to use, otherwise initiate a connection to the passive side and return from this method. Upon completion of the physical connection, the transport calls back to TransportClient::use_datalink().
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 59 of file ShmemTransport.cpp.
References ACE_TEXT(), add_datalink(), OpenDDS::DCPS::TransportImpl::RemoteTransport::blob_, blob_to_key(), links_, links_lock_, LM_DEBUG, and VDBG_LVL.
Referenced by accept_datalink().
00062 { 00063 const std::pair<std::string, std::string> key = blob_to_key(remote.blob_); 00064 if (key.first != this->config().hostname()) { 00065 return AcceptConnectResult(); 00066 } 00067 GuardType guard(links_lock_); 00068 ShmemDataLinkMap::iterator iter = links_.find(key.second); 00069 if (iter != links_.end()) { 00070 ShmemDataLink_rch link = iter->second; 00071 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ") 00072 ACE_TEXT("link found.\n")), 2); 00073 return AcceptConnectResult(link); 00074 } 00075 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ") 00076 ACE_TEXT("new link.\n")), 2); 00077 return AcceptConnectResult(add_datalink(key.second)); 00078 }
bool OpenDDS::DCPS::ShmemTransport::connection_info_i | ( | TransportLocator & | local_info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 210 of file ShmemTransport.cpp.
References config(), and OpenDDS::DCPS::ShmemInst::populate_locator().
00211 { 00212 this->config().populate_locator(info); 00213 return true; 00214 }
ShmemDataLink_rch OpenDDS::DCPS::ShmemTransport::make_datalink | ( | const std::string & | remote_address | ) | [private] |
Create the DataLink object and start it.
Definition at line 42 of file ShmemTransport.cpp.
References ACE_TEXT(), LM_ERROR, and OpenDDS::DCPS::ref().
Referenced by add_datalink().
00043 { 00044 00045 ShmemDataLink_rch link = make_rch<ShmemDataLink>(ref(*this)); 00046 00047 // Open logical connection: 00048 if (link->open(remote_address)) 00049 return link; 00050 00051 ACE_ERROR((LM_ERROR, 00052 ACE_TEXT("(%P|%t) ERROR: ") 00053 ACE_TEXT("ShmemTransport::make_datalink: ") 00054 ACE_TEXT("failed to open DataLink!\n"))); 00055 return ShmemDataLink_rch(); 00056 }
typedef OpenDDS::DCPS::ShmemTransport::OPENDDS_MAP | ( | std::string | , | |
ShmemDataLink_rch | ||||
) | [private] |
Map of fully associated DataLinks for this transport. Protected by links_lock_.
void OpenDDS::DCPS::ShmemTransport::read_from_links | ( | ) | [private] |
Definition at line 273 of file ShmemTransport.cpp.
References links_, and links_lock_.
Referenced by OpenDDS::DCPS::ShmemTransport::ReadTask::svc().
00274 { 00275 std::vector<ShmemDataLink_rch> dl_copies; 00276 { 00277 GuardType guard(links_lock_); 00278 typedef ShmemDataLinkMap::iterator iter_t; 00279 for (iter_t it = links_.begin(); it != links_.end(); ++it) { 00280 dl_copies.push_back(it->second); 00281 } 00282 } 00283 00284 typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t; 00285 for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) { 00286 dl_it->in()->read(); 00287 } 00288 }
void OpenDDS::DCPS::ShmemTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
Called by the TransportRegistry when this TransportImpl object is released while the TransportRegistry is handling a release() "event". The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 228 of file ShmemTransport.cpp.
References links_, links_lock_, and OpenDDS::DCPS::DataLink::stop().
00229 { 00230 GuardType guard(links_lock_); 00231 for (ShmemDataLinkMap::iterator it(links_.begin()); 00232 it != links_.end(); ++it) { 00233 // We are guaranteed to have exactly one matching DataLink 00234 // in the map; release any resources held and return. 00235 if (link == static_cast<DataLink*>(it->second.in())) { 00236 link->stop(); 00237 links_.erase(it); 00238 return; 00239 } 00240 } 00241 }
void OpenDDS::DCPS::ShmemTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 178 of file ShmemTransport.cpp.
References alloc_, links_, links_lock_, read_task_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), and OpenDDS::DCPS::unique_ptr< T, Deleter >::reset().
00179 { 00180 // Shutdown reserved datalinks and release configuration: 00181 GuardType guard(links_lock_); 00182 if (read_task_) read_task_->stop(); 00183 00184 for (ShmemDataLinkMap::iterator it(links_.begin()); 00185 it != links_.end(); ++it) { 00186 it->second->transport_shutdown(); 00187 } 00188 links_.clear(); 00189 00190 read_task_.reset(); 00191 00192 if (alloc_) { 00193 void* mem = 0; 00194 alloc_->find("Semaphore", mem); 00195 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem); 00196 #ifdef ACE_WIN32 00197 ::CloseHandle(*pSem); 00198 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE 00199 ::sem_destroy(pSem); 00200 #else 00201 ACE_UNUSED_ARG(pSem); 00202 #endif 00203 00204 alloc_->release(1 /*close*/); 00205 alloc_.reset(); 00206 } 00207 }
void OpenDDS::DCPS::ShmemTransport::signal_semaphore | ( | ) |
Definition at line 291 of file ShmemTransport.cpp.
References read_task_, and ACE_OS::sema_post().
Referenced by OpenDDS::DCPS::ShmemDataLink::signal_semaphore().
00292 { 00293 ACE_OS::sema_post(&read_task_->semaphore_); 00294 }
void OpenDDS::DCPS::ShmemTransport::stop_accepting_or_connecting | ( | const TransportClient_wrch & | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 97 of file ShmemTransport.cpp.
virtual std::string OpenDDS::DCPS::ShmemTransport::transport_type | ( | ) | const [inline, protected, virtual] |
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 59 of file ShmemTransport.h.
Definition at line 83 of file ShmemTransport.h.
Referenced by configure_i(), and shutdown_i().
ShmemDataLinkMap OpenDDS::DCPS::ShmemTransport::links_ [private] |
Definition at line 81 of file ShmemTransport.h.
Referenced by add_datalink(), connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().
Definition at line 76 of file ShmemTransport.h.
Referenced by connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().
Definition at line 95 of file ShmemTransport.h.
Referenced by configure_i(), shutdown_i(), and signal_semaphore().