#include <RtpsUdpDataLink.h>
Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Public Member Functions | |
RtpsUdpDataLink (RtpsUdpTransport *transport, const GuidPrefix_t &local_prefix, RtpsUdpInst *config, TransportReactorTask *reactor_task) | |
void | send_strategy (RtpsUdpSendStrategy *send_strategy) |
void | receive_strategy (RtpsUdpReceiveStrategy *recv_strategy) |
bool | add_delayed_notification (TransportQueueElement *element) |
void | do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria) |
RtpsUdpInst * | config () |
ACE_Reactor * | get_reactor () |
bool | reactor_is_shut_down () |
ACE_SOCK_Dgram & | unicast_socket () |
ACE_SOCK_Dgram_Mcast & | multicast_socket () |
bool | open (const ACE_SOCK_Dgram &unicast_socket) |
void | received (const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix) |
void | received (const RTPS::GapSubmessage &gap, const GuidPrefix_t &src_prefix) |
void | received (const RTPS::HeartBeatSubmessage &heartbeat, const GuidPrefix_t &src_prefix) |
void | received (const RTPS::HeartBeatFragSubmessage &hb_frag, const GuidPrefix_t &src_prefix) |
void | received (const RTPS::AckNackSubmessage &acknack, const GuidPrefix_t &src_prefix) |
void | received (const RTPS::NackFragSubmessage &nackfrag, const GuidPrefix_t &src_prefix) |
const GuidPrefix_t & | local_prefix () const |
void | add_locator (const RepoId &remote_id, const ACE_INET_Addr &address, bool requires_inline_qos) |
void | get_locators (const RepoId &local_id, OPENDDS_SET(ACE_INET_Addr)&addrs) const |
ACE_INET_Addr | get_locator (const RepoId &remote_id) const |
void | associated (const RepoId &local, const RepoId &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable) |
bool | check_handshake_complete (const RepoId &local, const RepoId &remote) |
void | register_for_reader (const RepoId &writerid, const RepoId &readerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener) |
void | unregister_for_reader (const RepoId &writerid, const RepoId &readerid) |
void | register_for_writer (const RepoId &readerid, const RepoId &writerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener) |
void | unregister_for_writer (const RepoId &readerid, const RepoId &writerid) |
virtual void | pre_stop_i () |
virtual void | send_final_acks (const RepoId &readerid) |
Private Types | |
typedef void(RtpsUdpDataLink::*) | PMF () |
typedef std::pair< RepoId, InterestingRemote > | CallbackType |
Private Member Functions | |
virtual void | stop_i () |
virtual void | send_i (TransportQueueElement *element, bool relink=true) |
virtual TransportQueueElement * | customize_queue_element (TransportQueueElement *element) |
virtual void | release_remote_i (const RepoId &remote_id) |
virtual void | release_reservations_i (const RepoId &remote_id, const RepoId &local_id) |
bool | requires_inline_qos (const PublicationId &pub_id) |
typedef | OPENDDS_MAP_CMP (RepoId, OPENDDS_VECTOR(RepoId), GUID_tKeyLessThan) DestToEntityMap |
void | add_gap_submsg (RTPS::SubmessageSeq &msg, const TransportQueueElement &tqe, const DestToEntityMap &dtem) |
OPENDDS_MAP_CMP (RepoId, RemoteInfo, GUID_tKeyLessThan) locators_ | |
typedef | OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap |
typedef | OPENDDS_MAP_CMP (RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap |
void | end_historic_samples (RtpsWriterMap::iterator writer, const DataSampleHeader &header, ACE_Message_Block *body) |
typedef | OPENDDS_MAP_CMP (RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap |
typedef | OPENDDS_MAP_CMP (RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap |
typedef | OPENDDS_MULTIMAP_CMP (RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan) RtpsReaderIndex |
void | deliver_held_data (const RepoId &readerId, WriterInfo &info, bool durable) |
size_t | generate_nack_frags (OPENDDS_VECTOR(RTPS::NackFragSubmessage)&nack_frags, WriterInfo &wi, const RepoId &pub_id) |
bool | process_heartbeat_i (const RTPS::HeartBeatSubmessage &heartbeat, const RepoId &src, RtpsReaderMap::value_type &rr) |
bool | process_hb_frag_i (const RTPS::HeartBeatFragSubmessage &hb_frag, const RepoId &src, RtpsReaderMap::value_type &rr) |
bool | process_gap_i (const RTPS::GapSubmessage &gap, const RepoId &src, RtpsReaderMap::value_type &rr) |
bool | process_data_i (const RTPS::DataSubmessage &data, const RepoId &src, RtpsReaderMap::value_type &rr) |
void | durability_resend (TransportQueueElement *element) |
void | send_durability_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps) |
ACE_Message_Block * | marshal_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps, bool durable=false) |
void | send_nackfrag_replies (RtpsWriter &writer, DisjointSequence &gaps, OPENDDS_SET(ACE_INET_Addr)&gap_recipients) |
template<typename T, typename FN> | |
void | datareader_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, const FN &func) |
void | send_nack_replies () |
void | process_acked_by_all_i (ACE_Guard< ACE_Thread_Mutex > &g, const RepoId &pub_id) |
void | send_heartbeats () |
void | check_heartbeats () |
void | send_heartbeats_manual (const TransportSendControlElement *tsce) |
void | send_heartbeat_replies () |
typedef | OPENDDS_MULTIMAP_CMP (RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType |
OPENDDS_VECTOR (CallbackType) writerDoesNotExistCallbacks_ | |
OPENDDS_VECTOR (CallbackType) readerDoesNotExistCallbacks_ | |
typedef | OPENDDS_MAP_CMP (RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType |
typedef | OPENDDS_SET (InterestingAckNack) InterestingAckNackSetType |
void | send_ack_nacks (RtpsReaderMap::iterator rr, bool finalFlag=false) |
Static Private Member Functions | |
static void | extend_bitmap_range (RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent) |
Private Attributes | |
RtpsUdpInst * | config_ |
TransportReactorTask_rch | reactor_task_ |
RtpsUdpSendStrategy_rch | send_strategy_ |
The transport send strategy object for this DataLink. | |
RtpsUdpReceiveStrategy_rch | recv_strategy_ |
GuidPrefix_t | local_prefix_ |
ACE_SOCK_Dgram | unicast_socket_ |
ACE_SOCK_Dgram_Mcast | multicast_socket_ |
RtpsCustomizedElementAllocator | rtps_customized_element_allocator_ |
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer | multi_buff_ |
RtpsWriterMap | writers_ |
RtpsReaderMap | readers_ |
RtpsReaderIndex | reader_index_ |
ACE_Thread_Mutex | lock_ |
CORBA::Long | best_effort_heartbeat_count_ |
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay | nack_reply_ |
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay | heartbeat_reply_ |
OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat | heartbeat_ |
OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat | heartbeatchecker_ |
InterestingRemoteMapType | interesting_readers_ |
InterestingRemoteMapType | interesting_writers_ |
ACE_Thread_Mutex | writer_no_longer_exists_lock_ |
ACE_Thread_Mutex | reader_no_longer_exists_lock_ |
HeartBeatCountMapType | heartbeat_counts_ |
InterestingAckNackSetType | interesting_ack_nacks_ |
Static Private Attributes | |
static bool | force_inline_qos_ = false |
static member used by testing code to force inline qos | |
Friends | |
class | ::DDS_TEST |
Classes | |
struct | HeartBeat |
struct | InterestingAckNack |
struct | InterestingRemote |
Data structure representing an "interesting" remote entity for static discovery. More... | |
struct | MultiSendBuffer |
struct | ReaderInfo |
struct | RemoteInfo |
struct | RtpsReader |
struct | RtpsWriter |
struct | TimedDelay |
struct | WriterInfo |
Definition at line 44 of file RtpsUdpDataLink.h.
typedef std::pair<RepoId, InterestingRemote> OpenDDS::DCPS::RtpsUdpDataLink::CallbackType [private] |
Definition at line 452 of file RtpsUdpDataLink.h.
typedef void(RtpsUdpDataLink::*) OpenDDS::DCPS::RtpsUdpDataLink::PMF() [private] |
Definition at line 350 of file RtpsUdpDataLink.h.
OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink | ( | RtpsUdpTransport * | transport, | |
const GuidPrefix_t & | local_prefix, | |||
RtpsUdpInst * | config, | |||
TransportReactorTask * | reactor_task | |||
) |
Definition at line 51 of file RtpsUdpDataLink.cpp.
References local_prefix_.
00055 : DataLink(transport, // 3 data link "attributes", below, are unused 00056 0, // priority 00057 false, // is_loopback 00058 false), // is_active 00059 config_(config), 00060 reactor_task_(reactor_task, false), 00061 rtps_customized_element_allocator_(40, sizeof(RtpsCustomizedElement)), 00062 multi_buff_(this, config->nak_depth_), 00063 best_effort_heartbeat_count_(0), 00064 nack_reply_(this, &RtpsUdpDataLink::send_nack_replies, 00065 config->nak_response_delay_), 00066 heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies, 00067 config->heartbeat_response_delay_), 00068 heartbeat_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats), 00069 heartbeatchecker_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats) 00070 { 00071 std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t)); 00072 }
bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification | ( | TransportQueueElement * | element | ) |
Definition at line 75 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::TransportQueueElement::publication_id(), and writers_.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().
00076 { 00077 RtpsWriterMap::iterator iter = writers_.find(element->publication_id()); 00078 if (iter != writers_.end()) { 00079 00080 iter->second.add_elem_awaiting_ack(element); 00081 return true; 00082 } 00083 return false; 00084 }
void OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg | ( | RTPS::SubmessageSeq & | msg, | |
const TransportQueueElement & | tqe, | |||
const DestToEntityMap & | dtem | |||
) | [private] |
Definition at line 830 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::TransportQueueElement::subscription_id(), and writers_.
Referenced by customize_queue_element().
00833 { 00834 // These are the GAP submessages that we'll send directly in-line with the 00835 // DATA when we notice that the DataWriter has deliberately skipped seq #s. 00836 // There are other GAP submessages generated in response to reader ACKNACKS, 00837 // see send_nack_replies(). 00838 using namespace OpenDDS::RTPS; 00839 00840 const SequenceNumber seq = tqe.sequence(); 00841 const RepoId pub = tqe.publication_id(); 00842 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN 00843 || tqe.subscription_id() != GUID_UNKNOWN) { 00844 return; 00845 } 00846 00847 const RtpsWriterMap::iterator wi = writers_.find(pub); 00848 if (wi == writers_.end()) { 00849 return; // not a reliable writer, does not send GAPs 00850 } 00851 00852 RtpsWriter& rw = wi->second; 00853 00854 if (seq != rw.expected_) { 00855 SequenceNumber firstMissing = rw.expected_; 00856 00857 // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range 00858 // [gapStart, gapListBase) and those in the SNSet. 00859 const SequenceNumber_t gapStart = {firstMissing.getHigh(), 00860 firstMissing.getLow()}, 00861 gapListBase = {seq.getHigh(), 00862 seq.getLow()}; 00863 00864 // We are not going to enable any bits in the "bitmap" of the SNSet, 00865 // but the "numBits" and the bitmap.length must both be > 0. 00866 LongSeq8 bitmap; 00867 bitmap.length(1); 00868 bitmap[0] = 0; 00869 00870 GapSubmessage gap = { 00871 {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/}, 00872 ENTITYID_UNKNOWN, // readerId: applies to all matched readers 00873 pub.entityId, 00874 gapStart, 00875 {gapListBase, 1, bitmap} 00876 }; 00877 00878 size_t size = 0, padding = 0; 00879 gen_find_size(gap, size, padding); 00880 gap.smHeader.submessageLength = 00881 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 00882 00883 if (!rw.durable_) { 00884 const CORBA::ULong i = msg.length(); 00885 msg.length(i + 1); 00886 msg[i].gap_sm(gap); 00887 } else { 00888 InfoDestinationSubmessage idst = { 00889 {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ}, 00890 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 00891 }; 00892 CORBA::ULong ml = msg.length(); 00893 00894 //Change the non-directed Gap into multiple directed gaps to prevent 00895 //delivering to currently undiscovered durable readers 00896 DestToEntityMap::const_iterator iter = dtem.begin(); 00897 while (iter != dtem.end()) { 00898 std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t)); 00899 msg.length(ml + 1); 00900 msg[ml++].info_dst_sm(idst); 00901 00902 const OPENDDS_VECTOR(RepoId)& readers = iter->second; 00903 for (size_t i = 0; i < readers.size(); ++i) { 00904 gap.readerId = readers.at(i).entityId; 00905 msg.length(ml + 1); 00906 msg[ml++].gap_sm(gap); 00907 } //END iter over reader entity ids 00908 ++iter; 00909 } //END iter over reader GuidPrefix_t's 00910 } 00911 } 00912 }
void OpenDDS::DCPS::RtpsUdpDataLink::add_locator | ( | const RepoId & | remote_id, | |
const ACE_INET_Addr & | address, | |||
bool | requires_inline_qos | |||
) |
Definition at line 178 of file RtpsUdpDataLink.cpp.
References lock_.
00181 { 00182 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00183 locators_[remote_id] = RemoteInfo(address, requires_inline_qos); 00184 }
void OpenDDS::DCPS::RtpsUdpDataLink::associated | ( | const RepoId & | local, | |
const RepoId & | remote, | |||
bool | local_reliable, | |||
bool | remote_reliable, | |||
bool | local_durable, | |||
bool | remote_durable | |||
) |
Definition at line 226 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GuidConverter::entityKind(), heartbeat_, heartbeat_counts_, OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, lock_, reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable(), and writers_.
00229 { 00230 if (!local_reliable) { 00231 return; 00232 } 00233 00234 bool enable_heartbeat = false; 00235 00236 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00237 const GuidConverter conv(local_id); 00238 const EntityKind kind = conv.entityKind(); 00239 if (kind == KIND_WRITER && remote_reliable) { 00240 // Insert count if not already there. 00241 heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0)); 00242 RtpsWriter& w = writers_[local_id]; 00243 w.remote_readers_[remote_id].durable_ = remote_durable; 00244 w.durable_ = local_durable; 00245 enable_heartbeat = true; 00246 00247 } else if (kind == KIND_READER) { 00248 RtpsReaderMap::iterator rr = readers_.find(local_id); 00249 if (rr == readers_.end()) { 00250 rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader())) 00251 .first; 00252 rr->second.durable_ = local_durable; 00253 } 00254 rr->second.remote_writers_[remote_id]; 00255 reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr)); 00256 } 00257 00258 g.release(); 00259 if (enable_heartbeat) { 00260 heartbeat_.schedule_enable(); 00261 } 00262 }
bool OpenDDS::DCPS::RtpsUdpDataLink::check_handshake_complete | ( | const RepoId & | local, | |
const RepoId & | remote | |||
) |
Definition at line 265 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::GuidConverter::entityKind(), OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, and writers_.
00267 { 00268 const GuidConverter conv(local_id); 00269 const EntityKind kind = conv.entityKind(); 00270 if (kind == KIND_WRITER) { 00271 RtpsWriterMap::iterator rw = writers_.find(local_id); 00272 if (rw == writers_.end()) { 00273 return true; // not reliable, no handshaking 00274 } 00275 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id); 00276 if (ri == rw->second.remote_readers_.end()) { 00277 return true; // not reliable, no handshaking 00278 } 00279 return ri->second.handshake_done_; 00280 00281 } else if (kind == KIND_READER) { 00282 return true; // no handshaking for local reader 00283 } 00284 return false; 00285 }
void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats | ( | ) | [private] |
Definition at line 2352 of file RtpsUdpDataLink.cpp.
References config_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist(), and writer_no_longer_exists_lock_.
02353 { 02354 // Have any interesting writers timed out? 02355 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_; 02356 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 02357 ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_); 02358 for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end(); 02359 pos != limit; 02360 ++pos) { 02361 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) { 02362 CallbackType callback(pos->first, pos->second); 02363 writerDoesNotExistCallbacks_.push_back(callback); 02364 pos->second.status = InterestingRemote::DOES_NOT_EXIST; 02365 } 02366 } 02367 c.release(); 02368 g.release(); 02369 while(true) { 02370 c.acquire(); 02371 if (writerDoesNotExistCallbacks_.empty()) { 02372 break; 02373 } 02374 OPENDDS_VECTOR(CallbackType)::iterator iter = writerDoesNotExistCallbacks_.begin(); 02375 const RepoId& rid = iter->first; 02376 const InterestingRemote& remote = iter->second; 02377 writerDoesNotExistCallbacks_.erase(iter); 02378 c.release(); 02379 remote.listener->writer_does_not_exist(rid, remote.localid); 02380 } 02381 }
ACE_INLINE RtpsUdpInst * OpenDDS::DCPS::RtpsUdpDataLink::config | ( | ) |
Definition at line 24 of file RtpsUdpDataLink.inl.
References config_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00025 { 00026 return config_; 00027 }
TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element | ( | TransportQueueElement * | element | ) | [private, virtual] |
Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 629 of file RtpsUdpDataLink.cpp.
References add_gap_submsg(), OpenDDS::DCPS::RtpsCustomizedElement::alloc(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, end_historic_samples(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::historic_sample_, lock_, OpenDDS::DCPS::TransportQueueElement::msg(), OPENDDS_STRING, OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), requires_inline_qos(), rtps_customized_element_allocator_, OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual(), OpenDDS::DCPS::RtpsCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::submsgs_to_msgblock(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::Transport_debug_level, and writers_.
00630 { 00631 const ACE_Message_Block* msg = element->msg(); 00632 if (!msg) { 00633 return element; 00634 } 00635 00636 const RepoId pub_id = element->publication_id(); 00637 00638 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0); 00639 00640 RTPS::SubmessageSeq subm; 00641 00642 const RtpsWriterMap::iterator rw = writers_.find(pub_id); 00643 00644 bool gap_ok = true; 00645 DestToEntityMap gap_receivers; 00646 if (rw != writers_.end() && !rw->second.remote_readers_.empty()) { 00647 for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin(); 00648 ri != rw->second.remote_readers_.end(); ++ri) { 00649 RepoId tmp; 00650 std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t)); 00651 tmp.entityId = ENTITYID_UNKNOWN; 00652 gap_receivers[tmp].push_back(ri->first); 00653 00654 if (ri->second.expecting_durable_data()) { 00655 // Can't add an in-line GAP if some Data Reader is expecting durable 00656 // data, the GAP could cause that Data Reader to ignore the durable 00657 // data. The other readers will eventually learn about the GAP by 00658 // sending an ACKNACK and getting a GAP reply. 00659 gap_ok = false; 00660 break; 00661 } 00662 } 00663 } 00664 00665 if (gap_ok) { 00666 add_gap_submsg(subm, *element, gap_receivers); 00667 } 00668 00669 const SequenceNumber seq = element->sequence(); 00670 if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) { 00671 rw->second.expected_ = seq; 00672 ++rw->second.expected_; 00673 } 00674 00675 TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element); 00676 TransportCustomizedElement* tce = 00677 dynamic_cast<TransportCustomizedElement*>(element); 00678 TransportSendControlElement* tsce = 00679 dynamic_cast<TransportSendControlElement*>(element); 00680 00681 ACE_Message_Block* data = 0; 00682 bool durable = false; 00683 00684 // Based on the type of 'element', find and duplicate the data payload 00685 // continuation block. 00686 if (tsce) { // Control message 00687 if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) { 00688 data = msg->cont()->duplicate(); 00689 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00690 RtpsSampleHeader::populate_data_control_submessages( 00691 subm, *tsce, requires_inline_qos(pub_id)); 00692 } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) { 00693 end_historic_samples(rw, tsce->header(), msg->cont()); 00694 element->data_delivered(); 00695 return 0; 00696 } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) { 00697 send_heartbeats_manual(tsce); 00698 element->data_delivered(); 00699 return 0; 00700 } else { 00701 element->data_dropped(true /*dropped_by_transport*/); 00702 return 0; 00703 } 00704 00705 } else if (tse) { // Basic data message 00706 // {DataSampleHeader} -> {Data Payload} 00707 data = msg->cont()->duplicate(); 00708 const DataSampleElement* dsle = tse->sample(); 00709 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00710 RtpsSampleHeader::populate_data_sample_submessages( 00711 subm, *dsle, requires_inline_qos(pub_id)); 00712 durable = dsle->get_header().historic_sample_; 00713 00714 } else if (tce) { // Customized data message 00715 // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload} 00716 data = msg->cont()->cont()->duplicate(); 00717 const DataSampleElement* dsle = tce->original_send_element()->sample(); 00718 // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader 00719 RtpsSampleHeader::populate_data_sample_submessages( 00720 subm, *dsle, requires_inline_qos(pub_id)); 00721 durable = dsle->get_header().historic_sample_; 00722 00723 } else { 00724 return element; 00725 } 00726 00727 ACE_Message_Block* hdr = submsgs_to_msgblock(subm); 00728 hdr->cont(data); 00729 RtpsCustomizedElement* rtps = 00730 RtpsCustomizedElement::alloc(element, hdr, 00731 &rtps_customized_element_allocator_); 00732 00733 // Handle durability resends 00734 if (durable && rw != writers_.end()) { 00735 const RepoId sub = element->subscription_id(); 00736 if (sub != GUID_UNKNOWN) { 00737 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub); 00738 if (ri != rw->second.remote_readers_.end()) { 00739 ri->second.durable_data_[rtps->sequence()] = rtps; 00740 ri->second.durable_timestamp_ = ACE_OS::gettimeofday(); 00741 if (Transport_debug_level > 3) { 00742 const GuidConverter conv(pub_id), sub_conv(sub); 00743 ACE_DEBUG((LM_DEBUG, 00744 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - " 00745 "storing durable data for local %C remote %C\n", 00746 OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str())); 00747 } 00748 return 0; 00749 } 00750 } 00751 } else if (durable && (Transport_debug_level)) { 00752 const GuidConverter conv(pub_id); 00753 ACE_DEBUG((LM_ERROR, 00754 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - " 00755 "WARNING: no RtpsWriter to store durable data for local %C\n", 00756 OPENDDS_STRING(conv).c_str())); 00757 } 00758 00759 return rtps; 00760 }
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch | ( | const T & | submessage, | |
const GuidPrefix_t & | src_prefix, | |||
const FN & | func | |||
) | [inline, private] |
Definition at line 305 of file RtpsUdpDataLink.h.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, and OpenDDS::DCPS::GUID_t::guidPrefix.
Referenced by received().
00307 { 00308 using std::pair; 00309 RepoId local; 00310 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 00311 local.entityId = submessage.readerId; 00312 00313 RepoId src; 00314 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 00315 src.entityId = submessage.writerId; 00316 00317 bool schedule_timer = false; 00318 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00319 if (local.entityId == ENTITYID_UNKNOWN) { 00320 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters = 00321 reader_index_.equal_range(src); 00322 iters.first != iters.second; ++iters.first) { 00323 schedule_timer |= (this->*func)(submessage, src, *iters.first->second); 00324 } 00325 00326 } else { 00327 const RtpsReaderMap::iterator rr = readers_.find(local); 00328 if (rr == readers_.end()) { 00329 return; 00330 } 00331 schedule_timer = (this->*func)(submessage, src, *rr); 00332 } 00333 g.release(); 00334 if (schedule_timer) { 00335 heartbeat_reply_.schedule(); 00336 } 00337 }
void OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data | ( | const RepoId & | readerId, | |
WriterInfo & | info, | |||
bool | durable | |||
) | [private] |
Definition at line 995 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, and OpenDDS::DCPS::Transport_debug_level.
Referenced by process_data_i(), process_gap_i(), and process_heartbeat_i().
00997 { 00998 if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return; 00999 const SequenceNumber ca = info.recvd_.cumulative_ack(); 01000 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter; 01001 const iter end = info.held_.upper_bound(ca); 01002 for (iter it = info.held_.begin(); it != end; /*increment in loop body*/) { 01003 if (Transport_debug_level > 5) { 01004 GuidConverter reader(readerId); 01005 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::deliver_held_data -") 01006 ACE_TEXT(" deliver sequence: %q to %C\n"), 01007 it->second.header_.sequence_.getValue(), 01008 OPENDDS_STRING(reader).c_str())); 01009 } 01010 data_received(it->second, readerId); 01011 info.held_.erase(it++); 01012 } 01013 }
void OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria | |||
) |
Definition at line 86 of file RtpsUdpDataLink.cpp.
References lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), OPENDDS_SET(), and writers_.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample().
00088 { 00089 RtpsWriter::SnToTqeMap sn_tqe_map; 00090 RtpsWriter::SnToTqeMap to_deliver; 00091 typedef RtpsWriter::SnToTqeMap::iterator iter_t; 00092 00093 { 00094 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00095 00096 RtpsWriterMap::iterator iter = writers_.find(pub_id); 00097 if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) { 00098 to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end()); 00099 iter->second.to_deliver_.clear(); 00100 iter_t it = iter->second.elems_not_acked_.begin(); 00101 OPENDDS_SET(SequenceNumber) sns_to_release; 00102 while (it != iter->second.elems_not_acked_.end()) { 00103 if (criteria.matches(*it->second)) { 00104 sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second)); 00105 sns_to_release.insert(it->first); 00106 iter_t last = it; 00107 ++it; 00108 iter->second.elems_not_acked_.erase(last); 00109 } else { 00110 ++it; 00111 } 00112 } 00113 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00114 while (sns_it != sns_to_release.end()) { 00115 iter->second.send_buff_->release_acked(*sns_it); 00116 ++sns_it; 00117 } 00118 } 00119 } 00120 iter_t deliver_iter = to_deliver.begin(); 00121 while (deliver_iter != to_deliver.end()) { 00122 deliver_iter->second->data_delivered(); 00123 ++deliver_iter; 00124 } 00125 iter_t drop_iter = sn_tqe_map.begin(); 00126 while (drop_iter != sn_tqe_map.end()) { 00127 drop_iter->second->data_dropped(true); 00128 ++drop_iter; 00129 } 00130 }
void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend | ( | TransportQueueElement * | element | ) | [private] |
Definition at line 2157 of file RtpsUdpDataLink.cpp.
References get_locator(), OpenDDS::DCPS::TransportQueueElement::msg(), send_strategy_, and OpenDDS::DCPS::TransportQueueElement::subscription_id().
Referenced by received().
02158 { 02159 ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg()); 02160 send_strategy_->send_rtps_control(*msg, 02161 get_locator(element->subscription_id())); 02162 }
void OpenDDS::DCPS::RtpsUdpDataLink::end_historic_samples | ( | RtpsWriterMap::iterator | writer, | |
const DataSampleHeader & | header, | |||
ACE_Message_Block * | body | |||
) | [private] |
Definition at line 763 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::GUID_UNKNOWN, header, OPENDDS_STRING, OpenDDS::DCPS::Transport_debug_level, and writers_.
Referenced by customize_queue_element().
00766 { 00767 // Set the ReaderInfo::durable_timestamp_ for the case where no 00768 // durable samples exist in the DataWriter. 00769 if (writer != writers_.end() && writer->second.durable_) { 00770 const ACE_Time_Value now = ACE_OS::gettimeofday(); 00771 RepoId sub = GUID_UNKNOWN; 00772 if (body && header.message_length_ >= sizeof(sub)) { 00773 std::memcpy(&sub, body->rd_ptr(), header.message_length_); 00774 } 00775 typedef ReaderInfoMap::iterator iter_t; 00776 if (sub == GUID_UNKNOWN) { 00777 if (Transport_debug_level > 3) { 00778 const GuidConverter conv(writer->first); 00779 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples " 00780 "local %C all readers\n", OPENDDS_STRING(conv).c_str())); 00781 } 00782 for (iter_t iter = writer->second.remote_readers_.begin(); 00783 iter != writer->second.remote_readers_.end(); ++iter) { 00784 if (iter->second.durable_) { 00785 iter->second.durable_timestamp_ = now; 00786 } 00787 } 00788 } else { 00789 iter_t iter = writer->second.remote_readers_.find(sub); 00790 if (iter != writer->second.remote_readers_.end()) { 00791 if (iter->second.durable_) { 00792 iter->second.durable_timestamp_ = now; 00793 if (Transport_debug_level > 3) { 00794 const GuidConverter conv(writer->first), sub_conv(sub); 00795 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples" 00796 " local %C remote %C\n", OPENDDS_STRING(conv).c_str(), 00797 OPENDDS_STRING(sub_conv).c_str())); 00798 } 00799 } 00800 } 00801 } 00802 } 00803 }
void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range | ( | RTPS::FragmentNumberSet & | fnSet, | |
CORBA::ULong | extent | |||
) | [static, private] |
Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent
fnSet | FragmentNumberSet for the message sequence number in question | |
extent | is the highest fragement sequence number for this FragmentNumberSet |
Definition at line 1513 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), and OpenDDS::RTPS::FragmentNumberSet::numBits.
Referenced by generate_nack_frags().
01515 { 01516 if (extent < fnSet.bitmapBase.value) { 01517 return; // can't extend to some number under the base 01518 } 01519 // calculate the index to the extent to determine the new_num_bits 01520 const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255), 01521 extent - fnSet.bitmapBase.value + 1), 01522 len = (new_num_bits + 31) / 32; 01523 if (new_num_bits < fnSet.numBits) { 01524 return; // bitmap already extends past "extent" 01525 } 01526 fnSet.bitmap.length(len); 01527 // We are missing from one past old bitmap end to the new end 01528 DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits, 01529 fnSet.bitmap.get_buffer(), len, 01530 fnSet.numBits); 01531 }
size_t OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags | ( | OPENDDS_VECTOR(RTPS::NackFragSubmessage)& | nack_frags, | |
WriterInfo & | wi, | |||
const RepoId & | pub_id | |||
) | [private] |
Definition at line 1428 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::ENTITYID_UNKNOWN, extend_bitmap_range(), OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::nackfrag_count_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_VECTOR(), recv_strategy_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::NackFragSubmessage::smHeader, and OpenDDS::RTPS::NackFragSubmessage::writerSN.
Referenced by send_ack_nacks().
01430 { 01431 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t; 01432 typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t; 01433 RtpsUdpReceiveStrategy::FragmentInfo frag_info; 01434 01435 // Populate frag_info with two possible sources of NackFrags: 01436 // 1. sequence #s in the reception gaps that we have partially received 01437 OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges(); 01438 for (size_t i = 0; i < missing.size(); ++i) { 01439 recv_strategy_->has_fragments(missing[i], pub_id, &frag_info); 01440 } 01441 // 1b. larger than the last received seq# but less than the heartbeat.lastSN 01442 if (!wi.recvd_.empty()) { 01443 const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second); 01444 recv_strategy_->has_fragments(range, pub_id, &frag_info); 01445 } 01446 for (size_t i = 0; i < frag_info.size(); ++i) { 01447 // If we've received a HeartbeatFrag, we know the last (available) frag # 01448 const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first); 01449 if (heartbeat_frag != wi.frags_.end()) { 01450 extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value); 01451 } 01452 } 01453 01454 // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag 01455 const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()), 01456 high = wi.frags_.upper_bound(wi.recvd_.last_ack()), 01457 end = wi.frags_.end(); 01458 for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) { 01459 if (iter == low) { 01460 // skip over the range covered by step #1 above 01461 if (high == end) { 01462 break; 01463 } 01464 iter = high; 01465 } 01466 01467 const SequenceRange range(iter->first, iter->first); 01468 if (recv_strategy_->has_fragments(range, pub_id, &frag_info)) { 01469 extend_bitmap_range(frag_info.back().second, iter->second.value); 01470 } else { 01471 // it was not in the recv strategy, so the entire range is "missing" 01472 frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet())); 01473 RTPS::FragmentNumberSet& fnSet = frag_info.back().second; 01474 fnSet.bitmapBase.value = 1; 01475 fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value); 01476 fnSet.bitmap.length((fnSet.numBits + 31) / 32); 01477 for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) { 01478 fnSet.bitmap[i] = 0xFFFFFFFF; 01479 } 01480 } 01481 } 01482 01483 if (frag_info.empty()) { 01484 return 0; 01485 } 01486 01487 const RTPS::NackFragSubmessage nackfrag_prototype = { 01488 {RTPS::NACK_FRAG, 1 /*FLAG_E*/, 0 /* length set below */}, 01489 ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies() 01490 ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies() 01491 {0, 0}, // writerSN set below 01492 RTPS::FragmentNumberSet(), // fragmentNumberState set below 01493 {0} // count set below 01494 }; 01495 01496 size_t size = 0, padding = 0; 01497 for (size_t i = 0; i < frag_info.size(); ++i) { 01498 nf.push_back(nackfrag_prototype); 01499 RTPS::NackFragSubmessage& nackfrag = nf.back(); 01500 nackfrag.writerSN.low = frag_info[i].first.getLow(); 01501 nackfrag.writerSN.high = frag_info[i].first.getHigh(); 01502 nackfrag.fragmentNumberState = frag_info[i].second; 01503 nackfrag.count.value = ++wi.nackfrag_count_; 01504 const size_t before_size = size; 01505 gen_find_size(nackfrag, size, padding); 01506 nackfrag.smHeader.submessageLength = 01507 static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ; 01508 } 01509 return size; 01510 }
ACE_INET_Addr OpenDDS::DCPS::RtpsUdpDataLink::get_locator | ( | const RepoId & | remote_id | ) | const |
Definition at line 212 of file RtpsUdpDataLink.cpp.
References OPENDDS_MAP_CMP(), and OPENDDS_STRING.
Referenced by durability_resend(), get_locators(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i(), and send_durability_gaps().
00213 { 00214 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t; 00215 const iter_t iter = locators_.find(remote_id); 00216 if (iter == locators_.end()) { 00217 const GuidConverter conv(remote_id); 00218 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - " 00219 "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str())); 00220 return ACE_INET_Addr(); 00221 } 00222 return iter->second.addr_; 00223 }
void OpenDDS::DCPS::RtpsUdpDataLink::get_locators | ( | const RepoId & | local_id, | |
OPENDDS_SET(ACE_INET_Addr)& | addrs | |||
) | const |
Given a 'local_id' of a publication or subscription, populate the set of 'addrs' with the network addresses of any remote peers (or if 'local_id' is GUID_UNKNOWN, all known addresses).
Definition at line 187 of file RtpsUdpDataLink.cpp.
References get_locator(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i(), and send_heartbeats_manual().
00189 { 00190 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t; 00191 00192 if (local_id == GUID_UNKNOWN) { 00193 for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) { 00194 addrs.insert(iter->second.addr_); 00195 } 00196 return; 00197 } 00198 00199 const GUIDSeq_var peers = peer_ids(local_id); 00200 if (!peers.ptr()) { 00201 return; 00202 } 00203 for (CORBA::ULong i = 0; i < peers->length(); ++i) { 00204 const ACE_INET_Addr addr = get_locator(peers[i]); 00205 if (addr != ACE_INET_Addr()) { 00206 addrs.insert(addr); 00207 } 00208 } 00209 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor | ( | ) |
Definition at line 30 of file RtpsUdpDataLink.inl.
References reactor_task_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00031 { 00032 if (reactor_task_ == 0) return 0; 00033 return reactor_task_->get_reactor(); 00034 }
const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix | ( | ) | const [inline] |
Definition at line 83 of file RtpsUdpDataLink.h.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpSendStrategy::RtpsUdpSendStrategy().
00083 { return local_prefix_; }
ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::marshal_gaps | ( | const RepoId & | writer, | |
const RepoId & | reader, | |||
const DisjointSequence & | gaps, | |||
bool | durable = false | |||
) | [private] |
Definition at line 2055 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and writers_.
Referenced by send_durability_gaps(), and send_nack_replies().
02057 { 02058 using namespace RTPS; 02059 // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range 02060 // [gapStart, gapListBase) and those in the SNSet. 02061 const SequenceNumber firstMissing = gaps.low(), 02062 base = ++SequenceNumber(gaps.cumulative_ack()); 02063 const SequenceNumber_t gapStart = {firstMissing.getHigh(), 02064 firstMissing.getLow()}, 02065 gapListBase = {base.getHigh(), base.getLow()}; 02066 CORBA::ULong num_bits = 0; 02067 LongSeq8 bitmap; 02068 02069 if (gaps.disjoint()) { 02070 bitmap.length(bitmap_num_longs(base, gaps.high())); 02071 gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits); 02072 02073 } else { 02074 bitmap.length(1); 02075 bitmap[0] = 0; 02076 num_bits = 1; 02077 } 02078 02079 GapSubmessage gap = { 02080 {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/}, 02081 reader.entityId, 02082 writer.entityId, 02083 gapStart, 02084 {gapListBase, num_bits, bitmap} 02085 }; 02086 02087 if (Transport_debug_level > 5) { 02088 const GuidConverter conv(writer); 02089 SequenceRange sr; 02090 sr.first.setValue(gap.gapStart.high, gap.gapStart.low); 02091 SequenceNumber srbase; 02092 srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low); 02093 sr.second = srbase.previous(); 02094 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps " 02095 "GAP with range [%q, %q] from %C\n", 02096 sr.first.getValue(), sr.second.getValue(), 02097 OPENDDS_STRING(conv).c_str())); 02098 } 02099 02100 size_t gap_size = 0, padding = 0; 02101 gen_find_size(gap, gap_size, padding); 02102 gap.smHeader.submessageLength = 02103 static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ; 02104 02105 // For durable writers, change a non-directed Gap into multiple directed gaps. 02106 OPENDDS_VECTOR(RepoId) readers; 02107 if (durable && reader.entityId == ENTITYID_UNKNOWN) { 02108 if (Transport_debug_level > 5) { 02109 const GuidConverter local_conv(writer); 02110 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C " 02111 "durable writer\n", OPENDDS_STRING(local_conv).c_str())); 02112 } 02113 const RtpsWriterMap::iterator iter = writers_.find(writer); 02114 RtpsWriter& rw = iter->second; 02115 for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin(); 02116 ri != rw.remote_readers_.end(); ++ri) { 02117 if (!ri->second.expecting_durable_data()) { 02118 readers.push_back(ri->first); 02119 } else if (Transport_debug_level > 5) { 02120 const GuidConverter remote_conv(ri->first); 02121 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader " 02122 "%C is expecting durable data, no GAP sent\n", 02123 OPENDDS_STRING(remote_conv).c_str())); 02124 } 02125 } 02126 if (readers.empty()) return 0; 02127 } 02128 02129 const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ, 02130 prefix_sz = sizeof(reader.guidPrefix); 02131 // no additional padding needed for INFO_DST 02132 const size_t total_sz = readers.empty() ? (gap_size + padding) : 02133 (readers.size() * (gap_size + padding + size_per_idst)); 02134 02135 ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz); 02136 //FUTURE: allocators? 02137 // byte swapping is handled in the operator<<() implementation 02138 Serializer ser(mb_gap, false, Serializer::ALIGN_CDR); 02139 if (readers.empty()) { 02140 ser << gap; 02141 } else { 02142 InfoDestinationSubmessage idst = { 02143 {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ}, 02144 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 02145 }; 02146 for (size_t i = 0; i < readers.size(); ++i) { 02147 std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz); 02148 gap.readerId = readers[i].entityId; 02149 ser << idst; 02150 ser << gap; 02151 } 02152 } 02153 return mb_gap; 02154 }
ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket | ( | ) |
Definition at line 50 of file RtpsUdpDataLink.inl.
References multicast_socket_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00051 { 00052 return multicast_socket_; 00053 }
bool OpenDDS::DCPS::RtpsUdpDataLink::open | ( | const ACE_SOCK_Dgram & | unicast_socket | ) |
Definition at line 133 of file RtpsUdpDataLink.cpp.
References config_, multi_buff_, OpenDDS::DCPS::RtpsUdpInst::multicast_group_address_, OpenDDS::DCPS::RtpsUdpInst::multicast_interface_, multicast_socket_, OPENDDS_STRING, recv_strategy_, send_strategy_, OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::RtpsUdpInst::ttl_, unicast_socket_, and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.
00134 { 00135 unicast_socket_ = unicast_socket; 00136 00137 if (config_->use_multicast_) { 00138 const OPENDDS_STRING& net_if = config_->multicast_interface_; 00139 #ifdef ACE_HAS_MAC_OSX 00140 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | 00141 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE); 00142 #endif 00143 if (multicast_socket_.join(config_->multicast_group_address_, 1, 00144 net_if.empty() ? 0 : 00145 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) { 00146 ACE_ERROR_RETURN((LM_ERROR, 00147 ACE_TEXT("(%P|%t) ERROR: ") 00148 ACE_TEXT("RtpsUdpDataLink::open: ") 00149 ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")), 00150 false); 00151 } 00152 } 00153 00154 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config_->ttl_)) { 00155 ACE_ERROR_RETURN((LM_ERROR, 00156 ACE_TEXT("(%P|%t) ERROR: ") 00157 ACE_TEXT("RtpsUdpDataLink::open: ") 00158 ACE_TEXT("failed to set TTL: %d\n"), 00159 config_->ttl_), 00160 false); 00161 } 00162 00163 send_strategy_->send_buffer(&multi_buff_); 00164 00165 if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_), 00166 static_rchandle_cast<TransportStrategy>(recv_strategy_)) != 0) { 00167 stop_i(); 00168 ACE_ERROR_RETURN((LM_ERROR, 00169 ACE_TEXT("(%P|%t) ERROR: ") 00170 ACE_TEXT("UdpDataLink::open: start failed!\n")), 00171 false); 00172 } 00173 00174 return true; 00175 }
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
CORBA::Long | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RtpsReader | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
WriterInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RtpsWriter | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
ReaderInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
RemoteInfo | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP | ( | RepoId | , | |
OPENDDS_VECTOR(RepoId) | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
Referenced by get_locator(), get_locators(), and requires_inline_qos().
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP | ( | RepoId | , | |
InterestingRemote | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP | ( | RepoId | , | |
RtpsReaderMap::iterator | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET | ( | InterestingAckNack | ) | [private] |
OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR | ( | CallbackType | ) | [private] |
OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR | ( | CallbackType | ) | [private] |
void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i | ( | ) | [virtual] |
Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 377 of file RtpsUdpDataLink.cpp.
References DBG_ENTRY_LVL, heartbeat_counts_, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), OpenDDS::DCPS::DataLink::pre_stop_i(), and writers_.
00378 { 00379 DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6); 00380 DataLink::pre_stop_i(); 00381 OPENDDS_VECTOR(TransportQueueElement*) to_deliver; 00382 OPENDDS_VECTOR(TransportQueueElement*) to_drop; 00383 { 00384 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00385 00386 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 00387 00388 RtpsWriterMap::iterator iter = writers_.begin(); 00389 while (iter != writers_.end()) { 00390 RtpsWriter& writer = iter->second; 00391 if (!writer.to_deliver_.empty()) { 00392 iter_t iter = writer.to_deliver_.begin(); 00393 while (iter != writer.to_deliver_.end()) { 00394 to_deliver.push_back(iter->second); 00395 writer.to_deliver_.erase(iter); 00396 iter = writer.to_deliver_.begin(); 00397 } 00398 } 00399 if (!writer.elems_not_acked_.empty()) { 00400 OPENDDS_SET(SequenceNumber) sns_to_release; 00401 iter_t iter = writer.elems_not_acked_.begin(); 00402 while (iter != writer.elems_not_acked_.end()) { 00403 to_drop.push_back(iter->second); 00404 sns_to_release.insert(iter->first); 00405 writer.elems_not_acked_.erase(iter); 00406 iter = writer.elems_not_acked_.begin(); 00407 } 00408 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00409 while (sns_it != sns_to_release.end()) { 00410 writer.send_buff_->release_acked(*sns_it); 00411 ++sns_it; 00412 } 00413 } 00414 RtpsWriterMap::iterator last = iter; 00415 ++iter; 00416 heartbeat_counts_.erase(last->first); 00417 writers_.erase(last); 00418 } 00419 } 00420 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter; 00421 tqe_iter deliver_it = to_deliver.begin(); 00422 while (deliver_it != to_deliver.end()) { 00423 (*deliver_it)->data_delivered(); 00424 ++deliver_it; 00425 } 00426 tqe_iter drop_it = to_drop.begin(); 00427 while (drop_it != to_drop.end()) { 00428 (*drop_it)->data_dropped(true); 00429 ++drop_it; 00430 } 00431 }
void OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i | ( | ACE_Guard< ACE_Thread_Mutex > & | g, | |
const RepoId & | pub_id | |||
) | [private] |
Definition at line 1984 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.
Referenced by received().
01985 { 01986 using namespace OpenDDS::RTPS; 01987 typedef RtpsWriterMap::iterator rw_iter; 01988 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 01989 OPENDDS_VECTOR(RepoId) to_check; 01990 rw_iter rw = writers_.find(pub_id); 01991 if (rw == writers_.end()) { 01992 return; 01993 } 01994 RtpsWriter& writer = rw->second; 01995 if (!writer.elems_not_acked_.empty()) { 01996 01997 //start with the max sequence number writer knows about and decrease 01998 //by what the min over all readers is 01999 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE; 02000 02001 typedef ReaderInfoMap::iterator ri_iter; 02002 const ri_iter end = writer.remote_readers_.end(); 02003 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 02004 if (ri->second.cur_cumulative_ack_ < all_readers_ack) { 02005 all_readers_ack = ri->second.cur_cumulative_ack_; 02006 } 02007 } 02008 if (all_readers_ack == SequenceNumber::MAX_VALUE) { 02009 return; 02010 } 02011 OPENDDS_VECTOR(SequenceNumber) sns; 02012 //if any messages fully acked, call data delivered and remove from map 02013 02014 iter_t it = writer.elems_not_acked_.begin(); 02015 OPENDDS_SET(SequenceNumber) sns_to_release; 02016 while (it != writer.elems_not_acked_.end()) { 02017 if (it->first < all_readers_ack) { 02018 writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second)); 02019 sns_to_release.insert(it->first); 02020 iter_t last = it; 02021 ++it; 02022 writer.elems_not_acked_.erase(last); 02023 } else { 02024 break; 02025 } 02026 } 02027 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 02028 while (sns_it != sns_to_release.end()) { 02029 writer.send_buff_->release_acked(*sns_it); 02030 ++sns_it; 02031 } 02032 TransportQueueElement* tqe_to_deliver; 02033 02034 while (true) { 02035 rw_iter deliver_on_writer = writers_.find(pub_id); 02036 if (deliver_on_writer == writers_.end()) { 02037 break; 02038 } 02039 RtpsWriter& writer = deliver_on_writer->second; 02040 iter_t to_deliver_iter = writer.to_deliver_.begin(); 02041 if (to_deliver_iter == writer.to_deliver_.end()) { 02042 break; 02043 } 02044 tqe_to_deliver = to_deliver_iter->second; 02045 writer.to_deliver_.erase(to_deliver_iter); 02046 g.release(); 02047 02048 tqe_to_deliver->data_delivered(); 02049 g.acquire(); 02050 } 02051 } 02052 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_data_i | ( | const RTPS::DataSubmessage & | data, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 925 of file RtpsUdpDataLink.cpp.
References deliver_held_data(), OpenDDS::DCPS::SequenceNumber::getValue(), OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::RTPS::DataSubmessage::writerSN.
Referenced by received().
00928 { 00929 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 00930 if (wi != rr.second.remote_writers_.end()) { 00931 WriterInfo& info = wi->second; 00932 SequenceNumber seq; 00933 seq.setValue(data.writerSN.high, data.writerSN.low); 00934 info.frags_.erase(seq); 00935 const RepoId& readerId = rr.first; 00936 if (info.recvd_.contains(seq)) { 00937 if (Transport_debug_level > 5) { 00938 GuidConverter writer(src); 00939 GuidConverter reader(readerId); 00940 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 00941 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"), 00942 seq.getValue(), 00943 OPENDDS_STRING(writer).c_str(), 00944 OPENDDS_STRING(reader).c_str())); 00945 } 00946 recv_strategy_->withhold_data_from(readerId); 00947 } else if (info.recvd_.disjoint() || 00948 (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous()) 00949 || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1) 00950 || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) { 00951 if (Transport_debug_level > 5) { 00952 GuidConverter writer(src); 00953 GuidConverter reader(readerId); 00954 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 00955 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"), 00956 seq.getValue(), 00957 OPENDDS_STRING(writer).c_str(), 00958 OPENDDS_STRING(reader).c_str())); 00959 } 00960 const ReceivedDataSample* sample = 00961 recv_strategy_->withhold_data_from(readerId); 00962 info.held_.insert(std::make_pair(seq, *sample)); 00963 } else { 00964 if (Transport_debug_level > 5) { 00965 GuidConverter writer(src); 00966 GuidConverter reader(readerId); 00967 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 00968 ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"), 00969 seq.getValue(), 00970 OPENDDS_STRING(writer).c_str(), 00971 OPENDDS_STRING(reader).c_str())); 00972 } 00973 recv_strategy_->do_not_withhold_data_from(readerId); 00974 } 00975 info.recvd_.insert(seq); 00976 deliver_held_data(readerId, info, rr.second.durable_); 00977 } else { 00978 if (Transport_debug_level > 5) { 00979 GuidConverter writer(src); 00980 GuidConverter reader(rr.first); 00981 SequenceNumber seq; 00982 seq.setValue(data.writerSN.high, data.writerSN.low); 00983 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -") 00984 ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"), 00985 seq.getValue(), 00986 OPENDDS_STRING(writer).c_str(), 00987 OPENDDS_STRING(reader).c_str())); 00988 } 00989 recv_strategy_->do_not_withhold_data_from(rr.first); 00990 } 00991 return false; 00992 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_gap_i | ( | const RTPS::GapSubmessage & | gap, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1023 of file RtpsUdpDataLink.cpp.
References deliver_held_data(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.
Referenced by received().
01025 { 01026 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01027 if (wi != rr.second.remote_writers_.end()) { 01028 SequenceRange sr; 01029 sr.first.setValue(gap.gapStart.high, gap.gapStart.low); 01030 SequenceNumber base; 01031 base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low); 01032 sr.second = base.previous(); 01033 if (sr.first <= sr.second) { 01034 if (Transport_debug_level > 5) { 01035 const GuidConverter conv(src); 01036 const GuidConverter rdr(rr.first); 01037 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i " 01038 "Reader %C received GAP with range [%q, %q] from %C\n", 01039 OPENDDS_STRING(rdr).c_str(), 01040 sr.first.getValue(), sr.second.getValue(), 01041 OPENDDS_STRING(conv).c_str())); 01042 } 01043 wi->second.recvd_.insert(sr); 01044 } else { 01045 const GuidConverter conv(src); 01046 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i " 01047 "received GAP with invalid range [%q, %q] from %C\n", 01048 sr.first.getValue(), sr.second.getValue(), 01049 OPENDDS_STRING(conv).c_str()), 2); 01050 } 01051 wi->second.recvd_.insert(base, gap.gapList.numBits, 01052 gap.gapList.bitmap.get_buffer()); 01053 deliver_held_data(rr.first, wi->second, rr.second.durable_); 01054 //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP 01055 } 01056 return false; 01057 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_hb_frag_i | ( | const RTPS::HeartBeatFragSubmessage & | hb_frag, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1541 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, OpenDDS::DCPS::SequenceNumber::setValue(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.
Referenced by received().
01544 { 01545 WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01546 if (wi == rr.second.remote_writers_.end()) { 01547 // we may not be associated yet, even if the writer thinks we are 01548 return false; 01549 } 01550 01551 if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) { 01552 return false; 01553 } 01554 01555 wi->second.hb_frag_recvd_count_ = hb_frag.count.value; 01556 01557 SequenceNumber seq; 01558 seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low); 01559 01560 // If seq is outside the heartbeat range or we haven't completely received 01561 // it yet, send a NackFrag along with the AckNack. The heartbeat range needs 01562 // to be checked first because recvd_ contains the numbers below the 01563 // heartbeat range (so that we don't NACK those). 01564 if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second 01565 || !wi->second.recvd_.contains(seq)) { 01566 wi->second.frags_[seq] = hb_frag.lastFragmentNum; 01567 wi->second.ack_pending_ = true; 01568 return true; // timer will invoke send_heartbeat_replies() 01569 } 01570 return false; 01571 }
bool OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i | ( | const RTPS::HeartBeatSubmessage & | heartbeat, | |
const RepoId & | src, | |||
RtpsReaderMap::value_type & | rr | |||
) | [private] |
Definition at line 1115 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::ack_pending_, OpenDDS::RTPS::HeartBeatSubmessage::count, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::heartbeat_recvd_count_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::initial_hb_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::should_nack(), OpenDDS::RTPS::HeartBeatSubmessage::smHeader, and OpenDDS::DCPS::SequenceNumber::ZERO().
Referenced by received().
01118 { 01119 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src); 01120 if (wi == rr.second.remote_writers_.end()) { 01121 // we may not be associated yet, even if the writer thinks we are 01122 return false; 01123 } 01124 01125 WriterInfo& info = wi->second; 01126 01127 if (heartbeat.count.value <= info.heartbeat_recvd_count_) { 01128 return false; 01129 } 01130 info.heartbeat_recvd_count_ = heartbeat.count.value; 01131 01132 SequenceNumber& first = info.hb_range_.first; 01133 first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low); 01134 SequenceNumber& last = info.hb_range_.second; 01135 last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low); 01136 static const SequenceNumber starting, zero = SequenceNumber::ZERO(); 01137 01138 DisjointSequence& recvd = info.recvd_; 01139 if (!rr.second.durable_ && info.initial_hb_) { 01140 if (last.getValue() < starting.getValue()) { 01141 // this is an invalid heartbeat -- last must be positive 01142 return false; 01143 } 01144 // For the non-durable reader, the first received HB or DATA establishes 01145 // a baseline of the lowest sequence number we'd ever need to NACK. 01146 if (recvd.empty() || recvd.low() >= last) { 01147 recvd.insert(SequenceRange(zero, 01148 (last > starting) ? last.previous() : zero)); 01149 } else { 01150 recvd.insert(SequenceRange(zero, recvd.low())); 01151 } 01152 } else if (!recvd.empty()) { 01153 // All sequence numbers below 'first' should not be NACKed. 01154 // The value of 'first' may not decrease with subsequent HBs. 01155 recvd.insert(SequenceRange(zero, 01156 (first > starting) ? first.previous() : zero)); 01157 } 01158 01159 deliver_held_data(rr.first, info, rr.second.durable_); 01160 01161 //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence 01162 // numbers we no longer expect to receive due to HEARTBEAT 01163 01164 info.initial_hb_ = false; 01165 01166 const bool final = heartbeat.smHeader.flags & 2 /* FLAG_F */, 01167 liveliness = heartbeat.smHeader.flags & 4 /* FLAG_L */; 01168 01169 if (!final || (!liveliness && (info.should_nack() || 01170 rr.second.nack_durable(info) || 01171 recv_strategy_->has_fragments(info.hb_range_, wi->first)))) { 01172 info.ack_pending_ = true; 01173 return true; // timer will invoke send_heartbeat_replies() 01174 } 01175 01176 //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC 01177 return false; 01178 }
ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down | ( | ) |
Definition at line 37 of file RtpsUdpDataLink.inl.
References reactor_task_.
00038 { 00039 if (reactor_task_ == 0) return true; 00040 return reactor_task_->is_shut_down(); 00041 }
ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy | ( | RtpsUdpReceiveStrategy * | recv_strategy | ) |
Definition at line 18 of file RtpsUdpDataLink.inl.
References recv_strategy_.
00019 { 00020 recv_strategy_ = recv_strategy; 00021 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::NackFragSubmessage & | nackfrag, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1780 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::GUID_t::guidPrefix, local_prefix_, lock_, nack_reply_, OPENDDS_STRING, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::NackFragSubmessage::writerId, writers_, and OpenDDS::RTPS::NackFragSubmessage::writerSN.
01782 { 01783 // local side is DW 01784 RepoId local; 01785 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 01786 local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN 01787 01788 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01789 const RtpsWriterMap::iterator rw = writers_.find(local); 01790 if (rw == writers_.end()) { 01791 if (Transport_debug_level > 5) { 01792 GuidConverter local_conv(local); 01793 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01794 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str())); 01795 } 01796 return; 01797 } 01798 01799 RepoId remote; 01800 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01801 remote.entityId = nackfrag.readerId; 01802 01803 if (Transport_debug_level > 5) { 01804 GuidConverter local_conv(local), remote_conv(remote); 01805 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01806 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(), 01807 OPENDDS_STRING(remote_conv).c_str())); 01808 } 01809 01810 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote); 01811 if (ri == rw->second.remote_readers_.end()) { 01812 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01813 "WARNING ReaderInfo not found\n")); 01814 return; 01815 } 01816 01817 if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) { 01818 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) " 01819 "WARNING Count indicates duplicate, dropping\n")); 01820 return; 01821 } 01822 01823 ri->second.nackfrag_recvd_count_ = nackfrag.count.value; 01824 01825 SequenceNumber seq; 01826 seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low); 01827 ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState; 01828 g.release(); 01829 nack_reply_.schedule(); // timer will invoke send_nack_replies() 01830 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::AckNackSubmessage & | acknack, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1577 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::DisjointSequence::dump(), durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::insert(), interesting_readers_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), local_prefix_, lock_, OpenDDS::DCPS::DisjointSequence::low(), nack_reply_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), process_acked_by_all_i(), OpenDDS::RTPS::AckNackSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), send_durability_gaps(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::AckNackSubmessage::writerId, writers_, and OpenDDS::DCPS::SequenceNumber::ZERO().
01579 { 01580 // local side is DW 01581 RepoId local; 01582 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t)); 01583 local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN 01584 01585 RepoId remote; 01586 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01587 remote.entityId = acknack.readerId; 01588 01589 const ACE_Time_Value now = ACE_OS::gettimeofday(); 01590 OPENDDS_VECTOR(DiscoveryListener*) callbacks; 01591 01592 { 01593 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01594 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote), 01595 limit = interesting_readers_.upper_bound(remote); 01596 pos != limit; 01597 ++pos) { 01598 pos->second.last_activity = now; 01599 // Ensure the acknack was for the writer. 01600 if (local == pos->second.localid) { 01601 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) { 01602 callbacks.push_back(pos->second.listener); 01603 pos->second.status = InterestingRemote::EXISTS; 01604 } 01605 } 01606 } 01607 } 01608 01609 for (size_t i = 0; i < callbacks.size(); ++i) { 01610 callbacks[i]->reader_exists(remote, local); 01611 } 01612 01613 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01614 const RtpsWriterMap::iterator rw = writers_.find(local); 01615 if (rw == writers_.end()) { 01616 if (Transport_debug_level > 5) { 01617 GuidConverter local_conv(local); 01618 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01619 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str())); 01620 } 01621 return; 01622 } 01623 01624 if (Transport_debug_level > 5) { 01625 GuidConverter local_conv(local), remote_conv(remote); 01626 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01627 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(), 01628 OPENDDS_STRING(remote_conv).c_str())); 01629 } 01630 01631 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote); 01632 if (ri == rw->second.remote_readers_.end()) { 01633 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01634 "WARNING ReaderInfo not found\n")); 01635 return; 01636 } 01637 01638 if (acknack.count.value <= ri->second.acknack_recvd_count_) { 01639 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) " 01640 "WARNING Count indicates duplicate, dropping\n")); 01641 return; 01642 } 01643 01644 ri->second.acknack_recvd_count_ = acknack.count.value; 01645 01646 if (!ri->second.handshake_done_) { 01647 ri->second.handshake_done_ = true; 01648 invoke_on_start_callbacks(true); 01649 } 01650 01651 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks; 01652 const bool final = acknack.smHeader.flags & 2 /* FLAG_F */; 01653 01654 if (!ri->second.durable_data_.empty()) { 01655 if (Transport_debug_level > 5) { 01656 const GuidConverter local_conv(local), remote_conv(remote); 01657 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01658 "local %C has durable for remote %C\n", 01659 OPENDDS_STRING(local_conv).c_str(), 01660 OPENDDS_STRING(remote_conv).c_str())); 01661 } 01662 SequenceNumber ack; 01663 ack.setValue(acknack.readerSNState.bitmapBase.high, 01664 acknack.readerSNState.bitmapBase.low); 01665 const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first; 01666 if (ack > dd_last) { 01667 // Reader acknowledges durable data, we no longer need to store it 01668 ri->second.durable_data_.swap(pendingCallbacks); 01669 if (Transport_debug_level > 5) { 01670 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01671 "durable data acked\n")); 01672 } 01673 } else { 01674 DisjointSequence requests; 01675 if (!requests.insert(ack, acknack.readerSNState.numBits, 01676 acknack.readerSNState.bitmap.get_buffer()) 01677 && !final && ack == rw->second.heartbeat_high(ri->second)) { 01678 // This is a non-final AckNack with no bits in the bitmap. 01679 // Attempt to reply to a request for the "base" value which 01680 // is neither Acked nor Nacked, only when it's the HB high. 01681 if (ri->second.durable_data_.count(ack)) requests.insert(ack); 01682 } 01683 // Attempt to reply to nacks for durable data 01684 bool sent_some = false; 01685 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 01686 iter_t it = ri->second.durable_data_.begin(); 01687 const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges(); 01688 SequenceNumber lastSent = SequenceNumber::ZERO(); 01689 if (!requests.empty()) { 01690 lastSent = requests.low().previous(); 01691 } 01692 DisjointSequence gaps; 01693 for (size_t i = 0; i < psr.size(); ++i) { 01694 for (; it != ri->second.durable_data_.end() 01695 && it->first < psr[i].first; ++it) ; // empty for-loop 01696 for (; it != ri->second.durable_data_.end() 01697 && it->first <= psr[i].second; ++it) { 01698 if (Transport_debug_level > 5) { 01699 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01700 "durable resend %d\n", int(it->first.getValue()))); 01701 } 01702 durability_resend(it->second); 01703 //FUTURE: combine multiple resends into one RTPS Message? 01704 sent_some = true; 01705 if (it->first > lastSent + 1) { 01706 gaps.insert(SequenceRange(lastSent + 1, it->first.previous())); 01707 } 01708 lastSent = it->first; 01709 } 01710 if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) { 01711 gaps.insert(SequenceRange(lastSent + 1, psr[i].second)); 01712 } 01713 } 01714 if (!gaps.empty()) { 01715 if (Transport_debug_level > 5) { 01716 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01717 "sending durability gaps: ")); 01718 gaps.dump(); 01719 } 01720 send_durability_gaps(local, remote, gaps); 01721 } 01722 if (sent_some) { 01723 return; 01724 } 01725 const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first; 01726 if (!requests.empty() && requests.high() < dd_first) { 01727 // All nacks were below the start of the durable data. 01728 if (Transport_debug_level > 5) { 01729 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01730 "sending durability gaps for all requests: ")); 01731 requests.dump(); 01732 } 01733 send_durability_gaps(local, remote, requests); 01734 return; 01735 } 01736 if (!requests.empty() && requests.low() < dd_first) { 01737 // Lowest nack was below the start of the durable data. 01738 for (size_t i = 0; i < psr.size(); ++i) { 01739 if (psr[i].first > dd_first) { 01740 break; 01741 } 01742 gaps.insert(SequenceRange(psr[i].first, 01743 std::min(psr[i].second, dd_first))); 01744 } 01745 if (Transport_debug_level > 5) { 01746 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) " 01747 "sending durability gaps for some requests: ")); 01748 gaps.dump(); 01749 } 01750 send_durability_gaps(local, remote, gaps); 01751 return; 01752 } 01753 } 01754 } 01755 SequenceNumber ack; 01756 ack.setValue(acknack.readerSNState.bitmapBase.high, 01757 acknack.readerSNState.bitmapBase.low); 01758 if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN() 01759 && ack != SequenceNumber::ZERO()) { 01760 ri->second.cur_cumulative_ack_ = ack; 01761 } 01762 // If this ACKNACK was final, the DR doesn't expect a reply, and therefore 01763 // we don't need to do anything further. 01764 if (!final) { 01765 ri->second.requested_changes_.push_back(acknack.readerSNState); 01766 } 01767 process_acked_by_all_i(g, local); 01768 g.release(); 01769 if (!final) { 01770 nack_reply_.schedule(); // timer will invoke send_nack_replies() 01771 } 01772 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 01773 for (iter_t it = pendingCallbacks.begin(); 01774 it != pendingCallbacks.end(); ++it) { 01775 it->second->data_delivered(); 01776 } 01777 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::HeartBeatFragSubmessage & | hb_frag, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1534 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_hb_frag_i().
01536 { 01537 datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i); 01538 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::HeartBeatSubmessage & | heartbeat, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1060 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::GUID_t::guidPrefix, heartbeat_reply_, interesting_ack_nacks_, interesting_writers_, lock_, OPENDDS_VECTOR(), process_heartbeat_i(), readers_, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.
01062 { 01063 RepoId src; 01064 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t)); 01065 src.entityId = heartbeat.writerId; 01066 01067 bool schedule_acknack = false; 01068 const ACE_Time_Value now = ACE_OS::gettimeofday(); 01069 OPENDDS_VECTOR(InterestingRemote) callbacks; 01070 01071 { 01072 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01073 01074 // We received a heartbeat from a writer. 01075 // We should ACKNACK if the writer is interesting and there is no association. 01076 01077 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src), 01078 limit = interesting_writers_.upper_bound(src); 01079 pos != limit; 01080 ++pos) { 01081 const RepoId& writerid = src; 01082 const RepoId& readerid = pos->second.localid; 01083 01084 RtpsReaderMap::const_iterator riter = readers_.find(readerid); 01085 if (riter == readers_.end()) { 01086 // Reader has no associations. 01087 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address)); 01088 } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) { 01089 // Reader is not associated with this writer. 01090 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address)); 01091 } 01092 pos->second.last_activity = now; 01093 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) { 01094 callbacks.push_back(pos->second); 01095 pos->second.status = InterestingRemote::EXISTS; 01096 } 01097 } 01098 01099 schedule_acknack = !interesting_ack_nacks_.empty(); 01100 } 01101 01102 for (size_t i = 0; i < callbacks.size(); ++i) { 01103 callbacks[i].listener->writer_exists(src, callbacks[i].localid); 01104 } 01105 01106 if (schedule_acknack) { 01107 heartbeat_reply_.schedule(); 01108 } 01109 01110 datareader_dispatch(heartbeat, src_prefix, 01111 &RtpsUdpDataLink::process_heartbeat_i); 01112 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::GapSubmessage & | gap, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 1016 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_gap_i().
01018 { 01019 datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i); 01020 }
void OpenDDS::DCPS::RtpsUdpDataLink::received | ( | const RTPS::DataSubmessage & | data, | |
const GuidPrefix_t & | src_prefix | |||
) |
Definition at line 918 of file RtpsUdpDataLink.cpp.
References datareader_dispatch(), and process_data_i().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample().
00920 { 00921 datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i); 00922 }
void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader | ( | const RepoId & | writerid, | |
const RepoId & | readerid, | |||
const ACE_INET_Addr & | address, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) |
Definition at line 288 of file RtpsUdpDataLink.cpp.
References heartbeat_, heartbeat_counts_, interesting_readers_, lock_, and OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable().
00292 { 00293 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00294 bool enableheartbeat = interesting_readers_.empty(); 00295 interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener))); 00296 heartbeat_counts_[writerid] = 0; 00297 g.release(); 00298 if (enableheartbeat) { 00299 heartbeat_.schedule_enable(); 00300 } 00301 }
void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer | ( | const RepoId & | readerid, | |
const RepoId & | writerid, | |||
const ACE_INET_Addr & | address, | |||
OpenDDS::DCPS::DiscoveryListener * | listener | |||
) |
Definition at line 333 of file RtpsUdpDataLink.cpp.
References heartbeatchecker_, interesting_writers_, lock_, and OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable().
00337 { 00338 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00339 bool enableheartbeatchecker = interesting_writers_.empty(); 00340 interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener))); 00341 g.release(); 00342 if (enableheartbeatchecker) { 00343 heartbeatchecker_.schedule_enable(); 00344 } 00345 }
ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::release_remote_i | ( | const RepoId & | remote_id | ) | [private, virtual] |
void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i | ( | const RepoId & | remote_id, | |
const RepoId & | local_id | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 444 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::GuidConverter::entityKind(), heartbeat_counts_, OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), reader_index_, readers_, and writers_.
00446 { 00447 OPENDDS_VECTOR(TransportQueueElement*) to_deliver; 00448 OPENDDS_VECTOR(TransportQueueElement*) to_drop; 00449 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00450 using std::pair; 00451 const GuidConverter conv(local_id); 00452 const EntityKind kind = conv.entityKind(); 00453 if (kind == KIND_WRITER) { 00454 const RtpsWriterMap::iterator rw = writers_.find(local_id); 00455 00456 if (rw != writers_.end()) { 00457 rw->second.remote_readers_.erase(remote_id); 00458 00459 if (rw->second.remote_readers_.empty()) { 00460 RtpsWriter& writer = rw->second; 00461 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t; 00462 00463 if (!writer.to_deliver_.empty()) { 00464 iter_t iter = writer.to_deliver_.begin(); 00465 while (iter != writer.to_deliver_.end()) { 00466 to_deliver.push_back(iter->second); 00467 writer.to_deliver_.erase(iter); 00468 iter = writer.to_deliver_.begin(); 00469 } 00470 } 00471 if (!writer.elems_not_acked_.empty()) { 00472 OPENDDS_SET(SequenceNumber) sns_to_release; 00473 iter_t iter = writer.elems_not_acked_.begin(); 00474 while (iter != writer.elems_not_acked_.end()) { 00475 to_drop.push_back(iter->second); 00476 sns_to_release.insert(iter->first); 00477 writer.elems_not_acked_.erase(iter); 00478 iter = writer.elems_not_acked_.begin(); 00479 } 00480 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin(); 00481 while (sns_it != sns_to_release.end()) { 00482 writer.send_buff_->release_acked(*sns_it); 00483 ++sns_it; 00484 } 00485 } 00486 heartbeat_counts_.erase(rw->first); 00487 writers_.erase(rw); 00488 } 00489 } 00490 00491 } else if (kind == KIND_READER) { 00492 const RtpsReaderMap::iterator rr = readers_.find(local_id); 00493 00494 if (rr != readers_.end()) { 00495 rr->second.remote_writers_.erase(remote_id); 00496 00497 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters = 00498 reader_index_.equal_range(remote_id); 00499 iters.first != iters.second;) { 00500 if (iters.first->second == rr) { 00501 reader_index_.erase(iters.first++); 00502 } else { 00503 ++iters.first; 00504 } 00505 } 00506 00507 if (rr->second.remote_writers_.empty()) { 00508 readers_.erase(rr); 00509 } 00510 } 00511 } 00512 g.release(); 00513 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter; 00514 tqe_iter deliver_it = to_deliver.begin(); 00515 while (deliver_it != to_deliver.end()) { 00516 (*deliver_it)->data_delivered(); 00517 ++deliver_it; 00518 } 00519 tqe_iter drop_it = to_drop.begin(); 00520 while (drop_it != to_drop.end()) { 00521 (*drop_it)->data_dropped(true); 00522 ++drop_it; 00523 } 00524 }
bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos | ( | const PublicationId & | pub_id | ) | [private] |
Definition at line 806 of file RtpsUdpDataLink.cpp.
References force_inline_qos_, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().
Referenced by customize_queue_element().
00807 { 00808 if (force_inline_qos_) { 00809 // Force true for testing purposes 00810 return true; 00811 } else { 00812 const GUIDSeq_var peers = peer_ids(pub_id); 00813 if (!peers.ptr()) { 00814 return false; 00815 } 00816 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t; 00817 for (CORBA::ULong i = 0; i < peers->length(); ++i) { 00818 const iter_t iter = locators_.find(peers[i]); 00819 if (iter != locators_.end() && iter->second.requires_inline_qos_) { 00820 return true; 00821 } 00822 } 00823 return false; 00824 } 00825 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks | ( | RtpsReaderMap::iterator | rr, | |
bool | finalFlag = false | |||
) | [private] |
Definition at line 1200 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::DCPS::gen_find_size(), generate_nack_frags(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, send_strategy_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::SequenceNumber::ZERO().
Referenced by send_final_acks(), and send_heartbeat_replies().
01201 { 01202 using namespace OpenDDS::RTPS; 01203 01204 WriterInfoMap& writers = rr->second.remote_writers_; 01205 for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end(); 01206 ++wi) { 01207 01208 // if we have some negative acknowledgments, we'll ask for a reply 01209 DisjointSequence& recvd = wi->second.recvd_; 01210 const bool nack = wi->second.should_nack() || 01211 rr->second.nack_durable(wi->second); 01212 bool final = finalFlag || !nack; 01213 01214 if (wi->second.ack_pending_ || nack || finalFlag) { 01215 const bool prev_ack_pending = wi->second.ack_pending_; 01216 wi->second.ack_pending_ = false; 01217 01218 SequenceNumber ack; 01219 CORBA::ULong num_bits = 1; 01220 LongSeq8 bitmap; 01221 bitmap.length(1); 01222 bitmap[0] = 0; 01223 01224 const SequenceNumber& hb_low = wi->second.hb_range_.first; 01225 const SequenceNumber& hb_high = wi->second.hb_range_.second; 01226 const SequenceNumber::Value hb_low_val = hb_low.getValue(), 01227 hb_high_val = hb_high.getValue(); 01228 01229 if (recvd.empty()) { 01230 // Nack the entire heartbeat range. Only reached when durable. 01231 ack = hb_low; 01232 bitmap.length(bitmap_num_longs(ack, hb_high)); 01233 const CORBA::ULong idx = (hb_high_val > hb_low_val + 255) 01234 ? 255 01235 : CORBA::ULong(hb_high_val - hb_low_val); 01236 DisjointSequence::fill_bitmap_range(0, idx, 01237 bitmap.get_buffer(), 01238 bitmap.length(), num_bits); 01239 } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) { 01240 // Nack the range between the heartbeat low and the recvd low. 01241 ack = hb_low; 01242 const SequenceNumber& rec_low = recvd.low(); 01243 const SequenceNumber::Value rec_low_val = rec_low.getValue(); 01244 bitmap.length(bitmap_num_longs(ack, rec_low)); 01245 const CORBA::ULong idx = (rec_low_val > hb_low_val + 255) 01246 ? 255 01247 : CORBA::ULong(rec_low_val - hb_low_val); 01248 DisjointSequence::fill_bitmap_range(0, idx, 01249 bitmap.get_buffer(), 01250 bitmap.length(), num_bits); 01251 01252 } else { 01253 ack = ++SequenceNumber(recvd.cumulative_ack()); 01254 if (recvd.low().getValue() > 1) { 01255 // since the "ack" really is cumulative, we need to make 01256 // sure that a lower discontinuity is not possible later 01257 recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low())); 01258 } 01259 01260 if (recvd.disjoint()) { 01261 bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous())); 01262 recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(), 01263 num_bits, true); 01264 } 01265 } 01266 01267 const SequenceNumber::Value ack_val = ack.getValue(); 01268 01269 if (!recvd.empty() && hb_high > recvd.high()) { 01270 const SequenceNumber eff_high = 01271 (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255); 01272 const SequenceNumber::Value eff_high_val = eff_high.getValue(); 01273 // Nack the range between the received high and the effective high. 01274 const CORBA::ULong old_len = bitmap.length(), 01275 new_len = bitmap_num_longs(ack, eff_high); 01276 if (new_len > old_len) { 01277 bitmap.length(new_len); 01278 for (CORBA::ULong i = old_len; i < new_len; ++i) { 01279 bitmap[i] = 0; 01280 } 01281 } 01282 const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val), 01283 idx_recv_high = recvd.disjoint() ? 01284 CORBA::ULong(recvd.high().getValue() - ack_val) : 0; 01285 DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high, 01286 bitmap.get_buffer(), new_len, 01287 num_bits); 01288 } 01289 01290 // If the receive strategy is holding any fragments, those should 01291 // not be "nacked" in the ACKNACK reply. They will be accounted for 01292 // in the NACK_FRAG(s) instead. 01293 bool frags_modified = 01294 recv_strategy_->remove_frags_from_bitmap(bitmap.get_buffer(), 01295 num_bits, ack, wi->first); 01296 if (frags_modified && !final) { // change to final if bitmap is empty 01297 final = true; 01298 for (CORBA::ULong i = 0; i < bitmap.length(); ++i) { 01299 if ((i + 1) * 32 <= num_bits) { 01300 if (bitmap[i]) { 01301 final = false; 01302 break; 01303 } 01304 } else { 01305 if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) { 01306 final = false; 01307 break; 01308 } 01309 } 01310 } 01311 } 01312 01313 AckNackSubmessage acknack = { 01314 {ACKNACK, 01315 CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)), 01316 0 /*length*/}, 01317 rr->first.entityId, 01318 wi->first.entityId, 01319 { // SequenceNumberSet: acking bitmapBase - 1 01320 {ack.getHigh(), ack.getLow()}, 01321 num_bits, bitmap 01322 }, 01323 {++wi->second.acknack_count_} 01324 }; 01325 01326 size_t size = 0, padding = 0; 01327 gen_find_size(acknack, size, padding); 01328 acknack.smHeader.submessageLength = 01329 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 01330 InfoDestinationSubmessage info_dst = { 01331 {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ}, 01332 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 01333 }; 01334 gen_find_size(info_dst, size, padding); 01335 01336 OPENDDS_VECTOR(NackFragSubmessage) nack_frags; 01337 size += generate_nack_frags(nack_frags, wi->second, wi->first); 01338 01339 ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators? 01340 // byte swapping is handled in the operator<<() implementation 01341 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR); 01342 std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix, 01343 sizeof(GuidPrefix_t)); 01344 ser << info_dst; 01345 // Interoperability note: we used to insert INFO_REPLY submessage here, but 01346 // testing indicated that other DDS implementations didn't accept it. 01347 ser << acknack; 01348 for (size_t i = 0; i < nack_frags.size(); ++i) { 01349 nack_frags[i].readerId = rr->first.entityId; 01350 nack_frags[i].writerId = wi->first.entityId; 01351 ser << nack_frags[i]; // always 4-byte aligned 01352 } 01353 01354 if (!locators_.count(wi->first)) { 01355 if (Transport_debug_level) { 01356 const GuidConverter conv(wi->first); 01357 ACE_DEBUG((LM_ERROR, 01358 "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - " 01359 "no locator for remote %C\n", OPENDDS_STRING(conv).c_str())); 01360 } 01361 } else { 01362 send_strategy_->send_rtps_control(mb_acknack, 01363 locators_[wi->first].addr_); 01364 } 01365 } 01366 } 01367 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_durability_gaps | ( | const RepoId & | writer, | |
const RepoId & | reader, | |||
const DisjointSequence & | gaps | |||
) | [private] |
Definition at line 2165 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, get_locator(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, marshal_gaps(), send_strategy_, and OpenDDS::RTPS::SMHDR_SZ.
Referenced by received().
02168 { 02169 ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ); 02170 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02171 RTPS::InfoDestinationSubmessage info_dst = { 02172 {RTPS::INFO_DST, 1 /*FLAG_E*/, RTPS::INFO_DST_SZ}, 02173 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 02174 }; 02175 std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t)); 02176 ser << info_dst; 02177 mb.cont(marshal_gaps(writer, reader, gaps)); 02178 send_strategy_->send_rtps_control(mb, get_locator(reader)); 02179 mb.cont()->release(); 02180 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_final_acks | ( | const RepoId & | readerid | ) | [virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 2553 of file RtpsUdpDataLink.cpp.
References readers_, and send_ack_nacks().
02554 { 02555 RtpsReaderMap::iterator rr = readers_.find (readerid); 02556 if (rr != readers_.end ()) { 02557 send_ack_nacks (rr, true); 02558 } 02559 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeat_replies | ( | ) | [private] |
Definition at line 1370 of file RtpsUdpDataLink.cpp.
References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, interesting_ack_nacks_, lock_, readers_, send_ack_nacks(), send_strategy_, and OpenDDS::RTPS::SMHDR_SZ.
01371 { 01372 using namespace OpenDDS::RTPS; 01373 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01374 01375 for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(), 01376 limit = interesting_ack_nacks_.end(); 01377 pos != limit; 01378 ++pos) { 01379 01380 SequenceNumber ack; 01381 LongSeq8 bitmap; 01382 bitmap.length(0); 01383 01384 AckNackSubmessage acknack = { 01385 {ACKNACK, 01386 CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/), 01387 0 /*length*/}, 01388 pos->readerid.entityId, 01389 pos->writerid.entityId, 01390 { // SequenceNumberSet: acking bitmapBase - 1 01391 {ack.getHigh(), ack.getLow()}, 01392 0 /* num_bits */, bitmap 01393 }, 01394 {0 /* acknack count */} 01395 }; 01396 01397 size_t size = 0, padding = 0; 01398 gen_find_size(acknack, size, padding); 01399 acknack.smHeader.submessageLength = 01400 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ; 01401 InfoDestinationSubmessage info_dst = { 01402 {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ}, 01403 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} 01404 }; 01405 gen_find_size(info_dst, size, padding); 01406 01407 ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators? 01408 // byte swapping is handled in the operator<<() implementation 01409 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR); 01410 std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix, 01411 sizeof(GuidPrefix_t)); 01412 ser << info_dst; 01413 // Interoperability note: we used to insert INFO_REPLY submessage here, but 01414 // testing indicated that other DDS implementations didn't accept it. 01415 ser << acknack; 01416 01417 send_strategy_->send_rtps_control(mb_acknack, pos->writer_address); 01418 } 01419 interesting_ack_nacks_.clear(); 01420 01421 for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end(); 01422 ++rr) { 01423 send_ack_nacks (rr); 01424 } 01425 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats | ( | ) | [private] |
Definition at line 2183 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, config_, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::HEARTBEAT, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, OpenDDS::RTPS::HEARTBEAT_SZ, interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), reader_no_longer_exists_lock_, send_strategy_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, VDBG_LVL, and writers_.
02184 { 02185 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 02186 ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_); 02187 02188 if (writers_.empty() && interesting_readers_.empty()) { 02189 heartbeat_.disable(); 02190 } 02191 02192 using namespace OpenDDS::RTPS; 02193 OPENDDS_VECTOR(HeartBeatSubmessage) subm; 02194 OPENDDS_SET(ACE_INET_Addr) recipients; 02195 OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks; 02196 const ACE_Time_Value now = ACE_OS::gettimeofday(); 02197 02198 RepoIdSet writers_to_advertise; 02199 02200 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_; 02201 const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config_->heartbeat_period_; 02202 for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(), 02203 limit = interesting_readers_.end(); 02204 pos != limit; 02205 ++pos) { 02206 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST || 02207 (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) { 02208 recipients.insert(pos->second.address); 02209 writers_to_advertise.insert(pos->second.localid); 02210 } 02211 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) { 02212 CallbackType callback(pos->first, pos->second); 02213 readerDoesNotExistCallbacks_.push_back(callback); 02214 pos->second.status = InterestingRemote::DOES_NOT_EXIST; 02215 } 02216 } 02217 02218 typedef RtpsWriterMap::iterator rw_iter; 02219 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) { 02220 const bool has_data = !rw->second.send_buff_.is_nil() 02221 && !rw->second.send_buff_->empty(); 02222 bool final = true, has_durable_data = false; 02223 SequenceNumber durable_max; 02224 02225 typedef ReaderInfoMap::iterator ri_iter; 02226 const ri_iter end = rw->second.remote_readers_.end(); 02227 for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) { 02228 if ((has_data || !ri->second.handshake_done_) 02229 && locators_.count(ri->first)) { 02230 recipients.insert(locators_[ri->first].addr_); 02231 if (final && !ri->second.handshake_done_) { 02232 final = false; 02233 } 02234 } 02235 if (!ri->second.durable_data_.empty()) { 02236 const ACE_Time_Value expiration = 02237 ri->second.durable_timestamp_ + config_->durable_data_timeout_; 02238 if (now > expiration) { 02239 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator 02240 dd_iter; 02241 for (dd_iter it = ri->second.durable_data_.begin(); 02242 it != ri->second.durable_data_.end(); ++it) { 02243 pendingCallbacks.push_back(it->second); 02244 } 02245 ri->second.durable_data_.clear(); 02246 if (Transport_debug_level > 3) { 02247 const GuidConverter gw(rw->first), gr(ri->first); 02248 VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - " 02249 "removed expired durable data for %C -> %C\n", 02250 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3); 02251 } 02252 } else { 02253 has_durable_data = true; 02254 if (ri->second.durable_data_.rbegin()->first > durable_max) { 02255 durable_max = ri->second.durable_data_.rbegin()->first; 02256 } 02257 if (locators_.count(ri->first)) { 02258 recipients.insert(locators_[ri->first].addr_); 02259 } 02260 } 02261 } 02262 } 02263 02264 if (!rw->second.elems_not_acked_.empty()) { 02265 final = false; 02266 } 02267 02268 if (writers_to_advertise.count(rw->first)) { 02269 final = false; 02270 writers_to_advertise.erase(rw->first); 02271 } 02272 02273 if (final && !has_data && !has_durable_data) { 02274 continue; 02275 } 02276 02277 const SequenceNumber firstSN = (rw->second.durable_ || !has_data) 02278 ? 1 : rw->second.send_buff_->low(), 02279 lastSN = std::max(durable_max, 02280 has_data ? rw->second.send_buff_->high() : 1); 02281 02282 const HeartBeatSubmessage hb = { 02283 {HEARTBEAT, 02284 CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)), 02285 HEARTBEAT_SZ}, 02286 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02287 rw->first.entityId, 02288 {firstSN.getHigh(), firstSN.getLow()}, 02289 {lastSN.getHigh(), lastSN.getLow()}, 02290 {++heartbeat_counts_[rw->first]} 02291 }; 02292 subm.push_back(hb); 02293 } 02294 02295 for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(), 02296 limit = writers_to_advertise.end(); 02297 pos != limit; 02298 ++pos) { 02299 const SequenceNumber SN = 1; 02300 const HeartBeatSubmessage hb = { 02301 {HEARTBEAT, 02302 CORBA::Octet(1 /*FLAG_E*/), 02303 HEARTBEAT_SZ}, 02304 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02305 pos->entityId, 02306 {SN.getHigh(), SN.getLow()}, 02307 {SN.getHigh(), SN.getLow()}, 02308 {++heartbeat_counts_[*pos]} 02309 }; 02310 subm.push_back(hb); 02311 } 02312 02313 if (!subm.empty()) { 02314 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators? 02315 // byte swapping is handled in the operator<<() implementation 02316 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02317 bool send_ok = true; 02318 for (size_t i = 0; i < subm.size(); ++i) { 02319 if (!(ser << subm[i])) { 02320 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - " 02321 "failed to serialize HEARTBEAT submessage %B\n", i)); 02322 send_ok = false; 02323 break; 02324 } 02325 } 02326 if (send_ok) { 02327 send_strategy_->send_rtps_control(mb, recipients); 02328 } 02329 } 02330 c.release(); 02331 g.release(); 02332 02333 while(true) { 02334 c.acquire(); 02335 if (readerDoesNotExistCallbacks_.empty()) { 02336 break; 02337 } 02338 OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks_.begin(); 02339 const RepoId& rid = iter->first; 02340 const InterestingRemote& remote = iter->second; 02341 readerDoesNotExistCallbacks_.erase(iter); 02342 c.release(); 02343 remote.listener->reader_does_not_exist(rid, remote.localid); 02344 } 02345 02346 for (size_t i = 0; i < pendingCallbacks.size(); ++i) { 02347 pendingCallbacks[i]->data_dropped(); 02348 } 02349 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual | ( | const TransportSendControlElement * | tsce | ) | [private] |
Definition at line 2384 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::Serializer::ALIGN_CDR, best_effort_heartbeat_count_, config_, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, get_locators(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::HEARTBEAT_SZ, OPENDDS_SET(), OpenDDS::DCPS::TransportSendControlElement::publication_id(), send_strategy_, OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::SMHDR_SZ, and writers_.
Referenced by customize_queue_element().
02385 { 02386 using namespace OpenDDS::RTPS; 02387 02388 const RepoId pub_id = tsce->publication_id(); 02389 02390 // Populate the recipients. 02391 OPENDDS_SET(ACE_INET_Addr) recipients; 02392 get_locators (pub_id, recipients); 02393 if (recipients.empty()) { 02394 return; 02395 } 02396 02397 // Populate the sequence numbers and counter. 02398 02399 SequenceNumber firstSN, lastSN; 02400 CORBA::Long counter; 02401 RtpsWriterMap::iterator pos = writers_.find (pub_id); 02402 if (pos != writers_.end ()) { 02403 // Reliable. 02404 const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty(); 02405 SequenceNumber durable_max; 02406 const ACE_Time_Value now = ACE_OS::gettimeofday(); 02407 for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end(); 02408 ri != end; 02409 ++ri) { 02410 if (!ri->second.durable_data_.empty()) { 02411 const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config_->durable_data_timeout_; 02412 if (now <= expiration && 02413 ri->second.durable_data_.rbegin()->first > durable_max) { 02414 durable_max = ri->second.durable_data_.rbegin()->first; 02415 } 02416 } 02417 } 02418 firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low(); 02419 lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1); 02420 counter = ++heartbeat_counts_[pos->first]; 02421 } else { 02422 // Unreliable. 02423 firstSN = 1; 02424 lastSN = tsce->sequence(); 02425 counter = ++this->best_effort_heartbeat_count_; 02426 } 02427 02428 const HeartBeatSubmessage hb = { 02429 {HEARTBEAT, 02430 CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/ | 4 /*FLAG_L*/), 02431 HEARTBEAT_SZ}, 02432 ENTITYID_UNKNOWN, // any matched reader may be interested in this 02433 pub_id.entityId, 02434 {firstSN.getHigh(), firstSN.getLow()}, 02435 {lastSN.getHigh(), lastSN.getLow()}, 02436 {counter} 02437 }; 02438 02439 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators? 02440 // byte swapping is handled in the operator<<() implementation 02441 Serializer ser(&mb, false, Serializer::ALIGN_CDR); 02442 if ((ser << hb)) { 02443 send_strategy_->send_rtps_control(mb, recipients); 02444 } 02445 else { 02446 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - " 02447 "failed to serialize HEARTBEAT submessage\n")); 02448 } 02449 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_i | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) | [private, virtual] |
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 434 of file RtpsUdpDataLink.cpp.
References lock_, and OpenDDS::DCPS::DataLink::send_i().
00435 { 00436 // Lock here to maintain the locking order: 00437 // RtpsUdpDataLink before RtpsUdpSendStrategy 00438 // which is required for resending due to nacks 00439 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00440 DataLink::send_i(element, relink); 00441 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies | ( | ) | [private] |
Definition at line 1833 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_high(), OpenDDS::DCPS::DisjointSequence::insert(), lock_, marshal_gaps(), OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_SET(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, send_nackfrag_replies(), send_strategy_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), OpenDDS::DCPS::Transport_debug_level, and writers_.
01834 { 01835 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01836 // Reply from local DW to remote DR: GAP or DATA 01837 using namespace OpenDDS::RTPS; 01838 typedef RtpsWriterMap::iterator rw_iter; 01839 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) { 01840 01841 // consolidate requests from N readers 01842 OPENDDS_SET(ACE_INET_Addr) recipients; 01843 DisjointSequence requests; 01844 RtpsWriter& writer = rw->second; 01845 01846 //track if any messages have been fully acked by all readers 01847 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE; 01848 01849 typedef ReaderInfoMap::iterator ri_iter; 01850 const ri_iter end = writer.remote_readers_.end(); 01851 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 01852 01853 if (ri->second.cur_cumulative_ack_ < all_readers_ack) { 01854 all_readers_ack = ri->second.cur_cumulative_ack_; 01855 } 01856 01857 for (size_t i = 0; i < ri->second.requested_changes_.size(); ++i) { 01858 const SequenceNumberSet& sn_state = ri->second.requested_changes_[i]; 01859 SequenceNumber base; 01860 base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low); 01861 if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1) 01862 && base == writer.heartbeat_high(ri->second)) { 01863 // Since there is an entry in requested_changes_, the DR must have 01864 // sent a non-final AckNack. If the base value is the high end of 01865 // the heartbeat range, treat it as a request for that seq#. 01866 if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) { 01867 requests.insert(base); 01868 } 01869 } else { 01870 requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer()); 01871 } 01872 } 01873 01874 if (ri->second.requested_changes_.size()) { 01875 if (locators_.count(ri->first)) { 01876 recipients.insert(locators_[ri->first].addr_); 01877 if (Transport_debug_level > 5) { 01878 const GuidConverter local_conv(rw->first), remote_conv(ri->first); 01879 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01880 "local %C remote %C requested resend\n", 01881 OPENDDS_STRING(local_conv).c_str(), 01882 OPENDDS_STRING(remote_conv).c_str())); 01883 } 01884 } 01885 ri->second.requested_changes_.clear(); 01886 } 01887 } 01888 01889 DisjointSequence gaps; 01890 if (!requests.empty()) { 01891 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) { 01892 gaps = requests; 01893 } else { 01894 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges(); 01895 SingleSendBuffer& sb = *writer.send_buff_; 01896 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock()); 01897 const RtpsUdpSendStrategy::OverrideToken ot = 01898 send_strategy_->override_destinations(recipients); 01899 for (size_t i = 0; i < ranges.size(); ++i) { 01900 if (Transport_debug_level > 5) { 01901 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01902 "resend data %d-%d\n", int(ranges[i].first.getValue()), 01903 int(ranges[i].second.getValue()))); 01904 } 01905 sb.resend_i(ranges[i], &gaps); 01906 } 01907 } 01908 } 01909 01910 send_nackfrag_replies(writer, gaps, recipients); 01911 01912 if (!gaps.empty()) { 01913 if (Transport_debug_level > 5) { 01914 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies " 01915 "GAPs:")); 01916 gaps.dump(); 01917 } 01918 ACE_Message_Block* mb_gap = 01919 marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_); 01920 if (mb_gap) { 01921 send_strategy_->send_rtps_control(*mb_gap, recipients); 01922 mb_gap->release(); 01923 } 01924 } 01925 if (all_readers_ack == SequenceNumber::MAX_VALUE) { 01926 continue; 01927 } 01928 } 01929 }
void OpenDDS::DCPS::RtpsUdpDataLink::send_nackfrag_replies | ( | RtpsWriter & | writer, | |
DisjointSequence & | gaps, | |||
OPENDDS_SET(ACE_INET_Addr)& | gap_recipients | |||
) | [private] |
Definition at line 1932 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and send_strategy_.
Referenced by send_nack_replies().
01935 { 01936 typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo; 01937 OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests; 01938 01939 typedef ReaderInfoMap::iterator ri_iter; 01940 const ri_iter end = writer.remote_readers_.end(); 01941 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) { 01942 01943 if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) { 01944 continue; 01945 } 01946 01947 const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_; 01948 01949 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter; 01950 const rf_iter rf_end = ri->second.requested_frags_.end(); 01951 for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) { 01952 01953 const SequenceNumber& seq = rf->first; 01954 if (writer.send_buff_->contains(seq)) { 01955 FragmentInfo& fi = requests[remote_addr]; 01956 fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits, 01957 rf->second.bitmap.get_buffer()); 01958 } else { 01959 gaps.insert(seq); 01960 gap_recipients.insert(remote_addr); 01961 } 01962 } 01963 ri->second.requested_frags_.clear(); 01964 } 01965 01966 typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter; 01967 for (req_iter req = requests.begin(); req != requests.end(); ++req) { 01968 const FragmentInfo& fi = req->second; 01969 01970 ACE_GUARD(TransportSendBuffer::LockType, guard, 01971 writer.send_buff_->strategy_lock()); 01972 const RtpsUdpSendStrategy::OverrideToken ot = 01973 send_strategy_->override_destinations(req->first); 01974 01975 for (FragmentInfo::const_iterator sn_iter = fi.begin(); 01976 sn_iter != fi.end(); ++sn_iter) { 01977 const SequenceNumber& seq = sn_iter->first; 01978 writer.send_buff_->resend_fragments_i(seq, sn_iter->second); 01979 } 01980 } 01981 }
ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::send_strategy | ( | RtpsUdpSendStrategy * | send_strategy | ) |
Definition at line 12 of file RtpsUdpDataLink.inl.
References send_strategy_.
00013 { 00014 send_strategy_ = send_strategy; 00015 }
void OpenDDS::DCPS::RtpsUdpDataLink::stop_i | ( | ) | [private, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 527 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), heartbeat_, heartbeat_reply_, multicast_socket_, nack_reply_, and unicast_socket_.
Referenced by open().
00528 { 00529 nack_reply_.cancel(); 00530 heartbeat_reply_.cancel(); 00531 heartbeat_.disable(); 00532 unicast_socket_.close(); 00533 multicast_socket_.close(); 00534 }
ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket | ( | ) |
Definition at line 44 of file RtpsUdpDataLink.inl.
References unicast_socket_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().
00045 { 00046 return unicast_socket_; 00047 }
void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader | ( | const RepoId & | writerid, | |
const RepoId & | readerid | |||
) |
Definition at line 304 of file RtpsUdpDataLink.cpp.
References interesting_readers_, lock_, OPENDDS_VECTOR(), and reader_no_longer_exists_lock_.
00306 { 00307 OPENDDS_VECTOR(CallbackType) to_notify; 00308 { 00309 ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_); 00310 to_notify.swap(readerDoesNotExistCallbacks_); 00311 } 00312 OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin(); 00313 while(iter != to_notify.end()) { 00314 const RepoId& rid = iter->first; 00315 const InterestingRemote& remote = iter->second; 00316 remote.listener->reader_does_not_exist(rid, remote.localid); 00317 ++iter; 00318 } 00319 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00320 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid), 00321 limit = interesting_readers_.upper_bound(readerid); 00322 pos != limit; 00323 ) { 00324 if (pos->second.localid == writerid) { 00325 interesting_readers_.erase(pos++); 00326 } else { 00327 ++pos; 00328 } 00329 } 00330 }
void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer | ( | const RepoId & | readerid, | |
const RepoId & | writerid | |||
) |
Definition at line 348 of file RtpsUdpDataLink.cpp.
References interesting_writers_, lock_, OPENDDS_VECTOR(), and writer_no_longer_exists_lock_.
00350 { 00351 OPENDDS_VECTOR(CallbackType) to_notify; 00352 { 00353 ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_); 00354 to_notify.swap(writerDoesNotExistCallbacks_); 00355 } 00356 OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin(); 00357 while(iter != to_notify.end()) { 00358 const RepoId& rid = iter->first; 00359 const InterestingRemote& remote = iter->second; 00360 remote.listener->writer_does_not_exist(rid, remote.localid); 00361 ++iter; 00362 } 00363 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00364 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid), 00365 limit = interesting_writers_.upper_bound(writerid); 00366 pos != limit; 00367 ) { 00368 if (pos->second.localid == readerid) { 00369 interesting_writers_.erase(pos++); 00370 } else { 00371 ++pos; 00372 } 00373 } 00374 }
friend class ::DDS_TEST [friend] |
Definition at line 133 of file RtpsUdpDataLink.h.
CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::best_effort_heartbeat_count_ [private] |
Definition at line 143 of file RtpsUdpDataLink.h.
Referenced by check_heartbeats(), config(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), open(), send_heartbeats(), and send_heartbeats_manual().
bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false [static, private] |
static member used by testing code to force inline qos
Definition at line 135 of file RtpsUdpDataLink.h.
Referenced by requires_inline_qos().
Referenced by associated(), register_for_reader(), send_heartbeats(), and stop_i().
HeartBeatCountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_ [private] |
Definition at line 458 of file RtpsUdpDataLink.h.
Referenced by associated(), pre_stop_i(), register_for_reader(), release_reservations_i(), send_heartbeats(), and send_heartbeats_manual().
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_reply_ [private] |
Referenced by received(), and stop_i().
OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat OpenDDS::DCPS::RtpsUdpDataLink::heartbeatchecker_ [private] |
Referenced by register_for_writer().
InterestingAckNackSetType OpenDDS::DCPS::RtpsUdpDataLink::interesting_ack_nacks_ [private] |
Definition at line 481 of file RtpsUdpDataLink.h.
Referenced by received(), and send_heartbeat_replies().
InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_ [private] |
Definition at line 446 of file RtpsUdpDataLink.h.
Referenced by received(), register_for_reader(), send_heartbeats(), and unregister_for_reader().
InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_ [private] |
Definition at line 447 of file RtpsUdpDataLink.h.
Referenced by check_heartbeats(), received(), register_for_writer(), and unregister_for_writer().
ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::lock_ [mutable, private] |
lock_ protects data structures accessed by both the transport's thread (TransportReactorTask) and an external thread which is responsible for adding/removing associations from the DataLink.
Definition at line 268 of file RtpsUdpDataLink.h.
Referenced by add_locator(), associated(), check_heartbeats(), customize_queue_element(), do_remove_sample(), pre_stop_i(), received(), register_for_reader(), register_for_writer(), release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_heartbeat_replies(), send_heartbeats(), send_i(), send_nack_replies(), unregister_for_reader(), and unregister_for_writer().
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer OpenDDS::DCPS::RtpsUdpDataLink::multi_buff_ [private] |
Referenced by open().
ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket_ [private] |
Definition at line 161 of file RtpsUdpDataLink.h.
Referenced by multicast_socket(), open(), and stop_i().
Referenced by received(), and stop_i().
Definition at line 144 of file RtpsUdpDataLink.h.
Referenced by get_reactor(), and reactor_is_shut_down().
RtpsReaderIndex OpenDDS::DCPS::RtpsUdpDataLink::reader_index_ [private] |
Definition at line 261 of file RtpsUdpDataLink.h.
Referenced by associated(), and release_reservations_i().
ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::reader_no_longer_exists_lock_ [mutable, private] |
Definition at line 449 of file RtpsUdpDataLink.h.
Referenced by send_heartbeats(), and unregister_for_reader().
RtpsReaderMap OpenDDS::DCPS::RtpsUdpDataLink::readers_ [private] |
Definition at line 257 of file RtpsUdpDataLink.h.
Referenced by associated(), received(), release_reservations_i(), send_final_acks(), and send_heartbeat_replies().
Definition at line 147 of file RtpsUdpDataLink.h.
Referenced by generate_nack_frags(), open(), process_data_i(), process_heartbeat_i(), receive_strategy(), and send_ack_nacks().
The transport send strategy object for this DataLink.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 146 of file RtpsUdpDataLink.h.
Referenced by durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), open(), send_ack_nacks(), send_durability_gaps(), send_heartbeat_replies(), send_heartbeats(), send_heartbeats_manual(), send_nack_replies(), send_nackfrag_replies(), and send_strategy().
ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket_ [private] |
Definition at line 160 of file RtpsUdpDataLink.h.
Referenced by open(), stop_i(), and unicast_socket().
ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::writer_no_longer_exists_lock_ [mutable, private] |
Definition at line 449 of file RtpsUdpDataLink.h.
Referenced by check_heartbeats(), and unregister_for_writer().
RtpsWriterMap OpenDDS::DCPS::RtpsUdpDataLink::writers_ [private] |
Definition at line 223 of file RtpsUdpDataLink.h.
Referenced by add_delayed_notification(), add_gap_submsg(), associated(), check_handshake_complete(), customize_queue_element(), do_remove_sample(), end_historic_samples(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), marshal_gaps(), pre_stop_i(), process_acked_by_all_i(), received(), release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_heartbeats(), send_heartbeats_manual(), and send_nack_replies().