#include <ShmemTransport.h>
Inheritance diagram for OpenDDS::DCPS::ShmemTransport:
Public Member Functions | |
ShmemTransport (const TransportInst_rch &inst) | |
ShmemAllocator * | alloc () |
std::string | address () |
void | signal_semaphore () |
Protected Member Functions | |
virtual AcceptConnectResult | connect_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual AcceptConnectResult | accept_datalink (const RemoteTransport &remote, const ConnectionAttribs &attribs, TransportClient *client) |
virtual void | stop_accepting_or_connecting (TransportClient *client, const RepoId &remote_id) |
virtual bool | configure_i (TransportInst *config) |
virtual void | shutdown_i () |
virtual bool | connection_info_i (TransportLocator &info) const |
virtual void | release_datalink (DataLink *link) |
virtual std::string | transport_type () const |
Private Types | |
typedef ACE_Thread_Mutex | LockType |
typedef ACE_Guard< LockType > | GuardType |
Private Member Functions | |
DataLink * | add_datalink (const std::string &remote_address) |
Create a new link (using make_datalink) and add it to the map. | |
ShmemDataLink * | make_datalink (const std::string &remote_address) |
Create the DataLink object and start it. | |
std::pair< std::string, std::string > | blob_to_key (const TransportBLOB &blob) |
void | read_from_links () |
typedef | OPENDDS_MAP (std::string, ShmemDataLink_rch) ShmemDataLinkMap |
Private Attributes | |
RcHandle< ShmemInst > | config_i_ |
LockType | links_lock_ |
ShmemDataLinkMap | links_ |
ShmemAllocator * | alloc_ |
OpenDDS::DCPS::ShmemTransport::ReadTask * | read_task_ |
Classes | |
struct | ReadTask |
Definition at line 26 of file ShmemTransport.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::ShmemTransport::GuardType [private] |
typedef ACE_Thread_Mutex OpenDDS::DCPS::ShmemTransport::LockType [private] |
OpenDDS::DCPS::ShmemTransport::ShmemTransport | ( | const TransportInst_rch & | inst | ) | [explicit] |
Definition at line 25 of file ShmemTransport.cpp.
References OpenDDS::DCPS::TransportImpl::configure(), OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
00026 : alloc_(0) 00027 , read_task_(0) 00028 { 00029 if (!inst.is_nil()) { 00030 if (!configure(inst.in())) { 00031 throw Transport::UnableToCreate(); 00032 } 00033 } 00034 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::accept_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 105 of file ShmemTransport.cpp.
References connect_datalink().
00108 { 00109 return connect_datalink(remote, attribs, client); 00110 }
DataLink * OpenDDS::DCPS::ShmemTransport::add_datalink | ( | const std::string & | remote_address | ) | [private] |
Create a new link (using make_datalink) and add it to the map.
Definition at line 97 of file ShmemTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), links_, and make_datalink().
Referenced by connect_datalink().
00098 { 00099 ShmemDataLink_rch link = make_datalink(remote_address); 00100 links_.insert(ShmemDataLinkMap::value_type(remote_address, link)); 00101 return link._retn(); 00102 }
std::string OpenDDS::DCPS::ShmemTransport::address | ( | ) |
Definition at line 324 of file ShmemTransport.cpp.
References config_i_.
00325 { 00326 return this->config_i_->poolname(); 00327 }
ShmemAllocator* OpenDDS::DCPS::ShmemTransport::alloc | ( | ) | [inline] |
std::pair< std::string, std::string > OpenDDS::DCPS::ShmemTransport::blob_to_key | ( | const TransportBLOB & | blob | ) | [private] |
Definition at line 243 of file ShmemTransport.cpp.
Referenced by connect_datalink().
00244 { 00245 const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer()); 00246 const std::string host(c_str); 00247 const size_t host_len = host.size(); 00248 00249 const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1); 00250 return make_pair(host, pool); 00251 }
bool OpenDDS::DCPS::ShmemTransport::configure_i | ( | TransportInst * | config | ) | [protected, virtual] |
Concrete subclass gets a shot at the config object. The subclass will likely downcast the TransportInst object to a subclass type that it expects/requires.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 119 of file ShmemTransport.cpp.
References alloc_, OpenDDS::DCPS::TransportImpl::config(), config_i_, read_task_, and VDBG_LVL.
00120 { 00121 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE 00122 ACE_UNUSED_ARG(config); 00123 ACE_ERROR_RETURN((LM_ERROR, 00124 ACE_TEXT("(%P|%t) ERROR: ") 00125 ACE_TEXT("ShmemTransport::configure_i: ") 00126 ACE_TEXT("no platform support for shared memory!\n")), 00127 false); 00128 #else 00129 config_i_ = dynamic_cast<ShmemInst*>(config); 00130 if (config_i_ == 0) { 00131 ACE_ERROR_RETURN((LM_ERROR, 00132 ACE_TEXT("(%P|%t) ERROR: ") 00133 ACE_TEXT("ShmemTransport::configure_i: ") 00134 ACE_TEXT("invalid configuration!\n")), 00135 false); 00136 } 00137 config_i_->_add_ref(); 00138 00139 ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts; 00140 # ifdef ACE_WIN32 00141 alloc_opts.max_size_ = config_i_->pool_size_; 00142 # elif !defined ACE_LACKS_SYSV_SHMEM 00143 alloc_opts.base_addr_ = 0; 00144 alloc_opts.segment_size_ = config_i_->pool_size_; 00145 alloc_opts.minimum_bytes_ = alloc_opts.segment_size_; 00146 alloc_opts.max_segments_ = 1; 00147 # endif 00148 00149 alloc_ = 00150 new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(this->config_i_->poolname().c_str()), 00151 0 /*lock_name is optional*/, &alloc_opts); 00152 00153 void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore)); 00154 if (mem == 0) { 00155 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00156 ACE_TEXT("ShmemTrasport::configure_i: failed to allocate") 00157 ACE_TEXT(" space for semaphore in shared memory!\n")), 00158 false); 00159 } 00160 00161 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem); 00162 alloc_->bind("Semaphore", pSem); 00163 00164 bool ok; 00165 # ifdef ACE_WIN32 00166 *pSem = ::CreateSemaphoreW(0 /*default security*/, 00167 0 /*initial count*/, 00168 0x7fffffff /*max count (ACE's default)*/, 00169 0 /*no name*/); 00170 ACE_sema_t ace_sema = *pSem; 00171 ok = (*pSem != 0); 00172 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE 00173 ok = (0 == ::sem_init(pSem, 1 /*process shared*/, 0 /*initial count*/)); 00174 ACE_sema_t ace_sema; 00175 std::memset(&ace_sema, 0, sizeof ace_sema); 00176 ace_sema.sema_ = pSem; 00177 # if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION) 00178 ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER; 00179 ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER; 00180 # endif 00181 # else 00182 ok = false; 00183 ACE_sema_t ace_sema; 00184 # endif 00185 if (!ok) { 00186 ACE_ERROR_RETURN((LM_ERROR, 00187 ACE_TEXT("(%P|%t) ERROR: ") 00188 ACE_TEXT("ShmemTransport::configure_i: ") 00189 ACE_TEXT("could not create semaphore\n")), 00190 false); 00191 } 00192 00193 read_task_ = new ReadTask(this, ace_sema); 00194 00195 VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n", 00196 this, this->config_i_->poolname().c_str()), 1); 00197 00198 return true; 00199 #endif 00200 }
TransportImpl::AcceptConnectResult OpenDDS::DCPS::ShmemTransport::connect_datalink | ( | const RemoteTransport & | remote, | |
const ConnectionAttribs & | attribs, | |||
TransportClient * | client | |||
) | [protected, virtual] |
Definition at line 75 of file ShmemTransport.cpp.
References add_datalink(), blob_to_key(), links_, links_lock_, and VDBG_LVL.
Referenced by accept_datalink().
00078 { 00079 const std::pair<std::string, std::string> key = blob_to_key(remote.blob_); 00080 if (key.first != this->config_i_->hostname()) { 00081 return AcceptConnectResult(); 00082 } 00083 GuardType guard(links_lock_); 00084 ShmemDataLinkMap::iterator iter = links_.find(key.second); 00085 if (iter != links_.end()) { 00086 ShmemDataLink_rch link = iter->second; 00087 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ") 00088 ACE_TEXT("link found.\n")), 2); 00089 return AcceptConnectResult(link._retn()); 00090 } 00091 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ") 00092 ACE_TEXT("new link.\n")), 2); 00093 return AcceptConnectResult(add_datalink(key.second)); 00094 }
bool OpenDDS::DCPS::ShmemTransport::connection_info_i | ( | TransportLocator & | info | ) | const [protected, virtual] |
Called by our connection_info() method to allow the concrete TransportImpl subclass to do the dirty work since it really is the one that knows how to populate the supplied TransportLocator object.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 236 of file ShmemTransport.cpp.
References config_i_.
00237 { 00238 this->config_i_->populate_locator(info); 00239 return true; 00240 }
ShmemDataLink * OpenDDS::DCPS::ShmemTransport::make_datalink | ( | const std::string & | remote_address | ) | [private] |
Create the DataLink object and start it.
Definition at line 37 of file ShmemTransport.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), config_i_, OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::RcHandle< T >::is_nil().
Referenced by add_datalink().
00038 { 00039 ShmemDataLink_rch link; 00040 ACE_NEW_RETURN(link, ShmemDataLink(this), 0); 00041 00042 if (link.is_nil()) { 00043 ACE_ERROR_RETURN((LM_ERROR, 00044 ACE_TEXT("(%P|%t) ERROR: ") 00045 ACE_TEXT("ShmemTransport::make_datalink: ") 00046 ACE_TEXT("failed to create DataLink!\n")), 00047 0); 00048 } 00049 00050 link->configure(config_i_.in()); 00051 00052 // Assign send strategy: 00053 ShmemSendStrategy* send_strategy; 00054 ACE_NEW_RETURN(send_strategy, ShmemSendStrategy(link.in()), 0); 00055 link->send_strategy(send_strategy); 00056 00057 // Assign receive strategy: 00058 ShmemReceiveStrategy* recv_strategy; 00059 ACE_NEW_RETURN(recv_strategy, ShmemReceiveStrategy(link.in()), 0); 00060 link->receive_strategy(recv_strategy); 00061 00062 // Open logical connection: 00063 if (!link->open(remote_address)) { 00064 ACE_ERROR_RETURN((LM_ERROR, 00065 ACE_TEXT("(%P|%t) ERROR: ") 00066 ACE_TEXT("ShmemTransport::make_datalink: ") 00067 ACE_TEXT("failed to open DataLink!\n")), 00068 0); 00069 } 00070 00071 return link._retn(); 00072 }
typedef OpenDDS::DCPS::ShmemTransport::OPENDDS_MAP | ( | std::string | , | |
ShmemDataLink_rch | ||||
) | [private] |
Map of fully associated DataLinks for this transport. Protected by links_lock_.
void OpenDDS::DCPS::ShmemTransport::read_from_links | ( | ) | [private] |
Definition at line 299 of file ShmemTransport.cpp.
References links_, and links_lock_.
Referenced by OpenDDS::DCPS::ShmemTransport::ReadTask::svc().
00300 { 00301 std::vector<ShmemDataLink_rch> dl_copies; 00302 { 00303 GuardType guard(links_lock_); 00304 typedef ShmemDataLinkMap::iterator iter_t; 00305 for (iter_t it = links_.begin(); it != links_.end(); ++it) { 00306 ShmemDataLink_rch link = it->second; 00307 dl_copies.push_back(link); 00308 } 00309 } 00310 00311 typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t; 00312 for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) { 00313 dl_it->in()->read(); 00314 } 00315 }
void OpenDDS::DCPS::ShmemTransport::release_datalink | ( | DataLink * | link | ) | [protected, virtual] |
The DataLink itself calls this method when it thinks it is no longer used for any associations. This occurs during a "remove associations" operation being performed by some TransportClient that uses this TransportImpl. The TransportClient is known to have acquired our reservation_lock_, so there won't be any reserve_datalink() calls being made from any other threads while we perform this release.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 254 of file ShmemTransport.cpp.
References links_, links_lock_, and OpenDDS::DCPS::DataLink::stop().
00255 { 00256 GuardType guard(links_lock_); 00257 for (ShmemDataLinkMap::iterator it(links_.begin()); 00258 it != links_.end(); ++it) { 00259 // We are guaranteed to have exactly one matching DataLink 00260 // in the map; release any resources held and return. 00261 if (link == static_cast<DataLink*>(it->second.in())) { 00262 link->stop(); 00263 links_.erase(it); 00264 return; 00265 } 00266 } 00267 }
void OpenDDS::DCPS::ShmemTransport::shutdown_i | ( | ) | [protected, virtual] |
Called during the shutdown() method in order to give the concrete TransportImpl subclass a chance to do something when the shutdown "event" occurs.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 203 of file ShmemTransport.cpp.
References alloc_, config_i_, links_, links_lock_, read_task_, and OpenDDS::DCPS::ShmemTransport::ReadTask::stop().
00204 { 00205 // Shutdown reserved datalinks and release configuration: 00206 GuardType guard(links_lock_); 00207 for (ShmemDataLinkMap::iterator it(links_.begin()); 00208 it != links_.end(); ++it) { 00209 it->second->transport_shutdown(); 00210 } 00211 links_.clear(); 00212 00213 if (read_task_) read_task_->stop(); 00214 delete read_task_; 00215 read_task_ = 0; 00216 00217 void* mem = 0; 00218 alloc_->find("Semaphore", mem); 00219 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem); 00220 #ifdef ACE_WIN32 00221 ::CloseHandle(*pSem); 00222 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE 00223 ::sem_destroy(pSem); 00224 #else 00225 ACE_UNUSED_ARG(pSem); 00226 #endif 00227 00228 alloc_->release(1 /*close*/); 00229 delete alloc_; 00230 alloc_ = 0; 00231 00232 config_i_ = 0; 00233 }
void OpenDDS::DCPS::ShmemTransport::signal_semaphore | ( | ) |
Definition at line 318 of file ShmemTransport.cpp.
References read_task_, and OpenDDS::DCPS::ShmemTransport::ReadTask::semaphore_.
00319 { 00320 ACE_OS::sema_post(&read_task_->semaphore_); 00321 }
void OpenDDS::DCPS::ShmemTransport::stop_accepting_or_connecting | ( | TransportClient * | client, | |
const RepoId & | remote_id | |||
) | [protected, virtual] |
stop_accepting_or_connecting() is called from TransportClient to terminate the accepting process begun by accept_datalink() or connect_datalink(). This allows the TransportImpl to clean up any resources associated with this pending connection. The TransportClient* passed in to accept or connect is not valid after this method is called.
Implements OpenDDS::DCPS::TransportImpl.
Definition at line 113 of file ShmemTransport.cpp.
virtual std::string OpenDDS::DCPS::ShmemTransport::transport_type | ( | ) | const [inline, protected, virtual] |
Definition at line 69 of file ShmemTransport.h.
Referenced by address(), configure_i(), connection_info_i(), make_datalink(), and shutdown_i().
ShmemDataLinkMap OpenDDS::DCPS::ShmemTransport::links_ [private] |
Definition at line 79 of file ShmemTransport.h.
Referenced by add_datalink(), connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().
Definition at line 74 of file ShmemTransport.h.
Referenced by connect_datalink(), read_from_links(), release_datalink(), and shutdown_i().
Referenced by configure_i(), shutdown_i(), and signal_semaphore().