#include <RtpsUdpReceiveStrategy.h>
Classes | |
struct | MessageReceiver |
Public Types | |
typedef std::pair < SequenceNumber, RTPS::FragmentNumberSet > | SeqFragPair |
Public Member Functions | |
RtpsUdpReceiveStrategy (RtpsUdpDataLink *link, const GuidPrefix_t &local_prefix) | |
virtual int | handle_input (ACE_HANDLE fd) |
bool | remove_frags_from_bitmap (CORBA::Long bitmap[], CORBA::ULong num_bits, const SequenceNumber &base, const RepoId &pub_id) |
void | remove_fragments (const SequenceRange &range, const RepoId &pub_id) |
typedef OPENDDS_VECTOR(SeqFragPair) | FragmentInfo |
bool | has_fragments (const SequenceRange &range, const RepoId &pub_id, FragmentInfo *frag_info=0) |
const ReceivedDataSample * | withhold_data_from (const RepoId &sub_id) |
void | do_not_withhold_data_from (const RepoId &sub_id) |
Private Member Functions | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd) |
Only our subclass knows how to do this. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Called when there is a ReceivedDataSample to be delivered. | |
void | deliver_sample_i (ReceivedDataSample &sample, const RTPS::Submessage &submessage) |
virtual int | start_i () |
Let the subclass start. | |
virtual void | stop_i () |
Let the subclass stop. | |
virtual bool | check_header (const RtpsTransportHeader &header) |
Check the transport header for suitability. | |
virtual bool | check_header (const RtpsSampleHeader &header) |
Check the data sample header for suitability. | |
virtual bool | reassemble (ReceivedDataSample &data) |
Private Attributes | |
RtpsUdpDataLink * | link_ |
SequenceNumber | last_received_ |
const ReceivedDataSample * | recvd_sample_ |
RepoIdSet | readers_withheld_ |
RepoIdSet | readers_selected_ |
SequenceRange | frags_ |
TransportReassembly | reassembly_ |
MessageReceiver | receiver_ |
ACE_INET_Addr | remote_address_ |
Definition at line 32 of file RtpsUdpReceiveStrategy.h.
typedef std::pair<SequenceNumber, RTPS::FragmentNumberSet> OpenDDS::DCPS::RtpsUdpReceiveStrategy::SeqFragPair |
Definition at line 53 of file RtpsUdpReceiveStrategy.h.
OpenDDS::DCPS::RtpsUdpReceiveStrategy::RtpsUdpReceiveStrategy | ( | RtpsUdpDataLink * | link, | |
const GuidPrefix_t & | local_prefix | |||
) | [explicit] |
Definition at line 24 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::SUBMESSAGE_NONE.
00025 : link_(link) 00026 , last_received_() 00027 , recvd_sample_(0) 00028 , receiver_(local_prefix) 00029 #if defined(OPENDDS_SECURITY) 00030 , secure_sample_(0) 00031 #endif 00032 { 00033 #if defined(OPENDDS_SECURITY) 00034 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE; 00035 #endif 00036 }
bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header | ( | const RtpsSampleHeader & | header | ) | [private, virtual] |
Check the data sample header for suitability.
Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 538 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::RTPS::DATA_FRAG, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DataFragSubmessage::fragmentsInSubmessage, OpenDDS::RTPS::DataFragSubmessage::fragmentStartingNum, frags_, receiver_, OpenDDS::DCPS::RtpsSampleHeader::submessage_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::submsg(), and OpenDDS::DCPS::RtpsSampleHeader::valid().
00539 { 00540 00541 #if defined(OPENDDS_SECURITY) 00542 if (secure_prefix_.smHeader.submessageId) { 00543 return header.valid(); 00544 } 00545 #endif 00546 00547 receiver_.submsg(header.submessage_); 00548 00549 // save fragmentation details for use in reassemble() 00550 if (header.valid() && header.submessage_._d() == RTPS::DATA_FRAG) { 00551 const RTPS::DataFragSubmessage& rtps = header.submessage_.data_frag_sm(); 00552 frags_.first = rtps.fragmentStartingNum.value; 00553 frags_.second = frags_.first + (rtps.fragmentsInSubmessage - 1); 00554 } 00555 00556 return header.valid(); 00557 }
bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::check_header | ( | const RtpsTransportHeader & | header | ) | [private, virtual] |
Check the transport header for suitability.
Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 526 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::RtpsTransportHeader::header_, receiver_, remote_address_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::reset(), OpenDDS::DCPS::SUBMESSAGE_NONE, and OpenDDS::DCPS::RtpsTransportHeader::valid().
00527 { 00528 receiver_.reset(remote_address_, header.header_); 00529 00530 #if defined(OPENDDS_SECURITY) 00531 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE; 00532 #endif 00533 00534 return header.valid(); 00535 }
void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) | [private, virtual] |
Called when there is a ReceivedDataSample to be delivered.
Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 73 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::RTPS::DATA, deliver_sample_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::dest_guid_prefix_, link_, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::received_sample_header(), receiver_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, and OpenDDS::DCPS::RtpsSampleHeader::submessage_.
00075 { 00076 using namespace RTPS; 00077 00078 if (std::memcmp(receiver_.dest_guid_prefix_, link_->local_prefix(), 00079 sizeof(GuidPrefix_t))) { 00080 // Not our message, we may be on multicast listening to all the others. 00081 return; 00082 } 00083 00084 const RtpsSampleHeader& rsh = received_sample_header(); 00085 00086 #if defined(OPENDDS_SECURITY) 00087 const SubmessageKind kind = rsh.submessage_._d(); 00088 00089 if ((secure_prefix_.smHeader.submessageId == SRTPS_PREFIX 00090 && kind != SRTPS_POSTFIX) || 00091 (secure_prefix_.smHeader.submessageId == SEC_PREFIX 00092 && kind != SEC_POSTFIX)) { 00093 // secure envelope in progress, defer processing 00094 secure_submessages_.push_back(rsh.submessage_); 00095 if (kind == DATA) { 00096 // SRTPS: once full-message protection is supported, this technique will 00097 // need to be extended to support > 1 data payload (auth. only) 00098 secure_sample_ = sample; 00099 } 00100 return; 00101 } 00102 #endif 00103 00104 deliver_sample_i(sample, rsh.submessage_); 00105 }
void OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i | ( | ReceivedDataSample & | sample, | |
const RTPS::Submessage & | submessage | |||
) | [private] |
Definition at line 108 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::RTPS::Submessage::acknack_sm, OpenDDS::RTPS::DATA, OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DataLink::data_received_include(), OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::DATAWRITER_LIVELINESS, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::FLAG_L, OpenDDS::RTPS::GAP, OpenDDS::RTPS::Submessage::gap_sm, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::Submessage::hb_frag_sm, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::RTPS::HEARTBEAT, OpenDDS::RTPS::HEARTBEAT_FRAG, OpenDDS::RTPS::Submessage::heartbeat_sm, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_REPLY, OpenDDS::RTPS::INFO_REPLY_IP4, OpenDDS::RTPS::INFO_SRC, OpenDDS::RTPS::INFO_TS, link_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::local_prefix(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::RTPS::NACK_FRAG, OpenDDS::RTPS::Submessage::nack_frag_sm, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, readers_selected_, readers_withheld_, OpenDDS::DCPS::RtpsUdpDataLink::received(), receiver_, recvd_sample_, OpenDDS::RTPS::SEC_POSTFIX, OpenDDS::RTPS::SEC_PREFIX, OpenDDS::RTPS::Submessage::security_sm, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::source_guid_prefix_, OpenDDS::RTPS::SRTPS_POSTFIX, OpenDDS::RTPS::SRTPS_PREFIX, OpenDDS::DCPS::SUBMESSAGE_NONE, and OpenDDS::DCPS::Transport_debug_level.
Referenced by deliver_sample().
00110 { 00111 using namespace RTPS; 00112 const SubmessageKind kind = submessage._d(); 00113 00114 switch (kind) { 00115 case INFO_SRC: 00116 case INFO_REPLY_IP4: 00117 case INFO_DST: 00118 case INFO_REPLY: 00119 case INFO_TS: 00120 // No-op: the INFO_* submessages only modify the state of the 00121 // MessageReceiver (see check_header()), they are not passed up to DCPS. 00122 break; 00123 00124 case DATA: { 00125 receiver_.fill_header(sample.header_); 00126 const DataSubmessage& data = submessage.data_sm(); 00127 recvd_sample_ = &sample; 00128 readers_selected_.clear(); 00129 readers_withheld_.clear(); 00130 // If this sample should be withheld from some readers in order to maintain 00131 // in-order delivery, link_->received() will add it to readers_withheld_ otherwise 00132 // it will be added to readers_selected_ 00133 link_->received(data, receiver_.source_guid_prefix_); 00134 recvd_sample_ = 0; 00135 00136 if (data.readerId != ENTITYID_UNKNOWN) { 00137 RepoId reader; 00138 std::memcpy(reader.guidPrefix, link_->local_prefix(), 00139 sizeof(GuidPrefix_t)); 00140 reader.entityId = data.readerId; 00141 if (!readers_withheld_.count(reader)) { 00142 if (Transport_debug_level > 5) { 00143 GuidConverter reader_conv(reader); 00144 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q to reader %C\n", this, 00145 sample.header_.sequence_.getValue(), 00146 OPENDDS_STRING(reader_conv).c_str())); 00147 } 00148 #if defined(OPENDDS_SECURITY) 00149 if (decode_payload(sample, data)) { 00150 link_->data_received(sample, reader); 00151 } 00152 #else 00153 link_->data_received(sample, reader); 00154 #endif 00155 00156 } 00157 00158 } else { 00159 if (Transport_debug_level > 5) { 00160 OPENDDS_STRING included_ids; 00161 bool first = true; 00162 RepoIdSet::iterator iter = readers_selected_.begin(); 00163 while(iter != readers_selected_.end()) { 00164 included_ids += (first ? 
"" : "\n") + OPENDDS_STRING(GuidConverter(*iter)); 00165 first = false; 00166 ++iter; 00167 } 00168 OPENDDS_STRING excluded_ids; 00169 first = true; 00170 RepoIdSet::iterator iter2 = this->readers_withheld_.begin(); 00171 while(iter2 != readers_withheld_.end()) { 00172 excluded_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter2)); 00173 first = false; 00174 ++iter2; 00175 } 00176 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_selected ids:\n%C\n", this, included_ids.c_str())); 00177 ACE_DEBUG((LM_DEBUG, "(%P|%t) - RtpsUdpReceiveStrategy[%@]::deliver_sample \nreaders_withheld ids:\n%C\n", this, excluded_ids.c_str())); 00178 } 00179 00180 if (readers_withheld_.empty() && readers_selected_.empty()) { 00181 if (Transport_debug_level > 5) { 00182 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received for seq: %q TO ALL, no exclusion or inclusion\n", this, 00183 sample.header_.sequence_.getValue())); 00184 } 00185 00186 #if defined(OPENDDS_SECURITY) 00187 if (decode_payload(sample, data)) { 00188 link_->data_received(sample); 00189 } 00190 #else 00191 link_->data_received(sample); 00192 #endif 00193 00194 } else { 00195 if (Transport_debug_level > 5) { 00196 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpReceiveStrategy[%@]::deliver_sample - calling DataLink::data_received_include for seq: %q to readers_selected_\n", this, 00197 sample.header_.sequence_.getValue())); 00198 } 00199 00200 #if defined(OPENDDS_SECURITY) 00201 if (decode_payload(sample, data)) { 00202 link_->data_received_include(sample, readers_selected_); 00203 } 00204 #else 00205 link_->data_received_include(sample, readers_selected_); 00206 #endif 00207 00208 } 00209 } 00210 break; 00211 } 00212 case GAP: 00213 link_->received(submessage.gap_sm(), receiver_.source_guid_prefix_); 00214 break; 00215 00216 case HEARTBEAT: 00217 link_->received(submessage.heartbeat_sm(), 00218 receiver_.source_guid_prefix_); 00219 if 
(submessage.heartbeat_sm().smHeader.flags & FLAG_L) { 00220 // Liveliness has been asserted. Create a DATAWRITER_LIVELINESS message. 00221 sample.header_.message_id_ = DATAWRITER_LIVELINESS; 00222 receiver_.fill_header(sample.header_); 00223 sample.header_.publication_id_.entityId = submessage.heartbeat_sm().writerId; 00224 link_->data_received(sample); 00225 } 00226 break; 00227 00228 case ACKNACK: 00229 link_->received(submessage.acknack_sm(), 00230 receiver_.source_guid_prefix_); 00231 break; 00232 00233 case HEARTBEAT_FRAG: 00234 link_->received(submessage.hb_frag_sm(), 00235 receiver_.source_guid_prefix_); 00236 break; 00237 00238 case NACK_FRAG: 00239 link_->received(submessage.nack_frag_sm(), 00240 receiver_.source_guid_prefix_); 00241 break; 00242 00243 /* no case DATA_FRAG: by the time deliver_sample() is called, reassemble() 00244 has successfully reassembled the fragments and we now have a DATA submsg 00245 */ 00246 00247 #if defined(OPENDDS_SECURITY) 00248 case SRTPS_PREFIX: 00249 case SEC_PREFIX: 00250 secure_prefix_ = submessage.security_sm(); 00251 break; 00252 00253 case SRTPS_POSTFIX: 00254 secure_prefix_.smHeader.submessageId = SUBMESSAGE_NONE; 00255 secure_sample_ = ReceivedDataSample(0); 00256 ACE_ERROR((LM_ERROR, "ERROR: RtpsUdpReceiveStrategy SRTPS unsupported.\n")); 00257 break; 00258 00259 case SEC_POSTFIX: 00260 deliver_from_secure(submessage); 00261 break; 00262 #endif 00263 00264 default: 00265 break; 00266 } 00267 }
void OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from | ( | const RepoId & | sub_id | ) |
Definition at line 567 of file RtpsUdpReceiveStrategy.cpp.
References readers_selected_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::process_data_i().
00568 { 00569 readers_selected_.insert(sub_id); 00570 }
int OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input | ( | ACE_HANDLE | fd | ) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 39 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::handle_dds_input().
00040 { 00041 return handle_dds_input(fd); 00042 }
bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments | ( | const SequenceRange & | range, | |
const RepoId & | pub_id, | |||
FragmentInfo * | frag_info = 0 | |||
) |
Definition at line 644 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::TransportReassembly::get_gaps(), OpenDDS::DCPS::TransportReassembly::has_frags(), OpenDDS::RTPS::FragmentNumberSet::numBits, and reassembly_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags(), and OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i().
00647 { 00648 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) { 00649 if (reassembly_.has_frags(sn, pub_id)) { 00650 if (frag_info) { 00651 std::pair<SequenceNumber, RTPS::FragmentNumberSet> p; 00652 p.first = sn; 00653 frag_info->push_back(p); 00654 RTPS::FragmentNumberSet& missing_frags = frag_info->back().second; 00655 missing_frags.bitmap.length(8); // start at max length 00656 missing_frags.bitmapBase.value = 00657 reassembly_.get_gaps(sn, pub_id, missing_frags.bitmap.get_buffer(), 00658 8, missing_frags.numBits); 00659 // reduce length in case get_gaps() didn't need all that room 00660 missing_frags.bitmap.length((missing_frags.numBits + 31) / 32); 00661 } else { 00662 return true; 00663 } 00664 } 00665 } 00666 return frag_info ? !frag_info->empty() : false; 00667 }
typedef OPENDDS_VECTOR(SeqFragPair) OpenDDS::DCPS::RtpsUdpReceiveStrategy::FragmentInfo |
bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble | ( | ReceivedDataSample & | data | ) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 573 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::RTPS::DATA, OpenDDS::RTPS::Submessage::data_frag_sm, OpenDDS::RTPS::DATA_OCTETS_TO_IQOS, OpenDDS::RTPS::Submessage::data_sm, OpenDDS::DCPS::RtpsUdpReceiveStrategy::MessageReceiver::fill_header(), OpenDDS::RTPS::FLAG_D, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_K_IN_DATA, frags_, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >::received_sample_header(), receiver_, OpenDDS::DCPS::ReceivedDataSample::sample_, and OpenDDS::DCPS::RtpsSampleHeader::submessage_.
00574 { 00575 using namespace RTPS; 00576 receiver_.fill_header(data.header_); // set publication_id_.guidPrefix 00577 if (reassembly_.reassemble(frags_, data)) { 00578 00579 // Reassembly was successful, replace DataFrag with Data. This doesn't have 00580 // to be a fully-formed DataSubmessage, just enough for this class to use 00581 // in deliver_sample() which ends up calling RtpsUdpDataLink::received(). 00582 // In particular we will need the SequenceNumber, but ignore the iQoS. 00583 00584 // Peek at the byte order from the encapsulation containing the payload. 00585 data.header_.byte_order_ = data.sample_->rd_ptr()[1] & FLAG_E; 00586 00587 RtpsSampleHeader& rsh = received_sample_header(); 00588 const DataFragSubmessage& dfsm = rsh.submessage_.data_frag_sm(); 00589 00590 const CORBA::Octet data_flags = (data.header_.byte_order_ ? FLAG_E : 0) 00591 | (data.header_.key_fields_only_ ? FLAG_K_IN_DATA : FLAG_D); 00592 const DataSubmessage dsm = { 00593 {DATA, data_flags, 0}, 0, DATA_OCTETS_TO_IQOS, 00594 dfsm.readerId, dfsm.writerId, dfsm.writerSN, ParameterList()}; 00595 rsh.submessage_.data_sm(dsm); 00596 return true; 00597 } 00598 return false; 00599 }
ssize_t OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes | ( | iovec | iov[], | |
int | n, | |||
ACE_INET_Addr & | remote_address, | |||
ACE_HANDLE | fd | |||
) | [private, virtual] |
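Only our subclass knows how to do this. This override receives into iov from the link's unicast socket when fd is that socket's handle, otherwise from the link's multicast socket, and saves the sender's address in remote_address_.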
Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 45 of file RtpsUdpReceiveStrategy.cpp.
References ACE_IPC_SAP::get_handle(), link_, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_SOCK_Dgram::recv(), remote_address_, socket(), and OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket().
00049 { 00050 const ACE_SOCK_Dgram& socket = 00051 (fd == link_->unicast_socket().get_handle()) 00052 ? link_->unicast_socket() : link_->multicast_socket(); 00053 #ifdef ACE_LACKS_SENDMSG 00054 char buffer[0x10000]; 00055 ssize_t scatter = socket.recv(buffer, sizeof buffer, remote_address); 00056 char* iter = buffer; 00057 for (int i = 0; scatter > 0 && i < n; ++i) { 00058 const size_t chunk = std::min(static_cast<size_t>(iov[i].iov_len), // int on LynxOS 00059 static_cast<size_t>(scatter)); 00060 std::memcpy(iov[i].iov_base, iter, chunk); 00061 scatter -= chunk; 00062 iter += chunk; 00063 } 00064 const ssize_t ret = (scatter < 0) ? scatter : (iter - buffer); 00065 #else 00066 const ssize_t ret = socket.recv(iov, n, remote_address); 00067 #endif 00068 remote_address_ = remote_address; 00069 return ret; 00070 }
void OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments | ( | const SequenceRange & | range, | |
const RepoId & | pub_id | |||
) |
Remove any saved fragments. We do not expect to receive any more fragments with sequence numbers in "range" from publication "pub_id".
Definition at line 635 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::TransportReassembly::data_unavailable(), and reassembly_.
00637 { 00638 for (SequenceNumber sn = range.first; sn <= range.second; ++sn) { 00639 reassembly_.data_unavailable(sn, pub_id); 00640 } 00641 }
bool OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap | ( | CORBA::Long | bitmap[], | |
CORBA::ULong | num_bits, | |||
const SequenceNumber & | base, | |||
const RepoId & | pub_id | |||
) |
For each "1" bit in the bitmap, change it to a "0" if there are fragments from publication "pub_id" for the sequence number represented by that position in the bitmap. Returns true if the bitmap was changed.
Definition at line 602 of file RtpsUdpReceiveStrategy.cpp.
References OpenDDS::DCPS::TransportReassembly::has_frags(), and reassembly_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks().
00606 { 00607 bool modified = false; 00608 for (CORBA::ULong i = 0, x = 0, bit = 0; i < num_bits; ++i, ++bit) { 00609 if (bit == 32) bit = 0; 00610 00611 if (bit == 0) { 00612 x = static_cast<CORBA::ULong>(bitmap[i / 32]); 00613 if (x == 0) { 00614 // skip an entire Long if it's all 0's (adds 32 due to ++i) 00615 i += 31; 00616 bit = 31; 00617 //FUTURE: this could be generalized with something like the x86 "bsr" 00618 // instruction using compiler intrinsics, VC++ _BitScanReverse() 00619 // and GCC __builtin_clz() 00620 continue; 00621 } 00622 } 00623 00624 const CORBA::ULong mask = 1 << (31 - bit); 00625 if ((x & mask) && reassembly_.has_frags(base + i, pub_id)) { 00626 x &= ~mask; 00627 bitmap[i / 32] = x; 00628 modified = true; 00629 } 00630 } 00631 return modified; 00632 }
int OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i | ( | ) | [private, virtual] |
Let the subclass start.
Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 458 of file RtpsUdpReceiveStrategy.cpp.
References ACE_TEXT(), OpenDDS::DCPS::RtpsUdpDataLink::config(), ACE_IPC_SAP::control(), ACE_IPC_SAP::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::register_handler(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.
00459 { 00460 ACE_Reactor* reactor = link_->get_reactor(); 00461 if (reactor == 0) { 00462 ACE_ERROR_RETURN((LM_ERROR, 00463 ACE_TEXT("(%P|%t) ERROR: ") 00464 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ") 00465 ACE_TEXT("NULL reactor reference!\n")), 00466 -1); 00467 } 00468 00469 #ifdef ACE_WIN32 00470 // By default Winsock will cause reads to fail with "connection reset" 00471 // when UDP sends result in ICMP "port unreachable" messages. 00472 // The transport framework is not set up for this since returning <= 0 00473 // from our receive_bytes causes the framework to close down the datalink 00474 // which in this case is used to receive from multiple peers. 00475 BOOL recv_udp_connreset = FALSE; 00476 link_->unicast_socket().control(SIO_UDP_CONNRESET, &recv_udp_connreset); 00477 #endif 00478 00479 if (reactor->register_handler(link_->unicast_socket().get_handle(), this, 00480 ACE_Event_Handler::READ_MASK) != 0) { 00481 ACE_ERROR_RETURN((LM_ERROR, 00482 ACE_TEXT("(%P|%t) ERROR: ") 00483 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ") 00484 ACE_TEXT("failed to register handler for unicast ") 00485 ACE_TEXT("socket %d\n"), 00486 link_->unicast_socket().get_handle()), 00487 -1); 00488 } 00489 00490 if (link_->config().use_multicast_) { 00491 if (reactor->register_handler(link_->multicast_socket().get_handle(), this, 00492 ACE_Event_Handler::READ_MASK) != 0) { 00493 ACE_ERROR_RETURN((LM_ERROR, 00494 ACE_TEXT("(%P|%t) ERROR: ") 00495 ACE_TEXT("RtpsUdpReceiveStrategy::start_i: ") 00496 ACE_TEXT("failed to register handler for multicast\n")), 00497 -1); 00498 } 00499 } 00500 00501 return 0; 00502 }
void OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i | ( | ) | [private, virtual] |
Let the subclass stop.
Implements OpenDDS::DCPS::TransportReceiveStrategy< RtpsTransportHeader, RtpsSampleHeader >.
Definition at line 505 of file RtpsUdpReceiveStrategy.cpp.
References ACE_TEXT(), OpenDDS::DCPS::RtpsUdpDataLink::config(), ACE_IPC_SAP::get_handle(), OpenDDS::DCPS::RtpsUdpDataLink::get_reactor(), link_, LM_ERROR, OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket(), ACE_Event_Handler::reactor(), ACE_Event_Handler::READ_MASK, ACE_Reactor::remove_handler(), OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket(), and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.
00506 { 00507 ACE_Reactor* reactor = link_->get_reactor(); 00508 if (reactor == 0) { 00509 ACE_ERROR((LM_ERROR, 00510 ACE_TEXT("(%P|%t) ERROR: ") 00511 ACE_TEXT("RtpsUdpReceiveStrategy::stop_i: ") 00512 ACE_TEXT("NULL reactor reference!\n"))); 00513 return; 00514 } 00515 00516 reactor->remove_handler(link_->unicast_socket().get_handle(), 00517 ACE_Event_Handler::READ_MASK); 00518 00519 if (link_->config().use_multicast_) { 00520 reactor->remove_handler(link_->multicast_socket().get_handle(), 00521 ACE_Event_Handler::READ_MASK); 00522 } 00523 }
const ReceivedDataSample * OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from | ( | const RepoId & | sub_id | ) |
Prevent delivery of the currently in-progress data sample to the subscription sub_id. Returns pointer to the in-progress data so it can be stored for later delivery.
Definition at line 560 of file RtpsUdpReceiveStrategy.cpp.
References readers_withheld_, and recvd_sample_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::process_data_i().
00561 { 00562 readers_withheld_.insert(sub_id); 00563 return recvd_sample_; 00564 }
SequenceRange OpenDDS::DCPS::RtpsUdpReceiveStrategy::frags_ [private] |
Definition at line 102 of file RtpsUdpReceiveStrategy.h.
Referenced by check_header(), and reassemble().
SequenceNumber OpenDDS::DCPS::RtpsUdpReceiveStrategy::last_received_ [private] |
Definition at line 97 of file RtpsUdpReceiveStrategy.h.
RtpsUdpDataLink* OpenDDS::DCPS::RtpsUdpReceiveStrategy::link_ [private] |
Definition at line 96 of file RtpsUdpReceiveStrategy.h.
Referenced by deliver_sample(), deliver_sample_i(), receive_bytes(), start_i(), and stop_i().
RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_selected_ [private] |
Definition at line 100 of file RtpsUdpReceiveStrategy.h.
Referenced by deliver_sample_i(), and do_not_withhold_data_from().
RepoIdSet OpenDDS::DCPS::RtpsUdpReceiveStrategy::readers_withheld_ [private] |
Definition at line 100 of file RtpsUdpReceiveStrategy.h.
Referenced by deliver_sample_i(), and withhold_data_from().
TransportReassembly OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassembly_ [private] |
Definition at line 103 of file RtpsUdpReceiveStrategy.h.
Referenced by has_fragments(), reassemble(), remove_fragments(), and remove_frags_from_bitmap().
MessageReceiver OpenDDS::DCPS::RtpsUdpReceiveStrategy::receiver_ [private] |
Definition at line 131 of file RtpsUdpReceiveStrategy.h.
Referenced by check_header(), deliver_sample(), deliver_sample_i(), and reassemble().
const ReceivedDataSample* OpenDDS::DCPS::RtpsUdpReceiveStrategy::recvd_sample_ [private] |
Definition at line 99 of file RtpsUdpReceiveStrategy.h.
Referenced by deliver_sample_i(), and withhold_data_from().
ACE_INET_Addr OpenDDS::DCPS::RtpsUdpReceiveStrategy::remote_address_ [private] |
Definition at line 132 of file RtpsUdpReceiveStrategy.h.
Referenced by check_header(), and receive_bytes().