00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpDataLink.h"
00009 #include "RtpsUdpTransport.h"
00010 #include "RtpsUdpInst.h"
00011 #include "RtpsUdpSendStrategy.h"
00012 #include "RtpsUdpReceiveStrategy.h"
00013
00014 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00015 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00016 #include "dds/DCPS/transport/framework/TransportSendControlElement.h"
00017 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00018
00019 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00020 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00021 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00022 #include "dds/DCPS/RTPS/MessageTypes.h"
00023
00024 #if defined(OPENDDS_SECURITY)
00025 #include "dds/DCPS/RTPS/SecurityHelpers.h"
00026 #include "dds/DCPS/security/framework/SecurityRegistry.h"
00027 #endif
00028
00029 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00030
00031 #include "ace/Default_Constants.h"
00032 #include "ace/Log_Msg.h"
00033 #include "ace/Message_Block.h"
00034 #include "ace/Reverse_Lock_T.h"
00035 #include "ace/Reactor.h"
00036
00037 #include <string.h>
00038
00039 #ifndef __ACE_INLINE__
00040 # include "RtpsUdpDataLink.inl"
00041 #endif
00042
00043 namespace {
00044
00045
00046
00047 CORBA::ULong
00048 bitmap_num_longs(const OpenDDS::DCPS::SequenceNumber& low,
00049 const OpenDDS::DCPS::SequenceNumber& high)
00050 {
00051 return std::min(CORBA::ULong(8),
00052 CORBA::ULong((high.getValue() - low.getValue() + 32) / 32));
00053 }
00054
00055 bool bitmapNonEmpty(const OpenDDS::RTPS::SequenceNumberSet& snSet)
00056 {
00057 for (CORBA::ULong i = 0; i < snSet.bitmap.length(); ++i) {
00058 if (snSet.bitmap[i]) {
00059 if (snSet.numBits >= (i + 1) * 32) {
00060 return true;
00061 }
00062 for (int bit = 31; bit >= 0; --bit) {
00063 if ((snSet.bitmap[i] & (1 << bit))
00064 && snSet.numBits >= i * 32 + (31 - bit)) {
00065 return true;
00066 }
00067 }
00068 }
00069 }
00070 return false;
00071 }
00072
00073 }
00074
00075 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00076
00077 namespace OpenDDS {
00078 namespace DCPS {
00079
00080 RtpsUdpDataLink::RtpsUdpDataLink(RtpsUdpTransport& transport,
00081 const GuidPrefix_t& local_prefix,
00082 const RtpsUdpInst& config,
00083 const TransportReactorTask_rch& reactor_task)
00084 : DataLink(transport,
00085 0,
00086 false,
00087 false),
00088 reactor_task_(reactor_task),
00089 multi_buff_(this, config.nak_depth_),
00090 best_effort_heartbeat_count_(0),
00091 nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00092 config.nak_response_delay_),
00093 heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00094 config.heartbeat_response_delay_),
00095 heartbeat_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats)),
00096 heartbeatchecker_(make_rch<HeartBeat>(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)),
00097 #if defined(OPENDDS_SECURITY)
00098 held_data_delivery_handler_(this),
00099 security_config_(Security::SecurityRegistry::instance()->default_config()),
00100 local_crypto_handle_(DDS::HANDLE_NIL)
00101 #else
00102 held_data_delivery_handler_(this)
00103 #endif
00104 {
00105 this->send_strategy_ = make_rch<RtpsUdpSendStrategy>(this, local_prefix);
00106 this->receive_strategy_ = make_rch<RtpsUdpReceiveStrategy>(this, local_prefix);
00107 std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00108 }
00109
00110 RtpsUdpInst&
00111 RtpsUdpDataLink::config() const
00112 {
00113 return static_cast<RtpsUdpTransport&>(impl()).config();
00114 }
00115
00116 bool
00117 RtpsUdpDataLink::add_delayed_notification(TransportQueueElement* element)
00118 {
00119 RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00120 if (iter != writers_.end()) {
00121
00122 iter->second.add_elem_awaiting_ack(element);
00123 return true;
00124 }
00125 return false;
00126 }
00127
00128 void RtpsUdpDataLink::do_remove_sample(const RepoId& pub_id,
00129 const TransportQueueElement::MatchCriteria& criteria,
00130 ACE_Guard<ACE_Thread_Mutex>& guard)
00131 {
00132 RtpsWriter::SnToTqeMap sn_tqe_map;
00133 RtpsWriter::SnToTqeMap to_deliver;
00134 typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00135
00136 RtpsWriterMap::iterator iter = writers_.find(pub_id);
00137 if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00138 to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00139 iter->second.to_deliver_.clear();
00140 iter_t it = iter->second.elems_not_acked_.begin();
00141 OPENDDS_SET(SequenceNumber) sns_to_release;
00142 while (it != iter->second.elems_not_acked_.end()) {
00143 if (criteria.matches(*it->second)) {
00144 sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00145 sns_to_release.insert(it->first);
00146 iter_t last = it;
00147 ++it;
00148 iter->second.elems_not_acked_.erase(last);
00149 } else {
00150 ++it;
00151 }
00152 }
00153 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00154 while (sns_it != sns_to_release.end()) {
00155 iter->second.send_buff_->release_acked(*sns_it);
00156 ++sns_it;
00157 }
00158 }
00159
00160
00161
00162 guard.release();
00163
00164 iter_t deliver_iter = to_deliver.begin();
00165 while (deliver_iter != to_deliver.end()) {
00166 deliver_iter->second->data_delivered();
00167 ++deliver_iter;
00168 }
00169 iter_t drop_iter = sn_tqe_map.begin();
00170 while (drop_iter != sn_tqe_map.end()) {
00171 drop_iter->second->data_dropped(true);
00172 ++drop_iter;
00173 }
00174 }
00175
00176 bool
00177 RtpsUdpDataLink::open(const ACE_SOCK_Dgram& unicast_socket)
00178 {
00179 unicast_socket_ = unicast_socket;
00180
00181 RtpsUdpInst& config = this->config();
00182
00183 if (config.use_multicast_) {
00184 const OPENDDS_STRING& net_if = config.multicast_interface_;
00185 #ifdef ACE_HAS_MAC_OSX
00186 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00187 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00188 #endif
00189 if (multicast_socket_.join(config.multicast_group_address_, 1,
00190 net_if.empty() ? 0 :
00191 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00192 ACE_ERROR_RETURN((LM_ERROR,
00193 ACE_TEXT("(%P|%t) ERROR: ")
00194 ACE_TEXT("RtpsUdpDataLink::open: ")
00195 ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00196 false);
00197 }
00198 }
00199
00200 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config.ttl_)) {
00201 ACE_ERROR_RETURN((LM_ERROR,
00202 ACE_TEXT("(%P|%t) ERROR: ")
00203 ACE_TEXT("RtpsUdpDataLink::open: ")
00204 ACE_TEXT("failed to set TTL: %d\n"),
00205 config.ttl_),
00206 false);
00207 }
00208
00209 if (config.send_buffer_size_ > 0) {
00210 int snd_size = config.send_buffer_size_;
00211 if (this->unicast_socket_.set_option(SOL_SOCKET,
00212 SO_SNDBUF,
00213 (void *) &snd_size,
00214 sizeof(snd_size)) < 0
00215 && errno != ENOTSUP) {
00216 ACE_ERROR_RETURN((LM_ERROR,
00217 ACE_TEXT("(%P|%t) ERROR: ")
00218 ACE_TEXT("RtpsUdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00219 snd_size),
00220 false);
00221 }
00222 }
00223
00224 if (config.rcv_buffer_size_ > 0) {
00225 int rcv_size = config.rcv_buffer_size_;
00226 if (this->unicast_socket_.set_option(SOL_SOCKET,
00227 SO_RCVBUF,
00228 (void *) &rcv_size,
00229 sizeof(int)) < 0
00230 && errno != ENOTSUP) {
00231 ACE_ERROR_RETURN((LM_ERROR,
00232 ACE_TEXT("(%P|%t) ERROR: ")
00233 ACE_TEXT("RtpsUdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00234 rcv_size),
00235 false);
00236 }
00237 }
00238
00239 send_strategy()->send_buffer(&multi_buff_);
00240
00241 if (start(send_strategy_,
00242 receive_strategy_) != 0) {
00243 stop_i();
00244 ACE_ERROR_RETURN((LM_ERROR,
00245 ACE_TEXT("(%P|%t) ERROR: ")
00246 ACE_TEXT("UdpDataLink::open: start failed!\n")),
00247 false);
00248 }
00249
00250 return true;
00251 }
00252
00253 void
00254 RtpsUdpDataLink::add_locator(const RepoId& remote_id,
00255 const ACE_INET_Addr& address,
00256 bool requires_inline_qos)
00257 {
00258 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00259 locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00260 }
00261
00262 void
00263 RtpsUdpDataLink::get_locators(const RepoId& local_id,
00264 OPENDDS_SET(ACE_INET_Addr)& addrs) const
00265 {
00266 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00267
00268 if (local_id == GUID_UNKNOWN) {
00269 for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00270 addrs.insert(iter->second.addr_);
00271 }
00272 return;
00273 }
00274
00275 const GUIDSeq_var peers = peer_ids(local_id);
00276 if (!peers.ptr()) {
00277 return;
00278 }
00279 for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00280 const ACE_INET_Addr addr = get_locator(peers[i]);
00281 if (addr != ACE_INET_Addr()) {
00282 addrs.insert(addr);
00283 }
00284 }
00285 }
00286
00287 ACE_INET_Addr
00288 RtpsUdpDataLink::get_locator(const RepoId& remote_id) const
00289 {
00290 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00291 const iter_t iter = locators_.find(remote_id);
00292 if (iter == locators_.end()) {
00293 const GuidConverter conv(remote_id);
00294 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00295 "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00296 return ACE_INET_Addr();
00297 }
00298 return iter->second.addr_;
00299 }
00300
00301 void
00302 RtpsUdpDataLink::associated(const RepoId& local_id, const RepoId& remote_id,
00303 bool local_reliable, bool remote_reliable,
00304 bool local_durable, bool remote_durable)
00305 {
00306 if (!local_reliable) {
00307 return;
00308 }
00309
00310 bool enable_heartbeat = false;
00311
00312 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00313 const GuidConverter conv(local_id);
00314 if (conv.isWriter() && remote_reliable) {
00315
00316 heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00317 RtpsWriter& w = writers_[local_id];
00318 w.remote_readers_[remote_id].durable_ = remote_durable;
00319 w.durable_ = local_durable;
00320 enable_heartbeat = true;
00321
00322 } else if (conv.isReader()) {
00323 RtpsReaderMap::iterator rr = readers_.find(local_id);
00324 if (rr == readers_.end()) {
00325 rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00326 .first;
00327 rr->second.durable_ = local_durable;
00328 }
00329 rr->second.remote_writers_[remote_id];
00330 reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00331 }
00332
00333 g.release();
00334 if (enable_heartbeat) {
00335 heartbeat_->schedule_enable();
00336 }
00337 }
00338
00339 bool
00340 RtpsUdpDataLink::check_handshake_complete(const RepoId& local_id,
00341 const RepoId& remote_id)
00342 {
00343 const GuidConverter conv(local_id);
00344 if (conv.isWriter()) {
00345 RtpsWriterMap::iterator rw = writers_.find(local_id);
00346 if (rw == writers_.end()) {
00347 return true;
00348 }
00349 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00350 if (ri == rw->second.remote_readers_.end()) {
00351 return true;
00352 }
00353 return ri->second.handshake_done_;
00354
00355 } else if (conv.isReader()) {
00356 return true;
00357 }
00358 return false;
00359 }
00360
00361 void
00362 RtpsUdpDataLink::register_for_reader(const RepoId& writerid,
00363 const RepoId& readerid,
00364 const ACE_INET_Addr& address,
00365 OpenDDS::DCPS::DiscoveryListener* listener)
00366 {
00367 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00368 bool enableheartbeat = interesting_readers_.empty();
00369 interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00370 heartbeat_counts_[writerid] = 0;
00371 g.release();
00372 if (enableheartbeat) {
00373 heartbeat_->schedule_enable();
00374 }
00375 }
00376
00377 void
00378 RtpsUdpDataLink::unregister_for_reader(const RepoId& writerid,
00379 const RepoId& readerid)
00380 {
00381 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00382 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00383 limit = interesting_readers_.upper_bound(readerid);
00384 pos != limit;
00385 ) {
00386 if (pos->second.localid == writerid) {
00387 interesting_readers_.erase(pos++);
00388 } else {
00389 ++pos;
00390 }
00391 }
00392 }
00393
00394 void
00395 RtpsUdpDataLink::register_for_writer(const RepoId& readerid,
00396 const RepoId& writerid,
00397 const ACE_INET_Addr& address,
00398 OpenDDS::DCPS::DiscoveryListener* listener)
00399 {
00400 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00401 bool enableheartbeatchecker = interesting_writers_.empty();
00402 interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00403 g.release();
00404 if (enableheartbeatchecker) {
00405 heartbeatchecker_->schedule_enable();
00406 }
00407 }
00408
00409 void
00410 RtpsUdpDataLink::unregister_for_writer(const RepoId& readerid,
00411 const RepoId& writerid)
00412 {
00413 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00414 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00415 limit = interesting_writers_.upper_bound(writerid);
00416 pos != limit;
00417 ) {
00418 if (pos->second.localid == readerid) {
00419 interesting_writers_.erase(pos++);
00420 } else {
00421 ++pos;
00422 }
00423 }
00424 }
00425
00426 void
00427 RtpsUdpDataLink::pre_stop_i()
00428 {
00429 DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00430 DataLink::pre_stop_i();
00431 OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00432 OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00433 {
00434 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00435
00436 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00437
00438 RtpsWriterMap::iterator iter = writers_.begin();
00439 while (iter != writers_.end()) {
00440 RtpsWriter& writer = iter->second;
00441 if (!writer.to_deliver_.empty()) {
00442 iter_t iter = writer.to_deliver_.begin();
00443 while (iter != writer.to_deliver_.end()) {
00444 to_deliver.push_back(iter->second);
00445 writer.to_deliver_.erase(iter);
00446 iter = writer.to_deliver_.begin();
00447 }
00448 }
00449 if (!writer.elems_not_acked_.empty()) {
00450 OPENDDS_SET(SequenceNumber) sns_to_release;
00451 iter_t iter = writer.elems_not_acked_.begin();
00452 while (iter != writer.elems_not_acked_.end()) {
00453 to_drop.push_back(iter->second);
00454 sns_to_release.insert(iter->first);
00455 writer.elems_not_acked_.erase(iter);
00456 iter = writer.elems_not_acked_.begin();
00457 }
00458 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00459 while (sns_it != sns_to_release.end()) {
00460 writer.send_buff_->release_acked(*sns_it);
00461 ++sns_it;
00462 }
00463 }
00464 RtpsWriterMap::iterator last = iter;
00465 ++iter;
00466 heartbeat_counts_.erase(last->first);
00467 writers_.erase(last);
00468 }
00469 }
00470 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00471 tqe_iter deliver_it = to_deliver.begin();
00472 while (deliver_it != to_deliver.end()) {
00473 (*deliver_it)->data_delivered();
00474 ++deliver_it;
00475 }
00476 tqe_iter drop_it = to_drop.begin();
00477 while (drop_it != to_drop.end()) {
00478 (*drop_it)->data_dropped(true);
00479 ++drop_it;
00480 }
00481 }
00482
00483 void
00484 RtpsUdpDataLink::send_i(TransportQueueElement* element, bool relink)
00485 {
00486
00487
00488
00489 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00490 DataLink::send_i(element, relink);
00491 }
00492
00493 RemoveResult
00494 RtpsUdpDataLink::remove_sample(const DataSampleElement* sample, void*)
00495 {
00496
00497 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, REMOVE_ERROR);
00498 return DataLink::remove_sample(sample, &g);
00499 }
00500
00501 void
00502 RtpsUdpDataLink::release_reservations_i(const RepoId& remote_id,
00503 const RepoId& local_id)
00504 {
00505 OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00506 OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00507 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00508 using std::pair;
00509 const GuidConverter conv(local_id);
00510 if (conv.isWriter()) {
00511 const RtpsWriterMap::iterator rw = writers_.find(local_id);
00512
00513 if (rw != writers_.end()) {
00514 rw->second.remote_readers_.erase(remote_id);
00515
00516 if (rw->second.remote_readers_.empty()) {
00517 RtpsWriter& writer = rw->second;
00518 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00519
00520 if (!writer.to_deliver_.empty()) {
00521 iter_t iter = writer.to_deliver_.begin();
00522 while (iter != writer.to_deliver_.end()) {
00523 to_deliver.push_back(iter->second);
00524 writer.to_deliver_.erase(iter);
00525 iter = writer.to_deliver_.begin();
00526 }
00527 }
00528 if (!writer.elems_not_acked_.empty()) {
00529 OPENDDS_SET(SequenceNumber) sns_to_release;
00530 iter_t iter = writer.elems_not_acked_.begin();
00531 while (iter != writer.elems_not_acked_.end()) {
00532 to_drop.push_back(iter->second);
00533 sns_to_release.insert(iter->first);
00534 writer.elems_not_acked_.erase(iter);
00535 iter = writer.elems_not_acked_.begin();
00536 }
00537 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00538 while (sns_it != sns_to_release.end()) {
00539 writer.send_buff_->release_acked(*sns_it);
00540 ++sns_it;
00541 }
00542 }
00543 heartbeat_counts_.erase(rw->first);
00544 writers_.erase(rw);
00545 } else {
00546 process_acked_by_all_i(g, local_id);
00547 }
00548 }
00549
00550 } else if (conv.isReader()) {
00551 const RtpsReaderMap::iterator rr = readers_.find(local_id);
00552
00553 if (rr != readers_.end()) {
00554 rr->second.remote_writers_.erase(remote_id);
00555
00556 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00557 reader_index_.equal_range(remote_id);
00558 iters.first != iters.second;) {
00559 if (iters.first->second == rr) {
00560 reader_index_.erase(iters.first++);
00561 } else {
00562 ++iters.first;
00563 }
00564 }
00565
00566 if (rr->second.remote_writers_.empty()) {
00567 readers_.erase(rr);
00568 }
00569 }
00570 }
00571 g.release();
00572 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00573 tqe_iter deliver_it = to_deliver.begin();
00574 while (deliver_it != to_deliver.end()) {
00575 (*deliver_it)->data_delivered();
00576 ++deliver_it;
00577 }
00578 tqe_iter drop_it = to_drop.begin();
00579 while (drop_it != to_drop.end()) {
00580 (*drop_it)->data_dropped(true);
00581 ++drop_it;
00582 }
00583 }
00584
00585 void
00586 RtpsUdpDataLink::stop_i()
00587 {
00588 nack_reply_.cancel();
00589 heartbeat_reply_.cancel();
00590 heartbeat_->disable();
00591 unicast_socket_.close();
00592 multicast_socket_.close();
00593 }
00594
00595
00596
00597
00598 void
00599 RtpsUdpDataLink::MultiSendBuffer::retain_all(RepoId pub_id)
00600 {
00601 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00602 const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00603 if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00604 wi->second.send_buff_->retain_all(pub_id);
00605 }
00606 }
00607
00608 void
00609 RtpsUdpDataLink::MultiSendBuffer::insert(SequenceNumber ,
00610 TransportSendStrategy::QueueType* q,
00611 ACE_Message_Block* chain)
00612 {
00613
00614
00615 const TransportQueueElement* const tqe = q->peek();
00616 const SequenceNumber seq = tqe->sequence();
00617 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00618 return;
00619 }
00620
00621 const RepoId pub_id = tqe->publication_id();
00622
00623 const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00624 if (wi == outer_->writers_.end()) {
00625 return;
00626 }
00627
00628 RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00629
00630 if (send_buff.is_nil()) {
00631 send_buff = make_rch<SingleSendBuffer>(SingleSendBuffer::UNLIMITED, 1 );
00632
00633 send_buff->bind(outer_->send_strategy());
00634 }
00635
00636 if (Transport_debug_level > 5) {
00637 const GuidConverter pub(pub_id);
00638 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00639 "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00640 (int)tqe->is_fragment()));
00641 }
00642
00643 if (tqe->is_fragment()) {
00644 const RtpsCustomizedElement* const rce =
00645 dynamic_cast<const RtpsCustomizedElement*>(tqe);
00646 if (rce) {
00647 send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00648 } else if (Transport_debug_level) {
00649 const GuidConverter pub(pub_id);
00650 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00651 " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00652 OPENDDS_STRING(pub).c_str(), seq.getValue()));
00653 }
00654 } else {
00655 send_buff->insert(seq, q, chain);
00656 }
00657 }
00658
00659
00660
00661 namespace {
00662 ACE_Message_Block* submsgs_to_msgblock(const RTPS::SubmessageSeq& subm)
00663 {
00664 size_t size = 0, padding = 0;
00665 for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00666 if ((size + padding) % 4) {
00667 padding += 4 - ((size + padding) % 4);
00668 }
00669 gen_find_size(subm[i], size, padding);
00670 }
00671
00672 ACE_Message_Block* hdr = new ACE_Message_Block(size + padding);
00673
00674 for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00675
00676 Serializer ser(hdr, false, Serializer::ALIGN_CDR);
00677 ser << subm[i];
00678 const size_t len = hdr->length();
00679 if (len % 4) {
00680 hdr->wr_ptr(4 - (len % 4));
00681 }
00682 }
00683 return hdr;
00684 }
00685 }
00686
00687 TransportQueueElement*
00688 RtpsUdpDataLink::customize_queue_element(TransportQueueElement* element)
00689 {
00690 const ACE_Message_Block* msg = element->msg();
00691 if (!msg) {
00692 return element;
00693 }
00694
00695 const RepoId pub_id = element->publication_id();
00696 GUIDSeq_var peers = peer_ids(pub_id);
00697 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00698 bool requires_inline_qos = this->requires_inline_qos(peers);
00699
00700 RTPS::SubmessageSeq subm;
00701
00702 const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00703
00704 bool gap_ok = true;
00705 DestToEntityMap gap_receivers;
00706 if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00707 for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00708 ri != rw->second.remote_readers_.end(); ++ri) {
00709 RepoId tmp;
00710 std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00711 tmp.entityId = ENTITYID_UNKNOWN;
00712 gap_receivers[tmp].push_back(ri->first);
00713
00714 if (ri->second.expecting_durable_data()) {
00715
00716
00717
00718
00719 gap_ok = false;
00720 break;
00721 }
00722 }
00723 }
00724
00725 if (gap_ok) {
00726 add_gap_submsg(subm, *element, gap_receivers);
00727 }
00728
00729 const SequenceNumber seq = element->sequence();
00730 if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00731 rw->second.expected_ = seq;
00732 ++rw->second.expected_;
00733 }
00734
00735 TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00736 TransportCustomizedElement* tce =
00737 dynamic_cast<TransportCustomizedElement*>(element);
00738 TransportSendControlElement* tsce =
00739 dynamic_cast<TransportSendControlElement*>(element);
00740
00741 Message_Block_Ptr data;
00742 bool durable = false;
00743
00744
00745
00746 if (tsce) {
00747 if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00748 data.reset(msg->cont()->duplicate());
00749
00750 RtpsSampleHeader::populate_data_control_submessages(
00751 subm, *tsce, requires_inline_qos);
00752 } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00753 end_historic_samples(rw, tsce->header(), msg->cont());
00754 element->data_delivered();
00755 return 0;
00756 } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00757 send_heartbeats_manual(tsce);
00758 element->data_delivered();
00759 return 0;
00760 } else {
00761 element->data_dropped(true );
00762 return 0;
00763 }
00764
00765 } else if (tse) {
00766
00767 data.reset(msg->cont()->duplicate());
00768 const DataSampleElement* dsle = tse->sample();
00769
00770 RtpsSampleHeader::populate_data_sample_submessages(
00771 subm, *dsle, requires_inline_qos);
00772 durable = dsle->get_header().historic_sample_;
00773
00774 } else if (tce) {
00775
00776 data.reset(msg->cont()->cont()->duplicate());
00777 const DataSampleElement* dsle = tce->original_send_element()->sample();
00778
00779 RtpsSampleHeader::populate_data_sample_submessages(
00780 subm, *dsle, requires_inline_qos);
00781 durable = dsle->get_header().historic_sample_;
00782
00783 } else {
00784 return element;
00785 }
00786
00787 #if defined(OPENDDS_SECURITY)
00788 send_strategy()->encode_payload(pub_id, data, subm);
00789 #endif
00790
00791 Message_Block_Ptr hdr(submsgs_to_msgblock(subm));
00792 hdr->cont(data.release());
00793 RtpsCustomizedElement* rtps =
00794 new RtpsCustomizedElement(element, move(hdr));
00795
00796
00797 if (durable && rw != writers_.end()) {
00798 const RepoId sub = element->subscription_id();
00799 if (sub != GUID_UNKNOWN) {
00800 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00801 if (ri != rw->second.remote_readers_.end()) {
00802 ri->second.durable_data_[rtps->sequence()] = rtps;
00803 ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00804 if (Transport_debug_level > 3) {
00805 const GuidConverter conv(pub_id), sub_conv(sub);
00806 ACE_DEBUG((LM_DEBUG,
00807 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00808 "storing durable data for local %C remote %C\n",
00809 OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00810 }
00811 return 0;
00812 }
00813 }
00814 } else if (durable && (Transport_debug_level)) {
00815 const GuidConverter conv(pub_id);
00816 ACE_ERROR((LM_ERROR,
00817 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00818 "WARNING: no RtpsWriter to store durable data for local %C\n",
00819 OPENDDS_STRING(conv).c_str()));
00820 }
00821
00822 return rtps;
00823 }
00824
00825 void
00826 RtpsUdpDataLink::end_historic_samples(RtpsWriterMap::iterator writer,
00827 const DataSampleHeader& header,
00828 ACE_Message_Block* body)
00829 {
00830
00831
00832 if (writer != writers_.end() && writer->second.durable_) {
00833 const ACE_Time_Value now = ACE_OS::gettimeofday();
00834 RepoId sub = GUID_UNKNOWN;
00835 if (body && header.message_length_ >= sizeof(sub)) {
00836 std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00837 }
00838 typedef ReaderInfoMap::iterator iter_t;
00839 if (sub == GUID_UNKNOWN) {
00840 if (Transport_debug_level > 3) {
00841 const GuidConverter conv(writer->first);
00842 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00843 "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00844 }
00845 for (iter_t iter = writer->second.remote_readers_.begin();
00846 iter != writer->second.remote_readers_.end(); ++iter) {
00847 if (iter->second.durable_) {
00848 iter->second.durable_timestamp_ = now;
00849 }
00850 }
00851 } else {
00852 iter_t iter = writer->second.remote_readers_.find(sub);
00853 if (iter != writer->second.remote_readers_.end()) {
00854 if (iter->second.durable_) {
00855 iter->second.durable_timestamp_ = now;
00856 if (Transport_debug_level > 3) {
00857 const GuidConverter conv(writer->first), sub_conv(sub);
00858 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00859 " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00860 OPENDDS_STRING(sub_conv).c_str()));
00861 }
00862 }
00863 }
00864 }
00865 }
00866 }
00867
00868 bool RtpsUdpDataLink::requires_inline_qos(const GUIDSeq_var& peers)
00869 {
00870 if (force_inline_qos_) {
00871
00872 return true;
00873 } else {
00874 if (!peers.ptr()) {
00875 return false;
00876 }
00877 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00878 for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00879 const iter_t iter = locators_.find(peers[i]);
00880 if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00881 return true;
00882 }
00883 }
00884 return false;
00885 }
00886 }
00887
00888 bool RtpsUdpDataLink::force_inline_qos_ = false;
00889
00890 void
00891 RtpsUdpDataLink::add_gap_submsg(RTPS::SubmessageSeq& msg,
00892 const TransportQueueElement& tqe,
00893 const DestToEntityMap& dtem)
00894 {
00895
00896
00897
00898
00899 using namespace OpenDDS::RTPS;
00900
00901 const SequenceNumber seq = tqe.sequence();
00902 const RepoId pub = tqe.publication_id();
00903 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00904 || tqe.subscription_id() != GUID_UNKNOWN) {
00905 return;
00906 }
00907
00908 const RtpsWriterMap::iterator wi = writers_.find(pub);
00909 if (wi == writers_.end()) {
00910 return;
00911 }
00912
00913 RtpsWriter& rw = wi->second;
00914
00915 if (seq != rw.expected_) {
00916 SequenceNumber firstMissing = rw.expected_;
00917
00918
00919
00920 const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00921 firstMissing.getLow()},
00922 gapListBase = {seq.getHigh(),
00923 seq.getLow()};
00924
00925
00926
00927 LongSeq8 bitmap;
00928 bitmap.length(1);
00929 bitmap[0] = 0;
00930
00931 GapSubmessage gap = {
00932 {GAP, FLAG_E, 0 },
00933 ENTITYID_UNKNOWN,
00934 pub.entityId,
00935 gapStart,
00936 {gapListBase, 1, bitmap}
00937 };
00938
00939 size_t size = 0, padding = 0;
00940 gen_find_size(gap, size, padding);
00941 gap.smHeader.submessageLength =
00942 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00943
00944 if (!rw.durable_) {
00945 const CORBA::ULong i = msg.length();
00946 msg.length(i + 1);
00947 msg[i].gap_sm(gap);
00948 } else {
00949 InfoDestinationSubmessage idst = {
00950 {INFO_DST, FLAG_E, INFO_DST_SZ},
00951 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00952 };
00953 CORBA::ULong ml = msg.length();
00954
00955
00956
00957 DestToEntityMap::const_iterator iter = dtem.begin();
00958 while (iter != dtem.end()) {
00959 std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00960 msg.length(ml + 1);
00961 msg[ml++].info_dst_sm(idst);
00962
00963 const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00964 for (size_t i = 0; i < readers.size(); ++i) {
00965 gap.readerId = readers.at(i).entityId;
00966 msg.length(ml + 1);
00967 msg[ml++].gap_sm(gap);
00968 }
00969 ++iter;
00970 }
00971 }
00972 }
00973 }
00974
00975
00976
00977
00978 void
00979 RtpsUdpDataLink::received(const RTPS::DataSubmessage& data,
00980 const GuidPrefix_t& src_prefix)
00981 {
00982 datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00983 }
00984
00985 bool
00986 RtpsUdpDataLink::process_data_i(const RTPS::DataSubmessage& data,
00987 const RepoId& src,
00988 RtpsReaderMap::value_type& rr)
00989 {
00990 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00991 if (wi != rr.second.remote_writers_.end()) {
00992 WriterInfo& info = wi->second;
00993 SequenceNumber seq;
00994 seq.setValue(data.writerSN.high, data.writerSN.low);
00995 info.frags_.erase(seq);
00996 const RepoId& readerId = rr.first;
00997 if (info.recvd_.contains(seq)) {
00998 if (Transport_debug_level > 5) {
00999 GuidConverter writer(src);
01000 GuidConverter reader(readerId);
01001 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01002 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
01003 seq.getValue(),
01004 OPENDDS_STRING(writer).c_str(),
01005 OPENDDS_STRING(reader).c_str()));
01006 }
01007 receive_strategy()->withhold_data_from(readerId);
01008 } else if (info.recvd_.disjoint() ||
01009 (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
01010 || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
01011 || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
01012 if (Transport_debug_level > 5) {
01013 GuidConverter writer(src);
01014 GuidConverter reader(readerId);
01015 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01016 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
01017 seq.getValue(),
01018 OPENDDS_STRING(writer).c_str(),
01019 OPENDDS_STRING(reader).c_str()));
01020 }
01021 const ReceivedDataSample* sample =
01022 receive_strategy()->withhold_data_from(readerId);
01023 info.held_.insert(std::make_pair(seq, *sample));
01024 } else {
01025 if (Transport_debug_level > 5) {
01026 GuidConverter writer(src);
01027 GuidConverter reader(readerId);
01028 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01029 ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
01030 seq.getValue(),
01031 OPENDDS_STRING(writer).c_str(),
01032 OPENDDS_STRING(reader).c_str()));
01033 }
01034 receive_strategy()->do_not_withhold_data_from(readerId);
01035 }
01036 info.recvd_.insert(seq);
01037 deliver_held_data(readerId, info, rr.second.durable_);
01038 } else {
01039 if (Transport_debug_level > 5) {
01040 GuidConverter writer(src);
01041 GuidConverter reader(rr.first);
01042 SequenceNumber seq;
01043 seq.setValue(data.writerSN.high, data.writerSN.low);
01044 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
01045 ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
01046 seq.getValue(),
01047 OPENDDS_STRING(writer).c_str(),
01048 OPENDDS_STRING(reader).c_str()));
01049 }
01050 receive_strategy()->do_not_withhold_data_from(rr.first);
01051 }
01052 return false;
01053 }
01054
01055 void
01056 RtpsUdpDataLink::deliver_held_data(const RepoId& readerId, WriterInfo& info,
01057 bool durable)
01058 {
01059 if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
01060 held_data_delivery_handler_.notify_delivery(readerId, info);
01061 }
01062
01063 void
01064 RtpsUdpDataLink::received(const RTPS::GapSubmessage& gap,
01065 const GuidPrefix_t& src_prefix)
01066 {
01067 datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01068 }
01069
01070 bool
01071 RtpsUdpDataLink::process_gap_i(const RTPS::GapSubmessage& gap,
01072 const RepoId& src, RtpsReaderMap::value_type& rr)
01073 {
01074 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01075 if (wi != rr.second.remote_writers_.end()) {
01076 SequenceRange sr;
01077 sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01078 SequenceNumber base;
01079 base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01080 SequenceNumber first_received = SequenceNumber::MAX_VALUE;
01081 if (!wi->second.recvd_.empty()) {
01082 OPENDDS_VECTOR(SequenceRange) missing = wi->second.recvd_.missing_sequence_ranges();
01083 if (!missing.empty()) {
01084 first_received = missing.front().second;
01085 }
01086 }
01087 sr.second = std::min(first_received, base.previous());
01088 if (sr.first <= sr.second) {
01089 if (Transport_debug_level > 5) {
01090 const GuidConverter conv(src);
01091 const GuidConverter rdr(rr.first);
01092 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01093 "Reader %C received GAP with range [%q, %q] (inserting range [%q, %q]) from %C\n",
01094 OPENDDS_STRING(rdr).c_str(),
01095 sr.first.getValue(), base.previous().getValue(),
01096 sr.first.getValue(), sr.second.getValue(),
01097 OPENDDS_STRING(conv).c_str()));
01098 }
01099 wi->second.recvd_.insert(sr);
01100 } else {
01101 const GuidConverter conv(src);
01102 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01103 "received GAP with invalid range [%q, %q] from %C\n",
01104 sr.first.getValue(), sr.second.getValue(),
01105 OPENDDS_STRING(conv).c_str()), 2);
01106 }
01107 wi->second.recvd_.insert(base, gap.gapList.numBits,
01108 gap.gapList.bitmap.get_buffer());
01109 deliver_held_data(rr.first, wi->second, rr.second.durable_);
01110
01111 }
01112 return false;
01113 }
01114
01115 void
01116 RtpsUdpDataLink::received(const RTPS::HeartBeatSubmessage& heartbeat,
01117 const GuidPrefix_t& src_prefix)
01118 {
01119 RepoId src;
01120 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01121 src.entityId = heartbeat.writerId;
01122
01123 bool schedule_acknack = false;
01124 const ACE_Time_Value now = ACE_OS::gettimeofday();
01125 OPENDDS_VECTOR(InterestingRemote) callbacks;
01126
01127 {
01128 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01129
01130
01131
01132
01133 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01134 limit = interesting_writers_.upper_bound(src);
01135 pos != limit;
01136 ++pos) {
01137 const RepoId& writerid = src;
01138 const RepoId& readerid = pos->second.localid;
01139
01140 RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01141 if (riter == readers_.end()) {
01142
01143 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01144 } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01145
01146 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01147 }
01148 pos->second.last_activity = now;
01149 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01150 callbacks.push_back(pos->second);
01151 pos->second.status = InterestingRemote::EXISTS;
01152 }
01153 }
01154
01155 schedule_acknack = !interesting_ack_nacks_.empty();
01156 }
01157
01158 for (size_t i = 0; i < callbacks.size(); ++i) {
01159 callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01160 }
01161
01162 if (schedule_acknack) {
01163 heartbeat_reply_.schedule();
01164 }
01165
01166 datareader_dispatch(heartbeat, src_prefix,
01167 &RtpsUdpDataLink::process_heartbeat_i);
01168 }
01169
01170 bool
01171 RtpsUdpDataLink::process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
01172 const RepoId& src,
01173 RtpsReaderMap::value_type& rr)
01174 {
01175 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01176 if (wi == rr.second.remote_writers_.end()) {
01177
01178 return false;
01179 }
01180
01181 WriterInfo& info = wi->second;
01182
01183 if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01184 return false;
01185 }
01186 info.heartbeat_recvd_count_ = heartbeat.count.value;
01187
01188 SequenceNumber& first = info.hb_range_.first;
01189 first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01190 SequenceNumber& last = info.hb_range_.second;
01191 last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01192 static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01193
01194 DisjointSequence& recvd = info.recvd_;
01195 if (!rr.second.durable_ && info.initial_hb_) {
01196 if (last.getValue() < starting.getValue()) {
01197
01198 return false;
01199 }
01200
01201
01202 if (recvd.empty() || recvd.low() >= last) {
01203 recvd.insert(SequenceRange(zero, last));
01204 } else {
01205 recvd.insert(SequenceRange(zero, recvd.low()));
01206 }
01207 } else if (!recvd.empty()) {
01208
01209
01210 recvd.insert(SequenceRange(zero,
01211 (first > starting) ? first.previous() : zero));
01212 }
01213
01214 deliver_held_data(rr.first, info, rr.second.durable_);
01215
01216
01217
01218
01219 info.initial_hb_ = false;
01220
01221 const bool final = heartbeat.smHeader.flags & RTPS::FLAG_F,
01222 liveliness = heartbeat.smHeader.flags & RTPS::FLAG_L;
01223
01224 if (!final || (!liveliness && (info.should_nack() ||
01225 rr.second.nack_durable(info) ||
01226 receive_strategy()->has_fragments(info.hb_range_, wi->first)))) {
01227 info.ack_pending_ = true;
01228 return true;
01229 }
01230
01231
01232 return false;
01233 }
01234
01235 bool
01236 RtpsUdpDataLink::WriterInfo::should_nack() const
01237 {
01238 if (recvd_.disjoint() && recvd_.cumulative_ack() < hb_range_.second) {
01239 return true;
01240 }
01241 if (!recvd_.empty()) {
01242 return recvd_.high() < hb_range_.second;
01243 }
01244 return false;
01245 }
01246
01247 bool
01248 RtpsUdpDataLink::RtpsReader::nack_durable(const WriterInfo& info)
01249 {
01250 return durable_ && (info.recvd_.empty() ||
01251 info.recvd_.low() > info.hb_range_.first);
01252 }
01253
01254 void
01255 RtpsUdpDataLink::send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag)
01256 {
01257 using namespace OpenDDS::RTPS;
01258
01259 WriterInfoMap& writers = rr->second.remote_writers_;
01260 for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01261 ++wi) {
01262
01263
01264 DisjointSequence& recvd = wi->second.recvd_;
01265 const bool nack = wi->second.should_nack() ||
01266 rr->second.nack_durable(wi->second);
01267 bool final = finalFlag || !nack;
01268
01269 if (wi->second.ack_pending_ || nack || finalFlag) {
01270 const bool prev_ack_pending = wi->second.ack_pending_;
01271 wi->second.ack_pending_ = false;
01272
01273 SequenceNumber ack;
01274 CORBA::ULong num_bits = 1;
01275 LongSeq8 bitmap;
01276 bitmap.length(1);
01277 bitmap[0] = 0;
01278
01279 const SequenceNumber& hb_low = wi->second.hb_range_.first;
01280 const SequenceNumber& hb_high = wi->second.hb_range_.second;
01281 const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01282 hb_high_val = hb_high.getValue();
01283
01284 if (recvd.empty()) {
01285
01286 ack = hb_low;
01287 bitmap.length(bitmap_num_longs(ack, hb_high));
01288 const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01289 ? 255
01290 : CORBA::ULong(hb_high_val - hb_low_val);
01291 DisjointSequence::fill_bitmap_range(0, idx,
01292 bitmap.get_buffer(),
01293 bitmap.length(), num_bits);
01294 } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01295
01296 ack = hb_low;
01297 const SequenceNumber& rec_low = recvd.low();
01298 const SequenceNumber::Value rec_low_val = rec_low.getValue();
01299 bitmap.length(bitmap_num_longs(ack, rec_low));
01300 const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01301 ? 255
01302 : CORBA::ULong(rec_low_val - hb_low_val);
01303 DisjointSequence::fill_bitmap_range(0, idx,
01304 bitmap.get_buffer(),
01305 bitmap.length(), num_bits);
01306
01307 } else {
01308 ack = ++SequenceNumber(recvd.cumulative_ack());
01309 if (recvd.low().getValue() > 1) {
01310
01311
01312 recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01313 }
01314
01315 if (recvd.disjoint()) {
01316 bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01317 recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01318 num_bits, true);
01319 }
01320 }
01321
01322 const SequenceNumber::Value ack_val = ack.getValue();
01323
01324 if (!recvd.empty() && hb_high > recvd.high()) {
01325 const SequenceNumber eff_high =
01326 (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01327 const SequenceNumber::Value eff_high_val = eff_high.getValue();
01328
01329 const CORBA::ULong old_len = bitmap.length(),
01330 new_len = bitmap_num_longs(ack, eff_high);
01331 if (new_len > old_len) {
01332 bitmap.length(new_len);
01333 for (CORBA::ULong i = old_len; i < new_len; ++i) {
01334 bitmap[i] = 0;
01335 }
01336 }
01337 const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01338 idx_recv_high = recvd.disjoint() ?
01339 CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01340 DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01341 bitmap.get_buffer(), new_len,
01342 num_bits);
01343 }
01344
01345
01346
01347
01348 bool frags_modified =
01349 receive_strategy()->remove_frags_from_bitmap(bitmap.get_buffer(),
01350 num_bits, ack, wi->first);
01351 if (frags_modified && !final) {
01352 final = true;
01353 for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01354 if ((i + 1) * 32 <= num_bits) {
01355 if (bitmap[i]) {
01356 final = false;
01357 break;
01358 }
01359 } else {
01360 if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01361 final = false;
01362 break;
01363 }
01364 }
01365 }
01366 }
01367
01368 AckNackSubmessage acknack = {
01369 {ACKNACK,
01370 CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
01371 0 },
01372 rr->first.entityId,
01373 wi->first.entityId,
01374 {
01375 {ack.getHigh(), ack.getLow()},
01376 num_bits, bitmap
01377 },
01378 {++wi->second.acknack_count_}
01379 };
01380
01381 size_t size = 0, padding = 0;
01382 gen_find_size(acknack, size, padding);
01383 acknack.smHeader.submessageLength =
01384 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01385 InfoDestinationSubmessage info_dst = {
01386 {INFO_DST, FLAG_E, INFO_DST_SZ},
01387 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01388 };
01389 gen_find_size(info_dst, size, padding);
01390
01391 OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01392 size += generate_nack_frags(nack_frags, wi->second, wi->first);
01393
01394 ACE_Message_Block mb_acknack(size + padding);
01395
01396 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01397 std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01398 sizeof(GuidPrefix_t));
01399 ser << info_dst;
01400
01401
01402 ser << acknack;
01403 for (size_t i = 0; i < nack_frags.size(); ++i) {
01404 nack_frags[i].readerId = rr->first.entityId;
01405 nack_frags[i].writerId = wi->first.entityId;
01406 ser << nack_frags[i];
01407 }
01408
01409 if (!locators_.count(wi->first)) {
01410 if (Transport_debug_level) {
01411 const GuidConverter conv(wi->first);
01412 ACE_ERROR((LM_ERROR,
01413 "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01414 "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01415 }
01416 } else {
01417 send_strategy()->send_rtps_control(mb_acknack,
01418 locators_[wi->first].addr_);
01419 }
01420 }
01421 }
01422 }
01423
01424 void
01425 RtpsUdpDataLink::send_heartbeat_replies()
01426 {
01427 using namespace OpenDDS::RTPS;
01428 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01429
01430 for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01431 limit = interesting_ack_nacks_.end();
01432 pos != limit;
01433 ++pos) {
01434
01435 SequenceNumber ack;
01436 LongSeq8 bitmap;
01437 bitmap.length(0);
01438
01439 AckNackSubmessage acknack = {
01440 {ACKNACK,
01441 CORBA::Octet(FLAG_E | FLAG_F),
01442 0 },
01443 pos->readerid.entityId,
01444 pos->writerid.entityId,
01445 {
01446 {ack.getHigh(), ack.getLow()},
01447 0 , bitmap
01448 },
01449 {0 }
01450 };
01451
01452 size_t size = 0, padding = 0;
01453 gen_find_size(acknack, size, padding);
01454 acknack.smHeader.submessageLength =
01455 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01456 InfoDestinationSubmessage info_dst = {
01457 {INFO_DST, FLAG_E, INFO_DST_SZ},
01458 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01459 };
01460 gen_find_size(info_dst, size, padding);
01461
01462 ACE_Message_Block mb_acknack(size + padding);
01463
01464 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01465 std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01466 sizeof(GuidPrefix_t));
01467 ser << info_dst;
01468
01469
01470 ser << acknack;
01471
01472 send_strategy()->send_rtps_control(mb_acknack, pos->writer_address);
01473 }
01474 interesting_ack_nacks_.clear();
01475
01476 for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01477 ++rr) {
01478 send_ack_nacks (rr);
01479 }
01480 }
01481
01482 size_t
01483 RtpsUdpDataLink::generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nf,
01484 WriterInfo& wi, const RepoId& pub_id)
01485 {
01486 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01487 typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01488 RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01489
01490
01491
01492 OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01493 for (size_t i = 0; i < missing.size(); ++i) {
01494 receive_strategy()->has_fragments(missing[i], pub_id, &frag_info);
01495 }
01496
01497 if (!wi.recvd_.empty()) {
01498 const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01499 receive_strategy()->has_fragments(range, pub_id, &frag_info);
01500 }
01501 for (size_t i = 0; i < frag_info.size(); ++i) {
01502
01503 const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01504 if (heartbeat_frag != wi.frags_.end()) {
01505 extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01506 }
01507 }
01508
01509
01510 const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01511 high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01512 end = wi.frags_.end();
01513 for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01514 if (iter == low) {
01515
01516 if (high == end) {
01517 break;
01518 }
01519 iter = high;
01520 }
01521
01522 const SequenceRange range(iter->first, iter->first);
01523 if (receive_strategy()->has_fragments(range, pub_id, &frag_info)) {
01524 extend_bitmap_range(frag_info.back().second, iter->second.value);
01525 } else {
01526
01527 frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01528 RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01529 fnSet.bitmapBase.value = 1;
01530 fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01531 fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01532 for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01533 fnSet.bitmap[i] = 0xFFFFFFFF;
01534 }
01535 }
01536 }
01537
01538 if (frag_info.empty()) {
01539 return 0;
01540 }
01541
01542 const RTPS::NackFragSubmessage nackfrag_prototype = {
01543 {RTPS::NACK_FRAG, RTPS::FLAG_E, 0 },
01544 ENTITYID_UNKNOWN,
01545 ENTITYID_UNKNOWN,
01546 {0, 0},
01547 RTPS::FragmentNumberSet(),
01548 {0}
01549 };
01550
01551 size_t size = 0, padding = 0;
01552 for (size_t i = 0; i < frag_info.size(); ++i) {
01553 nf.push_back(nackfrag_prototype);
01554 RTPS::NackFragSubmessage& nackfrag = nf.back();
01555 nackfrag.writerSN.low = frag_info[i].first.getLow();
01556 nackfrag.writerSN.high = frag_info[i].first.getHigh();
01557 nackfrag.fragmentNumberState = frag_info[i].second;
01558 nackfrag.count.value = ++wi.nackfrag_count_;
01559 const size_t before_size = size;
01560 gen_find_size(nackfrag, size, padding);
01561 nackfrag.smHeader.submessageLength =
01562 static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01563 }
01564 return size;
01565 }
01566
01567 void
01568 RtpsUdpDataLink::extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
01569 CORBA::ULong extent)
01570 {
01571 if (extent < fnSet.bitmapBase.value) {
01572 return;
01573 }
01574
01575 const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01576 extent - fnSet.bitmapBase.value + 1),
01577 len = (new_num_bits + 31) / 32;
01578 if (new_num_bits < fnSet.numBits) {
01579 return;
01580 }
01581 fnSet.bitmap.length(len);
01582
01583 DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01584 fnSet.bitmap.get_buffer(), len,
01585 fnSet.numBits);
01586 }
01587
01588 void
01589 RtpsUdpDataLink::received(const RTPS::HeartBeatFragSubmessage& hb_frag,
01590 const GuidPrefix_t& src_prefix)
01591 {
01592 datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01593 }
01594
01595 bool
01596 RtpsUdpDataLink::process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
01597 const RepoId& src,
01598 RtpsReaderMap::value_type& rr)
01599 {
01600 WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01601 if (wi == rr.second.remote_writers_.end()) {
01602
01603 return false;
01604 }
01605
01606 if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01607 return false;
01608 }
01609
01610 wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01611
01612 SequenceNumber seq;
01613 seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01614
01615
01616
01617
01618
01619 if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01620 || !wi->second.recvd_.contains(seq)) {
01621 wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01622 wi->second.ack_pending_ = true;
01623 return true;
01624 }
01625 return false;
01626 }
01627
01628
01629
01630
01631 void
01632 RtpsUdpDataLink::received(const RTPS::AckNackSubmessage& acknack,
01633 const GuidPrefix_t& src_prefix)
01634 {
01635
01636 RepoId local;
01637 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01638 local.entityId = acknack.writerId;
01639
01640 RepoId remote;
01641 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01642 remote.entityId = acknack.readerId;
01643
01644 const ACE_Time_Value now = ACE_OS::gettimeofday();
01645 OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01646
01647 {
01648 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01649 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01650 limit = interesting_readers_.upper_bound(remote);
01651 pos != limit;
01652 ++pos) {
01653 pos->second.last_activity = now;
01654
01655 if (local == pos->second.localid) {
01656 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01657 callbacks.push_back(pos->second.listener);
01658 pos->second.status = InterestingRemote::EXISTS;
01659 }
01660 }
01661 }
01662 }
01663
01664 for (size_t i = 0; i < callbacks.size(); ++i) {
01665 callbacks[i]->reader_exists(remote, local);
01666 }
01667
01668 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01669 const RtpsWriterMap::iterator rw = writers_.find(local);
01670 if (rw == writers_.end()) {
01671 if (Transport_debug_level > 5) {
01672 GuidConverter local_conv(local);
01673 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01674 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01675 }
01676 return;
01677 }
01678
01679 if (Transport_debug_level > 5) {
01680 GuidConverter local_conv(local), remote_conv(remote);
01681 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01682 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01683 OPENDDS_STRING(remote_conv).c_str()));
01684 }
01685
01686 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01687 if (ri == rw->second.remote_readers_.end()) {
01688 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01689 "WARNING ReaderInfo not found\n"));
01690 return;
01691 }
01692
01693 if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01694 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01695 "WARNING Count indicates duplicate, dropping\n"));
01696 return;
01697 }
01698
01699 ri->second.acknack_recvd_count_ = acknack.count.value;
01700
01701 if (!ri->second.handshake_done_) {
01702 ri->second.handshake_done_ = true;
01703 invoke_on_start_callbacks(true);
01704 }
01705
01706 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01707 const bool final = acknack.smHeader.flags & RTPS::FLAG_F;
01708
01709 if (!ri->second.durable_data_.empty()) {
01710 if (Transport_debug_level > 5) {
01711 const GuidConverter local_conv(local), remote_conv(remote);
01712 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01713 "local %C has durable for remote %C\n",
01714 OPENDDS_STRING(local_conv).c_str(),
01715 OPENDDS_STRING(remote_conv).c_str()));
01716 }
01717 SequenceNumber ack;
01718 ack.setValue(acknack.readerSNState.bitmapBase.high,
01719 acknack.readerSNState.bitmapBase.low);
01720 const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01721 if (ack > dd_last) {
01722
01723 ri->second.durable_data_.swap(pendingCallbacks);
01724 if (Transport_debug_level > 5) {
01725 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01726 "durable data acked\n"));
01727 }
01728 } else {
01729 DisjointSequence requests;
01730 if (!requests.insert(ack, acknack.readerSNState.numBits,
01731 acknack.readerSNState.bitmap.get_buffer())
01732 && !final && ack == rw->second.heartbeat_high(ri->second)) {
01733
01734
01735
01736 if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01737 }
01738
01739 bool sent_some = false;
01740 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01741 iter_t it = ri->second.durable_data_.begin();
01742 const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01743 SequenceNumber lastSent = SequenceNumber::ZERO();
01744 if (!requests.empty()) {
01745 lastSent = requests.low().previous();
01746 }
01747 DisjointSequence gaps;
01748 for (size_t i = 0; i < psr.size(); ++i) {
01749 for (; it != ri->second.durable_data_.end()
01750 && it->first < psr[i].first; ++it) ;
01751 for (; it != ri->second.durable_data_.end()
01752 && it->first <= psr[i].second; ++it) {
01753 if (Transport_debug_level > 5) {
01754 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01755 "durable resend %d\n", int(it->first.getValue())));
01756 }
01757 durability_resend(it->second);
01758
01759 sent_some = true;
01760 if (it->first > lastSent + 1) {
01761 gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01762 }
01763 lastSent = it->first;
01764 }
01765 if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01766 gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01767 }
01768 }
01769 if (!gaps.empty()) {
01770 if (Transport_debug_level > 5) {
01771 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01772 "sending durability gaps: "));
01773 gaps.dump();
01774 }
01775 send_durability_gaps(local, remote, gaps);
01776 }
01777 if (sent_some) {
01778 return;
01779 }
01780 const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01781 if (!requests.empty() && requests.high() < dd_first) {
01782
01783 requests.insert(SequenceRange(requests.high(), dd_first.previous()));
01784 if (Transport_debug_level > 5) {
01785 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01786 "sending durability gaps for all requests: "));
01787 requests.dump();
01788 }
01789 send_durability_gaps(local, remote, requests);
01790 return;
01791 }
01792 if (!requests.empty() && requests.low() < dd_first) {
01793
01794 for (size_t i = 0; i < psr.size(); ++i) {
01795 if (psr[i].first > dd_first) {
01796 break;
01797 }
01798 gaps.insert(SequenceRange(psr[i].first,
01799 std::min(psr[i].second, dd_first)));
01800 }
01801 if (Transport_debug_level > 5) {
01802 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01803 "sending durability gaps for some requests: "));
01804 gaps.dump();
01805 }
01806 send_durability_gaps(local, remote, gaps);
01807 return;
01808 }
01809 }
01810 }
01811 SequenceNumber ack;
01812 ack.setValue(acknack.readerSNState.bitmapBase.high,
01813 acknack.readerSNState.bitmapBase.low);
01814 if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01815 && ack != SequenceNumber::ZERO()) {
01816 ri->second.cur_cumulative_ack_ = ack;
01817 }
01818
01819
01820 if (!final || bitmapNonEmpty(acknack.readerSNState)) {
01821 ri->second.requested_changes_.push_back(acknack.readerSNState);
01822 }
01823 process_acked_by_all_i(g, local);
01824 g.release();
01825 if (!final) {
01826 nack_reply_.schedule();
01827 }
01828 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01829 for (iter_t it = pendingCallbacks.begin();
01830 it != pendingCallbacks.end(); ++it) {
01831 it->second->data_delivered();
01832 }
01833 }
01834
01835 void
01836 RtpsUdpDataLink::received(const RTPS::NackFragSubmessage& nackfrag,
01837 const GuidPrefix_t& src_prefix)
01838 {
01839
01840 RepoId local;
01841 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01842 local.entityId = nackfrag.writerId;
01843
01844 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01845 const RtpsWriterMap::iterator rw = writers_.find(local);
01846 if (rw == writers_.end()) {
01847 if (Transport_debug_level > 5) {
01848 GuidConverter local_conv(local);
01849 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01850 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01851 }
01852 return;
01853 }
01854
01855 RepoId remote;
01856 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01857 remote.entityId = nackfrag.readerId;
01858
01859 if (Transport_debug_level > 5) {
01860 GuidConverter local_conv(local), remote_conv(remote);
01861 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01862 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01863 OPENDDS_STRING(remote_conv).c_str()));
01864 }
01865
01866 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01867 if (ri == rw->second.remote_readers_.end()) {
01868 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01869 "WARNING ReaderInfo not found\n"));
01870 return;
01871 }
01872
01873 if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01874 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01875 "WARNING Count indicates duplicate, dropping\n"));
01876 return;
01877 }
01878
01879 ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01880
01881 SequenceNumber seq;
01882 seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01883 ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01884 g.release();
01885 nack_reply_.schedule();
01886 }
01887
01888 void
01889 RtpsUdpDataLink::send_nack_replies()
01890 {
01891 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01892
01893 using namespace OpenDDS::RTPS;
01894 typedef RtpsWriterMap::iterator rw_iter;
01895 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01896
01897
01898 OPENDDS_SET(ACE_INET_Addr) recipients;
01899 DisjointSequence requests;
01900 RtpsWriter& writer = rw->second;
01901
01902
01903 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01904
01905 #if defined(OPENDDS_SECURITY)
01906 const EntityId_t& pvs_writer =
01907 RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
01908 const bool is_pvs_writer =
01909 0 == std::memcmp(&pvs_writer, &rw->first.entityId, sizeof pvs_writer);
01910 #endif
01911
01912 typedef ReaderInfoMap::iterator ri_iter;
01913 const ri_iter end = writer.remote_readers_.end();
01914 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01915
01916 if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01917 all_readers_ack = ri->second.cur_cumulative_ack_;
01918 }
01919
01920 #if defined(OPENDDS_SECURITY)
01921 if (is_pvs_writer && !ri->second.requested_changes_.empty()) {
01922 send_directed_nack_replies(rw->first, writer, ri->first, ri->second);
01923 continue;
01924 }
01925 #endif
01926
01927 process_requested_changes(requests, writer, ri->second);
01928
01929 if (!ri->second.requested_changes_.empty()) {
01930 if (locators_.count(ri->first)) {
01931 recipients.insert(locators_[ri->first].addr_);
01932 if (Transport_debug_level > 5) {
01933 const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01934 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01935 "local %C remote %C requested resend\n",
01936 OPENDDS_STRING(local_conv).c_str(),
01937 OPENDDS_STRING(remote_conv).c_str()));
01938 }
01939 }
01940 ri->second.requested_changes_.clear();
01941 }
01942 }
01943
01944 DisjointSequence gaps;
01945 if (!requests.empty()) {
01946 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01947 gaps = requests;
01948 } else {
01949 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01950 SingleSendBuffer& sb = *writer.send_buff_;
01951 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01952 const RtpsUdpSendStrategy::OverrideToken ot =
01953 send_strategy()->override_destinations(recipients);
01954 for (size_t i = 0; i < ranges.size(); ++i) {
01955 if (Transport_debug_level > 5) {
01956 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01957 "resend data %d-%d\n", int(ranges[i].first.getValue()),
01958 int(ranges[i].second.getValue())));
01959 }
01960 sb.resend_i(ranges[i], &gaps);
01961 }
01962 }
01963 }
01964
01965 send_nackfrag_replies(writer, gaps, recipients);
01966
01967 if (!gaps.empty()) {
01968 if (Transport_debug_level > 5) {
01969 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01970 "GAPs:"));
01971 gaps.dump();
01972 }
01973 ACE_Message_Block* mb_gap =
01974 marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01975 if (mb_gap) {
01976 send_strategy()->send_rtps_control(*mb_gap, recipients);
01977 mb_gap->release();
01978 }
01979 }
01980 if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01981 continue;
01982 }
01983 }
01984 }
01985
01986 void
01987 RtpsUdpDataLink::send_nackfrag_replies(RtpsWriter& writer,
01988 DisjointSequence& gaps,
01989 OPENDDS_SET(ACE_INET_Addr)& gap_recipients)
01990 {
01991 typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01992 OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01993
01994 typedef ReaderInfoMap::iterator ri_iter;
01995 const ri_iter end = writer.remote_readers_.end();
01996 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01997
01998 if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01999 continue;
02000 }
02001
02002 const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
02003
02004 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
02005 const rf_iter rf_end = ri->second.requested_frags_.end();
02006 for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
02007
02008 const SequenceNumber& seq = rf->first;
02009 if (writer.send_buff_->contains(seq)) {
02010 FragmentInfo& fi = requests[remote_addr];
02011 fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
02012 rf->second.bitmap.get_buffer());
02013 } else {
02014 gaps.insert(seq);
02015 gap_recipients.insert(remote_addr);
02016 }
02017 }
02018 ri->second.requested_frags_.clear();
02019 }
02020
02021 typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
02022 for (req_iter req = requests.begin(); req != requests.end(); ++req) {
02023 const FragmentInfo& fi = req->second;
02024
02025 ACE_GUARD(TransportSendBuffer::LockType, guard,
02026 writer.send_buff_->strategy_lock());
02027 const RtpsUdpSendStrategy::OverrideToken ot =
02028 send_strategy()->override_destinations(req->first);
02029
02030 for (FragmentInfo::const_iterator sn_iter = fi.begin();
02031 sn_iter != fi.end(); ++sn_iter) {
02032 const SequenceNumber& seq = sn_iter->first;
02033 writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
02034 }
02035 }
02036 }
02037
02038 void
02039 RtpsUdpDataLink::process_requested_changes(DisjointSequence& requests,
02040 const RtpsWriter& writer,
02041 const ReaderInfo& reader)
02042 {
02043 for (size_t i = 0; i < reader.requested_changes_.size(); ++i) {
02044 const RTPS::SequenceNumberSet& sn_state = reader.requested_changes_[i];
02045 SequenceNumber base;
02046 base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
02047 if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
02048 && base == writer.heartbeat_high(reader)) {
02049
02050
02051
02052 if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
02053 requests.insert(base);
02054 }
02055 } else {
02056 requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
02057 }
02058 }
02059 }
02060
02061 void
02062 RtpsUdpDataLink::send_directed_nack_replies(const RepoId& writerId,
02063 RtpsWriter& writer,
02064 const RepoId& readerId,
02065 ReaderInfo& reader)
02066 {
02067 if (!locators_.count(readerId)) {
02068 return;
02069 }
02070
02071 DisjointSequence requests;
02072 process_requested_changes(requests, writer, reader);
02073 reader.requested_changes_.clear();
02074
02075 DisjointSequence gaps;
02076 ACE_INET_Addr addr = locators_[readerId].addr_;
02077
02078 if (!requests.empty()) {
02079 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
02080 gaps = requests;
02081 } else {
02082 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
02083 SingleSendBuffer& sb = *writer.send_buff_;
02084 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
02085 const RtpsUdpSendStrategy::OverrideToken ot =
02086 send_strategy()->override_destinations(addr);
02087 for (size_t i = 0; i < ranges.size(); ++i) {
02088 if (Transport_debug_level > 5) {
02089 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies "
02090 "resend data %d-%d\n", int(ranges[i].first.getValue()),
02091 int(ranges[i].second.getValue())));
02092 }
02093 sb.resend_i(ranges[i], &gaps, readerId);
02094 }
02095 }
02096 }
02097
02098 if (gaps.empty()) {
02099 return;
02100 }
02101 if (Transport_debug_level > 5) {
02102 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_directed_nack_replies GAPs: "));
02103 gaps.dump();
02104 }
02105 ACE_Message_Block* mb_gap =
02106 marshal_gaps(writerId, readerId, gaps, writer.durable_);
02107 if (mb_gap) {
02108 send_strategy()->send_rtps_control(*mb_gap, addr);
02109 mb_gap->release();
02110 }
02111 }
02112
02113 void
02114 RtpsUdpDataLink::process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id)
02115 {
02116 using namespace OpenDDS::RTPS;
02117 typedef RtpsWriterMap::iterator rw_iter;
02118 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02119 OPENDDS_VECTOR(RepoId) to_check;
02120 rw_iter rw = writers_.find(pub_id);
02121 if (rw == writers_.end()) {
02122 return;
02123 }
02124 RtpsWriter& writer = rw->second;
02125 if (!writer.elems_not_acked_.empty()) {
02126
02127
02128
02129 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02130
02131 typedef ReaderInfoMap::iterator ri_iter;
02132 const ri_iter end = writer.remote_readers_.end();
02133 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02134 if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02135 all_readers_ack = ri->second.cur_cumulative_ack_;
02136 }
02137 }
02138 if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02139 return;
02140 }
02141 OPENDDS_VECTOR(SequenceNumber) sns;
02142
02143
02144 iter_t it = writer.elems_not_acked_.begin();
02145 OPENDDS_SET(SequenceNumber) sns_to_release;
02146 while (it != writer.elems_not_acked_.end()) {
02147 if (it->first < all_readers_ack) {
02148 writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02149 sns_to_release.insert(it->first);
02150 iter_t last = it;
02151 ++it;
02152 writer.elems_not_acked_.erase(last);
02153 } else {
02154 break;
02155 }
02156 }
02157 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02158 while (sns_it != sns_to_release.end()) {
02159 writer.send_buff_->release_acked(*sns_it);
02160 ++sns_it;
02161 }
02162 TransportQueueElement* tqe_to_deliver;
02163
02164 while (true) {
02165 rw_iter deliver_on_writer = writers_.find(pub_id);
02166 if (deliver_on_writer == writers_.end()) {
02167 break;
02168 }
02169 RtpsWriter& writer = deliver_on_writer->second;
02170 iter_t to_deliver_iter = writer.to_deliver_.begin();
02171 if (to_deliver_iter == writer.to_deliver_.end()) {
02172 break;
02173 }
02174 tqe_to_deliver = to_deliver_iter->second;
02175 writer.to_deliver_.erase(to_deliver_iter);
02176 g.release();
02177
02178 tqe_to_deliver->data_delivered();
02179 g.acquire();
02180 }
02181 }
02182 }
02183
02184 ACE_Message_Block*
02185 RtpsUdpDataLink::marshal_gaps(const RepoId& writer, const RepoId& reader,
02186 const DisjointSequence& gaps, bool durable)
02187 {
02188 using namespace RTPS;
02189
02190
02191 const SequenceNumber firstMissing = gaps.low(),
02192 base = ++SequenceNumber(gaps.cumulative_ack());
02193 const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02194 firstMissing.getLow()},
02195 gapListBase = {base.getHigh(), base.getLow()};
02196 CORBA::ULong num_bits = 0;
02197 LongSeq8 bitmap;
02198
02199 if (gaps.disjoint()) {
02200 bitmap.length(bitmap_num_longs(base, gaps.high()));
02201 gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02202
02203 } else {
02204 bitmap.length(1);
02205 bitmap[0] = 0;
02206 num_bits = 1;
02207 }
02208
02209 GapSubmessage gap = {
02210 {GAP, FLAG_E, 0 },
02211 reader.entityId,
02212 writer.entityId,
02213 gapStart,
02214 {gapListBase, num_bits, bitmap}
02215 };
02216
02217 if (Transport_debug_level > 5) {
02218 const GuidConverter conv(writer);
02219 SequenceRange sr;
02220 sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02221 SequenceNumber srbase;
02222 srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02223 sr.second = srbase.previous();
02224 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02225 "GAP with range [%q, %q] from %C\n",
02226 sr.first.getValue(), sr.second.getValue(),
02227 OPENDDS_STRING(conv).c_str()));
02228 }
02229
02230 size_t gap_size = 0, padding = 0;
02231 gen_find_size(gap, gap_size, padding);
02232 gap.smHeader.submessageLength =
02233 static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02234
02235
02236 OPENDDS_VECTOR(RepoId) readers;
02237 if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02238 if (Transport_debug_level > 5) {
02239 const GuidConverter local_conv(writer);
02240 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02241 "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02242 }
02243 const RtpsWriterMap::iterator iter = writers_.find(writer);
02244 RtpsWriter& rw = iter->second;
02245 for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02246 ri != rw.remote_readers_.end(); ++ri) {
02247 if (!ri->second.expecting_durable_data()) {
02248 readers.push_back(ri->first);
02249 } else if (Transport_debug_level > 5) {
02250 const GuidConverter remote_conv(ri->first);
02251 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02252 "%C is expecting durable data, no GAP sent\n",
02253 OPENDDS_STRING(remote_conv).c_str()));
02254 }
02255 }
02256 if (readers.empty()) return 0;
02257 }
02258
02259 const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02260 prefix_sz = sizeof(reader.guidPrefix);
02261
02262 const size_t total_sz = readers.empty() ? (gap_size + padding) :
02263 (readers.size() * (gap_size + padding + size_per_idst));
02264
02265 ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02266
02267
02268 Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02269 if (readers.empty()) {
02270 ser << gap;
02271 } else {
02272 InfoDestinationSubmessage idst = {
02273 {INFO_DST, FLAG_E, INFO_DST_SZ},
02274 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02275 };
02276 for (size_t i = 0; i < readers.size(); ++i) {
02277 std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02278 gap.readerId = readers[i].entityId;
02279 ser << idst;
02280 ser << gap;
02281 }
02282 }
02283 return mb_gap;
02284 }
02285
02286 void
02287 RtpsUdpDataLink::durability_resend(TransportQueueElement* element)
02288 {
02289 ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02290 send_strategy()->send_rtps_control(*msg,
02291 get_locator(element->subscription_id()));
02292 }
02293
02294 void
02295 RtpsUdpDataLink::send_durability_gaps(const RepoId& writer,
02296 const RepoId& reader,
02297 const DisjointSequence& gaps)
02298 {
02299 ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02300 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02301 RTPS::InfoDestinationSubmessage info_dst = {
02302 {RTPS::INFO_DST, RTPS::FLAG_E, RTPS::INFO_DST_SZ},
02303 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02304 };
02305 std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02306 ser << info_dst;
02307 mb.cont(marshal_gaps(writer, reader, gaps));
02308 send_strategy()->send_rtps_control(mb, get_locator(reader));
02309 mb.cont()->release();
02310 }
02311
02312 void
02313 RtpsUdpDataLink::send_heartbeats()
02314 {
02315 OPENDDS_VECTOR(CallbackType) readerDoesNotExistCallbacks;
02316 OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02317
02318 {
02319 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02320
02321 if (writers_.empty() && interesting_readers_.empty()) {
02322 heartbeat_->disable();
02323 }
02324
02325 using namespace OpenDDS::RTPS;
02326 OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02327 OPENDDS_SET(ACE_INET_Addr) recipients;
02328 const ACE_Time_Value now = ACE_OS::gettimeofday();
02329
02330 RepoIdSet writers_to_advertise;
02331
02332 RtpsUdpInst& config = this->config();
02333
02334 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config.heartbeat_period_;
02335 const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config.heartbeat_period_;
02336 for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02337 limit = interesting_readers_.end();
02338 pos != limit;
02339 ++pos) {
02340 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02341 (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02342 recipients.insert(pos->second.address);
02343 writers_to_advertise.insert(pos->second.localid);
02344 }
02345 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02346 CallbackType callback(pos->first, pos->second);
02347 readerDoesNotExistCallbacks.push_back(callback);
02348 pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02349 }
02350 }
02351
02352 typedef RtpsWriterMap::iterator rw_iter;
02353 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02354 const bool has_data = !rw->second.send_buff_.is_nil()
02355 && !rw->second.send_buff_->empty();
02356 bool final = true, has_durable_data = false;
02357 SequenceNumber durable_max;
02358
02359 typedef ReaderInfoMap::iterator ri_iter;
02360 const ri_iter end = rw->second.remote_readers_.end();
02361 for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02362 if ((has_data || !ri->second.handshake_done_)
02363 && locators_.count(ri->first)) {
02364 recipients.insert(locators_[ri->first].addr_);
02365 if (final && !ri->second.handshake_done_) {
02366 final = false;
02367 }
02368 }
02369 if (!ri->second.durable_data_.empty()) {
02370 const ACE_Time_Value expiration =
02371 ri->second.durable_timestamp_ + config.durable_data_timeout_;
02372 if (now > expiration) {
02373 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02374 dd_iter;
02375 for (dd_iter it = ri->second.durable_data_.begin();
02376 it != ri->second.durable_data_.end(); ++it) {
02377 pendingCallbacks.push_back(it->second);
02378 }
02379 ri->second.durable_data_.clear();
02380 if (Transport_debug_level > 3) {
02381 const GuidConverter gw(rw->first), gr(ri->first);
02382 VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02383 "removed expired durable data for %C -> %C\n",
02384 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02385 }
02386 } else {
02387 has_durable_data = true;
02388 if (ri->second.durable_data_.rbegin()->first > durable_max) {
02389 durable_max = ri->second.durable_data_.rbegin()->first;
02390 }
02391 if (locators_.count(ri->first)) {
02392 recipients.insert(locators_[ri->first].addr_);
02393 }
02394 }
02395 }
02396 }
02397
02398 if (!rw->second.elems_not_acked_.empty()) {
02399 final = false;
02400 }
02401
02402 if (writers_to_advertise.count(rw->first)) {
02403 final = false;
02404 writers_to_advertise.erase(rw->first);
02405 }
02406
02407 if (final && !has_data && !has_durable_data) {
02408 continue;
02409 }
02410
02411 const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02412 ? 1 : rw->second.send_buff_->low(),
02413 lastSN = std::max(durable_max,
02414 has_data ? rw->second.send_buff_->high() : 1);
02415
02416 const HeartBeatSubmessage hb = {
02417 {HEARTBEAT,
02418 CORBA::Octet(FLAG_E | (final ? FLAG_F : 0)),
02419 HEARTBEAT_SZ},
02420 ENTITYID_UNKNOWN,
02421 rw->first.entityId,
02422 {firstSN.getHigh(), firstSN.getLow()},
02423 {lastSN.getHigh(), lastSN.getLow()},
02424 {++heartbeat_counts_[rw->first]}
02425 };
02426 subm.push_back(hb);
02427 }
02428
02429 for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02430 limit = writers_to_advertise.end();
02431 pos != limit;
02432 ++pos) {
02433 const SequenceNumber SN = 1;
02434 const HeartBeatSubmessage hb = {
02435 {HEARTBEAT,
02436 FLAG_E,
02437 HEARTBEAT_SZ},
02438 ENTITYID_UNKNOWN,
02439 pos->entityId,
02440 {SN.getHigh(), SN.getLow()},
02441 {SN.getHigh(), SN.getLow()},
02442 {++heartbeat_counts_[*pos]}
02443 };
02444 subm.push_back(hb);
02445 }
02446
02447 send_directed_heartbeats(subm);
02448
02449 if (!subm.empty()) {
02450 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size());
02451
02452 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02453 bool send_ok = true;
02454 for (size_t i = 0; i < subm.size(); ++i) {
02455 if (!(ser << subm[i])) {
02456 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02457 "failed to serialize HEARTBEAT submessage %B\n", i));
02458 send_ok = false;
02459 break;
02460 }
02461 }
02462 if (send_ok) {
02463 send_strategy()->send_rtps_control(mb, recipients);
02464 }
02465 }
02466 }
02467
02468 for (OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks.begin();
02469 iter != readerDoesNotExistCallbacks.end(); ++iter){
02470 const InterestingRemote& remote = iter->second;
02471 remote.listener->reader_does_not_exist(iter->first, remote.localid);
02472 }
02473
02474 for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02475 pendingCallbacks[i]->data_dropped();
02476 }
02477 }
02478
02479 void
02480 RtpsUdpDataLink::send_directed_heartbeats(OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)& hbs)
02481 {
02482 #if defined(OPENDDS_SECURITY)
02483 const EntityId_t& volatile_writer =
02484 RTPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER;
02485
02486 RTPS::InfoDestinationSubmessage idst;
02487 idst.smHeader.submessageId = RTPS::INFO_DST;
02488 idst.smHeader.flags = RTPS::FLAG_E;
02489 idst.smHeader.submessageLength = RTPS::INFO_DST_SZ;
02490 const size_t block_size = RTPS::INFO_DST_SZ + RTPS::HEARTBEAT_SZ
02491 + 2 * RTPS::SMHDR_SZ;
02492 Message_Block_Ptr mb;
02493
02494 typedef OPENDDS_VECTOR(RTPS::HeartBeatSubmessage)::iterator iter_t;
02495 iter_t it = hbs.begin(), last = hbs.end();
02496 while (it != last) {
02497 if (0 == std::memcmp(&it->writerId, &volatile_writer, sizeof(EntityId_t))) {
02498 RepoId local;
02499 RTPS::assign(local.guidPrefix, local_prefix_);
02500 local.entityId = it->writerId;
02501 RtpsWriterMap::const_iterator rw = writers_.find(local);
02502 if (rw != writers_.end()) {
02503 const ReaderInfoMap& rinfo = rw->second.remote_readers_;
02504 for (ReaderInfoMap::const_iterator ri = rinfo.begin();
02505 ri != rinfo.end(); ++ri) {
02506 RTPS::assign(idst.guidPrefix, ri->first.guidPrefix);
02507 it->readerId = ri->first.entityId;
02508 if (mb) {
02509 mb->reset();
02510 } else {
02511 mb.reset(new ACE_Message_Block(block_size));
02512 }
02513 Serializer ser(mb.get(), false, Serializer::ALIGN_CDR);
02514 ser << idst;
02515 ser << *it;
02516 send_strategy()->send_rtps_control(*mb, locators_[ri->first].addr_);
02517 }
02518 }
02519 std::iter_swap(it, --last);
02520 } else {
02521 ++it;
02522 }
02523 }
02524 hbs.erase(last, hbs.end());
02525 #else
02526 ACE_UNUSED_ARG(hbs);
02527 #endif
02528 }
02529
02530 void
02531 RtpsUdpDataLink::check_heartbeats()
02532 {
02533 OPENDDS_VECTOR(CallbackType) writerDoesNotExistCallbacks;
02534
02535
02536 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * this->config().heartbeat_period_;
02537 {
02538 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02539
02540 for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02541 pos != limit;
02542 ++pos) {
02543 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02544 CallbackType callback(pos->first, pos->second);
02545 writerDoesNotExistCallbacks.push_back(callback);
02546 pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02547 }
02548 }
02549 }
02550
02551 OPENDDS_VECTOR(CallbackType)::iterator iter;
02552 for (iter = writerDoesNotExistCallbacks.begin(); iter != writerDoesNotExistCallbacks.end(); ++iter) {
02553 const RepoId& rid = iter->first;
02554 const InterestingRemote& remote = iter->second;
02555 remote.listener->writer_does_not_exist(rid, remote.localid);
02556 }
02557 }
02558
02559 void
02560 RtpsUdpDataLink::send_heartbeats_manual(const TransportSendControlElement* tsce)
02561 {
02562 using namespace OpenDDS::RTPS;
02563
02564 const RepoId pub_id = tsce->publication_id();
02565
02566
02567 OPENDDS_SET(ACE_INET_Addr) recipients;
02568 get_locators (pub_id, recipients);
02569 if (recipients.empty()) {
02570 return;
02571 }
02572
02573
02574
02575 SequenceNumber firstSN, lastSN;
02576 CORBA::Long counter;
02577 RtpsWriterMap::iterator pos = writers_.find (pub_id);
02578 if (pos != writers_.end ()) {
02579
02580 const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02581 SequenceNumber durable_max;
02582 const ACE_Time_Value now = ACE_OS::gettimeofday();
02583 for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02584 ri != end;
02585 ++ri) {
02586 if (!ri->second.durable_data_.empty()) {
02587 const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config().durable_data_timeout_;
02588 if (now <= expiration &&
02589 ri->second.durable_data_.rbegin()->first > durable_max) {
02590 durable_max = ri->second.durable_data_.rbegin()->first;
02591 }
02592 }
02593 }
02594 firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02595 lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02596 counter = ++heartbeat_counts_[pos->first];
02597 } else {
02598
02599 firstSN = 1;
02600 lastSN = tsce->sequence();
02601 counter = ++this->best_effort_heartbeat_count_;
02602 }
02603
02604 const HeartBeatSubmessage hb = {
02605 {HEARTBEAT,
02606 CORBA::Octet(FLAG_E | FLAG_F | FLAG_L),
02607 HEARTBEAT_SZ},
02608 ENTITYID_UNKNOWN,
02609 pub_id.entityId,
02610 {firstSN.getHigh(), firstSN.getLow()},
02611 {lastSN.getHigh(), lastSN.getLow()},
02612 {counter}
02613 };
02614
02615 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1);
02616
02617 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02618 if ((ser << hb)) {
02619 send_strategy()->send_rtps_control(mb, recipients);
02620 }
02621 else {
02622 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02623 "failed to serialize HEARTBEAT submessage\n"));
02624 }
02625 }
02626
02627 #if defined(OPENDDS_SECURITY)
02628 void
02629 RtpsUdpDataLink::populate_security_handles(const RepoId& local_id,
02630 const RepoId& remote_id,
02631 const unsigned char* buffer,
02632 unsigned int buffer_size)
02633 {
02634 using DDS::Security::ParticipantCryptoHandle;
02635 using DDS::Security::DatawriterCryptoHandle;
02636 using DDS::Security::DatareaderCryptoHandle;
02637
02638 ACE_Data_Block db(buffer_size, ACE_Message_Block::MB_DATA,
02639 reinterpret_cast<const char*>(buffer),
02640 0 , 0 , ACE_Message_Block::DONT_DELETE, 0 );
02641 ACE_Message_Block mb(&db, ACE_Message_Block::DONT_DELETE, 0 );
02642 mb.wr_ptr(mb.space());
02643 DCPS::Serializer ser(&mb, ACE_CDR_BYTE_ORDER, DCPS::Serializer::ALIGN_CDR);
02644
02645 const EntityKind localKind(GuidConverter(local_id).entityKind());
02646 const RepoId& writer_id = (localKind == KIND_WRITER) ? local_id : remote_id;
02647 const RepoId& reader_id = (localKind == KIND_WRITER) ? remote_id : local_id;
02648
02649 while (mb.length()) {
02650 DDS::BinaryProperty_t prop;
02651 if (!(ser >> prop)) {
02652 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::populate_security_handles()"
02653 " - failed to deserialize BinaryProperty_t\n"));
02654 return;
02655 }
02656
02657 if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_PART_CRYPTO_HANDLE) == 0
02658 && prop.value.length() >= sizeof(ParticipantCryptoHandle)) {
02659 unsigned int handle = 0;
02660 for (unsigned int i = 0; i < prop.value.length(); ++i) {
02661 handle = handle << 8 | prop.value[i];
02662 }
02663
02664 RepoId remote_participant;
02665 RTPS::assign(remote_participant.guidPrefix, remote_id.guidPrefix);
02666 remote_participant.entityId = ENTITYID_PARTICIPANT;
02667 peer_crypto_handles_[remote_participant] = handle;
02668 ACE_DEBUG((LM_DEBUG, "(%P|%t) RPCH %C = %d\n",
02669 std::string(GuidConverter(remote_participant)).c_str(), handle));
02670 }
02671
02672 else if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_DW_CRYPTO_HANDLE) == 0
02673 && prop.value.length() >= sizeof(DatawriterCryptoHandle)) {
02674 unsigned int handle = 0;
02675 for (unsigned int i = 0; i < prop.value.length(); ++i) {
02676 handle = handle << 8 | prop.value[i];
02677 }
02678 peer_crypto_handles_[writer_id] = handle;
02679 ACE_DEBUG((LM_DEBUG, "(%P|%t) DWCH %C = %d\n",
02680 std::string(GuidConverter(writer_id)).c_str(), handle));
02681 }
02682
02683 else if (std::strcmp(prop.name.in(), RTPS::BLOB_PROP_DR_CRYPTO_HANDLE) == 0
02684 && prop.value.length() >= sizeof(DatareaderCryptoHandle)) {
02685 unsigned int handle = 0;
02686 for (unsigned int i = 0; i < prop.value.length(); ++i) {
02687 handle = handle << 8 | prop.value[i];
02688 }
02689 peer_crypto_handles_[reader_id] = handle;
02690 ACE_DEBUG((LM_DEBUG, "(%P|%t) DRCH %C = %d\n",
02691 std::string(GuidConverter(reader_id)).c_str(), handle));
02692 }
02693
02694 }
02695 }
02696 #endif
02697
02698 RtpsUdpDataLink::ReaderInfo::~ReaderInfo()
02699 {
02700 expire_durable_data();
02701 }
02702
02703 void
02704 RtpsUdpDataLink::ReaderInfo::expire_durable_data()
02705 {
02706 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02707 for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
02708 it->second->data_dropped();
02709 }
02710 }
02711
02712 bool
02713 RtpsUdpDataLink::ReaderInfo::expecting_durable_data() const
02714 {
02715 return durable_ &&
02716 (durable_timestamp_ == ACE_Time_Value::zero
02717 || !durable_data_.empty());
02718 }
02719
02720 RtpsUdpDataLink::RtpsWriter::~RtpsWriter()
02721 {
02722 if (!to_deliver_.empty()) {
02723 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left to deliver\n", to_deliver_.size()));
02724 }
02725 if (!elems_not_acked_.empty()) {
02726 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left not fully acknowledged\n", elems_not_acked_.size()));
02727 }
02728 }
02729
02730 SequenceNumber
02731 RtpsUdpDataLink::RtpsWriter::heartbeat_high(const ReaderInfo& ri) const
02732 {
02733 const SequenceNumber durable_max =
02734 ri.durable_data_.empty() ? 0 : ri.durable_data_.rbegin()->first;
02735 const SequenceNumber data_max =
02736 send_buff_.is_nil() ? 0 : (send_buff_->empty() ? 0 : send_buff_->high());
02737 return std::max(durable_max, data_max);
02738 }
02739
02740 void
02741 RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack(TransportQueueElement* element)
02742 {
02743 elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
02744 }
02745
02746
02747
02748
02749 void
02750 RtpsUdpDataLink::TimedDelay::schedule()
02751 {
02752 if (!scheduled_) {
02753 const long timer = outer_->get_reactor()->schedule_timer(this, 0, timeout_);
02754
02755 if (timer == -1) {
02756 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::TimedDelay::schedule "
02757 "failed to schedule timer %p\n", ACE_TEXT("")));
02758 } else {
02759 scheduled_ = true;
02760 }
02761 }
02762 }
02763
02764 void
02765 RtpsUdpDataLink::TimedDelay::cancel()
02766 {
02767 if (scheduled_) {
02768 outer_->get_reactor()->cancel_timer(this);
02769 scheduled_ = false;
02770 }
02771 }
02772
02773 void
02774 RtpsUdpDataLink::HeartBeat::enable()
02775 {
02776 if (!enabled_) {
02777 const ACE_Time_Value& per = outer_->config().heartbeat_period_;
02778 const long timer =
02779 outer_->get_reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, per);
02780
02781 if (timer == -1) {
02782 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::HeartBeat::enable"
02783 " failed to schedule timer %p\n", ACE_TEXT("")));
02784 } else {
02785 enabled_ = true;
02786 }
02787 }
02788 }
02789
02790 void
02791 RtpsUdpDataLink::HeartBeat::disable()
02792 {
02793 if (enabled_) {
02794 outer_->get_reactor()->cancel_timer(this);
02795 enabled_ = false;
02796 }
02797 }
02798
02799 void
02800 RtpsUdpDataLink::send_final_acks (const RepoId& readerid)
02801 {
02802 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02803 RtpsReaderMap::iterator rr = readers_.find (readerid);
02804 if (rr != readers_.end ()) {
02805 send_ack_nacks (rr, true);
02806 }
02807 }
02808
02809
02810 int
02811 RtpsUdpDataLink::HeldDataDeliveryHandler::handle_exception(ACE_HANDLE )
02812 {
02813 ACE_ASSERT(link_->reactor_task_->get_reactor_owner() == ACE_Thread::self());
02814
02815 HeldData::iterator itr;
02816 for (itr = held_data_.begin(); itr != held_data_.end(); ++itr) {
02817 link_->data_received(itr->first, itr->second);
02818 }
02819 held_data_.clear();
02820 return 0;
02821 }
02822
02823 void RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery(const RepoId& readerId, WriterInfo& info)
02824 {
02825 ACE_ASSERT(link_->reactor_task_->get_reactor_owner() == ACE_Thread::self());
02826
02827 const SequenceNumber ca = info.recvd_.cumulative_ack();
02828 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter;
02829 const iter end = info.held_.upper_bound(ca);
02830
02831 for (iter it = info.held_.begin(); it != end; ) {
02832 if (Transport_debug_level > 5) {
02833 GuidConverter reader(readerId);
02834 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::HeldDataDeliveryHandler::notify_delivery -")
02835 ACE_TEXT(" deliver sequence: %q to %C\n"),
02836 it->second.header_.sequence_.getValue(),
02837 OPENDDS_STRING(reader).c_str()));
02838 }
02839
02840 held_data_.push_back(HeldDataEntry(it->second, readerId));
02841 info.held_.erase(it++);
02842 }
02843 link_->reactor_task_->get_reactor()->notify(this);
02844 }
02845
02846 ACE_Event_Handler::Reference_Count
02847 RtpsUdpDataLink::HeldDataDeliveryHandler::add_reference()
02848 {
02849 return link_->add_reference();
02850 }
02851
02852 ACE_Event_Handler::Reference_Count
02853 RtpsUdpDataLink::HeldDataDeliveryHandler::remove_reference()
02854 {
02855 return link_->remove_reference();
02856 }
02857
02858 OpenDDS::DCPS::RtpsUdpSendStrategy*
02859 OpenDDS::DCPS::RtpsUdpDataLink::send_strategy()
02860 {
02861 return static_cast<OpenDDS::DCPS::RtpsUdpSendStrategy*>(send_strategy_.in());
02862 }
02863
02864 OpenDDS::DCPS::RtpsUdpReceiveStrategy*
02865 OpenDDS::DCPS::RtpsUdpDataLink::receive_strategy()
02866 {
02867 return static_cast<OpenDDS::DCPS::RtpsUdpReceiveStrategy*>(receive_strategy_.in());
02868 }
02869
02870
02871 }
02872 }
02873
02874 OPENDDS_END_VERSIONED_NAMESPACE_DECL