#include <ShmemSendStrategy.h>
Inheritance diagram for OpenDDS::DCPS::ShmemSendStrategy:
Public Member Functions
  ShmemSendStrategy (ShmemDataLink *link)
  virtual bool start_i ()
      Let the subclass start.
  virtual void stop_i ()
      Let the subclass stop.
Protected Member Functions
  virtual ssize_t send_bytes_i (const iovec iov[], int n)
Private Attributes
  ShmemDataLink * link_
  std::string bound_name_
  ACE_sema_t peer_semaphore_
  ShmemData * current_data_
Definition at line 25 of file ShmemSendStrategy.h.
OpenDDS::DCPS::ShmemSendStrategy::ShmemSendStrategy (ShmemDataLink * link) [explicit]
Definition at line 19 of file ShmemSendStrategy.cpp.
References peer_semaphore_.
00020 : TransportSendStrategy(0, TransportInst_rch(link->config(), false), 00021 0, // synch_resource 00022 link->transport_priority(), 00023 new NullSynchStrategy) 00024 , link_(link) 00025 , current_data_(0) 00026 { 00027 #ifdef ACE_HAS_POSIX_SEM 00028 peer_semaphore_.name_ = 0; 00029 peer_semaphore_.sema_ = 0; 00030 #endif 00031 }
ssize_t OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i (const iovec iov[],
                                                        int n)
  [protected, virtual]
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 76 of file ShmemSendStrategy.cpp.
References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::DCPS_PROTOCOL, link_, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::ShmemData::transport_header_, and VDBG_LVL.
00077 { 00078 const size_t hdr_sz = sizeof(current_data_->transport_header_); 00079 if (static_cast<size_t>(iov[0].iov_len) != hdr_sz) { 00080 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ " 00081 "expecting iov[0] of size %B, got %B\n", 00082 link_, hdr_sz, iov[0].iov_len), 0); 00083 return -1; 00084 } 00085 // see TransportHeader::valid(), but this is for the marshaled form 00086 if (std::memcmp(&TransportHeader::DCPS_PROTOCOL[0], iov[0].iov_base, 00087 sizeof(TransportHeader::DCPS_PROTOCOL)) != 0) { 00088 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ " 00089 "expecting iov[0] to contain the transport header\n", link_), 0); 00090 return -1; 00091 } 00092 00093 //FUTURE: use the ShmemTransport object to see if we already have the 00094 // same payload data available in the pool (from other DataLinks), 00095 // and if so, add a refcount to the start of the "from_pool" allocation 00096 00097 size_t pool_alloc_size = 0; 00098 for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) { 00099 pool_alloc_size += iov[i].iov_len; 00100 } 00101 00102 ShmemAllocator* alloc = link_->local_allocator(); 00103 void* from_pool = alloc->malloc(pool_alloc_size); 00104 if (from_pool == 0) { 00105 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed " 00106 "to allocate %B bytes for data\n", link_, pool_alloc_size), 0); 00107 errno = ENOMEM; 00108 return -1; 00109 } 00110 00111 char* payload = reinterpret_cast<char*>(from_pool); 00112 char* iter = payload; 00113 for (int i = 1 /* skip TransportHeader in [0] */; i < n; ++i) { 00114 std::memcpy(iter, iov[i].iov_base, iov[i].iov_len); 00115 iter += iov[i].iov_len; 00116 } 00117 00118 void* mem = 0; 00119 alloc->find(bound_name_.c_str(), mem); 00120 00121 for (ShmemData* iter = reinterpret_cast<ShmemData*>(mem); 00122 iter->status_ != SHMEM_DATA_END_OF_ALLOC; ++iter) { 00123 if (iter->status_ == SHMEM_DATA_RECV_DONE) { 00124 alloc->free(iter->payload_); 00125 // This will 
eventually be refcounted so instead of a free(), the previous 00126 // statement would decrement the refcount and check for 0 before free(). 00127 // See the 'FUTURE' comment above. 00128 iter->status_ = SHMEM_DATA_FREE; 00129 VDBG_LVL((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ " 00130 "releasing control block #%d\n", link_, 00131 iter - reinterpret_cast<ShmemData*>(mem)), 5); 00132 } 00133 } 00134 00135 if (!current_data_) { 00136 current_data_ = reinterpret_cast<ShmemData*>(mem); 00137 } 00138 00139 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_IN_USE || 00140 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) { 00141 if (!start) { 00142 start = current_data_; 00143 } else if (start == current_data_) { 00144 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ out of " 00145 "space for control\n", link_), 0); 00146 return -1; 00147 } 00148 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) { 00149 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop 00150 } 00151 } 00152 00153 if (current_data_->status_ == SHMEM_DATA_FREE) { 00154 VDBG((LM_DEBUG, "(%P|%t) ShmemSendStrategy for link %@ " 00155 "writing at control block #%d header %@ payload %@ len %B\n", 00156 link_, current_data_ - reinterpret_cast<ShmemData*>(mem), 00157 current_data_->transport_header_, payload, pool_alloc_size)); 00158 std::memcpy(current_data_->transport_header_, iov[0].iov_base, 00159 sizeof(current_data_->transport_header_)); 00160 current_data_->payload_ = payload; 00161 current_data_->status_ = SHMEM_DATA_IN_USE; 00162 } else { 00163 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ " 00164 "failed to find space for control\n", link_), 0); 00165 return -1; 00166 } 00167 00168 ACE_OS::sema_post(&peer_semaphore_); 00169 00170 return pool_alloc_size + iov[0].iov_len; 00171 }
bool OpenDDS::DCPS::ShmemSendStrategy::start_i () [virtual]
Let the subclass start.
Reimplemented from OpenDDS::DCPS::TransportSendStrategy.
Definition at line 34 of file ShmemSendStrategy.cpp.
References bound_name_, OpenDDS::DCPS::ShmemDataLink::config(), OpenDDS::DCPS::ShmemInst::datalink_control_size_, link_, OpenDDS::DCPS::ShmemDataLink::local_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_address(), OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::ShmemDataLink::peer_pid(), peer_semaphore_, OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, and VDBG_LVL.
00035 { 00036 bound_name_ = "Write-" + link_->peer_address(); 00037 ShmemAllocator* alloc = link_->local_allocator(); 00038 00039 const size_t n_bytes = link_->config()->datalink_control_size_, 00040 n_elems = n_bytes / sizeof(ShmemData), 00041 extra = n_bytes % sizeof(ShmemData); 00042 00043 void* mem = alloc->calloc(n_bytes); 00044 if (mem == 0) { 00045 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemSendStrategy for link %@ failed " 00046 "to allocate %B bytes for control\n", link_, n_bytes), 0); 00047 return false; 00048 } 00049 00050 ShmemData* data = reinterpret_cast<ShmemData*>(mem); 00051 data[(extra >= sizeof(int)) ? n_elems : (n_elems - 1)].status_ = 00052 SHMEM_DATA_END_OF_ALLOC; 00053 alloc->bind(bound_name_.c_str(), mem); 00054 00055 ShmemAllocator* peer = link_->peer_allocator(); 00056 peer->find("Semaphore", mem); 00057 ShmemSharedSemaphore* sem = reinterpret_cast<ShmemSharedSemaphore*>(mem); 00058 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE 00059 HANDLE srcProc = ::OpenProcess(PROCESS_DUP_HANDLE, false /*bInheritHandle*/, 00060 link_->peer_pid()); 00061 ::DuplicateHandle(srcProc, *sem, GetCurrentProcess(), &peer_semaphore_, 00062 0 /*dwDesiredAccess -- ignored*/, 00063 false /*bInheritHandle*/, 00064 DUPLICATE_SAME_ACCESS /*dwOptions*/); 00065 ::CloseHandle(srcProc); 00066 #elif defined ACE_HAS_POSIX_SEM 00067 peer_semaphore_.sema_ = sem; 00068 peer_semaphore_.name_ = 0; 00069 #else 00070 ACE_UNUSED_ARG(sem); 00071 #endif 00072 return true; 00073 }
void OpenDDS::DCPS::ShmemSendStrategy::stop_i () [virtual]
Let the subclass stop.
Implements OpenDDS::DCPS::TransportSendStrategy.
Definition at line 174 of file ShmemSendStrategy.cpp.
References peer_semaphore_.
00175 { 00176 #if defined ACE_WIN32 && !defined ACE_HAS_WINCE 00177 ::CloseHandle(peer_semaphore_); 00178 #endif 00179 }
std::string OpenDDS::DCPS::ShmemSendStrategy::bound_name_ [private]
ACE_sema_t OpenDDS::DCPS::ShmemSendStrategy::peer_semaphore_ [private]
Definition at line 39 of file ShmemSendStrategy.h.
Referenced by ShmemSendStrategy(), start_i(), and stop_i().