OpenDDS::DCPS::RtpsUdpDataLink Class Reference

#include <RtpsUdpDataLink.h>

Inheritance diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::RtpsUdpDataLink:
Collaboration graph
[legend]

List of all members.

Classes

struct  HeartBeat
class  HeldDataDeliveryHandler
struct  InterestingAckNack
struct  InterestingRemote
 Data structure representing an "interesting" remote entity for static discovery. More...
struct  MultiSendBuffer
struct  ReaderInfo
struct  RemoteInfo
struct  RtpsReader
struct  RtpsWriter
struct  TimedDelay
struct  WriterInfo

Public Member Functions

 RtpsUdpDataLink (RtpsUdpTransport &transport, const GuidPrefix_t &local_prefix, const RtpsUdpInst &config, const TransportReactorTask_rch &reactor_task)
bool add_delayed_notification (TransportQueueElement *element)
void do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria, ACE_Guard< ACE_Thread_Mutex > &guard)
RtpsUdpInstconfig () const
ACE_Reactorget_reactor ()
bool reactor_is_shut_down ()
ACE_SOCK_Dgramunicast_socket ()
ACE_SOCK_Dgram_Mcastmulticast_socket ()
bool open (const ACE_SOCK_Dgram &unicast_socket)
void received (const RTPS::DataSubmessage &data, const GuidPrefix_t &src_prefix)
void received (const RTPS::GapSubmessage &gap, const GuidPrefix_t &src_prefix)
void received (const RTPS::HeartBeatSubmessage &heartbeat, const GuidPrefix_t &src_prefix)
void received (const RTPS::HeartBeatFragSubmessage &hb_frag, const GuidPrefix_t &src_prefix)
void received (const RTPS::AckNackSubmessage &acknack, const GuidPrefix_t &src_prefix)
void received (const RTPS::NackFragSubmessage &nackfrag, const GuidPrefix_t &src_prefix)
const GuidPrefix_tlocal_prefix () const
void add_locator (const RepoId &remote_id, const ACE_INET_Addr &address, bool requires_inline_qos)
void get_locators (const RepoId &local_id, OPENDDS_SET(ACE_INET_Addr)&addrs) const
ACE_INET_Addr get_locator (const RepoId &remote_id) const
void associated (const RepoId &local, const RepoId &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable)
bool check_handshake_complete (const RepoId &local, const RepoId &remote)
void register_for_reader (const RepoId &writerid, const RepoId &readerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_reader (const RepoId &writerid, const RepoId &readerid)
void register_for_writer (const RepoId &readerid, const RepoId &writerid, const ACE_INET_Addr &address, OpenDDS::DCPS::DiscoveryListener *listener)
void unregister_for_writer (const RepoId &readerid, const RepoId &writerid)
virtual void pre_stop_i ()
virtual void send_final_acks (const RepoId &readerid)

Private Types

typedef void(RtpsUdpDataLink::* PMF )()
typedef std::pair< RepoId,
InterestingRemote
CallbackType

Private Member Functions

virtual void stop_i ()
virtual void send_i (TransportQueueElement *element, bool relink=true)
RemoveResult remove_sample (const DataSampleElement *sample, void *context)
virtual TransportQueueElementcustomize_queue_element (TransportQueueElement *element)
virtual void release_remote_i (const RepoId &remote_id)
virtual void release_reservations_i (const RepoId &remote_id, const RepoId &local_id)
bool requires_inline_qos (const GUIDSeq_var &peers)
typedef OPENDDS_MAP_CMP (RepoId, OPENDDS_VECTOR(RepoId), GUID_tKeyLessThan) DestToEntityMap
void add_gap_submsg (RTPS::SubmessageSeq &msg, const TransportQueueElement &tqe, const DestToEntityMap &dtem)
RtpsUdpSendStrategysend_strategy ()
RtpsUdpReceiveStrategyreceive_strategy ()
 OPENDDS_MAP_CMP (RepoId, RemoteInfo, GUID_tKeyLessThan) locators_
typedef OPENDDS_MAP_CMP (RepoId, ReaderInfo, GUID_tKeyLessThan) ReaderInfoMap
typedef OPENDDS_MAP_CMP (RepoId, RtpsWriter, GUID_tKeyLessThan) RtpsWriterMap
void end_historic_samples (RtpsWriterMap::iterator writer, const DataSampleHeader &header, ACE_Message_Block *body)
typedef OPENDDS_MAP_CMP (RepoId, WriterInfo, GUID_tKeyLessThan) WriterInfoMap
typedef OPENDDS_MAP_CMP (RepoId, RtpsReader, GUID_tKeyLessThan) RtpsReaderMap
typedef OPENDDS_MULTIMAP_CMP (RepoId, RtpsReaderMap::iterator, GUID_tKeyLessThan) RtpsReaderIndex
void deliver_held_data (const RepoId &readerId, WriterInfo &info, bool durable)
size_t generate_nack_frags (OPENDDS_VECTOR(RTPS::NackFragSubmessage)&nack_frags, WriterInfo &wi, const RepoId &pub_id)
bool process_heartbeat_i (const RTPS::HeartBeatSubmessage &heartbeat, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_hb_frag_i (const RTPS::HeartBeatFragSubmessage &hb_frag, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_gap_i (const RTPS::GapSubmessage &gap, const RepoId &src, RtpsReaderMap::value_type &rr)
bool process_data_i (const RTPS::DataSubmessage &data, const RepoId &src, RtpsReaderMap::value_type &rr)
void durability_resend (TransportQueueElement *element)
void send_durability_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps)
ACE_Message_Blockmarshal_gaps (const RepoId &writer, const RepoId &reader, const DisjointSequence &gaps, bool durable=false)
void send_nackfrag_replies (RtpsWriter &writer, DisjointSequence &gaps, OPENDDS_SET(ACE_INET_Addr)&gap_recipients)
template<typename T , typename FN >
void datareader_dispatch (const T &submessage, const GuidPrefix_t &src_prefix, const FN &func)
void send_nack_replies ()
void send_directed_nack_replies (const RepoId &writerId, RtpsWriter &writer, const RepoId &readerId, ReaderInfo &reader)
void process_requested_changes (DisjointSequence &requests, const RtpsWriter &writer, const ReaderInfo &reader)
void process_acked_by_all_i (ACE_Guard< ACE_Thread_Mutex > &g, const RepoId &pub_id)
void send_heartbeats ()
void send_directed_heartbeats (OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)&hbs)
void check_heartbeats ()
void send_heartbeats_manual (const TransportSendControlElement *tsce)
void send_heartbeat_replies ()
typedef OPENDDS_MULTIMAP_CMP (RepoId, InterestingRemote, DCPS::GUID_tKeyLessThan) InterestingRemoteMapType
typedef OPENDDS_MAP_CMP (RepoId, CORBA::Long, DCPS::GUID_tKeyLessThan) HeartBeatCountMapType
typedef OPENDDS_SET (InterestingAckNack) InterestingAckNackSetType
void send_ack_nacks (RtpsReaderMap::iterator rr, bool finalFlag=false)

Static Private Member Functions

static void extend_bitmap_range (RTPS::FragmentNumberSet &fnSet, CORBA::ULong extent)

Private Attributes

TransportReactorTask_rch reactor_task_
GuidPrefix_t local_prefix_
ACE_SOCK_Dgram unicast_socket_
ACE_SOCK_Dgram_Mcast multicast_socket_
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer multi_buff_
RtpsWriterMap writers_
RtpsReaderMap readers_
RtpsReaderIndex reader_index_
ACE_Thread_Mutex lock_
CORBA::Long best_effort_heartbeat_count_
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay nack_reply_
OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay heartbeat_reply_
RcHandle< HeartBeatheartbeat_
RcHandle< HeartBeatheartbeatchecker_
InterestingRemoteMapType interesting_readers_
InterestingRemoteMapType interesting_writers_
HeartBeatCountMapType heartbeat_counts_
InterestingAckNackSetType interesting_ack_nacks_
HeldDataDeliveryHandler held_data_delivery_handler_

Static Private Attributes

static bool force_inline_qos_ = false
 static member used by testing code to force inline qos

Friends

class ::DDS_TEST

Detailed Description

Definition at line 54 of file RtpsUdpDataLink.h.


Member Typedef Documentation

Definition at line 481 of file RtpsUdpDataLink.h.

typedef void(RtpsUdpDataLink::* OpenDDS::DCPS::RtpsUdpDataLink::PMF)() [private]

Definition at line 380 of file RtpsUdpDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::RtpsUdpDataLink::RtpsUdpDataLink ( RtpsUdpTransport transport,
const GuidPrefix_t local_prefix,
const RtpsUdpInst config,
const TransportReactorTask_rch reactor_task 
)

Definition at line 80 of file RtpsUdpDataLink.cpp.

References defined(), if(), local_prefix_, OpenDDS::DCPS::DataLink::receive_strategy_, Security, and OpenDDS::DCPS::DataLink::send_strategy_.

00084   : DataLink(transport, // 3 data link "attributes", below, are unused
00085              0,         // priority
00086              false,     // is_loopback
00087              false),    // is_active
00088     reactor_task_(reactor_task),
00089     multi_buff_(this, config.nak_depth_),
00090     best_effort_heartbeat_count_(0),
00091     nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00092                 config.nak_response_delay_),
00093     heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00094                      config.heartbeat_response_delay_),
00095     heartbeat_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats)),
00096     heartbeatchecker_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)),
00097 #if defined(OPENDDS_SECURITY)
00098     held_data_delivery_handler_(this),
00099     security_config_(Security::SecurityRegistry::instance()->default_config()),
00100     local_crypto_handle_(DDS::HANDLE_NIL)
00101 #else
00102     held_data_delivery_handler_(this)
00103 #endif
00104 {
00105   this->send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
00106   this->receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix);
00107   std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00108 }

Here is the call graph for this function:


Member Function Documentation

bool OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification ( TransportQueueElement element  ) 

Definition at line 117 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::publication_id(), and writers_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification().

00118 {
00119   RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00120   if (iter != writers_.end()) {
00121 
00122     iter->second.add_elem_awaiting_ack(element);
00123     return true;
00124   }
00125   return false;
00126 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg ( RTPS::SubmessageSeq msg,
const TransportQueueElement tqe,
const DestToEntityMap &  dtem 
) [private]

Definition at line 891 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::expected_, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::TransportQueueElement::subscription_id(), and writers_.

Referenced by customize_queue_element().

00894 {
00895   // These are the GAP submessages that we'll send directly in-line with the
00896   // DATA when we notice that the DataWriter has deliberately skipped seq #s.
00897   // There are other GAP submessages generated in response to reader ACKNACKS,
00898   // see send_nack_replies().
00899   using namespace OpenDDS::RTPS;
00900 
00901   const SequenceNumber seq = tqe.sequence();
00902   const RepoId pub = tqe.publication_id();
00903   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00904       || tqe.subscription_id() != GUID_UNKNOWN) {
00905     return;
00906   }
00907 
00908   const RtpsWriterMap::iterator wi = writers_.find(pub);
00909   if (wi == writers_.end()) {
00910     return; // not a reliable writer, does not send GAPs
00911   }
00912 
00913   RtpsWriter& rw = wi->second;
00914 
00915   if (seq != rw.expected_) {
00916     SequenceNumber firstMissing = rw.expected_;
00917 
00918     // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
00919     // [gapStart, gapListBase) and those in the SNSet.
00920     const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00921                                        firstMissing.getLow()},
00922                            gapListBase = {seq.getHigh(),
00923                                           seq.getLow()};
00924 
00925     // We are not going to enable any bits in the "bitmap" of the SNSet,
00926     // but the "numBits" and the bitmap.length must both be > 0.
00927     LongSeq8 bitmap;
00928     bitmap.length(1);
00929     bitmap[0] = 0;
00930 
00931     GapSubmessage gap = {
00932       {GAP, FLAG_E, 0 /*length determined below*/},
00933       ENTITYID_UNKNOWN, // readerId: applies to all matched readers
00934       pub.entityId,
00935       gapStart,
00936       {gapListBase, 1, bitmap}
00937     };
00938 
00939     size_t size = 0, padding = 0;
00940     gen_find_size(gap, size, padding);
00941     gap.smHeader.submessageLength =
00942       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00943 
00944     if (!rw.durable_) {
00945       const CORBA::ULong i = msg.length();
00946       msg.length(i + 1);
00947       msg[i].gap_sm(gap);
00948     } else {
00949       InfoDestinationSubmessage idst = {
00950         {INFO_DST, FLAG_E, INFO_DST_SZ},
00951         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00952       };
00953       CORBA::ULong ml = msg.length();
00954 
00955       //Change the non-directed Gap into multiple directed gaps to prevent
00956       //delivering to currently undiscovered durable readers
00957       DestToEntityMap::const_iterator iter = dtem.begin();
00958       while (iter != dtem.end()) {
00959         std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00960         msg.length(ml + 1);
00961         msg[ml++].info_dst_sm(idst);
00962 
00963         const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00964         for (size_t i = 0; i < readers.size(); ++i) {
00965           gap.readerId = readers.at(i).entityId;
00966           msg.length(ml + 1);
00967           msg[ml++].gap_sm(gap);
00968         } //END iter over reader entity ids
00969         ++iter;
00970       } //END iter over reader GuidPrefix_t's
00971     }
00972   }
00973 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::add_locator ( const RepoId remote_id,
const ACE_INET_Addr address,
bool  requires_inline_qos 
)

Definition at line 254 of file RtpsUdpDataLink.cpp.

References lock_.

00257 {
00258   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00259   locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00260 }

void OpenDDS::DCPS::RtpsUdpDataLink::associated ( const RepoId local,
const RepoId remote,
bool  local_reliable,
bool  remote_reliable,
bool  local_durable,
bool  remote_durable 
)

Definition at line 302 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), lock_, reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, and writers_.

00305 {
00306   if (!local_reliable) {
00307     return;
00308   }
00309 
00310   bool enable_heartbeat = false;
00311 
00312   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00313   const GuidConverter conv(local_id);
00314   if (conv.isWriter() && remote_reliable) {
00315     // Insert count if not already there.
00316     heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00317     RtpsWriter& w = writers_[local_id];
00318     w.remote_readers_[remote_id].durable_ = remote_durable;
00319     w.durable_ = local_durable;
00320     enable_heartbeat = true;
00321 
00322   } else if (conv.isReader()) {
00323     RtpsReaderMap::iterator rr = readers_.find(local_id);
00324     if (rr == readers_.end()) {
00325       rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00326         .first;
00327       rr->second.durable_ = local_durable;
00328     }
00329     rr->second.remote_writers_[remote_id];
00330     reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00331   }
00332 
00333   g.release();
00334   if (enable_heartbeat) {
00335     heartbeat_->schedule_enable();
00336   }
00337 }

Here is the call graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::check_handshake_complete ( const RepoId local,
const RepoId remote 
)

Definition at line 340 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), and writers_.

00342 {
00343   const GuidConverter conv(local_id);
00344   if (conv.isWriter()) {
00345     RtpsWriterMap::iterator rw = writers_.find(local_id);
00346     if (rw == writers_.end()) {
00347       return true; // not reliable, no handshaking
00348     }
00349     ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00350     if (ri == rw->second.remote_readers_.end()) {
00351       return true; // not reliable, no handshaking
00352     }
00353     return ri->second.handshake_done_;
00354 
00355   } else if (conv.isReader()) {
00356     return true; // no handshaking for local reader
00357   }
00358   return false;
00359 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::check_heartbeats (  )  [private]

Definition at line 2531 of file RtpsUdpDataLink.cpp.

References config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, ACE_OS::gettimeofday(), OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, interesting_writers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), and OpenDDS::DCPS::DiscoveryListener::writer_does_not_exist().

02532 {
02533   OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
02534 
02535   // Have any interesting writers timed out?
02536   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * this->config().heartbeat_period_;
02537   {
02538       ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02539 
02540       for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02541            pos != limit;
02542            ++pos) {
02543         if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02544           CallbackType callback(pos->first, pos->second);
02545           writerDoesNotExistCallbacks.push_back(callback);
02546           pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02547         }
02548     }
02549   }
02550 
02551   OPENDDS_VECTOR(CallbackType)::iterator iter;
02552   for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
02553     const RepoId& rid = iter->first;
02554     const InterestingRemote& remote = iter->second;
02555     remote.listener->writer_does_not_exist(rid, remote.localid);
02556   }
02557 }

Here is the call graph for this function:

RtpsUdpInst & OpenDDS::DCPS::RtpsUdpDataLink::config (  )  const

Definition at line 111 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::impl().

Referenced by check_heartbeats(), OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat::enable(), open(), send_heartbeats(), send_heartbeats_manual(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00112 {
00113   return static_cast<RtpsUdpTransport&>(impl()).config();
00114 }

Here is the call graph for this function:

Here is the caller graph for this function:

TransportQueueElement * OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element ( TransportQueueElement element  )  [private, virtual]

Allow derived classes to provide an alternate "customized" queue element for this DataLink (not shared with other links in the DataLinkSet).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 688 of file RtpsUdpDataLink.cpp.

References add_gap_submsg(), ACE_Message_Block::cont(), OpenDDS::DCPS::RtpsSampleHeader::control_message_supported(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, ACE_Message_Block::duplicate(), end_historic_samples(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::DataSampleElement::get_header(), ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::TransportSendControlElement::header(), OpenDDS::DCPS::DataSampleHeader::historic_sample_, LM_DEBUG, LM_ERROR, lock_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportQueueElement::msg(), OPENDDS_STRING, OpenDDS::DCPS::TransportCustomizedElement::original_send_element(), OpenDDS::DCPS::DataLink::peer_ids(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_control_submessages(), OpenDDS::DCPS::RtpsSampleHeader::populate_data_sample_submessages(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), requires_inline_qos(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::TransportSendElement::sample(), send_heartbeats_manual(), send_strategy(), OpenDDS::DCPS::RtpsCustomizedElement::sequence(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN, OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::Transport_debug_level, and writers_.

00689 {
00690   const ACE_Message_Block* msg = element->msg();
00691   if (!msg) {
00692     return element;
00693   }
00694 
00695   const RepoId pub_id = element->publication_id();
00696   GUIDSeq_var peers = peer_ids(pub_id);
00697   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00698   bool requires_inline_qos = this->requires_inline_qos(peers);
00699 
00700   RTPS::SubmessageSeq subm;
00701 
00702   const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00703 
00704   bool gap_ok = true;
00705   DestToEntityMap gap_receivers;
00706   if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00707     for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00708          ri != rw->second.remote_readers_.end(); ++ri) {
00709       RepoId tmp;
00710       std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00711       tmp.entityId = ENTITYID_UNKNOWN;
00712       gap_receivers[tmp].push_back(ri->first);
00713 
00714       if (ri->second.expecting_durable_data()) {
00715         // Can't add an in-line GAP if some Data Reader is expecting durable
00716         // data, the GAP could cause that Data Reader to ignore the durable
00717         // data.  The other readers will eventually learn about the GAP by
00718         // sending an ACKNACK and getting a GAP reply.
00719         gap_ok = false;
00720         break;
00721       }
00722     }
00723   }
00724 
00725   if (gap_ok) {
00726     add_gap_submsg(subm, *element, gap_receivers);
00727   }
00728 
00729   const SequenceNumber seq = element->sequence();
00730   if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00731     rw->second.expected_ = seq;
00732     ++rw->second.expected_;
00733   }
00734 
00735   TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00736   TransportCustomizedElement* tce =
00737     dynamic_cast<TransportCustomizedElement*>(element);
00738   TransportSendControlElement* tsce =
00739     dynamic_cast<TransportSendControlElement*>(element);
00740 
00741   Message_Block_Ptr data;
00742   bool durable = false;
00743 
00744   // Based on the type of 'element', find and duplicate the data payload
00745   // continuation block.
00746   if (tsce) {        // Control message
00747     if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00748       data.reset(msg->cont()->duplicate());
00749       // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00750       RtpsSampleHeader::populate_data_control_submessages(
00751                 subm, *tsce, requires_inline_qos);
00752     } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00753       end_historic_samples(rw, tsce->header(), msg->cont());
00754       element->data_delivered();
00755       return 0;
00756     } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00757       send_heartbeats_manual(tsce);
00758       element->data_delivered();
00759       return 0;
00760     } else {
00761       element->data_dropped(true /*dropped_by_transport*/);
00762       return 0;
00763     }
00764 
00765   } else if (tse) {  // Basic data message
00766     // {DataSampleHeader} -> {Data Payload}
00767     data.reset(msg->cont()->duplicate());
00768     const DataSampleElement* dsle = tse->sample();
00769     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00770     RtpsSampleHeader::populate_data_sample_submessages(
00771               subm, *dsle, requires_inline_qos);
00772     durable = dsle->get_header().historic_sample_;
00773 
00774   } else if (tce) {  // Customized data message
00775     // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
00776     data.reset(msg->cont()->cont()->duplicate());
00777     const DataSampleElement* dsle = tce->original_send_element()->sample();
00778     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00779     RtpsSampleHeader::populate_data_sample_submessages(
00780               subm, *dsle, requires_inline_qos);
00781     durable = dsle->get_header().historic_sample_;
00782 
00783   } else {
00784     return element;
00785   }
00786 
00787 #if defined(OPENDDS_SECURITY)
00788   send_strategy()->encode_payload(pub_id, data, subm);
00789 #endif
00790 
00791   Message_Block_Ptr hdr(submsgs_to_msgblock(subm));
00792   hdr->cont(data.release());
00793   RtpsCustomizedElement* rtps =
00794     new RtpsCustomizedElement(element, move(hdr));
00795 
00796   // Handle durability resends
00797   if (durable && rw != writers_.end()) {
00798     const RepoId sub = element->subscription_id();
00799     if (sub != GUID_UNKNOWN) {
00800       ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00801       if (ri != rw->second.remote_readers_.end()) {
00802         ri->second.durable_data_[rtps->sequence()] = rtps;
00803         ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00804         if (Transport_debug_level > 3) {
00805           const GuidConverter conv(pub_id), sub_conv(sub);
00806           ACE_DEBUG((LM_DEBUG,
00807             "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00808             "storing durable data for local %C remote %C\n",
00809             OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00810         }
00811         return 0;
00812       }
00813     }
00814   } else if (durable && (Transport_debug_level)) {
00815     const GuidConverter conv(pub_id);
00816     ACE_ERROR((LM_ERROR,
00817       "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00818       "WARNING: no RtpsWriter to store durable data for local %C\n",
00819       OPENDDS_STRING(conv).c_str()));
00820   }
00821 
00822   return rtps;
00823 }

Here is the call graph for this function:

template<typename T , typename FN >
void OpenDDS::DCPS::RtpsUdpDataLink::datareader_dispatch ( const T &  submessage,
const GuidPrefix_t src_prefix,
const FN &  func 
) [inline, private]

Definition at line 331 of file RtpsUdpDataLink.h.

References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, and OpenDDS::DCPS::GUID_t::guidPrefix.

Referenced by received().

00333   {
00334     using std::pair;
00335     RepoId local;
00336     std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
00337     local.entityId = submessage.readerId;
00338 
00339     RepoId src;
00340     std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
00341     src.entityId = submessage.writerId;
00342 
00343     bool schedule_timer = false;
00344     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00345     if (local.entityId == ENTITYID_UNKNOWN) {
00346       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00347              reader_index_.equal_range(src);
00348            iters.first != iters.second; ++iters.first) {
00349         schedule_timer |= (this->*func)(submessage, src, *iters.first->second);
00350       }
00351 
00352     } else {
00353       const RtpsReaderMap::iterator rr = readers_.find(local);
00354       if (rr == readers_.end()) {
00355         return;
00356       }
00357       schedule_timer = (this->*func)(submessage, src, *rr);
00358     }
00359     g.release();
00360     if (schedule_timer) {
00361       heartbeat_reply_.schedule();
00362     }
00363   }

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::deliver_held_data ( const RepoId readerId,
WriterInfo info,
bool  durable 
) [private]

Definition at line 1056 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::empty(), held_data_delivery_handler_, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery(), and OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_.

Referenced by process_data_i(), process_gap_i(), and process_heartbeat_i().

01058 {
01059   if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
01060   held_data_delivery_handler_.notify_delivery(readerId, info);
01061 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::do_remove_sample ( const RepoId pub_id,
const TransportQueueElement::MatchCriteria criteria,
ACE_Guard< ACE_Thread_Mutex > &  guard 
)

Definition at line 128 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), OPENDDS_SET(), ACE_Guard< ACE_LOCK >::release(), and writers_.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample().

00131 {
00132   RtpsWriter::SnToTqeMap sn_tqe_map;
00133   RtpsWriter::SnToTqeMap to_deliver;
00134   typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00135 
00136   RtpsWriterMap::iterator iter = writers_.find(pub_id);
00137   if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00138     to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00139     iter->second.to_deliver_.clear();
00140     iter_t it = iter->second.elems_not_acked_.begin();
00141     OPENDDS_SET(SequenceNumber) sns_to_release;
00142     while (it != iter->second.elems_not_acked_.end()) {
00143       if (criteria.matches(*it->second)) {
00144         sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00145         sns_to_release.insert(it->first);
00146         iter_t last = it;
00147         ++it;
00148         iter->second.elems_not_acked_.erase(last);
00149       } else {
00150         ++it;
00151       }
00152     }
00153     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00154     while (sns_it != sns_to_release.end()) {
00155       iter->second.send_buff_->release_acked(*sns_it);
00156       ++sns_it;
00157     }
00158   }
00159 
00160   // see comment in RtpsUdpDataLink::send_i() for lock order
00161   // reverse guard can't be used since that involves re-locking
00162   guard.release();
00163 
00164   iter_t deliver_iter = to_deliver.begin();
00165   while (deliver_iter != to_deliver.end()) {
00166     deliver_iter->second->data_delivered();
00167     ++deliver_iter;
00168   }
00169   iter_t drop_iter = sn_tqe_map.begin();
00170   while (drop_iter != sn_tqe_map.end()) {
00171     drop_iter->second->data_dropped(true);
00172     ++drop_iter;
00173   }
00174 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::durability_resend ( TransportQueueElement element  )  [private]

Definition at line 2287 of file RtpsUdpDataLink.cpp.

References get_locator(), OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::DCPS::TransportQueueElement::subscription_id().

Referenced by received().

02288 {
02289   ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02290   send_strategy()->send_rtps_control(*msg,
02291                                     get_locator(element->subscription_id()));
02292 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::end_historic_samples ( RtpsWriterMap::iterator  writer,
const DataSampleHeader header,
ACE_Message_Block body 
) [private]

Definition at line 826 of file RtpsUdpDataLink.cpp.

References ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_UNKNOWN, LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_length_, OPENDDS_STRING, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::Transport_debug_level, and writers_.

Referenced by customize_queue_element().

00829 {
00830   // Set the ReaderInfo::durable_timestamp_ for the case where no
00831   // durable samples exist in the DataWriter.
00832   if (writer != writers_.end() && writer->second.durable_) {
00833     const ACE_Time_Value now = ACE_OS::gettimeofday();
00834     RepoId sub = GUID_UNKNOWN;
00835     if (body && header.message_length_ >= sizeof(sub)) {
00836       std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00837     }
00838     typedef ReaderInfoMap::iterator iter_t;
00839     if (sub == GUID_UNKNOWN) {
00840       if (Transport_debug_level > 3) {
00841         const GuidConverter conv(writer->first);
00842         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00843                    "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00844       }
00845       for (iter_t iter = writer->second.remote_readers_.begin();
00846            iter != writer->second.remote_readers_.end(); ++iter) {
00847         if (iter->second.durable_) {
00848           iter->second.durable_timestamp_ = now;
00849         }
00850       }
00851     } else {
00852       iter_t iter = writer->second.remote_readers_.find(sub);
00853       if (iter != writer->second.remote_readers_.end()) {
00854         if (iter->second.durable_) {
00855           iter->second.durable_timestamp_ = now;
00856           if (Transport_debug_level > 3) {
00857             const GuidConverter conv(writer->first), sub_conv(sub);
00858             ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00859                        " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00860                        OPENDDS_STRING(sub_conv).c_str()));
00861           }
00862         }
00863       }
00864     }
00865   }
00866 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::extend_bitmap_range ( RTPS::FragmentNumberSet fnSet,
CORBA::ULong  extent 
) [static, private]

Extend the FragmentNumberSet to cover the fragments that are missing from our last known fragment to the extent

Parameters:
fnSet FragmentNumberSet for the message sequence number in question
extent is the highest fragment sequence number for this FragmentNumberSet

Definition at line 1568 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), and OpenDDS::RTPS::FragmentNumberSet::numBits.

Referenced by generate_nack_frags().

01570 {
01571   if (extent < fnSet.bitmapBase.value) {
01572     return; // can't extend to some number under the base
01573   }
01574   // calculate the index to the extent to determine the new_num_bits
01575   const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01576                                              extent - fnSet.bitmapBase.value + 1),
01577                      len = (new_num_bits + 31) / 32;
01578   if (new_num_bits < fnSet.numBits) {
01579     return; // bitmap already extends past "extent"
01580   }
01581   fnSet.bitmap.length(len);
01582   // We are missing from one past old bitmap end to the new end
01583   DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01584                                       fnSet.bitmap.get_buffer(), len,
01585                                       fnSet.numBits);
01586 }

Here is the call graph for this function:

Here is the caller graph for this function:

size_t OpenDDS::DCPS::RtpsUdpDataLink::generate_nack_frags ( OPENDDS_VECTOR(RTPS::NackFragSubmessage)&  nack_frags,
WriterInfo wi,
const RepoId pub_id 
) [private]

Definition at line 1483 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::FragmentNumberSet::bitmap, OpenDDS::RTPS::FragmentNumberSet::bitmapBase, OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::ENTITYID_UNKNOWN, extend_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::last_ack(), OpenDDS::RTPS::NACK_FRAG, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::nackfrag_count_, OpenDDS::RTPS::FragmentNumberSet::numBits, OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::NackFragSubmessage::smHeader, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

Referenced by send_ack_nacks().

01485 {
01486   typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01487   typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01488   RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01489 
01490   // Populate frag_info with two possible sources of NackFrags:
01491   // 1. sequence #s in the reception gaps that we have partially received
01492   OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01493   for (size_t i = 0; i < missing.size(); ++i) {
01494     receive_strategy()->has_fragments(missing[i], pub_id, &frag_info);
01495   }
01496   // 1b. larger than the last received seq# but less than the heartbeat.lastSN
01497   if (!wi.recvd_.empty()) {
01498     const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01499     receive_strategy()->has_fragments(range, pub_id, &frag_info);
01500   }
01501   for (size_t i = 0; i < frag_info.size(); ++i) {
01502     // If we've received a HeartbeatFrag, we know the last (available) frag #
01503     const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01504     if (heartbeat_frag != wi.frags_.end()) {
01505       extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01506     }
01507   }
01508 
01509   // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
01510   const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01511               high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01512                end = wi.frags_.end();
01513   for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01514     if (iter == low) {
01515       // skip over the range covered by step #1 above
01516       if (high == end) {
01517         break;
01518       }
01519       iter = high;
01520     }
01521 
01522     const SequenceRange range(iter->first, iter->first);
01523     if (receive_strategy()->has_fragments(range, pub_id, &frag_info)) {
01524       extend_bitmap_range(frag_info.back().second, iter->second.value);
01525     } else {
01526       // it was not in the recv strategy, so the entire range is "missing"
01527       frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01528       RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01529       fnSet.bitmapBase.value = 1;
01530       fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01531       fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01532       for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01533         fnSet.bitmap[i] = 0xFFFFFFFF;
01534       }
01535     }
01536   }
01537 
01538   if (frag_info.empty()) {
01539     return 0;
01540   }
01541 
01542   const RTPS::NackFragSubmessage nackfrag_prototype = {
01543     {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */},
01544     ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies()
01545     ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies()
01546     {0, 0}, // writerSN set below
01547     RTPS::FragmentNumberSet(), // fragmentNumberState set below
01548     {0} // count set below
01549   };
01550 
01551   size_t size = 0, padding = 0;
01552   for (size_t i = 0; i < frag_info.size(); ++i) {
01553     nf.push_back(nackfrag_prototype);
01554     RTPS::NackFragSubmessage& nackfrag = nf.back();
01555     nackfrag.writerSN.low = frag_info[i].first.getLow();
01556     nackfrag.writerSN.high = frag_info[i].first.getHigh();
01557     nackfrag.fragmentNumberState = frag_info[i].second;
01558     nackfrag.count.value = ++wi.nackfrag_count_;
01559     const size_t before_size = size;
01560     gen_find_size(nackfrag, size, padding);
01561     nackfrag.smHeader.submessageLength =
01562       static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01563   }
01564   return size;
01565 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INET_Addr OpenDDS::DCPS::RtpsUdpDataLink::get_locator ( const RepoId remote_id  )  const

Definition at line 288 of file RtpsUdpDataLink.cpp.

References LM_ERROR, OPENDDS_MAP_CMP(), and OPENDDS_STRING.

Referenced by durability_resend(), get_locators(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper(), and send_durability_gaps().

00289 {
00290   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00291   const iter_t iter = locators_.find(remote_id);
00292   if (iter == locators_.end()) {
00293     const GuidConverter conv(remote_id);
00294     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00295       "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00296     return ACE_INET_Addr();
00297   }
00298   return iter->second.addr_;
00299 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::get_locators ( const RepoId local_id,
OPENDDS_SET(ACE_INET_Addr)&  addrs 
) const

Given a 'local_id' of a publication or subscription, populate the set of 'addrs' with the network addresses of any remote peers (or if 'local_id' is GUID_UNKNOWN, all known addresses).

Definition at line 263 of file RtpsUdpDataLink.cpp.

References get_locator(), OpenDDS::DCPS::GUID_UNKNOWN, OPENDDS_MAP_CMP(), and OpenDDS::DCPS::DataLink::peer_ids().

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper(), and send_heartbeats_manual().

00265 {
00266   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00267 
00268   if (local_id == GUID_UNKNOWN) {
00269     for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00270       addrs.insert(iter->second.addr_);
00271     }
00272     return;
00273   }
00274 
00275   const GUIDSeq_var peers = peer_ids(local_id);
00276   if (!peers.ptr()) {
00277     return;
00278   }
00279   for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00280     const ACE_INET_Addr addr = get_locator(peers[i]);
00281     if (addr != ACE_INET_Addr()) {
00282       addrs.insert(addr);
00283     }
00284   }
00285 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::RtpsUdpDataLink::get_reactor ( void   ) 
const GuidPrefix_t& OpenDDS::DCPS::RtpsUdpDataLink::local_prefix (  )  const [inline]

Definition at line 95 of file RtpsUdpDataLink.h.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

00095 { return local_prefix_; }

Here is the caller graph for this function:

ACE_Message_Block * OpenDDS::DCPS::RtpsUdpDataLink::marshal_gaps ( const RepoId writer,
const RepoId reader,
const DisjointSequence gaps,
bool  durable = false 
) [private]

Definition at line 2185 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::GAP, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and writers_.

Referenced by send_directed_nack_replies(), send_durability_gaps(), and send_nack_replies().

02187 {
02188   using namespace RTPS;
02189   // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
02190   // [gapStart, gapListBase) and those in the SNSet.
02191   const SequenceNumber firstMissing = gaps.low(),
02192                        base = ++SequenceNumber(gaps.cumulative_ack());
02193   const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02194                                      firstMissing.getLow()},
02195                          gapListBase = {base.getHigh(), base.getLow()};
02196   CORBA::ULong num_bits = 0;
02197   LongSeq8 bitmap;
02198 
02199   if (gaps.disjoint()) {
02200     bitmap.length(bitmap_num_longs(base, gaps.high()));
02201     gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02202 
02203   } else {
02204     bitmap.length(1);
02205     bitmap[0] = 0;
02206     num_bits = 1;
02207   }
02208 
02209   GapSubmessage gap = {
02210     {GAP, FLAG_E, 0 /*length determined below*/},
02211     reader.entityId,
02212     writer.entityId,
02213     gapStart,
02214     {gapListBase, num_bits, bitmap}
02215   };
02216 
02217   if (Transport_debug_level > 5) {
02218     const GuidConverter conv(writer);
02219     SequenceRange sr;
02220     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02221     SequenceNumber srbase;
02222     srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02223     sr.second = srbase.previous();
02224     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02225               "GAP with range [%q, %q] from %C\n",
02226               sr.first.getValue(), sr.second.getValue(),
02227               OPENDDS_STRING(conv).c_str()));
02228   }
02229 
02230   size_t gap_size = 0, padding = 0;
02231   gen_find_size(gap, gap_size, padding);
02232   gap.smHeader.submessageLength =
02233     static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02234 
02235   // For durable writers, change a non-directed Gap into multiple directed gaps.
02236   OPENDDS_VECTOR(RepoId) readers;
02237   if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02238     if (Transport_debug_level > 5) {
02239       const GuidConverter local_conv(writer);
02240       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02241                  "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02242     }
02243     const RtpsWriterMap::iterator iter = writers_.find(writer);
02244     RtpsWriter& rw = iter->second;
02245     for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02246          ri != rw.remote_readers_.end(); ++ri) {
02247       if (!ri->second.expecting_durable_data()) {
02248         readers.push_back(ri->first);
02249       } else if (Transport_debug_level > 5) {
02250         const GuidConverter remote_conv(ri->first);
02251         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02252                    "%C is expecting durable data, no GAP sent\n",
02253                    OPENDDS_STRING(remote_conv).c_str()));
02254       }
02255     }
02256     if (readers.empty()) return 0;
02257   }
02258 
02259   const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02260     prefix_sz = sizeof(reader.guidPrefix);
02261   // no additional padding needed for INFO_DST
02262   const size_t total_sz = readers.empty() ? (gap_size + padding) :
02263     (readers.size() * (gap_size + padding + size_per_idst));
02264 
02265   ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02266   //FUTURE: allocators?
02267   // byte swapping is handled in the operator<<() implementation
02268   Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02269   if (readers.empty()) {
02270     ser << gap;
02271   } else {
02272     InfoDestinationSubmessage idst = {
02273       {INFO_DST, FLAG_E, INFO_DST_SZ},
02274       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02275     };
02276     for (size_t i = 0; i < readers.size(); ++i) {
02277       std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02278       gap.readerId = readers[i].entityId;
02279       ser << idst;
02280       ser << gap;
02281     }
02282   }
02283   return mb_gap;
02284 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ACE_SOCK_Dgram_Mcast & OpenDDS::DCPS::RtpsUdpDataLink::multicast_socket (  ) 

Definition at line 36 of file RtpsUdpDataLink.inl.

References multicast_socket_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::start_i(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::stop_i().

00037 {
00038   return multicast_socket_;
00039 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::open ( const ACE_SOCK_Dgram unicast_socket  ) 

Definition at line 177 of file RtpsUdpDataLink.cpp.

References ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, config(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, ACE_SOCK_Dgram_Mcast::join(), LM_ERROR, multi_buff_, OpenDDS::DCPS::RtpsUdpInst::multicast_group_address_, OpenDDS::DCPS::RtpsUdpInst::multicast_interface_, multicast_socket_, OPENDDS_STRING, ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), OpenDDS::DCPS::RtpsUdpInst::rcv_buffer_size_, OpenDDS::DCPS::DataLink::receive_strategy_, OpenDDS::DCPS::TransportSendStrategy::send_buffer(), OpenDDS::DCPS::RtpsUdpInst::send_buffer_size_, send_strategy(), OpenDDS::DCPS::DataLink::send_strategy_, ACE_SOCK::set_option(), OpenDDS::DCPS::set_socket_multicast_ttl(), OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::RtpsUdpInst::ttl_, unicast_socket_, and OpenDDS::DCPS::RtpsUdpInst::use_multicast_.

00178 {
00179   unicast_socket_ = unicast_socket;
00180 
00181   RtpsUdpInst& config = this->config();
00182 
00183   if (config.use_multicast_) {
00184     const OPENDDS_STRING& net_if = config.multicast_interface_;
00185 #ifdef ACE_HAS_MAC_OSX
00186     multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00187                            ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00188 #endif
00189     if (multicast_socket_.join(config.multicast_group_address_, 1,
00190                                net_if.empty() ? 0 :
00191                                ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00192       ACE_ERROR_RETURN((LM_ERROR,
00193                         ACE_TEXT("(%P|%t) ERROR: ")
00194                         ACE_TEXT("RtpsUdpDataLink::open: ")
00195                         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00196                        false);
00197     }
00198   }
00199 
00200   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config.ttl_)) {
00201     ACE_ERROR_RETURN((LM_ERROR,
00202                       ACE_TEXT("(%P|%t) ERROR: ")
00203                       ACE_TEXT("RtpsUdpDataLink::open: ")
00204                       ACE_TEXT("failed to set TTL: %d\n"),
00205                       config.ttl_),
00206                      false);
00207   }
00208 
00209   if (config.send_buffer_size_ > 0) {
00210     int snd_size = config.send_buffer_size_;
00211     if (this->unicast_socket_.set_option(SOL_SOCKET,
00212                                 SO_SNDBUF,
00213                                 (void *) &snd_size,
00214                                 sizeof(snd_size)) < 0
00215         && errno != ENOTSUP) {
00216       ACE_ERROR_RETURN((LM_ERROR,
00217                         ACE_TEXT("(%P|%t) ERROR: ")
00218                         ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00219                         snd_size),
00220                        false);
00221     }
00222   }
00223 
00224   if (config.rcv_buffer_size_ > 0) {
00225     int rcv_size = config.rcv_buffer_size_;
00226     if (this->unicast_socket_.set_option(SOL_SOCKET,
00227                                 SO_RCVBUF,
00228                                 (void *) &rcv_size,
00229                                 sizeof(int)) < 0
00230         && errno != ENOTSUP) {
00231       ACE_ERROR_RETURN((LM_ERROR,
00232                         ACE_TEXT("(%P|%t) ERROR: ")
00233                         ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00234                         rcv_size),
00235                        false);
00236     }
00237   }
00238 
00239   send_strategy()->send_buffer(&multi_buff_);
00240 
00241   if (start(send_strategy_,
00242             receive_strategy_) != 0) {
00243     stop_i();
00244     ACE_ERROR_RETURN((LM_ERROR,
00245                       ACE_TEXT("(%P|%t) ERROR: ")
00246                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00247                      false);
00248   }
00249 
00250   return true;
00251 }

Here is the call graph for this function:

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
CORBA::Long  ,
DCPS::GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RtpsReader  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
WriterInfo  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RtpsWriter  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
ReaderInfo  ,
GUID_tKeyLessThan   
) [private]
OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
RemoteInfo  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MAP_CMP ( RepoId  ,
OPENDDS_VECTOR(RepoId ,
GUID_tKeyLessThan   
) [private]

Referenced by get_locator(), get_locators(), and requires_inline_qos().

Here is the caller graph for this function:

typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( RepoId  ,
InterestingRemote  ,
DCPS::GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_MULTIMAP_CMP ( RepoId  ,
RtpsReaderMap::iterator  ,
GUID_tKeyLessThan   
) [private]
typedef OpenDDS::DCPS::RtpsUdpDataLink::OPENDDS_SET ( InterestingAckNack   )  [private]
void OpenDDS::DCPS::RtpsUdpDataLink::pre_stop_i (  )  [virtual]

Called before release the datalink or before shutdown to let the concrete DataLink to do anything necessary.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 427 of file RtpsUdpDataLink.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, heartbeat_counts_, lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.

00428 {
00429   DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00430   DataLink::pre_stop_i();
00431   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00432   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00433   {
00434     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00435 
00436     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00437 
00438     RtpsWriterMap::iterator iter = writers_.begin();
00439     while (iter != writers_.end()) {
00440       RtpsWriter& writer = iter->second;
00441       if (!writer.to_deliver_.empty()) {
00442         iter_t iter = writer.to_deliver_.begin();
00443         while (iter != writer.to_deliver_.end()) {
00444           to_deliver.push_back(iter->second);
00445           writer.to_deliver_.erase(iter);
00446           iter = writer.to_deliver_.begin();
00447         }
00448       }
00449       if (!writer.elems_not_acked_.empty()) {
00450         OPENDDS_SET(SequenceNumber) sns_to_release;
00451         iter_t iter = writer.elems_not_acked_.begin();
00452         while (iter != writer.elems_not_acked_.end()) {
00453           to_drop.push_back(iter->second);
00454           sns_to_release.insert(iter->first);
00455           writer.elems_not_acked_.erase(iter);
00456           iter = writer.elems_not_acked_.begin();
00457         }
00458         OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00459         while (sns_it != sns_to_release.end()) {
00460           writer.send_buff_->release_acked(*sns_it);
00461           ++sns_it;
00462         }
00463       }
00464       RtpsWriterMap::iterator last = iter;
00465       ++iter;
00466       heartbeat_counts_.erase(last->first);
00467       writers_.erase(last);
00468     }
00469   }
00470   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00471   tqe_iter deliver_it = to_deliver.begin();
00472   while (deliver_it != to_deliver.end()) {
00473     (*deliver_it)->data_delivered();
00474     ++deliver_it;
00475   }
00476   tqe_iter drop_it = to_drop.begin();
00477   while (drop_it != to_drop.end()) {
00478     (*drop_it)->data_dropped(true);
00479     ++drop_it;
00480   }
00481 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i ( ACE_Guard< ACE_Thread_Mutex > &  g,
const RepoId pub_id 
) [private]

Definition at line 2114 of file RtpsUdpDataLink.cpp.

References ACE_Guard< ACE_LOCK >::acquire(), OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.

Referenced by received(), and release_reservations_i().

02115 {
02116   using namespace OpenDDS::RTPS;
02117   typedef RtpsWriterMap::iterator rw_iter;
02118   typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02119   OPENDDS_VECTOR(RepoId) to_check;
02120   rw_iter rw = writers_.find(pub_id);
02121   if (rw == writers_.end()) {
02122     return;
02123   }
02124   RtpsWriter& writer = rw->second;
02125   if (!writer.elems_not_acked_.empty()) {
02126 
02127     //start with the max sequence number writer knows about and decrease
02128     //by what the min over all readers is
02129     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02130 
02131     typedef ReaderInfoMap::iterator ri_iter;
02132     const ri_iter end = writer.remote_readers_.end();
02133     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02134       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02135         all_readers_ack = ri->second.cur_cumulative_ack_;
02136       }
02137     }
02138     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02139       return;
02140     }
02141     OPENDDS_VECTOR(SequenceNumber) sns;
02142     //if any messages fully acked, call data delivered and remove from map
02143 
02144     iter_t it = writer.elems_not_acked_.begin();
02145     OPENDDS_SET(SequenceNumber) sns_to_release;
02146     while (it != writer.elems_not_acked_.end()) {
02147       if (it->first < all_readers_ack) {
02148         writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02149         sns_to_release.insert(it->first);
02150         iter_t last = it;
02151         ++it;
02152         writer.elems_not_acked_.erase(last);
02153       } else {
02154         break;
02155       }
02156     }
02157     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02158     while (sns_it != sns_to_release.end()) {
02159       writer.send_buff_->release_acked(*sns_it);
02160       ++sns_it;
02161     }
02162     TransportQueueElement* tqe_to_deliver;
02163 
02164     while (true) {
02165       rw_iter deliver_on_writer = writers_.find(pub_id);
02166       if (deliver_on_writer == writers_.end()) {
02167         break;
02168       }
02169       RtpsWriter& writer = deliver_on_writer->second;
02170       iter_t to_deliver_iter = writer.to_deliver_.begin();
02171       if (to_deliver_iter == writer.to_deliver_.end()) {
02172         break;
02173       }
02174       tqe_to_deliver = to_deliver_iter->second;
02175       writer.to_deliver_.erase(to_deliver_iter);
02176       g.release();
02177 
02178       tqe_to_deliver->data_delivered();
02179       g.acquire();
02180     }
02181   }
02182 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::process_data_i ( const RTPS::DataSubmessage data,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 986 of file RtpsUdpDataLink.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DisjointSequence::contains(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::do_not_withhold_data_from(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::RtpsUdpReceiveStrategy::withhold_data_from(), and OpenDDS::RTPS::DataSubmessage::writerSN.

Referenced by received().

00989 {
00990   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00991   if (wi != rr.second.remote_writers_.end()) {
00992     WriterInfo& info = wi->second;
00993     SequenceNumber seq;
00994     seq.setValue(data.writerSN.high, data.writerSN.low);
00995     info.frags_.erase(seq);
00996     const RepoId& readerId = rr.first;
00997     if (info.recvd_.contains(seq)) {
00998       if (Transport_debug_level > 5) {
00999         GuidConverter writer(src);
01000         GuidConverter reader(readerId);
01001         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01002                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
01003                              seq.getValue(),
01004                              OPENDDS_STRING(writer).c_str(),
01005                              OPENDDS_STRING(reader).c_str()));
01006       }
01007       receive_strategy()->withhold_data_from(readerId);
01008     } else if (info.recvd_.disjoint() ||
01009         (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
01010         || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
01011         || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
01012       if (Transport_debug_level > 5) {
01013         GuidConverter writer(src);
01014         GuidConverter reader(readerId);
01015         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01016                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
01017                              seq.getValue(),
01018                              OPENDDS_STRING(writer).c_str(),
01019                              OPENDDS_STRING(reader).c_str()));
01020       }
01021       const ReceivedDataSample* sample =
01022         receive_strategy()->withhold_data_from(readerId);
01023       info.held_.insert(std::make_pair(seq, *sample));
01024     } else {
01025       if (Transport_debug_level > 5) {
01026         GuidConverter writer(src);
01027         GuidConverter reader(readerId);
01028         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01029                              ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
01030                              seq.getValue(),
01031                              OPENDDS_STRING(writer).c_str(),
01032                              OPENDDS_STRING(reader).c_str()));
01033       }
01034       receive_strategy()->do_not_withhold_data_from(readerId);
01035     }
01036     info.recvd_.insert(seq);
01037     deliver_held_data(readerId, info, rr.second.durable_);
01038   } else {
01039     if (Transport_debug_level > 5) {
01040       GuidConverter writer(src);
01041       GuidConverter reader(rr.first);
01042       SequenceNumber seq;
01043       seq.setValue(data.writerSN.high, data.writerSN.low);
01044       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01045                            ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
01046                            seq.getValue(),
01047                            OPENDDS_STRING(writer).c_str(),
01048                            OPENDDS_STRING(reader).c_str()));
01049     }
01050     receive_strategy()->do_not_withhold_data_from(rr.first);
01051   }
01052   return false;
01053 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::process_gap_i ( const RTPS::GapSubmessage gap,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1071 of file RtpsUdpDataLink.cpp.

References deliver_held_data(), OpenDDS::RTPS::GapSubmessage::gapList, OpenDDS::RTPS::GapSubmessage::gapStart, OpenDDS::DCPS::SequenceNumber::getValue(), LM_DEBUG, LM_WARNING, OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.

Referenced by received().

01073 {
01074   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01075   if (wi != rr.second.remote_writers_.end()) {
01076     SequenceRange sr;
01077     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01078     SequenceNumber base;
01079     base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01080     SequenceNumber first_received = SequenceNumber::MAX_VALUE;
01081     if (!wi->second.recvd_.empty()) {
01082       OPENDDS_VECTOR(SequenceRange) missing = wi->second.recvd_.missing_sequence_ranges();
01083       if (!missing.empty()) {
01084         first_received = missing.front().second;
01085       }
01086     }
01087     sr.second = std::min(first_received, base.previous());
01088     if (sr.first <= sr.second) {
01089       if (Transport_debug_level > 5) {
01090         const GuidConverter conv(src);
01091         const GuidConverter rdr(rr.first);
01092         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01093                   "Reader %C received GAP with range [%q, %q] (inserting range [%q, %q]) from %C\n",
01094                   OPENDDS_STRING(rdr).c_str(),
01095                   sr.first.getValue(), base.previous().getValue(),
01096                   sr.first.getValue(), sr.second.getValue(),
01097                   OPENDDS_STRING(conv).c_str()));
01098       }
01099       wi->second.recvd_.insert(sr);
01100     } else {
01101       const GuidConverter conv(src);
01102       VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01103                 "received GAP with invalid range [%q, %q] from %C\n",
01104                 sr.first.getValue(), sr.second.getValue(),
01105                 OPENDDS_STRING(conv).c_str()), 2);
01106     }
01107     wi->second.recvd_.insert(base, gap.gapList.numBits,
01108                              gap.gapList.bitmap.get_buffer());
01109     deliver_held_data(rr.first, wi->second, rr.second.durable_);
01110     //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP
01111   }
01112   return false;
01113 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::process_hb_frag_i ( const RTPS::HeartBeatFragSubmessage hb_frag,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1596 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::HeartBeatFragSubmessage::count, OpenDDS::RTPS::HeartBeatFragSubmessage::lastFragmentNum, OpenDDS::DCPS::SequenceNumber::setValue(), and OpenDDS::RTPS::HeartBeatFragSubmessage::writerSN.

Referenced by received().

01599 {
01600   WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01601   if (wi == rr.second.remote_writers_.end()) {
01602     // we may not be associated yet, even if the writer thinks we are
01603     return false;
01604   }
01605 
01606   if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01607     return false;
01608   }
01609 
01610   wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01611 
01612   SequenceNumber seq;
01613   seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01614 
01615   // If seq is outside the heartbeat range or we haven't completely received
01616   // it yet, send a NackFrag along with the AckNack.  The heartbeat range needs
01617   // to be checked first because recvd_ contains the numbers below the
01618   // heartbeat range (so that we don't NACK those).
01619   if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01620       || !wi->second.recvd_.contains(seq)) {
01621     wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01622     wi->second.ack_pending_ = true;
01623     return true; // timer will invoke send_heartbeat_replies()
01624   }
01625   return false;
01626 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::RtpsUdpDataLink::process_heartbeat_i ( const RTPS::HeartBeatSubmessage heartbeat,
const RepoId src,
RtpsReaderMap::value_type &  rr 
) [private]

Definition at line 1171 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::ack_pending_, OpenDDS::RTPS::HeartBeatSubmessage::count, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::HeartBeatSubmessage::firstSN, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::hb_range_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::heartbeat_recvd_count_, OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::initial_hb_, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::RTPS::HeartBeatSubmessage::lastSN, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::recvd_, OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::RtpsUdpDataLink::WriterInfo::should_nack(), OpenDDS::RTPS::HeartBeatSubmessage::smHeader, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by received().

01174 {
01175   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01176   if (wi == rr.second.remote_writers_.end()) {
01177     // we may not be associated yet, even if the writer thinks we are
01178     return false;
01179   }
01180 
01181   WriterInfo& info = wi->second;
01182 
01183   if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01184     return false;
01185   }
01186   info.heartbeat_recvd_count_ = heartbeat.count.value;
01187 
01188   SequenceNumber& first = info.hb_range_.first;
01189   first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01190   SequenceNumber& last = info.hb_range_.second;
01191   last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01192   static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01193 
01194   DisjointSequence& recvd = info.recvd_;
01195   if (!rr.second.durable_ && info.initial_hb_) {
01196     if (last.getValue() < starting.getValue()) {
01197       // this is an invalid heartbeat -- last must be positive
01198       return false;
01199     }
01200     // For the non-durable reader, the first received HB or DATA establishes
01201     // a baseline of the lowest sequence number we'd ever need to NACK.
01202     if (recvd.empty() || recvd.low() >= last) {
01203       recvd.insert(SequenceRange(zero, last));
01204     } else {
01205       recvd.insert(SequenceRange(zero, recvd.low()));
01206     }
01207   } else if (!recvd.empty()) {
01208     // All sequence numbers below 'first' should not be NACKed.
01209     // The value of 'first' may not decrease with subsequent HBs.
01210     recvd.insert(SequenceRange(zero,
01211                                (first > starting) ? first.previous() : zero));
01212   }
01213 
01214   deliver_held_data(rr.first, info, rr.second.durable_);
01215 
01216   //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence
01217   //        numbers we no longer expect to receive due to HEARTBEAT
01218 
01219   info.initial_hb_ = false;
01220 
01221   const bool final = heartbeat.smHeader.flags & RTPS::FLAG_F,
01222     liveliness = heartbeat.smHeader.flags & RTPS::FLAG_L;
01223 
01224   if (!final || (!liveliness && (info.should_nack() ||
01225       rr.second.nack_durable(info) ||
01226       receive_strategy()->has_fragments(info.hb_range_, wi->first)))) {
01227     info.ack_pending_ = true;
01228     return true; // timer will invoke send_heartbeat_replies()
01229   }
01230 
01231   //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
01232   return false;
01233 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::process_requested_changes ( DisjointSequence requests,
const RtpsWriter writer,
const ReaderInfo reader 
) [private]

Definition at line 2039 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::SequenceNumberSet::bitmap, OpenDDS::RTPS::SequenceNumberSet::bitmapBase, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::heartbeat_high(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::RTPS::SequenceNumberSet::numBits, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and OpenDDS::DCPS::SequenceNumber::setValue().

Referenced by send_directed_nack_replies(), and send_nack_replies().

02042 {
02043   for (size_t i = 0; i < reader.requested_changes_.size(); ++i) {
02044     const RTPS::SequenceNumberSet& sn_state = reader.requested_changes_[i];
02045     SequenceNumber base;
02046     base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
02047     if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
02048         && base == writer.heartbeat_high(reader)) {
02049       // Since there is an entry in requested_changes_, the DR must have
02050       // sent a non-final AckNack.  If the base value is the high end of
02051       // the heartbeat range, treat it as a request for that seq#.
02052       if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
02053         requests.insert(base);
02054       }
02055     } else {
02056       requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
02057     }
02058   }
02059 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE bool OpenDDS::DCPS::RtpsUdpDataLink::reactor_is_shut_down (  ) 

Definition at line 23 of file RtpsUdpDataLink.inl.

References reactor_task_.

00024 {
00025   if (!reactor_task_) return true;
00026   return reactor_task_->is_shut_down();
00027 }

OpenDDS::DCPS::RtpsUdpReceiveStrategy * OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy (  )  [private]

Definition at line 2865 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RcHandle< T >::in(), and OpenDDS::DCPS::DataLink::receive_strategy_.

Referenced by generate_nack_frags(), process_data_i(), process_heartbeat_i(), and send_ack_nacks().

02866 {
02867   return static_cast<OpenDDS::DCPS::RtpsUdpReceiveStrategy*>(receive_strategy_.in());
02868 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::NackFragSubmessage nackfrag,
const GuidPrefix_t src_prefix 
)

Definition at line 1836 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::NackFragSubmessage::count, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::NackFragSubmessage::fragmentNumberState, OpenDDS::DCPS::GUID_t::guidPrefix, LM_DEBUG, LM_WARNING, local_prefix_, lock_, nack_reply_, OPENDDS_STRING, OpenDDS::RTPS::NackFragSubmessage::readerId, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::NackFragSubmessage::writerId, writers_, and OpenDDS::RTPS::NackFragSubmessage::writerSN.

01838 {
01839   // local side is DW
01840   RepoId local;
01841   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01842   local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN
01843 
01844   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01845   const RtpsWriterMap::iterator rw = writers_.find(local);
01846   if (rw == writers_.end()) {
01847     if (Transport_debug_level > 5) {
01848       GuidConverter local_conv(local);
01849       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01850         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01851     }
01852     return;
01853   }
01854 
01855   RepoId remote;
01856   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01857   remote.entityId = nackfrag.readerId;
01858 
01859   if (Transport_debug_level > 5) {
01860     GuidConverter local_conv(local), remote_conv(remote);
01861     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01862       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01863       OPENDDS_STRING(remote_conv).c_str()));
01864   }
01865 
01866   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01867   if (ri == rw->second.remote_readers_.end()) {
01868     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01869       "WARNING ReaderInfo not found\n"));
01870     return;
01871   }
01872 
01873   if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01874     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01875       "WARNING Count indicates duplicate, dropping\n"));
01876     return;
01877   }
01878 
01879   ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01880 
01881   SequenceNumber seq;
01882   seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01883   ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01884   g.release();
01885   nack_reply_.schedule(); // timer will invoke send_nack_replies()
01886 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::AckNackSubmessage acknack,
const GuidPrefix_t src_prefix 
)

Definition at line 1632 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::AckNackSubmessage::count, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::DisjointSequence::dump(), durability_resend(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_F, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::DCPS::DisjointSequence::insert(), interesting_readers_, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), LM_DEBUG, LM_WARNING, local_prefix_, lock_, OpenDDS::DCPS::DisjointSequence::low(), nack_reply_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), process_acked_by_all_i(), OpenDDS::RTPS::AckNackSubmessage::readerId, OpenDDS::RTPS::AckNackSubmessage::readerSNState, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), send_durability_gaps(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::SequenceNumber::setValue(), OpenDDS::RTPS::AckNackSubmessage::smHeader, OpenDDS::DCPS::Transport_debug_level, VDBG, OpenDDS::RTPS::AckNackSubmessage::writerId, writers_, and OpenDDS::DCPS::SequenceNumber::ZERO().

01634 {
01635   // local side is DW
01636   RepoId local;
01637   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01638   local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN
01639 
01640   RepoId remote;
01641   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01642   remote.entityId = acknack.readerId;
01643 
01644   const ACE_Time_Value now = ACE_OS::gettimeofday();
01645   OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01646 
01647   {
01648     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01649     for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01650            limit = interesting_readers_.upper_bound(remote);
01651          pos != limit;
01652          ++pos) {
01653       pos->second.last_activity = now;
01654       // Ensure the acknack was for the writer.
01655       if (local == pos->second.localid) {
01656         if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01657           callbacks.push_back(pos->second.listener);
01658           pos->second.status = InterestingRemote::EXISTS;
01659         }
01660       }
01661     }
01662   }
01663 
01664   for (size_t i = 0; i < callbacks.size(); ++i) {
01665     callbacks[i]->reader_exists(remote, local);
01666   }
01667 
01668   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01669   const RtpsWriterMap::iterator rw = writers_.find(local);
01670   if (rw == writers_.end()) {
01671     if (Transport_debug_level > 5) {
01672       GuidConverter local_conv(local);
01673       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01674         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01675     }
01676     return;
01677   }
01678 
01679   if (Transport_debug_level > 5) {
01680     GuidConverter local_conv(local), remote_conv(remote);
01681     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01682       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01683       OPENDDS_STRING(remote_conv).c_str()));
01684   }
01685 
01686   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01687   if (ri == rw->second.remote_readers_.end()) {
01688     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01689       "WARNING ReaderInfo not found\n"));
01690     return;
01691   }
01692 
01693   if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01694     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01695       "WARNING Count indicates duplicate, dropping\n"));
01696     return;
01697   }
01698 
01699   ri->second.acknack_recvd_count_ = acknack.count.value;
01700 
01701   if (!ri->second.handshake_done_) {
01702     ri->second.handshake_done_ = true;
01703     invoke_on_start_callbacks(true);
01704   }
01705 
01706   OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01707   const bool final = acknack.smHeader.flags & RTPS::FLAG_F;
01708 
01709   if (!ri->second.durable_data_.empty()) {
01710     if (Transport_debug_level > 5) {
01711       const GuidConverter local_conv(local), remote_conv(remote);
01712       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01713                  "local %C has durable for remote %C\n",
01714                  OPENDDS_STRING(local_conv).c_str(),
01715                  OPENDDS_STRING(remote_conv).c_str()));
01716     }
01717     SequenceNumber ack;
01718     ack.setValue(acknack.readerSNState.bitmapBase.high,
01719                  acknack.readerSNState.bitmapBase.low);
01720     const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01721     if (ack > dd_last) {
01722       // Reader acknowledges durable data, we no longer need to store it
01723       ri->second.durable_data_.swap(pendingCallbacks);
01724       if (Transport_debug_level > 5) {
01725         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01726                    "durable data acked\n"));
01727       }
01728     } else {
01729       DisjointSequence requests;
01730       if (!requests.insert(ack, acknack.readerSNState.numBits,
01731                            acknack.readerSNState.bitmap.get_buffer())
01732           && !final && ack == rw->second.heartbeat_high(ri->second)) {
01733         // This is a non-final AckNack with no bits in the bitmap.
01734         // Attempt to reply to a request for the "base" value which
01735         // is neither Acked nor Nacked, only when it's the HB high.
01736         if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01737       }
01738       // Attempt to reply to nacks for durable data
01739       bool sent_some = false;
01740       typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01741       iter_t it = ri->second.durable_data_.begin();
01742       const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01743       SequenceNumber lastSent = SequenceNumber::ZERO();
01744       if (!requests.empty()) {
01745         lastSent = requests.low().previous();
01746       }
01747       DisjointSequence gaps;
01748       for (size_t i = 0; i < psr.size(); ++i) {
01749         for (; it != ri->second.durable_data_.end()
01750              && it->first < psr[i].first; ++it) ; // empty for-loop
01751         for (; it != ri->second.durable_data_.end()
01752              && it->first <= psr[i].second; ++it) {
01753           if (Transport_debug_level > 5) {
01754             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01755                        "durable resend %d\n", int(it->first.getValue())));
01756           }
01757           durability_resend(it->second);
01758           //FUTURE: combine multiple resends into one RTPS Message?
01759           sent_some = true;
01760           if (it->first > lastSent + 1) {
01761             gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01762           }
01763           lastSent = it->first;
01764         }
01765         if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01766           gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01767         }
01768       }
01769       if (!gaps.empty()) {
01770         if (Transport_debug_level > 5) {
01771           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01772                      "sending durability gaps: "));
01773           gaps.dump();
01774         }
01775         send_durability_gaps(local, remote, gaps);
01776       }
01777       if (sent_some) {
01778         return;
01779       }
01780       const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01781       if (!requests.empty() && requests.high() < dd_first) {
01782         // All nacks were below the start of the durable data.
01783           requests.insert(SequenceRange(requests.high(), dd_first.previous()));
01784         if (Transport_debug_level > 5) {
01785           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01786                      "sending durability gaps for all requests: "));
01787           requests.dump();
01788         }
01789         send_durability_gaps(local, remote, requests);
01790         return;
01791       }
01792       if (!requests.empty() && requests.low() < dd_first) {
01793         // Lowest nack was below the start of the durable data.
01794         for (size_t i = 0; i < psr.size(); ++i) {
01795           if (psr[i].first > dd_first) {
01796             break;
01797           }
01798           gaps.insert(SequenceRange(psr[i].first,
01799                                     std::min(psr[i].second, dd_first)));
01800         }
01801         if (Transport_debug_level > 5) {
01802           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01803                      "sending durability gaps for some requests: "));
01804           gaps.dump();
01805         }
01806         send_durability_gaps(local, remote, gaps);
01807         return;
01808       }
01809     }
01810   }
01811   SequenceNumber ack;
01812   ack.setValue(acknack.readerSNState.bitmapBase.high,
01813                acknack.readerSNState.bitmapBase.low);
01814   if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01815       && ack != SequenceNumber::ZERO()) {
01816     ri->second.cur_cumulative_ack_ = ack;
01817   }
01818   // If this ACKNACK was final, the DR doesn't expect a reply, and therefore
01819   // we don't need to do anything further.
01820   if (!final || bitmapNonEmpty(acknack.readerSNState)) {
01821     ri->second.requested_changes_.push_back(acknack.readerSNState);
01822   }
01823   process_acked_by_all_i(g, local);
01824   g.release();
01825   if (!final) {
01826     nack_reply_.schedule(); // timer will invoke send_nack_replies()
01827   }
01828   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01829   for (iter_t it = pendingCallbacks.begin();
01830        it != pendingCallbacks.end(); ++it) {
01831     it->second->data_delivered();
01832   }
01833 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatFragSubmessage hb_frag,
const GuidPrefix_t src_prefix 
)

Definition at line 1589 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_hb_frag_i().

01591 {
01592   datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01593 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::HeartBeatSubmessage heartbeat,
const GuidPrefix_t src_prefix 
)

Definition at line 1116 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, ACE_OS::gettimeofday(), OpenDDS::DCPS::GUID_t::guidPrefix, heartbeat_reply_, interesting_ack_nacks_, interesting_writers_, lock_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), process_heartbeat_i(), readers_, OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::schedule(), and OpenDDS::RTPS::HeartBeatSubmessage::writerId.

01118 {
01119   RepoId src;
01120   std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01121   src.entityId = heartbeat.writerId;
01122 
01123   bool schedule_acknack = false;
01124   const ACE_Time_Value now = ACE_OS::gettimeofday();
01125   OPENDDS_VECTOR(InterestingRemote) callbacks;
01126 
01127   {
01128     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01129 
01130     // We received a heartbeat from a writer.
01131     // We should ACKNACK if the writer is interesting and there is no association.
01132 
01133     for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01134            limit = interesting_writers_.upper_bound(src);
01135          pos != limit;
01136          ++pos) {
01137       const RepoId& writerid = src;
01138       const RepoId& readerid = pos->second.localid;
01139 
01140       RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01141       if (riter == readers_.end()) {
01142         // Reader has no associations.
01143         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01144       } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01145         // Reader is not associated with this writer.
01146         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01147       }
01148       pos->second.last_activity = now;
01149       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01150         callbacks.push_back(pos->second);
01151         pos->second.status = InterestingRemote::EXISTS;
01152       }
01153     }
01154 
01155     schedule_acknack = !interesting_ack_nacks_.empty();
01156   }
01157 
01158   for (size_t i = 0; i < callbacks.size(); ++i) {
01159     callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01160   }
01161 
01162   if (schedule_acknack) {
01163     heartbeat_reply_.schedule();
01164   }
01165 
01166   datareader_dispatch(heartbeat, src_prefix,
01167                       &RtpsUdpDataLink::process_heartbeat_i);
01168 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::GapSubmessage gap,
const GuidPrefix_t src_prefix 
)

Definition at line 1064 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_gap_i().

01066 {
01067   datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01068 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::received ( const RTPS::DataSubmessage data,
const GuidPrefix_t src_prefix 
)

Definition at line 979 of file RtpsUdpDataLink.cpp.

References datareader_dispatch(), and process_data_i().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample_i().

00981 {
00982   datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00983 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_reader ( const RepoId writerid,
const RepoId readerid,
const ACE_INET_Addr address,
OpenDDS::DCPS::DiscoveryListener listener 
)

Definition at line 362 of file RtpsUdpDataLink.cpp.

References heartbeat_, heartbeat_counts_, interesting_readers_, and lock_.

00366 {
00367   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00368   bool enableheartbeat = interesting_readers_.empty();
00369   interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00370   heartbeat_counts_[writerid] = 0;
00371   g.release();
00372   if (enableheartbeat) {
00373     heartbeat_->schedule_enable();
00374   }
00375 }

void OpenDDS::DCPS::RtpsUdpDataLink::register_for_writer ( const RepoId readerid,
const RepoId writerid,
const ACE_INET_Addr address,
OpenDDS::DCPS::DiscoveryListener listener 
)

Definition at line 395 of file RtpsUdpDataLink.cpp.

References heartbeatchecker_, interesting_writers_, and lock_.

00399 {
00400   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00401   bool enableheartbeatchecker = interesting_writers_.empty();
00402   interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00403   g.release();
00404   if (enableheartbeatchecker) {
00405     heartbeatchecker_->schedule_enable();
00406   }
00407 }

ACE_INLINE void OpenDDS::DCPS::RtpsUdpDataLink::release_remote_i ( const RepoId remote_id  )  [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 42 of file RtpsUdpDataLink.inl.

References lock_.

00043 {
00044   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00045   locators_.erase(remote_id);
00046 }

void OpenDDS::DCPS::RtpsUdpDataLink::release_reservations_i ( const RepoId remote_id,
const RepoId local_id 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 502 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::elems_not_acked_, heartbeat_counts_, OpenDDS::DCPS::GuidConverter::isReader(), OpenDDS::DCPS::GuidConverter::isWriter(), lock_, OPENDDS_MULTIMAP, OPENDDS_SET(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), process_acked_by_all_i(), reader_index_, readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::to_deliver_, and writers_.

00504 {
00505   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00506   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00507   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00508   using std::pair;
00509   const GuidConverter conv(local_id);
00510   if (conv.isWriter()) {
00511     const RtpsWriterMap::iterator rw = writers_.find(local_id);
00512 
00513     if (rw != writers_.end()) {
00514       rw->second.remote_readers_.erase(remote_id);
00515 
00516       if (rw->second.remote_readers_.empty()) {
00517         RtpsWriter& writer = rw->second;
00518         typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00519 
00520         if (!writer.to_deliver_.empty()) {
00521           iter_t iter = writer.to_deliver_.begin();
00522           while (iter != writer.to_deliver_.end()) {
00523             to_deliver.push_back(iter->second);
00524             writer.to_deliver_.erase(iter);
00525             iter = writer.to_deliver_.begin();
00526           }
00527         }
00528         if (!writer.elems_not_acked_.empty()) {
00529           OPENDDS_SET(SequenceNumber) sns_to_release;
00530           iter_t iter = writer.elems_not_acked_.begin();
00531           while (iter != writer.elems_not_acked_.end()) {
00532             to_drop.push_back(iter->second);
00533             sns_to_release.insert(iter->first);
00534             writer.elems_not_acked_.erase(iter);
00535             iter = writer.elems_not_acked_.begin();
00536           }
00537           OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00538           while (sns_it != sns_to_release.end()) {
00539             writer.send_buff_->release_acked(*sns_it);
00540             ++sns_it;
00541           }
00542         }
00543         heartbeat_counts_.erase(rw->first);
00544         writers_.erase(rw);
00545       } else {
00546         process_acked_by_all_i(g, local_id);
00547       }
00548     }
00549 
00550   } else if (conv.isReader()) {
00551     const RtpsReaderMap::iterator rr = readers_.find(local_id);
00552 
00553     if (rr != readers_.end()) {
00554       rr->second.remote_writers_.erase(remote_id);
00555 
00556       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00557              reader_index_.equal_range(remote_id);
00558            iters.first != iters.second;) {
00559         if (iters.first->second == rr) {
00560           reader_index_.erase(iters.first++);
00561         } else {
00562           ++iters.first;
00563         }
00564       }
00565 
00566       if (rr->second.remote_writers_.empty()) {
00567         readers_.erase(rr);
00568       }
00569     }
00570   }
00571   g.release();
00572   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00573   tqe_iter deliver_it = to_deliver.begin();
00574   while (deliver_it != to_deliver.end()) {
00575     (*deliver_it)->data_delivered();
00576     ++deliver_it;
00577   }
00578   tqe_iter drop_it = to_drop.begin();
00579   while (drop_it != to_drop.end()) {
00580     (*drop_it)->data_dropped(true);
00581     ++drop_it;
00582   }
00583 }

Here is the call graph for this function:

RemoveResult OpenDDS::DCPS::RtpsUdpDataLink::remove_sample ( const DataSampleElement sample,
void *  context 
) [private, virtual]

This method is essentially an "undo_send()" method. It's goal is to remove all traces of the sample from this DataLink (if the sample is even known to the DataLink).

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 494 of file RtpsUdpDataLink.cpp.

References lock_, and OpenDDS::DCPS::REMOVE_ERROR.

00495 {
00496   // see comment in RtpsUdpDataLink::send_i() for lock order
00497   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, REMOVE_ERROR);
00498   return DataLink::remove_sample(sample, &g);
00499 }

bool OpenDDS::DCPS::RtpsUdpDataLink::requires_inline_qos ( const GUIDSeq_var &  peers  )  [private]

Definition at line 868 of file RtpsUdpDataLink.cpp.

References force_inline_qos_, and OPENDDS_MAP_CMP().

Referenced by customize_queue_element().

00869 {
00870   if (force_inline_qos_) {
00871     // Force true for testing purposes
00872     return true;
00873   } else {
00874     if (!peers.ptr()) {
00875       return false;
00876     }
00877     typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00878     for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00879       const iter_t iter = locators_.find(peers[i]);
00880       if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00881         return true;
00882       }
00883     }
00884     return false;
00885   }
00886 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_ack_nacks ( RtpsReaderMap::iterator  rr,
bool  finalFlag = false 
) [private]

Definition at line 1255 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::gen_find_size(), generate_nack_frags(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::high(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::DisjointSequence::last_ack(), LM_ERROR, OpenDDS::DCPS::DisjointSequence::low(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::SequenceNumber::previous(), receive_strategy(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::DisjointSequence::to_bitmap(), OpenDDS::DCPS::Transport_debug_level, and OpenDDS::DCPS::SequenceNumber::ZERO().

Referenced by send_final_acks(), and send_heartbeat_replies().

01256 {
01257   using namespace OpenDDS::RTPS;
01258 
01259   WriterInfoMap& writers = rr->second.remote_writers_;
01260   for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01261        ++wi) {
01262 
01263     // if we have some negative acknowledgments, we'll ask for a reply
01264     DisjointSequence& recvd = wi->second.recvd_;
01265     const bool nack = wi->second.should_nack() ||
01266       rr->second.nack_durable(wi->second);
01267     bool final = finalFlag || !nack;
01268 
01269     if (wi->second.ack_pending_ || nack || finalFlag) {
01270       const bool prev_ack_pending = wi->second.ack_pending_;
01271       wi->second.ack_pending_ = false;
01272 
01273       SequenceNumber ack;
01274       CORBA::ULong num_bits = 1;
01275       LongSeq8 bitmap;
01276       bitmap.length(1);
01277       bitmap[0] = 0;
01278 
01279       const SequenceNumber& hb_low = wi->second.hb_range_.first;
01280       const SequenceNumber& hb_high = wi->second.hb_range_.second;
01281       const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01282         hb_high_val = hb_high.getValue();
01283 
01284       if (recvd.empty()) {
01285         // Nack the entire heartbeat range.  Only reached when durable.
01286         ack = hb_low;
01287         bitmap.length(bitmap_num_longs(ack, hb_high));
01288         const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01289           ? 255
01290           : CORBA::ULong(hb_high_val - hb_low_val);
01291         DisjointSequence::fill_bitmap_range(0, idx,
01292                                             bitmap.get_buffer(),
01293                                             bitmap.length(), num_bits);
01294       } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01295         // Nack the range between the heartbeat low and the recvd low.
01296         ack = hb_low;
01297         const SequenceNumber& rec_low = recvd.low();
01298         const SequenceNumber::Value rec_low_val = rec_low.getValue();
01299         bitmap.length(bitmap_num_longs(ack, rec_low));
01300         const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01301           ? 255
01302           : CORBA::ULong(rec_low_val - hb_low_val);
01303         DisjointSequence::fill_bitmap_range(0, idx,
01304                                             bitmap.get_buffer(),
01305                                             bitmap.length(), num_bits);
01306 
01307       } else {
01308         ack = ++SequenceNumber(recvd.cumulative_ack());
01309         if (recvd.low().getValue() > 1) {
01310           // since the "ack" really is cumulative, we need to make
01311           // sure that a lower discontinuity is not possible later
01312           recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01313         }
01314 
01315         if (recvd.disjoint()) {
01316           bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01317           recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01318                           num_bits, true);
01319         }
01320       }
01321 
01322       const SequenceNumber::Value ack_val = ack.getValue();
01323 
01324       if (!recvd.empty() && hb_high > recvd.high()) {
01325         const SequenceNumber eff_high =
01326           (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01327         const SequenceNumber::Value eff_high_val = eff_high.getValue();
01328         // Nack the range between the received high and the effective high.
01329         const CORBA::ULong old_len = bitmap.length(),
01330           new_len = bitmap_num_longs(ack, eff_high);
01331         if (new_len > old_len) {
01332           bitmap.length(new_len);
01333           for (CORBA::ULong i = old_len; i < new_len; ++i) {
01334             bitmap[i] = 0;
01335           }
01336         }
01337         const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01338           idx_recv_high = recvd.disjoint() ?
01339           CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01340         DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01341                                             bitmap.get_buffer(), new_len,
01342                                             num_bits);
01343       }
01344 
01345       // If the receive strategy is holding any fragments, those should
01346       // not be "nacked" in the ACKNACK reply.  They will be accounted for
01347       // in the NACK_FRAG(s) instead.
01348       bool frags_modified =
01349         receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
01350                                                  num_bits, ack, wi->first);
01351       if (frags_modified && !final) { // change to final if bitmap is empty
01352         final = true;
01353         for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01354           if ((i + 1) * 32 <= num_bits) {
01355             if (bitmap[i]) {
01356               final = false;
01357               break;
01358             }
01359           } else {
01360             if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01361               final = false;
01362               break;
01363             }
01364           }
01365         }
01366       }
01367 
01368       AckNackSubmessage acknack = {
01369         {ACKNACK,
01370          CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
01371          0 /*length*/},
01372         rr->first.entityId,
01373         wi->first.entityId,
01374         { // SequenceNumberSet: acking bitmapBase - 1
01375           {ack.getHigh(), ack.getLow()},
01376           num_bits, bitmap
01377         },
01378         {++wi->second.acknack_count_}
01379       };
01380 
01381       size_t size = 0, padding = 0;
01382       gen_find_size(acknack, size, padding);
01383       acknack.smHeader.submessageLength =
01384         static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01385       InfoDestinationSubmessage info_dst = {
01386         {INFO_DST, FLAG_E, INFO_DST_SZ},
01387         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01388       };
01389       gen_find_size(info_dst, size, padding);
01390 
01391       OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01392       size += generate_nack_frags(nack_frags, wi->second, wi->first);
01393 
01394       ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01395       // byte swapping is handled in the operator<<() implementation
01396       Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01397       std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01398                   sizeof(GuidPrefix_t));
01399       ser << info_dst;
01400       // Interoperability note: we used to insert INFO_REPLY submessage here, but
01401       // testing indicated that other DDS implementations didn't accept it.
01402       ser << acknack;
01403       for (size_t i = 0; i < nack_frags.size(); ++i) {
01404         nack_frags[i].readerId = rr->first.entityId;
01405         nack_frags[i].writerId = wi->first.entityId;
01406         ser << nack_frags[i]; // always 4-byte aligned
01407       }
01408 
01409       if (!locators_.count(wi->first)) {
01410         if (Transport_debug_level) {
01411           const GuidConverter conv(wi->first);
01412           ACE_ERROR((LM_ERROR,
01413                      "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01414                      "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01415         }
01416       } else {
01417         send_strategy()->send_rtps_control(mb_acknack,
01418                                           locators_[wi->first].addr_);
01419       }
01420     }
01421   }
01422 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_directed_heartbeats ( OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)&  hbs  )  [private]

Definition at line 2480 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::DCPS::assign(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::RTPS::FLAG_E, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::HEARTBEAT_SZ, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, local_prefix_, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::RTPS::InfoDestinationSubmessage::smHeader, and writers_.

Referenced by send_heartbeats().

02481 {
02482 #if defined(OPENDDS_SECURITY)
02483   const EntityId_t& volatile_writer =
02484     RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
02485 
02486   RTPS::InfoDestinationSubmessage idst;
02487   idst.smHeader.submessageId = RTPS::INFO_DST;
02488   idst.smHeader.flags = RTPS::FLAG_E;
02489   idst.smHeader.submessageLength = RTPS::INFO_DST_SZ;
02490   const size_t block_size = RTPS::INFO_DST_SZ + RTPS::HEARTBEAT_SZ
02491     + 2 * RTPS::SMHDR_SZ;
02492   Message_Block_Ptr mb;
02493 
02494   typedef OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)::iterator iter_t;
02495   iter_t it = hbs.begin(), last = hbs.end();
02496   while (it != last) {
02497     if (0 == std::memcmp(&it->writerId, &volatile_writer, sizeof(EntityId_t))) {
02498       RepoId local;
02499       RTPS::assign(local.guidPrefix, local_prefix_);
02500       local.entityId = it->writerId;
02501       RtpsWriterMap::const_iterator rw = writers_.find(local);
02502       if (rw != writers_.end()) {
02503         const ReaderInfoMap& rinfo = rw->second.remote_readers_;
02504         for (ReaderInfoMap::const_iterator ri = rinfo.begin();
02505              ri != rinfo.end(); ++ri) {
02506           RTPS::assign(idst.guidPrefix, ri->first.guidPrefix);
02507           it->readerId = ri->first.entityId;
02508           if (mb) {
02509             mb->reset();
02510           } else {
02511             mb.reset(new ACE_Message_Block(block_size));
02512           }
02513           Serializer ser(mb.get(), false, Serializer::ALIGN_CDR);
02514           ser << idst;
02515           ser << *it;
02516           send_strategy()->send_rtps_control(*mb, locators_[ri->first].addr_);
02517         }
02518       }
02519       std::iter_swap(it, --last);
02520     } else {
02521       ++it;
02522     }
02523   }
02524   hbs.erase(last, hbs.end());
02525 #else
02526   ACE_UNUSED_ARG(hbs);
02527 #endif
02528 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies ( const RepoId writerId,
RtpsWriter writer,
const RepoId readerId,
ReaderInfo reader 
) [private]

Definition at line 2062 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, marshal_gaps(), OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), process_requested_changes(), ACE_Message_Block::release(), OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by send_nack_replies().

02066 {
02067   if (!locators_.count(readerId)) {
02068     return;
02069   }
02070 
02071   DisjointSequence requests;
02072   process_requested_changes(requests, writer, reader);
02073   reader.requested_changes_.clear();
02074 
02075   DisjointSequence gaps;
02076   ACE_INET_Addr addr = locators_[readerId].addr_;
02077 
02078   if (!requests.empty()) {
02079     if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
02080       gaps = requests;
02081     } else {
02082       OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
02083       SingleSendBuffer& sb = *writer.send_buff_;
02084       ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
02085       const RtpsUdpSendStrategy::OverrideToken ot =
02086         send_strategy()->override_destinations(addr);
02087       for (size_t i = 0; i < ranges.size(); ++i) {
02088         if (Transport_debug_level > 5) {
02089           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies "
02090                      "resend data %d-%d\n", int(ranges[i].first.getValue()),
02091                      int(ranges[i].second.getValue())));
02092         }
02093         sb.resend_i(ranges[i], &gaps, readerId);
02094       }
02095     }
02096   }
02097 
02098   if (gaps.empty()) {
02099     return;
02100   }
02101   if (Transport_debug_level > 5) {
02102     ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies GAPs: "));
02103     gaps.dump();
02104   }
02105   ACE_Message_Block* mb_gap =
02106     marshal_gaps(writerId, readerId, gaps, writer.durable_);
02107   if (mb_gap) {
02108     send_strategy()->send_rtps_control(*mb_gap, addr);
02109     mb_gap->release();
02110   }
02111 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_durability_gaps ( const RepoId writer,
const RepoId reader,
const DisjointSequence gaps 
) [private]

Definition at line 2295 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, ACE_Message_Block::cont(), OpenDDS::RTPS::FLAG_E, get_locator(), OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::InfoDestinationSubmessage::guidPrefix, OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, marshal_gaps(), ACE_Message_Block::release(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::RTPS::SMHDR_SZ.

Referenced by received().

02298 {
02299   ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02300   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02301   RTPS::InfoDestinationSubmessage info_dst = {
02302     {RTPS::INFO_DST, RTPS::FLAG_E, RTPS::INFO_DST_SZ},
02303     {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02304   };
02305   std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02306   ser << info_dst;
02307   mb.cont(marshal_gaps(writer, reader, gaps));
02308   send_strategy()->send_rtps_control(mb, get_locator(reader));
02309   mb.cont()->release();
02310 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_final_acks ( const RepoId readerid  )  [virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 2800 of file RtpsUdpDataLink.cpp.

References lock_, readers_, and send_ack_nacks().

02801 {
02802   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02803   RtpsReaderMap::iterator rr = readers_.find (readerid);
02804   if (rr != readers_.end ()) {
02805     send_ack_nacks (rr, true);
02806   }
02807 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeat_replies (  )  [private]

Definition at line 1425 of file RtpsUdpDataLink.cpp.

References OpenDDS::RTPS::ACKNACK, OpenDDS::DCPS::Serializer::ALIGN_CDR, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::gen_find_size(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), OpenDDS::RTPS::INFO_DST, OpenDDS::RTPS::INFO_DST_SZ, interesting_ack_nacks_, lock_, readers_, send_ack_nacks(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), and OpenDDS::RTPS::SMHDR_SZ.

01426 {
01427   using namespace OpenDDS::RTPS;
01428   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01429 
01430   for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01431          limit = interesting_ack_nacks_.end();
01432        pos != limit;
01433        ++pos) {
01434 
01435     SequenceNumber ack;
01436     LongSeq8 bitmap;
01437     bitmap.length(0);
01438 
01439     AckNackSubmessage acknack = {
01440       {ACKNACK,
01441        CORBA::Octet(FLAG_E | FLAG_F),
01442        0 /*length*/},
01443       pos->readerid.entityId,
01444       pos->writerid.entityId,
01445       { // SequenceNumberSet: acking bitmapBase - 1
01446         {ack.getHigh(), ack.getLow()},
01447         0 /* num_bits */, bitmap
01448       },
01449       {0 /* acknack count */}
01450     };
01451 
01452     size_t size = 0, padding = 0;
01453     gen_find_size(acknack, size, padding);
01454     acknack.smHeader.submessageLength =
01455       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01456     InfoDestinationSubmessage info_dst = {
01457       {INFO_DST, FLAG_E, INFO_DST_SZ},
01458       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01459     };
01460     gen_find_size(info_dst, size, padding);
01461 
01462     ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01463     // byte swapping is handled in the operator<<() implementation
01464     Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01465     std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01466                 sizeof(GuidPrefix_t));
01467     ser << info_dst;
01468     // Interoperability note: we used to insert INFO_REPLY submessage here, but
01469     // testing indicated that other DDS implementations didn't accept it.
01470     ser << acknack;
01471 
01472     send_strategy()->send_rtps_control(mb_acknack, pos->writer_address);
01473   }
01474   interesting_ack_nacks_.clear();
01475 
01476   for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01477        ++rr) {
01478     send_ack_nacks (rr);
01479   }
01480 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats (  )  [private]

Definition at line 2313 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, config(), OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::DOES_NOT_EXIST, OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::EXISTS, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), ACE_OS::gettimeofday(), OpenDDS::RTPS::HEARTBEAT, heartbeat_, heartbeat_counts_, OpenDDS::DCPS::RtpsUdpInst::heartbeat_period_, OpenDDS::RTPS::HEARTBEAT_SZ, interesting_readers_, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::listener, LM_ERROR, LM_INFO, OpenDDS::DCPS::RtpsUdpDataLink::InterestingRemote::localid, lock_, OpenDDS::DCPS::OPENDDS_MAP(), OPENDDS_SET(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::DiscoveryListener::reader_does_not_exist(), send_directed_heartbeats(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::RTPS::SMHDR_SZ, OpenDDS::DCPS::Transport_debug_level, VDBG_LVL, and writers_.

02314 {
02315   OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
02316   OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02317 
02318   {
02319     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02320 
02321     if (writers_.empty() && interesting_readers_.empty()) {
02322       heartbeat_->disable();
02323     }
02324 
02325     using namespace OpenDDS::RTPS;
02326     OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02327     OPENDDS_SET(ACE_INET_Addr) recipients;
02328     const ACE_Time_Value now = ACE_OS::gettimeofday();
02329 
02330     RepoIdSet writers_to_advertise;
02331 
02332     RtpsUdpInst& config = this->config();
02333 
02334     const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config.heartbeat_period_;
02335     const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config.heartbeat_period_;
02336     for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02337            limit = interesting_readers_.end();
02338          pos != limit;
02339          ++pos) {
02340       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02341           (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02342         recipients.insert(pos->second.address);
02343         writers_to_advertise.insert(pos->second.localid);
02344       }
02345       if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02346         CallbackType callback(pos->first, pos->second);
02347         readerDoesNotExistCallbacks.push_back(callback);
02348         pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02349       }
02350     }
02351 
02352     typedef RtpsWriterMap::iterator rw_iter;
02353     for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02354       const bool has_data = !rw->second.send_buff_.is_nil()
02355                             && !rw->second.send_buff_->empty();
02356       bool final = true, has_durable_data = false;
02357       SequenceNumber durable_max;
02358 
02359       typedef ReaderInfoMap::iterator ri_iter;
02360       const ri_iter end = rw->second.remote_readers_.end();
02361       for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02362         if ((has_data || !ri->second.handshake_done_)
02363             && locators_.count(ri->first)) {
02364           recipients.insert(locators_[ri->first].addr_);
02365           if (final && !ri->second.handshake_done_) {
02366             final = false;
02367           }
02368         }
02369         if (!ri->second.durable_data_.empty()) {
02370           const ACE_Time_Value expiration =
02371             ri->second.durable_timestamp_ + config.durable_data_timeout_;
02372           if (now > expiration) {
02373             typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02374               dd_iter;
02375             for (dd_iter it = ri->second.durable_data_.begin();
02376                  it != ri->second.durable_data_.end(); ++it) {
02377               pendingCallbacks.push_back(it->second);
02378             }
02379             ri->second.durable_data_.clear();
02380             if (Transport_debug_level > 3) {
02381               const GuidConverter gw(rw->first), gr(ri->first);
02382               VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02383                 "removed expired durable data for %C -> %C\n",
02384                 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02385             }
02386           } else {
02387             has_durable_data = true;
02388             if (ri->second.durable_data_.rbegin()->first > durable_max) {
02389               durable_max = ri->second.durable_data_.rbegin()->first;
02390             }
02391             if (locators_.count(ri->first)) {
02392               recipients.insert(locators_[ri->first].addr_);
02393             }
02394           }
02395         }
02396       }
02397 
02398       if (!rw->second.elems_not_acked_.empty()) {
02399         final = false;
02400       }
02401 
02402       if (writers_to_advertise.count(rw->first)) {
02403         final = false;
02404         writers_to_advertise.erase(rw->first);
02405       }
02406 
02407       if (final && !has_data && !has_durable_data) {
02408         continue;
02409       }
02410 
02411       const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02412                                      ? 1 : rw->second.send_buff_->low(),
02413           lastSN = std::max(durable_max,
02414                             has_data ? rw->second.send_buff_->high() : 1);
02415 
02416       const HeartBeatSubmessage hb = {
02417         {HEARTBEAT,
02418          CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
02419          HEARTBEAT_SZ},
02420         ENTITYID_UNKNOWN, // any matched reader may be interested in this
02421         rw->first.entityId,
02422         {firstSN.getHigh(), firstSN.getLow()},
02423         {lastSN.getHigh(), lastSN.getLow()},
02424         {++heartbeat_counts_[rw->first]}
02425       };
02426       subm.push_back(hb);
02427     }
02428 
02429     for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02430            limit = writers_to_advertise.end();
02431          pos != limit;
02432          ++pos) {
02433       const SequenceNumber SN = 1;
02434       const HeartBeatSubmessage hb = {
02435         {HEARTBEAT,
02436          FLAG_E,
02437          HEARTBEAT_SZ},
02438         ENTITYID_UNKNOWN, // any matched reader may be interested in this
02439         pos->entityId,
02440         {SN.getHigh(), SN.getLow()},
02441         {SN.getHigh(), SN.getLow()},
02442         {++heartbeat_counts_[*pos]}
02443       };
02444       subm.push_back(hb);
02445     }
02446 
02447     send_directed_heartbeats(subm);
02448 
02449     if (!subm.empty()) {
02450       ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators?
02451       // byte swapping is handled in the operator<<() implementation
02452       Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02453       bool send_ok = true;
02454       for (size_t i = 0; i < subm.size(); ++i) {
02455         if (!(ser << subm[i])) {
02456           ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02457             "failed to serialize HEARTBEAT submessage %B\n", i));
02458           send_ok = false;
02459           break;
02460         }
02461       }
02462       if (send_ok) {
02463         send_strategy()->send_rtps_control(mb, recipients);
02464       }
02465     }
02466   }
02467 
02468   for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
02469       iter != readerDoesNotExistCallbacks.end(); ++iter){
02470     const InterestingRemote& remote = iter->second;
02471     remote.listener->reader_does_not_exist(iter->first, remote.localid);
02472   }
02473 
02474   for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02475     pendingCallbacks[i]->data_dropped();
02476   }
02477 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_heartbeats_manual ( const TransportSendControlElement tsce  )  [private]

Definition at line 2560 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::Serializer::ALIGN_CDR, best_effort_heartbeat_count_, config(), OpenDDS::DCPS::RtpsUdpInst::durable_data_timeout_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_UNKNOWN, OpenDDS::RTPS::FLAG_E, OpenDDS::RTPS::FLAG_F, OpenDDS::RTPS::FLAG_L, get_locators(), OpenDDS::DCPS::SequenceNumber::getHigh(), OpenDDS::DCPS::SequenceNumber::getLow(), ACE_OS::gettimeofday(), OpenDDS::RTPS::HEARTBEAT, heartbeat_counts_, OpenDDS::RTPS::HEARTBEAT_SZ, LM_ERROR, OPENDDS_SET(), OpenDDS::DCPS::TransportSendControlElement::publication_id(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendControlElement::sequence(), OpenDDS::RTPS::SMHDR_SZ, and writers_.

Referenced by customize_queue_element().

02561 {
02562   using namespace OpenDDS::RTPS;
02563 
02564   const RepoId pub_id = tsce->publication_id();
02565 
02566   // Populate the recipients.
02567   OPENDDS_SET(ACE_INET_Addr) recipients;
02568   get_locators (pub_id, recipients);
02569   if (recipients.empty()) {
02570     return;
02571   }
02572 
02573   // Populate the sequence numbers and counter.
02574 
02575   SequenceNumber firstSN, lastSN;
02576   CORBA::Long counter;
02577   RtpsWriterMap::iterator pos = writers_.find (pub_id);
02578   if (pos != writers_.end ()) {
02579     // Reliable.
02580     const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02581     SequenceNumber durable_max;
02582     const ACE_Time_Value now = ACE_OS::gettimeofday();
02583     for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02584          ri != end;
02585          ++ri) {
02586       if (!ri->second.durable_data_.empty()) {
02587         const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config().durable_data_timeout_;
02588         if (now <= expiration &&
02589             ri->second.durable_data_.rbegin()->first > durable_max) {
02590           durable_max = ri->second.durable_data_.rbegin()->first;
02591         }
02592       }
02593     }
02594     firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02595     lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02596     counter = ++heartbeat_counts_[pos->first];
02597   } else {
02598     // Unreliable.
02599     firstSN = 1;
02600     lastSN = tsce->sequence();
02601     counter = ++this->best_effort_heartbeat_count_;
02602   }
02603 
02604   const HeartBeatSubmessage hb = {
02605     {HEARTBEAT,
02606      CORBA::Octet(FLAG_E | FLAG_F | FLAG_L),
02607      HEARTBEAT_SZ},
02608     ENTITYID_UNKNOWN, // any matched reader may be interested in this
02609     pub_id.entityId,
02610     {firstSN.getHigh(), firstSN.getLow()},
02611     {lastSN.getHigh(), lastSN.getLow()},
02612     {counter}
02613   };
02614 
02615   ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators?
02616   // byte swapping is handled in the operator<<() implementation
02617   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02618   if ((ser << hb)) {
02619     send_strategy()->send_rtps_control(mb, recipients);
02620   }
02621   else {
02622     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02623                "failed to serialize HEARTBEAT submessage\n"));
02624   }
02625 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_i ( TransportQueueElement element,
bool  relink = true 
) [private, virtual]

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 484 of file RtpsUdpDataLink.cpp.

References lock_.

00485 {
00486   // Lock here to maintain the locking order:
00487   // RtpsUdpDataLink before RtpsUdpSendStrategy
00488   // which is required for resending due to nacks
00489   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00490   DataLink::send_i(element, relink);
00491 }

void OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies (  )  [private]

Definition at line 1889 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::durable_, OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, lock_, marshal_gaps(), OpenDDS::DCPS::SequenceNumber::MAX_VALUE, OPENDDS_SET(), OPENDDS_STRING, OpenDDS::DCPS::DataLink::OPENDDS_VECTOR(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), process_requested_changes(), ACE_Message_Block::release(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::SingleSendBuffer::resend_i(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, send_directed_nack_replies(), send_nackfrag_replies(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control(), send_strategy(), OpenDDS::DCPS::TransportSendBuffer::strategy_lock(), OpenDDS::DCPS::Transport_debug_level, and writers_.

01890 {
01891   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01892   // Reply from local DW to remote DR: GAP or DATA
01893   using namespace OpenDDS::RTPS;
01894   typedef RtpsWriterMap::iterator rw_iter;
01895   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01896 
01897     // consolidate requests from N readers
01898     OPENDDS_SET(ACE_INET_Addr) recipients;
01899     DisjointSequence requests;
01900     RtpsWriter& writer = rw->second;
01901 
01902     //track if any messages have been fully acked by all readers
01903     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01904 
01905 #if defined(OPENDDS_SECURITY)
01906     const EntityId_t& pvs_writer =
01907       RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
01908     const bool is_pvs_writer =
01909       0 == std::memcmp(&pvs_writer, &rw->first.entityId, sizeof pvs_writer);
01910 #endif
01911 
01912     typedef ReaderInfoMap::iterator ri_iter;
01913     const ri_iter end = writer.remote_readers_.end();
01914     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01915 
01916       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01917         all_readers_ack = ri->second.cur_cumulative_ack_;
01918       }
01919 
01920 #if defined(OPENDDS_SECURITY)
01921       if (is_pvs_writer && !ri->second.requested_changes_.empty()) {
01922         send_directed_nack_replies(rw->first, writer, ri->first, ri->second);
01923         continue;
01924       }
01925 #endif
01926 
01927       process_requested_changes(requests, writer, ri->second);
01928 
01929       if (!ri->second.requested_changes_.empty()) {
01930         if (locators_.count(ri->first)) {
01931           recipients.insert(locators_[ri->first].addr_);
01932           if (Transport_debug_level > 5) {
01933             const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01934             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01935                        "local %C remote %C requested resend\n",
01936                        OPENDDS_STRING(local_conv).c_str(),
01937                        OPENDDS_STRING(remote_conv).c_str()));
01938           }
01939         }
01940         ri->second.requested_changes_.clear();
01941       }
01942     }
01943 
01944     DisjointSequence gaps;
01945     if (!requests.empty()) {
01946       if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01947         gaps = requests;
01948       } else {
01949         OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01950         SingleSendBuffer& sb = *writer.send_buff_;
01951         ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01952         const RtpsUdpSendStrategy::OverrideToken ot =
01953           send_strategy()->override_destinations(recipients);
01954         for (size_t i = 0; i < ranges.size(); ++i) {
01955           if (Transport_debug_level > 5) {
01956             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01957                        "resend data %d-%d\n", int(ranges[i].first.getValue()),
01958                        int(ranges[i].second.getValue())));
01959           }
01960           sb.resend_i(ranges[i], &gaps);
01961         }
01962       }
01963     }
01964 
01965     send_nackfrag_replies(writer, gaps, recipients);
01966 
01967     if (!gaps.empty()) {
01968       if (Transport_debug_level > 5) {
01969         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01970                    "GAPs:"));
01971         gaps.dump();
01972       }
01973       ACE_Message_Block* mb_gap =
01974         marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01975       if (mb_gap) {
01976         send_strategy()->send_rtps_control(*mb_gap, recipients);
01977         mb_gap->release();
01978       }
01979     }
01980     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01981       continue;
01982     }
01983   }
01984 }

Here is the call graph for this function:

void OpenDDS::DCPS::RtpsUdpDataLink::send_nackfrag_replies ( RtpsWriter writer,
DisjointSequence gaps,
OPENDDS_SET(ACE_INET_Addr)&  gap_recipients 
) [private]

Definition at line 1987 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::OPENDDS_MAP(), OpenDDS::DCPS::RtpsUdpSendStrategy::override_destinations(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remote_readers_, OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::send_buff_, and send_strategy().

Referenced by send_nack_replies().

01990 {
01991   typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01992   OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01993 
01994   typedef ReaderInfoMap::iterator ri_iter;
01995   const ri_iter end = writer.remote_readers_.end();
01996   for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01997 
01998     if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01999       continue;
02000     }
02001 
02002     const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
02003 
02004     typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
02005     const rf_iter rf_end = ri->second.requested_frags_.end();
02006     for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
02007 
02008       const SequenceNumber& seq = rf->first;
02009       if (writer.send_buff_->contains(seq)) {
02010         FragmentInfo& fi = requests[remote_addr];
02011         fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
02012                        rf->second.bitmap.get_buffer());
02013       } else {
02014         gaps.insert(seq);
02015         gap_recipients.insert(remote_addr);
02016       }
02017     }
02018     ri->second.requested_frags_.clear();
02019   }
02020 
02021   typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
02022   for (req_iter req = requests.begin(); req != requests.end(); ++req) {
02023     const FragmentInfo& fi = req->second;
02024 
02025     ACE_GUARD(TransportSendBuffer::LockType, guard,
02026       writer.send_buff_->strategy_lock());
02027     const RtpsUdpSendStrategy::OverrideToken ot =
02028       send_strategy()->override_destinations(req->first);
02029 
02030     for (FragmentInfo::const_iterator sn_iter = fi.begin();
02031          sn_iter != fi.end(); ++sn_iter) {
02032       const SequenceNumber& seq = sn_iter->first;
02033       writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
02034     }
02035   }
02036 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::RtpsUdpSendStrategy * OpenDDS::DCPS::RtpsUdpDataLink::send_strategy (  )  [private]
void OpenDDS::DCPS::RtpsUdpDataLink::stop_i (  )  [private, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 586 of file RtpsUdpDataLink.cpp.

References OpenDDS::DCPS::RtpsUdpDataLink::TimedDelay::cancel(), ACE_SOCK::close(), heartbeat_, heartbeat_reply_, multicast_socket_, nack_reply_, and unicast_socket_.

Referenced by open().

00587 {
00588   nack_reply_.cancel();
00589   heartbeat_reply_.cancel();
00590   heartbeat_->disable();
00591   unicast_socket_.close();
00592   multicast_socket_.close();
00593 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::RtpsUdpDataLink::unicast_socket (  ) 
void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_reader ( const RepoId writerid,
const RepoId readerid 
)

Definition at line 378 of file RtpsUdpDataLink.cpp.

References interesting_readers_, and lock_.

00380 {
00381   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00382   for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00383          limit = interesting_readers_.upper_bound(readerid);
00384        pos != limit;
00385        ) {
00386     if (pos->second.localid == writerid) {
00387       interesting_readers_.erase(pos++);
00388     } else {
00389       ++pos;
00390     }
00391   }
00392 }

void OpenDDS::DCPS::RtpsUdpDataLink::unregister_for_writer ( const RepoId readerid,
const RepoId writerid 
)

Definition at line 410 of file RtpsUdpDataLink.cpp.

References interesting_writers_, and lock_.

00412 {
00413   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00414   for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00415          limit = interesting_writers_.upper_bound(writerid);
00416        pos != limit;
00417        ) {
00418     if (pos->second.localid == readerid) {
00419       interesting_writers_.erase(pos++);
00420     } else {
00421       ++pos;
00422     }
00423   }
00424 }


Friends And Related Function Documentation

friend class ::DDS_TEST [friend]

Definition at line 162 of file RtpsUdpDataLink.h.


Member Data Documentation

Definition at line 378 of file RtpsUdpDataLink.h.

Referenced by send_heartbeats_manual().

bool OpenDDS::DCPS::RtpsUdpDataLink::force_inline_qos_ = false [static, private]

static member used by testing code to force inline qos

Definition at line 164 of file RtpsUdpDataLink.h.

Referenced by requires_inline_qos().

Definition at line 453 of file RtpsUdpDataLink.h.

Referenced by associated(), register_for_reader(), send_heartbeats(), and stop_i().

HeartBeatCountMapType OpenDDS::DCPS::RtpsUdpDataLink::heartbeat_counts_ [private]

Referenced by received(), and stop_i().

Definition at line 453 of file RtpsUdpDataLink.h.

Referenced by register_for_writer().

Definition at line 531 of file RtpsUdpDataLink.h.

Referenced by deliver_held_data().

InterestingAckNackSetType OpenDDS::DCPS::RtpsUdpDataLink::interesting_ack_nacks_ [private]

Definition at line 508 of file RtpsUdpDataLink.h.

Referenced by received(), and send_heartbeat_replies().

InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_readers_ [private]
InterestingRemoteMapType OpenDDS::DCPS::RtpsUdpDataLink::interesting_writers_ [private]

Definition at line 177 of file RtpsUdpDataLink.h.

Referenced by received(), RtpsUdpDataLink(), and send_directed_heartbeats().

Referenced by open().

Definition at line 189 of file RtpsUdpDataLink.h.

Referenced by multicast_socket(), open(), and stop_i().

Referenced by received(), and stop_i().

Definition at line 287 of file RtpsUdpDataLink.h.

Referenced by associated(), and release_reservations_i().

Definition at line 188 of file RtpsUdpDataLink.h.

Referenced by open(), stop_i(), and unicast_socket().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1