OpenDDS::DCPS::RtpsUdpReceiveStrategy Class Reference

#include <RtpsUdpReceiveStrategy.h>

Public Types

typedef std::pair< SequenceNumber, RTPS::FragmentNumberSet > SeqFragPair

Public Member Functions

 RtpsUdpReceiveStrategy (RtpsUdpDataLink *link)
virtual int handle_input (ACE_HANDLE fd)
bool remove_frags_from_bitmap (CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const RepoId &pub_id)
void remove_fragments (const SequenceRange &range, const RepoId &pub_id)
typedef OPENDDS_VECTOR (SeqFragPair) FragmentInfo
bool has_fragments (const SequenceRange &range, const RepoId &pub_id, FragmentInfo *frag_info=0)
const ReceivedDataSample * withhold_data_from (const RepoId &sub_id)
void do_not_withhold_data_from (const RepoId &sub_id)

Private Member Functions

virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)
 Only our subclass knows how to do this.
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
 Called when there is a ReceivedDataSample to be delivered.
virtual int start_i ()
 Let the subclass start.
virtual void stop_i ()
 Let the subclass stop.
virtual bool check_header (const RtpsTransportHeader &header)
virtual bool check_header (const RtpsSampleHeader &header)
virtual bool reassemble (ReceivedDataSample &data)

Private Attributes

RtpsUdpDataLink * link_
SequenceNumber last_received_
const ReceivedDataSample * recvd_sample_
RepoIdSet readers_withheld_
RepoIdSet readers_selected_
SequenceRange frags_
TransportReassembly reassembly_
MessageReceiver receiver_
ACE_INET_Addr remote_address_

Classes

struct  MessageReceiver

Detailed Description

Definition at line 30 of file RtpsUdpReceiveStrategy.h.


Member Typedef Documentation

typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> OpenDDS::DCPS::RtpsUdpReceiveStrategy::SeqFragPair

Definition at line 50 of file RtpsUdpReceiveStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy ( RtpsUdpDataLink *  link  )  [explicit]

Definition at line 23 of file RtpsUdpReceiveStrategy.cpp.

00024   : link_(link)
00025   , last_received_()
00026   , recvd_sample_(0)
00027   , receiver_(link->local_prefix())
00028 {
00029 }


Member Function Documentation

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsSampleHeader &  header  )  [private, virtual]

Definition at line 270 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::DATA_FRAG, frags_, header, receiver_, and OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::submsg().

00271 {
00272   receiver_.submsg(header.submessage_);
00273 
00274   // save fragmentation details for use in reassemble()
00275   if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) {
00276     const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm();
00277     frags_.first = rtps.fragmentStartingNum.value;
00278     frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1);
00279   }
00280 
00281   return header.valid();
00282 }
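
The fragment bookkeeping in the listing above can be sketched in isolation. This is a minimal illustration, not OpenDDS code: `DataFragInfo` and `fragment_range` are hypothetical names standing in for the two DATA_FRAG fields and the assignment to `frags_`.

```cpp
#include <cstdint>
#include <utility>

// Hypothetical stand-in for the two DATA_FRAG fields read by check_header().
struct DataFragInfo {
  std::uint32_t fragmentStartingNum;   // number of the first fragment carried
  std::uint16_t fragmentsInSubmessage; // count of consecutive fragments carried
};

// Returns the inclusive [first, last] fragment range of one submessage,
// mirroring frags_.first and frags_.second in the listing.
// Assumes fragmentsInSubmessage >= 1; a count of 0 would wrap around.
inline std::pair<std::uint32_t, std::uint32_t>
fragment_range(const DataFragInfo& df)
{
  const std::uint32_t first = df.fragmentStartingNum;
  const std::uint32_t last = first + (df.fragmentsInSubmessage - 1u);
  return std::make_pair(first, last);
}
```

A submessage that starts at fragment 5 and carries 3 fragments therefore covers fragments 5 through 7.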

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header ( const RtpsTransportHeader &  header  )  [private, virtual]

Definition at line 263 of file RtpsUdpReceiveStrategy.cpp.

References header, receiver_, remote_address_, and OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::reset().

00264 {
00265   receiver_.reset(remote_address_, header.header_);
00266   return header.valid();
00267 }

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample ( ReceivedDataSample &  sample,
const ACE_INET_Addr &  remote_address 
) [private, virtual]

Called when there is a ReceivedDataSample to be delivered.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 66 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::RTPS::DATA, OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DataLink::data_received_include(), OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::DATAWRITER_LIVELINESS, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::dest_guid_prefix_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Submessage::hb_frag_sm, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, link_, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, readers_selected_, readers_withheld_, OpenDDS::DCPS::RtpsUdpDataLink::received(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header(), receiver_, recvd_sample_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_, OpenDDS::DCPS::RtpsSampleHeader::submessage_, and OpenDDS::DCPS::Transport_debug_level.

00068 {
00069   using namespace RTPS;
00070 
00071   if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(),
00072                   sizeof(GuidPrefix_t))) {
00073     // Not our message, we may be on multicast listening to all the others.
00074     return;
00075   }
00076 
00077   const RtpsSampleHeader& rsh = received_sample_header();
00078   const SubmessageKind kind = rsh.submessage_._d();
00079 
00080   switch (kind) {
00081   case INFO_SRC:
00082   case INFO_REPLY_IP4:
00083   case INFO_DST:
00084   case INFO_REPLY:
00085   case INFO_TS:
00086     // No-op: the INFO_* submessages only modify the state of the
00087     // MessageReceiver (see check_header()), they are not passed up to DCPS.
00088     break;
00089 
00090   case DATA: {
00091     receiver_.fill_header(sample.header_);
00092     const DataSubmessage& data = rsh.submessage_.data_sm();
00093     recvd_sample_ = &sample;
00094     readers_selected_.clear();
00095     readers_withheld_.clear();
00096     // If this sample should be withheld from some readers in order to maintain
00097     // in-order delivery, link_->received() will add it to readers_withheld_ otherwise
00098     // it will be added to readers_selected_
00099     link_->received(data, receiver_.source_guid_prefix_);
00100     recvd_sample_ = 0;
00101 
00102     if (data.readerId != ENTITYID_UNKNOWN) {
00103       RepoId reader;
00104       std::memcpy(reader.guidPrefix, link_->local_prefix(),
00105                   sizeof(GuidPrefix_t));
00106       reader.entityId = data.readerId;
00107       if (!readers_withheld_.count(reader)) {
00108         if (Transport_debug_level > 5) {
00109           GuidConverter reader_conv(reader);
00110           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this,
00111                                sample.header_.sequence_.getValue(),
00112                                OPENDDS_STRING(reader_conv).c_str()));
00113         }
00114         link_->data_received(sample, reader);
00115       }
00116 
00117     } else {
00118       if (Transport_debug_level > 5) {
00119         OPENDDS_STRING included_ids;
00120         bool first = true;
00121         RepoIdSet::iterator iter = readers_selected_.begin();
00122         while(iter != readers_selected_.end()) {
00123           included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00124           first = false;
00125           ++iter;
00126         }
00127         OPENDDS_STRING excluded_ids;
00128         first = true;
00129         RepoIdSet::iterator iter2 = this->readers_withheld_.begin();
00130         while(iter2 != readers_withheld_.end()) {
00131             excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2));
00132           first = false;
00133           ++iter2;
00134         }
00135         ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str()));
00136         ACE_DEBUG((LM_DEBUG, "(%P|%t)  - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str()));
00137       }
00138 
00139       if (readers_withheld_.empty() && readers_selected_.empty()) {
00140         if (Transport_debug_level > 5) {
00141           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this,
00142                                sample.header_.sequence_.getValue()));
00143         }
00144         link_->data_received(sample);
00145       } else {
00146         if (Transport_debug_level > 5) {
00147           ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this,
00148                                sample.header_.sequence_.getValue()));
00149         }
00150         link_->data_received_include(sample, readers_selected_);
00151       }
00152     }
00153     break;
00154   }
00155   case GAP:
00156     link_->received(rsh.submessage_.gap_sm(), receiver_.source_guid_prefix_);
00157     break;
00158 
00159   case HEARTBEAT:
00160     link_->received(rsh.submessage_.heartbeat_sm(),
00161                     receiver_.source_guid_prefix_);
00162     if (rsh.submessage_.heartbeat_sm().smHeader.flags & 4 /*FLAG_L*/) {
00163       // Liveliness has been asserted.  Create a DATAWRITER_LIVELINESS message.
00164       sample.header_.message_id_ = DATAWRITER_LIVELINESS;
00165       receiver_.fill_header(sample.header_);
00166       sample.header_.publication_id_.entityId = rsh.submessage_.heartbeat_sm().writerId;
00167       link_->data_received(sample);
00168     }
00169     break;
00170 
00171   case ACKNACK:
00172     link_->received(rsh.submessage_.acknack_sm(),
00173                     receiver_.source_guid_prefix_);
00174     break;
00175 
00176   case HEARTBEAT_FRAG:
00177     link_->received(rsh.submessage_.hb_frag_sm(),
00178                     receiver_.source_guid_prefix_);
00179     break;
00180 
00181   case NACK_FRAG:
00182     link_->received(rsh.submessage_.nack_frag_sm(),
00183                     receiver_.source_guid_prefix_);
00184     break;
00185 
00186   /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble()
00187      has successfully reassembled the fragments and we now have a DATA submsg
00188    */
00189   default:
00190     break;
00191   }
00192 }
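
The routing decision made in the DATA case above can be summarized as a small pure function. This is a sketch under stated assumptions: `DeliveryTarget`, `ReaderSet`, and `choose_delivery` are hypothetical names, and strings stand in for `RepoId` values.

```cpp
#include <set>
#include <string>

// Hypothetical labels for the outcomes in the DATA case.
enum DeliveryTarget {
  DELIVER_NONE,        // directed sample, reader is withheld: no call is made
  DELIVER_TO_READER,   // data_received(sample, reader)
  DELIVER_TO_ALL,      // data_received(sample)
  DELIVER_TO_SELECTED  // data_received_include(sample, readers_selected_)
};

typedef std::set<std::string> ReaderSet; // hypothetical stand-in for RepoIdSet

// directed: true when the DATA submessage names a specific reader
// (data.readerId != ENTITYID_UNKNOWN in the listing).
inline DeliveryTarget choose_delivery(bool directed, const std::string& reader,
                                      const ReaderSet& withheld,
                                      const ReaderSet& selected)
{
  if (directed) {
    return withheld.count(reader) ? DELIVER_NONE : DELIVER_TO_READER;
  }
  if (withheld.empty() && selected.empty()) {
    return DELIVER_TO_ALL;
  }
  return DELIVER_TO_SELECTED;
}
```

Note that in the undirected case a non-empty withheld set alone is enough to switch from delivery to all readers to delivery to the selected set only.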

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from ( const RepoId &  sub_id  ) 

Definition at line 292 of file RtpsUdpReceiveStrategy.cpp.

References readers_selected_.

00293 {
00294   readers_selected_.insert(sub_id);
00295 }

int OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input ( ACE_HANDLE  fd  )  [virtual]

Definition at line 32 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00033 {
00034   return handle_dds_input(fd);
00035 }

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments ( const SequenceRange &  range,
const RepoId &  pub_id,
FragmentInfo *  frag_info = 0 
)

Definition at line 369 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::TransportReassembly::get_gaps(), OpenDDS::DCPS::TransportReassembly::has_frags(), OpenDDS::RTPS::FragmentNumberSet::numBits, and reassembly_.

00372 {
00373   for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00374     if (reassembly_.has_frags(sn, pub_id)) {
00375       if (frag_info) {
00376         std::pair<SequenceNumber, RTPS::FragmentNumberSet> p;
00377         p.first = sn;
00378         frag_info->push_back(p);
00379         RTPS::FragmentNumberSet& missing_frags = frag_info->back().second;
00380         missing_frags.bitmap.length(8); // start at max length
00381         missing_frags.bitmapBase.value =
00382           reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(),
00383                                8, missing_frags.numBits);
00384         // reduce length in case get_gaps() didn't need all that room
00385         missing_frags.bitmap.length((missing_frags.numBits + 31) / 32);
00386       } else {
00387         return true;
00388       }
00389     }
00390   }
00391   return frag_info ? !frag_info->empty() : false;
00392 }
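
The final resize in the listing above trims the bitmap to the number of 32-bit words needed for `numBits` bits. A minimal sketch of that rounding, with `words_for_bits` as a hypothetical helper name:

```cpp
#include <cstdint>

// Number of 32-bit words needed to hold num_bits bits, rounding up.
// Mirrors the expression (numBits + 31) / 32 used in has_fragments().
inline std::uint32_t words_for_bits(std::uint32_t num_bits)
{
  return (num_bits + 31u) / 32u;
}
```

With the starting length of 8 words used in the listing, the bitmap can describe at most 256 fragments; a result of 33 bits, for example, shrinks it to 2 words.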

typedef OPENDDS_VECTOR(SeqFragPair) OpenDDS::DCPS::RtpsUdpReceiveStrategy::FragmentInfo

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble ( ReceivedDataSample &  data  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 298 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), frags_, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header(), receiver_, OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::RtpsSampleHeader::submessage_.

00299 {
00300   using namespace RTPS;
00301   receiver_.fill_header(data.header_); // set publication_id_.guidPrefix
00302   if (reassembly_.reassemble(frags_, data)) {
00303 
00304     // Reassembly was successful, replace DataFrag with Data.  This doesn't have
00305     // to be a fully-formed DataSubmessage, just enough for this class to use
00306     // in deliver_sample() which ends up calling RtpsUdpDataLink::received().
00307     // In particular we will need the SequenceNumber, but ignore the iQoS.
00308 
00309     // Peek at the byte order from the encapsulation containing the payload.
00310     data.header_.byte_order_ = data.sample_->rd_ptr()[1] & 1 /*FLAG_E*/;
00311 
00312     RtpsSampleHeader& rsh = received_sample_header();
00313     const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm();
00314 
00315     const CORBA::Octet data_flags = (data.header_.byte_order_ ? 1 : 0) // FLAG_E
00316       | (data.header_.key_fields_only_ ? 8 : 4); // FLAG_K : FLAG_D
00317     const DataSubmessage dsm = {
00318       {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS,
00319       dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()};
00320     rsh.submessage_.data_sm(dsm);
00321     return true;
00322   }
00323   return false;
00324 }
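
The flag byte built for the synthesized DATA submessage above can be shown on its own. The constant values 1, 4, and 8 come from the comments in the listing; the names `FLAG_E`, `FLAG_D`, `FLAG_K` as C++ constants and the helper `data_flags` are assumptions made for this sketch.

```cpp
#include <cstdint>

// Bit values taken from the comments in reassemble().
const std::uint8_t FLAG_E = 1; // byte-order flag
const std::uint8_t FLAG_D = 4; // submessage carries serialized data
const std::uint8_t FLAG_K = 8; // submessage carries key fields only

// Builds the flag byte: FLAG_E if byte_order is set, then exactly one of
// FLAG_K (key fields only) or FLAG_D (full data).
inline std::uint8_t data_flags(bool byte_order, bool key_fields_only)
{
  return static_cast<std::uint8_t>((byte_order ? FLAG_E : 0)
                                   | (key_fields_only ? FLAG_K : FLAG_D));
}
```

Because the second term is a two-way choice, FLAG_D and FLAG_K are never set together in the rebuilt header.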

ssize_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr &  remote_address,
ACE_HANDLE  fd 
) [private, virtual]

Only our subclass knows how to do this.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 38 of file RtpsUdpReceiveStrategy.cpp.

References link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), remote_address_, and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().

00042 {
00043   const ACE_SOCK_Dgram& socket =
00044     (fd == link_->unicast_socket().get_handle())
00045     ? link_->unicast_socket() : link_->multicast_socket();
00046 #ifdef ACE_LACKS_SENDMSG
00047   char buffer[0x10000];
00048   ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address);
00049   char* iter = buffer;
00050   for (int i = 0; scatter > 0 && i < n; ++i) {
00051     const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS
00052                                   static_cast<size_t>(scatter));
00053     std::memcpy(iov[i].iov_base, iter, chunk);
00054     scatter -= chunk;
00055     iter += chunk;
00056   }
00057   const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer);
00058 #else
00059   const ssize_t ret = socket.recv(iov, n, remote_address);
00060 #endif
00061   remote_address_ = remote_address;
00062   return ret;
00063 }
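
The `ACE_LACKS_SENDMSG` branch above receives into one flat buffer and then distributes the bytes across the caller's buffers. The copy loop can be sketched without ACE as follows; `Chunk` is a hypothetical stand-in for `iovec`, `scatter_copy` is a hypothetical name, and the error path (a negative return from `recv`) is left out.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>

// Hypothetical stand-in for iovec: a destination buffer and its capacity.
struct Chunk {
  void* base;
  std::size_t len;
};

// Copies up to `received` bytes from `flat` into the chunks, in order,
// filling each chunk before moving to the next. Returns the number of
// bytes copied, which is less than `received` if the chunks are too small.
inline std::size_t scatter_copy(const char* flat, std::size_t received,
                                Chunk chunks[], int n)
{
  const char* iter = flat;
  std::size_t remaining = received;
  for (int i = 0; remaining > 0 && i < n; ++i) {
    const std::size_t chunk = std::min(chunks[i].len, remaining);
    std::memcpy(chunks[i].base, iter, chunk);
    remaining -= chunk;
    iter += chunk;
  }
  return static_cast<std::size_t>(iter - flat);
}
```

As in the listing, bytes that do not fit into the supplied buffers are silently dropped and the return value reflects only what was copied.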

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments ( const SequenceRange &  range,
const RepoId &  pub_id 
)

Remove any saved fragments. We do not expect to receive any more fragments with sequence numbers in "range" from publication "pub_id".

Definition at line 360 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::data_unavailable(), and reassembly_.

00362 {
00363   for (SequenceNumber sn = range.first; sn <= range.second; ++sn) {
00364     reassembly_.data_unavailable(sn, pub_id);
00365   }
00366 }

bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap ( CORBA::Long  bitmap[],
CORBA::ULong  num_bits,
const SequenceNumber &  base,
const RepoId &  pub_id 
)

For each "1" bit in the bitmap, change it to a "0" if there are fragments from publication "pub_id" for the sequence number represented by that position in the bitmap. Returns true if the bitmap was changed.

Definition at line 327 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::TransportReassembly::has_frags(), and reassembly_.

00331 {
00332   bool modified = false;
00333   for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) {
00334     if (bit == 32) bit = 0;
00335 
00336     if (bit == 0) {
00337       x = static_cast<CORBA::ULong>(bitmap[i / 32]);
00338       if (x == 0) {
00339         // skip an entire Long if it's all 0's (adds 32 due to ++i)
00340         i += 31;
00341         bit = 31;
00342         //FUTURE: this could be generalized with something like the x86 "bsr"
00343         //        instruction using compiler intrinsics, VC++ _BitScanReverse()
00344         //        and GCC __builtin_clz()
00345         continue;
00346       }
00347     }
00348 
00349     const CORBA::ULong mask = 1 << (31 - bit);
00350     if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) {
00351       x &= ~mask;
00352       bitmap[i / 32] = x;
00353       modified = true;
00354     }
00355   }
00356   return modified;
00357 }
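
The bit layout used above (bit 0 is the most significant bit of the first word) can be illustrated with a simplified version of the loop. This sketch is not the OpenDDS implementation: `clear_known_bits` is a hypothetical name, a `std::set` of sequence numbers stands in for `reassembly_.has_frags()`, the skip over all-zero words is omitted, and the mask is built from an unsigned 1.

```cpp
#include <cstdint>
#include <set>

// For each set bit i (counted from the most significant bit of bitmap[0]),
// clears the bit if sequence number base + i is present in `have`.
// Returns true if any bit was cleared.
inline bool clear_known_bits(std::uint32_t bitmap[], std::uint32_t num_bits,
                             std::int64_t base,
                             const std::set<std::int64_t>& have)
{
  bool modified = false;
  for (std::uint32_t i = 0; i < num_bits; ++i) {
    const std::uint32_t mask = std::uint32_t(1) << (31 - (i % 32));
    std::uint32_t& word = bitmap[i / 32];
    if ((word & mask) && have.count(base + i)) {
      word &= ~mask;
      modified = true;
    }
  }
  return modified;
}
```

For example, with base 10 and a first word of 0xA0000000, bits 0 and 2 are set and represent sequence numbers 10 and 12; if only 12 is known, the word becomes 0x80000000.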

int OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i (  )  [private, virtual]

Let the subclass start.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 195 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

00196 {
00197   ACE_Reactor* reactor = link_->get_reactor();
00198   if (reactor == 0) {
00199     ACE_ERROR_RETURN((LM_ERROR,
00200                       ACE_TEXT("(%P|%t) ERROR: ")
00201                       ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00202                       ACE_TEXT("NULL reactor reference!\n")),
00203                      -1);
00204   }
00205 
00206 #ifdef ACE_WIN32
00207   // By default Winsock will cause reads to fail with "connection reset"
00208   // when UDP sends result in ICMP "port unreachable" messages.
00209   // The transport framework is not set up for this since returning <= 0
00210   // from our receive_bytes causes the framework to close down the datalink
00211   // which in this case is used to receive from multiple peers.
00212   BOOL recv_udp_connreset = FALSE;
00213   link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00214 #endif
00215 
00216   if (reactor->register_handler(link_->unicast_socket().get_handle(), this,
00217                                 ACE_Event_Handler::READ_MASK) != 0) {
00218     ACE_ERROR_RETURN((LM_ERROR,
00219                       ACE_TEXT("(%P|%t) ERROR: ")
00220                       ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00221                       ACE_TEXT("failed to register handler for unicast ")
00222                       ACE_TEXT("socket %d\n"),
00223                       link_->unicast_socket().get_handle()),
00224                      -1);
00225   }
00226 
00227   if (link_->config()->use_multicast_) {
00228     if (reactor->register_handler(link_->multicast_socket().get_handle(), this,
00229                                   ACE_Event_Handler::READ_MASK) != 0) {
00230       ACE_ERROR_RETURN((LM_ERROR,
00231                         ACE_TEXT("(%P|%t) ERROR: ")
00232                         ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ")
00233                         ACE_TEXT("failed to register handler for multicast\n")),
00234                        -1);
00235     }
00236   }
00237 
00238   return 0;
00239 }

void OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i (  )  [private, virtual]

Let the subclass stop.

Implements OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >.

Definition at line 242 of file RtpsUdpReceiveStrategy.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::config(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

00243 {
00244   ACE_Reactor* reactor = link_->get_reactor();
00245   if (reactor == 0) {
00246     ACE_ERROR((LM_ERROR,
00247                ACE_TEXT("(%P|%t) ERROR: ")
00248                ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ")
00249                ACE_TEXT("NULL reactor reference!\n")));
00250     return;
00251   }
00252 
00253   reactor->remove_handler(link_->unicast_socket().get_handle(),
00254                           ACE_Event_Handler::READ_MASK);
00255 
00256   if (link_->config()->use_multicast_) {
00257     reactor->remove_handler(link_->multicast_socket().get_handle(),
00258                             ACE_Event_Handler::READ_MASK);
00259   }
00260 }

const ReceivedDataSample * OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from ( const RepoId &  sub_id  ) 

Prevent delivery of the currently in-progress data sample to the subscription sub_id. Returns pointer to the in-progress data so it can be stored for later delivery.

Definition at line 285 of file RtpsUdpReceiveStrategy.cpp.

References readers_withheld_, and recvd_sample_.

00286 {
00287   readers_withheld_.insert(sub_id);
00288   return recvd_sample_;
00289 }
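
The interplay between withhold_data_from(), do_not_withhold_data_from(), and the DATA case of deliver_sample() can be modeled with a small stand-alone class. Everything here is hypothetical scaffolding for illustration: `Sample`, `WithholdTracker`, `begin()`, and `end()` do not exist in OpenDDS, and strings stand in for `RepoId` values.

```cpp
#include <set>
#include <string>

struct Sample { int seq; }; // hypothetical stand-in for ReceivedDataSample

class WithholdTracker {
public:
  WithholdTracker() : current_(0) {}

  // Models the start of the DATA case: remember the in-progress sample
  // and reset both reader sets.
  void begin(const Sample& s) {
    current_ = &s;
    selected_.clear();
    withheld_.clear();
  }

  // Models recvd_sample_ = 0 after link_->received() returns.
  void end() { current_ = 0; }

  // Records the reader as withheld and hands back the in-progress sample
  // (null when no sample is in progress).
  const Sample* withhold_data_from(const std::string& reader) {
    withheld_.insert(reader);
    return current_;
  }

  void do_not_withhold_data_from(const std::string& reader) {
    selected_.insert(reader);
  }

  bool is_withheld(const std::string& r) const { return withheld_.count(r) != 0; }
  bool is_selected(const std::string& r) const { return selected_.count(r) != 0; }

private:
  const Sample* current_;
  std::set<std::string> selected_;
  std::set<std::string> withheld_;
};
```

The returned pointer is only meaningful while a sample is in progress, which matches the listing: `recvd_sample_` is non-null only for the duration of the `link_->received()` call in deliver_sample().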


Member Data Documentation

SequenceRange OpenDDS::DCPS::RtpsUdpReceiveStrategy::frags_ [private]

Definition at line 87 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and reassemble().

SequenceNumber OpenDDS::DCPS::RtpsUdpReceiveStrategy::last_received_ [private]

Definition at line 82 of file RtpsUdpReceiveStrategy.h.

RtpsUdpDataLink* OpenDDS::DCPS::RtpsUdpReceiveStrategy::link_ [private]

Definition at line 81 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample(), receive_bytes(), start_i(), and stop_i().

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_selected_ [private]

Definition at line 85 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample(), and do_not_withhold_data_from().

RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_withheld_ [private]

Definition at line 85 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample(), and withhold_data_from().

TransportReassembly OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassembly_ [private]

Definition at line 88 of file RtpsUdpReceiveStrategy.h.

Referenced by has_fragments(), reassemble(), remove_fragments(), and remove_frags_from_bitmap().

MessageReceiver OpenDDS::DCPS::RtpsUdpReceiveStrategy::receiver_ [private]

Definition at line 116 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), deliver_sample(), and reassemble().

const ReceivedDataSample* OpenDDS::DCPS::RtpsUdpReceiveStrategy::recvd_sample_ [private]

Definition at line 84 of file RtpsUdpReceiveStrategy.h.

Referenced by deliver_sample(), and withhold_data_from().

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpReceiveStrategy::remote_address_ [private]

Definition at line 117 of file RtpsUdpReceiveStrategy.h.

Referenced by check_header(), and receive_bytes().


The documentation for this class was generated from the following files:
RtpsUdpReceiveStrategy.h
RtpsUdpReceiveStrategy.cpp

Generated on Fri Feb 12 20:06:37 2016 for OpenDDS by  doxygen 1.4.7