#include <ShmemSendStrategy.h>
Public Member Functions | |
ShmemSendStrategy (ShmemDataLink *link) | |
virtual bool | start_i () |
Let the subclass start. | |
virtual void | stop_i () |
Let the subclass stop. | |
Protected Member Functions | |
virtual ssize_t | send_bytes_i (const iovec iov[], int n) |
Private Attributes | |
ShmemDataLink * | link_ |
std::string | bound_name_ |
ACE_sema_t | peer_semaphore_ |
ShmemData * | current_data_ |
const size_t | datalink_control_size_ |
Definition at line 29 of file ShmemSendStrategy.h.
OpenDDS::DCPS::ShmemSendStrategy::ShmemSendStrategy | ( | ShmemDataLink * | link | ) |
Definition at line 21 of file ShmemSendStrategy.cpp.
References memset(), and peer_semaphore_.
00022   : TransportSendStrategy(0, link->impl(),
00023                           0, // synch_resource
00024                           link->transport_priority(),
00025                           make_rch<NullSynchStrategy>())
00026   , link_(link)
00027   , current_data_(0)
00028   , datalink_control_size_(link->impl().config().datalink_control_size_)
00029 {
00030 #ifdef ACE_HAS_POSIX_SEM
00031   memset(&peer_semaphore_, 0, sizeof(peer_semaphore_));
00032 #endif
00033 }
ssize_t OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i | ( | const iovec | iov[], | |
int | n | |||
) | [protected, virtual] |
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 77 of file ShmemSendStrategy.cpp.
References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::DCPS_PROTOCOL, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::free(), iovec::iov_base, iovec::iov_len, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::malloc(), OpenDDS::DCPS::ShmemData::payload_, peer_semaphore_, ACE_OS::sema_post(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_IN_USE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::TransportSendStrategy::start(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.
00078 {
00079   const size_t hdr_sz = sizeof(current_data_->transport_header_);
00080   if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) {
00081     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00082               "expecting iov[0] of size %B, got %B\n",
00083               link_, hdr_sz, iov[0].iov_len), 0);
00084     return -1;
00085   }
00086   // see TransportHeader::valid(), but this is for the marshaled form
00087   if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base,
00088                   sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) {
00089     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00090               "expecting iov[0] to contain the transport header\n", link_), 0);
00091     return -1;
00092   }
00093
00094   //FUTURE: use the ShmemTransport object to see if we already have the
00095   // same payload data available in the pool (from other DataLinks),
00096   // and if so, add a refcount to the start of the "from_pool" allocation
00097
00098   size_t pool_alloc_size = 0;
00099   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00100     pool_alloc_size += iov[i].iov_len;
00101   }
00102
00103   ShmemAllocator* alloc = link_->local_allocator();
00104   void* from_pool = alloc->malloc(pool_alloc_size);
00105   if (from_pool == 0) {
00106     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00107               "to allocate %B bytes for data\n", link_, pool_alloc_size), 0);
00108     errno = ENOMEM;
00109     return -1;
00110   }
00111
00112   char* payload = reinterpret_cast<char*>(from_pool);
00113   char* iter = payload;
00114   for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) {
00115     std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
00116     iter += iov[i].iov_len;
00117   }
00118
00119   void* mem = 0;
00120   alloc->find(bound_name_.c_str(), mem);
00121
00122   for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem);
00123        iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) {
00124     if (iter->status_ == SHMEM_DATA_RECV_DONE) {
00125       alloc->free(iter->payload_);
00126       // This will eventually be refcounted so instead of a free(), the previous
00127       // statement would decrement the refcount and check for 0 before free().
00128       // See the 'FUTURE' comment above.
00129       iter->status_ = SHMEM_DATA_FREE;
00130       VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00131                 "releasing control block #%d\n", link_,
00132                 iter - reinterpret_cast<ShmemData*>(mem)), 5);
00133     }
00134   }
00135
00136   if (!current_data_) {
00137     current_data_ = reinterpret_cast<ShmemData*>(mem);
00138   }
00139
00140   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE ||
00141          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00142     if (!start) {
00143       start = current_data_;
00144     } else if (start == current_data_) {
00145       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of "
00146                 "space for control\n", link_), 0);
00147       return -1;
00148     }
00149     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00150       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00151     }
00152   }
00153
00154   if (current_data_->status_ == SHMEM_DATA_FREE) {
00155     VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ "
00156           "writing at control block #%d header %@ payload %@ len %B\n",
00157           link_, current_data_ - reinterpret_cast<ShmemData*>(mem),
00158           current_data_->transport_header_, payload, pool_alloc_size));
00159     std::memcpy(current_data_->transport_header_, iov[0].iov_base,
00160                 sizeof(current_data_->transport_header_));
00161     current_data_->payload_ = payload;
00162     current_data_->status_ = SHMEM_DATA_IN_USE;
00163   } else {
00164     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ "
00165               "failed to find space for control\n", link_), 0);
00166     return -1;
00167   }
00168
00169   ACE_OS::sema_post(&peer_semaphore_);
00170
00171   return pool_alloc_size + iov[0].iov_len;
00172 }
bool OpenDDS::DCPS::ShmemSendStrategy::start_i | ( | ) | [virtual] |
Let the subclass start.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 36 of file ShmemSendStrategy.cpp.
References ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::bind(), bound_name_, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::calloc(), datalink_control_size_, ACE_Malloc_T< class, ACE_LOCK, ACE_CB >::find(), link_, LM_ERROR, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_address(), OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_pid(), peer_semaphore_, OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, and VDBG_LVL.
00037 {
00038   bound_name_ = "Write-" + link_->peer_address();
00039   ShmemAllocator* alloc = link_->local_allocator();
00040
00041   const size_t n_elems = datalink_control_size_ / sizeof(ShmemData),
00042     extra = datalink_control_size_ % sizeof(ShmemData);
00043
00044   void* mem = alloc->calloc(datalink_control_size_);
00045   if (mem == 0) {
00046     VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed "
00047               "to allocate %B bytes for control\n", link_, datalink_control_size_), 0);
00048     return false;
00049   }
00050
00051   ShmemData* data = reinterpret_cast<ShmemData*>(mem);
00052   data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ =
00053     SHMEM_DATA_END_OF_ALLOC;
00054   alloc->bind(bound_name_.c_str(), mem);
00055
00056   ShmemAllocator* peer = link_->peer_allocator();
00057   peer->find("Semaphore", mem);
00058   ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem);
00059 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00060   HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/,
00061                                  link_->peer_pid());
00062   ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_,
00063                     0 /*dwDesiredAccess -- ignored*/,
00064                     false /*bInheritHandle*/,
00065                     DUPLICATE_SAME_ACCESS /*dwOptions*/);
00066   ::CloseHandle(srcProc);
00067 #elif defined ACE_HAS_POSIX_SEM
00068   peer_semaphore_.sema_ = sem;
00069   peer_semaphore_.name_ = 0;
00070 #else
00071   ACE_UNUSED_ARG(sem);
00072 #endif
00073   return true;
00074 }
void OpenDDS::DCPS::ShmemSendStrategy::stop_i | ( | ) | [virtual] |
Let the subclass stop.
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 175 of file ShmemSendStrategy.cpp.
References peer_semaphore_.
00176 {
00177 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE
00178   ::CloseHandle(peer_semaphore_);
00179 #endif
00180 }
std::string OpenDDS::DCPS::ShmemSendStrategy::bound_name_ [private] |
Definition at line 42 of file ShmemSendStrategy.h.
Referenced by send_bytes_i(), and start_i().
ShmemData* OpenDDS::DCPS::ShmemSendStrategy::current_data_ [private] |
Definition at line 44 of file ShmemSendStrategy.h.
Referenced by send_bytes_i().
const size_t OpenDDS::DCPS::ShmemSendStrategy::datalink_control_size_ [private] |
Definition at line 45 of file ShmemSendStrategy.h.
Referenced by start_i().
ShmemDataLink* OpenDDS::DCPS::ShmemSendStrategy::link_ [private] |
Definition at line 41 of file ShmemSendStrategy.h.
Referenced by send_bytes_i(), and start_i().
ACE_sema_t OpenDDS::DCPS::ShmemSendStrategy::peer_semaphore_ [private] |
Definition at line 43 of file ShmemSendStrategy.h.
Referenced by send_bytes_i(), ShmemSendStrategy(), start_i(), and stop_i().