00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemTransport.h"
00009 #include "ShmemInst.h"
00010 #include "ShmemSendStrategy.h"
00011 #include "ShmemReceiveStrategy.h"
00012
00013 #include "dds/DCPS/AssociationData.h"
00014 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00015 #include "dds/DCPS/transport/framework/TransportExceptions.h"
00016
00017 #include "ace/Log_Msg.h"
00018
00019 #include <sstream>
00020 #include <cstring>
00021
00022 namespace OpenDDS {
00023 namespace DCPS {
00024
00025 ShmemTransport::ShmemTransport(const TransportInst_rch& inst)
00026 : alloc_(0)
00027 , read_task_(0)
00028 {
00029 if (!inst.is_nil()) {
00030 if (!configure(inst.in())) {
00031 throw Transport::UnableToCreate();
00032 }
00033 }
00034 }
00035
00036 ShmemDataLink*
00037 ShmemTransport::make_datalink(const std::string& remote_address)
00038 {
00039 ShmemDataLink_rch link;
00040 ACE_NEW_RETURN(link, ShmemDataLink(this), 0);
00041
00042 if (link.is_nil()) {
00043 ACE_ERROR_RETURN((LM_ERROR,
00044 ACE_TEXT("(%P|%t) ERROR: ")
00045 ACE_TEXT("ShmemTransport::make_datalink: ")
00046 ACE_TEXT("failed to create DataLink!\n")),
00047 0);
00048 }
00049
00050 link->configure(config_i_.in());
00051
00052
00053 ShmemSendStrategy* send_strategy;
00054 ACE_NEW_RETURN(send_strategy, ShmemSendStrategy(link.in()), 0);
00055 link->send_strategy(send_strategy);
00056
00057
00058 ShmemReceiveStrategy* recv_strategy;
00059 ACE_NEW_RETURN(recv_strategy, ShmemReceiveStrategy(link.in()), 0);
00060 link->receive_strategy(recv_strategy);
00061
00062
00063 if (!link->open(remote_address)) {
00064 ACE_ERROR_RETURN((LM_ERROR,
00065 ACE_TEXT("(%P|%t) ERROR: ")
00066 ACE_TEXT("ShmemTransport::make_datalink: ")
00067 ACE_TEXT("failed to open DataLink!\n")),
00068 0);
00069 }
00070
00071 return link._retn();
00072 }
00073
00074 TransportImpl::AcceptConnectResult
00075 ShmemTransport::connect_datalink(const RemoteTransport& remote,
00076 const ConnectionAttribs&,
00077 TransportClient*)
00078 {
00079 const std::pair<std::string, std::string> key = blob_to_key(remote.blob_);
00080 if (key.first != this->config_i_->hostname()) {
00081 return AcceptConnectResult();
00082 }
00083 GuardType guard(links_lock_);
00084 ShmemDataLinkMap::iterator iter = links_.find(key.second);
00085 if (iter != links_.end()) {
00086 ShmemDataLink_rch link = iter->second;
00087 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00088 ACE_TEXT("link found.\n")), 2);
00089 return AcceptConnectResult(link._retn());
00090 }
00091 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) ShmemTransport::connect_datalink ")
00092 ACE_TEXT("new link.\n")), 2);
00093 return AcceptConnectResult(add_datalink(key.second));
00094 }
00095
00096 DataLink*
00097 ShmemTransport::add_datalink(const std::string& remote_address)
00098 {
00099 ShmemDataLink_rch link = make_datalink(remote_address);
00100 links_.insert(ShmemDataLinkMap::value_type(remote_address, link));
00101 return link._retn();
00102 }
00103
00104 TransportImpl::AcceptConnectResult
00105 ShmemTransport::accept_datalink(const RemoteTransport& remote,
00106 const ConnectionAttribs& attribs,
00107 TransportClient* client)
00108 {
00109 return connect_datalink(remote, attribs, client);
00110 }
00111
00112 void
00113 ShmemTransport::stop_accepting_or_connecting(TransportClient*, const RepoId&)
00114 {
00115
00116 }
00117
00118 bool
00119 ShmemTransport::configure_i(TransportInst* config)
00120 {
00121 #if (!defined ACE_WIN32 && defined ACE_LACKS_SYSV_SHMEM) || defined ACE_HAS_WINCE
00122 ACE_UNUSED_ARG(config);
00123 ACE_ERROR_RETURN((LM_ERROR,
00124 ACE_TEXT("(%P|%t) ERROR: ")
00125 ACE_TEXT("ShmemTransport::configure_i: ")
00126 ACE_TEXT("no platform support for shared memory!\n")),
00127 false);
00128 #else
00129 config_i_ = dynamic_cast<ShmemInst*>(config);
00130 if (config_i_ == 0) {
00131 ACE_ERROR_RETURN((LM_ERROR,
00132 ACE_TEXT("(%P|%t) ERROR: ")
00133 ACE_TEXT("ShmemTransport::configure_i: ")
00134 ACE_TEXT("invalid configuration!\n")),
00135 false);
00136 }
00137 config_i_->_add_ref();
00138
00139 ShmemAllocator::MEMORY_POOL_OPTIONS alloc_opts;
00140 # ifdef ACE_WIN32
00141 alloc_opts.max_size_ = config_i_->pool_size_;
00142 # elif !defined ACE_LACKS_SYSV_SHMEM
00143 alloc_opts.base_addr_ = 0;
00144 alloc_opts.segment_size_ = config_i_->pool_size_;
00145 alloc_opts.minimum_bytes_ = alloc_opts.segment_size_;
00146 alloc_opts.max_segments_ = 1;
00147 # endif
00148
00149 alloc_ =
00150 new ShmemAllocator(ACE_TEXT_CHAR_TO_TCHAR(this->config_i_->poolname().c_str()),
00151 0 , &alloc_opts);
00152
00153 void* mem = alloc_->malloc(sizeof(ShmemSharedSemaphore));
00154 if (mem == 0) {
00155 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00156 ACE_TEXT("ShmemTrasport::configure_i: failed to allocate")
00157 ACE_TEXT(" space for semaphore in shared memory!\n")),
00158 false);
00159 }
00160
00161 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00162 alloc_->bind("Semaphore", pSem);
00163
00164 bool ok;
00165 # ifdef ACE_WIN32
00166 *pSem = ::CreateSemaphoreW(0 ,
00167 0 ,
00168 0x7fffffff ,
00169 0 );
00170 ACE_sema_t ace_sema = *pSem;
00171 ok = (*pSem != 0);
00172 # elif !defined ACE_LACKS_UNNAMED_SEMAPHORE
00173 ok = (0 == ::sem_init(pSem, 1 , 0 ));
00174 ACE_sema_t ace_sema;
00175 std::memset(&ace_sema, 0, sizeof ace_sema);
00176 ace_sema.sema_ = pSem;
00177 # if !defined (ACE_HAS_POSIX_SEM_TIMEOUT) && !defined (ACE_DISABLE_POSIX_SEM_TIMEOUT_EMULATION)
00178 ace_sema.lock_ = PTHREAD_MUTEX_INITIALIZER;
00179 ace_sema.count_nonzero_ = PTHREAD_COND_INITIALIZER;
00180 # endif
00181 # else
00182 ok = false;
00183 ACE_sema_t ace_sema;
00184 # endif
00185 if (!ok) {
00186 ACE_ERROR_RETURN((LM_ERROR,
00187 ACE_TEXT("(%P|%t) ERROR: ")
00188 ACE_TEXT("ShmemTransport::configure_i: ")
00189 ACE_TEXT("could not create semaphore\n")),
00190 false);
00191 }
00192
00193 read_task_ = new ReadTask(this, ace_sema);
00194
00195 VDBG_LVL((LM_INFO, "(%P|%t) ShmemTransport %@ configured with address %C\n",
00196 this, this->config_i_->poolname().c_str()), 1);
00197
00198 return true;
00199 #endif
00200 }
00201
00202 void
00203 ShmemTransport::shutdown_i()
00204 {
00205
00206 GuardType guard(links_lock_);
00207 for (ShmemDataLinkMap::iterator it(links_.begin());
00208 it != links_.end(); ++it) {
00209 it->second->transport_shutdown();
00210 }
00211 links_.clear();
00212
00213 if (read_task_) read_task_->stop();
00214 delete read_task_;
00215 read_task_ = 0;
00216
00217 void* mem = 0;
00218 alloc_->find("Semaphore", mem);
00219 ShmemSharedSemaphore* pSem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00220 #ifdef ACE_WIN32
00221 ::CloseHandle(*pSem);
00222 #elif defined ACE_HAS_POSIX_SEM && !defined ACE_LACKS_UNNAMED_SEMAPHORE
00223 ::sem_destroy(pSem);
00224 #else
00225 ACE_UNUSED_ARG(pSem);
00226 #endif
00227
00228 alloc_->release(1 );
00229 delete alloc_;
00230 alloc_ = 0;
00231
00232 config_i_ = 0;
00233 }
00234
00235 bool
00236 ShmemTransport::connection_info_i(TransportLocator& info) const
00237 {
00238 this->config_i_->populate_locator(info);
00239 return true;
00240 }
00241
00242 std::pair<std::string, std::string>
00243 ShmemTransport::blob_to_key(const TransportBLOB& blob)
00244 {
00245 const char* const c_str = reinterpret_cast<const char*>(blob.get_buffer());
00246 const std::string host(c_str);
00247 const size_t host_len = host.size();
00248
00249 const std::string pool(c_str + host_len + 1, blob.length() - host_len - 1);
00250 return make_pair(host, pool);
00251 }
00252
00253 void
00254 ShmemTransport::release_datalink(DataLink* link)
00255 {
00256 GuardType guard(links_lock_);
00257 for (ShmemDataLinkMap::iterator it(links_.begin());
00258 it != links_.end(); ++it) {
00259
00260
00261 if (link == static_cast<DataLink*>(it->second.in())) {
00262 link->stop();
00263 links_.erase(it);
00264 return;
00265 }
00266 }
00267 }
00268
00269 ShmemTransport::ReadTask::ReadTask(ShmemTransport* outer, ACE_sema_t semaphore)
00270 : outer_(outer)
00271 , semaphore_(semaphore)
00272 , stopped_(false)
00273 {
00274 activate();
00275 }
00276
00277 int
00278 ShmemTransport::ReadTask::svc()
00279 {
00280 while (true) {
00281 ACE_OS::sema_wait(&semaphore_);
00282 if (stopped_) {
00283 return 0;
00284 }
00285 outer_->read_from_links();
00286 }
00287 return 1;
00288 }
00289
00290 void
00291 ShmemTransport::ReadTask::stop()
00292 {
00293 stopped_ = true;
00294 ACE_OS::sema_post(&semaphore_);
00295 wait();
00296 }
00297
00298 void
00299 ShmemTransport::read_from_links()
00300 {
00301 std::vector<ShmemDataLink_rch> dl_copies;
00302 {
00303 GuardType guard(links_lock_);
00304 typedef ShmemDataLinkMap::iterator iter_t;
00305 for (iter_t it = links_.begin(); it != links_.end(); ++it) {
00306 ShmemDataLink_rch link = it->second;
00307 dl_copies.push_back(link);
00308 }
00309 }
00310
00311 typedef std::vector<ShmemDataLink_rch>::iterator dl_iter_t;
00312 for (dl_iter_t dl_it = dl_copies.begin(); dl_it != dl_copies.end(); ++dl_it) {
00313 dl_it->in()->read();
00314 }
00315 }
00316
00317 void
00318 ShmemTransport::signal_semaphore()
00319 {
00320 ACE_OS::sema_post(&read_task_->semaphore_);
00321 }
00322
00323 std::string
00324 ShmemTransport::address()
00325 {
00326 return this->config_i_->poolname();
00327 }
00328
00329 }
00330 }