00001
00002
00003
00004
00005
00006
00007
00008 #include "ShmemSendStrategy.h"
00009 #include "ShmemDataLink.h"
00010 #include "ShmemInst.h"
00011
00012 #include "dds/DCPS/transport/framework/NullSynchStrategy.h"
00013
00014 #include <cstring>
00015
00016 namespace OpenDDS {
00017 namespace DCPS {
00018
00019 ShmemSendStrategy::ShmemSendStrategy(ShmemDataLink* link)
00020 : TransportSendStrategy(0, TransportInst_rch(link->config(), false),
00021 0,
00022 link->transport_priority(),
00023 new NullSynchStrategy)
00024 , link_(link)
00025 , current_data_(0)
00026 {
00027 #ifdef ACE_HAS_POSIX_SEM
00028 peer_semaphore_.name_ = 0;
00029 peer_semaphore_.sema_ = 0;
00030 #endif
00031 }
00032
00033 bool
00034 ShmemSendStrategy::start_i()
00035 {
00036 bound_name_ = "Write-" + link_->peer_address();
00037 ShmemAllocator* alloc = link_->local_allocator();
00038
00039 const size_t n_bytes = link_->config()->datalink_control_size_,
00040 n_elems = n_bytes / sizeof(ShmemData),
00041 extra = n_bytes % sizeof(ShmemData);
00042
00043 void* mem = alloc->calloc(n_bytes);
00044 if (mem == 0) {
00045 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00046 "to allocate %B bytes for control\n", link_, n_bytes), 0);
00047 return false;
00048 }
00049
00050 ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00051 data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =
00052 SHMEM_DATA_END_OF_ALLOC;
00053 alloc->bind(bound_name_.c_str(), mem);
00054
00055 ShmemAllocator* peer = link_->peer_allocator();
00056 peer->find("Semaphore", mem);
00057 ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00058 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00059 HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false ,
00060 link_->peer_pid());
00061 ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00062 0 ,
00063 false ,
00064 DUPLICATE_SAME_ACCESS );
00065 ::CloseHandle(srcProc);
00066 #elif defined ACE_HAS_POSIX_SEM
00067 peer_semaphore_.sema_ = sem;
00068 peer_semaphore_.name_ = 0;
00069 #else
00070 ACE_UNUSED_ARG(sem);
00071 #endif
00072 return true;
00073 }
00074
00075 ssize_t
00076 ShmemSendStrategy::send_bytes_i(const iovec iov[], int n)
00077 {
00078 const size_t hdr_sz = sizeof(current_data_->transport_header_);
00079 if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
00080 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00081 "expecting iov[0] of size %B, got %B\n",
00082 link_, hdr_sz, iov[0].iov_len), 0);
00083 return -1;
00084 }
00085
00086 if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00087 sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00088 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00089 "expecting iov[0] to contain the transport header\n", link_), 0);
00090 return -1;
00091 }
00092
00093
00094
00095
00096
00097 size_t pool_alloc_size = 0;
00098 for (int i = 1 ; i < n; ++i) {
00099 pool_alloc_size += iov[i].iov_len;
00100 }
00101
00102 ShmemAllocator* alloc = link_->local_allocator();
00103 void* from_pool = alloc->malloc(pool_alloc_size);
00104 if (from_pool == 0) {
00105 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00106 "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00107 errno = ENOMEM;
00108 return -1;
00109 }
00110
00111 char* payload = reinterpret_cast<char*>(from_pool);
00112 char* iter = payload;
00113 for (int i = 1 ; i < n; ++i) {
00114 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00115 iter += iov[i].iov_len;
00116 }
00117
00118 void* mem = 0;
00119 alloc->find(bound_name_.c_str(), mem);
00120
00121 for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
00122 iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00123 if (iter->status_ == SHMEM_DATA_RECV_DONE) {
00124 alloc->free(iter->payload_);
00125
00126
00127
00128 iter->status_ = SHMEM_DATA_FREE;
00129 VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00130 "releasing control block #%d\n", link_,
00131 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00132 }
00133 }
00134
00135 if (!current_data_) {
00136 current_data_ = reinterpret_cast<ShmemData*>(mem);
00137 }
00138
00139 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||
00140 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00141 if (!start) {
00142 start = current_data_;
00143 } else if (start == current_data_) {
00144 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00145 "space for control\n", link_), 0);
00146 return -1;
00147 }
00148 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00149 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1;
00150 }
00151 }
00152
00153 if (current_data_->status_ == SHMEM_DATA_FREE) {
00154 VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00155 "writing at control block #%d header %@ payload %@ len %B\n",
00156 link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00157 current_data_->transport_header_, payload, pool_alloc_size));
00158 std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00159 sizeof(current_data_->transport_header_));
00160 current_data_->payload_ = payload;
00161 current_data_->status_ = SHMEM_DATA_IN_USE;
00162 } else {
00163 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00164 "failed to find space for control\n", link_), 0);
00165 return -1;
00166 }
00167
00168 ACE_OS::sema_post(&peer_semaphore_);
00169
00170 return pool_alloc_size + iov[0].iov_len;
00171 }
00172
00173 void
00174 ShmemSendStrategy::stop_i()
00175 {
00176 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00177 ::CloseHandle(peer_semaphore_);
00178 #endif
00179 }
00180
00181 }
00182 }