RtpsUdpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsUdpDataLink.h"
00009 #include "RtpsUdpTransport.h"
00010 #include "RtpsUdpInst.h"
00011 
00012 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00013 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendControlElement.h"
00015 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00016 
00017 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00018 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00019 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00020 #include "dds/DCPS/RTPS/MessageTypes.h"
00021 
00022 #include "ace/Default_Constants.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Message_Block.h"
00025 #include "ace/Reverse_Lock_T.h"
00026 #include "ace/Reactor.h"
00027 
00028 #include <string.h>
00029 
00030 #ifndef __ACE_INLINE__
00031 # include "RtpsUdpDataLink.inl"
00032 #endif  /* __ACE_INLINE__ */
00033 
00034 namespace {
00035 
00036 /// Return the number of CORBA::Longs required for the bitmap representation of
00037 /// sequence numbers between low and high, inclusive (maximum 8 longs).
00038 CORBA::ULong
00039 bitmap_num_longs(const OpenDDS::DCPS::SequenceNumber& low,
00040                  const OpenDDS::DCPS::SequenceNumber& high)
00041 {
00042   return std::min(CORBA::ULong(8),
00043                   CORBA::ULong((high.getValue() - low.getValue() + 32) / 32));
00044 }
00045 
00046 }
00047 
00048 namespace OpenDDS {
00049 namespace DCPS {
00050 
00051 RtpsUdpDataLink::RtpsUdpDataLink(RtpsUdpTransport* transport,
00052                                  const GuidPrefix_t& local_prefix,
00053                                  RtpsUdpInst* config,
00054                                  TransportReactorTask* reactor_task)
00055   : DataLink(transport, // 3 data link "attributes", below, are unused
00056              0,         // priority
00057              false,     // is_loopback
00058              false),    // is_active
00059     config_(config),
00060     reactor_task_(reactor_task, false),
00061     rtps_customized_element_allocator_(40, sizeof(RtpsCustomizedElement)),
00062     multi_buff_(this, config->nak_depth_),
00063     best_effort_heartbeat_count_(0),
00064     nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00065                 config->nak_response_delay_),
00066     heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00067                      config->heartbeat_response_delay_),
00068   heartbeat_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats),
00069   heartbeatchecker_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)
00070 {
00071   std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00072 }
00073 
00074 bool
00075 RtpsUdpDataLink::add_delayed_notification(TransportQueueElement* element)
00076 {
00077   RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00078   if (iter != writers_.end()) {
00079 
00080     iter->second.add_elem_awaiting_ack(element);
00081     return true;
00082   }
00083   return false;
00084 }
00085 
00086 void RtpsUdpDataLink::do_remove_sample(const RepoId& pub_id,
00087   const TransportQueueElement::MatchCriteria& criteria)
00088 {
00089   RtpsWriter::SnToTqeMap sn_tqe_map;
00090   RtpsWriter::SnToTqeMap to_deliver;
00091   typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00092 
00093   {
00094     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00095 
00096     RtpsWriterMap::iterator iter = writers_.find(pub_id);
00097     if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00098       to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00099       iter->second.to_deliver_.clear();
00100       iter_t it = iter->second.elems_not_acked_.begin();
00101       OPENDDS_SET(SequenceNumber) sns_to_release;
00102       while (it != iter->second.elems_not_acked_.end()) {
00103         if (criteria.matches(*it->second)) {
00104           sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00105           sns_to_release.insert(it->first);
00106           iter_t last = it;
00107           ++it;
00108           iter->second.elems_not_acked_.erase(last);
00109         } else {
00110           ++it;
00111         }
00112       }
00113       OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00114       while (sns_it != sns_to_release.end()) {
00115         iter->second.send_buff_->release_acked(*sns_it);
00116         ++sns_it;
00117       }
00118     }
00119   }
00120   iter_t deliver_iter = to_deliver.begin();
00121   while (deliver_iter != to_deliver.end()) {
00122     deliver_iter->second->data_delivered();
00123     ++deliver_iter;
00124   }
00125   iter_t drop_iter = sn_tqe_map.begin();
00126   while (drop_iter != sn_tqe_map.end()) {
00127     drop_iter->second->data_dropped(true);
00128     ++drop_iter;
00129   }
00130 }
00131 
00132 bool
00133 RtpsUdpDataLink::open(const ACE_SOCK_Dgram& unicast_socket)
00134 {
00135   unicast_socket_ = unicast_socket;
00136 
00137   if (config_->use_multicast_) {
00138     const OPENDDS_STRING& net_if = config_->multicast_interface_;
00139 #ifdef ACE_HAS_MAC_OSX
00140     multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00141                            ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00142 #endif
00143     if (multicast_socket_.join(config_->multicast_group_address_, 1,
00144                                net_if.empty() ? 0 :
00145                                ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00146       ACE_ERROR_RETURN((LM_ERROR,
00147                         ACE_TEXT("(%P|%t) ERROR: ")
00148                         ACE_TEXT("RtpsUdpDataLink::open: ")
00149                         ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00150                        false);
00151     }
00152   }
00153 
00154   if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config_->ttl_)) {
00155     ACE_ERROR_RETURN((LM_ERROR,
00156                       ACE_TEXT("(%P|%t) ERROR: ")
00157                       ACE_TEXT("RtpsUdpDataLink::open: ")
00158                       ACE_TEXT("failed to set TTL: %d\n"),
00159                       config_->ttl_),
00160                      false);
00161   }
00162 
00163   send_strategy_->send_buffer(&multi_buff_);
00164 
00165   if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_),
00166             static_rchandle_cast<TransportStrategy>(recv_strategy_)) != 0) {
00167     stop_i();
00168     ACE_ERROR_RETURN((LM_ERROR,
00169                       ACE_TEXT("(%P|%t) ERROR: ")
00170                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00171                      false);
00172   }
00173 
00174   return true;
00175 }
00176 
00177 void
00178 RtpsUdpDataLink::add_locator(const RepoId& remote_id,
00179                              const ACE_INET_Addr& address,
00180                              bool requires_inline_qos)
00181 {
00182   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00183   locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00184 }
00185 
00186 void
00187 RtpsUdpDataLink::get_locators(const RepoId& local_id,
00188                               OPENDDS_SET(ACE_INET_Addr)& addrs) const
00189 {
00190   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00191 
00192   if (local_id == GUID_UNKNOWN) {
00193     for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00194       addrs.insert(iter->second.addr_);
00195     }
00196     return;
00197   }
00198 
00199   const GUIDSeq_var peers = peer_ids(local_id);
00200   if (!peers.ptr()) {
00201     return;
00202   }
00203   for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00204     const ACE_INET_Addr addr = get_locator(peers[i]);
00205     if (addr != ACE_INET_Addr()) {
00206       addrs.insert(addr);
00207     }
00208   }
00209 }
00210 
00211 ACE_INET_Addr
00212 RtpsUdpDataLink::get_locator(const RepoId& remote_id) const
00213 {
00214   typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00215   const iter_t iter = locators_.find(remote_id);
00216   if (iter == locators_.end()) {
00217     const GuidConverter conv(remote_id);
00218     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00219       "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00220     return ACE_INET_Addr();
00221   }
00222   return iter->second.addr_;
00223 }
00224 
00225 void
00226 RtpsUdpDataLink::associated(const RepoId& local_id, const RepoId& remote_id,
00227                             bool local_reliable, bool remote_reliable,
00228                             bool local_durable, bool remote_durable)
00229 {
00230   if (!local_reliable) {
00231     return;
00232   }
00233 
00234   bool enable_heartbeat = false;
00235 
00236   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00237   const GuidConverter conv(local_id);
00238   const EntityKind kind = conv.entityKind();
00239   if (kind == KIND_WRITER && remote_reliable) {
00240     // Insert count if not already there.
00241     heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00242     RtpsWriter& w = writers_[local_id];
00243     w.remote_readers_[remote_id].durable_ = remote_durable;
00244     w.durable_ = local_durable;
00245     enable_heartbeat = true;
00246 
00247   } else if (kind == KIND_READER) {
00248     RtpsReaderMap::iterator rr = readers_.find(local_id);
00249     if (rr == readers_.end()) {
00250       rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00251         .first;
00252       rr->second.durable_ = local_durable;
00253     }
00254     rr->second.remote_writers_[remote_id];
00255     reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00256   }
00257 
00258   g.release();
00259   if (enable_heartbeat) {
00260     heartbeat_.schedule_enable();
00261   }
00262 }
00263 
00264 bool
00265 RtpsUdpDataLink::check_handshake_complete(const RepoId& local_id,
00266                                           const RepoId& remote_id)
00267 {
00268   const GuidConverter conv(local_id);
00269   const EntityKind kind = conv.entityKind();
00270   if (kind == KIND_WRITER) {
00271     RtpsWriterMap::iterator rw = writers_.find(local_id);
00272     if (rw == writers_.end()) {
00273       return true; // not reliable, no handshaking
00274     }
00275     ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00276     if (ri == rw->second.remote_readers_.end()) {
00277       return true; // not reliable, no handshaking
00278     }
00279     return ri->second.handshake_done_;
00280 
00281   } else if (kind == KIND_READER) {
00282     return true; // no handshaking for local reader
00283   }
00284   return false;
00285 }
00286 
00287 void
00288 RtpsUdpDataLink::register_for_reader(const RepoId& writerid,
00289                                      const RepoId& readerid,
00290                                      const ACE_INET_Addr& address,
00291                                      OpenDDS::DCPS::DiscoveryListener* listener)
00292 {
00293   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00294   bool enableheartbeat = interesting_readers_.empty();
00295   interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00296   heartbeat_counts_[writerid] = 0;
00297   g.release();
00298   if (enableheartbeat) {
00299     heartbeat_.schedule_enable();
00300   }
00301 }
00302 
00303 void
00304 RtpsUdpDataLink::unregister_for_reader(const RepoId& writerid,
00305                                        const RepoId& readerid)
00306 {
00307   OPENDDS_VECTOR(CallbackType) to_notify;
00308   {
00309     ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
00310     to_notify.swap(readerDoesNotExistCallbacks_);
00311   }
00312   OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00313   while(iter != to_notify.end()) {
00314     const RepoId& rid = iter->first;
00315     const InterestingRemote& remote = iter->second;
00316     remote.listener->reader_does_not_exist(rid, remote.localid);
00317     ++iter;
00318   }
00319   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00320   for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00321          limit = interesting_readers_.upper_bound(readerid);
00322        pos != limit;
00323        ) {
00324     if (pos->second.localid == writerid) {
00325       interesting_readers_.erase(pos++);
00326     } else {
00327       ++pos;
00328     }
00329   }
00330 }
00331 
00332 void
00333 RtpsUdpDataLink::register_for_writer(const RepoId& readerid,
00334                                      const RepoId& writerid,
00335                                      const ACE_INET_Addr& address,
00336                                      OpenDDS::DCPS::DiscoveryListener* listener)
00337 {
00338   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00339   bool enableheartbeatchecker = interesting_writers_.empty();
00340   interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00341   g.release();
00342   if (enableheartbeatchecker) {
00343     heartbeatchecker_.schedule_enable();
00344   }
00345 }
00346 
00347 void
00348 RtpsUdpDataLink::unregister_for_writer(const RepoId& readerid,
00349                                        const RepoId& writerid)
00350 {
00351   OPENDDS_VECTOR(CallbackType) to_notify;
00352   {
00353     ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
00354     to_notify.swap(writerDoesNotExistCallbacks_);
00355   }
00356   OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00357   while(iter != to_notify.end()) {
00358     const RepoId& rid = iter->first;
00359     const InterestingRemote& remote = iter->second;
00360     remote.listener->writer_does_not_exist(rid, remote.localid);
00361     ++iter;
00362   }
00363   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00364   for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00365          limit = interesting_writers_.upper_bound(writerid);
00366        pos != limit;
00367        ) {
00368     if (pos->second.localid == readerid) {
00369       interesting_writers_.erase(pos++);
00370     } else {
00371       ++pos;
00372     }
00373   }
00374 }
00375 
00376 void
00377 RtpsUdpDataLink::pre_stop_i()
00378 {
00379   DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00380   DataLink::pre_stop_i();
00381   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00382   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00383   {
00384     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00385 
00386     typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00387 
00388     RtpsWriterMap::iterator iter = writers_.begin();
00389     while (iter != writers_.end()) {
00390       RtpsWriter& writer = iter->second;
00391       if (!writer.to_deliver_.empty()) {
00392         iter_t iter = writer.to_deliver_.begin();
00393         while (iter != writer.to_deliver_.end()) {
00394           to_deliver.push_back(iter->second);
00395           writer.to_deliver_.erase(iter);
00396           iter = writer.to_deliver_.begin();
00397         }
00398       }
00399       if (!writer.elems_not_acked_.empty()) {
00400         OPENDDS_SET(SequenceNumber) sns_to_release;
00401         iter_t iter = writer.elems_not_acked_.begin();
00402         while (iter != writer.elems_not_acked_.end()) {
00403           to_drop.push_back(iter->second);
00404           sns_to_release.insert(iter->first);
00405           writer.elems_not_acked_.erase(iter);
00406           iter = writer.elems_not_acked_.begin();
00407         }
00408         OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00409         while (sns_it != sns_to_release.end()) {
00410           writer.send_buff_->release_acked(*sns_it);
00411           ++sns_it;
00412         }
00413       }
00414       RtpsWriterMap::iterator last = iter;
00415       ++iter;
00416       heartbeat_counts_.erase(last->first);
00417       writers_.erase(last);
00418     }
00419   }
00420   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00421   tqe_iter deliver_it = to_deliver.begin();
00422   while (deliver_it != to_deliver.end()) {
00423     (*deliver_it)->data_delivered();
00424     ++deliver_it;
00425   }
00426   tqe_iter drop_it = to_drop.begin();
00427   while (drop_it != to_drop.end()) {
00428     (*drop_it)->data_dropped(true);
00429     ++drop_it;
00430   }
00431 }
00432 
00433 void
00434 RtpsUdpDataLink::send_i(TransportQueueElement* element, bool relink)
00435 {
00436   // Lock here to maintain the locking order:
00437   // RtpsUdpDataLink before RtpsUdpSendStrategy
00438   // which is required for resending due to nacks
00439   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00440   DataLink::send_i(element, relink);
00441 }
00442 
00443 void
00444 RtpsUdpDataLink::release_reservations_i(const RepoId& remote_id,
00445                                         const RepoId& local_id)
00446 {
00447   OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00448   OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00449   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00450   using std::pair;
00451   const GuidConverter conv(local_id);
00452   const EntityKind kind = conv.entityKind();
00453   if (kind == KIND_WRITER) {
00454     const RtpsWriterMap::iterator rw = writers_.find(local_id);
00455 
00456     if (rw != writers_.end()) {
00457       rw->second.remote_readers_.erase(remote_id);
00458 
00459       if (rw->second.remote_readers_.empty()) {
00460         RtpsWriter& writer = rw->second;
00461         typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00462 
00463         if (!writer.to_deliver_.empty()) {
00464           iter_t iter = writer.to_deliver_.begin();
00465           while (iter != writer.to_deliver_.end()) {
00466             to_deliver.push_back(iter->second);
00467             writer.to_deliver_.erase(iter);
00468             iter = writer.to_deliver_.begin();
00469           }
00470         }
00471         if (!writer.elems_not_acked_.empty()) {
00472           OPENDDS_SET(SequenceNumber) sns_to_release;
00473           iter_t iter = writer.elems_not_acked_.begin();
00474           while (iter != writer.elems_not_acked_.end()) {
00475             to_drop.push_back(iter->second);
00476             sns_to_release.insert(iter->first);
00477             writer.elems_not_acked_.erase(iter);
00478             iter = writer.elems_not_acked_.begin();
00479           }
00480           OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00481           while (sns_it != sns_to_release.end()) {
00482             writer.send_buff_->release_acked(*sns_it);
00483             ++sns_it;
00484           }
00485         }
00486         heartbeat_counts_.erase(rw->first);
00487         writers_.erase(rw);
00488       }
00489     }
00490 
00491   } else if (kind == KIND_READER) {
00492     const RtpsReaderMap::iterator rr = readers_.find(local_id);
00493 
00494     if (rr != readers_.end()) {
00495       rr->second.remote_writers_.erase(remote_id);
00496 
00497       for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00498              reader_index_.equal_range(remote_id);
00499            iters.first != iters.second;) {
00500         if (iters.first->second == rr) {
00501           reader_index_.erase(iters.first++);
00502         } else {
00503           ++iters.first;
00504         }
00505       }
00506 
00507       if (rr->second.remote_writers_.empty()) {
00508         readers_.erase(rr);
00509       }
00510     }
00511   }
00512   g.release();
00513   typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00514   tqe_iter deliver_it = to_deliver.begin();
00515   while (deliver_it != to_deliver.end()) {
00516     (*deliver_it)->data_delivered();
00517     ++deliver_it;
00518   }
00519   tqe_iter drop_it = to_drop.begin();
00520   while (drop_it != to_drop.end()) {
00521     (*drop_it)->data_dropped(true);
00522     ++drop_it;
00523   }
00524 }
00525 
00526 void
00527 RtpsUdpDataLink::stop_i()
00528 {
00529   nack_reply_.cancel();
00530   heartbeat_reply_.cancel();
00531   heartbeat_.disable();
00532   unicast_socket_.close();
00533   multicast_socket_.close();
00534 }
00535 
00536 
00537 // Implementing MultiSendBuffer nested class
00538 
00539 void
00540 RtpsUdpDataLink::MultiSendBuffer::retain_all(RepoId pub_id)
00541 {
00542   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00543   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00544   if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00545     wi->second.send_buff_->retain_all(pub_id);
00546   }
00547 }
00548 
00549 void
00550 RtpsUdpDataLink::MultiSendBuffer::insert(SequenceNumber /*transport_seq*/,
00551                                          TransportSendStrategy::QueueType* q,
00552                                          ACE_Message_Block* chain)
00553 {
00554   // Called from TransportSendStrategy::send_packet().
00555   // RtpsUdpDataLink is already locked.
00556   const TransportQueueElement* const tqe = q->peek();
00557   const SequenceNumber seq = tqe->sequence();
00558   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00559     return;
00560   }
00561 
00562   const RepoId pub_id = tqe->publication_id();
00563 
00564   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00565   if (wi == outer_->writers_.end()) {
00566     return; // this datawriter is not reliable
00567   }
00568 
00569   RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00570 
00571   if (send_buff.is_nil()) {
00572     send_buff = new SingleSendBuffer(SingleSendBuffer::UNLIMITED, 1 /*mspp*/);
00573 
00574     send_buff->bind(outer_->send_strategy_.in());
00575   }
00576 
00577   if (Transport_debug_level > 5) {
00578     const GuidConverter pub(pub_id);
00579     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00580       "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00581       (int)tqe->is_fragment()));
00582   }
00583 
00584   if (tqe->is_fragment()) {
00585     const RtpsCustomizedElement* const rce =
00586       dynamic_cast<const RtpsCustomizedElement*>(tqe);
00587     if (rce) {
00588       send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00589     } else if (Transport_debug_level) {
00590       const GuidConverter pub(pub_id);
00591       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00592         " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00593         OPENDDS_STRING(pub).c_str(), seq.getValue()));
00594     }
00595   } else {
00596     send_buff->insert(seq, q, chain);
00597   }
00598 }
00599 
00600 
00601 // Support for the send() data handling path
00602 namespace {
00603   ACE_Message_Block* submsgs_to_msgblock(const RTPS::SubmessageSeq& subm)
00604   {
00605     size_t size = 0, padding = 0;
00606     for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00607       if ((size + padding) % 4) {
00608         padding += 4 - ((size + padding) % 4);
00609       }
00610       gen_find_size(subm[i], size, padding);
00611     }
00612 
00613     ACE_Message_Block* hdr = new ACE_Message_Block(size + padding);
00614 
00615     for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00616       // byte swapping is handled in the operator<<() implementation
00617       Serializer ser(hdr, false, Serializer::ALIGN_CDR);
00618       ser << subm[i];
00619       const size_t len = hdr->length();
00620       if (len % 4) {
00621         hdr->wr_ptr(4 - (len % 4));
00622       }
00623     }
00624     return hdr;
00625   }
00626 }
00627 
00628 TransportQueueElement*
00629 RtpsUdpDataLink::customize_queue_element(TransportQueueElement* element)
00630 {
00631   const ACE_Message_Block* msg = element->msg();
00632   if (!msg) {
00633     return element;
00634   }
00635 
00636   const RepoId pub_id = element->publication_id();
00637 
00638   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00639 
00640   RTPS::SubmessageSeq subm;
00641 
00642   const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00643 
00644   bool gap_ok = true;
00645   DestToEntityMap gap_receivers;
00646   if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00647     for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00648          ri != rw->second.remote_readers_.end(); ++ri) {
00649       RepoId tmp;
00650       std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00651       tmp.entityId = ENTITYID_UNKNOWN;
00652       gap_receivers[tmp].push_back(ri->first);
00653 
00654       if (ri->second.expecting_durable_data()) {
00655         // Can't add an in-line GAP if some Data Reader is expecting durable
00656         // data, the GAP could cause that Data Reader to ignore the durable
00657         // data.  The other readers will eventually learn about the GAP by
00658         // sending an ACKNACK and getting a GAP reply.
00659         gap_ok = false;
00660         break;
00661       }
00662     }
00663   }
00664 
00665   if (gap_ok) {
00666     add_gap_submsg(subm, *element, gap_receivers);
00667   }
00668 
00669   const SequenceNumber seq = element->sequence();
00670   if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00671     rw->second.expected_ = seq;
00672     ++rw->second.expected_;
00673   }
00674 
00675   TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00676   TransportCustomizedElement* tce =
00677     dynamic_cast<TransportCustomizedElement*>(element);
00678   TransportSendControlElement* tsce =
00679     dynamic_cast<TransportSendControlElement*>(element);
00680 
00681   ACE_Message_Block* data = 0;
00682   bool durable = false;
00683 
00684   // Based on the type of 'element', find and duplicate the data payload
00685   // continuation block.
00686   if (tsce) {        // Control message
00687     if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00688       data = msg->cont()->duplicate();
00689       // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00690       RtpsSampleHeader::populate_data_control_submessages(
00691                 subm, *tsce, requires_inline_qos(pub_id));
00692     } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00693       end_historic_samples(rw, tsce->header(), msg->cont());
00694       element->data_delivered();
00695       return 0;
00696     } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00697       send_heartbeats_manual(tsce);
00698       element->data_delivered();
00699       return 0;
00700     } else {
00701       element->data_dropped(true /*dropped_by_transport*/);
00702       return 0;
00703     }
00704 
00705   } else if (tse) {  // Basic data message
00706     // {DataSampleHeader} -> {Data Payload}
00707     data = msg->cont()->duplicate();
00708     const DataSampleElement* dsle = tse->sample();
00709     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00710     RtpsSampleHeader::populate_data_sample_submessages(
00711               subm, *dsle, requires_inline_qos(pub_id));
00712     durable = dsle->get_header().historic_sample_;
00713 
00714   } else if (tce) {  // Customized data message
00715     // {DataSampleHeader} -> {Content Filtering GUIDs} -> {Data Payload}
00716     data = msg->cont()->cont()->duplicate();
00717     const DataSampleElement* dsle = tce->original_send_element()->sample();
00718     // Create RTPS Submessage(s) in place of the OpenDDS DataSampleHeader
00719     RtpsSampleHeader::populate_data_sample_submessages(
00720               subm, *dsle, requires_inline_qos(pub_id));
00721     durable = dsle->get_header().historic_sample_;
00722 
00723   } else {
00724     return element;
00725   }
00726 
00727   ACE_Message_Block* hdr = submsgs_to_msgblock(subm);
00728   hdr->cont(data);
00729   RtpsCustomizedElement* rtps =
00730     RtpsCustomizedElement::alloc(element, hdr,
00731       &rtps_customized_element_allocator_);
00732 
00733   // Handle durability resends
00734   if (durable && rw != writers_.end()) {
00735     const RepoId sub = element->subscription_id();
00736     if (sub != GUID_UNKNOWN) {
00737       ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00738       if (ri != rw->second.remote_readers_.end()) {
00739         ri->second.durable_data_[rtps->sequence()] = rtps;
00740         ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00741         if (Transport_debug_level > 3) {
00742           const GuidConverter conv(pub_id), sub_conv(sub);
00743           ACE_DEBUG((LM_DEBUG,
00744             "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00745             "storing durable data for local %C remote %C\n",
00746             OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00747         }
00748         return 0;
00749       }
00750     }
00751   } else if (durable && (Transport_debug_level)) {
00752     const GuidConverter conv(pub_id);
00753     ACE_DEBUG((LM_ERROR,
00754       "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00755       "WARNING: no RtpsWriter to store durable data for local %C\n",
00756       OPENDDS_STRING(conv).c_str()));
00757   }
00758 
00759   return rtps;
00760 }
00761 
00762 void
00763 RtpsUdpDataLink::end_historic_samples(RtpsWriterMap::iterator writer,
00764                                       const DataSampleHeader& header,
00765                                       ACE_Message_Block* body)
00766 {
00767   // Set the ReaderInfo::durable_timestamp_ for the case where no
00768   // durable samples exist in the DataWriter.
00769   if (writer != writers_.end() && writer->second.durable_) {
00770     const ACE_Time_Value now = ACE_OS::gettimeofday();
00771     RepoId sub = GUID_UNKNOWN;
00772     if (body && header.message_length_ >= sizeof(sub)) {
00773       std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00774     }
00775     typedef ReaderInfoMap::iterator iter_t;
00776     if (sub == GUID_UNKNOWN) {
00777       if (Transport_debug_level > 3) {
00778         const GuidConverter conv(writer->first);
00779         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00780                    "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00781       }
00782       for (iter_t iter = writer->second.remote_readers_.begin();
00783            iter != writer->second.remote_readers_.end(); ++iter) {
00784         if (iter->second.durable_) {
00785           iter->second.durable_timestamp_ = now;
00786         }
00787       }
00788     } else {
00789       iter_t iter = writer->second.remote_readers_.find(sub);
00790       if (iter != writer->second.remote_readers_.end()) {
00791         if (iter->second.durable_) {
00792           iter->second.durable_timestamp_ = now;
00793           if (Transport_debug_level > 3) {
00794             const GuidConverter conv(writer->first), sub_conv(sub);
00795             ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00796                        " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00797                        OPENDDS_STRING(sub_conv).c_str()));
00798           }
00799         }
00800       }
00801     }
00802   }
00803 }
00804 
00805 bool
00806 RtpsUdpDataLink::requires_inline_qos(const PublicationId& pub_id)
00807 {
00808   if (force_inline_qos_) {
00809     // Force true for testing purposes
00810     return true;
00811   } else {
00812     const GUIDSeq_var peers = peer_ids(pub_id);
00813     if (!peers.ptr()) {
00814       return false;
00815     }
00816     typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00817     for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00818       const iter_t iter = locators_.find(peers[i]);
00819       if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00820         return true;
00821       }
00822     }
00823     return false;
00824   }
00825 }
00826 
00827 bool RtpsUdpDataLink::force_inline_qos_ = false;
00828 
00829 void
00830 RtpsUdpDataLink::add_gap_submsg(RTPS::SubmessageSeq& msg,
00831                                 const TransportQueueElement& tqe,
00832                                 const DestToEntityMap& dtem)
00833 {
00834   // These are the GAP submessages that we'll send directly in-line with the
00835   // DATA when we notice that the DataWriter has deliberately skipped seq #s.
00836   // There are other GAP submessages generated in response to reader ACKNACKS,
00837   // see send_nack_replies().
00838   using namespace OpenDDS::RTPS;
00839 
00840   const SequenceNumber seq = tqe.sequence();
00841   const RepoId pub = tqe.publication_id();
00842   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00843       || tqe.subscription_id() != GUID_UNKNOWN) {
00844     return;
00845   }
00846 
00847   const RtpsWriterMap::iterator wi = writers_.find(pub);
00848   if (wi == writers_.end()) {
00849     return; // not a reliable writer, does not send GAPs
00850   }
00851 
00852   RtpsWriter& rw = wi->second;
00853 
00854   if (seq != rw.expected_) {
00855     SequenceNumber firstMissing = rw.expected_;
00856 
00857     // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
00858     // [gapStart, gapListBase) and those in the SNSet.
00859     const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00860                                        firstMissing.getLow()},
00861                            gapListBase = {seq.getHigh(),
00862                                           seq.getLow()};
00863 
00864     // We are not going to enable any bits in the "bitmap" of the SNSet,
00865     // but the "numBits" and the bitmap.length must both be > 0.
00866     LongSeq8 bitmap;
00867     bitmap.length(1);
00868     bitmap[0] = 0;
00869 
00870     GapSubmessage gap = {
00871       {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/},
00872       ENTITYID_UNKNOWN, // readerId: applies to all matched readers
00873       pub.entityId,
00874       gapStart,
00875       {gapListBase, 1, bitmap}
00876     };
00877 
00878     size_t size = 0, padding = 0;
00879     gen_find_size(gap, size, padding);
00880     gap.smHeader.submessageLength =
00881       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00882 
00883     if (!rw.durable_) {
00884       const CORBA::ULong i = msg.length();
00885       msg.length(i + 1);
00886       msg[i].gap_sm(gap);
00887     } else {
00888       InfoDestinationSubmessage idst = {
00889         {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
00890         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00891       };
00892       CORBA::ULong ml = msg.length();
00893 
00894       //Change the non-directed Gap into multiple directed gaps to prevent
00895       //delivering to currently undiscovered durable readers
00896       DestToEntityMap::const_iterator iter = dtem.begin();
00897       while (iter != dtem.end()) {
00898         std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00899         msg.length(ml + 1);
00900         msg[ml++].info_dst_sm(idst);
00901 
00902         const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00903         for (size_t i = 0; i < readers.size(); ++i) {
00904           gap.readerId = readers.at(i).entityId;
00905           msg.length(ml + 1);
00906           msg[ml++].gap_sm(gap);
00907         } //END iter over reader entity ids
00908         ++iter;
00909       } //END iter over reader GuidPrefix_t's
00910     }
00911   }
00912 }
00913 
00914 
00915 // DataReader's side of Reliability
00916 
00917 void
00918 RtpsUdpDataLink::received(const RTPS::DataSubmessage& data,
00919                           const GuidPrefix_t& src_prefix)
00920 {
00921   datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00922 }
00923 
00924 bool
00925 RtpsUdpDataLink::process_data_i(const RTPS::DataSubmessage& data,
00926                                 const RepoId& src,
00927                                 RtpsReaderMap::value_type& rr)
00928 {
00929   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00930   if (wi != rr.second.remote_writers_.end()) {
00931     WriterInfo& info = wi->second;
00932     SequenceNumber seq;
00933     seq.setValue(data.writerSN.high, data.writerSN.low);
00934     info.frags_.erase(seq);
00935     const RepoId& readerId = rr.first;
00936     if (info.recvd_.contains(seq)) {
00937       if (Transport_debug_level > 5) {
00938         GuidConverter writer(src);
00939         GuidConverter reader(readerId);
00940         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00941                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
00942                              seq.getValue(),
00943                              OPENDDS_STRING(writer).c_str(),
00944                              OPENDDS_STRING(reader).c_str()));
00945       }
00946       recv_strategy_->withhold_data_from(readerId);
00947     } else if (info.recvd_.disjoint() ||
00948         (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
00949         || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
00950         || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
00951       if (Transport_debug_level > 5) {
00952         GuidConverter writer(src);
00953         GuidConverter reader(readerId);
00954         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00955                              ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
00956                              seq.getValue(),
00957                              OPENDDS_STRING(writer).c_str(),
00958                              OPENDDS_STRING(reader).c_str()));
00959       }
00960       const ReceivedDataSample* sample =
00961         recv_strategy_->withhold_data_from(readerId);
00962       info.held_.insert(std::make_pair(seq, *sample));
00963     } else {
00964       if (Transport_debug_level > 5) {
00965         GuidConverter writer(src);
00966         GuidConverter reader(readerId);
00967         ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00968                              ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
00969                              seq.getValue(),
00970                              OPENDDS_STRING(writer).c_str(),
00971                              OPENDDS_STRING(reader).c_str()));
00972       }
00973       recv_strategy_->do_not_withhold_data_from(readerId);
00974     }
00975     info.recvd_.insert(seq);
00976     deliver_held_data(readerId, info, rr.second.durable_);
00977   } else {
00978     if (Transport_debug_level > 5) {
00979       GuidConverter writer(src);
00980       GuidConverter reader(rr.first);
00981       SequenceNumber seq;
00982       seq.setValue(data.writerSN.high, data.writerSN.low);
00983       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00984                            ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
00985                            seq.getValue(),
00986                            OPENDDS_STRING(writer).c_str(),
00987                            OPENDDS_STRING(reader).c_str()));
00988     }
00989     recv_strategy_->do_not_withhold_data_from(rr.first);
00990   }
00991   return false;
00992 }
00993 
00994 void
00995 RtpsUdpDataLink::deliver_held_data(const RepoId& readerId, WriterInfo& info,
00996                                    bool durable)
00997 {
00998   if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
00999   const SequenceNumber ca = info.recvd_.cumulative_ack();
01000   typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter;
01001   const iter end = info.held_.upper_bound(ca);
01002   for (iter it = info.held_.begin(); it != end; /*increment in loop body*/) {
01003     if (Transport_debug_level > 5) {
01004       GuidConverter reader(readerId);
01005       ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::deliver_held_data -")
01006                            ACE_TEXT(" deliver sequence: %q to %C\n"),
01007                            it->second.header_.sequence_.getValue(),
01008                            OPENDDS_STRING(reader).c_str()));
01009     }
01010     data_received(it->second, readerId);
01011     info.held_.erase(it++);
01012   }
01013 }
01014 
01015 void
01016 RtpsUdpDataLink::received(const RTPS::GapSubmessage& gap,
01017                           const GuidPrefix_t& src_prefix)
01018 {
01019   datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01020 }
01021 
01022 bool
01023 RtpsUdpDataLink::process_gap_i(const RTPS::GapSubmessage& gap,
01024                                const RepoId& src, RtpsReaderMap::value_type& rr)
01025 {
01026   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01027   if (wi != rr.second.remote_writers_.end()) {
01028     SequenceRange sr;
01029     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01030     SequenceNumber base;
01031     base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01032     sr.second = base.previous();
01033     if (sr.first <= sr.second) {
01034       if (Transport_debug_level > 5) {
01035         const GuidConverter conv(src);
01036         const GuidConverter rdr(rr.first);
01037         ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01038                   "Reader %C received GAP with range [%q, %q] from %C\n",
01039                   OPENDDS_STRING(rdr).c_str(),
01040                   sr.first.getValue(), sr.second.getValue(),
01041                   OPENDDS_STRING(conv).c_str()));
01042       }
01043       wi->second.recvd_.insert(sr);
01044     } else {
01045       const GuidConverter conv(src);
01046       VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01047                 "received GAP with invalid range [%q, %q] from %C\n",
01048                 sr.first.getValue(), sr.second.getValue(),
01049                 OPENDDS_STRING(conv).c_str()), 2);
01050     }
01051     wi->second.recvd_.insert(base, gap.gapList.numBits,
01052                              gap.gapList.bitmap.get_buffer());
01053     deliver_held_data(rr.first, wi->second, rr.second.durable_);
01054     //FUTURE: to support wait_for_acks(), notify DCPS layer of the GAP
01055   }
01056   return false;
01057 }
01058 
01059 void
01060 RtpsUdpDataLink::received(const RTPS::HeartBeatSubmessage& heartbeat,
01061                           const GuidPrefix_t& src_prefix)
01062 {
01063   RepoId src;
01064   std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01065   src.entityId = heartbeat.writerId;
01066 
01067   bool schedule_acknack = false;
01068   const ACE_Time_Value now = ACE_OS::gettimeofday();
01069   OPENDDS_VECTOR(InterestingRemote) callbacks;
01070 
01071   {
01072     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01073 
01074     // We received a heartbeat from a writer.
01075     // We should ACKNACK if the writer is interesting and there is no association.
01076 
01077     for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01078            limit = interesting_writers_.upper_bound(src);
01079          pos != limit;
01080          ++pos) {
01081       const RepoId& writerid = src;
01082       const RepoId& readerid = pos->second.localid;
01083 
01084       RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01085       if (riter == readers_.end()) {
01086         // Reader has no associations.
01087         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01088       } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01089         // Reader is not associated with this writer.
01090         interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01091       }
01092       pos->second.last_activity = now;
01093       if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01094         callbacks.push_back(pos->second);
01095         pos->second.status = InterestingRemote::EXISTS;
01096       }
01097     }
01098 
01099     schedule_acknack = !interesting_ack_nacks_.empty();
01100   }
01101 
01102   for (size_t i = 0; i < callbacks.size(); ++i) {
01103     callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01104   }
01105 
01106   if (schedule_acknack) {
01107     heartbeat_reply_.schedule();
01108   }
01109 
01110   datareader_dispatch(heartbeat, src_prefix,
01111                       &RtpsUdpDataLink::process_heartbeat_i);
01112 }
01113 
01114 bool
01115 RtpsUdpDataLink::process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
01116                                      const RepoId& src,
01117                                      RtpsReaderMap::value_type& rr)
01118 {
01119   const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01120   if (wi == rr.second.remote_writers_.end()) {
01121     // we may not be associated yet, even if the writer thinks we are
01122     return false;
01123   }
01124 
01125   WriterInfo& info = wi->second;
01126 
01127   if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01128     return false;
01129   }
01130   info.heartbeat_recvd_count_ = heartbeat.count.value;
01131 
01132   SequenceNumber& first = info.hb_range_.first;
01133   first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01134   SequenceNumber& last = info.hb_range_.second;
01135   last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01136   static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01137 
01138   DisjointSequence& recvd = info.recvd_;
01139   if (!rr.second.durable_ && info.initial_hb_) {
01140     if (last.getValue() < starting.getValue()) {
01141       // this is an invalid heartbeat -- last must be positive
01142       return false;
01143     }
01144     // For the non-durable reader, the first received HB or DATA establishes
01145     // a baseline of the lowest sequence number we'd ever need to NACK.
01146     if (recvd.empty() || recvd.low() >= last) {
01147       recvd.insert(SequenceRange(zero,
01148                                  (last > starting) ? last.previous() : zero));
01149     } else {
01150       recvd.insert(SequenceRange(zero, recvd.low()));
01151     }
01152   } else if (!recvd.empty()) {
01153     // All sequence numbers below 'first' should not be NACKed.
01154     // The value of 'first' may not decrease with subsequent HBs.
01155     recvd.insert(SequenceRange(zero,
01156                                (first > starting) ? first.previous() : zero));
01157   }
01158 
01159   deliver_held_data(rr.first, info, rr.second.durable_);
01160 
01161   //FUTURE: to support wait_for_acks(), notify DCPS layer of the sequence
01162   //        numbers we no longer expect to receive due to HEARTBEAT
01163 
01164   info.initial_hb_ = false;
01165 
01166   const bool final = heartbeat.smHeader.flags & 2 /* FLAG_F */,
01167     liveliness = heartbeat.smHeader.flags & 4 /* FLAG_L */;
01168 
01169   if (!final || (!liveliness && (info.should_nack() ||
01170       rr.second.nack_durable(info) ||
01171       recv_strategy_->has_fragments(info.hb_range_, wi->first)))) {
01172     info.ack_pending_ = true;
01173     return true; // timer will invoke send_heartbeat_replies()
01174   }
01175 
01176   //FUTURE: support assertion of liveliness for MANUAL_BY_TOPIC
01177   return false;
01178 }
01179 
01180 bool
01181 RtpsUdpDataLink::WriterInfo::should_nack() const
01182 {
01183   if (recvd_.disjoint() && recvd_.cumulative_ack() < hb_range_.second) {
01184     return true;
01185   }
01186   if (!recvd_.empty()) {
01187     return recvd_.high() < hb_range_.second;
01188   }
01189   return false;
01190 }
01191 
01192 bool
01193 RtpsUdpDataLink::RtpsReader::nack_durable(const WriterInfo& info)
01194 {
01195   return durable_ && (info.recvd_.empty() ||
01196                       info.recvd_.low() > info.hb_range_.first);
01197 }
01198 
01199 void
01200 RtpsUdpDataLink::send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag)
01201 {
01202   using namespace OpenDDS::RTPS;
01203 
01204   WriterInfoMap& writers = rr->second.remote_writers_;
01205   for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01206        ++wi) {
01207 
01208     // if we have some negative acknowledgments, we'll ask for a reply
01209     DisjointSequence& recvd = wi->second.recvd_;
01210     const bool nack = wi->second.should_nack() ||
01211       rr->second.nack_durable(wi->second);
01212     bool final = finalFlag || !nack;
01213 
01214     if (wi->second.ack_pending_ || nack || finalFlag) {
01215       const bool prev_ack_pending = wi->second.ack_pending_;
01216       wi->second.ack_pending_ = false;
01217 
01218       SequenceNumber ack;
01219       CORBA::ULong num_bits = 1;
01220       LongSeq8 bitmap;
01221       bitmap.length(1);
01222       bitmap[0] = 0;
01223 
01224       const SequenceNumber& hb_low = wi->second.hb_range_.first;
01225       const SequenceNumber& hb_high = wi->second.hb_range_.second;
01226       const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01227         hb_high_val = hb_high.getValue();
01228 
01229       if (recvd.empty()) {
01230         // Nack the entire heartbeat range.  Only reached when durable.
01231         ack = hb_low;
01232         bitmap.length(bitmap_num_longs(ack, hb_high));
01233         const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01234           ? 255
01235           : CORBA::ULong(hb_high_val - hb_low_val);
01236         DisjointSequence::fill_bitmap_range(0, idx,
01237                                             bitmap.get_buffer(),
01238                                             bitmap.length(), num_bits);
01239       } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01240         // Nack the range between the heartbeat low and the recvd low.
01241         ack = hb_low;
01242         const SequenceNumber& rec_low = recvd.low();
01243         const SequenceNumber::Value rec_low_val = rec_low.getValue();
01244         bitmap.length(bitmap_num_longs(ack, rec_low));
01245         const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01246           ? 255
01247           : CORBA::ULong(rec_low_val - hb_low_val);
01248         DisjointSequence::fill_bitmap_range(0, idx,
01249                                             bitmap.get_buffer(),
01250                                             bitmap.length(), num_bits);
01251 
01252       } else {
01253         ack = ++SequenceNumber(recvd.cumulative_ack());
01254         if (recvd.low().getValue() > 1) {
01255           // since the "ack" really is cumulative, we need to make
01256           // sure that a lower discontinuity is not possible later
01257           recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01258         }
01259 
01260         if (recvd.disjoint()) {
01261           bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01262           recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01263                           num_bits, true);
01264         }
01265       }
01266 
01267       const SequenceNumber::Value ack_val = ack.getValue();
01268 
01269       if (!recvd.empty() && hb_high > recvd.high()) {
01270         const SequenceNumber eff_high =
01271           (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01272         const SequenceNumber::Value eff_high_val = eff_high.getValue();
01273         // Nack the range between the received high and the effective high.
01274         const CORBA::ULong old_len = bitmap.length(),
01275           new_len = bitmap_num_longs(ack, eff_high);
01276         if (new_len > old_len) {
01277           bitmap.length(new_len);
01278           for (CORBA::ULong i = old_len; i < new_len; ++i) {
01279             bitmap[i] = 0;
01280           }
01281         }
01282         const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01283           idx_recv_high = recvd.disjoint() ?
01284           CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01285         DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01286                                             bitmap.get_buffer(), new_len,
01287                                             num_bits);
01288       }
01289 
01290       // If the receive strategy is holding any fragments, those should
01291       // not be "nacked" in the ACKNACK reply.  They will be accounted for
01292       // in the NACK_FRAG(s) instead.
01293       bool frags_modified =
01294         recv_strategy_->remove_frags_from_bitmap(bitmap.get_buffer(),
01295                                                  num_bits, ack, wi->first);
01296       if (frags_modified && !final) { // change to final if bitmap is empty
01297         final = true;
01298         for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01299           if ((i + 1) * 32 <= num_bits) {
01300             if (bitmap[i]) {
01301               final = false;
01302               break;
01303             }
01304           } else {
01305             if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01306               final = false;
01307               break;
01308             }
01309           }
01310         }
01311       }
01312 
01313       AckNackSubmessage acknack = {
01314         {ACKNACK,
01315          CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)),
01316          0 /*length*/},
01317         rr->first.entityId,
01318         wi->first.entityId,
01319         { // SequenceNumberSet: acking bitmapBase - 1
01320           {ack.getHigh(), ack.getLow()},
01321           num_bits, bitmap
01322         },
01323         {++wi->second.acknack_count_}
01324       };
01325 
01326       size_t size = 0, padding = 0;
01327       gen_find_size(acknack, size, padding);
01328       acknack.smHeader.submessageLength =
01329         static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01330       InfoDestinationSubmessage info_dst = {
01331         {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
01332         {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01333       };
01334       gen_find_size(info_dst, size, padding);
01335 
01336       OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01337       size += generate_nack_frags(nack_frags, wi->second, wi->first);
01338 
01339       ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01340       // byte swapping is handled in the operator<<() implementation
01341       Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01342       std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01343                   sizeof(GuidPrefix_t));
01344       ser << info_dst;
01345       // Interoperability note: we used to insert INFO_REPLY submessage here, but
01346       // testing indicated that other DDS implementations didn't accept it.
01347       ser << acknack;
01348       for (size_t i = 0; i < nack_frags.size(); ++i) {
01349         nack_frags[i].readerId = rr->first.entityId;
01350         nack_frags[i].writerId = wi->first.entityId;
01351         ser << nack_frags[i]; // always 4-byte aligned
01352       }
01353 
01354       if (!locators_.count(wi->first)) {
01355         if (Transport_debug_level) {
01356           const GuidConverter conv(wi->first);
01357           ACE_DEBUG((LM_ERROR,
01358                      "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01359                      "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01360         }
01361       } else {
01362         send_strategy_->send_rtps_control(mb_acknack,
01363                                           locators_[wi->first].addr_);
01364       }
01365     }
01366   }
01367 }
01368 
01369 void
01370 RtpsUdpDataLink::send_heartbeat_replies() // from DR to DW
01371 {
01372   using namespace OpenDDS::RTPS;
01373   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01374 
01375   for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01376          limit = interesting_ack_nacks_.end();
01377        pos != limit;
01378        ++pos) {
01379 
01380     SequenceNumber ack;
01381     LongSeq8 bitmap;
01382     bitmap.length(0);
01383 
01384     AckNackSubmessage acknack = {
01385       {ACKNACK,
01386        CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/),
01387        0 /*length*/},
01388       pos->readerid.entityId,
01389       pos->writerid.entityId,
01390       { // SequenceNumberSet: acking bitmapBase - 1
01391         {ack.getHigh(), ack.getLow()},
01392         0 /* num_bits */, bitmap
01393       },
01394       {0 /* acknack count */}
01395     };
01396 
01397     size_t size = 0, padding = 0;
01398     gen_find_size(acknack, size, padding);
01399     acknack.smHeader.submessageLength =
01400       static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01401     InfoDestinationSubmessage info_dst = {
01402       {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
01403       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01404     };
01405     gen_find_size(info_dst, size, padding);
01406 
01407     ACE_Message_Block mb_acknack(size + padding); //FUTURE: allocators?
01408     // byte swapping is handled in the operator<<() implementation
01409     Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01410     std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01411                 sizeof(GuidPrefix_t));
01412     ser << info_dst;
01413     // Interoperability note: we used to insert INFO_REPLY submessage here, but
01414     // testing indicated that other DDS implementations didn't accept it.
01415     ser << acknack;
01416 
01417     send_strategy_->send_rtps_control(mb_acknack, pos->writer_address);
01418   }
01419   interesting_ack_nacks_.clear();
01420 
01421   for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01422        ++rr) {
01423     send_ack_nacks (rr);
01424   }
01425 }
01426 
01427 size_t
01428 RtpsUdpDataLink::generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nf,
01429                                      WriterInfo& wi, const RepoId& pub_id)
01430 {
01431   typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01432   typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01433   RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01434 
01435   // Populate frag_info with two possible sources of NackFrags:
01436   // 1. sequence #s in the reception gaps that we have partially received
01437   OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01438   for (size_t i = 0; i < missing.size(); ++i) {
01439     recv_strategy_->has_fragments(missing[i], pub_id, &frag_info);
01440   }
01441   // 1b. larger than the last received seq# but less than the heartbeat.lastSN
01442   if (!wi.recvd_.empty()) {
01443     const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01444     recv_strategy_->has_fragments(range, pub_id, &frag_info);
01445   }
01446   for (size_t i = 0; i < frag_info.size(); ++i) {
01447     // If we've received a HeartbeatFrag, we know the last (available) frag #
01448     const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01449     if (heartbeat_frag != wi.frags_.end()) {
01450       extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01451     }
01452   }
01453 
01454   // 2. sequence #s outside the recvd_ gaps for which we have a HeartbeatFrag
01455   const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01456               high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01457                end = wi.frags_.end();
01458   for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01459     if (iter == low) {
01460       // skip over the range covered by step #1 above
01461       if (high == end) {
01462         break;
01463       }
01464       iter = high;
01465     }
01466 
01467     const SequenceRange range(iter->first, iter->first);
01468     if (recv_strategy_->has_fragments(range, pub_id, &frag_info)) {
01469       extend_bitmap_range(frag_info.back().second, iter->second.value);
01470     } else {
01471       // it was not in the recv strategy, so the entire range is "missing"
01472       frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01473       RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01474       fnSet.bitmapBase.value = 1;
01475       fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01476       fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01477       for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01478         fnSet.bitmap[i] = 0xFFFFFFFF;
01479       }
01480     }
01481   }
01482 
01483   if (frag_info.empty()) {
01484     return 0;
01485   }
01486 
01487   const RTPS::NackFragSubmessage nackfrag_prototype = {
01488     {RTPS::NACK_FRAG, 1 /*FLAG_E*/, 0 /* length set below */},
01489     ENTITYID_UNKNOWN, // readerId will be filled-in by send_heartbeat_replies()
01490     ENTITYID_UNKNOWN, // writerId will be filled-in by send_heartbeat_replies()
01491     {0, 0}, // writerSN set below
01492     RTPS::FragmentNumberSet(), // fragmentNumberState set below
01493     {0} // count set below
01494   };
01495 
01496   size_t size = 0, padding = 0;
01497   for (size_t i = 0; i < frag_info.size(); ++i) {
01498     nf.push_back(nackfrag_prototype);
01499     RTPS::NackFragSubmessage& nackfrag = nf.back();
01500     nackfrag.writerSN.low = frag_info[i].first.getLow();
01501     nackfrag.writerSN.high = frag_info[i].first.getHigh();
01502     nackfrag.fragmentNumberState = frag_info[i].second;
01503     nackfrag.count.value = ++wi.nackfrag_count_;
01504     const size_t before_size = size;
01505     gen_find_size(nackfrag, size, padding);
01506     nackfrag.smHeader.submessageLength =
01507       static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01508   }
01509   return size;
01510 }
01511 
01512 void
01513 RtpsUdpDataLink::extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
01514                                      CORBA::ULong extent)
01515 {
01516   if (extent < fnSet.bitmapBase.value) {
01517     return; // can't extend to some number under the base
01518   }
01519   // calculate the index to the extent to determine the new_num_bits
01520   const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01521                                              extent - fnSet.bitmapBase.value + 1),
01522                      len = (new_num_bits + 31) / 32;
01523   if (new_num_bits < fnSet.numBits) {
01524     return; // bitmap already extends past "extent"
01525   }
01526   fnSet.bitmap.length(len);
01527   // We are missing from one past old bitmap end to the new end
01528   DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01529                                       fnSet.bitmap.get_buffer(), len,
01530                                       fnSet.numBits);
01531 }
01532 
01533 void
01534 RtpsUdpDataLink::received(const RTPS::HeartBeatFragSubmessage& hb_frag,
01535                           const GuidPrefix_t& src_prefix)
01536 {
01537   datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01538 }
01539 
01540 bool
01541 RtpsUdpDataLink::process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
01542                                    const RepoId& src,
01543                                    RtpsReaderMap::value_type& rr)
01544 {
01545   WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01546   if (wi == rr.second.remote_writers_.end()) {
01547     // we may not be associated yet, even if the writer thinks we are
01548     return false;
01549   }
01550 
01551   if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01552     return false;
01553   }
01554 
01555   wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01556 
01557   SequenceNumber seq;
01558   seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01559 
01560   // If seq is outside the heartbeat range or we haven't completely received
01561   // it yet, send a NackFrag along with the AckNack.  The heartbeat range needs
01562   // to be checked first because recvd_ contains the numbers below the
01563   // heartbeat range (so that we don't NACK those).
01564   if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01565       || !wi->second.recvd_.contains(seq)) {
01566     wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01567     wi->second.ack_pending_ = true;
01568     return true; // timer will invoke send_heartbeat_replies()
01569   }
01570   return false;
01571 }
01572 
01573 
01574 // DataWriter's side of Reliability
01575 
01576 void
01577 RtpsUdpDataLink::received(const RTPS::AckNackSubmessage& acknack,
01578                           const GuidPrefix_t& src_prefix)
01579 {
01580   // local side is DW
01581   RepoId local;
01582   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01583   local.entityId = acknack.writerId; // can't be ENTITYID_UNKNOWN
01584 
01585   RepoId remote;
01586   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01587   remote.entityId = acknack.readerId;
01588 
01589   const ACE_Time_Value now = ACE_OS::gettimeofday();
01590   OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01591 
01592   {
01593     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01594     for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01595            limit = interesting_readers_.upper_bound(remote);
01596          pos != limit;
01597          ++pos) {
01598       pos->second.last_activity = now;
01599       // Ensure the acknack was for the writer.
01600       if (local == pos->second.localid) {
01601         if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01602           callbacks.push_back(pos->second.listener);
01603           pos->second.status = InterestingRemote::EXISTS;
01604         }
01605       }
01606     }
01607   }
01608 
01609   for (size_t i = 0; i < callbacks.size(); ++i) {
01610     callbacks[i]->reader_exists(remote, local);
01611   }
01612 
01613   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01614   const RtpsWriterMap::iterator rw = writers_.find(local);
01615   if (rw == writers_.end()) {
01616     if (Transport_debug_level > 5) {
01617       GuidConverter local_conv(local);
01618       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01619         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01620     }
01621     return;
01622   }
01623 
01624   if (Transport_debug_level > 5) {
01625     GuidConverter local_conv(local), remote_conv(remote);
01626     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01627       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01628       OPENDDS_STRING(remote_conv).c_str()));
01629   }
01630 
01631   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01632   if (ri == rw->second.remote_readers_.end()) {
01633     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01634       "WARNING ReaderInfo not found\n"));
01635     return;
01636   }
01637 
01638   if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01639     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01640       "WARNING Count indicates duplicate, dropping\n"));
01641     return;
01642   }
01643 
01644   ri->second.acknack_recvd_count_ = acknack.count.value;
01645 
01646   if (!ri->second.handshake_done_) {
01647     ri->second.handshake_done_ = true;
01648     invoke_on_start_callbacks(true);
01649   }
01650 
01651   OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01652   const bool final = acknack.smHeader.flags & 2 /* FLAG_F */;
01653 
01654   if (!ri->second.durable_data_.empty()) {
01655     if (Transport_debug_level > 5) {
01656       const GuidConverter local_conv(local), remote_conv(remote);
01657       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01658                  "local %C has durable for remote %C\n",
01659                  OPENDDS_STRING(local_conv).c_str(),
01660                  OPENDDS_STRING(remote_conv).c_str()));
01661     }
01662     SequenceNumber ack;
01663     ack.setValue(acknack.readerSNState.bitmapBase.high,
01664                  acknack.readerSNState.bitmapBase.low);
01665     const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01666     if (ack > dd_last) {
01667       // Reader acknowledges durable data, we no longer need to store it
01668       ri->second.durable_data_.swap(pendingCallbacks);
01669       if (Transport_debug_level > 5) {
01670         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01671                    "durable data acked\n"));
01672       }
01673     } else {
01674       DisjointSequence requests;
01675       if (!requests.insert(ack, acknack.readerSNState.numBits,
01676                            acknack.readerSNState.bitmap.get_buffer())
01677           && !final && ack == rw->second.heartbeat_high(ri->second)) {
01678         // This is a non-final AckNack with no bits in the bitmap.
01679         // Attempt to reply to a request for the "base" value which
01680         // is neither Acked nor Nacked, only when it's the HB high.
01681         if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01682       }
01683       // Attempt to reply to nacks for durable data
01684       bool sent_some = false;
01685       typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01686       iter_t it = ri->second.durable_data_.begin();
01687       const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01688       SequenceNumber lastSent = SequenceNumber::ZERO();
01689       if (!requests.empty()) {
01690         lastSent = requests.low().previous();
01691       }
01692       DisjointSequence gaps;
01693       for (size_t i = 0; i < psr.size(); ++i) {
01694         for (; it != ri->second.durable_data_.end()
01695              && it->first < psr[i].first; ++it) ; // empty for-loop
01696         for (; it != ri->second.durable_data_.end()
01697              && it->first <= psr[i].second; ++it) {
01698           if (Transport_debug_level > 5) {
01699             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01700                        "durable resend %d\n", int(it->first.getValue())));
01701           }
01702           durability_resend(it->second);
01703           //FUTURE: combine multiple resends into one RTPS Message?
01704           sent_some = true;
01705           if (it->first > lastSent + 1) {
01706             gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01707           }
01708           lastSent = it->first;
01709         }
01710         if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01711           gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01712         }
01713       }
01714       if (!gaps.empty()) {
01715         if (Transport_debug_level > 5) {
01716           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01717                      "sending durability gaps: "));
01718           gaps.dump();
01719         }
01720         send_durability_gaps(local, remote, gaps);
01721       }
01722       if (sent_some) {
01723         return;
01724       }
01725       const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01726       if (!requests.empty() && requests.high() < dd_first) {
01727         // All nacks were below the start of the durable data.
01728         if (Transport_debug_level > 5) {
01729           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01730                      "sending durability gaps for all requests: "));
01731           requests.dump();
01732         }
01733         send_durability_gaps(local, remote, requests);
01734         return;
01735       }
01736       if (!requests.empty() && requests.low() < dd_first) {
01737         // Lowest nack was below the start of the durable data.
01738         for (size_t i = 0; i < psr.size(); ++i) {
01739           if (psr[i].first > dd_first) {
01740             break;
01741           }
01742           gaps.insert(SequenceRange(psr[i].first,
01743                                     std::min(psr[i].second, dd_first)));
01744         }
01745         if (Transport_debug_level > 5) {
01746           ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01747                      "sending durability gaps for some requests: "));
01748           gaps.dump();
01749         }
01750         send_durability_gaps(local, remote, gaps);
01751         return;
01752       }
01753     }
01754   }
01755   SequenceNumber ack;
01756   ack.setValue(acknack.readerSNState.bitmapBase.high,
01757                acknack.readerSNState.bitmapBase.low);
01758   if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01759       && ack != SequenceNumber::ZERO()) {
01760     ri->second.cur_cumulative_ack_ = ack;
01761   }
01762   // If this ACKNACK was final, the DR doesn't expect a reply, and therefore
01763   // we don't need to do anything further.
01764   if (!final) {
01765     ri->second.requested_changes_.push_back(acknack.readerSNState);
01766   }
01767   process_acked_by_all_i(g, local);
01768   g.release();
01769   if (!final) {
01770     nack_reply_.schedule(); // timer will invoke send_nack_replies()
01771   }
01772   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01773   for (iter_t it = pendingCallbacks.begin();
01774        it != pendingCallbacks.end(); ++it) {
01775     it->second->data_delivered();
01776   }
01777 }
01778 
01779 void
01780 RtpsUdpDataLink::received(const RTPS::NackFragSubmessage& nackfrag,
01781                           const GuidPrefix_t& src_prefix)
01782 {
01783   // local side is DW
01784   RepoId local;
01785   std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01786   local.entityId = nackfrag.writerId; // can't be ENTITYID_UNKNOWN
01787 
01788   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01789   const RtpsWriterMap::iterator rw = writers_.find(local);
01790   if (rw == writers_.end()) {
01791     if (Transport_debug_level > 5) {
01792       GuidConverter local_conv(local);
01793       ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01794         "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01795     }
01796     return;
01797   }
01798 
01799   RepoId remote;
01800   std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01801   remote.entityId = nackfrag.readerId;
01802 
01803   if (Transport_debug_level > 5) {
01804     GuidConverter local_conv(local), remote_conv(remote);
01805     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01806       "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01807       OPENDDS_STRING(remote_conv).c_str()));
01808   }
01809 
01810   const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01811   if (ri == rw->second.remote_readers_.end()) {
01812     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01813       "WARNING ReaderInfo not found\n"));
01814     return;
01815   }
01816 
01817   if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01818     VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01819       "WARNING Count indicates duplicate, dropping\n"));
01820     return;
01821   }
01822 
01823   ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01824 
01825   SequenceNumber seq;
01826   seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01827   ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01828   g.release();
01829   nack_reply_.schedule(); // timer will invoke send_nack_replies()
01830 }
01831 
01832 void
01833 RtpsUdpDataLink::send_nack_replies()
01834 {
01835   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01836   // Reply from local DW to remote DR: GAP or DATA
01837   using namespace OpenDDS::RTPS;
01838   typedef RtpsWriterMap::iterator rw_iter;
01839   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01840 
01841     // consolidate requests from N readers
01842     OPENDDS_SET(ACE_INET_Addr) recipients;
01843     DisjointSequence requests;
01844     RtpsWriter& writer = rw->second;
01845 
01846     //track if any messages have been fully acked by all readers
01847     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01848 
01849     typedef ReaderInfoMap::iterator ri_iter;
01850     const ri_iter end = writer.remote_readers_.end();
01851     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01852 
01853       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01854         all_readers_ack = ri->second.cur_cumulative_ack_;
01855       }
01856 
01857       for (size_t i = 0; i < ri->second.requested_changes_.size(); ++i) {
01858         const SequenceNumberSet& sn_state = ri->second.requested_changes_[i];
01859         SequenceNumber base;
01860         base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
01861         if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
01862             && base == writer.heartbeat_high(ri->second)) {
01863           // Since there is an entry in requested_changes_, the DR must have
01864           // sent a non-final AckNack.  If the base value is the high end of
01865           // the heartbeat range, treat it as a request for that seq#.
01866           if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
01867             requests.insert(base);
01868           }
01869         } else {
01870           requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
01871         }
01872       }
01873 
01874       if (ri->second.requested_changes_.size()) {
01875         if (locators_.count(ri->first)) {
01876           recipients.insert(locators_[ri->first].addr_);
01877           if (Transport_debug_level > 5) {
01878             const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01879             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01880                        "local %C remote %C requested resend\n",
01881                        OPENDDS_STRING(local_conv).c_str(),
01882                        OPENDDS_STRING(remote_conv).c_str()));
01883           }
01884         }
01885         ri->second.requested_changes_.clear();
01886       }
01887     }
01888 
01889     DisjointSequence gaps;
01890     if (!requests.empty()) {
01891       if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01892         gaps = requests;
01893       } else {
01894         OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01895         SingleSendBuffer& sb = *writer.send_buff_;
01896         ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01897         const RtpsUdpSendStrategy::OverrideToken ot =
01898           send_strategy_->override_destinations(recipients);
01899         for (size_t i = 0; i < ranges.size(); ++i) {
01900           if (Transport_debug_level > 5) {
01901             ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01902                        "resend data %d-%d\n", int(ranges[i].first.getValue()),
01903                        int(ranges[i].second.getValue())));
01904           }
01905           sb.resend_i(ranges[i], &gaps);
01906         }
01907       }
01908     }
01909 
01910     send_nackfrag_replies(writer, gaps, recipients);
01911 
01912     if (!gaps.empty()) {
01913       if (Transport_debug_level > 5) {
01914         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01915                    "GAPs:"));
01916         gaps.dump();
01917       }
01918       ACE_Message_Block* mb_gap =
01919         marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01920       if (mb_gap) {
01921         send_strategy_->send_rtps_control(*mb_gap, recipients);
01922         mb_gap->release();
01923       }
01924     }
01925     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01926       continue;
01927     }
01928   }
01929 }
01930 
01931 void
01932 RtpsUdpDataLink::send_nackfrag_replies(RtpsWriter& writer,
01933                                        DisjointSequence& gaps,
01934                                        OPENDDS_SET(ACE_INET_Addr)& gap_recipients)
01935 {
01936   typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01937   OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01938 
01939   typedef ReaderInfoMap::iterator ri_iter;
01940   const ri_iter end = writer.remote_readers_.end();
01941   for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01942 
01943     if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01944       continue;
01945     }
01946 
01947     const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
01948 
01949     typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
01950     const rf_iter rf_end = ri->second.requested_frags_.end();
01951     for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
01952 
01953       const SequenceNumber& seq = rf->first;
01954       if (writer.send_buff_->contains(seq)) {
01955         FragmentInfo& fi = requests[remote_addr];
01956         fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
01957                        rf->second.bitmap.get_buffer());
01958       } else {
01959         gaps.insert(seq);
01960         gap_recipients.insert(remote_addr);
01961       }
01962     }
01963     ri->second.requested_frags_.clear();
01964   }
01965 
01966   typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
01967   for (req_iter req = requests.begin(); req != requests.end(); ++req) {
01968     const FragmentInfo& fi = req->second;
01969 
01970     ACE_GUARD(TransportSendBuffer::LockType, guard,
01971       writer.send_buff_->strategy_lock());
01972     const RtpsUdpSendStrategy::OverrideToken ot =
01973       send_strategy_->override_destinations(req->first);
01974 
01975     for (FragmentInfo::const_iterator sn_iter = fi.begin();
01976          sn_iter != fi.end(); ++sn_iter) {
01977       const SequenceNumber& seq = sn_iter->first;
01978       writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
01979     }
01980   }
01981 }
01982 
01983 void
01984 RtpsUdpDataLink::process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id)
01985 {
01986   using namespace OpenDDS::RTPS;
01987   typedef RtpsWriterMap::iterator rw_iter;
01988   typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01989   OPENDDS_VECTOR(RepoId) to_check;
01990   rw_iter rw = writers_.find(pub_id);
01991   if (rw == writers_.end()) {
01992     return;
01993   }
01994   RtpsWriter& writer = rw->second;
01995   if (!writer.elems_not_acked_.empty()) {
01996 
01997     //start with the max sequence number writer knows about and decrease
01998     //by what the min over all readers is
01999     SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02000 
02001     typedef ReaderInfoMap::iterator ri_iter;
02002     const ri_iter end = writer.remote_readers_.end();
02003     for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02004       if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02005         all_readers_ack = ri->second.cur_cumulative_ack_;
02006       }
02007     }
02008     if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02009       return;
02010     }
02011     OPENDDS_VECTOR(SequenceNumber) sns;
02012     //if any messages fully acked, call data delivered and remove from map
02013 
02014     iter_t it = writer.elems_not_acked_.begin();
02015     OPENDDS_SET(SequenceNumber) sns_to_release;
02016     while (it != writer.elems_not_acked_.end()) {
02017       if (it->first < all_readers_ack) {
02018         writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02019         sns_to_release.insert(it->first);
02020         iter_t last = it;
02021         ++it;
02022         writer.elems_not_acked_.erase(last);
02023       } else {
02024         break;
02025       }
02026     }
02027     OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02028     while (sns_it != sns_to_release.end()) {
02029       writer.send_buff_->release_acked(*sns_it);
02030       ++sns_it;
02031     }
02032     TransportQueueElement* tqe_to_deliver;
02033 
02034     while (true) {
02035       rw_iter deliver_on_writer = writers_.find(pub_id);
02036       if (deliver_on_writer == writers_.end()) {
02037         break;
02038       }
02039       RtpsWriter& writer = deliver_on_writer->second;
02040       iter_t to_deliver_iter = writer.to_deliver_.begin();
02041       if (to_deliver_iter == writer.to_deliver_.end()) {
02042         break;
02043       }
02044       tqe_to_deliver = to_deliver_iter->second;
02045       writer.to_deliver_.erase(to_deliver_iter);
02046       g.release();
02047 
02048       tqe_to_deliver->data_delivered();
02049       g.acquire();
02050     }
02051   }
02052 }
02053 
02054 ACE_Message_Block*
02055 RtpsUdpDataLink::marshal_gaps(const RepoId& writer, const RepoId& reader,
02056                               const DisjointSequence& gaps, bool durable)
02057 {
02058   using namespace RTPS;
02059   // RTPS v2.1 8.3.7.4: the Gap sequence numbers are those in the range
02060   // [gapStart, gapListBase) and those in the SNSet.
02061   const SequenceNumber firstMissing = gaps.low(),
02062                        base = ++SequenceNumber(gaps.cumulative_ack());
02063   const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02064                                      firstMissing.getLow()},
02065                          gapListBase = {base.getHigh(), base.getLow()};
02066   CORBA::ULong num_bits = 0;
02067   LongSeq8 bitmap;
02068 
02069   if (gaps.disjoint()) {
02070     bitmap.length(bitmap_num_longs(base, gaps.high()));
02071     gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02072 
02073   } else {
02074     bitmap.length(1);
02075     bitmap[0] = 0;
02076     num_bits = 1;
02077   }
02078 
02079   GapSubmessage gap = {
02080     {GAP, 1 /*FLAG_E*/, 0 /*length determined below*/},
02081     reader.entityId,
02082     writer.entityId,
02083     gapStart,
02084     {gapListBase, num_bits, bitmap}
02085   };
02086 
02087   if (Transport_debug_level > 5) {
02088     const GuidConverter conv(writer);
02089     SequenceRange sr;
02090     sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02091     SequenceNumber srbase;
02092     srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02093     sr.second = srbase.previous();
02094     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02095               "GAP with range [%q, %q] from %C\n",
02096               sr.first.getValue(), sr.second.getValue(),
02097               OPENDDS_STRING(conv).c_str()));
02098   }
02099 
02100   size_t gap_size = 0, padding = 0;
02101   gen_find_size(gap, gap_size, padding);
02102   gap.smHeader.submessageLength =
02103     static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02104 
02105   // For durable writers, change a non-directed Gap into multiple directed gaps.
02106   OPENDDS_VECTOR(RepoId) readers;
02107   if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02108     if (Transport_debug_level > 5) {
02109       const GuidConverter local_conv(writer);
02110       ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02111                  "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02112     }
02113     const RtpsWriterMap::iterator iter = writers_.find(writer);
02114     RtpsWriter& rw = iter->second;
02115     for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02116          ri != rw.remote_readers_.end(); ++ri) {
02117       if (!ri->second.expecting_durable_data()) {
02118         readers.push_back(ri->first);
02119       } else if (Transport_debug_level > 5) {
02120         const GuidConverter remote_conv(ri->first);
02121         ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02122                    "%C is expecting durable data, no GAP sent\n",
02123                    OPENDDS_STRING(remote_conv).c_str()));
02124       }
02125     }
02126     if (readers.empty()) return 0;
02127   }
02128 
02129   const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02130     prefix_sz = sizeof(reader.guidPrefix);
02131   // no additional padding needed for INFO_DST
02132   const size_t total_sz = readers.empty() ? (gap_size + padding) :
02133     (readers.size() * (gap_size + padding + size_per_idst));
02134 
02135   ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02136   //FUTURE: allocators?
02137   // byte swapping is handled in the operator<<() implementation
02138   Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02139   if (readers.empty()) {
02140     ser << gap;
02141   } else {
02142     InfoDestinationSubmessage idst = {
02143       {INFO_DST, 1 /*FLAG_E*/, INFO_DST_SZ},
02144       {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02145     };
02146     for (size_t i = 0; i < readers.size(); ++i) {
02147       std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02148       gap.readerId = readers[i].entityId;
02149       ser << idst;
02150       ser << gap;
02151     }
02152   }
02153   return mb_gap;
02154 }
02155 
02156 void
02157 RtpsUdpDataLink::durability_resend(TransportQueueElement* element)
02158 {
02159   ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02160   send_strategy_->send_rtps_control(*msg,
02161                                     get_locator(element->subscription_id()));
02162 }
02163 
02164 void
02165 RtpsUdpDataLink::send_durability_gaps(const RepoId& writer,
02166                                       const RepoId& reader,
02167                                       const DisjointSequence& gaps)
02168 {
02169   ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02170   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02171   RTPS::InfoDestinationSubmessage info_dst = {
02172     {RTPS::INFO_DST, 1 /*FLAG_E*/, RTPS::INFO_DST_SZ},
02173     {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02174   };
02175   std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02176   ser << info_dst;
02177   mb.cont(marshal_gaps(writer, reader, gaps));
02178   send_strategy_->send_rtps_control(mb, get_locator(reader));
02179   mb.cont()->release();
02180 }
02181 
02182 void
02183 RtpsUdpDataLink::send_heartbeats()
02184 {
02185   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02186   ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
02187 
02188   if (writers_.empty() && interesting_readers_.empty()) {
02189     heartbeat_.disable();
02190   }
02191 
02192   using namespace OpenDDS::RTPS;
02193   OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02194   OPENDDS_SET(ACE_INET_Addr) recipients;
02195   OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02196   const ACE_Time_Value now = ACE_OS::gettimeofday();
02197 
02198   RepoIdSet writers_to_advertise;
02199 
02200   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02201   const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config_->heartbeat_period_;
02202   for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02203          limit = interesting_readers_.end();
02204        pos != limit;
02205        ++pos) {
02206     if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02207         (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02208       recipients.insert(pos->second.address);
02209       writers_to_advertise.insert(pos->second.localid);
02210     }
02211     if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02212       CallbackType callback(pos->first, pos->second);
02213       readerDoesNotExistCallbacks_.push_back(callback);
02214       pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02215     }
02216   }
02217 
02218   typedef RtpsWriterMap::iterator rw_iter;
02219   for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02220     const bool has_data = !rw->second.send_buff_.is_nil()
02221                           && !rw->second.send_buff_->empty();
02222     bool final = true, has_durable_data = false;
02223     SequenceNumber durable_max;
02224 
02225     typedef ReaderInfoMap::iterator ri_iter;
02226     const ri_iter end = rw->second.remote_readers_.end();
02227     for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02228       if ((has_data || !ri->second.handshake_done_)
02229           && locators_.count(ri->first)) {
02230         recipients.insert(locators_[ri->first].addr_);
02231         if (final && !ri->second.handshake_done_) {
02232           final = false;
02233         }
02234       }
02235       if (!ri->second.durable_data_.empty()) {
02236         const ACE_Time_Value expiration =
02237           ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02238         if (now > expiration) {
02239           typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02240             dd_iter;
02241           for (dd_iter it = ri->second.durable_data_.begin();
02242                it != ri->second.durable_data_.end(); ++it) {
02243             pendingCallbacks.push_back(it->second);
02244           }
02245           ri->second.durable_data_.clear();
02246           if (Transport_debug_level > 3) {
02247             const GuidConverter gw(rw->first), gr(ri->first);
02248             VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02249               "removed expired durable data for %C -> %C\n",
02250               OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02251           }
02252         } else {
02253           has_durable_data = true;
02254           if (ri->second.durable_data_.rbegin()->first > durable_max) {
02255             durable_max = ri->second.durable_data_.rbegin()->first;
02256           }
02257           if (locators_.count(ri->first)) {
02258             recipients.insert(locators_[ri->first].addr_);
02259           }
02260         }
02261       }
02262     }
02263 
02264     if (!rw->second.elems_not_acked_.empty()) {
02265       final = false;
02266     }
02267 
02268     if (writers_to_advertise.count(rw->first)) {
02269       final = false;
02270       writers_to_advertise.erase(rw->first);
02271     }
02272 
02273     if (final && !has_data && !has_durable_data) {
02274       continue;
02275     }
02276 
02277     const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02278                                    ? 1 : rw->second.send_buff_->low(),
02279         lastSN = std::max(durable_max,
02280                           has_data ? rw->second.send_buff_->high() : 1);
02281 
02282     const HeartBeatSubmessage hb = {
02283       {HEARTBEAT,
02284        CORBA::Octet(1 /*FLAG_E*/ | (final ? 2 /*FLAG_F*/ : 0)),
02285        HEARTBEAT_SZ},
02286       ENTITYID_UNKNOWN, // any matched reader may be interested in this
02287       rw->first.entityId,
02288       {firstSN.getHigh(), firstSN.getLow()},
02289       {lastSN.getHigh(), lastSN.getLow()},
02290       {++heartbeat_counts_[rw->first]}
02291     };
02292     subm.push_back(hb);
02293   }
02294 
02295   for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02296          limit = writers_to_advertise.end();
02297        pos != limit;
02298        ++pos) {
02299     const SequenceNumber SN = 1;
02300     const HeartBeatSubmessage hb = {
02301       {HEARTBEAT,
02302        CORBA::Octet(1 /*FLAG_E*/),
02303        HEARTBEAT_SZ},
02304       ENTITYID_UNKNOWN, // any matched reader may be interested in this
02305       pos->entityId,
02306       {SN.getHigh(), SN.getLow()},
02307       {SN.getHigh(), SN.getLow()},
02308       {++heartbeat_counts_[*pos]}
02309     };
02310     subm.push_back(hb);
02311   }
02312 
02313   if (!subm.empty()) {
02314     ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size()); //FUTURE: allocators?
02315     // byte swapping is handled in the operator<<() implementation
02316     Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02317     bool send_ok = true;
02318     for (size_t i = 0; i < subm.size(); ++i) {
02319       if (!(ser << subm[i])) {
02320         ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02321           "failed to serialize HEARTBEAT submessage %B\n", i));
02322         send_ok = false;
02323         break;
02324       }
02325     }
02326     if (send_ok) {
02327       send_strategy_->send_rtps_control(mb, recipients);
02328     }
02329   }
02330   c.release();
02331   g.release();
02332 
02333   while(true) {
02334     c.acquire();
02335     if (readerDoesNotExistCallbacks_.empty()) {
02336       break;
02337     }
02338     OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks_.begin();
02339     const RepoId& rid = iter->first;
02340     const InterestingRemote& remote = iter->second;
02341     readerDoesNotExistCallbacks_.erase(iter);
02342     c.release();
02343     remote.listener->reader_does_not_exist(rid, remote.localid);
02344   }
02345 
02346   for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02347     pendingCallbacks[i]->data_dropped();
02348   }
02349 }
02350 
02351 void
02352 RtpsUdpDataLink::check_heartbeats()
02353 {
02354   // Have any interesting writers timed out?
02355   const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02356   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02357   ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
02358   for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02359        pos != limit;
02360        ++pos) {
02361     if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02362       CallbackType callback(pos->first, pos->second);
02363       writerDoesNotExistCallbacks_.push_back(callback);
02364       pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02365     }
02366   }
02367   c.release();
02368   g.release();
02369   while(true) {
02370     c.acquire();
02371     if (writerDoesNotExistCallbacks_.empty()) {
02372       break;
02373     }
02374     OPENDDS_VECTOR(CallbackType)::iterator iter = writerDoesNotExistCallbacks_.begin();
02375     const RepoId& rid = iter->first;
02376     const InterestingRemote& remote = iter->second;
02377     writerDoesNotExistCallbacks_.erase(iter);
02378     c.release();
02379     remote.listener->writer_does_not_exist(rid, remote.localid);
02380   }
02381 }
02382 
02383 void
02384 RtpsUdpDataLink::send_heartbeats_manual(const TransportSendControlElement* tsce)
02385 {
02386   using namespace OpenDDS::RTPS;
02387 
02388   const RepoId pub_id = tsce->publication_id();
02389 
02390   // Populate the recipients.
02391   OPENDDS_SET(ACE_INET_Addr) recipients;
02392   get_locators (pub_id, recipients);
02393   if (recipients.empty()) {
02394     return;
02395   }
02396 
02397   // Populate the sequence numbers and counter.
02398 
02399   SequenceNumber firstSN, lastSN;
02400   CORBA::Long counter;
02401   RtpsWriterMap::iterator pos = writers_.find (pub_id);
02402   if (pos != writers_.end ()) {
02403     // Reliable.
02404     const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02405     SequenceNumber durable_max;
02406     const ACE_Time_Value now = ACE_OS::gettimeofday();
02407     for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02408          ri != end;
02409          ++ri) {
02410       if (!ri->second.durable_data_.empty()) {
02411         const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02412         if (now <= expiration &&
02413             ri->second.durable_data_.rbegin()->first > durable_max) {
02414           durable_max = ri->second.durable_data_.rbegin()->first;
02415         }
02416       }
02417     }
02418     firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02419     lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02420     counter = ++heartbeat_counts_[pos->first];
02421   } else {
02422     // Unreliable.
02423     firstSN = 1;
02424     lastSN = tsce->sequence();
02425     counter = ++this->best_effort_heartbeat_count_;
02426   }
02427 
02428   const HeartBeatSubmessage hb = {
02429     {HEARTBEAT,
02430      CORBA::Octet(1 /*FLAG_E*/ | 2 /*FLAG_F*/ | 4 /*FLAG_L*/),
02431      HEARTBEAT_SZ},
02432     ENTITYID_UNKNOWN, // any matched reader may be interested in this
02433     pub_id.entityId,
02434     {firstSN.getHigh(), firstSN.getLow()},
02435     {lastSN.getHigh(), lastSN.getLow()},
02436     {counter}
02437   };
02438 
02439   ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1); //FUTURE: allocators?
02440   // byte swapping is handled in the operator<<() implementation
02441   Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02442   if ((ser << hb)) {
02443     send_strategy_->send_rtps_control(mb, recipients);
02444   }
02445   else {
02446     ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02447                "failed to serialize HEARTBEAT submessage\n"));
02448   }
02449 }
02450 
02451 RtpsUdpDataLink::ReaderInfo::~ReaderInfo()
02452 {
02453   expire_durable_data();
02454 }
02455 
02456 void
02457 RtpsUdpDataLink::ReaderInfo::expire_durable_data()
02458 {
02459   typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02460   for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
02461     it->second->data_dropped();
02462   }
02463 }
02464 
02465 bool
02466 RtpsUdpDataLink::ReaderInfo::expecting_durable_data() const
02467 {
02468   return durable_ &&
02469     (durable_timestamp_ == ACE_Time_Value::zero // DW hasn't resent yet
02470      || !durable_data_.empty());                // DW resent, not sent to reader
02471 }
02472 
02473 RtpsUdpDataLink::RtpsWriter::~RtpsWriter()
02474 {
02475   if (!to_deliver_.empty()) {
02476     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left to deliver\n", to_deliver_.size()));
02477   }
02478   if (!elems_not_acked_.empty()) {
02479     ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left not fully acknowledged\n", elems_not_acked_.size()));
02480   }
02481 }
02482 
02483 SequenceNumber
02484 RtpsUdpDataLink::RtpsWriter::heartbeat_high(const ReaderInfo& ri) const
02485 {
02486   const SequenceNumber durable_max =
02487     ri.durable_data_.empty() ? 0 : ri.durable_data_.rbegin()->first;
02488   const SequenceNumber data_max =
02489     send_buff_.is_nil() ? 0 : (send_buff_->empty() ? 0 : send_buff_->high());
02490   return std::max(durable_max, data_max);
02491 }
02492 
02493 void
02494 RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack(TransportQueueElement* element)
02495 {
02496   elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
02497 }
02498 
02499 
02500 // Implementing TimedDelay and HeartBeat nested classes (for ACE timers)
02501 
02502 void
02503 RtpsUdpDataLink::TimedDelay::schedule()
02504 {
02505   if (!scheduled_) {
02506     const long timer = outer_->get_reactor()->schedule_timer(this, 0, timeout_);
02507 
02508     if (timer == -1) {
02509       ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::TimedDelay::schedule "
02510         "failed to schedule timer %p\n", ACE_TEXT("")));
02511     } else {
02512       scheduled_ = true;
02513     }
02514   }
02515 }
02516 
02517 void
02518 RtpsUdpDataLink::TimedDelay::cancel()
02519 {
02520   if (scheduled_) {
02521     outer_->get_reactor()->cancel_timer(this);
02522     scheduled_ = false;
02523   }
02524 }
02525 
02526 void
02527 RtpsUdpDataLink::HeartBeat::enable()
02528 {
02529   if (!enabled_) {
02530     const ACE_Time_Value& per = outer_->config_->heartbeat_period_;
02531     const long timer =
02532       outer_->get_reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, per);
02533 
02534     if (timer == -1) {
02535       ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::HeartBeat::enable"
02536         " failed to schedule timer %p\n", ACE_TEXT("")));
02537     } else {
02538       enabled_ = true;
02539     }
02540   }
02541 }
02542 
02543 void
02544 RtpsUdpDataLink::HeartBeat::disable()
02545 {
02546   if (enabled_) {
02547     outer_->get_reactor()->cancel_timer(this);
02548     enabled_ = false;
02549   }
02550 }
02551 
02552 void
02553 RtpsUdpDataLink::send_final_acks (const RepoId& readerid)
02554 {
02555   RtpsReaderMap::iterator rr = readers_.find (readerid);
02556   if (rr != readers_.end ()) {
02557     send_ack_nacks (rr, true);
02558   }
02559 }
02560 
02561 } // namespace DCPS
02562 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7