OpenDDS::DCPS::RtpsUdpDataLink Class Reference

#include <RtpsUdpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 RtpsUdpDataLink (RtpsUdpTransport *transport, const GuidPrefix_t &local_prefix, RtpsUdpInst *config, TransportReactorTask *reactor_task)
void send_strategy (RtpsUdpSendStrategy *send_strategy)
void receive_strategy (RtpsUdpReceiveStrategy *recv_strategy)
bool add_delayed_notification (TransportQueueElement *element)
void do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria)
RtpsUdpInstconfig ()
ACE_Reactor * get_reactor ()
bool reactor_is_shut_down ()
ACE_SOCK_Dgram & unicast_socket ()
ACE_SOCK_Dgram_Mcast & multicast_socket ()
bool open (const ACE_SOCK_Dgram &unicast_socket)
void received (const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix)
void received (const RTPS::GapSubmessage &gap, const GuidPrefix_t &src_prefix)
void received (const RTPS::HeartBeatSubmessage &heartbeat, const GuidPrefix_t &src_prefix)
void received (const RTPS::HeartBeatFragSubmessage &hb_frag, const GuidPrefix_t &src_prefix)
void received (const RTPS::AckNackSubmessage &acknack, const GuidPrefix_t &src_prefix)
void received (const RTPS::NackFragSubmessage &nackfrag, const GuidPrefix_t &src_prefix)
const GuidPrefix_tlocal_prefix () const
void add_locator (const RepoId &remote_id, const ACE_INET_Addr &address, bool requires_inline_qos)
void get_locators (const RepoId &local_id, OPENDDS_SET(ACE_INET_Addr)&addrs) const
ACE_INET_Addr get_locator (const RepoId &remote_id) const
void associated (const RepoId &local, const RepoId &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable)
bool check_handshake_complete (const RepoId &local, const RepoId &remote)
void register_for_reader (const RepoId &writerid, const RepoId &readerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_reader (const RepoId &writerid, const RepoId &readerid)
void register_for_writer (const RepoId &readerid, const RepoId &writerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_writer (const RepoId &readerid, const RepoId &writerid)
virtual void pre_stop_i ()
virtual void send_final_acks (const RepoId &readerid)

Private Types

typedef void(RtpsUdpDataLink::*) PMF ()
typedef std::pair< RepoId,
InterestingRemote
CallbackType

Private Member Functions

virtual void stop_i ()
virtual void send_i (TransportQueueElement *element, bool relink=true)
virtual TransportQueueElementcustomize_queue_element (TransportQueueElement *element)
virtual void release_remote_i (const RepoId &remote_id)
virtual void release_reservations_i (const RepoId &remote_id, const RepoId &local_id)
bool requires_inline_qos (const PublicationId &pub_id)
typedef OPENDDS_MAP_CMP (RepoId, OPENDDS_VECTOR(RepoId), GUID_tKeyLessThan) DestToEntityMap
void add_gap_submsg (RTPS::SubmessageSeq &msg, const TransportQueueElement &tqe, const DestToEntityMap &dtem)
 OPENDDS_MAP_CMP (RepoId, RemoteInfo, GUID_tKeyLessThan) locators_
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap
typedef OPENDDS_MAP_CMP (RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap
void end_historic_samples (RtpsWriterMap::iterator writer, const DataSampleHeader &header, ACE_Message_Block *body)
typedef OPENDDS_MAP_CMP (RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap
typedef OPENDDS_MAP_CMP (RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap
typedef OPENDDS_MULTIMAP_CMP (RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan) RtpsReaderIndex
void deliver_held_data (const RepoId &readerId, WriterInfo &info, bool durable)
size_t generate_nack_frags (OPENDDS_VECTOR(RTPS::NackFragSubmessage)&nack_frags, WriterInfo &wi, const RepoId &pub_id)
bool process_heartbeat_i (const RTPS::HeartBeatSubmessage &heartbeat, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_hb_frag_i (const RTPS::HeartBeatFragSubmessage &hb_frag, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_gap_i (const RTPS::GapSubmessage &gap, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_data_i (const RTPS::DataSubmessage &data, const RepoId &src, RtpsReaderMap::value_type &rr)
void durability_resend (TransportQueueElement *element)
void send_durability_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps)
ACE_Message_Block * marshal_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps, bool durable=false)
void send_nackfrag_replies (RtpsWriter &writer, DisjointSequence &gaps, OPENDDS_SET(ACE_INET_Addr)&gap_recipients)
template<typename T, typename FN>
void datareader_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
void send_nack_replies ()
void process_acked_by_all_i (ACE_Guard< ACE_Thread_Mutex > &g, const RepoId &pub_id)
void send_heartbeats ()
void check_heartbeats ()
void send_heartbeats_manual (const TransportSendControlElement *tsce)
void send_heartbeat_replies ()
typedef OPENDDS_MULTIMAP_CMP (RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType
 OPENDDS_VECTOR (CallbackType) writerDoesNotExistCallbacks_
 OPENDDS_VECTOR (CallbackType) readerDoesNotExistCallbacks_
typedef OPENDDS_MAP_CMP (RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType
typedef OPENDDS_SET (InterestingAckNack) InterestingAckNackSetType
void send_ack_nacks (RtpsReaderMap::iterator rr, bool finalFlag=false)

Static Private Member Functions

static void extend_bitmap_range (RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent)

Private Attributes

RtpsUdpInstconfig_
TransportReactorTask_rch reactor_task_
RtpsUdpSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
RtpsUdpReceiveStrategy_rch recv_strategy_
GuidPrefix_t local_prefix_
ACE_SOCK_Dgram unicast_socket_
ACE_SOCK_Dgram_Mcast multicast_socket_
RtpsCustomizedElementAllocator rtps_customized_element_allocator_
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
RtpsWriterMap writers_
RtpsReaderMap readers_
RtpsReaderIndex reader_index_
ACE_Thread_Mutex lock_
CORBA::Long best_effort_heartbeat_count_
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay nack_reply_
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay heartbeat_reply_
OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat heartbeat_
OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat heartbeatchecker_
InterestingRemoteMapType interesting_readers_
InterestingRemoteMapType interesting_writers_
ACE_Thread_Mutex writer_no_longer_exists_lock_
ACE_Thread_Mutex reader_no_longer_exists_lock_
HeartBeatCountMapType heartbeat_counts_
InterestingAckNackSetType interesting_ack_nacks_

Static Private Attributes

static bool force_inline_qos_ = false
 static member used by testing code to force inline qos

Friends

class ::DDS_TEST

Classes

struct  HeartBeat
struct  InterestingAckNack
struct  InterestingRemote
 Data structure representing an "interesting" remote entity for static discovery. More...
struct  MultiSendBuffer
struct  ReaderInfo
struct  RemoteInfo
struct  RtpsReader
struct  RtpsWriter
struct  TimedDelay
struct  WriterInfo

Detailed Description

Definition at line 44 of file RtpsUdpDataLink.h.


Member Typedef Documentation

typedef std::pair<RepoId, InterestingRemote> OpenDDS::DCPS::RtpsUdpDataLink::CallbackType [private]

Definition at line 452 of file RtpsUdpDataLink.h.

typedef void(RtpsUdpDataLink::*) OpenDDS::DCPS::RtpsUdpDataLink::PMF() [private]

Definition at line 350 of file RtpsUdpDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink ( RtpsUdpTransport transport,
const GuidPrefix_t local_prefix,
RtpsUdpInst config,
TransportReactorTask reactor_task 
)

Definition at line 51 of file RtpsUdpDataLink.cpp.

References local_prefix_.

00055   : DataLink(transport, // 3 data link "attributes", below, are unused
00056              0,         // priority
00057              false,     // is_loopback
00058              false),    // is_active
00059     config_(config),
00060     reactor_task_(reactor_task, false),
00061     rtps_customized_element_allocator_(40, sizeof(RtpsCustomizedElement)),
00062     multi_buff_(this, config->nak_depth_),
00063     best_effort_heartbeat_count_(0),
00064     nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00065                 config->nak_response_delay_),
00066     heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00067                      config->heartbeat_response_delay_),
00068   heartbeat_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats),
00069   heartbeatchecker_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)
00070 {
00071   std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00072 }


Member Function Documentation

bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification ( TransportQueueElement element  ) 

Definition at line 75 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::publication_id(), and writers_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().

00076 {
00077   RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00078   if (iter != writers_.end()) {
00079 
00080     iter->second.add_elem_awaiting_ack(element);
00081     return true;
00082   }
00083   return false;
00084 }

void OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg ( RTPS::SubmessageSeq msg,
const TransportQueueElement tqe,
const DestToEntityMap &  dtem 
) [private]

Definition at line 830 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::TransportQueueElement::subscription_id(), and writers_.

Referenced by customize_queue_element().

00833 {
00834   // These are the GAP submessages that we'll send directly in-line with the
00835   // DATA when we notice that the DataWriter has deliberately skipped seq #s.
00836   // There are other GAP submessages generated in response to reader ACKNACKS,
00837   // see send_nack_replies().
00838   using namespace OpenDDS::RTPS;
00839 
00840   const SequenceNumber seq = tqe.sequence();
00841   const RepoId pub = tqe.publication_id();
00842   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00843       || tqe.subscription_id() != GUID_UNKNOWN) {
00844     return;
00845   }
00846 
00847   const RtpsWriterMap::iterator wi = writers_.find(pub);
00848   if (wi == writers_.end()) {
00849     return; // not a reliable writer, does not send GAPs
00850   }
00851 
00852   RtpsWriter& rw = wi->second;
00853 
00854   if (seq != rw.expected_) {
00855     SequenceNumber firstMissing = rw.expected_;
00856 
00857     // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
00858     // [gapStart, gapListBase) and those in the SNSet.
00859     const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00860                                        firstMissing.getLow()},
00861                            gapListBase = {seq.getHigh(),
00862                                           seq.getLow()};
00863 
00864     // We are not going to enable any bits in the "bitmap" of the SNSet,
00865     // but the "numBits" and the bitmap.length must both be > 0.
00866     LongSeq8 bitmap;
00867     bitmap.length(1);
00868     bitmap[0] = 0;
00869 
00870     GapSubmessage gap = {
00871       {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/},
00872       ENTITYID_UNKNOWN, // readerId: applies to all matched readers
00873       pub.entityId,
00874       gapStart,
00875       {gapListBase, 1, bitmap}
00876     };
00877 
00878     size_t size = 0, padding = 0;
00879     gen_find_size(gap, size, padding);
00880     gap.smHeader.submessageLength =
00881       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00882 
00883     if (!rw.durable_) {
00884       const CORBA::ULong i = msg.length();
00885       msg.length(i + 1);
00886       msg[i].gap_sm(gap);
00887     } else {
00888       InfoDestinationSubmessage idst = {
00889         {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
00890         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00891       };
00892       CORBA::ULong ml = msg.length();
00893 
00894       //Change the non-directed Gap into multiple directed gaps to prevent
00895       //delivering to currently undiscovered durable readers
00896       DestToEntityMap::const_iterator iter = dtem.begin();
00897       while (iter != dtem.end()) {
00898         std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00899         msg.length(ml + 1);
00900         msg[ml++].info_dst_sm(idst);
00901 
00902         const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00903         for (size_t i = 0; i < readers.size(); ++i) {
00904           gap.readerId = readers.at(i).entityId;
00905           msg.length(ml + 1);
00906           msg[ml++].gap_sm(gap);
00907         } //END iter over reader entity ids
00908         ++iter;
00909       } //END iter over reader GuidPrefix_t's
00910     }
00911   }
00912 }

void OpenDDS::DCPS::RtpsUdpDataLink::add_locator ( const RepoId remote_id,
const ACE_INET_Addr &  address,
bool  requires_inline_qos 
)

Definition at line 178 of file RtpsUdpDataLink.cpp.

References lock_.

00181 {
00182   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00183   locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00184 }

void OpenDDS::DCPS::RtpsUdpDataLink::associated ( const RepoId local,
const RepoId remote,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable 
)

Definition at line 226 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GuidConverter::entityKind(), heartbeat_, heartbeat_counts_, OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, lock_, reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable(), and writers_.

00229 {
00230   if (!local_reliable) {
00231     return;
00232   }
00233 
00234   bool enable_heartbeat = false;
00235 
00236   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00237   const GuidConverter conv(local_id);
00238   const EntityKind kind = conv.entityKind();
00239   if (kind == KIND_WRITER && remote_reliable) {
00240     // Insert count if not already there.
00241     heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00242     RtpsWriter& w = writers_[local_id];
00243     w.remote_readers_[remote_id].durable_ = remote_durable;
00244     w.durable_ = local_durable;
00245     enable_heartbeat = true;
00246 
00247   } else if (kind == KIND_READER) {
00248     RtpsReaderMap::iterator rr = readers_.find(local_id);
00249     if (rr == readers_.end()) {
00250       rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00251         .first;
00252       rr->second.durable_ = local_durable;
00253     }
00254     rr->second.remote_writers_[remote_id];
00255     reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00256   }
00257 
00258   g.release();
00259   if (enable_heartbeat) {
00260     heartbeat_.schedule_enable();
00261   }
00262 }

bool OpenDDS::DCPS::RtpsUdpDataLink::check_handshake_complete ( const RepoId local,
const RepoId remote 
)

Definition at line 265 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GuidConverter::entityKind(), OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, and writers_.

00267 {
00268   const GuidConverter conv(local_id);
00269   const EntityKind kind = conv.entityKind();
00270   if (kind == KIND_WRITER) {
00271     RtpsWriterMap::iterator rw = writers_.find(local_id);
00272     if (rw == writers_.end()) {
00273       return true; // not reliable, no handshaking
00274     }
00275     ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00276     if (ri == rw->second.remote_readers_.end()) {
00277       return true; // not reliable, no handshaking
00278     }
00279     return ri->second.handshake_done_;
00280 
00281   } else if (kind == KIND_READER) {
00282     return true; // no handshaking for local reader
00283   }
00284   return false;
00285 }

void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats (  )  [private]

Definition at line 2352 of file RtpsUdpDataLink.cpp.

References config_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist(), and writer_no_longer_exists_lock_.

02353 {
02354   // Have any interesting writers timed out?
02355   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02356   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02357   ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
02358   for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02359        pos != limit;
02360        ++pos) {
02361     if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02362       CallbackType callback(pos->first, pos->second);
02363       writerDoesNotExistCallbacks_.push_back(callback);
02364       pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02365     }
02366   }
02367   c.release();
02368   g.release();
02369   while(true) {
02370     c.acquire();
02371     if (writerDoesNotExistCallbacks_.empty()) {
02372       break;
02373     }
02374     OPENDDS_VECTOR(CallbackType)::iterator iter = writerDoesNotExistCallbacks_.begin();
02375     const RepoId& rid = iter->first;
02376     const InterestingRemote& remote = iter->second;
02377     writerDoesNotExistCallbacks_.erase(iter);
02378     c.release();
02379     remote.listener->writer_does_not_exist(rid, remote.localid);
02380   }
02381 }

ACE_INLINE RtpsUdpInst * OpenDDS::DCPS::RtpsUdpDataLink::config (  ) 

Definition at line 24 of file RtpsUdpDataLink.inl.

References config_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00025 {
00026   return config_;
00027 }

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element ( TransportQueueElement element  )  [private, virtual]

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 629 of file RtpsUdpDataLink.cpp.

References add_gap_submsg(), OpenDDS::DCPS::RtpsCustomizedElement::alloc(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, end_historic_samples(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DataSampleHeader::historic_sample_, lock_, OpenDDS::DCPS::TransportQueueElement::msg(), OPENDDS_STRING, OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), requires_inline_qos(), rtps_customized_element_allocator_, OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual(), OpenDDS::DCPS::RtpsCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::submsgs_to_msgblock(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::Transport_debug_level, and writers_.

00630 {
00631   const ACE_Message_Block* msg = element->msg();
00632   if (!msg) {
00633     return element;
00634   }
00635 
00636   const RepoId pub_id = element->publication_id();
00637 
00638   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00639 
00640   RTPS::SubmessageSeq subm;
00641 
00642   const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00643 
00644   bool gap_ok = true;
00645   DestToEntityMap gap_receivers;
00646   if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00647     for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00648          ri != rw->second.remote_readers_.end(); ++ri) {
00649       RepoId tmp;
00650       std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00651       tmp.entityId = ENTITYID_UNKNOWN;
00652       gap_receivers[tmp].push_back(ri->first);
00653 
00654       if (ri->second.expecting_durable_data()) {
00655         // Can't add an in-line GAP if some Data Reader is expecting durable
00656         // data, the GAP could cause that Data Reader to ignore the durable
00657         // data.  The other readers will eventually learn about the GAP by
00658         // sending an ACKNACK and getting a GAP reply.
00659         gap_ok = false;
00660         break;
00661       }
00662     }
00663   }
00664 
00665   if (gap_ok) {
00666     add_gap_submsg(subm, *element, gap_receivers);
00667   }
00668 
00669   const SequenceNumber seq = element->sequence();
00670   if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00671     rw->second.expected_ = seq;
00672     ++rw->second.expected_;
00673   }
00674 
00675   TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00676   TransportCustomizedElement* tce =
00677     dynamic_cast<TransportCustomizedElement*>(element);
00678   TransportSendControlElement* tsce =
00679     dynamic_cast<TransportSendControlElement*>(element);
00680 
00681   ACE_Message_Block* data = 0;
00682   bool durable = false;
00683 
00684   // Based on the type of 'element', find and duplicate the data payload
00685   // continuation block.
00686   if (tsce) {        // Control message
00687     if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00688       data = msg->cont()->duplicate();
00689       // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00690       RtpsSampleHeader::populate_data_control_submessages(
00691                 subm, *tsce, requires_inline_qos(pub_id));
00692     } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00693       end_historic_samples(rw, tsce->header(), msg->cont());
00694       element->data_delivered();
00695       return 0;
00696     } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00697       send_heartbeats_manual(tsce);
00698       element->data_delivered();
00699       return 0;
00700     } else {
00701       element->data_dropped(true /*dropped_by_transport*/);
00702       return 0;
00703     }
00704 
00705   } else if (tse) {  // Basic data message
00706     // {DataSampleHeader} -> {Data Payload}
00707     data = msg->cont()->duplicate();
00708     const DataSampleElement* dsle = tse->sample();
00709     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00710     RtpsSampleHeader::populate_data_sample_submessages(
00711               subm, *dsle, requires_inline_qos(pub_id));
00712     durable = dsle->get_header().historic_sample_;
00713 
00714   } else if (tce) {  // Customized data message
00715     // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
00716     data = msg->cont()->cont()->duplicate();
00717     const DataSampleElement* dsle = tce->original_send_element()->sample();
00718     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00719     RtpsSampleHeader::populate_data_sample_submessages(
00720               subm, *dsle, requires_inline_qos(pub_id));
00721     durable = dsle->get_header().historic_sample_;
00722 
00723   } else {
00724     return element;
00725   }
00726 
00727   ACE_Message_Block* hdr = submsgs_to_msgblock(subm);
00728   hdr->cont(data);
00729   RtpsCustomizedElement* rtps =
00730     RtpsCustomizedElement::alloc(element, hdr,
00731       &rtps_customized_element_allocator_);
00732 
00733   // Handle durability resends
00734   if (durable && rw != writers_.end()) {
00735     const RepoId sub = element->subscription_id();
00736     if (sub != GUID_UNKNOWN) {
00737       ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00738       if (ri != rw->second.remote_readers_.end()) {
00739         ri->second.durable_data_[rtps->sequence()] = rtps;
00740         ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00741         if (Transport_debug_level > 3) {
00742           const GuidConverter conv(pub_id), sub_conv(sub);
00743           ACE_DEBUG((LM_DEBUG,
00744             "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00745             "storing durable data for local %C remote %C\n",
00746             OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00747         }
00748         return 0;
00749       }
00750     }
00751   } else if (durable && (Transport_debug_level)) {
00752     const GuidConverter conv(pub_id);
00753     ACE_DEBUG((LM_ERROR,
00754       "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00755       "WARNING: no RtpsWriter to store durable data for local %C\n",
00756       OPENDDS_STRING(conv).c_str()));
00757   }
00758 
00759   return rtps;
00760 }

template<typename T, typename FN>
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
const FN &  func 
) [inline, private]

Definition at line 305 of file RtpsUdpDataLink.h.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, and OpenDDS::DCPS::GUID_t::guidPrefix.

Referenced by received().

00307   {
00308     using std::pair;
00309     RepoId local;
00310     std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00311     local.entityId = submessage.readerId;
00312 
00313     RepoId src;
00314     std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00315     src.entityId = submessage.writerId;
00316 
00317     bool schedule_timer = false;
00318     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00319     if (local.entityId == ENTITYID_UNKNOWN) {
00320       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00321              reader_index_.equal_range(src);
00322            iters.first != iters.second; ++iters.first) {
00323         schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00324       }
00325 
00326     } else {
00327       const RtpsReaderMap::iterator rr = readers_.find(local);
00328       if (rr == readers_.end()) {
00329         return;
00330       }
00331       schedule_timer = (this->*func)(submessage, src, *rr);
00332     }
00333     g.release();
00334     if (schedule_timer) {
00335       heartbeat_reply_.schedule();
00336     }
00337   }

void OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data ( const RepoId readerId,
WriterInfo info,
bool  durable 
) [private]

Definition at line 995 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by process_data_i(), process_gap_i(), and process_heartbeat_i().

00997 {
00998   if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
00999   const SequenceNumber ca = info.recvd_.cumulative_ack();
01000   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter;
01001   const iter end = info.held_.upper_bound(ca);
01002   for (iter it = info.held_.begin(); it != end; /*increment in loop body*/) {
01003     if (Transport_debug_level > 5) {
01004       GuidConverter reader(readerId);
01005       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::deliver_held_data -")
01006                            ACE_TEXT(" deliver sequence: %q to %C\n"),
01007                            it->second.header_.sequence_.getValue(),
01008                            OPENDDS_STRING(reader).c_str()));
01009     }
01010     data_received(it->second, readerId);
01011     info.held_.erase(it++);
01012   }
01013 }

void OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample ( const RepoId pub_id,
const TransportQueueElement::MatchCriteria criteria 
)

Definition at line 86 of file RtpsUdpDataLink.cpp.

References lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), OPENDDS_SET(), and writers_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample().

00088 {
00089   RtpsWriter::SnToTqeMap sn_tqe_map;
00090   RtpsWriter::SnToTqeMap to_deliver;
00091   typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00092 
00093   {
00094     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00095 
00096     RtpsWriterMap::iterator iter = writers_.find(pub_id);
00097     if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00098       to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00099       iter->second.to_deliver_.clear();
00100       iter_t it = iter->second.elems_not_acked_.begin();
00101       OPENDDS_SET(SequenceNumber) sns_to_release;
00102       while (it != iter->second.elems_not_acked_.end()) {
00103         if (criteria.matches(*it->second)) {
00104           sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00105           sns_to_release.insert(it->first);
00106           iter_t last = it;
00107           ++it;
00108           iter->second.elems_not_acked_.erase(last);
00109         } else {
00110           ++it;
00111         }
00112       }
00113       OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00114       while (sns_it != sns_to_release.end()) {
00115         iter->second.send_buff_->release_acked(*sns_it);
00116         ++sns_it;
00117       }
00118     }
00119   }
00120   iter_t deliver_iter = to_deliver.begin();
00121   while (deliver_iter != to_deliver.end()) {
00122     deliver_iter->second->data_delivered();
00123     ++deliver_iter;
00124   }
00125   iter_t drop_iter = sn_tqe_map.begin();
00126   while (drop_iter != sn_tqe_map.end()) {
00127     drop_iter->second->data_dropped(true);
00128     ++drop_iter;
00129   }
00130 }

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement element  )  [private]

Definition at line 2157 of file RtpsUdpDataLink.cpp.

References get_locator(), OpenDDS::DCPS::TransportQueueElement::msg(), send_strategy_, and OpenDDS::DCPS::TransportQueueElement::subscription_id().

Referenced by received().

02158 {
02159   ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02160   send_strategy_->send_rtps_control(*msg,
02161                                     get_locator(element->subscription_id()));
02162 }

void OpenDDS::DCPS::RtpsUdpDataLink::end_historic_samples ( RtpsWriterMap::iterator  writer,
const DataSampleHeader header,
ACE_Message_Block *  body 
) [private]

Definition at line 763 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN, header, OPENDDS_STRING, OpenDDS::DCPS::Transport_debug_level, and writers_.

Referenced by customize_queue_element().

00766 {
00767   // Set the ReaderInfo::durable_timestamp_ for the case where no
00768   // durable samples exist in the DataWriter.
00769   if (writer != writers_.end() && writer->second.durable_) {
00770     const ACE_Time_Value now = ACE_OS::gettimeofday();
00771     RepoId sub = GUID_UNKNOWN;
00772     if (body && header.message_length_ >= sizeof(sub)) {
00773       std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00774     }
00775     typedef ReaderInfoMap::iterator iter_t;
00776     if (sub == GUID_UNKNOWN) {
00777       if (Transport_debug_level > 3) {
00778         const GuidConverter conv(writer->first);
00779         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00780                    "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00781       }
00782       for (iter_t iter = writer->second.remote_readers_.begin();
00783            iter != writer->second.remote_readers_.end(); ++iter) {
00784         if (iter->second.durable_) {
00785           iter->second.durable_timestamp_ = now;
00786         }
00787       }
00788     } else {
00789       iter_t iter = writer->second.remote_readers_.find(sub);
00790       if (iter != writer->second.remote_readers_.end()) {
00791         if (iter->second.durable_) {
00792           iter->second.durable_timestamp_ = now;
00793           if (Transport_debug_level > 3) {
00794             const GuidConverter conv(writer->first), sub_conv(sub);
00795             ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00796                        " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00797                        OPENDDS_STRING(sub_conv).c_str()));
00798           }
00799         }
00800       }
00801     }
00802   }
00803 }

void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range ( RTPS::FragmentNumberSet fnSet,
CORBA::ULong  extent 
) [static, private]

Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent

Parameters:
fnSet FragmentNumberSet for the message sequence number in question
extent is the highest fragement sequence number for this FragmentNumberSet

Definition at line 1513 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), and OpenDDS::RTPS::FragmentNumberSet::numBits.

Referenced by generate_nack_frags().

01515 {
01516   if (extent < fnSet.bitmapBase.value) {
01517     return; // can't extend to some number under the base
01518   }
01519   // calculate the index to the extent to determine the new_num_bits
01520   const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01521                                              extent - fnSet.bitmapBase.value + 1),
01522                      len = (new_num_bits + 31) / 32;
01523   if (new_num_bits < fnSet.numBits) {
01524     return; // bitmap already extends past "extent"
01525   }
01526   fnSet.bitmap.length(len);
01527   // We are missing from one past old bitmap end to the new end
01528   DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01529                                       fnSet.bitmap.get_buffer(), len,
01530                                       fnSet.numBits);
01531 }

size_t OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags ( OPENDDS_VECTOR(RTPS::NackFragSubmessage)&  nack_frags,
WriterInfo wi,
const RepoId pub_id 
) [private]

Definition at line 1428 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::ENTITYID_UNKNOWN, extend_bitmap_range(), OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::nackfrag_count_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_VECTOR(), recv_strategy_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::NackFragSubmessage::smHeader, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by send_ack_nacks().

01430 {
01431   typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01432   typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01433   RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01434 
01435   // Populate frag_info with two possible sources of NackFrags:
01436   // 1. sequence #s in the reception gaps that we have partially received
01437   OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01438   for (size_t i = 0; i < missing.size(); ++i) {
01439     recv_strategy_->has_fragments(missing[i], pub_id, &frag_info);
01440   }
01441   // 1b. larger than the last received seq# but less than the heartbeat.lastSN
01442   if (!wi.recvd_.empty()) {
01443     const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01444     recv_strategy_->has_fragments(range, pub_id, &frag_info);
01445   }
01446   for (size_t i = 0; i < frag_info.size(); ++i) {
01447     // If we've received a HeartbeatFrag, we know the last (available) frag #
01448     const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01449     if (heartbeat_frag != wi.frags_.end()) {
01450       extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01451     }
01452   }
01453 
01454   // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
01455   const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01456               high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01457                end = wi.frags_.end();
01458   for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01459     if (iter == low) {
01460       // skip over the range covered by step #1 above
01461       if (high == end) {
01462         break;
01463       }
01464       iter = high;
01465     }
01466 
01467     const SequenceRange range(iter->first, iter->first);
01468     if (recv_strategy_->has_fragments(range, pub_id, &frag_info)) {
01469       extend_bitmap_range(frag_info.back().second, iter->second.value);
01470     } else {
01471       // it was not in the recv strategy, so the entire range is "missing"
01472       frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01473       RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01474       fnSet.bitmapBase.value = 1;
01475       fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01476       fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01477       for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01478         fnSet.bitmap[i] = 0xFFFFFFFF;
01479       }
01480     }
01481   }
01482 
01483   if (frag_info.empty()) {
01484     return 0;
01485   }
01486 
01487   const RTPS::NackFragSubmessage nackfrag_prototype = {
01488     {RTPS::NACK_FRAG, 1 /*FLAG_E*/, 0 /* length set below */},
01489     ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies()
01490     ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies()
01491     {0, 0}, // writerSN set below
01492     RTPS::FragmentNumberSet(), // fragmentNumberState set below
01493     {0} // count set below
01494   };
01495 
01496   size_t size = 0, padding = 0;
01497   for (size_t i = 0; i < frag_info.size(); ++i) {
01498     nf.push_back(nackfrag_prototype);
01499     RTPS::NackFragSubmessage& nackfrag = nf.back();
01500     nackfrag.writerSN.low = frag_info[i].first.getLow();
01501     nackfrag.writerSN.high = frag_info[i].first.getHigh();
01502     nackfrag.fragmentNumberState = frag_info[i].second;
01503     nackfrag.count.value = ++wi.nackfrag_count_;
01504     const size_t before_size = size;
01505     gen_find_size(nackfrag, size, padding);
01506     nackfrag.smHeader.submessageLength =
01507       static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01508   }
01509   return size;
01510 }

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpDataLink::get_locator ( const RepoId remote_id  )  const

Definition at line 212 of file RtpsUdpDataLink.cpp.

References OPENDDS_MAP_CMP(), and OPENDDS_STRING.

Referenced by durability_resend(), get_locators(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i(), and send_durability_gaps().

00213 {
00214   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00215   const iter_t iter = locators_.find(remote_id);
00216   if (iter == locators_.end()) {
00217     const GuidConverter conv(remote_id);
00218     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00219       "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00220     return ACE_INET_Addr();
00221   }
00222   return iter->second.addr_;
00223 }

void OpenDDS::DCPS::RtpsUdpDataLink::get_locators ( const RepoId local_id,
OPENDDS_SET(ACE_INET_Addr)&  addrs 
) const

Given a 'local_id' of a publication or subscription, populate the set of 'addrs' with the network addresses of any remote peers (or if 'local_id' is GUID_UNKNOWN, all known addresses).

Definition at line 187 of file RtpsUdpDataLink.cpp.

References get_locator(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i(), and send_heartbeats_manual().

00189 {
00190   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00191 
00192   if (local_id == GUID_UNKNOWN) {
00193     for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00194       addrs.insert(iter->second.addr_);
00195     }
00196     return;
00197   }
00198 
00199   const GUIDSeq_var peers = peer_ids(local_id);
00200   if (!peers.ptr()) {
00201     return;
00202   }
00203   for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00204     const ACE_INET_Addr addr = get_locator(peers[i]);
00205     if (addr != ACE_INET_Addr()) {
00206       addrs.insert(addr);
00207     }
00208   }
00209 }

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor (  ) 

Definition at line 30 of file RtpsUdpDataLink.inl.

References reactor_task_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00031 {
00032   if (reactor_task_ == 0) return 0;
00033   return reactor_task_->get_reactor();
00034 }

const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix (  )  const [inline]

Definition at line 83 of file RtpsUdpDataLink.h.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpSendStrategy::RtpsUdpSendStrategy().

00083 { return local_prefix_; }

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::marshal_gaps ( const RepoId writer,
const RepoId reader,
const DisjointSequence gaps,
bool  durable = false 
) [private]

Definition at line 2055 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and writers_.

Referenced by send_durability_gaps(), and send_nack_replies().

02057 {
02058   using namespace RTPS;
02059   // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
02060   // [gapStart, gapListBase) and those in the SNSet.
02061   const SequenceNumber firstMissing = gaps.low(),
02062                        base = ++SequenceNumber(gaps.cumulative_ack());
02063   const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02064                                      firstMissing.getLow()},
02065                          gapListBase = {base.getHigh(), base.getLow()};
02066   CORBA::ULong num_bits = 0;
02067   LongSeq8 bitmap;
02068 
02069   if (gaps.disjoint()) {
02070     bitmap.length(bitmap_num_longs(base, gaps.high()));
02071     gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02072 
02073   } else {
02074     bitmap.length(1);
02075     bitmap[0] = 0;
02076     num_bits = 1;
02077   }
02078 
02079   GapSubmessage gap = {
02080     {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/},
02081     reader.entityId,
02082     writer.entityId,
02083     gapStart,
02084     {gapListBase, num_bits, bitmap}
02085   };
02086 
02087   if (Transport_debug_level > 5) {
02088     const GuidConverter conv(writer);
02089     SequenceRange sr;
02090     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02091     SequenceNumber srbase;
02092     srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02093     sr.second = srbase.previous();
02094     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02095               "GAP with range [%q, %q] from %C\n",
02096               sr.first.getValue(), sr.second.getValue(),
02097               OPENDDS_STRING(conv).c_str()));
02098   }
02099 
02100   size_t gap_size = 0, padding = 0;
02101   gen_find_size(gap, gap_size, padding);
02102   gap.smHeader.submessageLength =
02103     static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02104 
02105   // For durable writers, change a non-directed Gap into multiple directed gaps.
02106   OPENDDS_VECTOR(RepoId) readers;
02107   if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02108     if (Transport_debug_level > 5) {
02109       const GuidConverter local_conv(writer);
02110       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02111                  "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02112     }
02113     const RtpsWriterMap::iterator iter = writers_.find(writer);
02114     RtpsWriter& rw = iter->second;
02115     for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02116          ri != rw.remote_readers_.end(); ++ri) {
02117       if (!ri->second.expecting_durable_data()) {
02118         readers.push_back(ri->first);
02119       } else if (Transport_debug_level > 5) {
02120         const GuidConverter remote_conv(ri->first);
02121         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02122                    "%C is expecting durable data, no GAP sent\n",
02123                    OPENDDS_STRING(remote_conv).c_str()));
02124       }
02125     }
02126     if (readers.empty()) return 0;
02127   }
02128 
02129   const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02130     prefix_sz = sizeof(reader.guidPrefix);
02131   // no additional padding needed for INFO_DST
02132   const size_t total_sz = readers.empty() ? (gap_size + padding) :
02133     (readers.size() * (gap_size + padding + size_per_idst));
02134 
02135   ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02136   //FUTURE: allocators?
02137   // byte swapping is handled in the operator<<() implementation
02138   Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02139   if (readers.empty()) {
02140     ser << gap;
02141   } else {
02142     InfoDestinationSubmessage idst = {
02143       {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
02144       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02145     };
02146     for (size_t i = 0; i < readers.size(); ++i) {
02147       std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02148       gap.readerId = readers[i].entityId;
02149       ser << idst;
02150       ser << gap;
02151     }
02152   }
02153   return mb_gap;
02154 }

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket (  ) 

Definition at line 50 of file RtpsUdpDataLink.inl.

References multicast_socket_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00051 {
00052   return multicast_socket_;
00053 }

bool OpenDDS::DCPS::RtpsUdpDataLink::open ( const ACE_SOCK_Dgram &  unicast_socket  ) 

Definition at line 133 of file RtpsUdpDataLink.cpp.

References config_, multi_buff_, OpenDDS::DCPS::RtpsUdpInst::multicast_group_address_, OpenDDS::DCPS::RtpsUdpInst::multicast_interface_, multicast_socket_, OPENDDS_STRING, recv_strategy_, send_strategy_, OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::RtpsUdpInst::ttl_, unicast_socket_, and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

00134 {
00135   unicast_socket_ = unicast_socket;
00136 
00137   if (config_->use_multicast_) {
00138     const OPENDDS_STRING& net_if = config_->multicast_interface_;
00139 #ifdef ACE_HAS_MAC_OSX
00140     multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00141                            ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00142 #endif
00143     if (multicast_socket_.join(config_->multicast_group_address_, 1,
00144                                net_if.empty() ? 0 :
00145                                ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00146       ACE_ERROR_RETURN((LM_ERROR,
00147                         ACE_TEXT("(%P|%t) ERROR: ")
00148                         ACE_TEXT("RtpsUdpDataLink::open: ")
00149                         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00150                        false);
00151     }
00152   }
00153 
00154   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config_->ttl_)) {
00155     ACE_ERROR_RETURN((LM_ERROR,
00156                       ACE_TEXT("(%P|%t) ERROR: ")
00157                       ACE_TEXT("RtpsUdpDataLink::open: ")
00158                       ACE_TEXT("failed to set TTL: %d\n"),
00159                       config_->ttl_),
00160                      false);
00161   }
00162 
00163   send_strategy_->send_buffer(&multi_buff_);
00164 
00165   if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_),
00166             static_rchandle_cast<TransportStrategy>(recv_strategy_)) != 0) {
00167     stop_i();
00168     ACE_ERROR_RETURN((LM_ERROR,
00169                       ACE_TEXT("(%P|%t) ERROR: ")
00170                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00171                      false);
00172   }
00173 
00174   return true;
00175 }

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
CORBA::Long  ,
DCPS::GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RtpsReader  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
WriterInfo  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RtpsWriter  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [private]

OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RemoteInfo  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
OPENDDS_VECTOR(RepoId ,
GUID_tKeyLessThan   
) [private]

Referenced by get_locator(), get_locators(), and requires_inline_qos().

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( RepoId  ,
InterestingRemote  ,
DCPS::GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( RepoId  ,
RtpsReaderMap::iterator  ,
GUID_tKeyLessThan   
) [private]

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( InterestingAckNack   )  [private]

Referenced by do_remove_sample(), pre_stop_i(), process_acked_by_all_i(), release_reservations_i(), send_heartbeats(), send_heartbeats_manual(), and send_nack_replies().

OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( CallbackType   )  [private]

OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_VECTOR ( CallbackType   )  [private]

Referenced by add_gap_submsg(), check_heartbeats(), generate_nack_frags(), marshal_gaps(), pre_stop_i(), process_acked_by_all_i(), received(), release_reservations_i(), send_ack_nacks(), send_heartbeats(), send_nack_replies(), unregister_for_reader(), and unregister_for_writer().

void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i (  )  [virtual]

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 377 of file RtpsUdpDataLink.cpp.

References DBG_ENTRY_LVL, heartbeat_counts_, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), OpenDDS::DCPS::DataLink::pre_stop_i(), and writers_.

00378 {
00379   DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00380   DataLink::pre_stop_i();
00381   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00382   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00383   {
00384     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00385 
00386     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00387 
00388     RtpsWriterMap::iterator iter = writers_.begin();
00389     while (iter != writers_.end()) {
00390       RtpsWriter& writer = iter->second;
00391       if (!writer.to_deliver_.empty()) {
00392         iter_t iter = writer.to_deliver_.begin();
00393         while (iter != writer.to_deliver_.end()) {
00394           to_deliver.push_back(iter->second);
00395           writer.to_deliver_.erase(iter);
00396           iter = writer.to_deliver_.begin();
00397         }
00398       }
00399       if (!writer.elems_not_acked_.empty()) {
00400         OPENDDS_SET(SequenceNumber) sns_to_release;
00401         iter_t iter = writer.elems_not_acked_.begin();
00402         while (iter != writer.elems_not_acked_.end()) {
00403           to_drop.push_back(iter->second);
00404           sns_to_release.insert(iter->first);
00405           writer.elems_not_acked_.erase(iter);
00406           iter = writer.elems_not_acked_.begin();
00407         }
00408         OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00409         while (sns_it != sns_to_release.end()) {
00410           writer.send_buff_->release_acked(*sns_it);
00411           ++sns_it;
00412         }
00413       }
00414       RtpsWriterMap::iterator last = iter;
00415       ++iter;
00416       heartbeat_counts_.erase(last->first);
00417       writers_.erase(last);
00418     }
00419   }
00420   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00421   tqe_iter deliver_it = to_deliver.begin();
00422   while (deliver_it != to_deliver.end()) {
00423     (*deliver_it)->data_delivered();
00424     ++deliver_it;
00425   }
00426   tqe_iter drop_it = to_drop.begin();
00427   while (drop_it != to_drop.end()) {
00428     (*drop_it)->data_dropped(true);
00429     ++drop_it;
00430   }
00431 }

void OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i ( ACE_Guard< ACE_Thread_Mutex > &  g,
const RepoId pub_id 
) [private]

Definition at line 1984 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.

Referenced by received().

01985 {
01986   using namespace OpenDDS::RTPS;
01987   typedef RtpsWriterMap::iterator rw_iter;
01988   typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01989   OPENDDS_VECTOR(RepoId) to_check;
01990   rw_iter rw = writers_.find(pub_id);
01991   if (rw == writers_.end()) {
01992     return;
01993   }
01994   RtpsWriter& writer = rw->second;
01995   if (!writer.elems_not_acked_.empty()) {
01996 
01997     //start with the max sequence number writer knows about and decrease
01998     //by what the min over all readers is
01999     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02000 
02001     typedef ReaderInfoMap::iterator ri_iter;
02002     const ri_iter end = writer.remote_readers_.end();
02003     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02004       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02005         all_readers_ack = ri->second.cur_cumulative_ack_;
02006       }
02007     }
02008     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02009       return;
02010     }
02011     OPENDDS_VECTOR(SequenceNumber) sns;
02012     //if any messages fully acked, call data delivered and remove from map
02013 
02014     iter_t it = writer.elems_not_acked_.begin();
02015     OPENDDS_SET(SequenceNumber) sns_to_release;
02016     while (it != writer.elems_not_acked_.end()) {
02017       if (it->first < all_readers_ack) {
02018         writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02019         sns_to_release.insert(it->first);
02020         iter_t last = it;
02021         ++it;
02022         writer.elems_not_acked_.erase(last);
02023       } else {
02024         break;
02025       }
02026     }
02027     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02028     while (sns_it != sns_to_release.end()) {
02029       writer.send_buff_->release_acked(*sns_it);
02030       ++sns_it;
02031     }
02032     TransportQueueElement* tqe_to_deliver;
02033 
02034     while (true) {
02035       rw_iter deliver_on_writer = writers_.find(pub_id);
02036       if (deliver_on_writer == writers_.end()) {
02037         break;
02038       }
02039       RtpsWriter& writer = deliver_on_writer->second;
02040       iter_t to_deliver_iter = writer.to_deliver_.begin();
02041       if (to_deliver_iter == writer.to_deliver_.end()) {
02042         break;
02043       }
02044       tqe_to_deliver = to_deliver_iter->second;
02045       writer.to_deliver_.erase(to_deliver_iter);
02046       g.release();
02047 
02048       tqe_to_deliver->data_delivered();
02049       g.acquire();
02050     }
02051   }
02052 }

bool OpenDDS::DCPS::RtpsUdpDataLink::process_data_i ( const RTPS::DataSubmessage data,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 925 of file RtpsUdpDataLink.cpp.

References deliver_held_data(), OpenDDS::DCPS::SequenceNumber::getValue(), OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by received().

00928 {
00929   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00930   if (wi != rr.second.remote_writers_.end()) {
00931     WriterInfo& info = wi->second;
00932     SequenceNumber seq;
00933     seq.setValue(data.writerSN.high, data.writerSN.low);
00934     info.frags_.erase(seq);
00935     const RepoId& readerId = rr.first;
00936     if (info.recvd_.contains(seq)) {
00937       if (Transport_debug_level > 5) {
00938         GuidConverter writer(src);
00939         GuidConverter reader(readerId);
00940         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00941                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
00942                              seq.getValue(),
00943                              OPENDDS_STRING(writer).c_str(),
00944                              OPENDDS_STRING(reader).c_str()));
00945       }
00946       recv_strategy_->withhold_data_from(readerId);
00947     } else if (info.recvd_.disjoint() ||
00948         (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
00949         || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
00950         || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
00951       if (Transport_debug_level > 5) {
00952         GuidConverter writer(src);
00953         GuidConverter reader(readerId);
00954         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00955                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
00956                              seq.getValue(),
00957                              OPENDDS_STRING(writer).c_str(),
00958                              OPENDDS_STRING(reader).c_str()));
00959       }
00960       const ReceivedDataSample* sample =
00961         recv_strategy_->withhold_data_from(readerId);
00962       info.held_.insert(std::make_pair(seq, *sample));
00963     } else {
00964       if (Transport_debug_level > 5) {
00965         GuidConverter writer(src);
00966         GuidConverter reader(readerId);
00967         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00968                              ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
00969                              seq.getValue(),
00970                              OPENDDS_STRING(writer).c_str(),
00971                              OPENDDS_STRING(reader).c_str()));
00972       }
00973       recv_strategy_->do_not_withhold_data_from(readerId);
00974     }
00975     info.recvd_.insert(seq);
00976     deliver_held_data(readerId, info, rr.second.durable_);
00977   } else {
00978     if (Transport_debug_level > 5) {
00979       GuidConverter writer(src);
00980       GuidConverter reader(rr.first);
00981       SequenceNumber seq;
00982       seq.setValue(data.writerSN.high, data.writerSN.low);
00983       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00984                            ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
00985                            seq.getValue(),
00986                            OPENDDS_STRING(writer).c_str(),
00987                            OPENDDS_STRING(reader).c_str()));
00988     }
00989     recv_strategy_->do_not_withhold_data_from(rr.first);
00990   }
00991   return false;
00992 }

bool OpenDDS::DCPS::RtpsUdpDataLink::process_gap_i ( const RTPS::GapSubmessage gap,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1023 of file RtpsUdpDataLink.cpp.

References deliver_held_data(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.

Referenced by received().

01025 {
01026   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01027   if (wi != rr.second.remote_writers_.end()) {
01028     SequenceRange sr;
01029     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01030     SequenceNumber base;
01031     base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01032     sr.second = base.previous();
01033     if (sr.first <= sr.second) {
01034       if (Transport_debug_level > 5) {
01035         const GuidConverter conv(src);
01036         const GuidConverter rdr(rr.first);
01037         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01038                   "Reader %C received GAP with range [%q, %q] from %C\n",
01039                   OPENDDS_STRING(rdr).c_str(),
01040                   sr.first.getValue(), sr.second.getValue(),
01041                   OPENDDS_STRING(conv).c_str()));
01042       }
01043       wi->second.recvd_.insert(sr);
01044     } else {
01045       const GuidConverter conv(src);
01046       VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01047                 "received GAP with invalid range [%q, %q] from %C\n",
01048                 sr.first.getValue(), sr.second.getValue(),
01049                 OPENDDS_STRING(conv).c_str()), 2);
01050     }
01051     wi->second.recvd_.insert(base, gap.gapList.numBits,
01052                              gap.gapList.bitmap.get_buffer());
01053     deliver_held_data(rr.first, wi->second, rr.second.durable_);
01054     //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP
01055   }
01056   return false;
01057 }

bool OpenDDS::DCPS::RtpsUdpDataLink::process_hb_frag_i ( const RTPS::HeartBeatFragSubmessage hb_frag,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1541 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, OpenDDS::DCPS::SequenceNumber::setValue(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.

Referenced by received().

01544 {
01545   WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01546   if (wi == rr.second.remote_writers_.end()) {
01547     // we may not be associated yet, even if the writer thinks we are
01548     return false;
01549   }
01550 
01551   if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01552     return false;
01553   }
01554 
01555   wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01556 
01557   SequenceNumber seq;
01558   seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01559 
01560   // If seq is outside the heartbeat range or we haven't completely received
01561   // it yet, send a NackFrag along with the AckNack.  The heartbeat range needs
01562   // to be checked first because recvd_ contains the numbers below the
01563   // heartbeat range (so that we don't NACK those).
01564   if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01565       || !wi->second.recvd_.contains(seq)) {
01566     wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01567     wi->second.ack_pending_ = true;
01568     return true; // timer will invoke send_heartbeat_replies()
01569   }
01570   return false;
01571 }

bool OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i ( const RTPS::HeartBeatSubmessage heartbeat,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1115 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::ack_pending_, OpenDDS::RTPS::HeartBeatSubmessage::count, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::heartbeat_recvd_count_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::initial_hb_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::should_nack(), OpenDDS::RTPS::HeartBeatSubmessage::smHeader, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by received().

01118 {
01119   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01120   if (wi == rr.second.remote_writers_.end()) {
01121     // we may not be associated yet, even if the writer thinks we are
01122     return false;
01123   }
01124 
01125   WriterInfo& info = wi->second;
01126 
01127   if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01128     return false;
01129   }
01130   info.heartbeat_recvd_count_ = heartbeat.count.value;
01131 
01132   SequenceNumber& first = info.hb_range_.first;
01133   first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01134   SequenceNumber& last = info.hb_range_.second;
01135   last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01136   static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01137 
01138   DisjointSequence& recvd = info.recvd_;
01139   if (!rr.second.durable_ && info.initial_hb_) {
01140     if (last.getValue() < starting.getValue()) {
01141       // this is an invalid heartbeat -- last must be positive
01142       return false;
01143     }
01144     // For the non-durable reader, the first received HB or DATA establishes
01145     // a baseline of the lowest sequence number we'd ever need to NACK.
01146     if (recvd.empty() || recvd.low() >= last) {
01147       recvd.insert(SequenceRange(zero,
01148                                  (last > starting) ? last.previous() : zero));
01149     } else {
01150       recvd.insert(SequenceRange(zero, recvd.low()));
01151     }
01152   } else if (!recvd.empty()) {
01153     // All sequence numbers below 'first' should not be NACKed.
01154     // The value of 'first' may not decrease with subsequent HBs.
01155     recvd.insert(SequenceRange(zero,
01156                                (first > starting) ? first.previous() : zero));
01157   }
01158 
01159   deliver_held_data(rr.first, info, rr.second.durable_);
01160 
01161   //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence
01162   //        numbers we no longer expect to receive due to HEARTBEAT
01163 
01164   info.initial_hb_ = false;
01165 
01166   const bool final = heartbeat.smHeader.flags & 2 /* FLAG_F */,
01167     liveliness = heartbeat.smHeader.flags & 4 /* FLAG_L */;
01168 
01169   if (!final || (!liveliness && (info.should_nack() ||
01170       rr.second.nack_durable(info) ||
01171       recv_strategy_->has_fragments(info.hb_range_, wi->first)))) {
01172     info.ack_pending_ = true;
01173     return true; // timer will invoke send_heartbeat_replies()
01174   }
01175 
01176   //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
01177   return false;
01178 }

ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down (  ) 

Definition at line 37 of file RtpsUdpDataLink.inl.

References reactor_task_.

00038 {
00039   if (reactor_task_ == 0) return true;
00040   return reactor_task_->is_shut_down();
00041 }

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy ( RtpsUdpReceiveStrategy recv_strategy  ) 

Definition at line 18 of file RtpsUdpDataLink.inl.

References recv_strategy_.

00019 {
00020   recv_strategy_ = recv_strategy;
00021 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::NackFragSubmessage nackfrag,
const GuidPrefix_t src_prefix 
)

Definition at line 1780 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::GUID_t::guidPrefix, local_prefix_, lock_, nack_reply_, OPENDDS_STRING, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::NackFragSubmessage::writerId, writers_, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

01782 {
01783   // local side is DW
01784   RepoId local;
01785   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01786   local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN
01787 
01788   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01789   const RtpsWriterMap::iterator rw = writers_.find(local);
01790   if (rw == writers_.end()) {
01791     if (Transport_debug_level > 5) {
01792       GuidConverter local_conv(local);
01793       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01794         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01795     }
01796     return;
01797   }
01798 
01799   RepoId remote;
01800   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01801   remote.entityId = nackfrag.readerId;
01802 
01803   if (Transport_debug_level > 5) {
01804     GuidConverter local_conv(local), remote_conv(remote);
01805     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01806       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01807       OPENDDS_STRING(remote_conv).c_str()));
01808   }
01809 
01810   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01811   if (ri == rw->second.remote_readers_.end()) {
01812     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01813       "WARNING ReaderInfo not found\n"));
01814     return;
01815   }
01816 
01817   if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01818     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01819       "WARNING Count indicates duplicate, dropping\n"));
01820     return;
01821   }
01822 
01823   ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01824 
01825   SequenceNumber seq;
01826   seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01827   ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01828   g.release();
01829   nack_reply_.schedule(); // timer will invoke send_nack_replies()
01830 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::AckNackSubmessage acknack,
const GuidPrefix_t src_prefix 
)

Definition at line 1577 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::DisjointSequence::dump(), durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::insert(), interesting_readers_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), local_prefix_, lock_, OpenDDS::DCPS::DisjointSequence::low(), nack_reply_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), process_acked_by_all_i(), OpenDDS::RTPS::AckNackSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), send_durability_gaps(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::AckNackSubmessage::writerId, writers_, and OpenDDS::DCPS::SequenceNumber::ZERO().

01579 {
01580   // local side is DW
01581   RepoId local;
01582   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01583   local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN
01584 
01585   RepoId remote;
01586   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01587   remote.entityId = acknack.readerId;
01588 
01589   const ACE_Time_Value now = ACE_OS::gettimeofday();
01590   OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01591 
01592   {
01593     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01594     for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01595            limit = interesting_readers_.upper_bound(remote);
01596          pos != limit;
01597          ++pos) {
01598       pos->second.last_activity = now;
01599       // Ensure the acknack was for the writer.
01600       if (local == pos->second.localid) {
01601         if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01602           callbacks.push_back(pos->second.listener);
01603           pos->second.status = InterestingRemote::EXISTS;
01604         }
01605       }
01606     }
01607   }
01608 
01609   for (size_t i = 0; i < callbacks.size(); ++i) {
01610     callbacks[i]->reader_exists(remote, local);
01611   }
01612 
01613   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01614   const RtpsWriterMap::iterator rw = writers_.find(local);
01615   if (rw == writers_.end()) {
01616     if (Transport_debug_level > 5) {
01617       GuidConverter local_conv(local);
01618       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01619         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01620     }
01621     return;
01622   }
01623 
01624   if (Transport_debug_level > 5) {
01625     GuidConverter local_conv(local), remote_conv(remote);
01626     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01627       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01628       OPENDDS_STRING(remote_conv).c_str()));
01629   }
01630 
01631   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01632   if (ri == rw->second.remote_readers_.end()) {
01633     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01634       "WARNING ReaderInfo not found\n"));
01635     return;
01636   }
01637 
01638   if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01639     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01640       "WARNING Count indicates duplicate, dropping\n"));
01641     return;
01642   }
01643 
01644   ri->second.acknack_recvd_count_ = acknack.count.value;
01645 
01646   if (!ri->second.handshake_done_) {
01647     ri->second.handshake_done_ = true;
01648     invoke_on_start_callbacks(true);
01649   }
01650 
01651   OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01652   const bool final = acknack.smHeader.flags & 2 /* FLAG_F */;
01653 
01654   if (!ri->second.durable_data_.empty()) {
01655     if (Transport_debug_level > 5) {
01656       const GuidConverter local_conv(local), remote_conv(remote);
01657       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01658                  "local %C has durable for remote %C\n",
01659                  OPENDDS_STRING(local_conv).c_str(),
01660                  OPENDDS_STRING(remote_conv).c_str()));
01661     }
01662     SequenceNumber ack;
01663     ack.setValue(acknack.readerSNState.bitmapBase.high,
01664                  acknack.readerSNState.bitmapBase.low);
01665     const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01666     if (ack > dd_last) {
01667       // Reader acknowledges durable data, we no longer need to store it
01668       ri->second.durable_data_.swap(pendingCallbacks);
01669       if (Transport_debug_level > 5) {
01670         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01671                    "durable data acked\n"));
01672       }
01673     } else {
01674       DisjointSequence requests;
01675       if (!requests.insert(ack, acknack.readerSNState.numBits,
01676                            acknack.readerSNState.bitmap.get_buffer())
01677           && !final && ack == rw->second.heartbeat_high(ri->second)) {
01678         // This is a non-final AckNack with no bits in the bitmap.
01679         // Attempt to reply to a request for the "base" value which
01680         // is neither Acked nor Nacked, only when it's the HB high.
01681         if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01682       }
01683       // Attempt to reply to nacks for durable data
01684       bool sent_some = false;
01685       typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01686       iter_t it = ri->second.durable_data_.begin();
01687       const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01688       SequenceNumber lastSent = SequenceNumber::ZERO();
01689       if (!requests.empty()) {
01690         lastSent = requests.low().previous();
01691       }
01692       DisjointSequence gaps;
01693       for (size_t i = 0; i < psr.size(); ++i) {
01694         for (; it != ri->second.durable_data_.end()
01695              && it->first < psr[i].first; ++it) ; // empty for-loop
01696         for (; it != ri->second.durable_data_.end()
01697              && it->first <= psr[i].second; ++it) {
01698           if (Transport_debug_level > 5) {
01699             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01700                        "durable resend %d\n", int(it->first.getValue())));
01701           }
01702           durability_resend(it->second);
01703           //FUTURE: combine multiple resends into one RTPS Message?
01704           sent_some = true;
01705           if (it->first > lastSent + 1) {
01706             gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01707           }
01708           lastSent = it->first;
01709         }
01710         if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01711           gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01712         }
01713       }
01714       if (!gaps.empty()) {
01715         if (Transport_debug_level > 5) {
01716           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01717                      "sending durability gaps: "));
01718           gaps.dump();
01719         }
01720         send_durability_gaps(local, remote, gaps);
01721       }
01722       if (sent_some) {
01723         return;
01724       }
01725       const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01726       if (!requests.empty() && requests.high() < dd_first) {
01727         // All nacks were below the start of the durable data.
01728         if (Transport_debug_level > 5) {
01729           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01730                      "sending durability gaps for all requests: "));
01731           requests.dump();
01732         }
01733         send_durability_gaps(local, remote, requests);
01734         return;
01735       }
01736       if (!requests.empty() && requests.low() < dd_first) {
01737         // Lowest nack was below the start of the durable data.
01738         for (size_t i = 0; i < psr.size(); ++i) {
01739           if (psr[i].first > dd_first) {
01740             break;
01741           }
01742           gaps.insert(SequenceRange(psr[i].first,
01743                                     std::min(psr[i].second, dd_first)));
01744         }
01745         if (Transport_debug_level > 5) {
01746           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01747                      "sending durability gaps for some requests: "));
01748           gaps.dump();
01749         }
01750         send_durability_gaps(local, remote, gaps);
01751         return;
01752       }
01753     }
01754   }
01755   SequenceNumber ack;
01756   ack.setValue(acknack.readerSNState.bitmapBase.high,
01757                acknack.readerSNState.bitmapBase.low);
01758   if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01759       && ack != SequenceNumber::ZERO()) {
01760     ri->second.cur_cumulative_ack_ = ack;
01761   }
01762   // If this ACKNACK was final, the DR doesn't expect a reply, and therefore
01763   // we don't need to do anything further.
01764   if (!final) {
01765     ri->second.requested_changes_.push_back(acknack.readerSNState);
01766   }
01767   process_acked_by_all_i(g, local);
01768   g.release();
01769   if (!final) {
01770     nack_reply_.schedule(); // timer will invoke send_nack_replies()
01771   }
01772   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01773   for (iter_t it = pendingCallbacks.begin();
01774        it != pendingCallbacks.end(); ++it) {
01775     it->second->data_delivered();
01776   }
01777 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatFragSubmessage hb_frag,
const GuidPrefix_t src_prefix 
)

Definition at line 1534 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_hb_frag_i().

01536 {
01537   datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01538 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatSubmessage heartbeat,
const GuidPrefix_t src_prefix 
)

Definition at line 1060 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::GUID_t::guidPrefix, heartbeat_reply_, interesting_ack_nacks_, interesting_writers_, lock_, OPENDDS_VECTOR(), process_heartbeat_i(), readers_, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

01062 {
01063   RepoId src;
01064   std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01065   src.entityId = heartbeat.writerId;
01066 
01067   bool schedule_acknack = false;
01068   const ACE_Time_Value now = ACE_OS::gettimeofday();
01069   OPENDDS_VECTOR(InterestingRemote) callbacks;
01070 
01071   {
01072     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01073 
01074     // We received a heartbeat from a writer.
01075     // We should ACKNACK if the writer is interesting and there is no association.
01076 
01077     for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01078            limit = interesting_writers_.upper_bound(src);
01079          pos != limit;
01080          ++pos) {
01081       const RepoId& writerid = src;
01082       const RepoId& readerid = pos->second.localid;
01083 
01084       RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01085       if (riter == readers_.end()) {
01086         // Reader has no associations.
01087         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01088       } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01089         // Reader is not associated with this writer.
01090         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01091       }
01092       pos->second.last_activity = now;
01093       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01094         callbacks.push_back(pos->second);
01095         pos->second.status = InterestingRemote::EXISTS;
01096       }
01097     }
01098 
01099     schedule_acknack = !interesting_ack_nacks_.empty();
01100   }
01101 
01102   for (size_t i = 0; i < callbacks.size(); ++i) {
01103     callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01104   }
01105 
01106   if (schedule_acknack) {
01107     heartbeat_reply_.schedule();
01108   }
01109 
01110   datareader_dispatch(heartbeat, src_prefix,
01111                       &RtpsUdpDataLink::process_heartbeat_i);
01112 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::GapSubmessage gap,
const GuidPrefix_t src_prefix 
)

Definition at line 1016 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_gap_i().

01018 {
01019   datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01020 }

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::DataSubmessage data,
const GuidPrefix_t src_prefix 
)

Definition at line 918 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_data_i().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample().

00920 {
00921   datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00922 }

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader ( const RepoId writerid,
const RepoId readerid,
const ACE_INET_Addr &  address,
OpenDDS::DCPS::DiscoveryListener listener 
)

Definition at line 288 of file RtpsUdpDataLink.cpp.

References heartbeat_, heartbeat_counts_, interesting_readers_, lock_, and OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable().

00292 {
00293   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00294   bool enableheartbeat = interesting_readers_.empty();
00295   interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00296   heartbeat_counts_[writerid] = 0;
00297   g.release();
00298   if (enableheartbeat) {
00299     heartbeat_.schedule_enable();
00300   }
00301 }

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer ( const RepoId readerid,
const RepoId writerid,
const ACE_INET_Addr &  address,
OpenDDS::DCPS::DiscoveryListener listener 
)

Definition at line 333 of file RtpsUdpDataLink.cpp.

References heartbeatchecker_, interesting_writers_, lock_, and OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::schedule_enable().

00337 {
00338   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00339   bool enableheartbeatchecker = interesting_writers_.empty();
00340   interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00341   g.release();
00342   if (enableheartbeatchecker) {
00343     heartbeatchecker_.schedule_enable();
00344   }
00345 }

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::release_remote_i ( const RepoId remote_id  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 56 of file RtpsUdpDataLink.inl.

00057 {
00058   locators_.erase(remote_id);
00059 }

void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i ( const RepoId remote_id,
const RepoId local_id 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 444 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GuidConverter::entityKind(), heartbeat_counts_, OpenDDS::DCPS::KIND_READER, OpenDDS::DCPS::KIND_WRITER, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OPENDDS_VECTOR(), reader_index_, readers_, and writers_.

00446 {
00447   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00448   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00449   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00450   using std::pair;
00451   const GuidConverter conv(local_id);
00452   const EntityKind kind = conv.entityKind();
00453   if (kind == KIND_WRITER) {
00454     const RtpsWriterMap::iterator rw = writers_.find(local_id);
00455 
00456     if (rw != writers_.end()) {
00457       rw->second.remote_readers_.erase(remote_id);
00458 
00459       if (rw->second.remote_readers_.empty()) {
00460         RtpsWriter& writer = rw->second;
00461         typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00462 
00463         if (!writer.to_deliver_.empty()) {
00464           iter_t iter = writer.to_deliver_.begin();
00465           while (iter != writer.to_deliver_.end()) {
00466             to_deliver.push_back(iter->second);
00467             writer.to_deliver_.erase(iter);
00468             iter = writer.to_deliver_.begin();
00469           }
00470         }
00471         if (!writer.elems_not_acked_.empty()) {
00472           OPENDDS_SET(SequenceNumber) sns_to_release;
00473           iter_t iter = writer.elems_not_acked_.begin();
00474           while (iter != writer.elems_not_acked_.end()) {
00475             to_drop.push_back(iter->second);
00476             sns_to_release.insert(iter->first);
00477             writer.elems_not_acked_.erase(iter);
00478             iter = writer.elems_not_acked_.begin();
00479           }
00480           OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00481           while (sns_it != sns_to_release.end()) {
00482             writer.send_buff_->release_acked(*sns_it);
00483             ++sns_it;
00484           }
00485         }
00486         heartbeat_counts_.erase(rw->first);
00487         writers_.erase(rw);
00488       }
00489     }
00490 
00491   } else if (kind == KIND_READER) {
00492     const RtpsReaderMap::iterator rr = readers_.find(local_id);
00493 
00494     if (rr != readers_.end()) {
00495       rr->second.remote_writers_.erase(remote_id);
00496 
00497       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00498              reader_index_.equal_range(remote_id);
00499            iters.first != iters.second;) {
00500         if (iters.first->second == rr) {
00501           reader_index_.erase(iters.first++);
00502         } else {
00503           ++iters.first;
00504         }
00505       }
00506 
00507       if (rr->second.remote_writers_.empty()) {
00508         readers_.erase(rr);
00509       }
00510     }
00511   }
00512   g.release();
00513   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00514   tqe_iter deliver_it = to_deliver.begin();
00515   while (deliver_it != to_deliver.end()) {
00516     (*deliver_it)->data_delivered();
00517     ++deliver_it;
00518   }
00519   tqe_iter drop_it = to_drop.begin();
00520   while (drop_it != to_drop.end()) {
00521     (*drop_it)->data_dropped(true);
00522     ++drop_it;
00523   }
00524 }

bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos ( const PublicationId pub_id  )  [private]

Definition at line 806 of file RtpsUdpDataLink.cpp.

References force_inline_qos_, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().

Referenced by customize_queue_element().

00807 {
00808   if (force_inline_qos_) {
00809     // Force true for testing purposes
00810     return true;
00811   } else {
00812     const GUIDSeq_var peers = peer_ids(pub_id);
00813     if (!peers.ptr()) {
00814       return false;
00815     }
00816     typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00817     for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00818       const iter_t iter = locators_.find(peers[i]);
00819       if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00820         return true;
00821       }
00822     }
00823     return false;
00824   }
00825 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks ( RtpsReaderMap::iterator  rr,
bool  finalFlag = false 
) [private]

Definition at line 1200 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, bitmap_num_longs(), OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::DCPS::gen_find_size(), generate_nack_frags(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), recv_strategy_, send_strategy_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by send_final_acks(), and send_heartbeat_replies().

01201 {
01202   using namespace OpenDDS::RTPS;
01203 
01204   WriterInfoMap& writers = rr->second.remote_writers_;
01205   for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01206        ++wi) {
01207 
01208     // if we have some negative acknowledgments, we'll ask for a reply
01209     DisjointSequence& recvd = wi->second.recvd_;
01210     const bool nack = wi->second.should_nack() ||
01211       rr->second.nack_durable(wi->second);
01212     bool final = finalFlag || !nack;
01213 
01214     if (wi->second.ack_pending_ || nack || finalFlag) {
01215       const bool prev_ack_pending = wi->second.ack_pending_;
01216       wi->second.ack_pending_ = false;
01217 
01218       SequenceNumber ack;
01219       CORBA::ULong num_bits = 1;
01220       LongSeq8 bitmap;
01221       bitmap.length(1);
01222       bitmap[0] = 0;
01223 
01224       const SequenceNumber& hb_low = wi->second.hb_range_.first;
01225       const SequenceNumber& hb_high = wi->second.hb_range_.second;
01226       const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01227         hb_high_val = hb_high.getValue();
01228 
01229       if (recvd.empty()) {
01230         // Nack the entire heartbeat range.  Only reached when durable.
01231         ack = hb_low;
01232         bitmap.length(bitmap_num_longs(ack, hb_high));
01233         const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01234           ? 255
01235           : CORBA::ULong(hb_high_val - hb_low_val);
01236         DisjointSequence::fill_bitmap_range(0, idx,
01237                                             bitmap.get_buffer(),
01238                                             bitmap.length(), num_bits);
01239       } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01240         // Nack the range between the heartbeat low and the recvd low.
01241         ack = hb_low;
01242         const SequenceNumber& rec_low = recvd.low();
01243         const SequenceNumber::Value rec_low_val = rec_low.getValue();
01244         bitmap.length(bitmap_num_longs(ack, rec_low));
01245         const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01246           ? 255
01247           : CORBA::ULong(rec_low_val - hb_low_val);
01248         DisjointSequence::fill_bitmap_range(0, idx,
01249                                             bitmap.get_buffer(),
01250                                             bitmap.length(), num_bits);
01251 
01252       } else {
01253         ack = ++SequenceNumber(recvd.cumulative_ack());
01254         if (recvd.low().getValue() > 1) {
01255           // since the "ack" really is cumulative, we need to make
01256           // sure that a lower discontinuity is not possible later
01257           recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01258         }
01259 
01260         if (recvd.disjoint()) {
01261           bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01262           recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01263                           num_bits, true);
01264         }
01265       }
01266 
01267       const SequenceNumber::Value ack_val = ack.getValue();
01268 
01269       if (!recvd.empty() && hb_high > recvd.high()) {
01270         const SequenceNumber eff_high =
01271           (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01272         const SequenceNumber::Value eff_high_val = eff_high.getValue();
01273         // Nack the range between the received high and the effective high.
01274         const CORBA::ULong old_len = bitmap.length(),
01275           new_len = bitmap_num_longs(ack, eff_high);
01276         if (new_len > old_len) {
01277           bitmap.length(new_len);
01278           for (CORBA::ULong i = old_len; i < new_len; ++i) {
01279             bitmap[i] = 0;
01280           }
01281         }
01282         const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01283           idx_recv_high = recvd.disjoint() ?
01284           CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01285         DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01286                                             bitmap.get_buffer(), new_len,
01287                                             num_bits);
01288       }
01289 
01290       // If the receive strategy is holding any fragments, those should
01291       // not be "nacked" in the ACKNACK reply.  They will be accounted for
01292       // in the NACK_FRAG(s) instead.
01293       bool frags_modified =
01294         recv_strategy_->remove_frags_from_bitmap(bitmap.get_buffer(),
01295                                                  num_bits, ack, wi->first);
01296       if (frags_modified && !final) { // change to final if bitmap is empty
01297         final = true;
01298         for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01299           if ((i + 1) * 32 <= num_bits) {
01300             if (bitmap[i]) {
01301               final = false;
01302               break;
01303             }
01304           } else {
01305             if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01306               final = false;
01307               break;
01308             }
01309           }
01310         }
01311       }
01312 
01313       AckNackSubmessage acknack = {
01314         {ACKNACK,
01315          CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)),
01316          0 /*length*/},
01317         rr->first.entityId,
01318         wi->first.entityId,
01319         { // SequenceNumberSet: acking bitmapBase - 1
01320           {ack.getHigh(), ack.getLow()},
01321           num_bits, bitmap
01322         },
01323         {++wi->second.acknack_count_}
01324       };
01325 
01326       size_t size = 0, padding = 0;
01327       gen_find_size(acknack, size, padding);
01328       acknack.smHeader.submessageLength =
01329         static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01330       InfoDestinationSubmessage info_dst = {
01331         {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
01332         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01333       };
01334       gen_find_size(info_dst, size, padding);
01335 
01336       OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01337       size += generate_nack_frags(nack_frags, wi->second, wi->first);
01338 
01339       ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01340       // byte swapping is handled in the operator<<() implementation
01341       Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01342       std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01343                   sizeof(GuidPrefix_t));
01344       ser << info_dst;
01345       // Interoperability note: we used to insert INFO_REPLY submessage here, but
01346       // testing indicated that other DDS implementations didn't accept it.
01347       ser << acknack;
01348       for (size_t i = 0; i < nack_frags.size(); ++i) {
01349         nack_frags[i].readerId = rr->first.entityId;
01350         nack_frags[i].writerId = wi->first.entityId;
01351         ser << nack_frags[i]; // always 4-byte aligned
01352       }
01353 
01354       if (!locators_.count(wi->first)) {
01355         if (Transport_debug_level) {
01356           const GuidConverter conv(wi->first);
01357           ACE_DEBUG((LM_ERROR,
01358                      "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01359                      "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01360         }
01361       } else {
01362         send_strategy_->send_rtps_control(mb_acknack,
01363                                           locators_[wi->first].addr_);
01364       }
01365     }
01366   }
01367 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_durability_gaps ( const RepoId writer,
const RepoId reader,
const DisjointSequence gaps 
) [private]

Definition at line 2165 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, get_locator(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, marshal_gaps(), send_strategy_, and OpenDDS::RTPS::SMHDR_SZ.

Referenced by received().

02168 {
02169   ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02170   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02171   RTPS::InfoDestinationSubmessage info_dst = {
02172     {RTPS::INFO_DST, 1 /*FLAG_E*/, RTPS::INFO_DST_SZ},
02173     {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02174   };
02175   std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02176   ser << info_dst;
02177   mb.cont(marshal_gaps(writer, reader, gaps));
02178   send_strategy_->send_rtps_control(mb, get_locator(reader));
02179   mb.cont()->release();
02180 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_final_acks ( const RepoId readerid  )  [virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 2553 of file RtpsUdpDataLink.cpp.

References readers_, and send_ack_nacks().

02554 {
02555   RtpsReaderMap::iterator rr = readers_.find (readerid);
02556   if (rr != readers_.end ()) {
02557     send_ack_nacks (rr, true);
02558   }
02559 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeat_replies (  )  [private]

Definition at line 1370 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, interesting_ack_nacks_, lock_, readers_, send_ack_nacks(), send_strategy_, and OpenDDS::RTPS::SMHDR_SZ.

01371 {
01372   using namespace OpenDDS::RTPS;
01373   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01374 
01375   for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01376          limit = interesting_ack_nacks_.end();
01377        pos != limit;
01378        ++pos) {
01379 
01380     SequenceNumber ack;
01381     LongSeq8 bitmap;
01382     bitmap.length(0);
01383 
01384     AckNackSubmessage acknack = {
01385       {ACKNACK,
01386        CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/),
01387        0 /*length*/},
01388       pos->readerid.entityId,
01389       pos->writerid.entityId,
01390       { // SequenceNumberSet: acking bitmapBase - 1
01391         {ack.getHigh(), ack.getLow()},
01392         0 /* num_bits */, bitmap
01393       },
01394       {0 /* acknack count */}
01395     };
01396 
01397     size_t size = 0, padding = 0;
01398     gen_find_size(acknack, size, padding);
01399     acknack.smHeader.submessageLength =
01400       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01401     InfoDestinationSubmessage info_dst = {
01402       {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
01403       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01404     };
01405     gen_find_size(info_dst, size, padding);
01406 
01407     ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01408     // byte swapping is handled in the operator<<() implementation
01409     Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01410     std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01411                 sizeof(GuidPrefix_t));
01412     ser << info_dst;
01413     // Interoperability note: we used to insert INFO_REPLY submessage here, but
01414     // testing indicated that other DDS implementations didn't accept it.
01415     ser << acknack;
01416 
01417     send_strategy_->send_rtps_control(mb_acknack, pos->writer_address);
01418   }
01419   interesting_ack_nacks_.clear();
01420 
01421   for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01422        ++rr) {
01423     send_ack_nacks (rr);
01424   }
01425 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats (  )  [private]

Definition at line 2183 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, config_, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::HEARTBEAT, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, OpenDDS::RTPS::HEARTBEAT_SZ, interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), reader_no_longer_exists_lock_, send_strategy_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, VDBG_LVL, and writers_.

02184 {
02185   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02186   ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
02187 
02188   if (writers_.empty() && interesting_readers_.empty()) {
02189     heartbeat_.disable();
02190   }
02191 
02192   using namespace OpenDDS::RTPS;
02193   OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02194   OPENDDS_SET(ACE_INET_Addr) recipients;
02195   OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02196   const ACE_Time_Value now = ACE_OS::gettimeofday();
02197 
02198   RepoIdSet writers_to_advertise;
02199 
02200   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02201   const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config_->heartbeat_period_;
02202   for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02203          limit = interesting_readers_.end();
02204        pos != limit;
02205        ++pos) {
02206     if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02207         (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02208       recipients.insert(pos->second.address);
02209       writers_to_advertise.insert(pos->second.localid);
02210     }
02211     if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02212       CallbackType callback(pos->first, pos->second);
02213       readerDoesNotExistCallbacks_.push_back(callback);
02214       pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02215     }
02216   }
02217 
02218   typedef RtpsWriterMap::iterator rw_iter;
02219   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02220     const bool has_data = !rw->second.send_buff_.is_nil()
02221                           && !rw->second.send_buff_->empty();
02222     bool final = true, has_durable_data = false;
02223     SequenceNumber durable_max;
02224 
02225     typedef ReaderInfoMap::iterator ri_iter;
02226     const ri_iter end = rw->second.remote_readers_.end();
02227     for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02228       if ((has_data || !ri->second.handshake_done_)
02229           && locators_.count(ri->first)) {
02230         recipients.insert(locators_[ri->first].addr_);
02231         if (final && !ri->second.handshake_done_) {
02232           final = false;
02233         }
02234       }
02235       if (!ri->second.durable_data_.empty()) {
02236         const ACE_Time_Value expiration =
02237           ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02238         if (now > expiration) {
02239           typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02240             dd_iter;
02241           for (dd_iter it = ri->second.durable_data_.begin();
02242                it != ri->second.durable_data_.end(); ++it) {
02243             pendingCallbacks.push_back(it->second);
02244           }
02245           ri->second.durable_data_.clear();
02246           if (Transport_debug_level > 3) {
02247             const GuidConverter gw(rw->first), gr(ri->first);
02248             VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02249               "removed expired durable data for %C -> %C\n",
02250               OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02251           }
02252         } else {
02253           has_durable_data = true;
02254           if (ri->second.durable_data_.rbegin()->first > durable_max) {
02255             durable_max = ri->second.durable_data_.rbegin()->first;
02256           }
02257           if (locators_.count(ri->first)) {
02258             recipients.insert(locators_[ri->first].addr_);
02259           }
02260         }
02261       }
02262     }
02263 
02264     if (!rw->second.elems_not_acked_.empty()) {
02265       final = false;
02266     }
02267 
02268     if (writers_to_advertise.count(rw->first)) {
02269       final = false;
02270       writers_to_advertise.erase(rw->first);
02271     }
02272 
02273     if (final && !has_data && !has_durable_data) {
02274       continue;
02275     }
02276 
02277     const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02278                                    ? 1 : rw->second.send_buff_->low(),
02279         lastSN = std::max(durable_max,
02280                           has_data ? rw->second.send_buff_->high() : 1);
02281 
02282     const HeartBeatSubmessage hb = {
02283       {HEARTBEAT,
02284        CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)),
02285        HEARTBEAT_SZ},
02286       ENTITYID_UNKNOWN, // any matched reader may be interested in this
02287       rw->first.entityId,
02288       {firstSN.getHigh(), firstSN.getLow()},
02289       {lastSN.getHigh(), lastSN.getLow()},
02290       {++heartbeat_counts_[rw->first]}
02291     };
02292     subm.push_back(hb);
02293   }
02294 
02295   for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02296          limit = writers_to_advertise.end();
02297        pos != limit;
02298        ++pos) {
02299     const SequenceNumber SN = 1;
02300     const HeartBeatSubmessage hb = {
02301       {HEARTBEAT,
02302        CORBA::Octet(1 /*FLAG_E*/),
02303        HEARTBEAT_SZ},
02304       ENTITYID_UNKNOWN, // any matched reader may be interested in this
02305       pos->entityId,
02306       {SN.getHigh(), SN.getLow()},
02307       {SN.getHigh(), SN.getLow()},
02308       {++heartbeat_counts_[*pos]}
02309     };
02310     subm.push_back(hb);
02311   }
02312 
02313   if (!subm.empty()) {
02314     ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators?
02315     // byte swapping is handled in the operator<<() implementation
02316     Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02317     bool send_ok = true;
02318     for (size_t i = 0; i < subm.size(); ++i) {
02319       if (!(ser << subm[i])) {
02320         ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02321           "failed to serialize HEARTBEAT submessage %B\n", i));
02322         send_ok = false;
02323         break;
02324       }
02325     }
02326     if (send_ok) {
02327       send_strategy_->send_rtps_control(mb, recipients);
02328     }
02329   }
02330   c.release();
02331   g.release();
02332 
02333   while(true) {
02334     c.acquire();
02335     if (readerDoesNotExistCallbacks_.empty()) {
02336       break;
02337     }
02338     OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks_.begin();
02339     const RepoId& rid = iter->first;
02340     const InterestingRemote& remote = iter->second;
02341     readerDoesNotExistCallbacks_.erase(iter);
02342     c.release();
02343     remote.listener->reader_does_not_exist(rid, remote.localid);
02344   }
02345 
02346   for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02347     pendingCallbacks[i]->data_dropped();
02348   }
02349 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual ( const TransportSendControlElement tsce  )  [private]

Definition at line 2384 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, best_effort_heartbeat_count_, config_, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, get_locators(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::HEARTBEAT_SZ, OPENDDS_SET(), OpenDDS::DCPS::TransportSendControlElement::publication_id(), send_strategy_, OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::SMHDR_SZ, and writers_.

Referenced by customize_queue_element().

02385 {
02386   using namespace OpenDDS::RTPS;
02387 
02388   const RepoId pub_id = tsce->publication_id();
02389 
02390   // Populate the recipients.
02391   OPENDDS_SET(ACE_INET_Addr) recipients;
02392   get_locators (pub_id, recipients);
02393   if (recipients.empty()) {
02394     return;
02395   }
02396 
02397   // Populate the sequence numbers and counter.
02398 
02399   SequenceNumber firstSN, lastSN;
02400   CORBA::Long counter;
02401   RtpsWriterMap::iterator pos = writers_.find (pub_id);
02402   if (pos != writers_.end ()) {
02403     // Reliable.
02404     const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02405     SequenceNumber durable_max;
02406     const ACE_Time_Value now = ACE_OS::gettimeofday();
02407     for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02408          ri != end;
02409          ++ri) {
02410       if (!ri->second.durable_data_.empty()) {
02411         const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02412         if (now <= expiration &&
02413             ri->second.durable_data_.rbegin()->first > durable_max) {
02414           durable_max = ri->second.durable_data_.rbegin()->first;
02415         }
02416       }
02417     }
02418     firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02419     lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02420     counter = ++heartbeat_counts_[pos->first];
02421   } else {
02422     // Unreliable.
02423     firstSN = 1;
02424     lastSN = tsce->sequence();
02425     counter = ++this->best_effort_heartbeat_count_;
02426   }
02427 
02428   const HeartBeatSubmessage hb = {
02429     {HEARTBEAT,
02430      CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/ | 4 /*FLAG_L*/),
02431      HEARTBEAT_SZ},
02432     ENTITYID_UNKNOWN, // any matched reader may be interested in this
02433     pub_id.entityId,
02434     {firstSN.getHigh(), firstSN.getLow()},
02435     {lastSN.getHigh(), lastSN.getLow()},
02436     {counter}
02437   };
02438 
02439   ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators?
02440   // byte swapping is handled in the operator<<() implementation
02441   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02442   if ((ser << hb)) {
02443     send_strategy_->send_rtps_control(mb, recipients);
02444   }
02445   else {
02446     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02447                "failed to serialize HEARTBEAT submessage\n"));
02448   }
02449 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_i ( TransportQueueElement element,
bool  relink = true 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 434 of file RtpsUdpDataLink.cpp.

References lock_, and OpenDDS::DCPS::DataLink::send_i().

00435 {
00436   // Lock here to maintain the locking order:
00437   // RtpsUdpDataLink before RtpsUdpSendStrategy
00438   // which is required for resending due to nacks
00439   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00440   DataLink::send_i(element, relink);
00441 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies (  )  [private]

Definition at line 1833 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_high(), OpenDDS::DCPS::DisjointSequence::insert(), lock_, marshal_gaps(), OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_SET(), OPENDDS_STRING, OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, send_nackfrag_replies(), send_strategy_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), OpenDDS::DCPS::Transport_debug_level, and writers_.

01834 {
01835   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01836   // Reply from local DW to remote DR: GAP or DATA
01837   using namespace OpenDDS::RTPS;
01838   typedef RtpsWriterMap::iterator rw_iter;
01839   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01840 
01841     // consolidate requests from N readers
01842     OPENDDS_SET(ACE_INET_Addr) recipients;
01843     DisjointSequence requests;
01844     RtpsWriter& writer = rw->second;
01845 
01846     //track if any messages have been fully acked by all readers
01847     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01848 
01849     typedef ReaderInfoMap::iterator ri_iter;
01850     const ri_iter end = writer.remote_readers_.end();
01851     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01852 
01853       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01854         all_readers_ack = ri->second.cur_cumulative_ack_;
01855       }
01856 
01857       for (size_t i = 0; i < ri->second.requested_changes_.size(); ++i) {
01858         const SequenceNumberSet& sn_state = ri->second.requested_changes_[i];
01859         SequenceNumber base;
01860         base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
01861         if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
01862             && base == writer.heartbeat_high(ri->second)) {
01863           // Since there is an entry in requested_changes_, the DR must have
01864           // sent a non-final AckNack.  If the base value is the high end of
01865           // the heartbeat range, treat it as a request for that seq#.
01866           if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
01867             requests.insert(base);
01868           }
01869         } else {
01870           requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
01871         }
01872       }
01873 
01874       if (ri->second.requested_changes_.size()) {
01875         if (locators_.count(ri->first)) {
01876           recipients.insert(locators_[ri->first].addr_);
01877           if (Transport_debug_level > 5) {
01878             const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01879             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01880                        "local %C remote %C requested resend\n",
01881                        OPENDDS_STRING(local_conv).c_str(),
01882                        OPENDDS_STRING(remote_conv).c_str()));
01883           }
01884         }
01885         ri->second.requested_changes_.clear();
01886       }
01887     }
01888 
01889     DisjointSequence gaps;
01890     if (!requests.empty()) {
01891       if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01892         gaps = requests;
01893       } else {
01894         OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01895         SingleSendBuffer& sb = *writer.send_buff_;
01896         ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01897         const RtpsUdpSendStrategy::OverrideToken ot =
01898           send_strategy_->override_destinations(recipients);
01899         for (size_t i = 0; i < ranges.size(); ++i) {
01900           if (Transport_debug_level > 5) {
01901             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01902                        "resend data %d-%d\n", int(ranges[i].first.getValue()),
01903                        int(ranges[i].second.getValue())));
01904           }
01905           sb.resend_i(ranges[i], &gaps);
01906         }
01907       }
01908     }
01909 
01910     send_nackfrag_replies(writer, gaps, recipients);
01911 
01912     if (!gaps.empty()) {
01913       if (Transport_debug_level > 5) {
01914         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01915                    "GAPs:"));
01916         gaps.dump();
01917       }
01918       ACE_Message_Block* mb_gap =
01919         marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01920       if (mb_gap) {
01921         send_strategy_->send_rtps_control(*mb_gap, recipients);
01922         mb_gap->release();
01923       }
01924     }
01925     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01926       continue;
01927     }
01928   }
01929 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_nackfrag_replies ( RtpsWriter writer,
DisjointSequence gaps,
OPENDDS_SET(ACE_INET_Addr)&  gap_recipients 
) [private]

Definition at line 1932 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and send_strategy_.

Referenced by send_nack_replies().

01935 {
01936   typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01937   OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01938 
01939   typedef ReaderInfoMap::iterator ri_iter;
01940   const ri_iter end = writer.remote_readers_.end();
01941   for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01942 
01943     if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01944       continue;
01945     }
01946 
01947     const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
01948 
01949     typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
01950     const rf_iter rf_end = ri->second.requested_frags_.end();
01951     for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
01952 
01953       const SequenceNumber& seq = rf->first;
01954       if (writer.send_buff_->contains(seq)) {
01955         FragmentInfo& fi = requests[remote_addr];
01956         fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
01957                        rf->second.bitmap.get_buffer());
01958       } else {
01959         gaps.insert(seq);
01960         gap_recipients.insert(remote_addr);
01961       }
01962     }
01963     ri->second.requested_frags_.clear();
01964   }
01965 
01966   typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
01967   for (req_iter req = requests.begin(); req != requests.end(); ++req) {
01968     const FragmentInfo& fi = req->second;
01969 
01970     ACE_GUARD(TransportSendBuffer::LockType, guard,
01971       writer.send_buff_->strategy_lock());
01972     const RtpsUdpSendStrategy::OverrideToken ot =
01973       send_strategy_->override_destinations(req->first);
01974 
01975     for (FragmentInfo::const_iterator sn_iter = fi.begin();
01976          sn_iter != fi.end(); ++sn_iter) {
01977       const SequenceNumber& seq = sn_iter->first;
01978       writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
01979     }
01980   }
01981 }

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::send_strategy ( RtpsUdpSendStrategy send_strategy  ) 

Definition at line 12 of file RtpsUdpDataLink.inl.

References send_strategy_.

00013 {
00014   send_strategy_ = send_strategy;
00015 }

void OpenDDS::DCPS::RtpsUdpDataLink::stop_i (  )  [private, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 527 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::disable(), heartbeat_, heartbeat_reply_, multicast_socket_, nack_reply_, and unicast_socket_.

Referenced by open().

00528 {
00529   nack_reply_.cancel();
00530   heartbeat_reply_.cancel();
00531   heartbeat_.disable();
00532   unicast_socket_.close();
00533   multicast_socket_.close();
00534 }

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket (  ) 

Definition at line 44 of file RtpsUdpDataLink.inl.

References unicast_socket_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00045 {
00046   return unicast_socket_;
00047 }

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader ( const RepoId writerid,
const RepoId readerid 
)

Definition at line 304 of file RtpsUdpDataLink.cpp.

References interesting_readers_, lock_, OPENDDS_VECTOR(), and reader_no_longer_exists_lock_.

00306 {
00307   OPENDDS_VECTOR(CallbackType) to_notify;
00308   {
00309     ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
00310     to_notify.swap(readerDoesNotExistCallbacks_);
00311   }
00312   OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00313   while(iter != to_notify.end()) {
00314     const RepoId& rid = iter->first;
00315     const InterestingRemote& remote = iter->second;
00316     remote.listener->reader_does_not_exist(rid, remote.localid);
00317     ++iter;
00318   }
00319   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00320   for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00321          limit = interesting_readers_.upper_bound(readerid);
00322        pos != limit;
00323        ) {
00324     if (pos->second.localid == writerid) {
00325       interesting_readers_.erase(pos++);
00326     } else {
00327       ++pos;
00328     }
00329   }
00330 }

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer ( const RepoId readerid,
const RepoId writerid 
)

Definition at line 348 of file RtpsUdpDataLink.cpp.

References interesting_writers_, lock_, OPENDDS_VECTOR(), and writer_no_longer_exists_lock_.

00350 {
00351   OPENDDS_VECTOR(CallbackType) to_notify;
00352   {
00353     ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
00354     to_notify.swap(writerDoesNotExistCallbacks_);
00355   }
00356   OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00357   while(iter != to_notify.end()) {
00358     const RepoId& rid = iter->first;
00359     const InterestingRemote& remote = iter->second;
00360     remote.listener->writer_does_not_exist(rid, remote.localid);
00361     ++iter;
00362   }
00363   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00364   for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00365          limit = interesting_writers_.upper_bound(writerid);
00366        pos != limit;
00367        ) {
00368     if (pos->second.localid == readerid) {
00369       interesting_writers_.erase(pos++);
00370     } else {
00371       ++pos;
00372     }
00373   }
00374 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Definition at line 133 of file RtpsUdpDataLink.h.


Member Data Documentation

CORBA::Long OpenDDS::DCPS::RtpsUdpDataLink::best_effort_heartbeat_count_ [private]

Definition at line 348 of file RtpsUdpDataLink.h.

Referenced by send_heartbeats_manual().

RtpsUdpInst* OpenDDS::DCPS::RtpsUdpDataLink::config_ [private]

Definition at line 143 of file RtpsUdpDataLink.h.

Referenced by check_heartbeats(), config(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), open(), send_heartbeats(), and send_heartbeats_manual().

bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false [static, private]

static member used by testing code to force inline qos

Definition at line 135 of file RtpsUdpDataLink.h.

Referenced by requires_inline_qos().

OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_ [private]

Referenced by associated(), register_for_reader(), send_heartbeats(), and stop_i().

HeartBeatCountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_ [private]

Definition at line 458 of file RtpsUdpDataLink.h.

Referenced by associated(), pre_stop_i(), register_for_reader(), release_reservations_i(), send_heartbeats(), and send_heartbeats_manual().

OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_reply_ [private]

Referenced by received(), and stop_i().

OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat OpenDDS::DCPS::RtpsUdpDataLink::heartbeatchecker_ [private]

Referenced by register_for_writer().

InterestingAckNackSetType OpenDDS::DCPS::RtpsUdpDataLink::interesting_ack_nacks_ [private]

Definition at line 481 of file RtpsUdpDataLink.h.

Referenced by received(), and send_heartbeat_replies().

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_ [private]

Definition at line 446 of file RtpsUdpDataLink.h.

Referenced by received(), register_for_reader(), send_heartbeats(), and unregister_for_reader().

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_ [private]

Definition at line 447 of file RtpsUdpDataLink.h.

Referenced by check_heartbeats(), received(), register_for_writer(), and unregister_for_writer().

GuidPrefix_t OpenDDS::DCPS::RtpsUdpDataLink::local_prefix_ [private]

Definition at line 149 of file RtpsUdpDataLink.h.

Referenced by received(), and RtpsUdpDataLink().

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::lock_ [mutable, private]

lock_ protects data structures accessed by both the transport's thread (TransportReactorTask) and an external thread which is responsible for adding/removing associations from the DataLink.

Definition at line 268 of file RtpsUdpDataLink.h.

Referenced by add_locator(), associated(), check_heartbeats(), customize_queue_element(), do_remove_sample(), pre_stop_i(), received(), register_for_reader(), register_for_writer(), release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_heartbeat_replies(), send_heartbeats(), send_i(), send_nack_replies(), unregister_for_reader(), and unregister_for_writer().

OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer OpenDDS::DCPS::RtpsUdpDataLink::multi_buff_ [private]

Referenced by open().

ACE_SOCK_Dgram_Mcast OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket_ [private]

Definition at line 161 of file RtpsUdpDataLink.h.

Referenced by multicast_socket(), open(), and stop_i().

OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay OpenDDS::DCPS::RtpsUdpDataLink::nack_reply_ [private]

Referenced by received(), and stop_i().

TransportReactorTask_rch OpenDDS::DCPS::RtpsUdpDataLink::reactor_task_ [private]

Definition at line 144 of file RtpsUdpDataLink.h.

Referenced by get_reactor(), and reactor_is_shut_down().

RtpsReaderIndex OpenDDS::DCPS::RtpsUdpDataLink::reader_index_ [private]

Definition at line 261 of file RtpsUdpDataLink.h.

Referenced by associated(), and release_reservations_i().

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::reader_no_longer_exists_lock_ [mutable, private]

Definition at line 449 of file RtpsUdpDataLink.h.

Referenced by send_heartbeats(), and unregister_for_reader().

RtpsReaderMap OpenDDS::DCPS::RtpsUdpDataLink::readers_ [private]

Definition at line 257 of file RtpsUdpDataLink.h.

Referenced by associated(), received(), release_reservations_i(), send_final_acks(), and send_heartbeat_replies().

RtpsUdpReceiveStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::recv_strategy_ [private]

Definition at line 147 of file RtpsUdpDataLink.h.

Referenced by generate_nack_frags(), open(), process_data_i(), process_heartbeat_i(), receive_strategy(), and send_ack_nacks().

RtpsCustomizedElementAllocator OpenDDS::DCPS::RtpsUdpDataLink::rtps_customized_element_allocator_ [private]

Definition at line 163 of file RtpsUdpDataLink.h.

Referenced by customize_queue_element().

RtpsUdpSendStrategy_rch OpenDDS::DCPS::RtpsUdpDataLink::send_strategy_ [private]

The transport send strategy object for this DataLink.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 146 of file RtpsUdpDataLink.h.

Referenced by durability_resend(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), open(), send_ack_nacks(), send_durability_gaps(), send_heartbeat_replies(), send_heartbeats(), send_heartbeats_manual(), send_nack_replies(), send_nackfrag_replies(), and send_strategy().

ACE_SOCK_Dgram OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket_ [private]

Definition at line 160 of file RtpsUdpDataLink.h.

Referenced by open(), stop_i(), and unicast_socket().

ACE_Thread_Mutex OpenDDS::DCPS::RtpsUdpDataLink::writer_no_longer_exists_lock_ [mutable, private]

Definition at line 449 of file RtpsUdpDataLink.h.

Referenced by check_heartbeats(), and unregister_for_writer().

RtpsWriterMap OpenDDS::DCPS::RtpsUdpDataLink::writers_ [private]

Definition at line 223 of file RtpsUdpDataLink.h.

Referenced by add_delayed_notification(), add_gap_submsg(), associated(), check_handshake_complete(), customize_queue_element(), do_remove_sample(), end_historic_samples(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), marshal_gaps(), pre_stop_i(), process_acked_by_all_i(), received(), release_reservations_i(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all(), send_heartbeats(), send_heartbeats_manual(), and send_nack_replies().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:36 2016 for OpenDDS by  doxygen 1.4.7