#include <ShmemReceiveStrategy.h>
Inheritance diagram for OpenDDS::DCPS::ShmemReceiveStrategy:
Public Member Functions | |
ShmemReceiveStrategy (ShmemDataLink *link) | |
void | read () |
Protected Member Functions | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd) |
Only our subclass knows how to do this. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Called when there is a ReceivedDataSample to be delivered. | |
virtual int | start_i () |
Let the subclass start. | |
virtual void | stop_i () |
Let the subclass stop. | |
Private Attributes | |
ShmemDataLink * | link_ |
std::string | bound_name_ |
ShmemData * | current_data_ |
size_t | partial_recv_remaining_ |
const char * | partial_recv_ptr_ |
Definition at line 24 of file ShmemReceiveStrategy.h.
OpenDDS::DCPS::ShmemReceiveStrategy::ShmemReceiveStrategy | ( | ShmemDataLink * | link | ) | [explicit] |
Definition at line 18 of file ShmemReceiveStrategy.cpp.
00019 : link_(link) 00020 , current_data_(0) 00021 , partial_recv_remaining_(0) 00022 , partial_recv_ptr_(0) 00023 { 00024 }
void OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) | [protected, virtual] |
Called when there is a ReceivedDataSample to be delivered.
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 174 of file ShmemReceiveStrategy.cpp.
References OpenDDS::DCPS::ShmemDataLink::control_received(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, link_, OpenDDS::DCPS::DataSampleHeader::message_id_, and OpenDDS::DCPS::TRANSPORT_CONTROL.
00176 { 00177 switch (sample.header_.message_id_) { 00178 00179 case TRANSPORT_CONTROL: 00180 link_->control_received(sample); 00181 break; 00182 00183 default: 00184 link_->data_received(sample); 00185 } 00186 }
void OpenDDS::DCPS::ShmemReceiveStrategy::read | ( | ) |
Definition at line 27 of file ShmemReceiveStrategy.cpp.
References bound_name_, current_data_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), link_, OpenDDS::DCPS::ShmemDataLink::local_address(), partial_recv_remaining_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start(), OpenDDS::DCPS::ShmemData::status_, VDBG, and VDBG_LVL.
00028 { 00029 if (partial_recv_remaining_) { 00030 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ " 00031 "resuming partial recv\n", link_)); 00032 handle_dds_input(ACE_INVALID_HANDLE); 00033 return; 00034 } 00035 00036 if (bound_name_.empty()) { 00037 bound_name_ = "Write-" + link_->local_address(); 00038 } 00039 00040 ShmemAllocator* alloc = link_->peer_allocator(); 00041 void* mem = 0; 00042 if (-1 == alloc->find(bound_name_.c_str(), mem)) { 00043 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ " 00044 "peer allocator not found, receive_bytes will close link\n", 00045 link_), 1); 00046 handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg. 00047 return; 00048 } 00049 00050 if (!current_data_) { 00051 current_data_ = reinterpret_cast<ShmemData*>(mem); 00052 } 00053 00054 for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE || 00055 current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) { 00056 if (!start) { 00057 start = current_data_; 00058 } else if (start == current_data_) { 00059 return; // none found => don't call handle_dds_input() 00060 } 00061 if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) { 00062 current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop 00063 } 00064 } 00065 00066 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ " 00067 "reading at control block #%d\n", 00068 link_, current_data_ - reinterpret_cast<ShmemData*>(mem))); 00069 // If we get this far, current_data_ points to the first SHMEM_DATA_IN_USE. 00070 // handle_dds_input() will call our receive_bytes() to get the data. 00071 handle_dds_input(ACE_INVALID_HANDLE); 00072 }
ssize_t OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes | ( | iovec | iov[], | |
int | n, | |||
ACE_INET_Addr & | remote_address, | |||
ACE_HANDLE | fd | |||
) | [protected, virtual] |
Only our subclass knows how to do this.
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 75 of file ShmemReceiveStrategy.cpp.
References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::get_length(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_, link_, partial_recv_ptr_, partial_recv_remaining_, OpenDDS::DCPS::ShmemData::payload_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::SHMEM_DATA_IN_USE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::ShmemDataLink::signal_semaphore(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.
00079 { 00080 VDBG((LM_DEBUG, 00081 "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_)); 00082 00083 // check that the writer's shared memory is still available 00084 ShmemAllocator* alloc = link_->peer_allocator(); 00085 void* mem; 00086 if (-1 == alloc->find(bound_name_.c_str(), mem) 00087 || current_data_->status_ != SHMEM_DATA_IN_USE) { 00088 VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"), 00089 1); 00090 gracefully_disconnected_ = true; // do not attempt reconnect via relink() 00091 return 0; // close "connection" 00092 } 00093 00094 ssize_t total = 0; 00095 00096 const char* src_iter; 00097 char* dst_iter = 0; 00098 int i = 0; // current iovec index in iov[] 00099 size_t remaining; 00100 00101 if (partial_recv_remaining_) { 00102 remaining = partial_recv_remaining_; 00103 src_iter = partial_recv_ptr_; 00104 // iov_base is void* on POSIX but char* on Win32, we'll have to cast: 00105 dst_iter = (char*)iov[0].iov_base; 00106 00107 } else { 00108 remaining = TransportHeader::get_length(current_data_->transport_header_); 00109 const size_t hdr_sz = sizeof(current_data_->transport_header_); 00110 // BUFFER_LOW_WATER in the framework ensures a large enough buffer 00111 if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) { 00112 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes " 00113 "receive buffer of length %d is too small\n", 00114 iov[0].iov_len), 0); 00115 errno = ENOBUFS; 00116 return -1; 00117 } 00118 00119 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes " 00120 "header %@ payload %@ len %B\n", current_data_->transport_header_, 00121 (char*)current_data_->payload_, remaining)); 00122 std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz); 00123 total += hdr_sz; 00124 src_iter = current_data_->payload_; 00125 if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) { 00126 dst_iter = (char*)iov[0].iov_base + hdr_sz; 00127 } else if (n > 1) { 00128 dst_iter = (char*)iov[1].iov_base; 00129 i = 1; 00130 } 00131 } 00132 00133 for (; i < n && remaining; ++i) { 00134 const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len, 00135 chunk = std::min(space, remaining); 00136 00137 #ifdef ACE_WIN32 00138 if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) { 00139 VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes " 00140 "shared memory pool couldn't be extended\n"), 0); 00141 errno = ENOMEM; 00142 return -1; 00143 } 00144 #endif 00145 00146 std::memcpy(dst_iter, src_iter, chunk); 00147 if (i < n - 1) { 00148 dst_iter = (char*)iov[i + 1].iov_base; 00149 } 00150 remaining -= chunk; 00151 total += chunk; 00152 src_iter += chunk; 00153 } 00154 00155 if (remaining) { 00156 partial_recv_remaining_ = remaining; 00157 partial_recv_ptr_ = src_iter; 00158 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes " 00159 "receive was partial\n")); 00160 link_->signal_semaphore(); 00161 00162 } else { 00163 partial_recv_remaining_ = 0; 00164 partial_recv_ptr_ = 0; 00165 VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes " 00166 "receive done\n")); 00167 current_data_->status_ = SHMEM_DATA_RECV_DONE; 00168 } 00169 00170 return total; 00171 }
int OpenDDS::DCPS::ShmemReceiveStrategy::start_i | ( | ) | [protected, virtual] |
Let the subclass start.
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 189 of file ShmemReceiveStrategy.cpp.
void OpenDDS::DCPS::ShmemReceiveStrategy::stop_i | ( | ) | [protected, virtual] |
Let the subclass stop.
Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.
Definition at line 195 of file ShmemReceiveStrategy.cpp.
std::string OpenDDS::DCPS::ShmemReceiveStrategy::bound_name_ [private] |
Definition at line 44 of file ShmemReceiveStrategy.h.
Referenced by deliver_sample(), read(), and receive_bytes().
const char* OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_ptr_ [private] |
size_t OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_remaining_ [private] |