#include <RtpsUdpDataLink.h>
Definition at line 54 of file RtpsUdpDataLink.h.
typedef std::pair<RepoId, InterestingRemote> OpenDDS::DCPS::RtpsUdpDataLink::CallbackType [private] |
Definition at line 481 of file RtpsUdpDataLink.h.
typedef void(RtpsUdpDataLink::* OpenDDS::DCPS::RtpsUdpDataLink::PMF)() [private] |
Definition at line 380 of file RtpsUdpDataLink.h.
OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink | ( | RtpsUdpTransport & | transport, | |
const GuidPrefix_t & | local_prefix, | |||
const RtpsUdpInst & | config, | |||
const TransportReactorTask_rch & | reactor_task | |||
) |
Definition at line 80 of file RtpsUdpDataLink.cpp.
References defined(), if(), local_prefix_, OpenDDS::DCPS::DataLink::receive_strategy_, Security, and OpenDDS::DCPS::DataLink::send_strategy_.
00084 : DataLink(transport, // 3 data link "attributes", below, are unused 00085 0, // priority 00086 false, // is_loopback 00087 false), // is_active 00088 reactor_task_(reactor_task), 00089 multi_buff_(this, config.nak_depth_), 00090 best_effort_heartbeat_count_(0), 00091 nack_reply_(this, &RtpsUdpDataLink::send_nack_replies, 00092 config.nak_response_delay_), 00093 heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies, 00094 config.heartbeat_response_delay_), 00095 heartbeat_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats)), 00096 heartbeatchecker_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)), 00097 #if defined(OPENDDS_SECURITY) 00098 held_data_delivery_handler_(this), 00099 security_config_(Security::SecurityRegistry::instance()->default_config()), 00100 local_crypto_handle_(DDS::HANDLE_NIL) 00101 #else 00102 held_data_delivery_handler_(this) 00103 #endif 00104 { 00105 this->send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix); 00106 this->receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix); 00107 std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t)); 00108 }
bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification | ( | TransportQueueElement * | element | ) |
Definition at line 117 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::TransportQueueElement::publication_id(), and writers_.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().
00118 { 00119 RtpsWriterMap::iterator iter = writers_.find(element->publication_id()); 00120 if (iter != writers_.end()) { 00121 00122 iter->second.add_elem_awaiting_ack(element); 00123 return true; 00124 } 00125 return false; 00126 }
void OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg | ( | RTPS::SubmessageSeq & | msg, | |
const TransportQueueElement & | tqe, | |||
const DestToEntityMap & | dtem | |||
) | [private] |
Definition at line 891 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::TransportQueueElement::subscription_id(), and writers_.
Referenced by customize_queue_element().
00894 { 00895 // These are the GAP submessages that we'll send directly in-line with the 00896 // DATA when we notice that the DataWriter has deliberately skipped seq #s. 00897 // There are other GAP submessages generated in response to reader ACKNACKS, 00898 // see send_nack_replies(). 00899 using namespace OpenDDS::RTPS; 00900 00901 const SequenceNumber seq = tqe.sequence(); 00902 const RepoId pub = tqe.publication_id(); 00903 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN 00904 || tqe.subscription_id() != GUID_UNKNOWN) { 00905 return; 00906 } 00907 00908 const RtpsWriterMap::iterator wi = writers_.find(pub); 00909 if (wi == writers_.end()) { 00910 return; // not a reliable writer, does not send GAPs 00911 } 00912 00913 RtpsWriter& rw = wi->second; 00914 00915 if (seq != rw.expected_) { 00916 SequenceNumber firstMissing = rw.expected_; 00917 00918 // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range 00919 // [gapStart, gapListBase) and those in the SNSet. 00920 const SequenceNumber_t gapStart = {firstMissing.getHigh(), 00921 firstMissing.getLow()}, 00922 gapListBase = {seq.getHigh(), 00923 seq.getLow()}; 00924 00925 // We are not going to enable any bits in the "bitmap" of the SNSet, 00926 // but the "numBits" and the bitmap.length must both be > 0. 00927 LongSeq8 bitmap; 00928 bitmap.length(1); 00929 bitmap[0] = 0; 00930 00931 GapSubmessage gap = { 00932 {GAP, FLAG_E, 0 /*length determined below*/}, 00933 ENTITYID_UNKNOWN, // readerId: applies to all matched readers 00934 pub.entityId, 00935 gapStart, 00936 {gapListBase, 1, bitmap} 00937 }; 00938 00939 size_t size = 0, padding = 0; 00940 gen_find_size(gap, size, padding); 00941 gap.smHeader.submessageLength = 00942 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 00943 00944 if (!rw.durable_) { 00945 const CORBA::ULong i = msg.length(); 00946 msg.length(i + 1); 00947 msg[i].gap_sm(gap); 00948 } else { 00949 InfoDestinationSubmessage idst = { 00950 {INFO_DST, FLAG_E, INFO_DST_SZ}, 00951 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 00952 }; 00953 CORBA::ULong ml = msg.length(); 00954 00955 //Change the non-directed Gap into multiple directed gaps to prevent 00956 //delivering to currently undiscovered durable readers 00957 DestToEntityMap::const_iterator iter = dtem.begin(); 00958 while (iter != dtem.end()) { 00959 std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t)); 00960 msg.length(ml + 1); 00961 msg[ml++].info_dst_sm(idst); 00962 00963 const OPENDDS_VECTOR(RepoId)& readers = iter->second; 00964 for (size_t i = 0; i < readers.size(); ++i) { 00965 gap.readerId = readers.at(i).entityId; 00966 msg.length(ml + 1); 00967 msg[ml++].gap_sm(gap); 00968 } //END iter over reader entity ids 00969 ++iter; 00970 } //END iter over reader GuidPrefix_t's 00971 } 00972 } 00973 }
void OpenDDS::DCPS::RtpsUdpDataLink::add_locator | ( | const RepoId & | remote_id, | |
const ACE_INET_Addr & | address, | |||
bool | requires_inline_qos | |||
) |
Definition at line 254 of file RtpsUdpDataLink.cpp.
References lock_.
00257 { 00258 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00259 locators_[remote_id] = RemoteInfo(address, requires_inline_qos); 00260 }
void OpenDDS::DCPS::RtpsUdpDataLink::associated | ( | const RepoId & | local, | |
const RepoId & | remote, | |||
bool | local_reliable, | |||
bool | remote_reliable, | |||
bool | local_durable, | |||
bool | remote_durable | |||
) |
Definition at line 302 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), lock_, reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, and writers_.
00305 { 00306 if (!local_reliable) { 00307 return; 00308 } 00309 00310 bool enable_heartbeat = false; 00311 00312 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00313 const GuidConverter conv(local_id); 00314 if (conv.isWriter() && remote_reliable) { 00315 // Insert count if not already there. 00316 heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0)); 00317 RtpsWriter& w = writers_[local_id]; 00318 w.remote_readers_[remote_id].durable_ = remote_durable; 00319 w.durable_ = local_durable; 00320 enable_heartbeat = true; 00321 00322 } else if (conv.isReader()) { 00323 RtpsReaderMap::iterator rr = readers_.find(local_id); 00324 if (rr == readers_.end()) { 00325 rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader())) 00326 .first; 00327 rr->second.durable_ = local_durable; 00328 } 00329 rr->second.remote_writers_[remote_id]; 00330 reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr)); 00331 } 00332 00333 g.release(); 00334 if (enable_heartbeat) { 00335 heartbeat_->schedule_enable(); 00336 } 00337 }
bool OpenDDS::DCPS::RtpsUdpDataLink::check_handshake_complete | ( | const RepoId & | local, | |
const RepoId & | remote | |||
) |
Definition at line 340 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), and writers_.
00342 { 00343 const GuidConverter conv(local_id); 00344 if (conv.isWriter()) { 00345 RtpsWriterMap::iterator rw = writers_.find(local_id); 00346 if (rw == writers_.end()) { 00347 return true; // not reliable, no handshaking 00348 } 00349 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id); 00350 if (ri == rw->second.remote_readers_.end()) { 00351 return true; // not reliable, no handshaking 00352 } 00353 return ri->second.handshake_done_; 00354 00355 } else if (conv.isReader()) { 00356 return true; // no handshaking for local reader 00357 } 00358 return false; 00359 }
void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats | ( | ) | [private] |
Definition at line 2531 of file RtpsUdpDataLink.cpp.
References config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, ACE_OS::gettimeofday(), OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), and OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist().
02532 { 02533 OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks; 02534 02535 // Have any interesting writers timed out? 02536 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * this->config().heartbeat_period_; 02537 { 02538 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 02539 02540 for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end(); 02541 pos != limit; 02542 ++pos) { 02543 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) { 02544 CallbackType callback(pos->first, pos->second); 02545 writerDoesNotExistCallbacks.push_back(callback); 02546 pos->second.status = InterestingRemote::DOES_NOT_EXIST; 02547 } 02548 } 02549 } 02550 02551 OPENDDS_VECTOR(CallbackType)::iterator iter; 02552 for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) { 02553 const RepoId& rid = iter->first; 02554 const InterestingRemote& remote = iter->second; 02555 remote.listener->writer_does_not_exist(rid, remote.localid); 02556 } 02557 }
RtpsUdpInst & OpenDDS::DCPS::RtpsUdpDataLink::config | ( | ) | const |
Definition at line 111 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DataLink::impl().
Referenced by check_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), open(), send_heartbeats(), send_heartbeats_manual(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element | ( | TransportQueueElement * | element | ) | [private, virtual] |
Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 688 of file RtpsUdpDataLink.cpp.
References add_gap_submsg(), ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), end_historic_samples(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::DataSampleElement::get_header(), ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, LM_DEBUG, LM_ERROR, lock_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OPENDDS_STRING, OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::DataLink::peer_ids(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), requires_inline_qos(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual(), send_strategy(), OpenDDS::DCPS::RtpsCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::Transport_debug_level, and writers_.
00689 { 00690 const ACE_Message_Block* msg = element->msg(); 00691 if (!msg) { 00692 return element; 00693 } 00694 00695 const RepoId pub_id = element->publication_id(); 00696 GUIDSeq_var peers = peer_ids(pub_id); 00697 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0); 00698 bool requires_inline_qos = this->requires_inline_qos(peers); 00699 00700 RTPS::SubmessageSeq subm; 00701 00702 const RtpsWriterMap::iterator rw = writers_.find(pub_id); 00703 00704 bool gap_ok = true; 00705 DestToEntityMap gap_receivers; 00706 if (rw != writers_.end() && !rw->second.remote_readers_.empty()) { 00707 for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin(); 00708 ri != rw->second.remote_readers_.end(); ++ri) { 00709 RepoId tmp; 00710 std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t)); 00711 tmp.entityId = ENTITYID_UNKNOWN; 00712 gap_receivers[tmp].push_back(ri->first); 00713 00714 if (ri->second.expecting_durable_data()) { 00715 // Can't add an in-line GAP if some Data Reader is expecting durable 00716 // data, the GAP could cause that Data Reader to ignore the durable 00717 // data. The other readers will eventually learn about the GAP by 00718 // sending an ACKNACK and getting a GAP reply. 00719 gap_ok = false; 00720 break; 00721 } 00722 } 00723 } 00724 00725 if (gap_ok) { 00726 add_gap_submsg(subm, *element, gap_receivers); 00727 } 00728 00729 const SequenceNumber seq = element->sequence(); 00730 if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 00731 rw->second.expected_ = seq; 00732 ++rw->second.expected_; 00733 } 00734 00735 TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element); 00736 TransportCustomizedElement* tce = 00737 dynamic_cast<TransportCustomizedElement*>(element); 00738 TransportSendControlElement* tsce = 00739 dynamic_cast<TransportSendControlElement*>(element); 00740 00741 Message_Block_Ptr data; 00742 bool durable = false; 00743 00744 // Based on the type of 'element', find and duplicate the data payload 00745 // continuation block. 00746 if (tsce) { // Control message 00747 if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) { 00748 data.reset(msg->cont()->duplicate()); 00749 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00750 RtpsSampleHeader::populate_data_control_submessages( 00751 subm, *tsce, requires_inline_qos); 00752 } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) { 00753 end_historic_samples(rw, tsce->header(), msg->cont()); 00754 element->data_delivered(); 00755 return 0; 00756 } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) { 00757 send_heartbeats_manual(tsce); 00758 element->data_delivered(); 00759 return 0; 00760 } else { 00761 element->data_dropped(true /*dropped_by_transport*/); 00762 return 0; 00763 } 00764 00765 } else if (tse) { // Basic data message 00766 // {DataSampleHeader} -> {Data Payload} 00767 data.reset(msg->cont()->duplicate()); 00768 const DataSampleElement* dsle = tse->sample(); 00769 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00770 RtpsSampleHeader::populate_data_sample_submessages( 00771 subm, *dsle, requires_inline_qos); 00772 durable = dsle->get_header().historic_sample_; 00773 00774 } else if (tce) { // Customized data message 00775 // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload} 00776 data.reset(msg->cont()->cont()->duplicate()); 00777 const DataSampleElement* dsle = tce->original_send_element()->sample(); 00778 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00779 RtpsSampleHeader::populate_data_sample_submessages( 00780 subm, *dsle, requires_inline_qos); 00781 durable = dsle->get_header().historic_sample_; 00782 00783 } else { 00784 return element; 00785 } 00786 00787 #if defined(OPENDDS_SECURITY) 00788 send_strategy()->encode_payload(pub_id, data, subm); 00789 #endif 00790 00791 Message_Block_Ptr hdr(submsgs_to_msgblock(subm)); 00792 hdr->cont(data.release()); 00793 RtpsCustomizedElement* rtps = 00794 new RtpsCustomizedElement(element, move(hdr)); 00795 00796 // Handle durability resends 00797 if (durable && rw != writers_.end()) { 00798 const RepoId sub = element->subscription_id(); 00799 if (sub != GUID_UNKNOWN) { 00800 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub); 00801 if (ri != rw->second.remote_readers_.end()) { 00802 ri->second.durable_data_[rtps->sequence()] = rtps; 00803 ri->second.durable_timestamp_ = ACE_OS::gettimeofday(); 00804 if (Transport_debug_level > 3) { 00805 const GuidConverter conv(pub_id), sub_conv(sub); 00806 ACE_DEBUG((LM_DEBUG, 00807 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - " 00808 "storing durable data for local %C remote %C\n", 00809 OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str())); 00810 } 00811 return 0; 00812 } 00813 } 00814 } else if (durable && (Transport_debug_level)) { 00815 const GuidConverter conv(pub_id); 00816 ACE_ERROR((LM_ERROR, 00817 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - " 00818 "WARNING: no RtpsWriter to store durable data for local %C\n", 00819 OPENDDS_STRING(conv).c_str())); 00820 } 00821 00822 return rtps; 00823 }
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch | ( | const T & | submessage, | |
const GuidPrefix_t & | src_prefix, | |||
const FN & | func | |||
) | [inline, private] |
Definition at line 331 of file RtpsUdpDataLink.h.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, and OpenDDS::DCPS::GUID_t::guidPrefix.
Referenced by received().
00333 { 00334 using std::pair; 00335 RepoId local; 00336 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 00337 local.entityId = submessage.readerId; 00338 00339 RepoId src; 00340 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 00341 src.entityId = submessage.writerId; 00342 00343 bool schedule_timer = false; 00344 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00345 if (local.entityId == ENTITYID_UNKNOWN) { 00346 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters = 00347 reader_index_.equal_range(src); 00348 iters.first != iters.second; ++iters.first) { 00349 schedule_timer |= (this->*func)(submessage, src, *iters.first->second); 00350 } 00351 00352 } else { 00353 const RtpsReaderMap::iterator rr = readers_.find(local); 00354 if (rr == readers_.end()) { 00355 return; 00356 } 00357 schedule_timer = (this->*func)(submessage, src, *rr); 00358 } 00359 g.release(); 00360 if (schedule_timer) { 00361 heartbeat_reply_.schedule(); 00362 } 00363 }
void OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data | ( | const RepoId & | readerId, | |
WriterInfo & | info, | |||
bool | durable | |||
) | [private] |
Definition at line 1056 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::empty(), held_data_delivery_handler_, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery(), and OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_.
Referenced by process_data_i(), process_gap_i(), and process_heartbeat_i().
01058 { 01059 if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return; 01060 held_data_delivery_handler_.notify_delivery(readerId, info); 01061 }
void OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria, | |||
ACE_Guard< ACE_Thread_Mutex > & | guard | |||
) |
Definition at line 128 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), OPENDDS_SET(), ACE_Guard< ACE_LOCK >::release(), and writers_.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample().
00131 { 00132 RtpsWriter::SnToTqeMap sn_tqe_map; 00133 RtpsWriter::SnToTqeMap to_deliver; 00134 typedef RtpsWriter::SnToTqeMap::iterator iter_t; 00135 00136 RtpsWriterMap::iterator iter = writers_.find(pub_id); 00137 if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) { 00138 to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end()); 00139 iter->second.to_deliver_.clear(); 00140 iter_t it = iter->second.elems_not_acked_.begin(); 00141 OPENDDS_SET(SequenceNumber) sns_to_release; 00142 while (it != iter->second.elems_not_acked_.end()) { 00143 if (criteria.matches(*it->second)) { 00144 sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second)); 00145 sns_to_release.insert(it->first); 00146 iter_t last = it; 00147 ++it; 00148 iter->second.elems_not_acked_.erase(last); 00149 } else { 00150 ++it; 00151 } 00152 } 00153 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00154 while (sns_it != sns_to_release.end()) { 00155 iter->second.send_buff_->release_acked(*sns_it); 00156 ++sns_it; 00157 } 00158 } 00159 00160 // see comment in RtpsUdpDataLink::send_i() for lock order 00161 // reverse guard can't be used since that involves re-locking 00162 guard.release(); 00163 00164 iter_t deliver_iter = to_deliver.begin(); 00165 while (deliver_iter != to_deliver.end()) { 00166 deliver_iter->second->data_delivered(); 00167 ++deliver_iter; 00168 } 00169 iter_t drop_iter = sn_tqe_map.begin(); 00170 while (drop_iter != sn_tqe_map.end()) { 00171 drop_iter->second->data_dropped(true); 00172 ++drop_iter; 00173 } 00174 }
void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend | ( | TransportQueueElement * | element | ) | [private] |
Definition at line 2287 of file RtpsUdpDataLink.cpp.
References get_locator(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::DCPS::TransportQueueElement::subscription_id().
Referenced by received().
02288 { 02289 ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg()); 02290 send_strategy()->send_rtps_control(*msg, 02291 get_locator(element->subscription_id())); 02292 }
void OpenDDS::DCPS::RtpsUdpDataLink::end_historic_samples | ( | RtpsWriterMap::iterator | writer, | |
const DataSampleHeader & | header, | |||
ACE_Message_Block * | body | |||
) | [private] |
Definition at line 826 of file RtpsUdpDataLink.cpp.
References ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_length_, OPENDDS_STRING, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::Transport_debug_level, and writers_.
Referenced by customize_queue_element().
00829 { 00830 // Set the ReaderInfo::durable_timestamp_ for the case where no 00831 // durable samples exist in the DataWriter. 00832 if (writer != writers_.end() && writer->second.durable_) { 00833 const ACE_Time_Value now = ACE_OS::gettimeofday(); 00834 RepoId sub = GUID_UNKNOWN; 00835 if (body && header.message_length_ >= sizeof(sub)) { 00836 std::memcpy(&sub, body->rd_ptr(), header.message_length_); 00837 } 00838 typedef ReaderInfoMap::iterator iter_t; 00839 if (sub == GUID_UNKNOWN) { 00840 if (Transport_debug_level > 3) { 00841 const GuidConverter conv(writer->first); 00842 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples " 00843 "local %C all readers\n", OPENDDS_STRING(conv).c_str())); 00844 } 00845 for (iter_t iter = writer->second.remote_readers_.begin(); 00846 iter != writer->second.remote_readers_.end(); ++iter) { 00847 if (iter->second.durable_) { 00848 iter->second.durable_timestamp_ = now; 00849 } 00850 } 00851 } else { 00852 iter_t iter = writer->second.remote_readers_.find(sub); 00853 if (iter != writer->second.remote_readers_.end()) { 00854 if (iter->second.durable_) { 00855 iter->second.durable_timestamp_ = now; 00856 if (Transport_debug_level > 3) { 00857 const GuidConverter conv(writer->first), sub_conv(sub); 00858 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples" 00859 " local %C remote %C\n", OPENDDS_STRING(conv).c_str(), 00860 OPENDDS_STRING(sub_conv).c_str())); 00861 } 00862 } 00863 } 00864 } 00865 } 00866 }
void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range | ( | RTPS::FragmentNumberSet & | fnSet, | |
CORBA::ULong | extent | |||
) | [static, private] |
Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent
fnSet | FragmentNumberSet for the message sequence number in question | |
extent | is the highest fragment sequence number for this FragmentNumberSet |
Definition at line 1568 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), and OpenDDS::RTPS::FragmentNumberSet::numBits.
Referenced by generate_nack_frags().
01570 { 01571 if (extent < fnSet.bitmapBase.value) { 01572 return; // can't extend to some number under the base 01573 } 01574 // calculate the index to the extent to determine the new_num_bits 01575 const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255), 01576 extent - fnSet.bitmapBase.value + 1), 01577 len = (new_num_bits + 31) / 32; 01578 if (new_num_bits < fnSet.numBits) { 01579 return; // bitmap already extends past "extent" 01580 } 01581 fnSet.bitmap.length(len); 01582 // We are missing from one past old bitmap end to the new end 01583 DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits, 01584 fnSet.bitmap.get_buffer(), len, 01585 fnSet.numBits); 01586 }
size_t OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags | ( | OPENDDS_VECTOR(RTPS::NackFragSubmessage)& | nack_frags, | |
WriterInfo & | wi, | |||
const RepoId & | pub_id | |||
) | [private] |
Definition at line 1483 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::ENTITYID_UNKNOWN, extend_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::nackfrag_count_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::NackFragSubmessage::smHeader, and OpenDDS::RTPS::NackFragSubmessage::writerSN.
Referenced by send_ack_nacks().
01485 { 01486 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t; 01487 typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t; 01488 RtpsUdpReceiveStrategy::FragmentInfo frag_info; 01489 01490 // Populate frag_info with two possible sources of NackFrags: 01491 // 1. sequence #s in the reception gaps that we have partially received 01492 OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges(); 01493 for (size_t i = 0; i < missing.size(); ++i) { 01494 receive_strategy()->has_fragments(missing[i], pub_id, &frag_info); 01495 } 01496 // 1b. larger than the last received seq# but less than the heartbeat.lastSN 01497 if (!wi.recvd_.empty()) { 01498 const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second); 01499 receive_strategy()->has_fragments(range, pub_id, &frag_info); 01500 } 01501 for (size_t i = 0; i < frag_info.size(); ++i) { 01502 // If we've received a HeartbeatFrag, we know the last (available) frag # 01503 const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first); 01504 if (heartbeat_frag != wi.frags_.end()) { 01505 extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value); 01506 } 01507 } 01508 01509 // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag 01510 const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()), 01511 high = wi.frags_.upper_bound(wi.recvd_.last_ack()), 01512 end = wi.frags_.end(); 01513 for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) { 01514 if (iter == low) { 01515 // skip over the range covered by step #1 above 01516 if (high == end) { 01517 break; 01518 } 01519 iter = high; 01520 } 01521 01522 const SequenceRange range(iter->first, iter->first); 01523 if (receive_strategy()->has_fragments(range, pub_id, &frag_info)) { 01524 extend_bitmap_range(frag_info.back().second, iter->second.value); 01525 } else { 01526 // it was not in the recv strategy, so the entire range is "missing" 01527 frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet())); 01528 RTPS::FragmentNumberSet& fnSet = frag_info.back().second; 01529 fnSet.bitmapBase.value = 1; 01530 fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value); 01531 fnSet.bitmap.length((fnSet.numBits + 31) / 32); 01532 for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) { 01533 fnSet.bitmap[i] = 0xFFFFFFFF; 01534 } 01535 } 01536 } 01537 01538 if (frag_info.empty()) { 01539 return 0; 01540 } 01541 01542 const RTPS::NackFragSubmessage nackfrag_prototype = { 01543 {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */}, 01544 ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies() 01545 ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies() 01546 {0, 0}, // writerSN set below 01547 RTPS::FragmentNumberSet(), // fragmentNumberState set below 01548 {0} // count set below 01549 }; 01550 01551 size_t size = 0, padding = 0; 01552 for (size_t i = 0; i < frag_info.size(); ++i) { 01553 nf.push_back(nackfrag_prototype); 01554 RTPS::NackFragSubmessage& nackfrag = nf.back(); 01555 nackfrag.writerSN.low = frag_info[i].first.getLow(); 01556 nackfrag.writerSN.high = frag_info[i].first.getHigh(); 01557 nackfrag.fragmentNumberState = frag_info[i].second; 01558 nackfrag.count.value = ++wi.nackfrag_count_; 01559 const size_t before_size = size; 01560 gen_find_size(nackfrag, size, padding); 01561 nackfrag.smHeader.submessageLength = 01562 static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ; 01563 } 01564 return size; 01565 }
ACE_INET_Addr OpenDDS::DCPS::RtpsUdpDataLink::get_locator | ( | const RepoId & | remote_id | ) | const |
Definition at line 288 of file RtpsUdpDataLink.cpp.
References LM_ERROR, OPENDDS_MAP_CMP(), and OPENDDS_STRING.
Referenced by durability_resend(), get_locators(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper(), and send_durability_gaps().
00289 { 00290 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t; 00291 const iter_t iter = locators_.find(remote_id); 00292 if (iter == locators_.end()) { 00293 const GuidConverter conv(remote_id); 00294 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - " 00295 "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str())); 00296 return ACE_INET_Addr(); 00297 } 00298 return iter->second.addr_; 00299 }
void OpenDDS::DCPS::RtpsUdpDataLink::get_locators | ( | const RepoId & | local_id, | |
OPENDDS_SET(ACE_INET_Addr)& | addrs | |||
) | const |
Given a 'local_id' of a publication or subscription, populate the set of 'addrs' with the network addresses of any remote peers (or if 'local_id' is GUID_UNKNOWN, all known addresses).
Definition at line 263 of file RtpsUdpDataLink.cpp.
References get_locator(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper(), and send_heartbeats_manual().
00265 { 00266 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t; 00267 00268 if (local_id == GUID_UNKNOWN) { 00269 for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) { 00270 addrs.insert(iter->second.addr_); 00271 } 00272 return; 00273 } 00274 00275 const GUIDSeq_var peers = peer_ids(local_id); 00276 if (!peers.ptr()) { 00277 return; 00278 } 00279 for (CORBA::ULong i = 0; i < peers->length(); ++i) { 00280 const ACE_INET_Addr addr = get_locator(peers[i]); 00281 if (addr != ACE_INET_Addr()) { 00282 addrs.insert(addr); 00283 } 00284 } 00285 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor | ( | void | ) |
Definition at line 16 of file RtpsUdpDataLink.inl.
References reactor_task_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00017 { 00018 if (!reactor_task_) return 0; 00019 return reactor_task_->get_reactor(); 00020 }
const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix | ( | ) | const [inline] |
Definition at line 95 of file RtpsUdpDataLink.h.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().
00095 { return local_prefix_; }
ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::marshal_gaps | ( | const RepoId & | writer, | |
const RepoId & | reader, | |||
const DisjointSequence & | gaps, | |||
bool | durable = false | |||
) | [private] |
Definition at line 2185 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and writers_.
Referenced by send_directed_nack_replies(), send_durability_gaps(), and send_nack_replies().
02187 { 02188 using namespace RTPS; 02189 // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range 02190 // [gapStart, gapListBase) and those in the SNSet. 02191 const SequenceNumber firstMissing = gaps.low(), 02192 base = ++SequenceNumber(gaps.cumulative_ack()); 02193 const SequenceNumber_t gapStart = {firstMissing.getHigh(), 02194 firstMissing.getLow()}, 02195 gapListBase = {base.getHigh(), base.getLow()}; 02196 CORBA::ULong num_bits = 0; 02197 LongSeq8 bitmap; 02198 02199 if (gaps.disjoint()) { 02200 bitmap.length(bitmap_num_longs(base, gaps.high())); 02201 gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits); 02202 02203 } else { 02204 bitmap.length(1); 02205 bitmap[0] = 0; 02206 num_bits = 1; 02207 } 02208 02209 GapSubmessage gap = { 02210 {GAP, FLAG_E, 0 /*length determined below*/}, 02211 reader.entityId, 02212 writer.entityId, 02213 gapStart, 02214 {gapListBase, num_bits, bitmap} 02215 }; 02216 02217 if (Transport_debug_level > 5) { 02218 const GuidConverter conv(writer); 02219 SequenceRange sr; 02220 sr.first.setValue(gap.gapStart.high, gap.gapStart.low); 02221 SequenceNumber srbase; 02222 srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low); 02223 sr.second = srbase.previous(); 02224 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps " 02225 "GAP with range [%q, %q] from %C\n", 02226 sr.first.getValue(), sr.second.getValue(), 02227 OPENDDS_STRING(conv).c_str())); 02228 } 02229 02230 size_t gap_size = 0, padding = 0; 02231 gen_find_size(gap, gap_size, padding); 02232 gap.smHeader.submessageLength = 02233 static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ; 02234 02235 // For durable writers, change a non-directed Gap into multiple directed gaps. 02236 OPENDDS_VECTOR(RepoId) readers; 02237 if (durable && reader.entityId == ENTITYID_UNKNOWN) { 02238 if (Transport_debug_level > 5) { 02239 const GuidConverter local_conv(writer); 02240 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C " 02241 "durable writer\n", OPENDDS_STRING(local_conv).c_str())); 02242 } 02243 const RtpsWriterMap::iterator iter = writers_.find(writer); 02244 RtpsWriter& rw = iter->second; 02245 for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin(); 02246 ri != rw.remote_readers_.end(); ++ri) { 02247 if (!ri->second.expecting_durable_data()) { 02248 readers.push_back(ri->first); 02249 } else if (Transport_debug_level > 5) { 02250 const GuidConverter remote_conv(ri->first); 02251 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader " 02252 "%C is expecting durable data, no GAP sent\n", 02253 OPENDDS_STRING(remote_conv).c_str())); 02254 } 02255 } 02256 if (readers.empty()) return 0; 02257 } 02258 02259 const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ, 02260 prefix_sz = sizeof(reader.guidPrefix); 02261 // no additional padding needed for INFO_DST 02262 const size_t total_sz = readers.empty() ? (gap_size + padding) : 02263 (readers.size() * (gap_size + padding + size_per_idst)); 02264 02265 ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz); 02266 //FUTURE: allocators? 02267 // byte swapping is handled in the operator<<() implementation 02268 Serializer ser(mb_gap, false, Serializer::ALIGN_CDR); 02269 if (readers.empty()) { 02270 ser << gap; 02271 } else { 02272 InfoDestinationSubmessage idst = { 02273 {INFO_DST, FLAG_E, INFO_DST_SZ}, 02274 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 02275 }; 02276 for (size_t i = 0; i < readers.size(); ++i) { 02277 std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz); 02278 gap.readerId = readers[i].entityId; 02279 ser << idst; 02280 ser << gap; 02281 } 02282 } 02283 return mb_gap; 02284 }
ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket | ( | ) |
Definition at line 36 of file RtpsUdpDataLink.inl.
References multicast_socket_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00037 { 00038 return multicast_socket_; 00039 }
bool OpenDDS::DCPS::RtpsUdpDataLink::open | ( | const ACE_SOCK_Dgram & | unicast_socket | ) |
Definition at line 177 of file RtpsUdpDataLink.cpp.
References ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, config(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, ACE_SOCK_Dgram_Mcast::join(), LM_ERROR, multi_buff_, OpenDDS::DCPS::RtpsUdpInst::multicast_group_address_, OpenDDS::DCPS::RtpsUdpInst::multicast_interface_, multicast_socket_, OPENDDS_STRING, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::DCPS::RtpsUdpInst::rcv_buffer_size_, OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TransportSendStrategy::send_buffer(), OpenDDS::DCPS::RtpsUdpInst::send_buffer_size_, send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, ACE_SOCK::set_option(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::RtpsUdpInst::ttl_, unicast_socket_, and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.
00178 { 00179 unicast_socket_ = unicast_socket; 00180 00181 RtpsUdpInst& config = this->config(); 00182 00183 if (config.use_multicast_) { 00184 const OPENDDS_STRING& net_if = config.multicast_interface_; 00185 #ifdef ACE_HAS_MAC_OSX 00186 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | 00187 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE); 00188 #endif 00189 if (multicast_socket_.join(config.multicast_group_address_, 1, 00190 net_if.empty() ? 0 : 00191 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) { 00192 ACE_ERROR_RETURN((LM_ERROR, 00193 ACE_TEXT("(%P|%t) ERROR: ") 00194 ACE_TEXT("RtpsUdpDataLink::open: ") 00195 ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")), 00196 false); 00197 } 00198 } 00199 00200 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config.ttl_)) { 00201 ACE_ERROR_RETURN((LM_ERROR, 00202 ACE_TEXT("(%P|%t) ERROR: ") 00203 ACE_TEXT("RtpsUdpDataLink::open: ") 00204 ACE_TEXT("failed to set TTL: %d\n"), 00205 config.ttl_), 00206 false); 00207 } 00208 00209 if (config.send_buffer_size_ > 0) { 00210 int snd_size = config.send_buffer_size_; 00211 if (this->unicast_socket_.set_option(SOL_SOCKET, 00212 SO_SNDBUF, 00213 (void *) &snd_size, 00214 sizeof(snd_size)) < 0 00215 && errno != ENOTSUP) { 00216 ACE_ERROR_RETURN((LM_ERROR, 00217 ACE_TEXT("(%P|%t) ERROR: ") 00218 ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"), 00219 snd_size), 00220 false); 00221 } 00222 } 00223 00224 if (config.rcv_buffer_size_ > 0) { 00225 int rcv_size = config.rcv_buffer_size_; 00226 if (this->unicast_socket_.set_option(SOL_SOCKET, 00227 SO_RCVBUF, 00228 (void *) &rcv_size, 00229 sizeof(int)) < 0 00230 && errno != ENOTSUP) { 00231 ACE_ERROR_RETURN((LM_ERROR, 00232 ACE_TEXT("(%P|%t) ERROR: ") 00233 ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"), 00234 rcv_size), 00235 false); 00236 } 00237 } 00238 00239 send_strategy()->send_buffer(&multi_buff_); 00240 00241 if (start(send_strategy_, 00242 receive_strategy_) != 0) { 00243 stop_i(); 00244 ACE_ERROR_RETURN((LM_ERROR, 00245 ACE_TEXT("(%P|%t) ERROR: ") 00246 ACE_TEXT("UdpDataLink::open: start failed!\n")), 00247 false); 00248 } 00249 00250 return true; 00251 }
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
CORBA::Long | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RtpsReader | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
WriterInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RtpsWriter | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RemoteInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
OPENDDS_VECTOR(RepoId) | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Referenced by get_locator(), get_locators(), and requires_inline_qos().
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP | ( | RepoId | , | |
InterestingRemote | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP | ( | RepoId | , | |
RtpsReaderMap::iterator | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET | ( | InterestingAckNack | ) | [private] |
Referenced by do_remove_sample(), pre_stop_i(), process_acked_by_all_i(), release_reservations_i(), send_heartbeats(), send_heartbeats_manual(), and send_nack_replies().
void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i | ( | ) | [virtual] |
Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 427 of file RtpsUdpDataLink.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, heartbeat_counts_, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.
00428 { 00429 DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6); 00430 DataLink::pre_stop_i(); 00431 OPENDDS_VECTOR(TransportQueueElement*) to_deliver; 00432 OPENDDS_VECTOR(TransportQueueElement*) to_drop; 00433 { 00434 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00435 00436 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 00437 00438 RtpsWriterMap::iterator iter = writers_.begin(); 00439 while (iter != writers_.end()) { 00440 RtpsWriter& writer = iter->second; 00441 if (!writer.to_deliver_.empty()) { 00442 iter_t iter = writer.to_deliver_.begin(); 00443 while (iter != writer.to_deliver_.end()) { 00444 to_deliver.push_back(iter->second); 00445 writer.to_deliver_.erase(iter); 00446 iter = writer.to_deliver_.begin(); 00447 } 00448 } 00449 if (!writer.elems_not_acked_.empty()) { 00450 OPENDDS_SET(SequenceNumber) sns_to_release; 00451 iter_t iter = writer.elems_not_acked_.begin(); 00452 while (iter != writer.elems_not_acked_.end()) { 00453 to_drop.push_back(iter->second); 00454 sns_to_release.insert(iter->first); 00455 writer.elems_not_acked_.erase(iter); 00456 iter = writer.elems_not_acked_.begin(); 00457 } 00458 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00459 while (sns_it != sns_to_release.end()) { 00460 writer.send_buff_->release_acked(*sns_it); 00461 ++sns_it; 00462 } 00463 } 00464 RtpsWriterMap::iterator last = iter; 00465 ++iter; 00466 heartbeat_counts_.erase(last->first); 00467 writers_.erase(last); 00468 } 00469 } 00470 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter; 00471 tqe_iter deliver_it = to_deliver.begin(); 00472 while (deliver_it != to_deliver.end()) { 00473 (*deliver_it)->data_delivered(); 00474 ++deliver_it; 00475 } 00476 tqe_iter drop_it = to_drop.begin(); 00477 while (drop_it != to_drop.end()) { 00478 (*drop_it)->data_dropped(true); 00479 ++drop_it; 00480 } 00481 }
void OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i | ( | ACE_Guard< ACE_Thread_Mutex > & | g, | |
const RepoId & | pub_id | |||
) | [private] |
Definition at line 2114 of file RtpsUdpDataLink.cpp.
References ACE_Guard< ACE_LOCK >::acquire(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.
Referenced by received(), and release_reservations_i().
02115 { 02116 using namespace OpenDDS::RTPS; 02117 typedef RtpsWriterMap::iterator rw_iter; 02118 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 02119 OPENDDS_VECTOR(RepoId) to_check; 02120 rw_iter rw = writers_.find(pub_id); 02121 if (rw == writers_.end()) { 02122 return; 02123 } 02124 RtpsWriter& writer = rw->second; 02125 if (!writer.elems_not_acked_.empty()) { 02126 02127 //start with the max sequence number writer knows about and decrease 02128 //by what the min over all readers is 02129 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE; 02130 02131 typedef ReaderInfoMap::iterator ri_iter; 02132 const ri_iter end = writer.remote_readers_.end(); 02133 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 02134 if (ri->second.cur_cumulative_ack_ < all_readers_ack) { 02135 all_readers_ack = ri->second.cur_cumulative_ack_; 02136 } 02137 } 02138 if (all_readers_ack == SequenceNumber::MAX_VALUE) { 02139 return; 02140 } 02141 OPENDDS_VECTOR(SequenceNumber) sns; 02142 //if any messages fully acked, call data delivered and remove from map 02143 02144 iter_t it = writer.elems_not_acked_.begin(); 02145 OPENDDS_SET(SequenceNumber) sns_to_release; 02146 while (it != writer.elems_not_acked_.end()) { 02147 if (it->first < all_readers_ack) { 02148 writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second)); 02149 sns_to_release.insert(it->first); 02150 iter_t last = it; 02151 ++it; 02152 writer.elems_not_acked_.erase(last); 02153 } else { 02154 break; 02155 } 02156 } 02157 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 02158 while (sns_it != sns_to_release.end()) { 02159 writer.send_buff_->release_acked(*sns_it); 02160 ++sns_it; 02161 } 02162 TransportQueueElement* tqe_to_deliver; 02163 02164 while (true) { 02165 rw_iter deliver_on_writer = writers_.find(pub_id); 02166 if (deliver_on_writer == writers_.end()) { 02167 break; 02168 } 02169 RtpsWriter& writer = deliver_on_writer->second; 02170 iter_t to_deliver_iter = writer.to_deliver_.begin(); 02171 if (to_deliver_iter == writer.to_deliver_.end()) { 02172 break; 02173 } 02174 tqe_to_deliver = to_deliver_iter->second; 02175 writer.to_deliver_.erase(to_deliver_iter); 02176 g.release(); 02177 02178 tqe_to_deliver->data_delivered(); 02179 g.acquire(); 02180 } 02181 } 02182 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_data_i | ( | const RTPS::DataSubmessage & | data, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 986 of file RtpsUdpDataLink.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DisjointSequence::contains(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from(), and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by received().
00989 { 00990 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 00991 if (wi != rr.second.remote_writers_.end()) { 00992 WriterInfo& info = wi->second; 00993 SequenceNumber seq; 00994 seq.setValue(data.writerSN.high, data.writerSN.low); 00995 info.frags_.erase(seq); 00996 const RepoId& readerId = rr.first; 00997 if (info.recvd_.contains(seq)) { 00998 if (Transport_debug_level > 5) { 00999 GuidConverter writer(src); 01000 GuidConverter reader(readerId); 01001 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 01002 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"), 01003 seq.getValue(), 01004 OPENDDS_STRING(writer).c_str(), 01005 OPENDDS_STRING(reader).c_str())); 01006 } 01007 receive_strategy()->withhold_data_from(readerId); 01008 } else if (info.recvd_.disjoint() || 01009 (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous()) 01010 || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1) 01011 || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) { 01012 if (Transport_debug_level > 5) { 01013 GuidConverter writer(src); 01014 GuidConverter reader(readerId); 01015 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 01016 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"), 01017 seq.getValue(), 01018 OPENDDS_STRING(writer).c_str(), 01019 OPENDDS_STRING(reader).c_str())); 01020 } 01021 const ReceivedDataSample* sample = 01022 receive_strategy()->withhold_data_from(readerId); 01023 info.held_.insert(std::make_pair(seq, *sample)); 01024 } else { 01025 if (Transport_debug_level > 5) { 01026 GuidConverter writer(src); 01027 GuidConverter reader(readerId); 01028 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 01029 ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"), 01030 seq.getValue(), 01031 OPENDDS_STRING(writer).c_str(), 01032 OPENDDS_STRING(reader).c_str())); 01033 } 01034 receive_strategy()->do_not_withhold_data_from(readerId); 01035 } 01036 info.recvd_.insert(seq); 01037 deliver_held_data(readerId, info, rr.second.durable_); 01038 } else { 01039 if (Transport_debug_level > 5) { 01040 GuidConverter writer(src); 01041 GuidConverter reader(rr.first); 01042 SequenceNumber seq; 01043 seq.setValue(data.writerSN.high, data.writerSN.low); 01044 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 01045 ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"), 01046 seq.getValue(), 01047 OPENDDS_STRING(writer).c_str(), 01048 OPENDDS_STRING(reader).c_str())); 01049 } 01050 receive_strategy()->do_not_withhold_data_from(rr.first); 01051 } 01052 return false; 01053 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_gap_i | ( | const RTPS::GapSubmessage & | gap, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1071 of file RtpsUdpDataLink.cpp.
References deliver_held_data(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, LM_WARNING, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.
Referenced by received().
01073 { 01074 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01075 if (wi != rr.second.remote_writers_.end()) { 01076 SequenceRange sr; 01077 sr.first.setValue(gap.gapStart.high, gap.gapStart.low); 01078 SequenceNumber base; 01079 base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low); 01080 SequenceNumber first_received = SequenceNumber::MAX_VALUE; 01081 if (!wi->second.recvd_.empty()) { 01082 OPENDDS_VECTOR(SequenceRange) missing = wi->second.recvd_.missing_sequence_ranges(); 01083 if (!missing.empty()) { 01084 first_received = missing.front().second; 01085 } 01086 } 01087 sr.second = std::min(first_received, base.previous()); 01088 if (sr.first <= sr.second) { 01089 if (Transport_debug_level > 5) { 01090 const GuidConverter conv(src); 01091 const GuidConverter rdr(rr.first); 01092 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i " 01093 "Reader %C received GAP with range [%q, %q] (inserting range [%q, %q]) from %C\n", 01094 OPENDDS_STRING(rdr).c_str(), 01095 sr.first.getValue(), base.previous().getValue(), 01096 sr.first.getValue(), sr.second.getValue(), 01097 OPENDDS_STRING(conv).c_str())); 01098 } 01099 wi->second.recvd_.insert(sr); 01100 } else { 01101 const GuidConverter conv(src); 01102 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i " 01103 "received GAP with invalid range [%q, %q] from %C\n", 01104 sr.first.getValue(), sr.second.getValue(), 01105 OPENDDS_STRING(conv).c_str()), 2); 01106 } 01107 wi->second.recvd_.insert(base, gap.gapList.numBits, 01108 gap.gapList.bitmap.get_buffer()); 01109 deliver_held_data(rr.first, wi->second, rr.second.durable_); 01110 //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP 01111 } 01112 return false; 01113 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_hb_frag_i | ( | const RTPS::HeartBeatFragSubmessage & | hb_frag, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1596 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, OpenDDS::DCPS::SequenceNumber::setValue(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.
Referenced by received().
01599 { 01600 WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01601 if (wi == rr.second.remote_writers_.end()) { 01602 // we may not be associated yet, even if the writer thinks we are 01603 return false; 01604 } 01605 01606 if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) { 01607 return false; 01608 } 01609 01610 wi->second.hb_frag_recvd_count_ = hb_frag.count.value; 01611 01612 SequenceNumber seq; 01613 seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low); 01614 01615 // If seq is outside the heartbeat range or we haven't completely received 01616 // it yet, send a NackFrag along with the AckNack. The heartbeat range needs 01617 // to be checked first because recvd_ contains the numbers below the 01618 // heartbeat range (so that we don't NACK those). 01619 if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second 01620 || !wi->second.recvd_.contains(seq)) { 01621 wi->second.frags_[seq] = hb_frag.lastFragmentNum; 01622 wi->second.ack_pending_ = true; 01623 return true; // timer will invoke send_heartbeat_replies() 01624 } 01625 return false; 01626 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i | ( | const RTPS::HeartBeatSubmessage & | heartbeat, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1171 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::ack_pending_, OpenDDS::RTPS::HeartBeatSubmessage::count, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::heartbeat_recvd_count_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::initial_hb_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::should_nack(), OpenDDS::RTPS::HeartBeatSubmessage::smHeader, and OpenDDS::DCPS::SequenceNumber::ZERO().
Referenced by received().
01174 { 01175 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01176 if (wi == rr.second.remote_writers_.end()) { 01177 // we may not be associated yet, even if the writer thinks we are 01178 return false; 01179 } 01180 01181 WriterInfo& info = wi->second; 01182 01183 if (heartbeat.count.value <= info.heartbeat_recvd_count_) { 01184 return false; 01185 } 01186 info.heartbeat_recvd_count_ = heartbeat.count.value; 01187 01188 SequenceNumber& first = info.hb_range_.first; 01189 first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low); 01190 SequenceNumber& last = info.hb_range_.second; 01191 last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low); 01192 static const SequenceNumber starting, zero = SequenceNumber::ZERO(); 01193 01194 DisjointSequence& recvd = info.recvd_; 01195 if (!rr.second.durable_ && info.initial_hb_) { 01196 if (last.getValue() < starting.getValue()) { 01197 // this is an invalid heartbeat -- last must be positive 01198 return false; 01199 } 01200 // For the non-durable reader, the first received HB or DATA establishes 01201 // a baseline of the lowest sequence number we'd ever need to NACK. 01202 if (recvd.empty() || recvd.low() >= last) { 01203 recvd.insert(SequenceRange(zero, last)); 01204 } else { 01205 recvd.insert(SequenceRange(zero, recvd.low())); 01206 } 01207 } else if (!recvd.empty()) { 01208 // All sequence numbers below 'first' should not be NACKed. 01209 // The value of 'first' may not decrease with subsequent HBs. 01210 recvd.insert(SequenceRange(zero, 01211 (first > starting) ? first.previous() : zero)); 01212 } 01213 01214 deliver_held_data(rr.first, info, rr.second.durable_); 01215 01216 //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence 01217 // numbers we no longer expect to receive due to HEARTBEAT 01218 01219 info.initial_hb_ = false; 01220 01221 const bool final = heartbeat.smHeader.flags & RTPS::FLAG_F, 01222 liveliness = heartbeat.smHeader.flags & RTPS::FLAG_L; 01223 01224 if (!final || (!liveliness && (info.should_nack() || 01225 rr.second.nack_durable(info) || 01226 receive_strategy()->has_fragments(info.hb_range_, wi->first)))) { 01227 info.ack_pending_ = true; 01228 return true; // timer will invoke send_heartbeat_replies() 01229 } 01230 01231 //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC 01232 return false; 01233 }
void OpenDDS::DCPS::RtpsUdpDataLink::process_requested_changes | ( | DisjointSequence & | requests, | |
const RtpsWriter & | writer, | |||
const ReaderInfo & | reader | |||
) | [private] |
Definition at line 2039 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::SequenceNumberSet::bitmap, OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_high(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::RTPS::SequenceNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and OpenDDS::DCPS::SequenceNumber::setValue().
Referenced by send_directed_nack_replies(), and send_nack_replies().
02042 { 02043 for (size_t i = 0; i < reader.requested_changes_.size(); ++i) { 02044 const RTPS::SequenceNumberSet& sn_state = reader.requested_changes_[i]; 02045 SequenceNumber base; 02046 base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low); 02047 if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1) 02048 && base == writer.heartbeat_high(reader)) { 02049 // Since there is an entry in requested_changes_, the DR must have 02050 // sent a non-final AckNack. If the base value is the high end of 02051 // the heartbeat range, treat it as a request for that seq#. 02052 if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) { 02053 requests.insert(base); 02054 } 02055 } else { 02056 requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer()); 02057 } 02058 } 02059 }
ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down | ( | ) |
Definition at line 23 of file RtpsUdpDataLink.inl.
References reactor_task_.
00024 { 00025 if (!reactor_task_) return true; 00026 return reactor_task_->is_shut_down(); 00027 }
OpenDDS::DCPS::RtpsUdpReceiveStrategy * OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy | ( | ) | [private] |
Definition at line 2865 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::DataLink::receive_strategy_.
Referenced by generate_nack_frags(), process_data_i(), process_heartbeat_i(), and send_ack_nacks().
02866 { 02867 return static_cast<OpenDDS::DCPS::RtpsUdpReceiveStrategy*>(receive_strategy_.in()); 02868 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::NackFragSubmessage & | nackfrag, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1836 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::GUID_t::guidPrefix, LM_DEBUG, LM_WARNING, local_prefix_, lock_, nack_reply_, OPENDDS_STRING, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::NackFragSubmessage::writerId, writers_, and OpenDDS::RTPS::NackFragSubmessage::writerSN.
01838 { 01839 // local side is DW 01840 RepoId local; 01841 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 01842 local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN 01843 01844 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01845 const RtpsWriterMap::iterator rw = writers_.find(local); 01846 if (rw == writers_.end()) { 01847 if (Transport_debug_level > 5) { 01848 GuidConverter local_conv(local); 01849 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01850 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str())); 01851 } 01852 return; 01853 } 01854 01855 RepoId remote; 01856 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01857 remote.entityId = nackfrag.readerId; 01858 01859 if (Transport_debug_level > 5) { 01860 GuidConverter local_conv(local), remote_conv(remote); 01861 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01862 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(), 01863 OPENDDS_STRING(remote_conv).c_str())); 01864 } 01865 01866 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote); 01867 if (ri == rw->second.remote_readers_.end()) { 01868 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01869 "WARNING ReaderInfo not found\n")); 01870 return; 01871 } 01872 01873 if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) { 01874 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01875 "WARNING Count indicates duplicate, dropping\n")); 01876 return; 01877 } 01878 01879 ri->second.nackfrag_recvd_count_ = nackfrag.count.value; 01880 01881 SequenceNumber seq; 01882 seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low); 01883 ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState; 01884 g.release(); 01885 nack_reply_.schedule(); // timer will invoke send_nack_replies() 01886 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::AckNackSubmessage & | acknack, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1632 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::DisjointSequence::dump(), durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_F, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::insert(), interesting_readers_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), LM_DEBUG, LM_WARNING, local_prefix_, lock_, OpenDDS::DCPS::DisjointSequence::low(), nack_reply_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), process_acked_by_all_i(), OpenDDS::RTPS::AckNackSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), send_durability_gaps(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::AckNackSubmessage::writerId, writers_, and OpenDDS::DCPS::SequenceNumber::ZERO().
01634 { 01635 // local side is DW 01636 RepoId local; 01637 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 01638 local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN 01639 01640 RepoId remote; 01641 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01642 remote.entityId = acknack.readerId; 01643 01644 const ACE_Time_Value now = ACE_OS::gettimeofday(); 01645 OPENDDS_VECTOR(DiscoveryListener*) callbacks; 01646 01647 { 01648 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01649 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote), 01650 limit = interesting_readers_.upper_bound(remote); 01651 pos != limit; 01652 ++pos) { 01653 pos->second.last_activity = now; 01654 // Ensure the acknack was for the writer. 01655 if (local == pos->second.localid) { 01656 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) { 01657 callbacks.push_back(pos->second.listener); 01658 pos->second.status = InterestingRemote::EXISTS; 01659 } 01660 } 01661 } 01662 } 01663 01664 for (size_t i = 0; i < callbacks.size(); ++i) { 01665 callbacks[i]->reader_exists(remote, local); 01666 } 01667 01668 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01669 const RtpsWriterMap::iterator rw = writers_.find(local); 01670 if (rw == writers_.end()) { 01671 if (Transport_debug_level > 5) { 01672 GuidConverter local_conv(local); 01673 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01674 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str())); 01675 } 01676 return; 01677 } 01678 01679 if (Transport_debug_level > 5) { 01680 GuidConverter local_conv(local), remote_conv(remote); 01681 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01682 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(), 01683 OPENDDS_STRING(remote_conv).c_str())); 01684 } 01685 01686 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote); 01687 if (ri == rw->second.remote_readers_.end()) { 01688 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01689 "WARNING ReaderInfo not found\n")); 01690 return; 01691 } 01692 01693 if (acknack.count.value <= ri->second.acknack_recvd_count_) { 01694 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01695 "WARNING Count indicates duplicate, dropping\n")); 01696 return; 01697 } 01698 01699 ri->second.acknack_recvd_count_ = acknack.count.value; 01700 01701 if (!ri->second.handshake_done_) { 01702 ri->second.handshake_done_ = true; 01703 invoke_on_start_callbacks(true); 01704 } 01705 01706 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks; 01707 const bool final = acknack.smHeader.flags & RTPS::FLAG_F; 01708 01709 if (!ri->second.durable_data_.empty()) { 01710 if (Transport_debug_level > 5) { 01711 const GuidConverter local_conv(local), remote_conv(remote); 01712 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01713 "local %C has durable for remote %C\n", 01714 OPENDDS_STRING(local_conv).c_str(), 01715 OPENDDS_STRING(remote_conv).c_str())); 01716 } 01717 SequenceNumber ack; 01718 ack.setValue(acknack.readerSNState.bitmapBase.high, 01719 acknack.readerSNState.bitmapBase.low); 01720 const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first; 01721 if (ack > dd_last) { 01722 // Reader acknowledges durable data, we no longer need to store it 01723 ri->second.durable_data_.swap(pendingCallbacks); 01724 if (Transport_debug_level > 5) { 01725 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01726 "durable data acked\n")); 01727 } 01728 } else { 01729 DisjointSequence requests; 01730 if (!requests.insert(ack, acknack.readerSNState.numBits, 01731 acknack.readerSNState.bitmap.get_buffer()) 01732 && !final && ack == rw->second.heartbeat_high(ri->second)) { 01733 // This is a non-final AckNack with no bits in the bitmap. 01734 // Attempt to reply to a request for the "base" value which 01735 // is neither Acked nor Nacked, only when it's the HB high. 01736 if (ri->second.durable_data_.count(ack)) requests.insert(ack); 01737 } 01738 // Attempt to reply to nacks for durable data 01739 bool sent_some = false; 01740 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 01741 iter_t it = ri->second.durable_data_.begin(); 01742 const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges(); 01743 SequenceNumber lastSent = SequenceNumber::ZERO(); 01744 if (!requests.empty()) { 01745 lastSent = requests.low().previous(); 01746 } 01747 DisjointSequence gaps; 01748 for (size_t i = 0; i < psr.size(); ++i) { 01749 for (; it != ri->second.durable_data_.end() 01750 && it->first < psr[i].first; ++it) ; // empty for-loop 01751 for (; it != ri->second.durable_data_.end() 01752 && it->first <= psr[i].second; ++it) { 01753 if (Transport_debug_level > 5) { 01754 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01755 "durable resend %d\n", int(it->first.getValue()))); 01756 } 01757 durability_resend(it->second); 01758 //FUTURE: combine multiple resends into one RTPS Message? 01759 sent_some = true; 01760 if (it->first > lastSent + 1) { 01761 gaps.insert(SequenceRange(lastSent + 1, it->first.previous())); 01762 } 01763 lastSent = it->first; 01764 } 01765 if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) { 01766 gaps.insert(SequenceRange(lastSent + 1, psr[i].second)); 01767 } 01768 } 01769 if (!gaps.empty()) { 01770 if (Transport_debug_level > 5) { 01771 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01772 "sending durability gaps: ")); 01773 gaps.dump(); 01774 } 01775 send_durability_gaps(local, remote, gaps); 01776 } 01777 if (sent_some) { 01778 return; 01779 } 01780 const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first; 01781 if (!requests.empty() && requests.high() < dd_first) { 01782 // All nacks were below the start of the durable data. 01783 requests.insert(SequenceRange(requests.high(), dd_first.previous())); 01784 if (Transport_debug_level > 5) { 01785 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01786 "sending durability gaps for all requests: ")); 01787 requests.dump(); 01788 } 01789 send_durability_gaps(local, remote, requests); 01790 return; 01791 } 01792 if (!requests.empty() && requests.low() < dd_first) { 01793 // Lowest nack was below the start of the durable data. 01794 for (size_t i = 0; i < psr.size(); ++i) { 01795 if (psr[i].first > dd_first) { 01796 break; 01797 } 01798 gaps.insert(SequenceRange(psr[i].first, 01799 std::min(psr[i].second, dd_first))); 01800 } 01801 if (Transport_debug_level > 5) { 01802 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01803 "sending durability gaps for some requests: ")); 01804 gaps.dump(); 01805 } 01806 send_durability_gaps(local, remote, gaps); 01807 return; 01808 } 01809 } 01810 } 01811 SequenceNumber ack; 01812 ack.setValue(acknack.readerSNState.bitmapBase.high, 01813 acknack.readerSNState.bitmapBase.low); 01814 if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN() 01815 && ack != SequenceNumber::ZERO()) { 01816 ri->second.cur_cumulative_ack_ = ack; 01817 } 01818 // If this ACKNACK was final, the DR doesn't expect a reply, and therefore 01819 // we don't need to do anything further. 01820 if (!final || bitmapNonEmpty(acknack.readerSNState)) { 01821 ri->second.requested_changes_.push_back(acknack.readerSNState); 01822 } 01823 process_acked_by_all_i(g, local); 01824 g.release(); 01825 if (!final) { 01826 nack_reply_.schedule(); // timer will invoke send_nack_replies() 01827 } 01828 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 01829 for (iter_t it = pendingCallbacks.begin(); 01830 it != pendingCallbacks.end(); ++it) { 01831 it->second->data_delivered(); 01832 } 01833 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::HeartBeatFragSubmessage & | hb_frag, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1589 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_hb_frag_i().
01591 { 01592 datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i); 01593 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::HeartBeatSubmessage & | heartbeat, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1116 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, heartbeat_reply_, interesting_ack_nacks_, interesting_writers_, lock_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), process_heartbeat_i(), readers_, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.
01118 { 01119 RepoId src; 01120 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01121 src.entityId = heartbeat.writerId; 01122 01123 bool schedule_acknack = false; 01124 const ACE_Time_Value now = ACE_OS::gettimeofday(); 01125 OPENDDS_VECTOR(InterestingRemote) callbacks; 01126 01127 { 01128 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01129 01130 // We received a heartbeat from a writer. 01131 // We should ACKNACK if the writer is interesting and there is no association. 01132 01133 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src), 01134 limit = interesting_writers_.upper_bound(src); 01135 pos != limit; 01136 ++pos) { 01137 const RepoId& writerid = src; 01138 const RepoId& readerid = pos->second.localid; 01139 01140 RtpsReaderMap::const_iterator riter = readers_.find(readerid); 01141 if (riter == readers_.end()) { 01142 // Reader has no associations. 01143 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address)); 01144 } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) { 01145 // Reader is not associated with this writer. 01146 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address)); 01147 } 01148 pos->second.last_activity = now; 01149 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) { 01150 callbacks.push_back(pos->second); 01151 pos->second.status = InterestingRemote::EXISTS; 01152 } 01153 } 01154 01155 schedule_acknack = !interesting_ack_nacks_.empty(); 01156 } 01157 01158 for (size_t i = 0; i < callbacks.size(); ++i) { 01159 callbacks[i].listener->writer_exists(src, callbacks[i].localid); 01160 } 01161 01162 if (schedule_acknack) { 01163 heartbeat_reply_.schedule(); 01164 } 01165 01166 datareader_dispatch(heartbeat, src_prefix, 01167 &RtpsUdpDataLink::process_heartbeat_i); 01168 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::GapSubmessage & | gap, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1064 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_gap_i().
01066 { 01067 datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i); 01068 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::DataSubmessage & | data, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 979 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_data_i().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().
00981 { 00982 datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i); 00983 }
void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader | ( | const RepoId & | writerid, | |
const RepoId & | readerid, | |||
const ACE_INET_Addr & | address, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) |
Definition at line 362 of file RtpsUdpDataLink.cpp.
References heartbeat_, heartbeat_counts_, interesting_readers_, and lock_.
00366 { 00367 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00368 bool enableheartbeat = interesting_readers_.empty(); 00369 interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener))); 00370 heartbeat_counts_[writerid] = 0; 00371 g.release(); 00372 if (enableheartbeat) { 00373 heartbeat_->schedule_enable(); 00374 } 00375 }
void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer | ( | const RepoId & | readerid, | |
const RepoId & | writerid, | |||
const ACE_INET_Addr & | address, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) |
Definition at line 395 of file RtpsUdpDataLink.cpp.
References heartbeatchecker_, interesting_writers_, and lock_.
00399 { 00400 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00401 bool enableheartbeatchecker = interesting_writers_.empty(); 00402 interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener))); 00403 g.release(); 00404 if (enableheartbeatchecker) { 00405 heartbeatchecker_->schedule_enable(); 00406 } 00407 }
ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::release_remote_i | ( | const RepoId & | remote_id | ) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 42 of file RtpsUdpDataLink.inl.
References lock_.
00043 { 00044 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00045 locators_.erase(remote_id); 00046 }
void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i | ( | const RepoId & | remote_id, | |
const RepoId & | local_id | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 502 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, heartbeat_counts_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), process_acked_by_all_i(), reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.
00504 { 00505 OPENDDS_VECTOR(TransportQueueElement*) to_deliver; 00506 OPENDDS_VECTOR(TransportQueueElement*) to_drop; 00507 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00508 using std::pair; 00509 const GuidConverter conv(local_id); 00510 if (conv.isWriter()) { 00511 const RtpsWriterMap::iterator rw = writers_.find(local_id); 00512 00513 if (rw != writers_.end()) { 00514 rw->second.remote_readers_.erase(remote_id); 00515 00516 if (rw->second.remote_readers_.empty()) { 00517 RtpsWriter& writer = rw->second; 00518 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 00519 00520 if (!writer.to_deliver_.empty()) { 00521 iter_t iter = writer.to_deliver_.begin(); 00522 while (iter != writer.to_deliver_.end()) { 00523 to_deliver.push_back(iter->second); 00524 writer.to_deliver_.erase(iter); 00525 iter = writer.to_deliver_.begin(); 00526 } 00527 } 00528 if (!writer.elems_not_acked_.empty()) { 00529 OPENDDS_SET(SequenceNumber) sns_to_release; 00530 iter_t iter = writer.elems_not_acked_.begin(); 00531 while (iter != writer.elems_not_acked_.end()) { 00532 to_drop.push_back(iter->second); 00533 sns_to_release.insert(iter->first); 00534 writer.elems_not_acked_.erase(iter); 00535 iter = writer.elems_not_acked_.begin(); 00536 } 00537 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00538 while (sns_it != sns_to_release.end()) { 00539 writer.send_buff_->release_acked(*sns_it); 00540 ++sns_it; 00541 } 00542 } 00543 heartbeat_counts_.erase(rw->first); 00544 writers_.erase(rw); 00545 } else { 00546 process_acked_by_all_i(g, local_id); 00547 } 00548 } 00549 00550 } else if (conv.isReader()) { 00551 const RtpsReaderMap::iterator rr = readers_.find(local_id); 00552 00553 if (rr != readers_.end()) { 00554 rr->second.remote_writers_.erase(remote_id); 00555 00556 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters = 00557 reader_index_.equal_range(remote_id); 00558 iters.first != iters.second;) { 00559 if (iters.first->second == rr) { 00560 reader_index_.erase(iters.first++); 00561 } else { 00562 ++iters.first; 00563 } 00564 } 00565 00566 if (rr->second.remote_writers_.empty()) { 00567 readers_.erase(rr); 00568 } 00569 } 00570 } 00571 g.release(); 00572 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter; 00573 tqe_iter deliver_it = to_deliver.begin(); 00574 while (deliver_it != to_deliver.end()) { 00575 (*deliver_it)->data_delivered(); 00576 ++deliver_it; 00577 } 00578 tqe_iter drop_it = to_drop.begin(); 00579 while (drop_it != to_drop.end()) { 00580 (*drop_it)->data_dropped(true); 00581 ++drop_it; 00582 } 00583 }
RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::remove_sample | ( | const DataSampleElement * | sample, | |
void * | context | |||
) | [private, virtual] |
This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 494 of file RtpsUdpDataLink.cpp.
References lock_, and OpenDDS::DCPS::REMOVE_ERROR.
00495 { 00496 // see comment in RtpsUdpDataLink::send_i() for lock order 00497 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, REMOVE_ERROR); 00498 return DataLink::remove_sample(sample, &g); 00499 }
bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos | ( | const GUIDSeq_var & | peers | ) | [private] |
Definition at line 868 of file RtpsUdpDataLink.cpp.
References force_inline_qos_, and OPENDDS_MAP_CMP().
Referenced by customize_queue_element().
00869 { 00870 if (force_inline_qos_) { 00871 // Force true for testing purposes 00872 return true; 00873 } else { 00874 if (!peers.ptr()) { 00875 return false; 00876 } 00877 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t; 00878 for (CORBA::ULong i = 0; i < peers->length(); ++i) { 00879 const iter_t iter = locators_.find(peers[i]); 00880 if (iter != locators_.end() && iter->second.requires_inline_qos_) { 00881 return true; 00882 } 00883 } 00884 return false; 00885 } 00886 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks | ( | RtpsReaderMap::iterator | rr, | |
bool | finalFlag = false | |||
) | [private] |
Definition at line 1255 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::gen_find_size(), generate_nack_frags(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::DisjointSequence::last_ack(), LM_ERROR, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::SequenceNumber::ZERO().
Referenced by send_final_acks(), and send_heartbeat_replies().
01256 { 01257 using namespace OpenDDS::RTPS; 01258 01259 WriterInfoMap& writers = rr->second.remote_writers_; 01260 for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end(); 01261 ++wi) { 01262 01263 // if we have some negative acknowledgments, we'll ask for a reply 01264 DisjointSequence& recvd = wi->second.recvd_; 01265 const bool nack = wi->second.should_nack() || 01266 rr->second.nack_durable(wi->second); 01267 bool final = finalFlag || !nack; 01268 01269 if (wi->second.ack_pending_ || nack || finalFlag) { 01270 const bool prev_ack_pending = wi->second.ack_pending_; 01271 wi->second.ack_pending_ = false; 01272 01273 SequenceNumber ack; 01274 CORBA::ULong num_bits = 1; 01275 LongSeq8 bitmap; 01276 bitmap.length(1); 01277 bitmap[0] = 0; 01278 01279 const SequenceNumber& hb_low = wi->second.hb_range_.first; 01280 const SequenceNumber& hb_high = wi->second.hb_range_.second; 01281 const SequenceNumber::Value hb_low_val = hb_low.getValue(), 01282 hb_high_val = hb_high.getValue(); 01283 01284 if (recvd.empty()) { 01285 // Nack the entire heartbeat range. Only reached when durable. 01286 ack = hb_low; 01287 bitmap.length(bitmap_num_longs(ack, hb_high)); 01288 const CORBA::ULong idx = (hb_high_val > hb_low_val + 255) 01289 ? 255 01290 : CORBA::ULong(hb_high_val - hb_low_val); 01291 DisjointSequence::fill_bitmap_range(0, idx, 01292 bitmap.get_buffer(), 01293 bitmap.length(), num_bits); 01294 } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) { 01295 // Nack the range between the heartbeat low and the recvd low. 01296 ack = hb_low; 01297 const SequenceNumber& rec_low = recvd.low(); 01298 const SequenceNumber::Value rec_low_val = rec_low.getValue(); 01299 bitmap.length(bitmap_num_longs(ack, rec_low)); 01300 const CORBA::ULong idx = (rec_low_val > hb_low_val + 255) 01301 ? 255 01302 : CORBA::ULong(rec_low_val - hb_low_val); 01303 DisjointSequence::fill_bitmap_range(0, idx, 01304 bitmap.get_buffer(), 01305 bitmap.length(), num_bits); 01306 01307 } else { 01308 ack = ++SequenceNumber(recvd.cumulative_ack()); 01309 if (recvd.low().getValue() > 1) { 01310 // since the "ack" really is cumulative, we need to make 01311 // sure that a lower discontinuity is not possible later 01312 recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low())); 01313 } 01314 01315 if (recvd.disjoint()) { 01316 bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous())); 01317 recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(), 01318 num_bits, true); 01319 } 01320 } 01321 01322 const SequenceNumber::Value ack_val = ack.getValue(); 01323 01324 if (!recvd.empty() && hb_high > recvd.high()) { 01325 const SequenceNumber eff_high = 01326 (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255); 01327 const SequenceNumber::Value eff_high_val = eff_high.getValue(); 01328 // Nack the range between the received high and the effective high. 01329 const CORBA::ULong old_len = bitmap.length(), 01330 new_len = bitmap_num_longs(ack, eff_high); 01331 if (new_len > old_len) { 01332 bitmap.length(new_len); 01333 for (CORBA::ULong i = old_len; i < new_len; ++i) { 01334 bitmap[i] = 0; 01335 } 01336 } 01337 const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val), 01338 idx_recv_high = recvd.disjoint() ? 01339 CORBA::ULong(recvd.high().getValue() - ack_val) : 0; 01340 DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high, 01341 bitmap.get_buffer(), new_len, 01342 num_bits); 01343 } 01344 01345 // If the receive strategy is holding any fragments, those should 01346 // not be "nacked" in the ACKNACK reply. They will be accounted for 01347 // in the NACK_FRAG(s) instead. 01348 bool frags_modified = 01349 receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(), 01350 num_bits, ack, wi->first); 01351 if (frags_modified && !final) { // change to final if bitmap is empty 01352 final = true; 01353 for (CORBA::ULong i = 0; i < bitmap.length(); ++i) { 01354 if ((i + 1) * 32 <= num_bits) { 01355 if (bitmap[i]) { 01356 final = false; 01357 break; 01358 } 01359 } else { 01360 if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) { 01361 final = false; 01362 break; 01363 } 01364 } 01365 } 01366 } 01367 01368 AckNackSubmessage acknack = { 01369 {ACKNACK, 01370 CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)), 01371 0 /*length*/}, 01372 rr->first.entityId, 01373 wi->first.entityId, 01374 { // SequenceNumberSet: acking bitmapBase - 1 01375 {ack.getHigh(), ack.getLow()}, 01376 num_bits, bitmap 01377 }, 01378 {++wi->second.acknack_count_} 01379 }; 01380 01381 size_t size = 0, padding = 0; 01382 gen_find_size(acknack, size, padding); 01383 acknack.smHeader.submessageLength = 01384 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 01385 InfoDestinationSubmessage info_dst = { 01386 {INFO_DST, FLAG_E, INFO_DST_SZ}, 01387 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 01388 }; 01389 gen_find_size(info_dst, size, padding); 01390 01391 OPENDDS_VECTOR(NackFragSubmessage) nack_frags; 01392 size += generate_nack_frags(nack_frags, wi->second, wi->first); 01393 01394 ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators? 01395 // byte swapping is handled in the operator<<() implementation 01396 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR); 01397 std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix, 01398 sizeof(GuidPrefix_t)); 01399 ser << info_dst; 01400 // Interoperability note: we used to insert INFO_REPLY submessage here, but 01401 // testing indicated that other DDS implementations didn't accept it. 01402 ser << acknack; 01403 for (size_t i = 0; i < nack_frags.size(); ++i) { 01404 nack_frags[i].readerId = rr->first.entityId; 01405 nack_frags[i].writerId = wi->first.entityId; 01406 ser << nack_frags[i]; // always 4-byte aligned 01407 } 01408 01409 if (!locators_.count(wi->first)) { 01410 if (Transport_debug_level) { 01411 const GuidConverter conv(wi->first); 01412 ACE_ERROR((LM_ERROR, 01413 "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - " 01414 "no locator for remote %C\n", OPENDDS_STRING(conv).c_str())); 01415 } 01416 } else { 01417 send_strategy()->send_rtps_control(mb_acknack, 01418 locators_[wi->first].addr_); 01419 } 01420 } 01421 } 01422 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_directed_heartbeats | ( | OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)& | hbs | ) | [private] |
Definition at line 2480 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::assign(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::RTPS::FLAG_E, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, local_prefix_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, and writers_.
Referenced by send_heartbeats().
02481 { 02482 #if defined(OPENDDS_SECURITY) 02483 const EntityId_t& volatile_writer = 02484 RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER; 02485 02486 RTPS::InfoDestinationSubmessage idst; 02487 idst.smHeader.submessageId = RTPS::INFO_DST; 02488 idst.smHeader.flags = RTPS::FLAG_E; 02489 idst.smHeader.submessageLength = RTPS::INFO_DST_SZ; 02490 const size_t block_size = RTPS::INFO_DST_SZ + RTPS::HEARTBEAT_SZ 02491 + 2 * RTPS::SMHDR_SZ; 02492 Message_Block_Ptr mb; 02493 02494 typedef OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)::iterator iter_t; 02495 iter_t it = hbs.begin(), last = hbs.end(); 02496 while (it != last) { 02497 if (0 == std::memcmp(&it->writerId, &volatile_writer, sizeof(EntityId_t))) { 02498 RepoId local; 02499 RTPS::assign(local.guidPrefix, local_prefix_); 02500 local.entityId = it->writerId; 02501 RtpsWriterMap::const_iterator rw = writers_.find(local); 02502 if (rw != writers_.end()) { 02503 const ReaderInfoMap& rinfo = rw->second.remote_readers_; 02504 for (ReaderInfoMap::const_iterator ri = rinfo.begin(); 02505 ri != rinfo.end(); ++ri) { 02506 RTPS::assign(idst.guidPrefix, ri->first.guidPrefix); 02507 it->readerId = ri->first.entityId; 02508 if (mb) { 02509 mb->reset(); 02510 } else { 02511 mb.reset(new ACE_Message_Block(block_size)); 02512 } 02513 Serializer ser(mb.get(), false, Serializer::ALIGN_CDR); 02514 ser << idst; 02515 ser << *it; 02516 send_strategy()->send_rtps_control(*mb, locators_[ri->first].addr_); 02517 } 02518 } 02519 std::iter_swap(it, --last); 02520 } else { 02521 ++it; 02522 } 02523 } 02524 hbs.erase(last, hbs.end()); 02525 #else 02526 ACE_UNUSED_ARG(hbs); 02527 #endif 02528 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies | ( | const RepoId & | writerId, | |
RtpsWriter & | writer, | |||
const RepoId & | readerId, | |||
ReaderInfo & | reader | |||
) | [private] |
Definition at line 2062 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, marshal_gaps(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), process_requested_changes(), ACE_Message_Block::release(), OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), and OpenDDS::DCPS::Transport_debug_level.
Referenced by send_nack_replies().
02066 { 02067 if (!locators_.count(readerId)) { 02068 return; 02069 } 02070 02071 DisjointSequence requests; 02072 process_requested_changes(requests, writer, reader); 02073 reader.requested_changes_.clear(); 02074 02075 DisjointSequence gaps; 02076 ACE_INET_Addr addr = locators_[readerId].addr_; 02077 02078 if (!requests.empty()) { 02079 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) { 02080 gaps = requests; 02081 } else { 02082 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges(); 02083 SingleSendBuffer& sb = *writer.send_buff_; 02084 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock()); 02085 const RtpsUdpSendStrategy::OverrideToken ot = 02086 send_strategy()->override_destinations(addr); 02087 for (size_t i = 0; i < ranges.size(); ++i) { 02088 if (Transport_debug_level > 5) { 02089 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies " 02090 "resend data %d-%d\n", int(ranges[i].first.getValue()), 02091 int(ranges[i].second.getValue()))); 02092 } 02093 sb.resend_i(ranges[i], &gaps, readerId); 02094 } 02095 } 02096 } 02097 02098 if (gaps.empty()) { 02099 return; 02100 } 02101 if (Transport_debug_level > 5) { 02102 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies GAPs: ")); 02103 gaps.dump(); 02104 } 02105 ACE_Message_Block* mb_gap = 02106 marshal_gaps(writerId, readerId, gaps, writer.durable_); 02107 if (mb_gap) { 02108 send_strategy()->send_rtps_control(*mb_gap, addr); 02109 mb_gap->release(); 02110 } 02111 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_durability_gaps | ( | const RepoId & | writer, | |
const RepoId & | reader, | |||
const DisjointSequence & | gaps | |||
) | [private] |
Definition at line 2295 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, ACE_Message_Block::cont(), OpenDDS::RTPS::FLAG_E, get_locator(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, marshal_gaps(), ACE_Message_Block::release(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::RTPS::SMHDR_SZ.
Referenced by received().
02298 { 02299 ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ); 02300 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02301 RTPS::InfoDestinationSubmessage info_dst = { 02302 {RTPS::INFO_DST, RTPS::FLAG_E, RTPS::INFO_DST_SZ}, 02303 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 02304 }; 02305 std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t)); 02306 ser << info_dst; 02307 mb.cont(marshal_gaps(writer, reader, gaps)); 02308 send_strategy()->send_rtps_control(mb, get_locator(reader)); 02309 mb.cont()->release(); 02310 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_final_acks | ( | const RepoId & | readerid | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 2800 of file RtpsUdpDataLink.cpp.
References lock_, readers_, and send_ack_nacks().
02801 { 02802 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 02803 RtpsReaderMap::iterator rr = readers_.find (readerid); 02804 if (rr != readers_.end ()) { 02805 send_ack_nacks (rr, true); 02806 } 02807 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeat_replies | ( | ) | [private] |
Definition at line 1425 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, interesting_ack_nacks_, lock_, readers_, send_ack_nacks(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::RTPS::SMHDR_SZ.
01426 { 01427 using namespace OpenDDS::RTPS; 01428 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01429 01430 for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(), 01431 limit = interesting_ack_nacks_.end(); 01432 pos != limit; 01433 ++pos) { 01434 01435 SequenceNumber ack; 01436 LongSeq8 bitmap; 01437 bitmap.length(0); 01438 01439 AckNackSubmessage acknack = { 01440 {ACKNACK, 01441 CORBA::Octet(FLAG_E | FLAG_F), 01442 0 /*length*/}, 01443 pos->readerid.entityId, 01444 pos->writerid.entityId, 01445 { // SequenceNumberSet: acking bitmapBase - 1 01446 {ack.getHigh(), ack.getLow()}, 01447 0 /* num_bits */, bitmap 01448 }, 01449 {0 /* acknack count */} 01450 }; 01451 01452 size_t size = 0, padding = 0; 01453 gen_find_size(acknack, size, padding); 01454 acknack.smHeader.submessageLength = 01455 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 01456 InfoDestinationSubmessage info_dst = { 01457 {INFO_DST, FLAG_E, INFO_DST_SZ}, 01458 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 01459 }; 01460 gen_find_size(info_dst, size, padding); 01461 01462 ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators? 01463 // byte swapping is handled in the operator<<() implementation 01464 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR); 01465 std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix, 01466 sizeof(GuidPrefix_t)); 01467 ser << info_dst; 01468 // Interoperability note: we used to insert INFO_REPLY submessage here, but 01469 // testing indicated that other DDS implementations didn't accept it. 01470 ser << acknack; 01471 01472 send_strategy()->send_rtps_control(mb_acknack, pos->writer_address); 01473 } 01474 interesting_ack_nacks_.clear(); 01475 01476 for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end(); 01477 ++rr) { 01478 send_ack_nacks (rr); 01479 } 01480 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats | ( | ) | [private] |
Definition at line 2313 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), ACE_OS::gettimeofday(), OpenDDS::RTPS::HEARTBEAT, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, OpenDDS::RTPS::HEARTBEAT_SZ, interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, LM_ERROR, LM_INFO, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), send_directed_heartbeats(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, VDBG_LVL, and writers_.
02314 { 02315 OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks; 02316 OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks; 02317 02318 { 02319 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 02320 02321 if (writers_.empty() && interesting_readers_.empty()) { 02322 heartbeat_->disable(); 02323 } 02324 02325 using namespace OpenDDS::RTPS; 02326 OPENDDS_VECTOR(HeartBeatSubmessage) subm; 02327 OPENDDS_SET(ACE_INET_Addr) recipients; 02328 const ACE_Time_Value now = ACE_OS::gettimeofday(); 02329 02330 RepoIdSet writers_to_advertise; 02331 02332 RtpsUdpInst& config = this->config(); 02333 02334 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config.heartbeat_period_; 02335 const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config.heartbeat_period_; 02336 for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(), 02337 limit = interesting_readers_.end(); 02338 pos != limit; 02339 ++pos) { 02340 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST || 02341 (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) { 02342 recipients.insert(pos->second.address); 02343 writers_to_advertise.insert(pos->second.localid); 02344 } 02345 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) { 02346 CallbackType callback(pos->first, pos->second); 02347 readerDoesNotExistCallbacks.push_back(callback); 02348 pos->second.status = InterestingRemote::DOES_NOT_EXIST; 02349 } 02350 } 02351 02352 typedef RtpsWriterMap::iterator rw_iter; 02353 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) { 02354 const bool has_data = !rw->second.send_buff_.is_nil() 02355 && !rw->second.send_buff_->empty(); 02356 bool final = true, has_durable_data = false; 02357 SequenceNumber durable_max; 02358 02359 typedef ReaderInfoMap::iterator ri_iter; 02360 const ri_iter end = rw->second.remote_readers_.end(); 02361 for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) { 02362 if ((has_data || !ri->second.handshake_done_) 02363 && locators_.count(ri->first)) { 02364 recipients.insert(locators_[ri->first].addr_); 02365 if (final && !ri->second.handshake_done_) { 02366 final = false; 02367 } 02368 } 02369 if (!ri->second.durable_data_.empty()) { 02370 const ACE_Time_Value expiration = 02371 ri->second.durable_timestamp_ + config.durable_data_timeout_; 02372 if (now > expiration) { 02373 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator 02374 dd_iter; 02375 for (dd_iter it = ri->second.durable_data_.begin(); 02376 it != ri->second.durable_data_.end(); ++it) { 02377 pendingCallbacks.push_back(it->second); 02378 } 02379 ri->second.durable_data_.clear(); 02380 if (Transport_debug_level > 3) { 02381 const GuidConverter gw(rw->first), gr(ri->first); 02382 VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - " 02383 "removed expired durable data for %C -> %C\n", 02384 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3); 02385 } 02386 } else { 02387 has_durable_data = true; 02388 if (ri->second.durable_data_.rbegin()->first > durable_max) { 02389 durable_max = ri->second.durable_data_.rbegin()->first; 02390 } 02391 if (locators_.count(ri->first)) { 02392 recipients.insert(locators_[ri->first].addr_); 02393 } 02394 } 02395 } 02396 } 02397 02398 if (!rw->second.elems_not_acked_.empty()) { 02399 final = false; 02400 } 02401 02402 if (writers_to_advertise.count(rw->first)) { 02403 final = false; 02404 writers_to_advertise.erase(rw->first); 02405 } 02406 02407 if (final && !has_data && !has_durable_data) { 02408 continue; 02409 } 02410 02411 const SequenceNumber firstSN = (rw->second.durable_ || !has_data) 02412 ? 1 : rw->second.send_buff_->low(), 02413 lastSN = std::max(durable_max, 02414 has_data ? rw->second.send_buff_->high() : 1); 02415 02416 const HeartBeatSubmessage hb = { 02417 {HEARTBEAT, 02418 CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)), 02419 HEARTBEAT_SZ}, 02420 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02421 rw->first.entityId, 02422 {firstSN.getHigh(), firstSN.getLow()}, 02423 {lastSN.getHigh(), lastSN.getLow()}, 02424 {++heartbeat_counts_[rw->first]} 02425 }; 02426 subm.push_back(hb); 02427 } 02428 02429 for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(), 02430 limit = writers_to_advertise.end(); 02431 pos != limit; 02432 ++pos) { 02433 const SequenceNumber SN = 1; 02434 const HeartBeatSubmessage hb = { 02435 {HEARTBEAT, 02436 FLAG_E, 02437 HEARTBEAT_SZ}, 02438 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02439 pos->entityId, 02440 {SN.getHigh(), SN.getLow()}, 02441 {SN.getHigh(), SN.getLow()}, 02442 {++heartbeat_counts_[*pos]} 02443 }; 02444 subm.push_back(hb); 02445 } 02446 02447 send_directed_heartbeats(subm); 02448 02449 if (!subm.empty()) { 02450 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators? 02451 // byte swapping is handled in the operator<<() implementation 02452 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02453 bool send_ok = true; 02454 for (size_t i = 0; i < subm.size(); ++i) { 02455 if (!(ser << subm[i])) { 02456 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - " 02457 "failed to serialize HEARTBEAT submessage %B\n", i)); 02458 send_ok = false; 02459 break; 02460 } 02461 } 02462 if (send_ok) { 02463 send_strategy()->send_rtps_control(mb, recipients); 02464 } 02465 } 02466 } 02467 02468 for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin(); 02469 iter != readerDoesNotExistCallbacks.end(); ++iter){ 02470 const InterestingRemote& remote = iter->second; 02471 remote.listener->reader_does_not_exist(iter->first, remote.localid); 02472 } 02473 02474 for (size_t i = 0; i < pendingCallbacks.size(); ++i) { 02475 pendingCallbacks[i]->data_dropped(); 02476 } 02477 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual | ( | const TransportSendControlElement * | tsce | ) | [private] |
Definition at line 2560 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, best_effort_heartbeat_count_, config(), OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, get_locators(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), ACE_OS::gettimeofday(), OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::HEARTBEAT_SZ, LM_ERROR, OPENDDS_SET(), OpenDDS::DCPS::TransportSendControlElement::publication_id(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::SMHDR_SZ, and writers_.
Referenced by customize_queue_element().
02561 { 02562 using namespace OpenDDS::RTPS; 02563 02564 const RepoId pub_id = tsce->publication_id(); 02565 02566 // Populate the recipients. 02567 OPENDDS_SET(ACE_INET_Addr) recipients; 02568 get_locators (pub_id, recipients); 02569 if (recipients.empty()) { 02570 return; 02571 } 02572 02573 // Populate the sequence numbers and counter. 02574 02575 SequenceNumber firstSN, lastSN; 02576 CORBA::Long counter; 02577 RtpsWriterMap::iterator pos = writers_.find (pub_id); 02578 if (pos != writers_.end ()) { 02579 // Reliable. 02580 const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty(); 02581 SequenceNumber durable_max; 02582 const ACE_Time_Value now = ACE_OS::gettimeofday(); 02583 for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end(); 02584 ri != end; 02585 ++ri) { 02586 if (!ri->second.durable_data_.empty()) { 02587 const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config().durable_data_timeout_; 02588 if (now <= expiration && 02589 ri->second.durable_data_.rbegin()->first > durable_max) { 02590 durable_max = ri->second.durable_data_.rbegin()->first; 02591 } 02592 } 02593 } 02594 firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low(); 02595 lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1); 02596 counter = ++heartbeat_counts_[pos->first]; 02597 } else { 02598 // Unreliable. 02599 firstSN = 1; 02600 lastSN = tsce->sequence(); 02601 counter = ++this->best_effort_heartbeat_count_; 02602 } 02603 02604 const HeartBeatSubmessage hb = { 02605 {HEARTBEAT, 02606 CORBA::Octet(FLAG_E | FLAG_F | FLAG_L), 02607 HEARTBEAT_SZ}, 02608 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02609 pub_id.entityId, 02610 {firstSN.getHigh(), firstSN.getLow()}, 02611 {lastSN.getHigh(), lastSN.getLow()}, 02612 {counter} 02613 }; 02614 02615 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators? 02616 // byte swapping is handled in the operator<<() implementation 02617 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02618 if ((ser << hb)) { 02619 send_strategy()->send_rtps_control(mb, recipients); 02620 } 02621 else { 02622 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - " 02623 "failed to serialize HEARTBEAT submessage\n")); 02624 } 02625 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_i | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 484 of file RtpsUdpDataLink.cpp.
References lock_.
00485 { 00486 // Lock here to maintain the locking order: 00487 // RtpsUdpDataLink before RtpsUdpSendStrategy 00488 // which is required for resending due to nacks 00489 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00490 DataLink::send_i(element, relink); 00491 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies | ( | ) | [private] |
Definition at line 1889 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, lock_, marshal_gaps(), OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_SET(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), process_requested_changes(), ACE_Message_Block::release(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, send_directed_nack_replies(), send_nackfrag_replies(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), OpenDDS::DCPS::Transport_debug_level, and writers_.
01890 { 01891 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01892 // Reply from local DW to remote DR: GAP or DATA 01893 using namespace OpenDDS::RTPS; 01894 typedef RtpsWriterMap::iterator rw_iter; 01895 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) { 01896 01897 // consolidate requests from N readers 01898 OPENDDS_SET(ACE_INET_Addr) recipients; 01899 DisjointSequence requests; 01900 RtpsWriter& writer = rw->second; 01901 01902 //track if any messages have been fully acked by all readers 01903 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE; 01904 01905 #if defined(OPENDDS_SECURITY) 01906 const EntityId_t& pvs_writer = 01907 RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER; 01908 const bool is_pvs_writer = 01909 0 == std::memcmp(&pvs_writer, &rw->first.entityId, sizeof pvs_writer); 01910 #endif 01911 01912 typedef ReaderInfoMap::iterator ri_iter; 01913 const ri_iter end = writer.remote_readers_.end(); 01914 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 01915 01916 if (ri->second.cur_cumulative_ack_ < all_readers_ack) { 01917 all_readers_ack = ri->second.cur_cumulative_ack_; 01918 } 01919 01920 #if defined(OPENDDS_SECURITY) 01921 if (is_pvs_writer && !ri->second.requested_changes_.empty()) { 01922 send_directed_nack_replies(rw->first, writer, ri->first, ri->second); 01923 continue; 01924 } 01925 #endif 01926 01927 process_requested_changes(requests, writer, ri->second); 01928 01929 if (!ri->second.requested_changes_.empty()) { 01930 if (locators_.count(ri->first)) { 01931 recipients.insert(locators_[ri->first].addr_); 01932 if (Transport_debug_level > 5) { 01933 const GuidConverter local_conv(rw->first), remote_conv(ri->first); 01934 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01935 "local %C remote %C requested resend\n", 01936 OPENDDS_STRING(local_conv).c_str(), 01937 OPENDDS_STRING(remote_conv).c_str())); 01938 } 01939 } 01940 ri->second.requested_changes_.clear(); 01941 } 01942 } 01943 01944 DisjointSequence gaps; 01945 if (!requests.empty()) { 01946 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) { 01947 gaps = requests; 01948 } else { 01949 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges(); 01950 SingleSendBuffer& sb = *writer.send_buff_; 01951 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock()); 01952 const RtpsUdpSendStrategy::OverrideToken ot = 01953 send_strategy()->override_destinations(recipients); 01954 for (size_t i = 0; i < ranges.size(); ++i) { 01955 if (Transport_debug_level > 5) { 01956 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01957 "resend data %d-%d\n", int(ranges[i].first.getValue()), 01958 int(ranges[i].second.getValue()))); 01959 } 01960 sb.resend_i(ranges[i], &gaps); 01961 } 01962 } 01963 } 01964 01965 send_nackfrag_replies(writer, gaps, recipients); 01966 01967 if (!gaps.empty()) { 01968 if (Transport_debug_level > 5) { 01969 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01970 "GAPs:")); 01971 gaps.dump(); 01972 } 01973 ACE_Message_Block* mb_gap = 01974 marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_); 01975 if (mb_gap) { 01976 send_strategy()->send_rtps_control(*mb_gap, recipients); 01977 mb_gap->release(); 01978 } 01979 } 01980 if (all_readers_ack == SequenceNumber::MAX_VALUE) { 01981 continue; 01982 } 01983 } 01984 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_nackfrag_replies | ( | RtpsWriter & | writer, | |
DisjointSequence & | gaps, | |||
OPENDDS_SET(ACE_INET_Addr)& | gap_recipients | |||
) | [private] |
Definition at line 1987 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and send_strategy().
Referenced by send_nack_replies().
01990 { 01991 typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo; 01992 OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests; 01993 01994 typedef ReaderInfoMap::iterator ri_iter; 01995 const ri_iter end = writer.remote_readers_.end(); 01996 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 01997 01998 if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) { 01999 continue; 02000 } 02001 02002 const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_; 02003 02004 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter; 02005 const rf_iter rf_end = ri->second.requested_frags_.end(); 02006 for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) { 02007 02008 const SequenceNumber& seq = rf->first; 02009 if (writer.send_buff_->contains(seq)) { 02010 FragmentInfo& fi = requests[remote_addr]; 02011 fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits, 02012 rf->second.bitmap.get_buffer()); 02013 } else { 02014 gaps.insert(seq); 02015 gap_recipients.insert(remote_addr); 02016 } 02017 } 02018 ri->second.requested_frags_.clear(); 02019 } 02020 02021 typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter; 02022 for (req_iter req = requests.begin(); req != requests.end(); ++req) { 02023 const FragmentInfo& fi = req->second; 02024 02025 ACE_GUARD(TransportSendBuffer::LockType, guard, 02026 writer.send_buff_->strategy_lock()); 02027 const RtpsUdpSendStrategy::OverrideToken ot = 02028 send_strategy()->override_destinations(req->first); 02029 02030 for (FragmentInfo::const_iterator sn_iter = fi.begin(); 02031 sn_iter != fi.end(); ++sn_iter) { 02032 const SequenceNumber& seq = sn_iter->first; 02033 writer.send_buff_->resend_fragments_i(seq, sn_iter->second); 02034 } 02035 } 02036 }
OpenDDS::DCPS::RtpsUdpSendStrategy * OpenDDS::DCPS::RtpsUdpDataLink::send_strategy | ( | ) | [private] |
Definition at line 2859 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::DataLink::send_strategy_.
Referenced by customize_queue_element(), durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), open(), send_ack_nacks(), send_directed_heartbeats(), send_directed_nack_replies(), send_durability_gaps(), send_heartbeat_replies(), send_heartbeats(), send_heartbeats_manual(), send_nack_replies(), and send_nackfrag_replies().
02860 { 02861 return static_cast<OpenDDS::DCPS::RtpsUdpSendStrategy*>(send_strategy_.in()); 02862 }
void OpenDDS::DCPS::RtpsUdpDataLink::stop_i | ( | ) | [private, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 586 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), ACE_SOCK::close(), heartbeat_, heartbeat_reply_, multicast_socket_, nack_reply_, and unicast_socket_.
Referenced by open().
00587 { 00588 nack_reply_.cancel(); 00589 heartbeat_reply_.cancel(); 00590 heartbeat_->disable(); 00591 unicast_socket_.close(); 00592 multicast_socket_.close(); 00593 }
ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket | ( | ) |
Definition at line 30 of file RtpsUdpDataLink.inl.
References unicast_socket_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00031 { 00032 return unicast_socket_; 00033 }
void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader | ( | const RepoId & | writerid, | |
const RepoId & | readerid | |||
) |
Definition at line 378 of file RtpsUdpDataLink.cpp.
References interesting_readers_, and lock_.
00380 { 00381 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00382 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid), 00383 limit = interesting_readers_.upper_bound(readerid); 00384 pos != limit; 00385 ) { 00386 if (pos->second.localid == writerid) { 00387 interesting_readers_.erase(pos++); 00388 } else { 00389 ++pos; 00390 } 00391 } 00392 }
void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer | ( | const RepoId & | readerid, | |
const RepoId & | writerid | |||
) |
Definition at line 410 of file RtpsUdpDataLink.cpp.
References interesting_writers_, and lock_.
00412 { 00413 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00414 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid), 00415 limit = interesting_writers_.upper_bound(writerid); 00416 pos != limit; 00417 ) { 00418 if (pos->second.localid == readerid) { 00419 interesting_writers_.erase(pos++); 00420 } else { 00421 ++pos; 00422 } 00423 } 00424 }
friend class ::DDS_TEST [friend] |
Definition at line 162 of file RtpsUdpDataLink.h.
Definition at line 378 of file RtpsUdpDataLink.h.
Referenced by send_heartbeats_manual().
bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false [static, private] |
static member used by testing code to force inline qos
Definition at line 164 of file RtpsUdpDataLink.h.
Referenced by requires_inline_qos().
Definition at line 453 of file RtpsUdpDataLink.h.
Referenced by associated(), register_for_reader(), send_heartbeats(), and stop_i().
HeartBeatCountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_ [private] |
Definition at line 485 of file RtpsUdpDataLink.h.
Referenced by associated(), pre_stop_i(), register_for_reader(), release_reservations_i(), send_heartbeats(), and send_heartbeats_manual().
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_reply_ [private] |
Referenced by received(), and stop_i().
Definition at line 453 of file RtpsUdpDataLink.h.
Referenced by register_for_writer().
Definition at line 531 of file RtpsUdpDataLink.h.
Referenced by deliver_held_data().
InterestingAckNackSetType OpenDDS::DCPS::RtpsUdpDataLink::interesting_ack_nacks_ [private] |
Definition at line 508 of file RtpsUdpDataLink.h.
Referenced by received(), and send_heartbeat_replies().
InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_ [private] |
Definition at line 478 of file RtpsUdpDataLink.h.
Referenced by received(), register_for_reader(), send_heartbeats(), and unregister_for_reader().
InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_ [private] |
Definition at line 479 of file RtpsUdpDataLink.h.
Referenced by check_heartbeats(), received(), register_for_writer(), and unregister_for_writer().
Definition at line 177 of file RtpsUdpDataLink.h.
Referenced by received(), RtpsUdpDataLink(), and send_directed_heartbeats().
ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::lock_ [mutable, private] |
lock_ protects data structures accessed by both the transport's thread (TransportReactorTask) and an external thread which is responsible for adding/removing associations from the DataLink.
Definition at line 294 of file RtpsUdpDataLink.h.
Referenced by add_locator(), associated(), check_heartbeats(), customize_queue_element(), pre_stop_i(), received(), register_for_reader(), register_for_writer(), release_remote_i(), release_reservations_i(), remove_sample(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_final_acks(), send_heartbeat_replies(), send_heartbeats(), send_i(), send_nack_replies(), unregister_for_reader(), and unregister_for_writer().
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer OpenDDS::DCPS::RtpsUdpDataLink::multi_buff_ [private] |
Referenced by open().
Definition at line 189 of file RtpsUdpDataLink.h.
Referenced by multicast_socket(), open(), and stop_i().
Referenced by received(), and stop_i().
Definition at line 172 of file RtpsUdpDataLink.h.
Referenced by get_reactor(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::handle_exception(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery(), and reactor_is_shut_down().
RtpsReaderIndex OpenDDS::DCPS::RtpsUdpDataLink::reader_index_ [private] |
Definition at line 287 of file RtpsUdpDataLink.h.
Referenced by associated(), and release_reservations_i().
RtpsReaderMap OpenDDS::DCPS::RtpsUdpDataLink::readers_ [private] |
Definition at line 283 of file RtpsUdpDataLink.h.
Referenced by associated(), received(), release_reservations_i(), send_final_acks(), and send_heartbeat_replies().
Definition at line 188 of file RtpsUdpDataLink.h.
Referenced by open(), stop_i(), and unicast_socket().
RtpsWriterMap OpenDDS::DCPS::RtpsUdpDataLink::writers_ [private] |
Definition at line 249 of file RtpsUdpDataLink.h.
Referenced by add_delayed_notification(), add_gap_submsg(), associated(), check_handshake_complete(), customize_queue_element(), do_remove_sample(), end_historic_samples(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), marshal_gaps(), pre_stop_i(), process_acked_by_all_i(), received(), release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_directed_heartbeats(), send_heartbeats(), send_heartbeats_manual(), and send_nack_replies().