RtpsUdpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpDataLink.h"
00009 #include "RtpsUdpTransport.h"
00010 #include "RtpsUdpInst.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013 
00014 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00015 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00016 #include "dds/DCPS/transport/framework/TransportSendControlElement.h"
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018 
00019 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00020 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00021 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00022 #include "dds/DCPS/RTPS/MessageTypes.h"
00023 
00024 #if defined(OPENDDS_SECURITY)
00025 #include "dds/DCPS/RTPS/SecurityHelpers.h"
00026 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00027 #endif
00028 
00029 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00030 
00031 #include "ace/Default_Constants.h"
00032 #include "ace/Log_Msg.h"
00033 #include "ace/Message_Block.h"
00034 #include "ace/Reverse_Lock_T.h"
00035 #include "ace/Reactor.h"
00036 
00037 #include <string.h>
00038 
00039 #ifndef __ACE_INLINE__
00040 # include "RtpsUdpDataLink.inl"
00041 #endif  /* __ACE_INLINE__ */
00042 
00043 namespace {
00044 
00045 /// Return the number of CORBA::Longs required for the bitmap representation of
00046 /// sequence numbers between low and high, inclusive (maximum 8 longs).
00047 CORBA::ULong
00048 bitmap_num_longs(const OpenDDS::DCPS::SequenceNumber& low,
00049                  const OpenDDS::DCPS::SequenceNumber& high)
00050 {
00051   return std::min(CORBA::ULong(8),
00052                   CORBA::ULong((high.getValue() - low.getValue() + 32) / 32));
00053 }
00054 
00055 bool bitmapNonEmpty(const OpenDDS::RTPS::SequenceNumberSet& snSet)
00056 {
00057   for (CORBA::ULong i = 0; i < snSet.bitmap.length(); ++i) {
00058     if (snSet.bitmap[i]) {
00059       if (snSet.numBits >= (i + 1) * 32) {
00060         return true;
00061       }
00062       for (int bit = 31; bit >= 0; --bit) {
00063         if ((snSet.bitmap[i] & (1 << bit))
00064             && snSet.numBits >= i * 32 + (31 - bit)) {
00065           return true;
00066         }
00067       }
00068     }
00069   }
00070   return false;
00071 }
00072 
00073 }
00074 
00075 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00076 
00077 namespace OpenDDS {
00078 namespace DCPS {
00079 
00080 RtpsUdpDataLink::RtpsUdpDataLink(RtpsUdpTransport& transport,
00081                                  const GuidPrefix_t& local_prefix,
00082                                  const RtpsUdpInst& config,
00083                                  const TransportReactorTask_rch& reactor_task)
00084   : DataLink(transport, // 3 data link "attributes", below, are unused
00085              0,         // priority
00086              false,     // is_loopback
00087              false),    // is_active
00088     reactor_task_(reactor_task),
00089     multi_buff_(this, config.nak_depth_),
00090     best_effort_heartbeat_count_(0),
00091     nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00092                 config.nak_response_delay_),
00093     heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00094                      config.heartbeat_response_delay_),
00095     heartbeat_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats)),
00096     heartbeatchecker_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)),
00097 #if defined(OPENDDS_SECURITY)
00098     held_data_delivery_handler_(this),
00099     security_config_(Security::SecurityRegistry::instance()->default_config()),
00100     local_crypto_handle_(DDS::HANDLE_NIL)
00101 #else
00102     held_data_delivery_handler_(this)
00103 #endif
00104 {
00105   this->send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
00106   this->receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix);
00107   std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00108 }
00109 
00110 RtpsUdpInst&
00111 RtpsUdpDataLink::config() const
00112 {
00113   return static_cast<RtpsUdpTransport&>(impl()).config();
00114 }
00115 
00116 bool
00117 RtpsUdpDataLink::add_delayed_notification(TransportQueueElement* element)
00118 {
00119   RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00120   if (iter != writers_.end()) {
00121 
00122     iter->second.add_elem_awaiting_ack(element);
00123     return true;
00124   }
00125   return false;
00126 }
00127 
00128 void RtpsUdpDataLink::do_remove_sample(const RepoId& pub_id,
00129   const TransportQueueElement::MatchCriteria& criteria,
00130   ACE_Guard<ACE_Thread_Mutex>& guard)
00131 {
00132   RtpsWriter::SnToTqeMap sn_tqe_map;
00133   RtpsWriter::SnToTqeMap to_deliver;
00134   typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00135 
00136   RtpsWriterMap::iterator iter = writers_.find(pub_id);
00137   if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00138     to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00139     iter->second.to_deliver_.clear();
00140     iter_t it = iter->second.elems_not_acked_.begin();
00141     OPENDDS_SET(SequenceNumber) sns_to_release;
00142     while (it != iter->second.elems_not_acked_.end()) {
00143       if (criteria.matches(*it->second)) {
00144         sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00145         sns_to_release.insert(it->first);
00146         iter_t last = it;
00147         ++it;
00148         iter->second.elems_not_acked_.erase(last);
00149       } else {
00150         ++it;
00151       }
00152     }
00153     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00154     while (sns_it != sns_to_release.end()) {
00155       iter->second.send_buff_->release_acked(*sns_it);
00156       ++sns_it;
00157     }
00158   }
00159 
00160   // see comment in RtpsUdpDataLink::send_i() for lock order
00161   // reverse guard can't be used since that involves re-locking
00162   guard.release();
00163 
00164   iter_t deliver_iter = to_deliver.begin();
00165   while (deliver_iter != to_deliver.end()) {
00166     deliver_iter->second->data_delivered();
00167     ++deliver_iter;
00168   }
00169   iter_t drop_iter = sn_tqe_map.begin();
00170   while (drop_iter != sn_tqe_map.end()) {
00171     drop_iter->second->data_dropped(true);
00172     ++drop_iter;
00173   }
00174 }
00175 
00176 bool
00177 RtpsUdpDataLink::open(const ACE_SOCK_Dgram& unicast_socket)
00178 {
00179   unicast_socket_ = unicast_socket;
00180 
00181   RtpsUdpInst& config = this->config();
00182 
00183   if (config.use_multicast_) {
00184     const OPENDDS_STRING& net_if = config.multicast_interface_;
00185 #ifdef ACE_HAS_MAC_OSX
00186     multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00187                            ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00188 #endif
00189     if (multicast_socket_.join(config.multicast_group_address_, 1,
00190                                net_if.empty() ? 0 :
00191                                ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00192       ACE_ERROR_RETURN((LM_ERROR,
00193                         ACE_TEXT("(%P|%t) ERROR: ")
00194                         ACE_TEXT("RtpsUdpDataLink::open: ")
00195                         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00196                        false);
00197     }
00198   }
00199 
00200   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config.ttl_)) {
00201     ACE_ERROR_RETURN((LM_ERROR,
00202                       ACE_TEXT("(%P|%t) ERROR: ")
00203                       ACE_TEXT("RtpsUdpDataLink::open: ")
00204                       ACE_TEXT("failed to set TTL: %d\n"),
00205                       config.ttl_),
00206                      false);
00207   }
00208 
00209   if (config.send_buffer_size_ > 0) {
00210     int snd_size = config.send_buffer_size_;
00211     if (this->unicast_socket_.set_option(SOL_SOCKET,
00212                                 SO_SNDBUF,
00213                                 (void *) &snd_size,
00214                                 sizeof(snd_size)) < 0
00215         && errno != ENOTSUP) {
00216       ACE_ERROR_RETURN((LM_ERROR,
00217                         ACE_TEXT("(%P|%t) ERROR: ")
00218                         ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00219                         snd_size),
00220                        false);
00221     }
00222   }
00223 
00224   if (config.rcv_buffer_size_ > 0) {
00225     int rcv_size = config.rcv_buffer_size_;
00226     if (this->unicast_socket_.set_option(SOL_SOCKET,
00227                                 SO_RCVBUF,
00228                                 (void *) &rcv_size,
00229                                 sizeof(int)) < 0
00230         && errno != ENOTSUP) {
00231       ACE_ERROR_RETURN((LM_ERROR,
00232                         ACE_TEXT("(%P|%t) ERROR: ")
00233                         ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00234                         rcv_size),
00235                        false);
00236     }
00237   }
00238 
00239   send_strategy()->send_buffer(&multi_buff_);
00240 
00241   if (start(send_strategy_,
00242             receive_strategy_) != 0) {
00243     stop_i();
00244     ACE_ERROR_RETURN((LM_ERROR,
00245                       ACE_TEXT("(%P|%t) ERROR: ")
00246                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00247                      false);
00248   }
00249 
00250   return true;
00251 }
00252 
00253 void
00254 RtpsUdpDataLink::add_locator(const RepoId& remote_id,
00255                              const ACE_INET_Addr& address,
00256                              bool requires_inline_qos)
00257 {
00258   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00259   locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00260 }
00261 
00262 void
00263 RtpsUdpDataLink::get_locators(const RepoId& local_id,
00264                               OPENDDS_SET(ACE_INET_Addr)& addrs) const
00265 {
00266   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00267 
00268   if (local_id == GUID_UNKNOWN) {
00269     for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00270       addrs.insert(iter->second.addr_);
00271     }
00272     return;
00273   }
00274 
00275   const GUIDSeq_var peers = peer_ids(local_id);
00276   if (!peers.ptr()) {
00277     return;
00278   }
00279   for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00280     const ACE_INET_Addr addr = get_locator(peers[i]);
00281     if (addr != ACE_INET_Addr()) {
00282       addrs.insert(addr);
00283     }
00284   }
00285 }
00286 
00287 ACE_INET_Addr
00288 RtpsUdpDataLink::get_locator(const RepoId& remote_id) const
00289 {
00290   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00291   const iter_t iter = locators_.find(remote_id);
00292   if (iter == locators_.end()) {
00293     const GuidConverter conv(remote_id);
00294     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00295       "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00296     return ACE_INET_Addr();
00297   }
00298   return iter->second.addr_;
00299 }
00300 
00301 void
00302 RtpsUdpDataLink::associated(const RepoId& local_id, const RepoId& remote_id,
00303                             bool local_reliable, bool remote_reliable,
00304                             bool local_durable, bool remote_durable)
00305 {
00306   if (!local_reliable) {
00307     return;
00308   }
00309 
00310   bool enable_heartbeat = false;
00311 
00312   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00313   const GuidConverter conv(local_id);
00314   if (conv.isWriter() && remote_reliable) {
00315     // Insert count if not already there.
00316     heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00317     RtpsWriter& w = writers_[local_id];
00318     w.remote_readers_[remote_id].durable_ = remote_durable;
00319     w.durable_ = local_durable;
00320     enable_heartbeat = true;
00321 
00322   } else if (conv.isReader()) {
00323     RtpsReaderMap::iterator rr = readers_.find(local_id);
00324     if (rr == readers_.end()) {
00325       rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00326         .first;
00327       rr->second.durable_ = local_durable;
00328     }
00329     rr->second.remote_writers_[remote_id];
00330     reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00331   }
00332 
00333   g.release();
00334   if (enable_heartbeat) {
00335     heartbeat_->schedule_enable();
00336   }
00337 }
00338 
00339 bool
00340 RtpsUdpDataLink::check_handshake_complete(const RepoId& local_id,
00341                                           const RepoId& remote_id)
00342 {
00343   const GuidConverter conv(local_id);
00344   if (conv.isWriter()) {
00345     RtpsWriterMap::iterator rw = writers_.find(local_id);
00346     if (rw == writers_.end()) {
00347       return true; // not reliable, no handshaking
00348     }
00349     ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00350     if (ri == rw->second.remote_readers_.end()) {
00351       return true; // not reliable, no handshaking
00352     }
00353     return ri->second.handshake_done_;
00354 
00355   } else if (conv.isReader()) {
00356     return true; // no handshaking for local reader
00357   }
00358   return false;
00359 }
00360 
00361 void
00362 RtpsUdpDataLink::register_for_reader(const RepoId& writerid,
00363                                      const RepoId& readerid,
00364                                      const ACE_INET_Addr& address,
00365                                      OpenDDS::DCPS::DiscoveryListener* listener)
00366 {
00367   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00368   bool enableheartbeat = interesting_readers_.empty();
00369   interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00370   heartbeat_counts_[writerid] = 0;
00371   g.release();
00372   if (enableheartbeat) {
00373     heartbeat_->schedule_enable();
00374   }
00375 }
00376 
00377 void
00378 RtpsUdpDataLink::unregister_for_reader(const RepoId& writerid,
00379                                        const RepoId& readerid)
00380 {
00381   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00382   for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00383          limit = interesting_readers_.upper_bound(readerid);
00384        pos != limit;
00385        ) {
00386     if (pos->second.localid == writerid) {
00387       interesting_readers_.erase(pos++);
00388     } else {
00389       ++pos;
00390     }
00391   }
00392 }
00393 
00394 void
00395 RtpsUdpDataLink::register_for_writer(const RepoId& readerid,
00396                                      const RepoId& writerid,
00397                                      const ACE_INET_Addr& address,
00398                                      OpenDDS::DCPS::DiscoveryListener* listener)
00399 {
00400   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00401   bool enableheartbeatchecker = interesting_writers_.empty();
00402   interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00403   g.release();
00404   if (enableheartbeatchecker) {
00405     heartbeatchecker_->schedule_enable();
00406   }
00407 }
00408 
00409 void
00410 RtpsUdpDataLink::unregister_for_writer(const RepoId& readerid,
00411                                        const RepoId& writerid)
00412 {
00413   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00414   for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00415          limit = interesting_writers_.upper_bound(writerid);
00416        pos != limit;
00417        ) {
00418     if (pos->second.localid == readerid) {
00419       interesting_writers_.erase(pos++);
00420     } else {
00421       ++pos;
00422     }
00423   }
00424 }
00425 
00426 void
00427 RtpsUdpDataLink::pre_stop_i()
00428 {
00429   DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00430   DataLink::pre_stop_i();
00431   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00432   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00433   {
00434     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00435 
00436     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00437 
00438     RtpsWriterMap::iterator iter = writers_.begin();
00439     while (iter != writers_.end()) {
00440       RtpsWriter& writer = iter->second;
00441       if (!writer.to_deliver_.empty()) {
00442         iter_t iter = writer.to_deliver_.begin();
00443         while (iter != writer.to_deliver_.end()) {
00444           to_deliver.push_back(iter->second);
00445           writer.to_deliver_.erase(iter);
00446           iter = writer.to_deliver_.begin();
00447         }
00448       }
00449       if (!writer.elems_not_acked_.empty()) {
00450         OPENDDS_SET(SequenceNumber) sns_to_release;
00451         iter_t iter = writer.elems_not_acked_.begin();
00452         while (iter != writer.elems_not_acked_.end()) {
00453           to_drop.push_back(iter->second);
00454           sns_to_release.insert(iter->first);
00455           writer.elems_not_acked_.erase(iter);
00456           iter = writer.elems_not_acked_.begin();
00457         }
00458         OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00459         while (sns_it != sns_to_release.end()) {
00460           writer.send_buff_->release_acked(*sns_it);
00461           ++sns_it;
00462         }
00463       }
00464       RtpsWriterMap::iterator last = iter;
00465       ++iter;
00466       heartbeat_counts_.erase(last->first);
00467       writers_.erase(last);
00468     }
00469   }
00470   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00471   tqe_iter deliver_it = to_deliver.begin();
00472   while (deliver_it != to_deliver.end()) {
00473     (*deliver_it)->data_delivered();
00474     ++deliver_it;
00475   }
00476   tqe_iter drop_it = to_drop.begin();
00477   while (drop_it != to_drop.end()) {
00478     (*drop_it)->data_dropped(true);
00479     ++drop_it;
00480   }
00481 }
00482 
00483 void
00484 RtpsUdpDataLink::send_i(TransportQueueElement* element, bool relink)
00485 {
00486   // Lock here to maintain the locking order:
00487   // RtpsUdpDataLink before RtpsUdpSendStrategy
00488   // which is required for resending due to nacks
00489   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00490   DataLink::send_i(element, relink);
00491 }
00492 
00493 RemoveResult
00494 RtpsUdpDataLink::remove_sample(const DataSampleElement* sample, void*)
00495 {
00496   // see comment in RtpsUdpDataLink::send_i() for lock order
00497   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, REMOVE_ERROR);
00498   return DataLink::remove_sample(sample, &g);
00499 }
00500 
00501 void
00502 RtpsUdpDataLink::release_reservations_i(const RepoId& remote_id,
00503                                         const RepoId& local_id)
00504 {
00505   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00506   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00507   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00508   using std::pair;
00509   const GuidConverter conv(local_id);
00510   if (conv.isWriter()) {
00511     const RtpsWriterMap::iterator rw = writers_.find(local_id);
00512 
00513     if (rw != writers_.end()) {
00514       rw->second.remote_readers_.erase(remote_id);
00515 
00516       if (rw->second.remote_readers_.empty()) {
00517         RtpsWriter& writer = rw->second;
00518         typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00519 
00520         if (!writer.to_deliver_.empty()) {
00521           iter_t iter = writer.to_deliver_.begin();
00522           while (iter != writer.to_deliver_.end()) {
00523             to_deliver.push_back(iter->second);
00524             writer.to_deliver_.erase(iter);
00525             iter = writer.to_deliver_.begin();
00526           }
00527         }
00528         if (!writer.elems_not_acked_.empty()) {
00529           OPENDDS_SET(SequenceNumber) sns_to_release;
00530           iter_t iter = writer.elems_not_acked_.begin();
00531           while (iter != writer.elems_not_acked_.end()) {
00532             to_drop.push_back(iter->second);
00533             sns_to_release.insert(iter->first);
00534             writer.elems_not_acked_.erase(iter);
00535             iter = writer.elems_not_acked_.begin();
00536           }
00537           OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00538           while (sns_it != sns_to_release.end()) {
00539             writer.send_buff_->release_acked(*sns_it);
00540             ++sns_it;
00541           }
00542         }
00543         heartbeat_counts_.erase(rw->first);
00544         writers_.erase(rw);
00545       } else {
00546         process_acked_by_all_i(g, local_id);
00547       }
00548     }
00549 
00550   } else if (conv.isReader()) {
00551     const RtpsReaderMap::iterator rr = readers_.find(local_id);
00552 
00553     if (rr != readers_.end()) {
00554       rr->second.remote_writers_.erase(remote_id);
00555 
00556       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00557              reader_index_.equal_range(remote_id);
00558            iters.first != iters.second;) {
00559         if (iters.first->second == rr) {
00560           reader_index_.erase(iters.first++);
00561         } else {
00562           ++iters.first;
00563         }
00564       }
00565 
00566       if (rr->second.remote_writers_.empty()) {
00567         readers_.erase(rr);
00568       }
00569     }
00570   }
00571   g.release();
00572   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00573   tqe_iter deliver_it = to_deliver.begin();
00574   while (deliver_it != to_deliver.end()) {
00575     (*deliver_it)->data_delivered();
00576     ++deliver_it;
00577   }
00578   tqe_iter drop_it = to_drop.begin();
00579   while (drop_it != to_drop.end()) {
00580     (*drop_it)->data_dropped(true);
00581     ++drop_it;
00582   }
00583 }
00584 
00585 void
00586 RtpsUdpDataLink::stop_i()
00587 {
00588   nack_reply_.cancel();
00589   heartbeat_reply_.cancel();
00590   heartbeat_->disable();
00591   unicast_socket_.close();
00592   multicast_socket_.close();
00593 }
00594 
00595 
00596 // Implementing MultiSendBuffer nested class
00597 
00598 void
00599 RtpsUdpDataLink::MultiSendBuffer::retain_all(RepoId pub_id)
00600 {
00601   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00602   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00603   if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00604     wi->second.send_buff_->retain_all(pub_id);
00605   }
00606 }
00607 
00608 void
00609 RtpsUdpDataLink::MultiSendBuffer::insert(SequenceNumber /*transport_seq*/,
00610                                          TransportSendStrategy::QueueType* q,
00611                                          ACE_Message_Block* chain)
00612 {
00613   // Called from TransportSendStrategy::send_packet().
00614   // RtpsUdpDataLink is already locked.
00615   const TransportQueueElement* const tqe = q->peek();
00616   const SequenceNumber seq = tqe->sequence();
00617   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00618     return;
00619   }
00620 
00621   const RepoId pub_id = tqe->publication_id();
00622 
00623   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00624   if (wi == outer_->writers_.end()) {
00625     return; // this datawriter is not reliable
00626   }
00627 
00628   RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00629 
00630   if (send_buff.is_nil()) {
00631     send_buff = make_rch<SingleSendBuffer>(SingleSendBuffer::UNLIMITED, 1 /*mspp*/);
00632 
00633     send_buff->bind(outer_->send_strategy());
00634   }
00635 
00636   if (Transport_debug_level > 5) {
00637     const GuidConverter pub(pub_id);
00638     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00639       "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00640       (int)tqe->is_fragment()));
00641   }
00642 
00643   if (tqe->is_fragment()) {
00644     const RtpsCustomizedElement* const rce =
00645       dynamic_cast<const RtpsCustomizedElement*>(tqe);
00646     if (rce) {
00647       send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00648     } else if (Transport_debug_level) {
00649       const GuidConverter pub(pub_id);
00650       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00651         " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00652         OPENDDS_STRING(pub).c_str(), seq.getValue()));
00653     }
00654   } else {
00655     send_buff->insert(seq, q, chain);
00656   }
00657 }
00658 
00659 
00660 // Support for the send() data handling path
00661 namespace {
00662   ACE_Message_Block* submsgs_to_msgblock(const RTPS::SubmessageSeq& subm)
00663   {
00664     size_t size = 0, padding = 0;
00665     for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00666       if ((size + padding) % 4) {
00667         padding += 4 - ((size + padding) % 4);
00668       }
00669       gen_find_size(subm[i], size, padding);
00670     }
00671 
00672     ACE_Message_Block* hdr = new ACE_Message_Block(size + padding);
00673 
00674     for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00675       // byte swapping is handled in the operator<<() implementation
00676       Serializer ser(hdr, false, Serializer::ALIGN_CDR);
00677       ser << subm[i];
00678       const size_t len = hdr->length();
00679       if (len % 4) {
00680         hdr->wr_ptr(4 - (len % 4));
00681       }
00682     }
00683     return hdr;
00684   }
00685 }
00686 
00687 TransportQueueElement*
00688 RtpsUdpDataLink::customize_queue_element(TransportQueueElement* element)
00689 {
00690   const ACE_Message_Block* msg = element->msg();
00691   if (!msg) {
00692     return element;
00693   }
00694 
00695   const RepoId pub_id = element->publication_id();
00696   GUIDSeq_var peers = peer_ids(pub_id);
00697   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00698   bool requires_inline_qos = this->requires_inline_qos(peers);
00699 
00700   RTPS::SubmessageSeq subm;
00701 
00702   const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00703 
00704   bool gap_ok = true;
00705   DestToEntityMap gap_receivers;
00706   if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00707     for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00708          ri != rw->second.remote_readers_.end(); ++ri) {
00709       RepoId tmp;
00710       std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00711       tmp.entityId = ENTITYID_UNKNOWN;
00712       gap_receivers[tmp].push_back(ri->first);
00713 
00714       if (ri->second.expecting_durable_data()) {
00715         // Can't add an in-line GAP if some Data Reader is expecting durable
00716         // data, the GAP could cause that Data Reader to ignore the durable
00717         // data.  The other readers will eventually learn about the GAP by
00718         // sending an ACKNACK and getting a GAP reply.
00719         gap_ok = false;
00720         break;
00721       }
00722     }
00723   }
00724 
00725   if (gap_ok) {
00726     add_gap_submsg(subm, *element, gap_receivers);
00727   }
00728 
00729   const SequenceNumber seq = element->sequence();
00730   if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00731     rw->second.expected_ = seq;
00732     ++rw->second.expected_;
00733   }
00734 
00735   TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00736   TransportCustomizedElement* tce =
00737     dynamic_cast<TransportCustomizedElement*>(element);
00738   TransportSendControlElement* tsce =
00739     dynamic_cast<TransportSendControlElement*>(element);
00740 
00741   Message_Block_Ptr data;
00742   bool durable = false;
00743 
00744   // Based on the type of 'element', find and duplicate the data payload
00745   // continuation block.
00746   if (tsce) {        // Control message
00747     if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00748       data.reset(msg->cont()->duplicate());
00749       // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00750       RtpsSampleHeader::populate_data_control_submessages(
00751                 subm, *tsce, requires_inline_qos);
00752     } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00753       end_historic_samples(rw, tsce->header(), msg->cont());
00754       element->data_delivered();
00755       return 0;
00756     } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00757       send_heartbeats_manual(tsce);
00758       element->data_delivered();
00759       return 0;
00760     } else {
00761       element->data_dropped(true /*dropped_by_transport*/);
00762       return 0;
00763     }
00764 
00765   } else if (tse) {  // Basic data message
00766     // {DataSampleHeader} -> {Data Payload}
00767     data.reset(msg->cont()->duplicate());
00768     const DataSampleElement* dsle = tse->sample();
00769     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00770     RtpsSampleHeader::populate_data_sample_submessages(
00771               subm, *dsle, requires_inline_qos);
00772     durable = dsle->get_header().historic_sample_;
00773 
00774   } else if (tce) {  // Customized data message
00775     // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
00776     data.reset(msg->cont()->cont()->duplicate());
00777     const DataSampleElement* dsle = tce->original_send_element()->sample();
00778     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00779     RtpsSampleHeader::populate_data_sample_submessages(
00780               subm, *dsle, requires_inline_qos);
00781     durable = dsle->get_header().historic_sample_;
00782 
00783   } else {
00784     return element;
00785   }
00786 
00787 #if defined(OPENDDS_SECURITY)
00788   send_strategy()->encode_payload(pub_id, data, subm);
00789 #endif
00790 
00791   Message_Block_Ptr hdr(submsgs_to_msgblock(subm));
00792   hdr->cont(data.release());
00793   RtpsCustomizedElement* rtps =
00794     new RtpsCustomizedElement(element, move(hdr));
00795 
00796   // Handle durability resends
00797   if (durable && rw != writers_.end()) {
00798     const RepoId sub = element->subscription_id();
00799     if (sub != GUID_UNKNOWN) {
00800       ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00801       if (ri != rw->second.remote_readers_.end()) {
00802         ri->second.durable_data_[rtps->sequence()] = rtps;
00803         ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00804         if (Transport_debug_level > 3) {
00805           const GuidConverter conv(pub_id), sub_conv(sub);
00806           ACE_DEBUG((LM_DEBUG,
00807             "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00808             "storing durable data for local %C remote %C\n",
00809             OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00810         }
00811         return 0;
00812       }
00813     }
00814   } else if (durable && (Transport_debug_level)) {
00815     const GuidConverter conv(pub_id);
00816     ACE_ERROR((LM_ERROR,
00817       "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00818       "WARNING: no RtpsWriter to store durable data for local %C\n",
00819       OPENDDS_STRING(conv).c_str()));
00820   }
00821 
00822   return rtps;
00823 }
00824 
00825 void
00826 RtpsUdpDataLink::end_historic_samples(RtpsWriterMap::iterator writer,
00827                                       const DataSampleHeader& header,
00828                                       ACE_Message_Block* body)
00829 {
00830   // Set the ReaderInfo::durable_timestamp_ for the case where no
00831   // durable samples exist in the DataWriter.
00832   if (writer != writers_.end() && writer->second.durable_) {
00833     const ACE_Time_Value now = ACE_OS::gettimeofday();
00834     RepoId sub = GUID_UNKNOWN;
00835     if (body && header.message_length_ >= sizeof(sub)) {
00836       std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00837     }
00838     typedef ReaderInfoMap::iterator iter_t;
00839     if (sub == GUID_UNKNOWN) {
00840       if (Transport_debug_level > 3) {
00841         const GuidConverter conv(writer->first);
00842         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00843                    "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00844       }
00845       for (iter_t iter = writer->second.remote_readers_.begin();
00846            iter != writer->second.remote_readers_.end(); ++iter) {
00847         if (iter->second.durable_) {
00848           iter->second.durable_timestamp_ = now;
00849         }
00850       }
00851     } else {
00852       iter_t iter = writer->second.remote_readers_.find(sub);
00853       if (iter != writer->second.remote_readers_.end()) {
00854         if (iter->second.durable_) {
00855           iter->second.durable_timestamp_ = now;
00856           if (Transport_debug_level > 3) {
00857             const GuidConverter conv(writer->first), sub_conv(sub);
00858             ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00859                        " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00860                        OPENDDS_STRING(sub_conv).c_str()));
00861           }
00862         }
00863       }
00864     }
00865   }
00866 }
00867 
00868 bool RtpsUdpDataLink::requires_inline_qos(const GUIDSeq_var& peers)
00869 {
00870   if (force_inline_qos_) {
00871     // Force true for testing purposes
00872     return true;
00873   } else {
00874     if (!peers.ptr()) {
00875       return false;
00876     }
00877     typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00878     for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00879       const iter_t iter = locators_.find(peers[i]);
00880       if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00881         return true;
00882       }
00883     }
00884     return false;
00885   }
00886 }
00887 
00888 bool RtpsUdpDataLink::force_inline_qos_ = false;
00889 
00890 void
00891 RtpsUdpDataLink::add_gap_submsg(RTPS::SubmessageSeq& msg,
00892                                 const TransportQueueElement& tqe,
00893                                 const DestToEntityMap& dtem)
00894 {
00895   // These are the GAP submessages that we'll send directly in-line with the
00896   // DATA when we notice that the DataWriter has deliberately skipped seq #s.
00897   // There are other GAP submessages generated in response to reader ACKNACKS,
00898   // see send_nack_replies().
00899   using namespace OpenDDS::RTPS;
00900 
00901   const SequenceNumber seq = tqe.sequence();
00902   const RepoId pub = tqe.publication_id();
00903   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00904       || tqe.subscription_id() != GUID_UNKNOWN) {
00905     return;
00906   }
00907 
00908   const RtpsWriterMap::iterator wi = writers_.find(pub);
00909   if (wi == writers_.end()) {
00910     return; // not a reliable writer, does not send GAPs
00911   }
00912 
00913   RtpsWriter& rw = wi->second;
00914 
00915   if (seq != rw.expected_) {
00916     SequenceNumber firstMissing = rw.expected_;
00917 
00918     // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
00919     // [gapStart, gapListBase) and those in the SNSet.
00920     const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00921                                        firstMissing.getLow()},
00922                            gapListBase = {seq.getHigh(),
00923                                           seq.getLow()};
00924 
00925     // We are not going to enable any bits in the "bitmap" of the SNSet,
00926     // but the "numBits" and the bitmap.length must both be > 0.
00927     LongSeq8 bitmap;
00928     bitmap.length(1);
00929     bitmap[0] = 0;
00930 
00931     GapSubmessage gap = {
00932       {GAP, FLAG_E, 0 /*length determined below*/},
00933       ENTITYID_UNKNOWN, // readerId: applies to all matched readers
00934       pub.entityId,
00935       gapStart,
00936       {gapListBase, 1, bitmap}
00937     };
00938 
00939     size_t size = 0, padding = 0;
00940     gen_find_size(gap, size, padding);
00941     gap.smHeader.submessageLength =
00942       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00943 
00944     if (!rw.durable_) {
00945       const CORBA::ULong i = msg.length();
00946       msg.length(i + 1);
00947       msg[i].gap_sm(gap);
00948     } else {
00949       InfoDestinationSubmessage idst = {
00950         {INFO_DST, FLAG_E, INFO_DST_SZ},
00951         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00952       };
00953       CORBA::ULong ml = msg.length();
00954 
00955       //Change the non-directed Gap into multiple directed gaps to prevent
00956       //delivering to currently undiscovered durable readers
00957       DestToEntityMap::const_iterator iter = dtem.begin();
00958       while (iter != dtem.end()) {
00959         std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00960         msg.length(ml + 1);
00961         msg[ml++].info_dst_sm(idst);
00962 
00963         const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00964         for (size_t i = 0; i < readers.size(); ++i) {
00965           gap.readerId = readers.at(i).entityId;
00966           msg.length(ml + 1);
00967           msg[ml++].gap_sm(gap);
00968         } //END iter over reader entity ids
00969         ++iter;
00970       } //END iter over reader GuidPrefix_t's
00971     }
00972   }
00973 }
00974 
00975 
00976 // DataReader's side of Reliability
00977 
00978 void
00979 RtpsUdpDataLink::received(const RTPS::DataSubmessage& data,
00980                           const GuidPrefix_t& src_prefix)
00981 {
00982   datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00983 }
00984 
00985 bool
00986 RtpsUdpDataLink::process_data_i(const RTPS::DataSubmessage& data,
00987                                 const RepoId& src,
00988                                 RtpsReaderMap::value_type& rr)
00989 {
00990   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00991   if (wi != rr.second.remote_writers_.end()) {
00992     WriterInfo& info = wi->second;
00993     SequenceNumber seq;
00994     seq.setValue(data.writerSN.high, data.writerSN.low);
00995     info.frags_.erase(seq);
00996     const RepoId& readerId = rr.first;
00997     if (info.recvd_.contains(seq)) {
00998       if (Transport_debug_level > 5) {
00999         GuidConverter writer(src);
01000         GuidConverter reader(readerId);
01001         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01002                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
01003                              seq.getValue(),
01004                              OPENDDS_STRING(writer).c_str(),
01005                              OPENDDS_STRING(reader).c_str()));
01006       }
01007       receive_strategy()->withhold_data_from(readerId);
01008     } else if (info.recvd_.disjoint() ||
01009         (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
01010         || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
01011         || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
01012       if (Transport_debug_level > 5) {
01013         GuidConverter writer(src);
01014         GuidConverter reader(readerId);
01015         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01016                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
01017                              seq.getValue(),
01018                              OPENDDS_STRING(writer).c_str(),
01019                              OPENDDS_STRING(reader).c_str()));
01020       }
01021       const ReceivedDataSample* sample =
01022         receive_strategy()->withhold_data_from(readerId);
01023       info.held_.insert(std::make_pair(seq, *sample));
01024     } else {
01025       if (Transport_debug_level > 5) {
01026         GuidConverter writer(src);
01027         GuidConverter reader(readerId);
01028         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01029                              ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
01030                              seq.getValue(),
01031                              OPENDDS_STRING(writer).c_str(),
01032                              OPENDDS_STRING(reader).c_str()));
01033       }
01034       receive_strategy()->do_not_withhold_data_from(readerId);
01035     }
01036     info.recvd_.insert(seq);
01037     deliver_held_data(readerId, info, rr.second.durable_);
01038   } else {
01039     if (Transport_debug_level > 5) {
01040       GuidConverter writer(src);
01041       GuidConverter reader(rr.first);
01042       SequenceNumber seq;
01043       seq.setValue(data.writerSN.high, data.writerSN.low);
01044       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01045                            ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
01046                            seq.getValue(),
01047                            OPENDDS_STRING(writer).c_str(),
01048                            OPENDDS_STRING(reader).c_str()));
01049     }
01050     receive_strategy()->do_not_withhold_data_from(rr.first);
01051   }
01052   return false;
01053 }
01054 
01055 void
01056 RtpsUdpDataLink::deliver_held_data(const RepoId& readerId, WriterInfo& info,
01057                                    bool durable)
01058 {
01059   if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
01060   held_data_delivery_handler_.notify_delivery(readerId, info);
01061 }
01062 
01063 void
01064 RtpsUdpDataLink::received(const RTPS::GapSubmessage& gap,
01065                           const GuidPrefix_t& src_prefix)
01066 {
01067   datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01068 }
01069 
01070 bool
01071 RtpsUdpDataLink::process_gap_i(const RTPS::GapSubmessage& gap,
01072                                const RepoId& src, RtpsReaderMap::value_type& rr)
01073 {
01074   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01075   if (wi != rr.second.remote_writers_.end()) {
01076     SequenceRange sr;
01077     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01078     SequenceNumber base;
01079     base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01080     SequenceNumber first_received = SequenceNumber::MAX_VALUE;
01081     if (!wi->second.recvd_.empty()) {
01082       OPENDDS_VECTOR(SequenceRange) missing = wi->second.recvd_.missing_sequence_ranges();
01083       if (!missing.empty()) {
01084         first_received = missing.front().second;
01085       }
01086     }
01087     sr.second = std::min(first_received, base.previous());
01088     if (sr.first <= sr.second) {
01089       if (Transport_debug_level > 5) {
01090         const GuidConverter conv(src);
01091         const GuidConverter rdr(rr.first);
01092         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01093                   "Reader %C received GAP with range [%q, %q] (inserting range [%q, %q]) from %C\n",
01094                   OPENDDS_STRING(rdr).c_str(),
01095                   sr.first.getValue(), base.previous().getValue(),
01096                   sr.first.getValue(), sr.second.getValue(),
01097                   OPENDDS_STRING(conv).c_str()));
01098       }
01099       wi->second.recvd_.insert(sr);
01100     } else {
01101       const GuidConverter conv(src);
01102       VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01103                 "received GAP with invalid range [%q, %q] from %C\n",
01104                 sr.first.getValue(), sr.second.getValue(),
01105                 OPENDDS_STRING(conv).c_str()), 2);
01106     }
01107     wi->second.recvd_.insert(base, gap.gapList.numBits,
01108                              gap.gapList.bitmap.get_buffer());
01109     deliver_held_data(rr.first, wi->second, rr.second.durable_);
01110     //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP
01111   }
01112   return false;
01113 }
01114 
01115 void
01116 RtpsUdpDataLink::received(const RTPS::HeartBeatSubmessage& heartbeat,
01117                           const GuidPrefix_t& src_prefix)
01118 {
01119   RepoId src;
01120   std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01121   src.entityId = heartbeat.writerId;
01122 
01123   bool schedule_acknack = false;
01124   const ACE_Time_Value now = ACE_OS::gettimeofday();
01125   OPENDDS_VECTOR(InterestingRemote) callbacks;
01126 
01127   {
01128     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01129 
01130     // We received a heartbeat from a writer.
01131     // We should ACKNACK if the writer is interesting and there is no association.
01132 
01133     for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01134            limit = interesting_writers_.upper_bound(src);
01135          pos != limit;
01136          ++pos) {
01137       const RepoId& writerid = src;
01138       const RepoId& readerid = pos->second.localid;
01139 
01140       RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01141       if (riter == readers_.end()) {
01142         // Reader has no associations.
01143         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01144       } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01145         // Reader is not associated with this writer.
01146         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01147       }
01148       pos->second.last_activity = now;
01149       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01150         callbacks.push_back(pos->second);
01151         pos->second.status = InterestingRemote::EXISTS;
01152       }
01153     }
01154 
01155     schedule_acknack = !interesting_ack_nacks_.empty();
01156   }
01157 
01158   for (size_t i = 0; i < callbacks.size(); ++i) {
01159     callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01160   }
01161 
01162   if (schedule_acknack) {
01163     heartbeat_reply_.schedule();
01164   }
01165 
01166   datareader_dispatch(heartbeat, src_prefix,
01167                       &RtpsUdpDataLink::process_heartbeat_i);
01168 }
01169 
01170 bool
01171 RtpsUdpDataLink::process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
01172                                      const RepoId& src,
01173                                      RtpsReaderMap::value_type& rr)
01174 {
01175   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01176   if (wi == rr.second.remote_writers_.end()) {
01177     // we may not be associated yet, even if the writer thinks we are
01178     return false;
01179   }
01180 
01181   WriterInfo& info = wi->second;
01182 
01183   if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01184     return false;
01185   }
01186   info.heartbeat_recvd_count_ = heartbeat.count.value;
01187 
01188   SequenceNumber& first = info.hb_range_.first;
01189   first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01190   SequenceNumber& last = info.hb_range_.second;
01191   last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01192   static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01193 
01194   DisjointSequence& recvd = info.recvd_;
01195   if (!rr.second.durable_ && info.initial_hb_) {
01196     if (last.getValue() < starting.getValue()) {
01197       // this is an invalid heartbeat -- last must be positive
01198       return false;
01199     }
01200     // For the non-durable reader, the first received HB or DATA establishes
01201     // a baseline of the lowest sequence number we'd ever need to NACK.
01202     if (recvd.empty() || recvd.low() >= last) {
01203       recvd.insert(SequenceRange(zero, last));
01204     } else {
01205       recvd.insert(SequenceRange(zero, recvd.low()));
01206     }
01207   } else if (!recvd.empty()) {
01208     // All sequence numbers below 'first' should not be NACKed.
01209     // The value of 'first' may not decrease with subsequent HBs.
01210     recvd.insert(SequenceRange(zero,
01211                                (first > starting) ? first.previous() : zero));
01212   }
01213 
01214   deliver_held_data(rr.first, info, rr.second.durable_);
01215 
01216   //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence
01217   //        numbers we no longer expect to receive due to HEARTBEAT
01218 
01219   info.initial_hb_ = false;
01220 
01221   const bool final = heartbeat.smHeader.flags & RTPS::FLAG_F,
01222     liveliness = heartbeat.smHeader.flags & RTPS::FLAG_L;
01223 
01224   if (!final || (!liveliness && (info.should_nack() ||
01225       rr.second.nack_durable(info) ||
01226       receive_strategy()->has_fragments(info.hb_range_, wi->first)))) {
01227     info.ack_pending_ = true;
01228     return true; // timer will invoke send_heartbeat_replies()
01229   }
01230 
01231   //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
01232   return false;
01233 }
01234 
01235 bool
01236 RtpsUdpDataLink::WriterInfo::should_nack() const
01237 {
01238   if (recvd_.disjoint() && recvd_.cumulative_ack() < hb_range_.second) {
01239     return true;
01240   }
01241   if (!recvd_.empty()) {
01242     return recvd_.high() < hb_range_.second;
01243   }
01244   return false;
01245 }
01246 
01247 bool
01248 RtpsUdpDataLink::RtpsReader::nack_durable(const WriterInfo& info)
01249 {
01250   return durable_ && (info.recvd_.empty() ||
01251                       info.recvd_.low() > info.hb_range_.first);
01252 }
01253 
01254 void
01255 RtpsUdpDataLink::send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag)
01256 {
01257   using namespace OpenDDS::RTPS;
01258 
01259   WriterInfoMap& writers = rr->second.remote_writers_;
01260   for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01261        ++wi) {
01262 
01263     // if we have some negative acknowledgments, we'll ask for a reply
01264     DisjointSequence& recvd = wi->second.recvd_;
01265     const bool nack = wi->second.should_nack() ||
01266       rr->second.nack_durable(wi->second);
01267     bool final = finalFlag || !nack;
01268 
01269     if (wi->second.ack_pending_ || nack || finalFlag) {
01270       const bool prev_ack_pending = wi->second.ack_pending_;
01271       wi->second.ack_pending_ = false;
01272 
01273       SequenceNumber ack;
01274       CORBA::ULong num_bits = 1;
01275       LongSeq8 bitmap;
01276       bitmap.length(1);
01277       bitmap[0] = 0;
01278 
01279       const SequenceNumber& hb_low = wi->second.hb_range_.first;
01280       const SequenceNumber& hb_high = wi->second.hb_range_.second;
01281       const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01282         hb_high_val = hb_high.getValue();
01283 
01284       if (recvd.empty()) {
01285         // Nack the entire heartbeat range.  Only reached when durable.
01286         ack = hb_low;
01287         bitmap.length(bitmap_num_longs(ack, hb_high));
01288         const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01289           ? 255
01290           : CORBA::ULong(hb_high_val - hb_low_val);
01291         DisjointSequence::fill_bitmap_range(0, idx,
01292                                             bitmap.get_buffer(),
01293                                             bitmap.length(), num_bits);
01294       } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01295         // Nack the range between the heartbeat low and the recvd low.
01296         ack = hb_low;
01297         const SequenceNumber& rec_low = recvd.low();
01298         const SequenceNumber::Value rec_low_val = rec_low.getValue();
01299         bitmap.length(bitmap_num_longs(ack, rec_low));
01300         const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01301           ? 255
01302           : CORBA::ULong(rec_low_val - hb_low_val);
01303         DisjointSequence::fill_bitmap_range(0, idx,
01304                                             bitmap.get_buffer(),
01305                                             bitmap.length(), num_bits);
01306 
01307       } else {
01308         ack = ++SequenceNumber(recvd.cumulative_ack());
01309         if (recvd.low().getValue() > 1) {
01310           // since the "ack" really is cumulative, we need to make
01311           // sure that a lower discontinuity is not possible later
01312           recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01313         }
01314 
01315         if (recvd.disjoint()) {
01316           bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01317           recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01318                           num_bits, true);
01319         }
01320       }
01321 
01322       const SequenceNumber::Value ack_val = ack.getValue();
01323 
01324       if (!recvd.empty() && hb_high > recvd.high()) {
01325         const SequenceNumber eff_high =
01326           (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01327         const SequenceNumber::Value eff_high_val = eff_high.getValue();
01328         // Nack the range between the received high and the effective high.
01329         const CORBA::ULong old_len = bitmap.length(),
01330           new_len = bitmap_num_longs(ack, eff_high);
01331         if (new_len > old_len) {
01332           bitmap.length(new_len);
01333           for (CORBA::ULong i = old_len; i < new_len; ++i) {
01334             bitmap[i] = 0;
01335           }
01336         }
01337         const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01338           idx_recv_high = recvd.disjoint() ?
01339           CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01340         DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01341                                             bitmap.get_buffer(), new_len,
01342                                             num_bits);
01343       }
01344 
01345       // If the receive strategy is holding any fragments, those should
01346       // not be "nacked" in the ACKNACK reply.  They will be accounted for
01347       // in the NACK_FRAG(s) instead.
01348       bool frags_modified =
01349         receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
01350                                                  num_bits, ack, wi->first);
01351       if (frags_modified && !final) { // change to final if bitmap is empty
01352         final = true;
01353         for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01354           if ((i + 1) * 32 <= num_bits) {
01355             if (bitmap[i]) {
01356               final = false;
01357               break;
01358             }
01359           } else {
01360             if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01361               final = false;
01362               break;
01363             }
01364           }
01365         }
01366       }
01367 
01368       AckNackSubmessage acknack = {
01369         {ACKNACK,
01370          CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
01371          0 /*length*/},
01372         rr->first.entityId,
01373         wi->first.entityId,
01374         { // SequenceNumberSet: acking bitmapBase - 1
01375           {ack.getHigh(), ack.getLow()},
01376           num_bits, bitmap
01377         },
01378         {++wi->second.acknack_count_}
01379       };
01380 
01381       size_t size = 0, padding = 0;
01382       gen_find_size(acknack, size, padding);
01383       acknack.smHeader.submessageLength =
01384         static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01385       InfoDestinationSubmessage info_dst = {
01386         {INFO_DST, FLAG_E, INFO_DST_SZ},
01387         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01388       };
01389       gen_find_size(info_dst, size, padding);
01390 
01391       OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01392       size += generate_nack_frags(nack_frags, wi->second, wi->first);
01393 
01394       ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01395       // byte swapping is handled in the operator<<() implementation
01396       Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01397       std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01398                   sizeof(GuidPrefix_t));
01399       ser << info_dst;
01400       // Interoperability note: we used to insert INFO_REPLY submessage here, but
01401       // testing indicated that other DDS implementations didn't accept it.
01402       ser << acknack;
01403       for (size_t i = 0; i < nack_frags.size(); ++i) {
01404         nack_frags[i].readerId = rr->first.entityId;
01405         nack_frags[i].writerId = wi->first.entityId;
01406         ser << nack_frags[i]; // always 4-byte aligned
01407       }
01408 
01409       if (!locators_.count(wi->first)) {
01410         if (Transport_debug_level) {
01411           const GuidConverter conv(wi->first);
01412           ACE_ERROR((LM_ERROR,
01413                      "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01414                      "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01415         }
01416       } else {
01417         send_strategy()->send_rtps_control(mb_acknack,
01418                                           locators_[wi->first].addr_);
01419       }
01420     }
01421   }
01422 }
01423 
01424 void
01425 RtpsUdpDataLink::send_heartbeat_replies() // from DR to DW
01426 {
01427   using namespace OpenDDS::RTPS;
01428   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01429 
01430   for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01431          limit = interesting_ack_nacks_.end();
01432        pos != limit;
01433        ++pos) {
01434 
01435     SequenceNumber ack;
01436     LongSeq8 bitmap;
01437     bitmap.length(0);
01438 
01439     AckNackSubmessage acknack = {
01440       {ACKNACK,
01441        CORBA::Octet(FLAG_E | FLAG_F),
01442        0 /*length*/},
01443       pos->readerid.entityId,
01444       pos->writerid.entityId,
01445       { // SequenceNumberSet: acking bitmapBase - 1
01446         {ack.getHigh(), ack.getLow()},
01447         0 /* num_bits */, bitmap
01448       },
01449       {0 /* acknack count */}
01450     };
01451 
01452     size_t size = 0, padding = 0;
01453     gen_find_size(acknack, size, padding);
01454     acknack.smHeader.submessageLength =
01455       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01456     InfoDestinationSubmessage info_dst = {
01457       {INFO_DST, FLAG_E, INFO_DST_SZ},
01458       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01459     };
01460     gen_find_size(info_dst, size, padding);
01461 
01462     ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01463     // byte swapping is handled in the operator<<() implementation
01464     Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01465     std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01466                 sizeof(GuidPrefix_t));
01467     ser << info_dst;
01468     // Interoperability note: we used to insert INFO_REPLY submessage here, but
01469     // testing indicated that other DDS implementations didn't accept it.
01470     ser << acknack;
01471 
01472     send_strategy()->send_rtps_control(mb_acknack, pos->writer_address);
01473   }
01474   interesting_ack_nacks_.clear();
01475 
01476   for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01477        ++rr) {
01478     send_ack_nacks (rr);
01479   }
01480 }
01481 
01482 size_t
01483 RtpsUdpDataLink::generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nf,
01484                                      WriterInfo& wi, const RepoId& pub_id)
01485 {
01486   typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01487   typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01488   RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01489 
01490   // Populate frag_info with two possible sources of NackFrags:
01491   // 1. sequence #s in the reception gaps that we have partially received
01492   OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01493   for (size_t i = 0; i < missing.size(); ++i) {
01494     receive_strategy()->has_fragments(missing[i], pub_id, &frag_info);
01495   }
01496   // 1b. larger than the last received seq# but less than the heartbeat.lastSN
01497   if (!wi.recvd_.empty()) {
01498     const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01499     receive_strategy()->has_fragments(range, pub_id, &frag_info);
01500   }
01501   for (size_t i = 0; i < frag_info.size(); ++i) {
01502     // If we've received a HeartbeatFrag, we know the last (available) frag #
01503     const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01504     if (heartbeat_frag != wi.frags_.end()) {
01505       extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01506     }
01507   }
01508 
01509   // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
01510   const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01511               high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01512                end = wi.frags_.end();
01513   for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01514     if (iter == low) {
01515       // skip over the range covered by step #1 above
01516       if (high == end) {
01517         break;
01518       }
01519       iter = high;
01520     }
01521 
01522     const SequenceRange range(iter->first, iter->first);
01523     if (receive_strategy()->has_fragments(range, pub_id, &frag_info)) {
01524       extend_bitmap_range(frag_info.back().second, iter->second.value);
01525     } else {
01526       // it was not in the recv strategy, so the entire range is "missing"
01527       frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01528       RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01529       fnSet.bitmapBase.value = 1;
01530       fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01531       fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01532       for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01533         fnSet.bitmap[i] = 0xFFFFFFFF;
01534       }
01535     }
01536   }
01537 
01538   if (frag_info.empty()) {
01539     return 0;
01540   }
01541 
01542   const RTPS::NackFragSubmessage nackfrag_prototype = {
01543     {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 /* length set below */},
01544     ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies()
01545     ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies()
01546     {0, 0}, // writerSN set below
01547     RTPS::FragmentNumberSet(), // fragmentNumberState set below
01548     {0} // count set below
01549   };
01550 
01551   size_t size = 0, padding = 0;
01552   for (size_t i = 0; i < frag_info.size(); ++i) {
01553     nf.push_back(nackfrag_prototype);
01554     RTPS::NackFragSubmessage& nackfrag = nf.back();
01555     nackfrag.writerSN.low = frag_info[i].first.getLow();
01556     nackfrag.writerSN.high = frag_info[i].first.getHigh();
01557     nackfrag.fragmentNumberState = frag_info[i].second;
01558     nackfrag.count.value = ++wi.nackfrag_count_;
01559     const size_t before_size = size;
01560     gen_find_size(nackfrag, size, padding);
01561     nackfrag.smHeader.submessageLength =
01562       static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01563   }
01564   return size;
01565 }
01566 
01567 void
01568 RtpsUdpDataLink::extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
01569                                      CORBA::ULong extent)
01570 {
01571   if (extent < fnSet.bitmapBase.value) {
01572     return; // can't extend to some number under the base
01573   }
01574   // calculate the index to the extent to determine the new_num_bits
01575   const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01576                                              extent - fnSet.bitmapBase.value + 1),
01577                      len = (new_num_bits + 31) / 32;
01578   if (new_num_bits < fnSet.numBits) {
01579     return; // bitmap already extends past "extent"
01580   }
01581   fnSet.bitmap.length(len);
01582   // We are missing from one past old bitmap end to the new end
01583   DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01584                                       fnSet.bitmap.get_buffer(), len,
01585                                       fnSet.numBits);
01586 }
01587 
01588 void
01589 RtpsUdpDataLink::received(const RTPS::HeartBeatFragSubmessage& hb_frag,
01590                           const GuidPrefix_t& src_prefix)
01591 {
01592   datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01593 }
01594 
01595 bool
01596 RtpsUdpDataLink::process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
01597                                    const RepoId& src,
01598                                    RtpsReaderMap::value_type& rr)
01599 {
01600   WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01601   if (wi == rr.second.remote_writers_.end()) {
01602     // we may not be associated yet, even if the writer thinks we are
01603     return false;
01604   }
01605 
01606   if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01607     return false;
01608   }
01609 
01610   wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01611 
01612   SequenceNumber seq;
01613   seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01614 
01615   // If seq is outside the heartbeat range or we haven't completely received
01616   // it yet, send a NackFrag along with the AckNack.  The heartbeat range needs
01617   // to be checked first because recvd_ contains the numbers below the
01618   // heartbeat range (so that we don't NACK those).
01619   if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01620       || !wi->second.recvd_.contains(seq)) {
01621     wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01622     wi->second.ack_pending_ = true;
01623     return true; // timer will invoke send_heartbeat_replies()
01624   }
01625   return false;
01626 }
01627 
01628 
01629 // DataWriter's side of Reliability
01630 
01631 void
01632 RtpsUdpDataLink::received(const RTPS::AckNackSubmessage& acknack,
01633                           const GuidPrefix_t& src_prefix)
01634 {
01635   // local side is DW
01636   RepoId local;
01637   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01638   local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN
01639 
01640   RepoId remote;
01641   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01642   remote.entityId = acknack.readerId;
01643 
01644   const ACE_Time_Value now = ACE_OS::gettimeofday();
01645   OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01646 
01647   {
01648     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01649     for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01650            limit = interesting_readers_.upper_bound(remote);
01651          pos != limit;
01652          ++pos) {
01653       pos->second.last_activity = now;
01654       // Ensure the acknack was for the writer.
01655       if (local == pos->second.localid) {
01656         if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01657           callbacks.push_back(pos->second.listener);
01658           pos->second.status = InterestingRemote::EXISTS;
01659         }
01660       }
01661     }
01662   }
01663 
01664   for (size_t i = 0; i < callbacks.size(); ++i) {
01665     callbacks[i]->reader_exists(remote, local);
01666   }
01667 
01668   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01669   const RtpsWriterMap::iterator rw = writers_.find(local);
01670   if (rw == writers_.end()) {
01671     if (Transport_debug_level > 5) {
01672       GuidConverter local_conv(local);
01673       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01674         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01675     }
01676     return;
01677   }
01678 
01679   if (Transport_debug_level > 5) {
01680     GuidConverter local_conv(local), remote_conv(remote);
01681     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01682       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01683       OPENDDS_STRING(remote_conv).c_str()));
01684   }
01685 
01686   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01687   if (ri == rw->second.remote_readers_.end()) {
01688     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01689       "WARNING ReaderInfo not found\n"));
01690     return;
01691   }
01692 
01693   if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01694     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01695       "WARNING Count indicates duplicate, dropping\n"));
01696     return;
01697   }
01698 
01699   ri->second.acknack_recvd_count_ = acknack.count.value;
01700 
01701   if (!ri->second.handshake_done_) {
01702     ri->second.handshake_done_ = true;
01703     invoke_on_start_callbacks(true);
01704   }
01705 
01706   OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01707   const bool final = acknack.smHeader.flags & RTPS::FLAG_F;
01708 
01709   if (!ri->second.durable_data_.empty()) {
01710     if (Transport_debug_level > 5) {
01711       const GuidConverter local_conv(local), remote_conv(remote);
01712       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01713                  "local %C has durable for remote %C\n",
01714                  OPENDDS_STRING(local_conv).c_str(),
01715                  OPENDDS_STRING(remote_conv).c_str()));
01716     }
01717     SequenceNumber ack;
01718     ack.setValue(acknack.readerSNState.bitmapBase.high,
01719                  acknack.readerSNState.bitmapBase.low);
01720     const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01721     if (ack > dd_last) {
01722       // Reader acknowledges durable data, we no longer need to store it
01723       ri->second.durable_data_.swap(pendingCallbacks);
01724       if (Transport_debug_level > 5) {
01725         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01726                    "durable data acked\n"));
01727       }
01728     } else {
01729       DisjointSequence requests;
01730       if (!requests.insert(ack, acknack.readerSNState.numBits,
01731                            acknack.readerSNState.bitmap.get_buffer())
01732           && !final && ack == rw->second.heartbeat_high(ri->second)) {
01733         // This is a non-final AckNack with no bits in the bitmap.
01734         // Attempt to reply to a request for the "base" value which
01735         // is neither Acked nor Nacked, only when it's the HB high.
01736         if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01737       }
01738       // Attempt to reply to nacks for durable data
01739       bool sent_some = false;
01740       typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01741       iter_t it = ri->second.durable_data_.begin();
01742       const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01743       SequenceNumber lastSent = SequenceNumber::ZERO();
01744       if (!requests.empty()) {
01745         lastSent = requests.low().previous();
01746       }
01747       DisjointSequence gaps;
01748       for (size_t i = 0; i < psr.size(); ++i) {
01749         for (; it != ri->second.durable_data_.end()
01750              && it->first < psr[i].first; ++it) ; // empty for-loop
01751         for (; it != ri->second.durable_data_.end()
01752              && it->first <= psr[i].second; ++it) {
01753           if (Transport_debug_level > 5) {
01754             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01755                        "durable resend %d\n", int(it->first.getValue())));
01756           }
01757           durability_resend(it->second);
01758           //FUTURE: combine multiple resends into one RTPS Message?
01759           sent_some = true;
01760           if (it->first > lastSent + 1) {
01761             gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01762           }
01763           lastSent = it->first;
01764         }
01765         if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01766           gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01767         }
01768       }
01769       if (!gaps.empty()) {
01770         if (Transport_debug_level > 5) {
01771           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01772                      "sending durability gaps: "));
01773           gaps.dump();
01774         }
01775         send_durability_gaps(local, remote, gaps);
01776       }
01777       if (sent_some) {
01778         return;
01779       }
01780       const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01781       if (!requests.empty() && requests.high() < dd_first) {
01782         // All nacks were below the start of the durable data.
01783           requests.insert(SequenceRange(requests.high(), dd_first.previous()));
01784         if (Transport_debug_level > 5) {
01785           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01786                      "sending durability gaps for all requests: "));
01787           requests.dump();
01788         }
01789         send_durability_gaps(local, remote, requests);
01790         return;
01791       }
01792       if (!requests.empty() && requests.low() < dd_first) {
01793         // Lowest nack was below the start of the durable data.
01794         for (size_t i = 0; i < psr.size(); ++i) {
01795           if (psr[i].first > dd_first) {
01796             break;
01797           }
01798           gaps.insert(SequenceRange(psr[i].first,
01799                                     std::min(psr[i].second, dd_first)));
01800         }
01801         if (Transport_debug_level > 5) {
01802           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01803                      "sending durability gaps for some requests: "));
01804           gaps.dump();
01805         }
01806         send_durability_gaps(local, remote, gaps);
01807         return;
01808       }
01809     }
01810   }
01811   SequenceNumber ack;
01812   ack.setValue(acknack.readerSNState.bitmapBase.high,
01813                acknack.readerSNState.bitmapBase.low);
01814   if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01815       && ack != SequenceNumber::ZERO()) {
01816     ri->second.cur_cumulative_ack_ = ack;
01817   }
01818   // If this ACKNACK was final, the DR doesn't expect a reply, and therefore
01819   // we don't need to do anything further.
01820   if (!final || bitmapNonEmpty(acknack.readerSNState)) {
01821     ri->second.requested_changes_.push_back(acknack.readerSNState);
01822   }
01823   process_acked_by_all_i(g, local);
01824   g.release();
01825   if (!final) {
01826     nack_reply_.schedule(); // timer will invoke send_nack_replies()
01827   }
01828   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01829   for (iter_t it = pendingCallbacks.begin();
01830        it != pendingCallbacks.end(); ++it) {
01831     it->second->data_delivered();
01832   }
01833 }
01834 
01835 void
01836 RtpsUdpDataLink::received(const RTPS::NackFragSubmessage& nackfrag,
01837                           const GuidPrefix_t& src_prefix)
01838 {
01839   // local side is DW
01840   RepoId local;
01841   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01842   local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN
01843 
01844   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01845   const RtpsWriterMap::iterator rw = writers_.find(local);
01846   if (rw == writers_.end()) {
01847     if (Transport_debug_level > 5) {
01848       GuidConverter local_conv(local);
01849       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01850         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01851     }
01852     return;
01853   }
01854 
01855   RepoId remote;
01856   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01857   remote.entityId = nackfrag.readerId;
01858 
01859   if (Transport_debug_level > 5) {
01860     GuidConverter local_conv(local), remote_conv(remote);
01861     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01862       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01863       OPENDDS_STRING(remote_conv).c_str()));
01864   }
01865 
01866   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01867   if (ri == rw->second.remote_readers_.end()) {
01868     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01869       "WARNING ReaderInfo not found\n"));
01870     return;
01871   }
01872 
01873   if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01874     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01875       "WARNING Count indicates duplicate, dropping\n"));
01876     return;
01877   }
01878 
01879   ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01880 
01881   SequenceNumber seq;
01882   seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01883   ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01884   g.release();
01885   nack_reply_.schedule(); // timer will invoke send_nack_replies()
01886 }
01887 
01888 void
01889 RtpsUdpDataLink::send_nack_replies()
01890 {
01891   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01892   // Reply from local DW to remote DR: GAP or DATA
01893   using namespace OpenDDS::RTPS;
01894   typedef RtpsWriterMap::iterator rw_iter;
01895   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01896 
01897     // consolidate requests from N readers
01898     OPENDDS_SET(ACE_INET_Addr) recipients;
01899     DisjointSequence requests;
01900     RtpsWriter& writer = rw->second;
01901 
01902     //track if any messages have been fully acked by all readers
01903     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01904 
01905 #if defined(OPENDDS_SECURITY)
01906     const EntityId_t& pvs_writer =
01907       RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
01908     const bool is_pvs_writer =
01909       0 == std::memcmp(&pvs_writer, &rw->first.entityId, sizeof pvs_writer);
01910 #endif
01911 
01912     typedef ReaderInfoMap::iterator ri_iter;
01913     const ri_iter end = writer.remote_readers_.end();
01914     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01915 
01916       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01917         all_readers_ack = ri->second.cur_cumulative_ack_;
01918       }
01919 
01920 #if defined(OPENDDS_SECURITY)
01921       if (is_pvs_writer && !ri->second.requested_changes_.empty()) {
01922         send_directed_nack_replies(rw->first, writer, ri->first, ri->second);
01923         continue;
01924       }
01925 #endif
01926 
01927       process_requested_changes(requests, writer, ri->second);
01928 
01929       if (!ri->second.requested_changes_.empty()) {
01930         if (locators_.count(ri->first)) {
01931           recipients.insert(locators_[ri->first].addr_);
01932           if (Transport_debug_level > 5) {
01933             const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01934             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01935                        "local %C remote %C requested resend\n",
01936                        OPENDDS_STRING(local_conv).c_str(),
01937                        OPENDDS_STRING(remote_conv).c_str()));
01938           }
01939         }
01940         ri->second.requested_changes_.clear();
01941       }
01942     }
01943 
01944     DisjointSequence gaps;
01945     if (!requests.empty()) {
01946       if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01947         gaps = requests;
01948       } else {
01949         OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01950         SingleSendBuffer& sb = *writer.send_buff_;
01951         ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01952         const RtpsUdpSendStrategy::OverrideToken ot =
01953           send_strategy()->override_destinations(recipients);
01954         for (size_t i = 0; i < ranges.size(); ++i) {
01955           if (Transport_debug_level > 5) {
01956             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01957                        "resend data %d-%d\n", int(ranges[i].first.getValue()),
01958                        int(ranges[i].second.getValue())));
01959           }
01960           sb.resend_i(ranges[i], &gaps);
01961         }
01962       }
01963     }
01964 
01965     send_nackfrag_replies(writer, gaps, recipients);
01966 
01967     if (!gaps.empty()) {
01968       if (Transport_debug_level > 5) {
01969         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01970                    "GAPs:"));
01971         gaps.dump();
01972       }
01973       ACE_Message_Block* mb_gap =
01974         marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01975       if (mb_gap) {
01976         send_strategy()->send_rtps_control(*mb_gap, recipients);
01977         mb_gap->release();
01978       }
01979     }
01980     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01981       continue;
01982     }
01983   }
01984 }
01985 
01986 void
01987 RtpsUdpDataLink::send_nackfrag_replies(RtpsWriter& writer,
01988                                        DisjointSequence& gaps,
01989                                        OPENDDS_SET(ACE_INET_Addr)& gap_recipients)
01990 {
01991   typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01992   OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01993 
01994   typedef ReaderInfoMap::iterator ri_iter;
01995   const ri_iter end = writer.remote_readers_.end();
01996   for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01997 
01998     if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01999       continue;
02000     }
02001 
02002     const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
02003 
02004     typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
02005     const rf_iter rf_end = ri->second.requested_frags_.end();
02006     for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
02007 
02008       const SequenceNumber& seq = rf->first;
02009       if (writer.send_buff_->contains(seq)) {
02010         FragmentInfo& fi = requests[remote_addr];
02011         fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
02012                        rf->second.bitmap.get_buffer());
02013       } else {
02014         gaps.insert(seq);
02015         gap_recipients.insert(remote_addr);
02016       }
02017     }
02018     ri->second.requested_frags_.clear();
02019   }
02020 
02021   typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
02022   for (req_iter req = requests.begin(); req != requests.end(); ++req) {
02023     const FragmentInfo& fi = req->second;
02024 
02025     ACE_GUARD(TransportSendBuffer::LockType, guard,
02026       writer.send_buff_->strategy_lock());
02027     const RtpsUdpSendStrategy::OverrideToken ot =
02028       send_strategy()->override_destinations(req->first);
02029 
02030     for (FragmentInfo::const_iterator sn_iter = fi.begin();
02031          sn_iter != fi.end(); ++sn_iter) {
02032       const SequenceNumber& seq = sn_iter->first;
02033       writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
02034     }
02035   }
02036 }
02037 
02038 void
02039 RtpsUdpDataLink::process_requested_changes(DisjointSequence& requests,
02040                                            const RtpsWriter& writer,
02041                                            const ReaderInfo& reader)
02042 {
02043   for (size_t i = 0; i < reader.requested_changes_.size(); ++i) {
02044     const RTPS::SequenceNumberSet& sn_state = reader.requested_changes_[i];
02045     SequenceNumber base;
02046     base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
02047     if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
02048         && base == writer.heartbeat_high(reader)) {
02049       // Since there is an entry in requested_changes_, the DR must have
02050       // sent a non-final AckNack.  If the base value is the high end of
02051       // the heartbeat range, treat it as a request for that seq#.
02052       if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
02053         requests.insert(base);
02054       }
02055     } else {
02056       requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
02057     }
02058   }
02059 }
02060 
02061 void
02062 RtpsUdpDataLink::send_directed_nack_replies(const RepoId& writerId,
02063                                             RtpsWriter& writer,
02064                                             const RepoId& readerId,
02065                                             ReaderInfo& reader)
02066 {
02067   if (!locators_.count(readerId)) {
02068     return;
02069   }
02070 
02071   DisjointSequence requests;
02072   process_requested_changes(requests, writer, reader);
02073   reader.requested_changes_.clear();
02074 
02075   DisjointSequence gaps;
02076   ACE_INET_Addr addr = locators_[readerId].addr_;
02077 
02078   if (!requests.empty()) {
02079     if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
02080       gaps = requests;
02081     } else {
02082       OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
02083       SingleSendBuffer& sb = *writer.send_buff_;
02084       ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
02085       const RtpsUdpSendStrategy::OverrideToken ot =
02086         send_strategy()->override_destinations(addr);
02087       for (size_t i = 0; i < ranges.size(); ++i) {
02088         if (Transport_debug_level > 5) {
02089           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies "
02090                      "resend data %d-%d\n", int(ranges[i].first.getValue()),
02091                      int(ranges[i].second.getValue())));
02092         }
02093         sb.resend_i(ranges[i], &gaps, readerId);
02094       }
02095     }
02096   }
02097 
02098   if (gaps.empty()) {
02099     return;
02100   }
02101   if (Transport_debug_level > 5) {
02102     ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies GAPs: "));
02103     gaps.dump();
02104   }
02105   ACE_Message_Block* mb_gap =
02106     marshal_gaps(writerId, readerId, gaps, writer.durable_);
02107   if (mb_gap) {
02108     send_strategy()->send_rtps_control(*mb_gap, addr);
02109     mb_gap->release();
02110   }
02111 }
02112 
02113 void
02114 RtpsUdpDataLink::process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id)
02115 {
02116   using namespace OpenDDS::RTPS;
02117   typedef RtpsWriterMap::iterator rw_iter;
02118   typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02119   OPENDDS_VECTOR(RepoId) to_check;
02120   rw_iter rw = writers_.find(pub_id);
02121   if (rw == writers_.end()) {
02122     return;
02123   }
02124   RtpsWriter& writer = rw->second;
02125   if (!writer.elems_not_acked_.empty()) {
02126 
02127     //start with the max sequence number writer knows about and decrease
02128     //by what the min over all readers is
02129     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02130 
02131     typedef ReaderInfoMap::iterator ri_iter;
02132     const ri_iter end = writer.remote_readers_.end();
02133     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02134       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02135         all_readers_ack = ri->second.cur_cumulative_ack_;
02136       }
02137     }
02138     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02139       return;
02140     }
02141     OPENDDS_VECTOR(SequenceNumber) sns;
02142     //if any messages fully acked, call data delivered and remove from map
02143 
02144     iter_t it = writer.elems_not_acked_.begin();
02145     OPENDDS_SET(SequenceNumber) sns_to_release;
02146     while (it != writer.elems_not_acked_.end()) {
02147       if (it->first < all_readers_ack) {
02148         writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02149         sns_to_release.insert(it->first);
02150         iter_t last = it;
02151         ++it;
02152         writer.elems_not_acked_.erase(last);
02153       } else {
02154         break;
02155       }
02156     }
02157     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02158     while (sns_it != sns_to_release.end()) {
02159       writer.send_buff_->release_acked(*sns_it);
02160       ++sns_it;
02161     }
02162     TransportQueueElement* tqe_to_deliver;
02163 
02164     while (true) {
02165       rw_iter deliver_on_writer = writers_.find(pub_id);
02166       if (deliver_on_writer == writers_.end()) {
02167         break;
02168       }
02169       RtpsWriter& writer = deliver_on_writer->second;
02170       iter_t to_deliver_iter = writer.to_deliver_.begin();
02171       if (to_deliver_iter == writer.to_deliver_.end()) {
02172         break;
02173       }
02174       tqe_to_deliver = to_deliver_iter->second;
02175       writer.to_deliver_.erase(to_deliver_iter);
02176       g.release();
02177 
02178       tqe_to_deliver->data_delivered();
02179       g.acquire();
02180     }
02181   }
02182 }
02183 
02184 ACE_Message_Block*
02185 RtpsUdpDataLink::marshal_gaps(const RepoId& writer, const RepoId& reader,
02186                               const DisjointSequence& gaps, bool durable)
02187 {
02188   using namespace RTPS;
02189   // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
02190   // [gapStart, gapListBase) and those in the SNSet.
02191   const SequenceNumber firstMissing = gaps.low(),
02192                        base = ++SequenceNumber(gaps.cumulative_ack());
02193   const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02194                                      firstMissing.getLow()},
02195                          gapListBase = {base.getHigh(), base.getLow()};
02196   CORBA::ULong num_bits = 0;
02197   LongSeq8 bitmap;
02198 
02199   if (gaps.disjoint()) {
02200     bitmap.length(bitmap_num_longs(base, gaps.high()));
02201     gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02202 
02203   } else {
02204     bitmap.length(1);
02205     bitmap[0] = 0;
02206     num_bits = 1;
02207   }
02208 
02209   GapSubmessage gap = {
02210     {GAP, FLAG_E, 0 /*length determined below*/},
02211     reader.entityId,
02212     writer.entityId,
02213     gapStart,
02214     {gapListBase, num_bits, bitmap}
02215   };
02216 
02217   if (Transport_debug_level > 5) {
02218     const GuidConverter conv(writer);
02219     SequenceRange sr;
02220     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02221     SequenceNumber srbase;
02222     srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02223     sr.second = srbase.previous();
02224     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02225               "GAP with range [%q, %q] from %C\n",
02226               sr.first.getValue(), sr.second.getValue(),
02227               OPENDDS_STRING(conv).c_str()));
02228   }
02229 
02230   size_t gap_size = 0, padding = 0;
02231   gen_find_size(gap, gap_size, padding);
02232   gap.smHeader.submessageLength =
02233     static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02234 
02235   // For durable writers, change a non-directed Gap into multiple directed gaps.
02236   OPENDDS_VECTOR(RepoId) readers;
02237   if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02238     if (Transport_debug_level > 5) {
02239       const GuidConverter local_conv(writer);
02240       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02241                  "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02242     }
02243     const RtpsWriterMap::iterator iter = writers_.find(writer);
02244     RtpsWriter& rw = iter->second;
02245     for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02246          ri != rw.remote_readers_.end(); ++ri) {
02247       if (!ri->second.expecting_durable_data()) {
02248         readers.push_back(ri->first);
02249       } else if (Transport_debug_level > 5) {
02250         const GuidConverter remote_conv(ri->first);
02251         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02252                    "%C is expecting durable data, no GAP sent\n",
02253                    OPENDDS_STRING(remote_conv).c_str()));
02254       }
02255     }
02256     if (readers.empty()) return 0;
02257   }
02258 
02259   const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02260     prefix_sz = sizeof(reader.guidPrefix);
02261   // no additional padding needed for INFO_DST
02262   const size_t total_sz = readers.empty() ? (gap_size + padding) :
02263     (readers.size() * (gap_size + padding + size_per_idst));
02264 
02265   ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02266   //FUTURE: allocators?
02267   // byte swapping is handled in the operator<<() implementation
02268   Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02269   if (readers.empty()) {
02270     ser << gap;
02271   } else {
02272     InfoDestinationSubmessage idst = {
02273       {INFO_DST, FLAG_E, INFO_DST_SZ},
02274       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02275     };
02276     for (size_t i = 0; i < readers.size(); ++i) {
02277       std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02278       gap.readerId = readers[i].entityId;
02279       ser << idst;
02280       ser << gap;
02281     }
02282   }
02283   return mb_gap;
02284 }
02285 
02286 void
02287 RtpsUdpDataLink::durability_resend(TransportQueueElement* element)
02288 {
02289   ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02290   send_strategy()->send_rtps_control(*msg,
02291                                     get_locator(element->subscription_id()));
02292 }
02293 
02294 void
02295 RtpsUdpDataLink::send_durability_gaps(const RepoId& writer,
02296                                       const RepoId& reader,
02297                                       const DisjointSequence& gaps)
02298 {
02299   ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02300   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02301   RTPS::InfoDestinationSubmessage info_dst = {
02302     {RTPS::INFO_DST, RTPS::FLAG_E, RTPS::INFO_DST_SZ},
02303     {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02304   };
02305   std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02306   ser << info_dst;
02307   mb.cont(marshal_gaps(writer, reader, gaps));
02308   send_strategy()->send_rtps_control(mb, get_locator(reader));
02309   mb.cont()->release();
02310 }
02311 
02312 void
02313 RtpsUdpDataLink::send_heartbeats()
02314 {
02315   OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
02316   OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02317 
02318   {
02319     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02320 
02321     if (writers_.empty() && interesting_readers_.empty()) {
02322       heartbeat_->disable();
02323     }
02324 
02325     using namespace OpenDDS::RTPS;
02326     OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02327     OPENDDS_SET(ACE_INET_Addr) recipients;
02328     const ACE_Time_Value now = ACE_OS::gettimeofday();
02329 
02330     RepoIdSet writers_to_advertise;
02331 
02332     RtpsUdpInst& config = this->config();
02333 
02334     const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config.heartbeat_period_;
02335     const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config.heartbeat_period_;
02336     for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02337            limit = interesting_readers_.end();
02338          pos != limit;
02339          ++pos) {
02340       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02341           (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02342         recipients.insert(pos->second.address);
02343         writers_to_advertise.insert(pos->second.localid);
02344       }
02345       if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02346         CallbackType callback(pos->first, pos->second);
02347         readerDoesNotExistCallbacks.push_back(callback);
02348         pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02349       }
02350     }
02351 
02352     typedef RtpsWriterMap::iterator rw_iter;
02353     for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02354       const bool has_data = !rw->second.send_buff_.is_nil()
02355                             && !rw->second.send_buff_->empty();
02356       bool final = true, has_durable_data = false;
02357       SequenceNumber durable_max;
02358 
02359       typedef ReaderInfoMap::iterator ri_iter;
02360       const ri_iter end = rw->second.remote_readers_.end();
02361       for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02362         if ((has_data || !ri->second.handshake_done_)
02363             && locators_.count(ri->first)) {
02364           recipients.insert(locators_[ri->first].addr_);
02365           if (final && !ri->second.handshake_done_) {
02366             final = false;
02367           }
02368         }
02369         if (!ri->second.durable_data_.empty()) {
02370           const ACE_Time_Value expiration =
02371             ri->second.durable_timestamp_ + config.durable_data_timeout_;
02372           if (now > expiration) {
02373             typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02374               dd_iter;
02375             for (dd_iter it = ri->second.durable_data_.begin();
02376                  it != ri->second.durable_data_.end(); ++it) {
02377               pendingCallbacks.push_back(it->second);
02378             }
02379             ri->second.durable_data_.clear();
02380             if (Transport_debug_level > 3) {
02381               const GuidConverter gw(rw->first), gr(ri->first);
02382               VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02383                 "removed expired durable data for %C -> %C\n",
02384                 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02385             }
02386           } else {
02387             has_durable_data = true;
02388             if (ri->second.durable_data_.rbegin()->first > durable_max) {
02389               durable_max = ri->second.durable_data_.rbegin()->first;
02390             }
02391             if (locators_.count(ri->first)) {
02392               recipients.insert(locators_[ri->first].addr_);
02393             }
02394           }
02395         }
02396       }
02397 
02398       if (!rw->second.elems_not_acked_.empty()) {
02399         final = false;
02400       }
02401 
02402       if (writers_to_advertise.count(rw->first)) {
02403         final = false;
02404         writers_to_advertise.erase(rw->first);
02405       }
02406 
02407       if (final && !has_data && !has_durable_data) {
02408         continue;
02409       }
02410 
02411       const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02412                                      ? 1 : rw->second.send_buff_->low(),
02413           lastSN = std::max(durable_max,
02414                             has_data ? rw->second.send_buff_->high() : 1);
02415 
02416       const HeartBeatSubmessage hb = {
02417         {HEARTBEAT,
02418          CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
02419          HEARTBEAT_SZ},
02420         ENTITYID_UNKNOWN, // any matched reader may be interested in this
02421         rw->first.entityId,
02422         {firstSN.getHigh(), firstSN.getLow()},
02423         {lastSN.getHigh(), lastSN.getLow()},
02424         {++heartbeat_counts_[rw->first]}
02425       };
02426       subm.push_back(hb);
02427     }
02428 
02429     for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02430            limit = writers_to_advertise.end();
02431          pos != limit;
02432          ++pos) {
02433       const SequenceNumber SN = 1;
02434       const HeartBeatSubmessage hb = {
02435         {HEARTBEAT,
02436          FLAG_E,
02437          HEARTBEAT_SZ},
02438         ENTITYID_UNKNOWN, // any matched reader may be interested in this
02439         pos->entityId,
02440         {SN.getHigh(), SN.getLow()},
02441         {SN.getHigh(), SN.getLow()},
02442         {++heartbeat_counts_[*pos]}
02443       };
02444       subm.push_back(hb);
02445     }
02446 
02447     send_directed_heartbeats(subm);
02448 
02449     if (!subm.empty()) {
02450       ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators?
02451       // byte swapping is handled in the operator<<() implementation
02452       Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02453       bool send_ok = true;
02454       for (size_t i = 0; i < subm.size(); ++i) {
02455         if (!(ser << subm[i])) {
02456           ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02457             "failed to serialize HEARTBEAT submessage %B\n", i));
02458           send_ok = false;
02459           break;
02460         }
02461       }
02462       if (send_ok) {
02463         send_strategy()->send_rtps_control(mb, recipients);
02464       }
02465     }
02466   }
02467 
02468   for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
02469       iter != readerDoesNotExistCallbacks.end(); ++iter){
02470     const InterestingRemote& remote = iter->second;
02471     remote.listener->reader_does_not_exist(iter->first, remote.localid);
02472   }
02473 
02474   for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02475     pendingCallbacks[i]->data_dropped();
02476   }
02477 }
02478 
02479 void
02480 RtpsUdpDataLink::send_directed_heartbeats(OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)& hbs)
02481 {
02482 #if defined(OPENDDS_SECURITY)
02483   const EntityId_t& volatile_writer =
02484     RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
02485 
02486   RTPS::InfoDestinationSubmessage idst;
02487   idst.smHeader.submessageId = RTPS::INFO_DST;
02488   idst.smHeader.flags = RTPS::FLAG_E;
02489   idst.smHeader.submessageLength = RTPS::INFO_DST_SZ;
02490   const size_t block_size = RTPS::INFO_DST_SZ + RTPS::HEARTBEAT_SZ
02491     + 2 * RTPS::SMHDR_SZ;
02492   Message_Block_Ptr mb;
02493 
02494   typedef OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)::iterator iter_t;
02495   iter_t it = hbs.begin(), last = hbs.end();
02496   while (it != last) {
02497     if (0 == std::memcmp(&it->writerId, &volatile_writer, sizeof(EntityId_t))) {
02498       RepoId local;
02499       RTPS::assign(local.guidPrefix, local_prefix_);
02500       local.entityId = it->writerId;
02501       RtpsWriterMap::const_iterator rw = writers_.find(local);
02502       if (rw != writers_.end()) {
02503         const ReaderInfoMap& rinfo = rw->second.remote_readers_;
02504         for (ReaderInfoMap::const_iterator ri = rinfo.begin();
02505              ri != rinfo.end(); ++ri) {
02506           RTPS::assign(idst.guidPrefix, ri->first.guidPrefix);
02507           it->readerId = ri->first.entityId;
02508           if (mb) {
02509             mb->reset();
02510           } else {
02511             mb.reset(new ACE_Message_Block(block_size));
02512           }
02513           Serializer ser(mb.get(), false, Serializer::ALIGN_CDR);
02514           ser << idst;
02515           ser << *it;
02516           send_strategy()->send_rtps_control(*mb, locators_[ri->first].addr_);
02517         }
02518       }
02519       std::iter_swap(it, --last);
02520     } else {
02521       ++it;
02522     }
02523   }
02524   hbs.erase(last, hbs.end());
02525 #else
02526   ACE_UNUSED_ARG(hbs);
02527 #endif
02528 }
02529 
02530 void
02531 RtpsUdpDataLink::check_heartbeats()
02532 {
02533   OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
02534 
02535   // Have any interesting writers timed out?
02536   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * this->config().heartbeat_period_;
02537   {
02538       ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02539 
02540       for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02541            pos != limit;
02542            ++pos) {
02543         if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02544           CallbackType callback(pos->first, pos->second);
02545           writerDoesNotExistCallbacks.push_back(callback);
02546           pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02547         }
02548     }
02549   }
02550 
02551   OPENDDS_VECTOR(CallbackType)::iterator iter;
02552   for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
02553     const RepoId& rid = iter->first;
02554     const InterestingRemote& remote = iter->second;
02555     remote.listener->writer_does_not_exist(rid, remote.localid);
02556   }
02557 }
02558 
02559 void
02560 RtpsUdpDataLink::send_heartbeats_manual(const TransportSendControlElement* tsce)
02561 {
02562   using namespace OpenDDS::RTPS;
02563 
02564   const RepoId pub_id = tsce->publication_id();
02565 
02566   // Populate the recipients.
02567   OPENDDS_SET(ACE_INET_Addr) recipients;
02568   get_locators (pub_id, recipients);
02569   if (recipients.empty()) {
02570     return;
02571   }
02572 
02573   // Populate the sequence numbers and counter.
02574 
02575   SequenceNumber firstSN, lastSN;
02576   CORBA::Long counter;
02577   RtpsWriterMap::iterator pos = writers_.find (pub_id);
02578   if (pos != writers_.end ()) {
02579     // Reliable.
02580     const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02581     SequenceNumber durable_max;
02582     const ACE_Time_Value now = ACE_OS::gettimeofday();
02583     for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02584          ri != end;
02585          ++ri) {
02586       if (!ri->second.durable_data_.empty()) {
02587         const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config().durable_data_timeout_;
02588         if (now <= expiration &&
02589             ri->second.durable_data_.rbegin()->first > durable_max) {
02590           durable_max = ri->second.durable_data_.rbegin()->first;
02591         }
02592       }
02593     }
02594     firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02595     lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02596     counter = ++heartbeat_counts_[pos->first];
02597   } else {
02598     // Unreliable.
02599     firstSN = 1;
02600     lastSN = tsce->sequence();
02601     counter = ++this->best_effort_heartbeat_count_;
02602   }
02603 
02604   const HeartBeatSubmessage hb = {
02605     {HEARTBEAT,
02606      CORBA::Octet(FLAG_E | FLAG_F | FLAG_L),
02607      HEARTBEAT_SZ},
02608     ENTITYID_UNKNOWN, // any matched reader may be interested in this
02609     pub_id.entityId,
02610     {firstSN.getHigh(), firstSN.getLow()},
02611     {lastSN.getHigh(), lastSN.getLow()},
02612     {counter}
02613   };
02614 
02615   ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators?
02616   // byte swapping is handled in the operator<<() implementation
02617   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02618   if ((ser << hb)) {
02619     send_strategy()->send_rtps_control(mb, recipients);
02620   }
02621   else {
02622     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02623                "failed to serialize HEARTBEAT submessage\n"));
02624   }
02625 }
02626 
02627 #if defined(OPENDDS_SECURITY)
02628 void
02629 RtpsUdpDataLink::populate_security_handles(const RepoId& local_id,
02630                                            const RepoId& remote_id,
02631                                            const unsigned char* buffer,
02632                                            unsigned int buffer_size)
02633 {
02634   using DDS::Security::ParticipantCryptoHandle;
02635   using DDS::Security::DatawriterCryptoHandle;
02636   using DDS::Security::DatareaderCryptoHandle;
02637 
02638   ACE_Data_Block db(buffer_size, ACE_Message_Block::MB_DATA,
02639     reinterpret_cast<const char*>(buffer),
02640     0 /*alloc*/, 0 /*lock*/, ACE_Message_Block::DONT_DELETE, 0 /*db_alloc*/);
02641   ACE_Message_Block mb(&db, ACE_Message_Block::DONT_DELETE, 0 /*mb_alloc*/);
02642   mb.wr_ptr(mb.space());
02643   DCPS::Serializer ser(&mb, ACE_CDR_BYTE_ORDER, DCPS::Serializer::ALIGN_CDR);
02644 
02645   const EntityKind localKind(GuidConverter(local_id).entityKind());
02646   const RepoId& writer_id = (localKind == KIND_WRITER) ? local_id : remote_id;
02647   const RepoId& reader_id = (localKind == KIND_WRITER) ? remote_id : local_id;
02648 
02649   while (mb.length()) {
02650     DDS::BinaryProperty_t prop;
02651     if (!(ser >> prop)) {
02652       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::populate_security_handles()"
02653                  " - failed to deserialize BinaryProperty_t\n"));
02654       return;
02655     }
02656 
02657     if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_PART_CRYPTO_HANDLE) == 0
02658         && prop.value.length() >= sizeof(ParticipantCryptoHandle)) {
02659       unsigned int handle = 0;
02660       for (unsigned int i = 0; i < prop.value.length(); ++i) {
02661         handle = handle << 8 | prop.value[i];
02662       }
02663 
02664       RepoId remote_participant;
02665       RTPS::assign(remote_participant.guidPrefix, remote_id.guidPrefix);
02666       remote_participant.entityId = ENTITYID_PARTICIPANT;
02667       peer_crypto_handles_[remote_participant] = handle;
02668       ACE_DEBUG((LM_DEBUG, "(%P|%t) RPCH %C = %d\n",
02669                  std::string(GuidConverter(remote_participant)).c_str(), handle));
02670     }
02671 
02672     else if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_DW_CRYPTO_HANDLE) == 0
02673              && prop.value.length() >= sizeof(DatawriterCryptoHandle)) {
02674       unsigned int handle = 0;
02675       for (unsigned int i = 0; i < prop.value.length(); ++i) {
02676         handle = handle << 8 | prop.value[i];
02677       }
02678       peer_crypto_handles_[writer_id] = handle;
02679       ACE_DEBUG((LM_DEBUG, "(%P|%t) DWCH %C = %d\n",
02680                  std::string(GuidConverter(writer_id)).c_str(), handle));
02681     }
02682 
02683     else if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_DR_CRYPTO_HANDLE) == 0
02684              && prop.value.length() >= sizeof(DatareaderCryptoHandle)) {
02685       unsigned int handle = 0;
02686       for (unsigned int i = 0; i < prop.value.length(); ++i) {
02687         handle = handle << 8 | prop.value[i];
02688       }
02689       peer_crypto_handles_[reader_id] = handle;
02690       ACE_DEBUG((LM_DEBUG, "(%P|%t) DRCH %C = %d\n",
02691                  std::string(GuidConverter(reader_id)).c_str(), handle));
02692     }
02693 
02694   }
02695 }
02696 #endif
02697 
02698 RtpsUdpDataLink::ReaderInfo::~ReaderInfo()
02699 {
02700   expire_durable_data();
02701 }
02702 
02703 void
02704 RtpsUdpDataLink::ReaderInfo::expire_durable_data()
02705 {
02706   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02707   for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
02708     it->second->data_dropped();
02709   }
02710 }
02711 
02712 bool
02713 RtpsUdpDataLink::ReaderInfo::expecting_durable_data() const
02714 {
02715   return durable_ &&
02716     (durable_timestamp_ == ACE_Time_Value::zero // DW hasn't resent yet
02717      || !durable_data_.empty());                // DW resent, not sent to reader
02718 }
02719 
02720 RtpsUdpDataLink::RtpsWriter::~RtpsWriter()
02721 {
02722   if (!to_deliver_.empty()) {
02723     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left to deliver\n", to_deliver_.size()));
02724   }
02725   if (!elems_not_acked_.empty()) {
02726     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left not fully acknowledged\n", elems_not_acked_.size()));
02727   }
02728 }
02729 
02730 SequenceNumber
02731 RtpsUdpDataLink::RtpsWriter::heartbeat_high(const ReaderInfo& ri) const
02732 {
02733   const SequenceNumber durable_max =
02734     ri.durable_data_.empty() ? 0 : ri.durable_data_.rbegin()->first;
02735   const SequenceNumber data_max =
02736     send_buff_.is_nil() ? 0 : (send_buff_->empty() ? 0 : send_buff_->high());
02737   return std::max(durable_max, data_max);
02738 }
02739 
02740 void
02741 RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack(TransportQueueElement* element)
02742 {
02743   elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
02744 }
02745 
02746 
02747 // Implementing TimedDelay and HeartBeat nested classes (for ACE timers)
02748 
02749 void
02750 RtpsUdpDataLink::TimedDelay::schedule()
02751 {
02752   if (!scheduled_) {
02753     const long timer = outer_->get_reactor()->schedule_timer(this, 0, timeout_);
02754 
02755     if (timer == -1) {
02756       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::TimedDelay::schedule "
02757         "failed to schedule timer %p\n", ACE_TEXT("")));
02758     } else {
02759       scheduled_ = true;
02760     }
02761   }
02762 }
02763 
02764 void
02765 RtpsUdpDataLink::TimedDelay::cancel()
02766 {
02767   if (scheduled_) {
02768     outer_->get_reactor()->cancel_timer(this);
02769     scheduled_ = false;
02770   }
02771 }
02772 
02773 void
02774 RtpsUdpDataLink::HeartBeat::enable()
02775 {
02776   if (!enabled_) {
02777     const ACE_Time_Value& per = outer_->config().heartbeat_period_;
02778     const long timer =
02779       outer_->get_reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, per);
02780 
02781     if (timer == -1) {
02782       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::HeartBeat::enable"
02783         " failed to schedule timer %p\n", ACE_TEXT("")));
02784     } else {
02785       enabled_ = true;
02786     }
02787   }
02788 }
02789 
02790 void
02791 RtpsUdpDataLink::HeartBeat::disable()
02792 {
02793   if (enabled_) {
02794     outer_->get_reactor()->cancel_timer(this);
02795     enabled_ = false;
02796   }
02797 }
02798 
02799 void
02800 RtpsUdpDataLink::send_final_acks (const RepoId& readerid)
02801 {
02802   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02803   RtpsReaderMap::iterator rr = readers_.find (readerid);
02804   if (rr != readers_.end ()) {
02805     send_ack_nacks (rr, true);
02806   }
02807 }
02808 
02809 
02810 int
02811 RtpsUdpDataLink::HeldDataDeliveryHandler::handle_exception(ACE_HANDLE /* fd */)
02812 {
02813   ACE_ASSERT(link_->reactor_task_->get_reactor_owner() == ACE_Thread::self());
02814 
02815   HeldData::iterator itr;
02816   for (itr = held_data_.begin(); itr != held_data_.end(); ++itr) {
02817     link_->data_received(itr->first, itr->second);
02818   }
02819   held_data_.clear();
02820   return 0;
02821 }
02822 
02823 void RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery(const RepoId& readerId, WriterInfo& info)
02824 {
02825   ACE_ASSERT(link_->reactor_task_->get_reactor_owner() == ACE_Thread::self());
02826 
02827   const SequenceNumber ca = info.recvd_.cumulative_ack();
02828   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter;
02829   const iter end = info.held_.upper_bound(ca);
02830 
02831   for (iter it = info.held_.begin(); it != end; /*increment in loop body*/) {
02832     if (Transport_debug_level > 5) {
02833       GuidConverter reader(readerId);
02834       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery -")
02835                            ACE_TEXT(" deliver sequence: %q to %C\n"),
02836                            it->second.header_.sequence_.getValue(),
02837                            OPENDDS_STRING(reader).c_str()));
02838     }
02839     // The head_data_ is not protected by a mutex because it is always accessed from the reactor task thread.
02840     held_data_.push_back(HeldDataEntry(it->second, readerId));
02841     info.held_.erase(it++);
02842   }
02843   link_->reactor_task_->get_reactor()->notify(this);
02844 }
02845 
02846 ACE_Event_Handler::Reference_Count
02847 RtpsUdpDataLink::HeldDataDeliveryHandler::add_reference()
02848 {
02849   return link_->add_reference();
02850 }
02851 
02852 ACE_Event_Handler::Reference_Count
02853 RtpsUdpDataLink::HeldDataDeliveryHandler::remove_reference()
02854 {
02855   return link_->remove_reference();
02856 }
02857 
02858 OpenDDS::DCPS::RtpsUdpSendStrategy*
02859 OpenDDS::DCPS::RtpsUdpDataLink::send_strategy()
02860 {
02861   return static_cast<OpenDDS::DCPS::RtpsUdpSendStrategy*>(send_strategy_.in());
02862 }
02863 
02864 OpenDDS::DCPS::RtpsUdpReceiveStrategy*
02865 OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy()
02866 {
02867   return static_cast<OpenDDS::DCPS::RtpsUdpReceiveStrategy*>(receive_strategy_.in());
02868 }
02869 
02870 
02871 } // namespace DCPS
02872 } // namespace OpenDDS
02873 
02874 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1