00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemSendStrategy.h"
00009 #include "ShmemDataLink.h"
00010 #include "ShmemInst.h"
00011
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013
00014 #include <cstring>
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 ShmemSendStrategy::ShmemSendStrategy(ShmemDataLink* link)
00022 : TransportSendStrategy(0, link->impl(),
00023 0,
00024 link->transport_priority(),
00025 make_rch<NullSynchStrategy>())
00026 , link_(link)
00027 , current_data_(0)
00028 , datalink_control_size_(link->impl().config().datalink_control_size_)
00029 {
00030 #ifdef ACE_HAS_POSIX_SEM
00031 memset(&peer_semaphore_, 0, sizeof(peer_semaphore_));
00032 #endif
00033 }
00034
00035 bool
00036 ShmemSendStrategy::start_i()
00037 {
00038 bound_name_ = "Write-" + link_->peer_address();
00039 ShmemAllocator* alloc = link_->local_allocator();
00040
00041 const size_t n_elems = datalink_control_size_ / sizeof(ShmemData),
00042 extra = datalink_control_size_ % sizeof(ShmemData);
00043
00044 void* mem = alloc->calloc(datalink_control_size_);
00045 if (mem == 0) {
00046 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00047 "to allocate %B bytes for control\n", link_, datalink_control_size_), 0);
00048 return false;
00049 }
00050
00051 ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00052 data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =
00053 SHMEM_DATA_END_OF_ALLOC;
00054 alloc->bind(bound_name_.c_str(), mem);
00055
00056 ShmemAllocator* peer = link_->peer_allocator();
00057 peer->find("Semaphore", mem);
00058 ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00059 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00060 HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false ,
00061 link_->peer_pid());
00062 ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00063 0 ,
00064 false ,
00065 DUPLICATE_SAME_ACCESS );
00066 ::CloseHandle(srcProc);
00067 #elif defined ACE_HAS_POSIX_SEM
00068 peer_semaphore_.sema_ = sem;
00069 peer_semaphore_.name_ = 0;
00070 #else
00071 ACE_UNUSED_ARG(sem);
00072 #endif
00073 return true;
00074 }
00075
00076 ssize_t
00077 ShmemSendStrategy::send_bytes_i(const iovec iov[], int n)
00078 {
00079 const size_t hdr_sz = sizeof(current_data_->transport_header_);
00080 if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
00081 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00082 "expecting iov[0] of size %B, got %B\n",
00083 link_, hdr_sz, iov[0].iov_len), 0);
00084 return -1;
00085 }
00086
00087 if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00088 sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00089 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00090 "expecting iov[0] to contain the transport header\n", link_), 0);
00091 return -1;
00092 }
00093
00094
00095
00096
00097
00098 size_t pool_alloc_size = 0;
00099 for (int i = 1 ; i < n; ++i) {
00100 pool_alloc_size += iov[i].iov_len;
00101 }
00102
00103 ShmemAllocator* alloc = link_->local_allocator();
00104 void* from_pool = alloc->malloc(pool_alloc_size);
00105 if (from_pool == 0) {
00106 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00107 "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00108 errno = ENOMEM;
00109 return -1;
00110 }
00111
00112 char* payload = reinterpret_cast<char*>(from_pool);
00113 char* iter = payload;
00114 for (int i = 1 ; i < n; ++i) {
00115 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00116 iter += iov[i].iov_len;
00117 }
00118
00119 void* mem = 0;
00120 alloc->find(bound_name_.c_str(), mem);
00121
00122 for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
00123 iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00124 if (iter->status_ == SHMEM_DATA_RECV_DONE) {
00125 alloc->free(iter->payload_);
00126
00127
00128
00129 iter->status_ = SHMEM_DATA_FREE;
00130 VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00131 "releasing control block #%d\n", link_,
00132 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00133 }
00134 }
00135
00136 if (!current_data_) {
00137 current_data_ = reinterpret_cast<ShmemData*>(mem);
00138 }
00139
00140 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||
00141 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00142 if (!start) {
00143 start = current_data_;
00144 } else if (start == current_data_) {
00145 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00146 "space for control\n", link_), 0);
00147 return -1;
00148 }
00149 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00150 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1;
00151 }
00152 }
00153
00154 if (current_data_->status_ == SHMEM_DATA_FREE) {
00155 VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00156 "writing at control block #%d header %@ payload %@ len %B\n",
00157 link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00158 current_data_->transport_header_, payload, pool_alloc_size));
00159 std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00160 sizeof(current_data_->transport_header_));
00161 current_data_->payload_ = payload;
00162 current_data_->status_ = SHMEM_DATA_IN_USE;
00163 } else {
00164 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00165 "failed to find space for control\n", link_), 0);
00166 return -1;
00167 }
00168
00169 ACE_OS::sema_post(&peer_semaphore_);
00170
00171 return pool_alloc_size + iov[0].iov_len;
00172 }
00173
00174 void
00175 ShmemSendStrategy::stop_i()
00176 {
00177 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00178 ::CloseHandle(peer_semaphore_);
00179 #endif
00180 }
00181
00182 }
00183 }
00184
00185 OPENDDS_END_VERSIONED_NAMESPACE_DECL