OpenDDS::DCPS::ShmemReceiveStrategy Class Reference

#include <ShmemReceiveStrategy.h>

Inheritance diagram for OpenDDS::DCPS::ShmemReceiveStrategy:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ShmemReceiveStrategy:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ShmemReceiveStrategy (ShmemDataLink *link)
void read ()

Protected Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)
 Only our subclass knows how to do this.
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered.
virtual int start_i ()
 Let the subclass start.
virtual void stop_i ()
 Let the subclass stop.

Private Attributes

ShmemDataLink * link_
std::string bound_name_
ShmemData * current_data_
size_t partial_recv_remaining_
const char * partial_recv_ptr_

Detailed Description

Definition at line 24 of file ShmemReceiveStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ShmemReceiveStrategy::ShmemReceiveStrategy ( ShmemDataLink *  link  )  [explicit]

Definition at line 18 of file ShmemReceiveStrategy.cpp.

00019   : link_(link)
00020   , current_data_(0)
00021   , partial_recv_remaining_(0)
00022   , partial_recv_ptr_(0)
00023 {
00024 }


Member Function Documentation

void OpenDDS::DCPS::ShmemReceiveStrategy::deliver_sample ( ReceivedDataSample &  sample,
const ACE_INET_Addr &  remote_address 
) [protected, virtual]

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 174 of file ShmemReceiveStrategy.cpp.

References OpenDDS::DCPS::ShmemDataLink::control_received(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::ReceivedDataSample::header_, link_, OpenDDS::DCPS::DataSampleHeader::message_id_, and OpenDDS::DCPS::TRANSPORT_CONTROL.

00176 {
00177   switch (sample.header_.message_id_) {
00178 
00179   case TRANSPORT_CONTROL:
00180     link_->control_received(sample);
00181     break;
00182 
00183   default:
00184     link_->data_received(sample);
00185   }
00186 }

void OpenDDS::DCPS::ShmemReceiveStrategy::read (  ) 

Definition at line 27 of file ShmemReceiveStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), link_, OpenDDS::DCPS::ShmemDataLink::local_address(), partial_recv_remaining_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::SHMEM_DATA_END_OF_ALLOC, OpenDDS::DCPS::SHMEM_DATA_FREE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start(), OpenDDS::DCPS::ShmemData::status_, VDBG, and VDBG_LVL.

00028 {
00029   if (partial_recv_remaining_) {
00030     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00031           "resuming partial recv\n", link_));
00032     handle_dds_input(ACE_INVALID_HANDLE);
00033     return;
00034   }
00035 
00036   if (bound_name_.empty()) {
00037     bound_name_ = "Write-" + link_->local_address();
00038   }
00039 
00040   ShmemAllocator* alloc = link_->peer_allocator();
00041   void* mem = 0;
00042   if (-1 == alloc->find(bound_name_.c_str(), mem)) {
00043     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00044               "peer allocator not found, receive_bytes will close link\n",
00045               link_), 1);
00046     handle_dds_input(ACE_INVALID_HANDLE); // will return 0 to the TRecvStrateg.
00047     return;
00048   }
00049 
00050   if (!current_data_) {
00051     current_data_ = reinterpret_cast<ShmemData*>(mem);
00052   }
00053 
00054   for (ShmemData* start = 0; current_data_->status_ == SHMEM_DATA_FREE ||
00055          current_data_->status_ == SHMEM_DATA_RECV_DONE; ++current_data_) {
00056     if (!start) {
00057       start = current_data_;
00058     } else if (start == current_data_) {
00059       return; // none found => don't call handle_dds_input()
00060     }
00061     if (current_data_[1].status_ == SHMEM_DATA_END_OF_ALLOC) {
00062       current_data_ = reinterpret_cast<ShmemData*>(mem) - 1; // incremented by the for loop
00063     }
00064   }
00065 
00066   VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::read link %@ "
00067         "reading at control block #%d\n",
00068         link_, current_data_ - reinterpret_cast<ShmemData*>(mem)));
00069   // If we get this far, current_data_ points to the first SHMEM_DATA_IN_USE.
00070   // handle_dds_input() will call our receive_bytes() to get the data.
00071   handle_dds_input(ACE_INVALID_HANDLE);
00072 }

ssize_t OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr &  remote_address,
ACE_HANDLE  fd 
) [protected, virtual]

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 75 of file ShmemReceiveStrategy.cpp.

References bound_name_, current_data_, OpenDDS::DCPS::TransportHeader::get_length(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_, link_, partial_recv_ptr_, partial_recv_remaining_, OpenDDS::DCPS::ShmemData::payload_, OpenDDS::DCPS::ShmemDataLink::peer_allocator(), OpenDDS::DCPS::SHMEM_DATA_IN_USE, OpenDDS::DCPS::SHMEM_DATA_RECV_DONE, OpenDDS::DCPS::ShmemDataLink::signal_semaphore(), OpenDDS::DCPS::ShmemData::status_, OpenDDS::DCPS::ShmemData::transport_header_, VDBG, and VDBG_LVL.

00079 {
00080   VDBG((LM_DEBUG,
00081         "(%P|%t) ShmemReceiveStrategy::receive_bytes link %@\n", link_));
00082 
00083   // check that the writer's shared memory is still available
00084   ShmemAllocator* alloc = link_->peer_allocator();
00085   void* mem;
00086   if (-1 == alloc->find(bound_name_.c_str(), mem)
00087       || current_data_->status_ != SHMEM_DATA_IN_USE) {
00088     VDBG_LVL((LM_INFO, "(%P|%t) ShmemReceiveStrategy::receive_bytes closing\n"),
00089              1);
00090     gracefully_disconnected_ = true; // do not attempt reconnect via relink()
00091     return 0; // close "connection"
00092   }
00093 
00094   ssize_t total = 0;
00095 
00096   const char* src_iter;
00097   char* dst_iter = 0;
00098   int i = 0; // current iovec index in iov[]
00099   size_t remaining;
00100 
00101   if (partial_recv_remaining_) {
00102     remaining = partial_recv_remaining_;
00103     src_iter = partial_recv_ptr_;
00104     // iov_base is void* on POSIX but char* on Win32, we'll have to cast:
00105     dst_iter = (char*)iov[0].iov_base;
00106 
00107   } else {
00108     remaining = TransportHeader::get_length(current_data_->transport_header_);
00109     const size_t hdr_sz = sizeof(current_data_->transport_header_);
00110     // BUFFER_LOW_WATER in the framework ensures a large enough buffer
00111     if (static_cast<size_t>(iov[0].iov_len) <= hdr_sz) {
00112       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00113                 "receive buffer of length %d is too small\n",
00114                 iov[0].iov_len), 0);
00115       errno = ENOBUFS;
00116       return -1;
00117     }
00118 
00119     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00120           "header %@ payload %@ len %B\n", current_data_->transport_header_,
00121           (char*)current_data_->payload_, remaining));
00122     std::memcpy(iov[0].iov_base, current_data_->transport_header_, hdr_sz);
00123     total += hdr_sz;
00124     src_iter = current_data_->payload_;
00125     if (static_cast<size_t>(iov[0].iov_len) > hdr_sz) {
00126       dst_iter = (char*)iov[0].iov_base + hdr_sz;
00127     } else if (n > 1) {
00128       dst_iter = (char*)iov[1].iov_base;
00129       i = 1;
00130     }
00131   }
00132 
00133   for (; i < n && remaining; ++i) {
00134     const size_t space = (i == 0) ? iov[i].iov_len - total : iov[i].iov_len,
00135       chunk = std::min(space, remaining);
00136 
00137 #ifdef ACE_WIN32
00138     if (alloc->memory_pool().remap((void*)(src_iter + chunk - 1)) == -1) {
00139       VDBG_LVL((LM_ERROR, "(%P|%t) ERROR: ShmemReceiveStrategy::receive_bytes "
00140                 "shared memory pool couldn't be extended\n"), 0);
00141       errno = ENOMEM;
00142       return -1;
00143     }
00144 #endif
00145 
00146     std::memcpy(dst_iter, src_iter, chunk);
00147     if (i < n - 1) {
00148       dst_iter = (char*)iov[i + 1].iov_base;
00149     }
00150     remaining -= chunk;
00151     total += chunk;
00152     src_iter += chunk;
00153   }
00154 
00155   if (remaining) {
00156     partial_recv_remaining_ = remaining;
00157     partial_recv_ptr_ = src_iter;
00158     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00159           "receive was partial\n"));
00160     link_->signal_semaphore();
00161 
00162   } else {
00163     partial_recv_remaining_ = 0;
00164     partial_recv_ptr_ = 0;
00165     VDBG((LM_DEBUG, "(%P|%t) ShmemReceiveStrategy::receive_bytes "
00166           "receive done\n"));
00167     current_data_->status_ = SHMEM_DATA_RECV_DONE;
00168   }
00169 
00170   return total;
00171 }

int OpenDDS::DCPS::ShmemReceiveStrategy::start_i (  )  [protected, virtual]

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 189 of file ShmemReceiveStrategy.cpp.

00190 {
00191   return 0;
00192 }

void OpenDDS::DCPS::ShmemReceiveStrategy::stop_i (  )  [protected, virtual]

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 195 of file ShmemReceiveStrategy.cpp.

00196 {
00197 }


Member Data Documentation

std::string OpenDDS::DCPS::ShmemReceiveStrategy::bound_name_ [private]

Definition at line 45 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().

ShmemData* OpenDDS::DCPS::ShmemReceiveStrategy::current_data_ [private]

Definition at line 46 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().

ShmemDataLink* OpenDDS::DCPS::ShmemReceiveStrategy::link_ [private]

Definition at line 44 of file ShmemReceiveStrategy.h.

Referenced by deliver_sample(), read(), and receive_bytes().

const char* OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_ptr_ [private]

Definition at line 48 of file ShmemReceiveStrategy.h.

Referenced by receive_bytes().

size_t OpenDDS::DCPS::ShmemReceiveStrategy::partial_recv_remaining_ [private]

Definition at line 47 of file ShmemReceiveStrategy.h.

Referenced by read(), and receive_bytes().


The documentation for this class was generated from the following files:
  * ShmemReceiveStrategy.h
  * ShmemReceiveStrategy.cpp
Generated on Fri Feb 12 20:06:37 2016 for OpenDDS by  doxygen 1.4.7