OpenDDS::DCPS::RtpsUdpReceiveStrategy Class Reference

#include <RtpsUdpReceiveStrategy.h>


Classes

struct  MessageReceiver

Public Types

typedef std::pair< SequenceNumber, RTPS::FragmentNumberSet > SeqFragPair

Public Member Functions

 RtpsUdpReceiveStrategy (RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix)
virtual int handle_input (ACE_HANDLE fd)
bool remove_frags_from_bitmap (CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const RepoId &pub_id)
void remove_fragments (const SequenceRange &range, const RepoId &pub_id)
typedef OPENDDS_VECTOR (SeqFragPair) FragmentInfo
bool has_fragments (const SequenceRange &range, const RepoId &pub_id, FragmentInfo *frag_info=0)
const ReceivedDataSample * withhold_data_from (const RepoId &sub_id)
void do_not_withhold_data_from (const RepoId &sub_id)

Private Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)
 Only our subclass knows how to do this.
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered.
void deliver_sample_i (ReceivedDataSample &sample, const RTPS::Submessage &submessage)
virtual int start_i ()
 Let the subclass start.
virtual void stop_i ()
 Let the subclass stop.
virtual bool check_header (const RtpsTransportHeader &header)
 Check the transport header for suitability.
virtual bool check_header (const RtpsSampleHeader &header)
 Check the data sample header for suitability.
virtual bool reassemble (ReceivedDataSample &data)

Private Attributes

RtpsUdpDataLink * link_
SequenceNumber last_received_
const ReceivedDataSample * recvd_sample_
RepoIdSet readers_withheld_
RepoIdSet readers_selected_
SequenceRange frags_
TransportReassembly reassembly_
MessageReceiver receiver_
ACE_INET_Addr remote_address_

Detailed Description

Definition at line 32 of file RtpsUdpReceiveStrategy.h.


Member Typedef Documentation

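typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> OpenDDS::DCPS::RtpsUdpReceiveStrategy::SeqFragPair

Definition at line 53 of file RtpsUdpReceiveStrategy.h.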


Constructor & Destructor Documentation

OpenDDS::DCPS::RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy ( RtpsUdpDataLink * link,
const GuidPrefix_t & local_prefix 
) [explicit]

Definition at line 24 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::SUBMESSAGE_NONE.

  : link_(link)
  , last_received_()
  , recvd_sample_(0)
  , receiver_(local_prefix)
#if defined(OPENDDS_SECURITY)
  , secure_sample_(0)
#endif
{
#if defined(OPENDDS_SECURITY)
  secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
#endif
}



Member Function Documentation

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsSampleHeader & header  )  [private, virtual]

Check the data sample header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 538 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DataFragSubmessage::fragmentsInSubmessage, OpenDDS::RTPS::DataFragSubmessage::fragmentStartingNum, frags_, receiver_, OpenDDS::DCPS::RtpsSampleHeader::submessage_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::submsg(), and OpenDDS::DCPS::RtpsSampleHeader::valid().

{
#if defined(OPENDDS_SECURITY)
  if (secure_prefix_.smHeader.submessageId) {
    return header.valid();
  }
#endif

  receiver_.submsg(header.submessage_);

  // save fragmentation details for use in reassemble()
  if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
    const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
    frags_.first = rtps.fragmentStartingNum.value;
    frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
  }

  return header.valid();
}
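
The fragment range saved for reassemble() is inclusive at both ends, which is why the listing subtracts one from the fragment count. A minimal sketch of that arithmetic follows; `FragRange` and `fragment_range` are hypothetical names used only for illustration and are not part of OpenDDS.

```cpp
#include <cassert>
#include <cstdint>
#include <utility>

// Hypothetical stand-in for the (first, second) pair stored in frags_.
typedef std::pair<std::uint32_t, std::uint32_t> FragRange;

// Returns the inclusive range of fragment numbers carried by one DATA_FRAG
// submessage, given its starting fragment number and fragment count.
FragRange fragment_range(std::uint32_t starting_num,
                         std::uint16_t frags_in_submessage)
{
  assert(frags_in_submessage >= 1); // a submessage carries at least one fragment
  return FragRange(starting_num, starting_num + (frags_in_submessage - 1));
}
```

For example, a submessage that starts at fragment 5 and carries 3 fragments covers fragments 5 through 7.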


bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsTransportHeader & header  )  [private, virtual]

Check the transport header for suitability.

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 526 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::RtpsTransportHeader::header_, receiver_, remote_address_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::reset(), OpenDDS::DCPS::SUBMESSAGE_NONE, and OpenDDS::DCPS::RtpsTransportHeader::valid().

{
  receiver_.reset(remote_address_, header.header_);

#if defined(OPENDDS_SECURITY)
  secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
#endif

  return header.valid();
}


void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample ( ReceivedDataSample & sample,
const ACE_INET_Addr & remote_address 
) [private, virtual]

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 73 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::DATA, deliver_sample_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::dest_guid_prefix_, link_, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::received_sample_header(), receiver_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, and OpenDDS::DCPS::RtpsSampleHeader::submessage_.

{
  using namespace RTPS;

  if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
                  sizeof(GuidPrefix_t))) {
    // Not our message, we may be on multicast listening to all the others.
    return;
  }

  const RtpsSampleHeader& rsh = received_sample_header();

#if defined(OPENDDS_SECURITY)
  const SubmessageKind kind = rsh.submessage_._d();

  if ((secure_prefix_.smHeader.submessageId == SRTPS_PREFIX
       && kind != SRTPS_POSTFIX) ||
      (secure_prefix_.smHeader.submessageId == SEC_PREFIX
       && kind != SEC_POSTFIX)) {
    // secure envelope in progress, defer processing
    secure_submessages_.push_back(rsh.submessage_);
    if (kind == DATA) {
      // SRTPS: once full-message protection is supported, this technique will
      // need to be extended to support > 1 data payload (auth. only)
      secure_sample_ = sample;
    }
    return;
  }
#endif

  deliver_sample_i(sample, rsh.submessage_);
}
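
The first check in the listing above discards submessages whose destination GUID prefix differs from the local one. A minimal sketch of that comparison follows, assuming the 12-octet prefix size that RTPS defines; `Prefix` and `addressed_to_us` are hypothetical names, not OpenDDS API.

```cpp
#include <cassert>
#include <cstring>

// Hypothetical stand-in for GuidPrefix_t: an array of 12 octets.
typedef unsigned char Prefix[12];

// Returns true when the destination prefix matches the local prefix
// byte for byte, i.e. when the message should be processed locally.
bool addressed_to_us(const Prefix dest, const Prefix local)
{
  return std::memcmp(dest, local, sizeof(Prefix)) == 0;
}
```

Note that the listing tests the memcmp() result directly, so a nonzero result (a mismatch) takes the early return.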


void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i ( ReceivedDataSample & sample,
const RTPS::Submessage & submessage 
) [private]

Definition at line 108 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::RTPS::DATA, OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DataLink::data_received_include(), OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::DATAWRITER_LIVELINESS, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::FLAG_L, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Submessage::hb_frag_sm, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, readers_selected_, readers_withheld_, OpenDDS::DCPS::RtpsUdpDataLink::received(), receiver_, recvd_sample_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::RTPS::Submessage::security_sm, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, OpenDDS::DCPS::SUBMESSAGE_NONE, and OpenDDS::DCPS::Transport_debug_level.

Referenced by deliver_sample().

{
  using namespace RTPS;
  const SubmessageKind kind = submessage._d();

  switch (kind) {
  case INFO_SRC:
  case INFO_REPLY_IP4:
  case INFO_DST:
  case INFO_REPLY:
  case INFO_TS:
    // No-op: the INFO_* submessages only modify the state of the
    // MessageReceiver (see check_header()), they are not passed up to DCPS.
    break;

  case DATA: {
    receiver_.fill_header(sample.header_);
    const DataSubmessage& data = submessage.data_sm();
    recvd_sample_ = &sample;
    readers_selected_.clear();
    readers_withheld_.clear();
    // If this sample should be withheld from some readers in order to maintain
    // in-order delivery, link_->received() will add it to readers_withheld_ otherwise
    // it will be added to readers_selected_
    link_->received(data, receiver_.source_guid_prefix_);
    recvd_sample_ = 0;

    if (data.readerId != ENTITYID_UNKNOWN) {
      RepoId reader;
      std::memcpy(reader.guidPrefix, link_->local_prefix(),
                  sizeof(GuidPrefix_t));
      reader.entityId = data.readerId;
      if (!readers_withheld_.count(reader)) {
        if (Transport_debug_level > 5) {
          GuidConverter reader_conv(reader);
          ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this,
                               sample.header_.sequence_.getValue(),
                               OPENDDS_STRING(reader_conv).c_str()));
        }
#if defined(OPENDDS_SECURITY)
        if (decode_payload(sample, data)) {
          link_->data_received(sample, reader);
        }
#else
        link_->data_received(sample, reader);
#endif

      }

    } else {
      if (Transport_debug_level > 5) {
        OPENDDS_STRING included_ids;
        bool first = true;
        RepoIdSet::iterator iter = readers_selected_.begin();
        while(iter != readers_selected_.end()) {
          included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
          first = false;
          ++iter;
        }
        OPENDDS_STRING excluded_ids;
        first = true;
        RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
        while(iter2 != readers_withheld_.end()) {
          excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2));
          first = false;
          ++iter2;
        }
        ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str()));
        ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str()));
      }

      if (readers_withheld_.empty() && readers_selected_.empty()) {
        if (Transport_debug_level > 5) {
          ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this,
                               sample.header_.sequence_.getValue()));
        }

#if defined(OPENDDS_SECURITY)
        if (decode_payload(sample, data)) {
          link_->data_received(sample);
        }
#else
        link_->data_received(sample);
#endif

      } else {
        if (Transport_debug_level > 5) {
          ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this,
                               sample.header_.sequence_.getValue()));
        }

#if defined(OPENDDS_SECURITY)
        if (decode_payload(sample, data)) {
          link_->data_received_include(sample, readers_selected_);
        }
#else
        link_->data_received_include(sample, readers_selected_);
#endif

      }
    }
    break;
  }
  case GAP:
    link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_);
    break;

  case HEARTBEAT:
    link_->received(submessage.heartbeat_sm(),
                    receiver_.source_guid_prefix_);
    if (submessage.heartbeat_sm().smHeader.flags & FLAG_L) {
      // Liveliness has been asserted.  Create a DATAWRITER_LIVELINESS message.
      sample.header_.message_id_ = DATAWRITER_LIVELINESS;
      receiver_.fill_header(sample.header_);
      sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId;
      link_->data_received(sample);
    }
    break;

  case ACKNACK:
    link_->received(submessage.acknack_sm(),
                    receiver_.source_guid_prefix_);
    break;

  case HEARTBEAT_FRAG:
    link_->received(submessage.hb_frag_sm(),
                    receiver_.source_guid_prefix_);
    break;

  case NACK_FRAG:
    link_->received(submessage.nack_frag_sm(),
                    receiver_.source_guid_prefix_);
    break;

  /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble()
     has successfully reassembled the fragments and we now have a DATA submsg
   */

#if defined(OPENDDS_SECURITY)
  case SRTPS_PREFIX:
  case SEC_PREFIX:
    secure_prefix_ = submessage.security_sm();
    break;

  case SRTPS_POSTFIX:
    secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE;
    secure_sample_ = ReceivedDataSample(0);
    ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy SRTPS unsupported.\n"));
    break;

  case SEC_POSTFIX:
    deliver_from_secure(submessage);
    break;
#endif

  default:
    break;
  }
}
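
The DATA case above chooses between four outcomes based on the submessage's reader id and the two reader sets filled in by link_->received(). The sketch below restates only that decision, with logging and the security branches left out; `Delivery`, `ReaderSet` and `choose_delivery` are hypothetical names, and integer ids stand in for RepoId values.

```cpp
#include <cassert>
#include <set>

// Possible outcomes of the DATA case in the listing above.
enum Delivery {
  DELIVER_TO_ONE,      // data_received(sample, reader)
  DELIVER_TO_ALL,      // data_received(sample)
  DELIVER_TO_SELECTED, // data_received_include(sample, readers_selected_)
  DELIVER_TO_NONE      // the addressed reader is withheld
};

// Hypothetical stand-in for RepoIdSet.
typedef std::set<int> ReaderSet;

// reader_known corresponds to data.readerId != ENTITYID_UNKNOWN.
Delivery choose_delivery(bool reader_known, int reader,
                         const ReaderSet& withheld, const ReaderSet& selected)
{
  if (reader_known) {
    return withheld.count(reader) ? DELIVER_TO_NONE : DELIVER_TO_ONE;
  }
  if (withheld.empty() && selected.empty()) {
    return DELIVER_TO_ALL;
  }
  return DELIVER_TO_SELECTED;
}
```

When the reader id is unknown and at least one set is non-empty, only the selected readers receive the sample. Withheld readers are expected to receive it later from the pointer returned by withhold_data_from().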


void OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from ( const RepoId & sub_id  ) 

Definition at line 567 of file RtpsUdpReceiveStrategy.cpp.

References readers_selected_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::process_data_i().

{
  readers_selected_.insert(sub_id);
}


int OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input ( ACE_HANDLE  fd  )  [virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 39 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::handle_dds_input().

{
  return handle_dds_input(fd);
}


bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments ( const SequenceRange & range,
const RepoId & pub_id,
FragmentInfo *  frag_info = 0 
)

Definition at line 644 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::TransportReassembly::get_gaps(), OpenDDS::DCPS::TransportReassembly::has_frags(), OpenDDS::RTPS::FragmentNumberSet::numBits, and reassembly_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags(), and OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i().

{
  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
    if (reassembly_.has_frags(sn, pub_id)) {
      if (frag_info) {
        std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
        p.first = sn;
        frag_info->push_back(p);
        RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
        missing_frags.bitmap.length(8); // start at max length
        missing_frags.bitmapBase.value =
          reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
                               8, missing_frags.numBits);
        // reduce length in case get_gaps() didn't need all that room
        missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
      } else {
        return true;
      }
    }
  }
  return frag_info ? !frag_info->empty() : false;
}
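
The listing first sizes the bitmap at its maximum of 8 words and then shrinks it to the number of 32-bit words that numBits actually requires. A minimal sketch of that rounding-up division follows; `words_for_bits` is a hypothetical helper, not an OpenDDS function.

```cpp
#include <cassert>
#include <cstdint>

// Number of 32-bit words needed to hold num_bits bits, rounding up.
std::uint32_t words_for_bits(std::uint32_t num_bits)
{
  assert(num_bits <= 256); // 8 words of 32 bits, the maximum used in the listing
  return (num_bits + 31) / 32;
}
```

Adding 31 before the integer division means any partially used word still counts as a whole word, while an exact multiple of 32 does not gain an extra one.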


typedef OPENDDS_VECTOR(SeqFragPair) OpenDDS::DCPS::RtpsUdpReceiveStrategy::FragmentInfo

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble ( ReceivedDataSample & data  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 573 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, frags_, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::received_sample_header(), receiver_, OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::RtpsSampleHeader::submessage_.

{
  using namespace RTPS;
  receiver_.fill_header(data.header_); // set publication_id_.guidPrefix
  if (reassembly_.reassemble(frags_, data)) {

    // Reassembly was successful, replace DataFrag with Data.  This doesn't have
    // to be a fully-formed DataSubmessage, just enough for this class to use
    // in deliver_sample() which ends up calling RtpsUdpDataLink::received().
    // In particular we will need the SequenceNumber, but ignore the iQoS.

    // Peek at the byte order from the encapsulation containing the payload.
    data.header_.byte_order_ = data.sample_->rd_ptr()[1] & FLAG_E;

    RtpsSampleHeader& rsh = received_sample_header();
    const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();

    const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0)
      | (data.header_.key_fields_only_ ? FLAG_K_IN_DATA : FLAG_D);
    const DataSubmessage dsm = {
      {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
      dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
    rsh.submessage_.data_sm(dsm);
    return true;
  }
  return false;
}
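
The synthesized DATA submessage gets a flags octet built from the byte order and from whether the payload holds only key fields. The sketch below shows that composition. The numeric bit values are an assumption taken from the RTPS Data submessage layout (E = 0x01, D = 0x04, K = 0x08) rather than from this page, and `kFlagE`, `kFlagD`, `kFlagK` and `data_flags` are hypothetical names.

```cpp
#include <cassert>

// Assumed bit positions for the Data submessage flags (see lead-in).
const unsigned char kFlagE = 0x01; // endianness: set for little-endian
const unsigned char kFlagD = 0x04; // payload contains serialized data
const unsigned char kFlagK = 0x08; // payload contains only the key

// Mirrors the data_flags expression in the listing: exactly one of D or K
// is set, and E is set when the payload is little-endian.
unsigned char data_flags(bool little_endian, bool key_fields_only)
{
  return static_cast<unsigned char>((little_endian ? kFlagE : 0)
                                    | (key_fields_only ? kFlagK : kFlagD));
}
```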


ssize_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr & remote_address,
ACE_HANDLE  fd 
) [private, virtual]

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 45 of file RtpsUdpReceiveStrategy.cpp.

References ACE_IPC_SAP::get_handle(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_SOCK_Dgram::recv(), remote_address_, socket(), and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

{
  const ACE_SOCK_Dgram& socket =
    (fd == link_->unicast_socket().get_handle())
    ? link_->unicast_socket() : link_->multicast_socket();
#ifdef ACE_LACKS_SENDMSG
  char buffer[0x10000];
  ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
  char* iter = buffer;
  for (int i = 0; scatter > 0 && i < n; ++i) {
    const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS
                                  static_cast<size_t>(scatter));
    std::memcpy(iov[i].iov_base, iter, chunk);
    scatter -= chunk;
    iter += chunk;
  }
  const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
#else
  const ssize_t ret = socket.recv(iov, n, remote_address);
#endif
  remote_address_ = remote_address;
  return ret;
}
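
On platforms without scatter reads (the ACE_LACKS_SENDMSG branch), the listing receives into one flat buffer and then copies the bytes into the caller's iovec array in order. The following is a minimal sketch of that copy loop on its own, with no sockets involved; `Chunk` and `scatter_copy` are hypothetical names, with `Chunk` standing in for the platform's iovec.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstring>

// Hypothetical stand-in for iovec: a destination pointer and its capacity.
struct Chunk {
  void* base;
  std::size_t len;
};

// Copies up to `received` bytes from `buffer` into the chunks, filling each
// chunk in turn. Returns the number of bytes copied, which is smaller than
// `received` when the chunks cannot hold everything.
std::size_t scatter_copy(const char* buffer, std::size_t received,
                         Chunk chunks[], int n)
{
  assert(n >= 0);
  const char* iter = buffer;
  for (int i = 0; received > 0 && i < n; ++i) {
    const std::size_t count = std::min(chunks[i].len, received);
    std::memcpy(chunks[i].base, iter, count);
    received -= count;
    iter += count;
  }
  return static_cast<std::size_t>(iter - buffer);
}
```

Unlike the listing, this sketch takes an unsigned byte count, so the caller would need to handle a negative recv() result before calling it.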


void OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments ( const SequenceRange & range,
const RepoId & pub_id 
)

Remove any saved fragments. We do not expect to receive any more fragments with sequence numbers in "range" from publication "pub_id".

Definition at line 635 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::data_unavailable(), and reassembly_.

{
  for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
    reassembly_.data_unavailable(sn, pub_id);
  }
}


bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap ( CORBA::Long  bitmap[],
CORBA::ULong  num_bits,
const SequenceNumber & base,
const RepoId & pub_id 
)

For each "1" bit in the bitmap, change it to a "0" if there are fragments from publication "pub_id" for the sequence number represented by that position in the bitmap. Returns true if the bitmap was changed.

Definition at line 602 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::has_frags(), and reassembly_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks().

{
  bool modified = false;
  for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
    if (bit == 32) bit = 0;

    if (bit == 0) {
      x = static_cast<CORBA::ULong>(bitmap[i / 32]);
      if (x == 0) {
        // skip an entire Long if it's all 0's (adds 32 due to ++i)
        i += 31;
        bit = 31;
        //FUTURE: this could be generalized with something like the x86 "bsr"
        //        instruction using compiler intrinsics, VC++ _BitScanReverse()
        //        and GCC __builtin_clz()
        continue;
      }
    }

    const CORBA::ULong mask = 1 << (31 - bit);
    if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) {
      x &= ~mask;
      bitmap[i / 32] = x;
      modified = true;
    }
  }
  return modified;
}
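
The bitmap is read most significant bit first: bit i of the set lives in word i / 32 under the mask 1 << (31 - i % 32) and represents sequence number base + i. The sketch below shows the same clearing logic without the all-zero word shortcut. `BufferedSet` and `clear_buffered_bits` are hypothetical names, and the set of buffered sequence numbers stands in for TransportReassembly::has_frags().

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

// Hypothetical stand-in for the reassembly state: sequence numbers for
// which at least one fragment is currently buffered.
typedef std::set<std::int64_t> BufferedSet;

// Clears every set bit whose sequence number (base + bit index) appears in
// `buffered`. Bit 0 is the most significant bit of bitmap[0].
// Returns true if at least one bit was cleared.
bool clear_buffered_bits(std::vector<std::uint32_t>& bitmap,
                         std::uint32_t num_bits,
                         std::int64_t base,
                         const BufferedSet& buffered)
{
  assert(num_bits <= bitmap.size() * 32);
  bool modified = false;
  for (std::uint32_t i = 0; i < num_bits; ++i) {
    const std::uint32_t mask = std::uint32_t(1) << (31 - i % 32);
    std::uint32_t& word = bitmap[i / 32];
    if ((word & mask) && buffered.count(base + i)) {
      word &= ~mask;
      modified = true;
    }
  }
  return modified;
}
```

With base 10 and a first word of 0xA0000000, bits 0 and 2 are set, meaning sequence numbers 10 and 12. If only 12 has buffered fragments, the word becomes 0x80000000 and the function returns true.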


int OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i (  )  [private, virtual]

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 458 of file RtpsUdpReceiveStrategy.cpp.

References ACE_TEXT(), OpenDDS::DCPS::RtpsUdpDataLink::config(), ACE_IPC_SAP::control(), ACE_IPC_SAP::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

{
  ACE_Reactor* reactor = link_->get_reactor();
  if (reactor == 0) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
                      ACE_TEXT("NULL reactor reference!\n")),
                     -1);
  }

#ifdef ACE_WIN32
  // By default Winsock will cause reads to fail with "connection reset"
  // when UDP sends result in ICMP "port unreachable" messages.
  // The transport framework is not set up for this since returning <= 0
  // from our receive_bytes causes the framework to close down the datalink
  // which in this case is used to receive from multiple peers.
  BOOL recv_udp_connreset = FALSE;
  link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset);
#endif

  if (reactor->register_handler(link_->unicast_socket().get_handle(), this,
                                ACE_Event_Handler::READ_MASK) != 0) {
    ACE_ERROR_RETURN((LM_ERROR,
                      ACE_TEXT("(%P|%t) ERROR: ")
                      ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
                      ACE_TEXT("failed to register handler for unicast ")
                      ACE_TEXT("socket %d\n"),
                      link_->unicast_socket().get_handle()),
                     -1);
  }

  if (link_->config().use_multicast_) {
    if (reactor->register_handler(link_->multicast_socket().get_handle(), this,
                                  ACE_Event_Handler::READ_MASK) != 0) {
      ACE_ERROR_RETURN((LM_ERROR,
                        ACE_TEXT("(%P|%t) ERROR: ")
                        ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
                        ACE_TEXT("failed to register handler for multicast\n")),
                       -1);
    }
  }

  return 0;
}


void OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i (  )  [private, virtual]

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.

Definition at line 505 of file RtpsUdpReceiveStrategy.cpp.

References ACE_TEXT(), OpenDDS::DCPS::RtpsUdpDataLink::config(), ACE_IPC_SAP::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::remove_handler(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

{
  ACE_Reactor* reactor = link_->get_reactor();
  if (reactor == 0) {
    ACE_ERROR((LM_ERROR,
               ACE_TEXT("(%P|%t) ERROR: ")
               ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ")
               ACE_TEXT("NULL reactor reference!\n")));
    return;
  }

  reactor->remove_handler(link_->unicast_socket().get_handle(),
                          ACE_Event_Handler::READ_MASK);

  if (link_->config().use_multicast_) {
    reactor->remove_handler(link_->multicast_socket().get_handle(),
                            ACE_Event_Handler::READ_MASK);
  }
}


const ReceivedDataSample * OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from ( const RepoId & sub_id  ) 

Prevent delivery of the currently in-progress data sample to the subscription sub_id. Returns a pointer to the in-progress data so it can be stored for later delivery.

Definition at line 560 of file RtpsUdpReceiveStrategy.cpp.

References readers_withheld_, and recvd_sample_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::process_data_i().

{
  readers_withheld_.insert(sub_id);
  return recvd_sample_;
}



Member Data Documentation

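SequenceRange OpenDDS::DCPS::RtpsUdpReceiveStrategy::frags_ [private]

Definition at line 102 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and reassemble().

SequenceNumber OpenDDS::DCPS::RtpsUdpReceiveStrategy::last_received_ [private]

Definition at line 97 of file RtpsUdpReceiveStrategy.h.

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_selected_ [private]

Definition at line 100 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and do_not_withhold_data_from().

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_withheld_ [private]

Definition at line 100 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and withhold_data_from().

const ReceivedDataSample * OpenDDS::DCPS::RtpsUdpReceiveStrategy::recvd_sample_ [private]

Definition at line 99 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample_i(), and withhold_data_from().

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpReceiveStrategy::remote_address_ [private]

Definition at line 132 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and receive_bytes().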


The documentation for this class was generated from the following files:
 RtpsUdpReceiveStrategy.h
 RtpsUdpReceiveStrategy.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1