00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsUdpDataLink.h"
00009 #include "RtpsUdpTransport.h"
00010 #include "RtpsUdpInst.h"
00011
00012 #include "dds/DCPS/transport/framework/TransportCustomizedElement.h"
00013 #include "dds/DCPS/transport/framework/TransportSendElement.h"
00014 #include "dds/DCPS/transport/framework/TransportSendControlElement.h"
00015 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00016
00017 #include "dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h"
00018 #include "dds/DCPS/RTPS/BaseMessageUtils.h"
00019 #include "dds/DCPS/RTPS/BaseMessageTypes.h"
00020 #include "dds/DCPS/RTPS/MessageTypes.h"
00021
00022 #include "ace/Default_Constants.h"
00023 #include "ace/Log_Msg.h"
00024 #include "ace/Message_Block.h"
00025 #include "ace/Reverse_Lock_T.h"
00026 #include "ace/Reactor.h"
00027
00028 #include <string.h>
00029
00030 #ifndef __ACE_INLINE__
00031 # include "RtpsUdpDataLink.inl"
00032 #endif
00033
00034 namespace {
00035
00036
00037
00038 CORBA::ULong
00039 bitmap_num_longs(const OpenDDS::DCPS::SequenceNumber& low,
00040 const OpenDDS::DCPS::SequenceNumber& high)
00041 {
00042 return std::min(CORBA::ULong(8),
00043 CORBA::ULong((high.getValue() - low.getValue() + 32) / 32));
00044 }
00045
00046 }
00047
00048 namespace OpenDDS {
00049 namespace DCPS {
00050
00051 RtpsUdpDataLink::RtpsUdpDataLink(RtpsUdpTransport* transport,
00052 const GuidPrefix_t& local_prefix,
00053 RtpsUdpInst* config,
00054 TransportReactorTask* reactor_task)
00055 : DataLink(transport,
00056 0,
00057 false,
00058 false),
00059 config_(config),
00060 reactor_task_(reactor_task, false),
00061 rtps_customized_element_allocator_(40, sizeof(RtpsCustomizedElement)),
00062 multi_buff_(this, config->nak_depth_),
00063 best_effort_heartbeat_count_(0),
00064 nack_reply_(this, &RtpsUdpDataLink::send_nack_replies,
00065 config->nak_response_delay_),
00066 heartbeat_reply_(this, &RtpsUdpDataLink::send_heartbeat_replies,
00067 config->heartbeat_response_delay_),
00068 heartbeat_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::send_heartbeats),
00069 heartbeatchecker_(reactor_task->get_reactor(), reactor_task->get_reactor_owner(), this, &RtpsUdpDataLink::check_heartbeats)
00070 {
00071 std::memcpy(local_prefix_, local_prefix, sizeof(GuidPrefix_t));
00072 }
00073
00074 bool
00075 RtpsUdpDataLink::add_delayed_notification(TransportQueueElement* element)
00076 {
00077 RtpsWriterMap::iterator iter = writers_.find(element->publication_id());
00078 if (iter != writers_.end()) {
00079
00080 iter->second.add_elem_awaiting_ack(element);
00081 return true;
00082 }
00083 return false;
00084 }
00085
00086 void RtpsUdpDataLink::do_remove_sample(const RepoId& pub_id,
00087 const TransportQueueElement::MatchCriteria& criteria)
00088 {
00089 RtpsWriter::SnToTqeMap sn_tqe_map;
00090 RtpsWriter::SnToTqeMap to_deliver;
00091 typedef RtpsWriter::SnToTqeMap::iterator iter_t;
00092
00093 {
00094 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00095
00096 RtpsWriterMap::iterator iter = writers_.find(pub_id);
00097 if (iter != writers_.end() && !iter->second.elems_not_acked_.empty()) {
00098 to_deliver.insert(iter->second.to_deliver_.begin(), iter->second.to_deliver_.end());
00099 iter->second.to_deliver_.clear();
00100 iter_t it = iter->second.elems_not_acked_.begin();
00101 OPENDDS_SET(SequenceNumber) sns_to_release;
00102 while (it != iter->second.elems_not_acked_.end()) {
00103 if (criteria.matches(*it->second)) {
00104 sn_tqe_map.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
00105 sns_to_release.insert(it->first);
00106 iter_t last = it;
00107 ++it;
00108 iter->second.elems_not_acked_.erase(last);
00109 } else {
00110 ++it;
00111 }
00112 }
00113 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00114 while (sns_it != sns_to_release.end()) {
00115 iter->second.send_buff_->release_acked(*sns_it);
00116 ++sns_it;
00117 }
00118 }
00119 }
00120 iter_t deliver_iter = to_deliver.begin();
00121 while (deliver_iter != to_deliver.end()) {
00122 deliver_iter->second->data_delivered();
00123 ++deliver_iter;
00124 }
00125 iter_t drop_iter = sn_tqe_map.begin();
00126 while (drop_iter != sn_tqe_map.end()) {
00127 drop_iter->second->data_dropped(true);
00128 ++drop_iter;
00129 }
00130 }
00131
00132 bool
00133 RtpsUdpDataLink::open(const ACE_SOCK_Dgram& unicast_socket)
00134 {
00135 unicast_socket_ = unicast_socket;
00136
00137 if (config_->use_multicast_) {
00138 const OPENDDS_STRING& net_if = config_->multicast_interface_;
00139 #ifdef ACE_HAS_MAC_OSX
00140 multicast_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00141 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00142 #endif
00143 if (multicast_socket_.join(config_->multicast_group_address_, 1,
00144 net_if.empty() ? 0 :
00145 ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
00146 ACE_ERROR_RETURN((LM_ERROR,
00147 ACE_TEXT("(%P|%t) ERROR: ")
00148 ACE_TEXT("RtpsUdpDataLink::open: ")
00149 ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed: %m\n")),
00150 false);
00151 }
00152 }
00153
00154 if (!OpenDDS::DCPS::set_socket_multicast_ttl(unicast_socket_, config_->ttl_)) {
00155 ACE_ERROR_RETURN((LM_ERROR,
00156 ACE_TEXT("(%P|%t) ERROR: ")
00157 ACE_TEXT("RtpsUdpDataLink::open: ")
00158 ACE_TEXT("failed to set TTL: %d\n"),
00159 config_->ttl_),
00160 false);
00161 }
00162
00163 send_strategy_->send_buffer(&multi_buff_);
00164
00165 if (start(static_rchandle_cast<TransportSendStrategy>(send_strategy_),
00166 static_rchandle_cast<TransportStrategy>(recv_strategy_)) != 0) {
00167 stop_i();
00168 ACE_ERROR_RETURN((LM_ERROR,
00169 ACE_TEXT("(%P|%t) ERROR: ")
00170 ACE_TEXT("UdpDataLink::open: start failed!\n")),
00171 false);
00172 }
00173
00174 return true;
00175 }
00176
00177 void
00178 RtpsUdpDataLink::add_locator(const RepoId& remote_id,
00179 const ACE_INET_Addr& address,
00180 bool requires_inline_qos)
00181 {
00182 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00183 locators_[remote_id] = RemoteInfo(address, requires_inline_qos);
00184 }
00185
00186 void
00187 RtpsUdpDataLink::get_locators(const RepoId& local_id,
00188 OPENDDS_SET(ACE_INET_Addr)& addrs) const
00189 {
00190 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00191
00192 if (local_id == GUID_UNKNOWN) {
00193 for (iter_t iter = locators_.begin(); iter != locators_.end(); ++iter) {
00194 addrs.insert(iter->second.addr_);
00195 }
00196 return;
00197 }
00198
00199 const GUIDSeq_var peers = peer_ids(local_id);
00200 if (!peers.ptr()) {
00201 return;
00202 }
00203 for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00204 const ACE_INET_Addr addr = get_locator(peers[i]);
00205 if (addr != ACE_INET_Addr()) {
00206 addrs.insert(addr);
00207 }
00208 }
00209 }
00210
00211 ACE_INET_Addr
00212 RtpsUdpDataLink::get_locator(const RepoId& remote_id) const
00213 {
00214 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::const_iterator iter_t;
00215 const iter_t iter = locators_.find(remote_id);
00216 if (iter == locators_.end()) {
00217 const GuidConverter conv(remote_id);
00218 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::get_locator_i() - "
00219 "no locator found for peer %C\n", OPENDDS_STRING(conv).c_str()));
00220 return ACE_INET_Addr();
00221 }
00222 return iter->second.addr_;
00223 }
00224
00225 void
00226 RtpsUdpDataLink::associated(const RepoId& local_id, const RepoId& remote_id,
00227 bool local_reliable, bool remote_reliable,
00228 bool local_durable, bool remote_durable)
00229 {
00230 if (!local_reliable) {
00231 return;
00232 }
00233
00234 bool enable_heartbeat = false;
00235
00236 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00237 const GuidConverter conv(local_id);
00238 const EntityKind kind = conv.entityKind();
00239 if (kind == KIND_WRITER && remote_reliable) {
00240
00241 heartbeat_counts_.insert(HeartBeatCountMapType::value_type(local_id, 0));
00242 RtpsWriter& w = writers_[local_id];
00243 w.remote_readers_[remote_id].durable_ = remote_durable;
00244 w.durable_ = local_durable;
00245 enable_heartbeat = true;
00246
00247 } else if (kind == KIND_READER) {
00248 RtpsReaderMap::iterator rr = readers_.find(local_id);
00249 if (rr == readers_.end()) {
00250 rr = readers_.insert(RtpsReaderMap::value_type(local_id, RtpsReader()))
00251 .first;
00252 rr->second.durable_ = local_durable;
00253 }
00254 rr->second.remote_writers_[remote_id];
00255 reader_index_.insert(RtpsReaderIndex::value_type(remote_id, rr));
00256 }
00257
00258 g.release();
00259 if (enable_heartbeat) {
00260 heartbeat_.schedule_enable();
00261 }
00262 }
00263
00264 bool
00265 RtpsUdpDataLink::check_handshake_complete(const RepoId& local_id,
00266 const RepoId& remote_id)
00267 {
00268 const GuidConverter conv(local_id);
00269 const EntityKind kind = conv.entityKind();
00270 if (kind == KIND_WRITER) {
00271 RtpsWriterMap::iterator rw = writers_.find(local_id);
00272 if (rw == writers_.end()) {
00273 return true;
00274 }
00275 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote_id);
00276 if (ri == rw->second.remote_readers_.end()) {
00277 return true;
00278 }
00279 return ri->second.handshake_done_;
00280
00281 } else if (kind == KIND_READER) {
00282 return true;
00283 }
00284 return false;
00285 }
00286
00287 void
00288 RtpsUdpDataLink::register_for_reader(const RepoId& writerid,
00289 const RepoId& readerid,
00290 const ACE_INET_Addr& address,
00291 OpenDDS::DCPS::DiscoveryListener* listener)
00292 {
00293 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00294 bool enableheartbeat = interesting_readers_.empty();
00295 interesting_readers_.insert(InterestingRemoteMapType::value_type(readerid, InterestingRemote(writerid, address, listener)));
00296 heartbeat_counts_[writerid] = 0;
00297 g.release();
00298 if (enableheartbeat) {
00299 heartbeat_.schedule_enable();
00300 }
00301 }
00302
00303 void
00304 RtpsUdpDataLink::unregister_for_reader(const RepoId& writerid,
00305 const RepoId& readerid)
00306 {
00307 OPENDDS_VECTOR(CallbackType) to_notify;
00308 {
00309 ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
00310 to_notify.swap(readerDoesNotExistCallbacks_);
00311 }
00312 OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00313 while(iter != to_notify.end()) {
00314 const RepoId& rid = iter->first;
00315 const InterestingRemote& remote = iter->second;
00316 remote.listener->reader_does_not_exist(rid, remote.localid);
00317 ++iter;
00318 }
00319 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00320 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(readerid),
00321 limit = interesting_readers_.upper_bound(readerid);
00322 pos != limit;
00323 ) {
00324 if (pos->second.localid == writerid) {
00325 interesting_readers_.erase(pos++);
00326 } else {
00327 ++pos;
00328 }
00329 }
00330 }
00331
00332 void
00333 RtpsUdpDataLink::register_for_writer(const RepoId& readerid,
00334 const RepoId& writerid,
00335 const ACE_INET_Addr& address,
00336 OpenDDS::DCPS::DiscoveryListener* listener)
00337 {
00338 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00339 bool enableheartbeatchecker = interesting_writers_.empty();
00340 interesting_writers_.insert(InterestingRemoteMapType::value_type(writerid, InterestingRemote(readerid, address, listener)));
00341 g.release();
00342 if (enableheartbeatchecker) {
00343 heartbeatchecker_.schedule_enable();
00344 }
00345 }
00346
00347 void
00348 RtpsUdpDataLink::unregister_for_writer(const RepoId& readerid,
00349 const RepoId& writerid)
00350 {
00351 OPENDDS_VECTOR(CallbackType) to_notify;
00352 {
00353 ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
00354 to_notify.swap(writerDoesNotExistCallbacks_);
00355 }
00356 OPENDDS_VECTOR(CallbackType)::iterator iter = to_notify.begin();
00357 while(iter != to_notify.end()) {
00358 const RepoId& rid = iter->first;
00359 const InterestingRemote& remote = iter->second;
00360 remote.listener->writer_does_not_exist(rid, remote.localid);
00361 ++iter;
00362 }
00363 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00364 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(writerid),
00365 limit = interesting_writers_.upper_bound(writerid);
00366 pos != limit;
00367 ) {
00368 if (pos->second.localid == readerid) {
00369 interesting_writers_.erase(pos++);
00370 } else {
00371 ++pos;
00372 }
00373 }
00374 }
00375
00376 void
00377 RtpsUdpDataLink::pre_stop_i()
00378 {
00379 DBG_ENTRY_LVL("RtpsUdpDataLink","pre_stop_i",6);
00380 DataLink::pre_stop_i();
00381 OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00382 OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00383 {
00384 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00385
00386 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00387
00388 RtpsWriterMap::iterator iter = writers_.begin();
00389 while (iter != writers_.end()) {
00390 RtpsWriter& writer = iter->second;
00391 if (!writer.to_deliver_.empty()) {
00392 iter_t iter = writer.to_deliver_.begin();
00393 while (iter != writer.to_deliver_.end()) {
00394 to_deliver.push_back(iter->second);
00395 writer.to_deliver_.erase(iter);
00396 iter = writer.to_deliver_.begin();
00397 }
00398 }
00399 if (!writer.elems_not_acked_.empty()) {
00400 OPENDDS_SET(SequenceNumber) sns_to_release;
00401 iter_t iter = writer.elems_not_acked_.begin();
00402 while (iter != writer.elems_not_acked_.end()) {
00403 to_drop.push_back(iter->second);
00404 sns_to_release.insert(iter->first);
00405 writer.elems_not_acked_.erase(iter);
00406 iter = writer.elems_not_acked_.begin();
00407 }
00408 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00409 while (sns_it != sns_to_release.end()) {
00410 writer.send_buff_->release_acked(*sns_it);
00411 ++sns_it;
00412 }
00413 }
00414 RtpsWriterMap::iterator last = iter;
00415 ++iter;
00416 heartbeat_counts_.erase(last->first);
00417 writers_.erase(last);
00418 }
00419 }
00420 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00421 tqe_iter deliver_it = to_deliver.begin();
00422 while (deliver_it != to_deliver.end()) {
00423 (*deliver_it)->data_delivered();
00424 ++deliver_it;
00425 }
00426 tqe_iter drop_it = to_drop.begin();
00427 while (drop_it != to_drop.end()) {
00428 (*drop_it)->data_dropped(true);
00429 ++drop_it;
00430 }
00431 }
00432
00433 void
00434 RtpsUdpDataLink::send_i(TransportQueueElement* element, bool relink)
00435 {
00436
00437
00438
00439 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00440 DataLink::send_i(element, relink);
00441 }
00442
00443 void
00444 RtpsUdpDataLink::release_reservations_i(const RepoId& remote_id,
00445 const RepoId& local_id)
00446 {
00447 OPENDDS_VECTOR(TransportQueueElement*) to_deliver;
00448 OPENDDS_VECTOR(TransportQueueElement*) to_drop;
00449 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00450 using std::pair;
00451 const GuidConverter conv(local_id);
00452 const EntityKind kind = conv.entityKind();
00453 if (kind == KIND_WRITER) {
00454 const RtpsWriterMap::iterator rw = writers_.find(local_id);
00455
00456 if (rw != writers_.end()) {
00457 rw->second.remote_readers_.erase(remote_id);
00458
00459 if (rw->second.remote_readers_.empty()) {
00460 RtpsWriter& writer = rw->second;
00461 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
00462
00463 if (!writer.to_deliver_.empty()) {
00464 iter_t iter = writer.to_deliver_.begin();
00465 while (iter != writer.to_deliver_.end()) {
00466 to_deliver.push_back(iter->second);
00467 writer.to_deliver_.erase(iter);
00468 iter = writer.to_deliver_.begin();
00469 }
00470 }
00471 if (!writer.elems_not_acked_.empty()) {
00472 OPENDDS_SET(SequenceNumber) sns_to_release;
00473 iter_t iter = writer.elems_not_acked_.begin();
00474 while (iter != writer.elems_not_acked_.end()) {
00475 to_drop.push_back(iter->second);
00476 sns_to_release.insert(iter->first);
00477 writer.elems_not_acked_.erase(iter);
00478 iter = writer.elems_not_acked_.begin();
00479 }
00480 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
00481 while (sns_it != sns_to_release.end()) {
00482 writer.send_buff_->release_acked(*sns_it);
00483 ++sns_it;
00484 }
00485 }
00486 heartbeat_counts_.erase(rw->first);
00487 writers_.erase(rw);
00488 }
00489 }
00490
00491 } else if (kind == KIND_READER) {
00492 const RtpsReaderMap::iterator rr = readers_.find(local_id);
00493
00494 if (rr != readers_.end()) {
00495 rr->second.remote_writers_.erase(remote_id);
00496
00497 for (pair<RtpsReaderIndex::iterator, RtpsReaderIndex::iterator> iters =
00498 reader_index_.equal_range(remote_id);
00499 iters.first != iters.second;) {
00500 if (iters.first->second == rr) {
00501 reader_index_.erase(iters.first++);
00502 } else {
00503 ++iters.first;
00504 }
00505 }
00506
00507 if (rr->second.remote_writers_.empty()) {
00508 readers_.erase(rr);
00509 }
00510 }
00511 }
00512 g.release();
00513 typedef OPENDDS_VECTOR(TransportQueueElement*)::iterator tqe_iter;
00514 tqe_iter deliver_it = to_deliver.begin();
00515 while (deliver_it != to_deliver.end()) {
00516 (*deliver_it)->data_delivered();
00517 ++deliver_it;
00518 }
00519 tqe_iter drop_it = to_drop.begin();
00520 while (drop_it != to_drop.end()) {
00521 (*drop_it)->data_dropped(true);
00522 ++drop_it;
00523 }
00524 }
00525
00526 void
00527 RtpsUdpDataLink::stop_i()
00528 {
00529 nack_reply_.cancel();
00530 heartbeat_reply_.cancel();
00531 heartbeat_.disable();
00532 unicast_socket_.close();
00533 multicast_socket_.close();
00534 }
00535
00536
00537
00538
00539 void
00540 RtpsUdpDataLink::MultiSendBuffer::retain_all(RepoId pub_id)
00541 {
00542 ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00543 const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00544 if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00545 wi->second.send_buff_->retain_all(pub_id);
00546 }
00547 }
00548
00549 void
00550 RtpsUdpDataLink::MultiSendBuffer::insert(SequenceNumber ,
00551 TransportSendStrategy::QueueType* q,
00552 ACE_Message_Block* chain)
00553 {
00554
00555
00556 const TransportQueueElement* const tqe = q->peek();
00557 const SequenceNumber seq = tqe->sequence();
00558 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00559 return;
00560 }
00561
00562 const RepoId pub_id = tqe->publication_id();
00563
00564 const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00565 if (wi == outer_->writers_.end()) {
00566 return;
00567 }
00568
00569 RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00570
00571 if (send_buff.is_nil()) {
00572 send_buff = new SingleSendBuffer(SingleSendBuffer::UNLIMITED, 1 );
00573
00574 send_buff->bind(outer_->send_strategy_.in());
00575 }
00576
00577 if (Transport_debug_level > 5) {
00578 const GuidConverter pub(pub_id);
00579 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00580 "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00581 (int)tqe->is_fragment()));
00582 }
00583
00584 if (tqe->is_fragment()) {
00585 const RtpsCustomizedElement* const rce =
00586 dynamic_cast<const RtpsCustomizedElement*>(tqe);
00587 if (rce) {
00588 send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00589 } else if (Transport_debug_level) {
00590 const GuidConverter pub(pub_id);
00591 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00592 " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00593 OPENDDS_STRING(pub).c_str(), seq.getValue()));
00594 }
00595 } else {
00596 send_buff->insert(seq, q, chain);
00597 }
00598 }
00599
00600
00601
00602 namespace {
00603 ACE_Message_Block* submsgs_to_msgblock(const RTPS::SubmessageSeq& subm)
00604 {
00605 size_t size = 0, padding = 0;
00606 for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00607 if ((size + padding) % 4) {
00608 padding += 4 - ((size + padding) % 4);
00609 }
00610 gen_find_size(subm[i], size, padding);
00611 }
00612
00613 ACE_Message_Block* hdr = new ACE_Message_Block(size + padding);
00614
00615 for (CORBA::ULong i = 0; i < subm.length(); ++i) {
00616
00617 Serializer ser(hdr, false, Serializer::ALIGN_CDR);
00618 ser << subm[i];
00619 const size_t len = hdr->length();
00620 if (len % 4) {
00621 hdr->wr_ptr(4 - (len % 4));
00622 }
00623 }
00624 return hdr;
00625 }
00626 }
00627
00628 TransportQueueElement*
00629 RtpsUdpDataLink::customize_queue_element(TransportQueueElement* element)
00630 {
00631 const ACE_Message_Block* msg = element->msg();
00632 if (!msg) {
00633 return element;
00634 }
00635
00636 const RepoId pub_id = element->publication_id();
00637
00638 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, 0);
00639
00640 RTPS::SubmessageSeq subm;
00641
00642 const RtpsWriterMap::iterator rw = writers_.find(pub_id);
00643
00644 bool gap_ok = true;
00645 DestToEntityMap gap_receivers;
00646 if (rw != writers_.end() && !rw->second.remote_readers_.empty()) {
00647 for (ReaderInfoMap::iterator ri = rw->second.remote_readers_.begin();
00648 ri != rw->second.remote_readers_.end(); ++ri) {
00649 RepoId tmp;
00650 std::memcpy(tmp.guidPrefix, ri->first.guidPrefix, sizeof(GuidPrefix_t));
00651 tmp.entityId = ENTITYID_UNKNOWN;
00652 gap_receivers[tmp].push_back(ri->first);
00653
00654 if (ri->second.expecting_durable_data()) {
00655
00656
00657
00658
00659 gap_ok = false;
00660 break;
00661 }
00662 }
00663 }
00664
00665 if (gap_ok) {
00666 add_gap_submsg(subm, *element, gap_receivers);
00667 }
00668
00669 const SequenceNumber seq = element->sequence();
00670 if (rw != writers_.end() && seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00671 rw->second.expected_ = seq;
00672 ++rw->second.expected_;
00673 }
00674
00675 TransportSendElement* tse = dynamic_cast<TransportSendElement*>(element);
00676 TransportCustomizedElement* tce =
00677 dynamic_cast<TransportCustomizedElement*>(element);
00678 TransportSendControlElement* tsce =
00679 dynamic_cast<TransportSendControlElement*>(element);
00680
00681 ACE_Message_Block* data = 0;
00682 bool durable = false;
00683
00684
00685
00686 if (tsce) {
00687 if (RtpsSampleHeader::control_message_supported(tsce->header().message_id_)) {
00688 data = msg->cont()->duplicate();
00689
00690 RtpsSampleHeader::populate_data_control_submessages(
00691 subm, *tsce, requires_inline_qos(pub_id));
00692 } else if (tsce->header().message_id_ == END_HISTORIC_SAMPLES) {
00693 end_historic_samples(rw, tsce->header(), msg->cont());
00694 element->data_delivered();
00695 return 0;
00696 } else if (tsce->header().message_id_ == DATAWRITER_LIVELINESS) {
00697 send_heartbeats_manual(tsce);
00698 element->data_delivered();
00699 return 0;
00700 } else {
00701 element->data_dropped(true );
00702 return 0;
00703 }
00704
00705 } else if (tse) {
00706
00707 data = msg->cont()->duplicate();
00708 const DataSampleElement* dsle = tse->sample();
00709
00710 RtpsSampleHeader::populate_data_sample_submessages(
00711 subm, *dsle, requires_inline_qos(pub_id));
00712 durable = dsle->get_header().historic_sample_;
00713
00714 } else if (tce) {
00715
00716 data = msg->cont()->cont()->duplicate();
00717 const DataSampleElement* dsle = tce->original_send_element()->sample();
00718
00719 RtpsSampleHeader::populate_data_sample_submessages(
00720 subm, *dsle, requires_inline_qos(pub_id));
00721 durable = dsle->get_header().historic_sample_;
00722
00723 } else {
00724 return element;
00725 }
00726
00727 ACE_Message_Block* hdr = submsgs_to_msgblock(subm);
00728 hdr->cont(data);
00729 RtpsCustomizedElement* rtps =
00730 RtpsCustomizedElement::alloc(element, hdr,
00731 &rtps_customized_element_allocator_);
00732
00733
00734 if (durable && rw != writers_.end()) {
00735 const RepoId sub = element->subscription_id();
00736 if (sub != GUID_UNKNOWN) {
00737 ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(sub);
00738 if (ri != rw->second.remote_readers_.end()) {
00739 ri->second.durable_data_[rtps->sequence()] = rtps;
00740 ri->second.durable_timestamp_ = ACE_OS::gettimeofday();
00741 if (Transport_debug_level > 3) {
00742 const GuidConverter conv(pub_id), sub_conv(sub);
00743 ACE_DEBUG((LM_DEBUG,
00744 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00745 "storing durable data for local %C remote %C\n",
00746 OPENDDS_STRING(conv).c_str(), OPENDDS_STRING(sub_conv).c_str()));
00747 }
00748 return 0;
00749 }
00750 }
00751 } else if (durable && (Transport_debug_level)) {
00752 const GuidConverter conv(pub_id);
00753 ACE_DEBUG((LM_ERROR,
00754 "(%P|%t) RtpsUdpDataLink::customize_queue_element() - "
00755 "WARNING: no RtpsWriter to store durable data for local %C\n",
00756 OPENDDS_STRING(conv).c_str()));
00757 }
00758
00759 return rtps;
00760 }
00761
00762 void
00763 RtpsUdpDataLink::end_historic_samples(RtpsWriterMap::iterator writer,
00764 const DataSampleHeader& header,
00765 ACE_Message_Block* body)
00766 {
00767
00768
00769 if (writer != writers_.end() && writer->second.durable_) {
00770 const ACE_Time_Value now = ACE_OS::gettimeofday();
00771 RepoId sub = GUID_UNKNOWN;
00772 if (body && header.message_length_ >= sizeof(sub)) {
00773 std::memcpy(&sub, body->rd_ptr(), header.message_length_);
00774 }
00775 typedef ReaderInfoMap::iterator iter_t;
00776 if (sub == GUID_UNKNOWN) {
00777 if (Transport_debug_level > 3) {
00778 const GuidConverter conv(writer->first);
00779 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples "
00780 "local %C all readers\n", OPENDDS_STRING(conv).c_str()));
00781 }
00782 for (iter_t iter = writer->second.remote_readers_.begin();
00783 iter != writer->second.remote_readers_.end(); ++iter) {
00784 if (iter->second.durable_) {
00785 iter->second.durable_timestamp_ = now;
00786 }
00787 }
00788 } else {
00789 iter_t iter = writer->second.remote_readers_.find(sub);
00790 if (iter != writer->second.remote_readers_.end()) {
00791 if (iter->second.durable_) {
00792 iter->second.durable_timestamp_ = now;
00793 if (Transport_debug_level > 3) {
00794 const GuidConverter conv(writer->first), sub_conv(sub);
00795 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::end_historic_samples"
00796 " local %C remote %C\n", OPENDDS_STRING(conv).c_str(),
00797 OPENDDS_STRING(sub_conv).c_str()));
00798 }
00799 }
00800 }
00801 }
00802 }
00803 }
00804
00805 bool
00806 RtpsUdpDataLink::requires_inline_qos(const PublicationId& pub_id)
00807 {
00808 if (force_inline_qos_) {
00809
00810 return true;
00811 } else {
00812 const GUIDSeq_var peers = peer_ids(pub_id);
00813 if (!peers.ptr()) {
00814 return false;
00815 }
00816 typedef OPENDDS_MAP_CMP(RepoId, RemoteInfo, GUID_tKeyLessThan)::iterator iter_t;
00817 for (CORBA::ULong i = 0; i < peers->length(); ++i) {
00818 const iter_t iter = locators_.find(peers[i]);
00819 if (iter != locators_.end() && iter->second.requires_inline_qos_) {
00820 return true;
00821 }
00822 }
00823 return false;
00824 }
00825 }
00826
00827 bool RtpsUdpDataLink::force_inline_qos_ = false;
00828
00829 void
00830 RtpsUdpDataLink::add_gap_submsg(RTPS::SubmessageSeq& msg,
00831 const TransportQueueElement& tqe,
00832 const DestToEntityMap& dtem)
00833 {
00834
00835
00836
00837
00838 using namespace OpenDDS::RTPS;
00839
00840 const SequenceNumber seq = tqe.sequence();
00841 const RepoId pub = tqe.publication_id();
00842 if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN() || pub == GUID_UNKNOWN
00843 || tqe.subscription_id() != GUID_UNKNOWN) {
00844 return;
00845 }
00846
00847 const RtpsWriterMap::iterator wi = writers_.find(pub);
00848 if (wi == writers_.end()) {
00849 return;
00850 }
00851
00852 RtpsWriter& rw = wi->second;
00853
00854 if (seq != rw.expected_) {
00855 SequenceNumber firstMissing = rw.expected_;
00856
00857
00858
00859 const SequenceNumber_t gapStart = {firstMissing.getHigh(),
00860 firstMissing.getLow()},
00861 gapListBase = {seq.getHigh(),
00862 seq.getLow()};
00863
00864
00865
00866 LongSeq8 bitmap;
00867 bitmap.length(1);
00868 bitmap[0] = 0;
00869
00870 GapSubmessage gap = {
00871 {GAP, 1 , 0 },
00872 ENTITYID_UNKNOWN,
00873 pub.entityId,
00874 gapStart,
00875 {gapListBase, 1, bitmap}
00876 };
00877
00878 size_t size = 0, padding = 0;
00879 gen_find_size(gap, size, padding);
00880 gap.smHeader.submessageLength =
00881 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
00882
00883 if (!rw.durable_) {
00884 const CORBA::ULong i = msg.length();
00885 msg.length(i + 1);
00886 msg[i].gap_sm(gap);
00887 } else {
00888 InfoDestinationSubmessage idst = {
00889 {INFO_DST, 1 , INFO_DST_SZ},
00890 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
00891 };
00892 CORBA::ULong ml = msg.length();
00893
00894
00895
00896 DestToEntityMap::const_iterator iter = dtem.begin();
00897 while (iter != dtem.end()) {
00898 std::memcpy(idst.guidPrefix, iter->first.guidPrefix, sizeof(GuidPrefix_t));
00899 msg.length(ml + 1);
00900 msg[ml++].info_dst_sm(idst);
00901
00902 const OPENDDS_VECTOR(RepoId)& readers = iter->second;
00903 for (size_t i = 0; i < readers.size(); ++i) {
00904 gap.readerId = readers.at(i).entityId;
00905 msg.length(ml + 1);
00906 msg[ml++].gap_sm(gap);
00907 }
00908 ++iter;
00909 }
00910 }
00911 }
00912 }
00913
00914
00915
00916
00917 void
00918 RtpsUdpDataLink::received(const RTPS::DataSubmessage& data,
00919 const GuidPrefix_t& src_prefix)
00920 {
00921 datareader_dispatch(data, src_prefix, &RtpsUdpDataLink::process_data_i);
00922 }
00923
00924 bool
00925 RtpsUdpDataLink::process_data_i(const RTPS::DataSubmessage& data,
00926 const RepoId& src,
00927 RtpsReaderMap::value_type& rr)
00928 {
00929 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
00930 if (wi != rr.second.remote_writers_.end()) {
00931 WriterInfo& info = wi->second;
00932 SequenceNumber seq;
00933 seq.setValue(data.writerSN.high, data.writerSN.low);
00934 info.frags_.erase(seq);
00935 const RepoId& readerId = rr.first;
00936 if (info.recvd_.contains(seq)) {
00937 if (Transport_debug_level > 5) {
00938 GuidConverter writer(src);
00939 GuidConverter reader(readerId);
00940 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00941 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because ALREADY received\n"),
00942 seq.getValue(),
00943 OPENDDS_STRING(writer).c_str(),
00944 OPENDDS_STRING(reader).c_str()));
00945 }
00946 recv_strategy_->withhold_data_from(readerId);
00947 } else if (info.recvd_.disjoint() ||
00948 (!info.recvd_.empty() && info.recvd_.cumulative_ack() != seq.previous())
00949 || (rr.second.durable_ && !info.recvd_.empty() && info.recvd_.low() > 1)
00950 || (rr.second.durable_ && info.recvd_.empty() && seq > 1)) {
00951 if (Transport_debug_level > 5) {
00952 GuidConverter writer(src);
00953 GuidConverter reader(readerId);
00954 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00955 ACE_TEXT(" data seq: %q from %C being WITHHELD from %C because can't receive yet\n"),
00956 seq.getValue(),
00957 OPENDDS_STRING(writer).c_str(),
00958 OPENDDS_STRING(reader).c_str()));
00959 }
00960 const ReceivedDataSample* sample =
00961 recv_strategy_->withhold_data_from(readerId);
00962 info.held_.insert(std::make_pair(seq, *sample));
00963 } else {
00964 if (Transport_debug_level > 5) {
00965 GuidConverter writer(src);
00966 GuidConverter reader(readerId);
00967 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00968 ACE_TEXT(" data seq: %q from %C to %C OK to deliver\n"),
00969 seq.getValue(),
00970 OPENDDS_STRING(writer).c_str(),
00971 OPENDDS_STRING(reader).c_str()));
00972 }
00973 recv_strategy_->do_not_withhold_data_from(readerId);
00974 }
00975 info.recvd_.insert(seq);
00976 deliver_held_data(readerId, info, rr.second.durable_);
00977 } else {
00978 if (Transport_debug_level > 5) {
00979 GuidConverter writer(src);
00980 GuidConverter reader(rr.first);
00981 SequenceNumber seq;
00982 seq.setValue(data.writerSN.high, data.writerSN.low);
00983 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::process_data_i(DataSubmessage) -")
00984 ACE_TEXT(" data seq: %q from %C to %C OK to deliver (Writer not currently in Reader remote writer map)\n"),
00985 seq.getValue(),
00986 OPENDDS_STRING(writer).c_str(),
00987 OPENDDS_STRING(reader).c_str()));
00988 }
00989 recv_strategy_->do_not_withhold_data_from(rr.first);
00990 }
00991 return false;
00992 }
00993
00994 void
00995 RtpsUdpDataLink::deliver_held_data(const RepoId& readerId, WriterInfo& info,
00996 bool durable)
00997 {
00998 if (durable && (info.recvd_.empty() || info.recvd_.low() > 1)) return;
00999 const SequenceNumber ca = info.recvd_.cumulative_ack();
01000 typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter;
01001 const iter end = info.held_.upper_bound(ca);
01002 for (iter it = info.held_.begin(); it != end; ) {
01003 if (Transport_debug_level > 5) {
01004 GuidConverter reader(readerId);
01005 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) RtpsUdpDataLink::deliver_held_data -")
01006 ACE_TEXT(" deliver sequence: %q to %C\n"),
01007 it->second.header_.sequence_.getValue(),
01008 OPENDDS_STRING(reader).c_str()));
01009 }
01010 data_received(it->second, readerId);
01011 info.held_.erase(it++);
01012 }
01013 }
01014
01015 void
01016 RtpsUdpDataLink::received(const RTPS::GapSubmessage& gap,
01017 const GuidPrefix_t& src_prefix)
01018 {
01019 datareader_dispatch(gap, src_prefix, &RtpsUdpDataLink::process_gap_i);
01020 }
01021
01022 bool
01023 RtpsUdpDataLink::process_gap_i(const RTPS::GapSubmessage& gap,
01024 const RepoId& src, RtpsReaderMap::value_type& rr)
01025 {
01026 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01027 if (wi != rr.second.remote_writers_.end()) {
01028 SequenceRange sr;
01029 sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
01030 SequenceNumber base;
01031 base.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
01032 sr.second = base.previous();
01033 if (sr.first <= sr.second) {
01034 if (Transport_debug_level > 5) {
01035 const GuidConverter conv(src);
01036 const GuidConverter rdr(rr.first);
01037 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01038 "Reader %C received GAP with range [%q, %q] from %C\n",
01039 OPENDDS_STRING(rdr).c_str(),
01040 sr.first.getValue(), sr.second.getValue(),
01041 OPENDDS_STRING(conv).c_str()));
01042 }
01043 wi->second.recvd_.insert(sr);
01044 } else {
01045 const GuidConverter conv(src);
01046 VDBG_LVL((LM_WARNING, "(%P|%t) RtpsUdpDataLink::process_gap_i "
01047 "received GAP with invalid range [%q, %q] from %C\n",
01048 sr.first.getValue(), sr.second.getValue(),
01049 OPENDDS_STRING(conv).c_str()), 2);
01050 }
01051 wi->second.recvd_.insert(base, gap.gapList.numBits,
01052 gap.gapList.bitmap.get_buffer());
01053 deliver_held_data(rr.first, wi->second, rr.second.durable_);
01054
01055 }
01056 return false;
01057 }
01058
01059 void
01060 RtpsUdpDataLink::received(const RTPS::HeartBeatSubmessage& heartbeat,
01061 const GuidPrefix_t& src_prefix)
01062 {
01063 RepoId src;
01064 std::memcpy(src.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01065 src.entityId = heartbeat.writerId;
01066
01067 bool schedule_acknack = false;
01068 const ACE_Time_Value now = ACE_OS::gettimeofday();
01069 OPENDDS_VECTOR(InterestingRemote) callbacks;
01070
01071 {
01072 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01073
01074
01075
01076
01077 for (InterestingRemoteMapType::iterator pos = interesting_writers_.lower_bound(src),
01078 limit = interesting_writers_.upper_bound(src);
01079 pos != limit;
01080 ++pos) {
01081 const RepoId& writerid = src;
01082 const RepoId& readerid = pos->second.localid;
01083
01084 RtpsReaderMap::const_iterator riter = readers_.find(readerid);
01085 if (riter == readers_.end()) {
01086
01087 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01088 } else if (riter->second.remote_writers_.find(writerid) == riter->second.remote_writers_.end()) {
01089
01090 interesting_ack_nacks_.insert (InterestingAckNack(writerid, readerid, pos->second.address));
01091 }
01092 pos->second.last_activity = now;
01093 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01094 callbacks.push_back(pos->second);
01095 pos->second.status = InterestingRemote::EXISTS;
01096 }
01097 }
01098
01099 schedule_acknack = !interesting_ack_nacks_.empty();
01100 }
01101
01102 for (size_t i = 0; i < callbacks.size(); ++i) {
01103 callbacks[i].listener->writer_exists(src, callbacks[i].localid);
01104 }
01105
01106 if (schedule_acknack) {
01107 heartbeat_reply_.schedule();
01108 }
01109
01110 datareader_dispatch(heartbeat, src_prefix,
01111 &RtpsUdpDataLink::process_heartbeat_i);
01112 }
01113
01114 bool
01115 RtpsUdpDataLink::process_heartbeat_i(const RTPS::HeartBeatSubmessage& heartbeat,
01116 const RepoId& src,
01117 RtpsReaderMap::value_type& rr)
01118 {
01119 const WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01120 if (wi == rr.second.remote_writers_.end()) {
01121
01122 return false;
01123 }
01124
01125 WriterInfo& info = wi->second;
01126
01127 if (heartbeat.count.value <= info.heartbeat_recvd_count_) {
01128 return false;
01129 }
01130 info.heartbeat_recvd_count_ = heartbeat.count.value;
01131
01132 SequenceNumber& first = info.hb_range_.first;
01133 first.setValue(heartbeat.firstSN.high, heartbeat.firstSN.low);
01134 SequenceNumber& last = info.hb_range_.second;
01135 last.setValue(heartbeat.lastSN.high, heartbeat.lastSN.low);
01136 static const SequenceNumber starting, zero = SequenceNumber::ZERO();
01137
01138 DisjointSequence& recvd = info.recvd_;
01139 if (!rr.second.durable_ && info.initial_hb_) {
01140 if (last.getValue() < starting.getValue()) {
01141
01142 return false;
01143 }
01144
01145
01146 if (recvd.empty() || recvd.low() >= last) {
01147 recvd.insert(SequenceRange(zero,
01148 (last > starting) ? last.previous() : zero));
01149 } else {
01150 recvd.insert(SequenceRange(zero, recvd.low()));
01151 }
01152 } else if (!recvd.empty()) {
01153
01154
01155 recvd.insert(SequenceRange(zero,
01156 (first > starting) ? first.previous() : zero));
01157 }
01158
01159 deliver_held_data(rr.first, info, rr.second.durable_);
01160
01161
01162
01163
01164 info.initial_hb_ = false;
01165
01166 const bool final = heartbeat.smHeader.flags & 2 ,
01167 liveliness = heartbeat.smHeader.flags & 4 ;
01168
01169 if (!final || (!liveliness && (info.should_nack() ||
01170 rr.second.nack_durable(info) ||
01171 recv_strategy_->has_fragments(info.hb_range_, wi->first)))) {
01172 info.ack_pending_ = true;
01173 return true;
01174 }
01175
01176
01177 return false;
01178 }
01179
01180 bool
01181 RtpsUdpDataLink::WriterInfo::should_nack() const
01182 {
01183 if (recvd_.disjoint() && recvd_.cumulative_ack() < hb_range_.second) {
01184 return true;
01185 }
01186 if (!recvd_.empty()) {
01187 return recvd_.high() < hb_range_.second;
01188 }
01189 return false;
01190 }
01191
01192 bool
01193 RtpsUdpDataLink::RtpsReader::nack_durable(const WriterInfo& info)
01194 {
01195 return durable_ && (info.recvd_.empty() ||
01196 info.recvd_.low() > info.hb_range_.first);
01197 }
01198
01199 void
01200 RtpsUdpDataLink::send_ack_nacks(RtpsReaderMap::iterator rr, bool finalFlag)
01201 {
01202 using namespace OpenDDS::RTPS;
01203
01204 WriterInfoMap& writers = rr->second.remote_writers_;
01205 for (WriterInfoMap::iterator wi = writers.begin(); wi != writers.end();
01206 ++wi) {
01207
01208
01209 DisjointSequence& recvd = wi->second.recvd_;
01210 const bool nack = wi->second.should_nack() ||
01211 rr->second.nack_durable(wi->second);
01212 bool final = finalFlag || !nack;
01213
01214 if (wi->second.ack_pending_ || nack || finalFlag) {
01215 const bool prev_ack_pending = wi->second.ack_pending_;
01216 wi->second.ack_pending_ = false;
01217
01218 SequenceNumber ack;
01219 CORBA::ULong num_bits = 1;
01220 LongSeq8 bitmap;
01221 bitmap.length(1);
01222 bitmap[0] = 0;
01223
01224 const SequenceNumber& hb_low = wi->second.hb_range_.first;
01225 const SequenceNumber& hb_high = wi->second.hb_range_.second;
01226 const SequenceNumber::Value hb_low_val = hb_low.getValue(),
01227 hb_high_val = hb_high.getValue();
01228
01229 if (recvd.empty()) {
01230
01231 ack = hb_low;
01232 bitmap.length(bitmap_num_longs(ack, hb_high));
01233 const CORBA::ULong idx = (hb_high_val > hb_low_val + 255)
01234 ? 255
01235 : CORBA::ULong(hb_high_val - hb_low_val);
01236 DisjointSequence::fill_bitmap_range(0, idx,
01237 bitmap.get_buffer(),
01238 bitmap.length(), num_bits);
01239 } else if (((prev_ack_pending && !nack) || rr->second.nack_durable(wi->second)) && recvd.low() > hb_low) {
01240
01241 ack = hb_low;
01242 const SequenceNumber& rec_low = recvd.low();
01243 const SequenceNumber::Value rec_low_val = rec_low.getValue();
01244 bitmap.length(bitmap_num_longs(ack, rec_low));
01245 const CORBA::ULong idx = (rec_low_val > hb_low_val + 255)
01246 ? 255
01247 : CORBA::ULong(rec_low_val - hb_low_val);
01248 DisjointSequence::fill_bitmap_range(0, idx,
01249 bitmap.get_buffer(),
01250 bitmap.length(), num_bits);
01251
01252 } else {
01253 ack = ++SequenceNumber(recvd.cumulative_ack());
01254 if (recvd.low().getValue() > 1) {
01255
01256
01257 recvd.insert(SequenceRange(SequenceNumber::ZERO(), recvd.low()));
01258 }
01259
01260 if (recvd.disjoint()) {
01261 bitmap.length(bitmap_num_longs(ack, recvd.last_ack().previous()));
01262 recvd.to_bitmap(bitmap.get_buffer(), bitmap.length(),
01263 num_bits, true);
01264 }
01265 }
01266
01267 const SequenceNumber::Value ack_val = ack.getValue();
01268
01269 if (!recvd.empty() && hb_high > recvd.high()) {
01270 const SequenceNumber eff_high =
01271 (hb_high <= ack_val + 255) ? hb_high : (ack_val + 255);
01272 const SequenceNumber::Value eff_high_val = eff_high.getValue();
01273
01274 const CORBA::ULong old_len = bitmap.length(),
01275 new_len = bitmap_num_longs(ack, eff_high);
01276 if (new_len > old_len) {
01277 bitmap.length(new_len);
01278 for (CORBA::ULong i = old_len; i < new_len; ++i) {
01279 bitmap[i] = 0;
01280 }
01281 }
01282 const CORBA::ULong idx_hb_high = CORBA::ULong(eff_high_val - ack_val),
01283 idx_recv_high = recvd.disjoint() ?
01284 CORBA::ULong(recvd.high().getValue() - ack_val) : 0;
01285 DisjointSequence::fill_bitmap_range(idx_recv_high, idx_hb_high,
01286 bitmap.get_buffer(), new_len,
01287 num_bits);
01288 }
01289
01290
01291
01292
01293 bool frags_modified =
01294 recv_strategy_->remove_frags_from_bitmap(bitmap.get_buffer(),
01295 num_bits, ack, wi->first);
01296 if (frags_modified && !final) {
01297 final = true;
01298 for (CORBA::ULong i = 0; i < bitmap.length(); ++i) {
01299 if ((i + 1) * 32 <= num_bits) {
01300 if (bitmap[i]) {
01301 final = false;
01302 break;
01303 }
01304 } else {
01305 if ((0xffffffff << (32 - (num_bits % 32))) & bitmap[i]) {
01306 final = false;
01307 break;
01308 }
01309 }
01310 }
01311 }
01312
01313 AckNackSubmessage acknack = {
01314 {ACKNACK,
01315 CORBA::Octet(1 | (final ? 2 : 0)),
01316 0 },
01317 rr->first.entityId,
01318 wi->first.entityId,
01319 {
01320 {ack.getHigh(), ack.getLow()},
01321 num_bits, bitmap
01322 },
01323 {++wi->second.acknack_count_}
01324 };
01325
01326 size_t size = 0, padding = 0;
01327 gen_find_size(acknack, size, padding);
01328 acknack.smHeader.submessageLength =
01329 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01330 InfoDestinationSubmessage info_dst = {
01331 {INFO_DST, 1 , INFO_DST_SZ},
01332 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01333 };
01334 gen_find_size(info_dst, size, padding);
01335
01336 OPENDDS_VECTOR(NackFragSubmessage) nack_frags;
01337 size += generate_nack_frags(nack_frags, wi->second, wi->first);
01338
01339 ACE_Message_Block mb_acknack(size + padding);
01340
01341 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01342 std::memcpy(info_dst.guidPrefix, wi->first.guidPrefix,
01343 sizeof(GuidPrefix_t));
01344 ser << info_dst;
01345
01346
01347 ser << acknack;
01348 for (size_t i = 0; i < nack_frags.size(); ++i) {
01349 nack_frags[i].readerId = rr->first.entityId;
01350 nack_frags[i].writerId = wi->first.entityId;
01351 ser << nack_frags[i];
01352 }
01353
01354 if (!locators_.count(wi->first)) {
01355 if (Transport_debug_level) {
01356 const GuidConverter conv(wi->first);
01357 ACE_DEBUG((LM_ERROR,
01358 "(%P|%t) RtpsUdpDataLink::send_heartbeat_replies() - "
01359 "no locator for remote %C\n", OPENDDS_STRING(conv).c_str()));
01360 }
01361 } else {
01362 send_strategy_->send_rtps_control(mb_acknack,
01363 locators_[wi->first].addr_);
01364 }
01365 }
01366 }
01367 }
01368
01369 void
01370 RtpsUdpDataLink::send_heartbeat_replies()
01371 {
01372 using namespace OpenDDS::RTPS;
01373 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01374
01375 for (InterestingAckNackSetType::const_iterator pos = interesting_ack_nacks_.begin(),
01376 limit = interesting_ack_nacks_.end();
01377 pos != limit;
01378 ++pos) {
01379
01380 SequenceNumber ack;
01381 LongSeq8 bitmap;
01382 bitmap.length(0);
01383
01384 AckNackSubmessage acknack = {
01385 {ACKNACK,
01386 CORBA::Octet(1 | 2 ),
01387 0 },
01388 pos->readerid.entityId,
01389 pos->writerid.entityId,
01390 {
01391 {ack.getHigh(), ack.getLow()},
01392 0 , bitmap
01393 },
01394 {0 }
01395 };
01396
01397 size_t size = 0, padding = 0;
01398 gen_find_size(acknack, size, padding);
01399 acknack.smHeader.submessageLength =
01400 static_cast<CORBA::UShort>(size + padding) - SMHDR_SZ;
01401 InfoDestinationSubmessage info_dst = {
01402 {INFO_DST, 1 , INFO_DST_SZ},
01403 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
01404 };
01405 gen_find_size(info_dst, size, padding);
01406
01407 ACE_Message_Block mb_acknack(size + padding);
01408
01409 Serializer ser(&mb_acknack, false, Serializer::ALIGN_CDR);
01410 std::memcpy(info_dst.guidPrefix, pos->writerid.guidPrefix,
01411 sizeof(GuidPrefix_t));
01412 ser << info_dst;
01413
01414
01415 ser << acknack;
01416
01417 send_strategy_->send_rtps_control(mb_acknack, pos->writer_address);
01418 }
01419 interesting_ack_nacks_.clear();
01420
01421 for (RtpsReaderMap::iterator rr = readers_.begin(); rr != readers_.end();
01422 ++rr) {
01423 send_ack_nacks (rr);
01424 }
01425 }
01426
01427 size_t
01428 RtpsUdpDataLink::generate_nack_frags(OPENDDS_VECTOR(RTPS::NackFragSubmessage)& nf,
01429 WriterInfo& wi, const RepoId& pub_id)
01430 {
01431 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumber_t)::iterator iter_t;
01432 typedef RtpsUdpReceiveStrategy::FragmentInfo::value_type Frag_t;
01433 RtpsUdpReceiveStrategy::FragmentInfo frag_info;
01434
01435
01436
01437 OPENDDS_VECTOR(SequenceRange) missing = wi.recvd_.missing_sequence_ranges();
01438 for (size_t i = 0; i < missing.size(); ++i) {
01439 recv_strategy_->has_fragments(missing[i], pub_id, &frag_info);
01440 }
01441
01442 if (!wi.recvd_.empty()) {
01443 const SequenceRange range(wi.recvd_.high(), wi.hb_range_.second);
01444 recv_strategy_->has_fragments(range, pub_id, &frag_info);
01445 }
01446 for (size_t i = 0; i < frag_info.size(); ++i) {
01447
01448 const iter_t heartbeat_frag = wi.frags_.find(frag_info[i].first);
01449 if (heartbeat_frag != wi.frags_.end()) {
01450 extend_bitmap_range(frag_info[i].second, heartbeat_frag->second.value);
01451 }
01452 }
01453
01454
01455 const iter_t low = wi.frags_.lower_bound(wi.recvd_.cumulative_ack()),
01456 high = wi.frags_.upper_bound(wi.recvd_.last_ack()),
01457 end = wi.frags_.end();
01458 for (iter_t iter = wi.frags_.begin(); iter != end; ++iter) {
01459 if (iter == low) {
01460
01461 if (high == end) {
01462 break;
01463 }
01464 iter = high;
01465 }
01466
01467 const SequenceRange range(iter->first, iter->first);
01468 if (recv_strategy_->has_fragments(range, pub_id, &frag_info)) {
01469 extend_bitmap_range(frag_info.back().second, iter->second.value);
01470 } else {
01471
01472 frag_info.push_back(Frag_t(iter->first, RTPS::FragmentNumberSet()));
01473 RTPS::FragmentNumberSet& fnSet = frag_info.back().second;
01474 fnSet.bitmapBase.value = 1;
01475 fnSet.numBits = std::min(CORBA::ULong(256), iter->second.value);
01476 fnSet.bitmap.length((fnSet.numBits + 31) / 32);
01477 for (CORBA::ULong i = 0; i < fnSet.bitmap.length(); ++i) {
01478 fnSet.bitmap[i] = 0xFFFFFFFF;
01479 }
01480 }
01481 }
01482
01483 if (frag_info.empty()) {
01484 return 0;
01485 }
01486
01487 const RTPS::NackFragSubmessage nackfrag_prototype = {
01488 {RTPS::NACK_FRAG, 1 , 0 },
01489 ENTITYID_UNKNOWN,
01490 ENTITYID_UNKNOWN,
01491 {0, 0},
01492 RTPS::FragmentNumberSet(),
01493 {0}
01494 };
01495
01496 size_t size = 0, padding = 0;
01497 for (size_t i = 0; i < frag_info.size(); ++i) {
01498 nf.push_back(nackfrag_prototype);
01499 RTPS::NackFragSubmessage& nackfrag = nf.back();
01500 nackfrag.writerSN.low = frag_info[i].first.getLow();
01501 nackfrag.writerSN.high = frag_info[i].first.getHigh();
01502 nackfrag.fragmentNumberState = frag_info[i].second;
01503 nackfrag.count.value = ++wi.nackfrag_count_;
01504 const size_t before_size = size;
01505 gen_find_size(nackfrag, size, padding);
01506 nackfrag.smHeader.submessageLength =
01507 static_cast<CORBA::UShort>(size - before_size) - RTPS::SMHDR_SZ;
01508 }
01509 return size;
01510 }
01511
01512 void
01513 RtpsUdpDataLink::extend_bitmap_range(RTPS::FragmentNumberSet& fnSet,
01514 CORBA::ULong extent)
01515 {
01516 if (extent < fnSet.bitmapBase.value) {
01517 return;
01518 }
01519
01520 const CORBA::ULong new_num_bits = std::min(CORBA::ULong(255),
01521 extent - fnSet.bitmapBase.value + 1),
01522 len = (new_num_bits + 31) / 32;
01523 if (new_num_bits < fnSet.numBits) {
01524 return;
01525 }
01526 fnSet.bitmap.length(len);
01527
01528 DisjointSequence::fill_bitmap_range(fnSet.numBits + 1, new_num_bits,
01529 fnSet.bitmap.get_buffer(), len,
01530 fnSet.numBits);
01531 }
01532
01533 void
01534 RtpsUdpDataLink::received(const RTPS::HeartBeatFragSubmessage& hb_frag,
01535 const GuidPrefix_t& src_prefix)
01536 {
01537 datareader_dispatch(hb_frag, src_prefix, &RtpsUdpDataLink::process_hb_frag_i);
01538 }
01539
01540 bool
01541 RtpsUdpDataLink::process_hb_frag_i(const RTPS::HeartBeatFragSubmessage& hb_frag,
01542 const RepoId& src,
01543 RtpsReaderMap::value_type& rr)
01544 {
01545 WriterInfoMap::iterator wi = rr.second.remote_writers_.find(src);
01546 if (wi == rr.second.remote_writers_.end()) {
01547
01548 return false;
01549 }
01550
01551 if (hb_frag.count.value <= wi->second.hb_frag_recvd_count_) {
01552 return false;
01553 }
01554
01555 wi->second.hb_frag_recvd_count_ = hb_frag.count.value;
01556
01557 SequenceNumber seq;
01558 seq.setValue(hb_frag.writerSN.high, hb_frag.writerSN.low);
01559
01560
01561
01562
01563
01564 if (seq < wi->second.hb_range_.first || seq > wi->second.hb_range_.second
01565 || !wi->second.recvd_.contains(seq)) {
01566 wi->second.frags_[seq] = hb_frag.lastFragmentNum;
01567 wi->second.ack_pending_ = true;
01568 return true;
01569 }
01570 return false;
01571 }
01572
01573
01574
01575
01576 void
01577 RtpsUdpDataLink::received(const RTPS::AckNackSubmessage& acknack,
01578 const GuidPrefix_t& src_prefix)
01579 {
01580
01581 RepoId local;
01582 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01583 local.entityId = acknack.writerId;
01584
01585 RepoId remote;
01586 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01587 remote.entityId = acknack.readerId;
01588
01589 const ACE_Time_Value now = ACE_OS::gettimeofday();
01590 OPENDDS_VECTOR(DiscoveryListener*) callbacks;
01591
01592 {
01593 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01594 for (InterestingRemoteMapType::iterator pos = interesting_readers_.lower_bound(remote),
01595 limit = interesting_readers_.upper_bound(remote);
01596 pos != limit;
01597 ++pos) {
01598 pos->second.last_activity = now;
01599
01600 if (local == pos->second.localid) {
01601 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST) {
01602 callbacks.push_back(pos->second.listener);
01603 pos->second.status = InterestingRemote::EXISTS;
01604 }
01605 }
01606 }
01607 }
01608
01609 for (size_t i = 0; i < callbacks.size(); ++i) {
01610 callbacks[i]->reader_exists(remote, local);
01611 }
01612
01613 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01614 const RtpsWriterMap::iterator rw = writers_.find(local);
01615 if (rw == writers_.end()) {
01616 if (Transport_debug_level > 5) {
01617 GuidConverter local_conv(local);
01618 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01619 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01620 }
01621 return;
01622 }
01623
01624 if (Transport_debug_level > 5) {
01625 GuidConverter local_conv(local), remote_conv(remote);
01626 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01627 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01628 OPENDDS_STRING(remote_conv).c_str()));
01629 }
01630
01631 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01632 if (ri == rw->second.remote_readers_.end()) {
01633 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01634 "WARNING ReaderInfo not found\n"));
01635 return;
01636 }
01637
01638 if (acknack.count.value <= ri->second.acknack_recvd_count_) {
01639 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(ACKNACK) "
01640 "WARNING Count indicates duplicate, dropping\n"));
01641 return;
01642 }
01643
01644 ri->second.acknack_recvd_count_ = acknack.count.value;
01645
01646 if (!ri->second.handshake_done_) {
01647 ri->second.handshake_done_ = true;
01648 invoke_on_start_callbacks(true);
01649 }
01650
01651 OPENDDS_MAP(SequenceNumber, TransportQueueElement*) pendingCallbacks;
01652 const bool final = acknack.smHeader.flags & 2 ;
01653
01654 if (!ri->second.durable_data_.empty()) {
01655 if (Transport_debug_level > 5) {
01656 const GuidConverter local_conv(local), remote_conv(remote);
01657 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01658 "local %C has durable for remote %C\n",
01659 OPENDDS_STRING(local_conv).c_str(),
01660 OPENDDS_STRING(remote_conv).c_str()));
01661 }
01662 SequenceNumber ack;
01663 ack.setValue(acknack.readerSNState.bitmapBase.high,
01664 acknack.readerSNState.bitmapBase.low);
01665 const SequenceNumber& dd_last = ri->second.durable_data_.rbegin()->first;
01666 if (ack > dd_last) {
01667
01668 ri->second.durable_data_.swap(pendingCallbacks);
01669 if (Transport_debug_level > 5) {
01670 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01671 "durable data acked\n"));
01672 }
01673 } else {
01674 DisjointSequence requests;
01675 if (!requests.insert(ack, acknack.readerSNState.numBits,
01676 acknack.readerSNState.bitmap.get_buffer())
01677 && !final && ack == rw->second.heartbeat_high(ri->second)) {
01678
01679
01680
01681 if (ri->second.durable_data_.count(ack)) requests.insert(ack);
01682 }
01683
01684 bool sent_some = false;
01685 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01686 iter_t it = ri->second.durable_data_.begin();
01687 const OPENDDS_VECTOR(SequenceRange) psr = requests.present_sequence_ranges();
01688 SequenceNumber lastSent = SequenceNumber::ZERO();
01689 if (!requests.empty()) {
01690 lastSent = requests.low().previous();
01691 }
01692 DisjointSequence gaps;
01693 for (size_t i = 0; i < psr.size(); ++i) {
01694 for (; it != ri->second.durable_data_.end()
01695 && it->first < psr[i].first; ++it) ;
01696 for (; it != ri->second.durable_data_.end()
01697 && it->first <= psr[i].second; ++it) {
01698 if (Transport_debug_level > 5) {
01699 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01700 "durable resend %d\n", int(it->first.getValue())));
01701 }
01702 durability_resend(it->second);
01703
01704 sent_some = true;
01705 if (it->first > lastSent + 1) {
01706 gaps.insert(SequenceRange(lastSent + 1, it->first.previous()));
01707 }
01708 lastSent = it->first;
01709 }
01710 if (sent_some && lastSent < psr[i].second && psr[i].second < dd_last) {
01711 gaps.insert(SequenceRange(lastSent + 1, psr[i].second));
01712 }
01713 }
01714 if (!gaps.empty()) {
01715 if (Transport_debug_level > 5) {
01716 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01717 "sending durability gaps: "));
01718 gaps.dump();
01719 }
01720 send_durability_gaps(local, remote, gaps);
01721 }
01722 if (sent_some) {
01723 return;
01724 }
01725 const SequenceNumber& dd_first = ri->second.durable_data_.begin()->first;
01726 if (!requests.empty() && requests.high() < dd_first) {
01727
01728 if (Transport_debug_level > 5) {
01729 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01730 "sending durability gaps for all requests: "));
01731 requests.dump();
01732 }
01733 send_durability_gaps(local, remote, requests);
01734 return;
01735 }
01736 if (!requests.empty() && requests.low() < dd_first) {
01737
01738 for (size_t i = 0; i < psr.size(); ++i) {
01739 if (psr[i].first > dd_first) {
01740 break;
01741 }
01742 gaps.insert(SequenceRange(psr[i].first,
01743 std::min(psr[i].second, dd_first)));
01744 }
01745 if (Transport_debug_level > 5) {
01746 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::received(ACKNACK) "
01747 "sending durability gaps for some requests: "));
01748 gaps.dump();
01749 }
01750 send_durability_gaps(local, remote, gaps);
01751 return;
01752 }
01753 }
01754 }
01755 SequenceNumber ack;
01756 ack.setValue(acknack.readerSNState.bitmapBase.high,
01757 acknack.readerSNState.bitmapBase.low);
01758 if (ack != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
01759 && ack != SequenceNumber::ZERO()) {
01760 ri->second.cur_cumulative_ack_ = ack;
01761 }
01762
01763
01764 if (!final) {
01765 ri->second.requested_changes_.push_back(acknack.readerSNState);
01766 }
01767 process_acked_by_all_i(g, local);
01768 g.release();
01769 if (!final) {
01770 nack_reply_.schedule();
01771 }
01772 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01773 for (iter_t it = pendingCallbacks.begin();
01774 it != pendingCallbacks.end(); ++it) {
01775 it->second->data_delivered();
01776 }
01777 }
01778
01779 void
01780 RtpsUdpDataLink::received(const RTPS::NackFragSubmessage& nackfrag,
01781 const GuidPrefix_t& src_prefix)
01782 {
01783
01784 RepoId local;
01785 std::memcpy(local.guidPrefix, local_prefix_, sizeof(GuidPrefix_t));
01786 local.entityId = nackfrag.writerId;
01787
01788 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01789 const RtpsWriterMap::iterator rw = writers_.find(local);
01790 if (rw == writers_.end()) {
01791 if (Transport_debug_level > 5) {
01792 GuidConverter local_conv(local);
01793 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01794 "WARNING local %C no RtpsWriter\n", OPENDDS_STRING(local_conv).c_str()));
01795 }
01796 return;
01797 }
01798
01799 RepoId remote;
01800 std::memcpy(remote.guidPrefix, src_prefix, sizeof(GuidPrefix_t));
01801 remote.entityId = nackfrag.readerId;
01802
01803 if (Transport_debug_level > 5) {
01804 GuidConverter local_conv(local), remote_conv(remote);
01805 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01806 "local %C remote %C\n", OPENDDS_STRING(local_conv).c_str(),
01807 OPENDDS_STRING(remote_conv).c_str()));
01808 }
01809
01810 const ReaderInfoMap::iterator ri = rw->second.remote_readers_.find(remote);
01811 if (ri == rw->second.remote_readers_.end()) {
01812 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01813 "WARNING ReaderInfo not found\n"));
01814 return;
01815 }
01816
01817 if (nackfrag.count.value <= ri->second.nackfrag_recvd_count_) {
01818 VDBG((LM_WARNING, "(%P|%t) RtpsUdpDataLink::received(NACK_FRAG) "
01819 "WARNING Count indicates duplicate, dropping\n"));
01820 return;
01821 }
01822
01823 ri->second.nackfrag_recvd_count_ = nackfrag.count.value;
01824
01825 SequenceNumber seq;
01826 seq.setValue(nackfrag.writerSN.high, nackfrag.writerSN.low);
01827 ri->second.requested_frags_[seq] = nackfrag.fragmentNumberState;
01828 g.release();
01829 nack_reply_.schedule();
01830 }
01831
01832 void
01833 RtpsUdpDataLink::send_nack_replies()
01834 {
01835 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01836
01837 using namespace OpenDDS::RTPS;
01838 typedef RtpsWriterMap::iterator rw_iter;
01839 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
01840
01841
01842 OPENDDS_SET(ACE_INET_Addr) recipients;
01843 DisjointSequence requests;
01844 RtpsWriter& writer = rw->second;
01845
01846
01847 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
01848
01849 typedef ReaderInfoMap::iterator ri_iter;
01850 const ri_iter end = writer.remote_readers_.end();
01851 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01852
01853 if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
01854 all_readers_ack = ri->second.cur_cumulative_ack_;
01855 }
01856
01857 for (size_t i = 0; i < ri->second.requested_changes_.size(); ++i) {
01858 const SequenceNumberSet& sn_state = ri->second.requested_changes_[i];
01859 SequenceNumber base;
01860 base.setValue(sn_state.bitmapBase.high, sn_state.bitmapBase.low);
01861 if (sn_state.numBits == 1 && !(sn_state.bitmap[0] & 1)
01862 && base == writer.heartbeat_high(ri->second)) {
01863
01864
01865
01866 if (!writer.send_buff_.is_nil() && writer.send_buff_->contains(base)) {
01867 requests.insert(base);
01868 }
01869 } else {
01870 requests.insert(base, sn_state.numBits, sn_state.bitmap.get_buffer());
01871 }
01872 }
01873
01874 if (ri->second.requested_changes_.size()) {
01875 if (locators_.count(ri->first)) {
01876 recipients.insert(locators_[ri->first].addr_);
01877 if (Transport_debug_level > 5) {
01878 const GuidConverter local_conv(rw->first), remote_conv(ri->first);
01879 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01880 "local %C remote %C requested resend\n",
01881 OPENDDS_STRING(local_conv).c_str(),
01882 OPENDDS_STRING(remote_conv).c_str()));
01883 }
01884 }
01885 ri->second.requested_changes_.clear();
01886 }
01887 }
01888
01889 DisjointSequence gaps;
01890 if (!requests.empty()) {
01891 if (writer.send_buff_.is_nil() || writer.send_buff_->empty()) {
01892 gaps = requests;
01893 } else {
01894 OPENDDS_VECTOR(SequenceRange) ranges = requests.present_sequence_ranges();
01895 SingleSendBuffer& sb = *writer.send_buff_;
01896 ACE_GUARD(TransportSendBuffer::LockType, guard, sb.strategy_lock());
01897 const RtpsUdpSendStrategy::OverrideToken ot =
01898 send_strategy_->override_destinations(recipients);
01899 for (size_t i = 0; i < ranges.size(); ++i) {
01900 if (Transport_debug_level > 5) {
01901 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01902 "resend data %d-%d\n", int(ranges[i].first.getValue()),
01903 int(ranges[i].second.getValue())));
01904 }
01905 sb.resend_i(ranges[i], &gaps);
01906 }
01907 }
01908 }
01909
01910 send_nackfrag_replies(writer, gaps, recipients);
01911
01912 if (!gaps.empty()) {
01913 if (Transport_debug_level > 5) {
01914 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::send_nack_replies "
01915 "GAPs:"));
01916 gaps.dump();
01917 }
01918 ACE_Message_Block* mb_gap =
01919 marshal_gaps(rw->first, GUID_UNKNOWN, gaps, writer.durable_);
01920 if (mb_gap) {
01921 send_strategy_->send_rtps_control(*mb_gap, recipients);
01922 mb_gap->release();
01923 }
01924 }
01925 if (all_readers_ack == SequenceNumber::MAX_VALUE) {
01926 continue;
01927 }
01928 }
01929 }
01930
01931 void
01932 RtpsUdpDataLink::send_nackfrag_replies(RtpsWriter& writer,
01933 DisjointSequence& gaps,
01934 OPENDDS_SET(ACE_INET_Addr)& gap_recipients)
01935 {
01936 typedef OPENDDS_MAP(SequenceNumber, DisjointSequence) FragmentInfo;
01937 OPENDDS_MAP(ACE_INET_Addr, FragmentInfo) requests;
01938
01939 typedef ReaderInfoMap::iterator ri_iter;
01940 const ri_iter end = writer.remote_readers_.end();
01941 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
01942
01943 if (ri->second.requested_frags_.empty() || !locators_.count(ri->first)) {
01944 continue;
01945 }
01946
01947 const ACE_INET_Addr& remote_addr = locators_[ri->first].addr_;
01948
01949 typedef OPENDDS_MAP(SequenceNumber, RTPS::FragmentNumberSet)::iterator rf_iter;
01950 const rf_iter rf_end = ri->second.requested_frags_.end();
01951 for (rf_iter rf = ri->second.requested_frags_.begin(); rf != rf_end; ++rf) {
01952
01953 const SequenceNumber& seq = rf->first;
01954 if (writer.send_buff_->contains(seq)) {
01955 FragmentInfo& fi = requests[remote_addr];
01956 fi[seq].insert(rf->second.bitmapBase.value, rf->second.numBits,
01957 rf->second.bitmap.get_buffer());
01958 } else {
01959 gaps.insert(seq);
01960 gap_recipients.insert(remote_addr);
01961 }
01962 }
01963 ri->second.requested_frags_.clear();
01964 }
01965
01966 typedef OPENDDS_MAP(ACE_INET_Addr, FragmentInfo)::iterator req_iter;
01967 for (req_iter req = requests.begin(); req != requests.end(); ++req) {
01968 const FragmentInfo& fi = req->second;
01969
01970 ACE_GUARD(TransportSendBuffer::LockType, guard,
01971 writer.send_buff_->strategy_lock());
01972 const RtpsUdpSendStrategy::OverrideToken ot =
01973 send_strategy_->override_destinations(req->first);
01974
01975 for (FragmentInfo::const_iterator sn_iter = fi.begin();
01976 sn_iter != fi.end(); ++sn_iter) {
01977 const SequenceNumber& seq = sn_iter->first;
01978 writer.send_buff_->resend_fragments_i(seq, sn_iter->second);
01979 }
01980 }
01981 }
01982
01983 void
01984 RtpsUdpDataLink::process_acked_by_all_i(ACE_Guard<ACE_Thread_Mutex>& g, const RepoId& pub_id)
01985 {
01986 using namespace OpenDDS::RTPS;
01987 typedef RtpsWriterMap::iterator rw_iter;
01988 typedef OPENDDS_MULTIMAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
01989 OPENDDS_VECTOR(RepoId) to_check;
01990 rw_iter rw = writers_.find(pub_id);
01991 if (rw == writers_.end()) {
01992 return;
01993 }
01994 RtpsWriter& writer = rw->second;
01995 if (!writer.elems_not_acked_.empty()) {
01996
01997
01998
01999 SequenceNumber all_readers_ack = SequenceNumber::MAX_VALUE;
02000
02001 typedef ReaderInfoMap::iterator ri_iter;
02002 const ri_iter end = writer.remote_readers_.end();
02003 for (ri_iter ri = writer.remote_readers_.begin(); ri != end; ++ri) {
02004 if (ri->second.cur_cumulative_ack_ < all_readers_ack) {
02005 all_readers_ack = ri->second.cur_cumulative_ack_;
02006 }
02007 }
02008 if (all_readers_ack == SequenceNumber::MAX_VALUE) {
02009 return;
02010 }
02011 OPENDDS_VECTOR(SequenceNumber) sns;
02012
02013
02014 iter_t it = writer.elems_not_acked_.begin();
02015 OPENDDS_SET(SequenceNumber) sns_to_release;
02016 while (it != writer.elems_not_acked_.end()) {
02017 if (it->first < all_readers_ack) {
02018 writer.to_deliver_.insert(RtpsWriter::SnToTqeMap::value_type(it->first, it->second));
02019 sns_to_release.insert(it->first);
02020 iter_t last = it;
02021 ++it;
02022 writer.elems_not_acked_.erase(last);
02023 } else {
02024 break;
02025 }
02026 }
02027 OPENDDS_SET(SequenceNumber)::iterator sns_it = sns_to_release.begin();
02028 while (sns_it != sns_to_release.end()) {
02029 writer.send_buff_->release_acked(*sns_it);
02030 ++sns_it;
02031 }
02032 TransportQueueElement* tqe_to_deliver;
02033
02034 while (true) {
02035 rw_iter deliver_on_writer = writers_.find(pub_id);
02036 if (deliver_on_writer == writers_.end()) {
02037 break;
02038 }
02039 RtpsWriter& writer = deliver_on_writer->second;
02040 iter_t to_deliver_iter = writer.to_deliver_.begin();
02041 if (to_deliver_iter == writer.to_deliver_.end()) {
02042 break;
02043 }
02044 tqe_to_deliver = to_deliver_iter->second;
02045 writer.to_deliver_.erase(to_deliver_iter);
02046 g.release();
02047
02048 tqe_to_deliver->data_delivered();
02049 g.acquire();
02050 }
02051 }
02052 }
02053
02054 ACE_Message_Block*
02055 RtpsUdpDataLink::marshal_gaps(const RepoId& writer, const RepoId& reader,
02056 const DisjointSequence& gaps, bool durable)
02057 {
02058 using namespace RTPS;
02059
02060
02061 const SequenceNumber firstMissing = gaps.low(),
02062 base = ++SequenceNumber(gaps.cumulative_ack());
02063 const SequenceNumber_t gapStart = {firstMissing.getHigh(),
02064 firstMissing.getLow()},
02065 gapListBase = {base.getHigh(), base.getLow()};
02066 CORBA::ULong num_bits = 0;
02067 LongSeq8 bitmap;
02068
02069 if (gaps.disjoint()) {
02070 bitmap.length(bitmap_num_longs(base, gaps.high()));
02071 gaps.to_bitmap(bitmap.get_buffer(), bitmap.length(), num_bits);
02072
02073 } else {
02074 bitmap.length(1);
02075 bitmap[0] = 0;
02076 num_bits = 1;
02077 }
02078
02079 GapSubmessage gap = {
02080 {GAP, 1 , 0 },
02081 reader.entityId,
02082 writer.entityId,
02083 gapStart,
02084 {gapListBase, num_bits, bitmap}
02085 };
02086
02087 if (Transport_debug_level > 5) {
02088 const GuidConverter conv(writer);
02089 SequenceRange sr;
02090 sr.first.setValue(gap.gapStart.high, gap.gapStart.low);
02091 SequenceNumber srbase;
02092 srbase.setValue(gap.gapList.bitmapBase.high, gap.gapList.bitmapBase.low);
02093 sr.second = srbase.previous();
02094 ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::marshal_gaps "
02095 "GAP with range [%q, %q] from %C\n",
02096 sr.first.getValue(), sr.second.getValue(),
02097 OPENDDS_STRING(conv).c_str()));
02098 }
02099
02100 size_t gap_size = 0, padding = 0;
02101 gen_find_size(gap, gap_size, padding);
02102 gap.smHeader.submessageLength =
02103 static_cast<CORBA::UShort>(gap_size + padding) - SMHDR_SZ;
02104
02105
02106 OPENDDS_VECTOR(RepoId) readers;
02107 if (durable && reader.entityId == ENTITYID_UNKNOWN) {
02108 if (Transport_debug_level > 5) {
02109 const GuidConverter local_conv(writer);
02110 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps local %C "
02111 "durable writer\n", OPENDDS_STRING(local_conv).c_str()));
02112 }
02113 const RtpsWriterMap::iterator iter = writers_.find(writer);
02114 RtpsWriter& rw = iter->second;
02115 for (ReaderInfoMap::iterator ri = rw.remote_readers_.begin();
02116 ri != rw.remote_readers_.end(); ++ri) {
02117 if (!ri->second.expecting_durable_data()) {
02118 readers.push_back(ri->first);
02119 } else if (Transport_debug_level > 5) {
02120 const GuidConverter remote_conv(ri->first);
02121 ACE_DEBUG((LM_DEBUG, "RtpsUdpDataLink::marshal_gaps reader "
02122 "%C is expecting durable data, no GAP sent\n",
02123 OPENDDS_STRING(remote_conv).c_str()));
02124 }
02125 }
02126 if (readers.empty()) return 0;
02127 }
02128
02129 const size_t size_per_idst = INFO_DST_SZ + SMHDR_SZ,
02130 prefix_sz = sizeof(reader.guidPrefix);
02131
02132 const size_t total_sz = readers.empty() ? (gap_size + padding) :
02133 (readers.size() * (gap_size + padding + size_per_idst));
02134
02135 ACE_Message_Block* mb_gap = new ACE_Message_Block(total_sz);
02136
02137
02138 Serializer ser(mb_gap, false, Serializer::ALIGN_CDR);
02139 if (readers.empty()) {
02140 ser << gap;
02141 } else {
02142 InfoDestinationSubmessage idst = {
02143 {INFO_DST, 1 , INFO_DST_SZ},
02144 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02145 };
02146 for (size_t i = 0; i < readers.size(); ++i) {
02147 std::memcpy(idst.guidPrefix, readers[i].guidPrefix, prefix_sz);
02148 gap.readerId = readers[i].entityId;
02149 ser << idst;
02150 ser << gap;
02151 }
02152 }
02153 return mb_gap;
02154 }
02155
02156 void
02157 RtpsUdpDataLink::durability_resend(TransportQueueElement* element)
02158 {
02159 ACE_Message_Block* msg = const_cast<ACE_Message_Block*>(element->msg());
02160 send_strategy_->send_rtps_control(*msg,
02161 get_locator(element->subscription_id()));
02162 }
02163
02164 void
02165 RtpsUdpDataLink::send_durability_gaps(const RepoId& writer,
02166 const RepoId& reader,
02167 const DisjointSequence& gaps)
02168 {
02169 ACE_Message_Block mb(RTPS::INFO_DST_SZ + RTPS::SMHDR_SZ);
02170 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02171 RTPS::InfoDestinationSubmessage info_dst = {
02172 {RTPS::INFO_DST, 1 , RTPS::INFO_DST_SZ},
02173 {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
02174 };
02175 std::memcpy(info_dst.guidPrefix, reader.guidPrefix, sizeof(GuidPrefix_t));
02176 ser << info_dst;
02177 mb.cont(marshal_gaps(writer, reader, gaps));
02178 send_strategy_->send_rtps_control(mb, get_locator(reader));
02179 mb.cont()->release();
02180 }
02181
02182 void
02183 RtpsUdpDataLink::send_heartbeats()
02184 {
02185 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02186 ACE_GUARD(ACE_Thread_Mutex, c, reader_no_longer_exists_lock_);
02187
02188 if (writers_.empty() && interesting_readers_.empty()) {
02189 heartbeat_.disable();
02190 }
02191
02192 using namespace OpenDDS::RTPS;
02193 OPENDDS_VECTOR(HeartBeatSubmessage) subm;
02194 OPENDDS_SET(ACE_INET_Addr) recipients;
02195 OPENDDS_VECTOR(TransportQueueElement*) pendingCallbacks;
02196 const ACE_Time_Value now = ACE_OS::gettimeofday();
02197
02198 RepoIdSet writers_to_advertise;
02199
02200 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02201 const ACE_Time_Value tv3 = ACE_OS::gettimeofday() - 3 * config_->heartbeat_period_;
02202 for (InterestingRemoteMapType::iterator pos = interesting_readers_.begin(),
02203 limit = interesting_readers_.end();
02204 pos != limit;
02205 ++pos) {
02206 if (pos->second.status == InterestingRemote::DOES_NOT_EXIST ||
02207 (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv3)) {
02208 recipients.insert(pos->second.address);
02209 writers_to_advertise.insert(pos->second.localid);
02210 }
02211 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02212 CallbackType callback(pos->first, pos->second);
02213 readerDoesNotExistCallbacks_.push_back(callback);
02214 pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02215 }
02216 }
02217
02218 typedef RtpsWriterMap::iterator rw_iter;
02219 for (rw_iter rw = writers_.begin(); rw != writers_.end(); ++rw) {
02220 const bool has_data = !rw->second.send_buff_.is_nil()
02221 && !rw->second.send_buff_->empty();
02222 bool final = true, has_durable_data = false;
02223 SequenceNumber durable_max;
02224
02225 typedef ReaderInfoMap::iterator ri_iter;
02226 const ri_iter end = rw->second.remote_readers_.end();
02227 for (ri_iter ri = rw->second.remote_readers_.begin(); ri != end; ++ri) {
02228 if ((has_data || !ri->second.handshake_done_)
02229 && locators_.count(ri->first)) {
02230 recipients.insert(locators_[ri->first].addr_);
02231 if (final && !ri->second.handshake_done_) {
02232 final = false;
02233 }
02234 }
02235 if (!ri->second.durable_data_.empty()) {
02236 const ACE_Time_Value expiration =
02237 ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02238 if (now > expiration) {
02239 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator
02240 dd_iter;
02241 for (dd_iter it = ri->second.durable_data_.begin();
02242 it != ri->second.durable_data_.end(); ++it) {
02243 pendingCallbacks.push_back(it->second);
02244 }
02245 ri->second.durable_data_.clear();
02246 if (Transport_debug_level > 3) {
02247 const GuidConverter gw(rw->first), gr(ri->first);
02248 VDBG_LVL((LM_INFO, "(%P|%t) RtpsUdpDataLink::send_heartbeats - "
02249 "removed expired durable data for %C -> %C\n",
02250 OPENDDS_STRING(gw).c_str(), OPENDDS_STRING(gr).c_str()), 3);
02251 }
02252 } else {
02253 has_durable_data = true;
02254 if (ri->second.durable_data_.rbegin()->first > durable_max) {
02255 durable_max = ri->second.durable_data_.rbegin()->first;
02256 }
02257 if (locators_.count(ri->first)) {
02258 recipients.insert(locators_[ri->first].addr_);
02259 }
02260 }
02261 }
02262 }
02263
02264 if (!rw->second.elems_not_acked_.empty()) {
02265 final = false;
02266 }
02267
02268 if (writers_to_advertise.count(rw->first)) {
02269 final = false;
02270 writers_to_advertise.erase(rw->first);
02271 }
02272
02273 if (final && !has_data && !has_durable_data) {
02274 continue;
02275 }
02276
02277 const SequenceNumber firstSN = (rw->second.durable_ || !has_data)
02278 ? 1 : rw->second.send_buff_->low(),
02279 lastSN = std::max(durable_max,
02280 has_data ? rw->second.send_buff_->high() : 1);
02281
02282 const HeartBeatSubmessage hb = {
02283 {HEARTBEAT,
02284 CORBA::Octet(1 | (final ? 2 : 0)),
02285 HEARTBEAT_SZ},
02286 ENTITYID_UNKNOWN,
02287 rw->first.entityId,
02288 {firstSN.getHigh(), firstSN.getLow()},
02289 {lastSN.getHigh(), lastSN.getLow()},
02290 {++heartbeat_counts_[rw->first]}
02291 };
02292 subm.push_back(hb);
02293 }
02294
02295 for (RepoIdSet::const_iterator pos = writers_to_advertise.begin(),
02296 limit = writers_to_advertise.end();
02297 pos != limit;
02298 ++pos) {
02299 const SequenceNumber SN = 1;
02300 const HeartBeatSubmessage hb = {
02301 {HEARTBEAT,
02302 CORBA::Octet(1 ),
02303 HEARTBEAT_SZ},
02304 ENTITYID_UNKNOWN,
02305 pos->entityId,
02306 {SN.getHigh(), SN.getLow()},
02307 {SN.getHigh(), SN.getLow()},
02308 {++heartbeat_counts_[*pos]}
02309 };
02310 subm.push_back(hb);
02311 }
02312
02313 if (!subm.empty()) {
02314 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * subm.size());
02315
02316 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02317 bool send_ok = true;
02318 for (size_t i = 0; i < subm.size(); ++i) {
02319 if (!(ser << subm[i])) {
02320 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats() - "
02321 "failed to serialize HEARTBEAT submessage %B\n", i));
02322 send_ok = false;
02323 break;
02324 }
02325 }
02326 if (send_ok) {
02327 send_strategy_->send_rtps_control(mb, recipients);
02328 }
02329 }
02330 c.release();
02331 g.release();
02332
02333 while(true) {
02334 c.acquire();
02335 if (readerDoesNotExistCallbacks_.empty()) {
02336 break;
02337 }
02338 OPENDDS_VECTOR(CallbackType)::iterator iter = readerDoesNotExistCallbacks_.begin();
02339 const RepoId& rid = iter->first;
02340 const InterestingRemote& remote = iter->second;
02341 readerDoesNotExistCallbacks_.erase(iter);
02342 c.release();
02343 remote.listener->reader_does_not_exist(rid, remote.localid);
02344 }
02345
02346 for (size_t i = 0; i < pendingCallbacks.size(); ++i) {
02347 pendingCallbacks[i]->data_dropped();
02348 }
02349 }
02350
02351 void
02352 RtpsUdpDataLink::check_heartbeats()
02353 {
02354
02355 const ACE_Time_Value tv = ACE_OS::gettimeofday() - 10 * config_->heartbeat_period_;
02356 ACE_GUARD(ACE_Thread_Mutex, g, lock_);
02357 ACE_GUARD(ACE_Thread_Mutex, c, writer_no_longer_exists_lock_);
02358 for (InterestingRemoteMapType::iterator pos = interesting_writers_.begin(), limit = interesting_writers_.end();
02359 pos != limit;
02360 ++pos) {
02361 if (pos->second.status == InterestingRemote::EXISTS && pos->second.last_activity < tv) {
02362 CallbackType callback(pos->first, pos->second);
02363 writerDoesNotExistCallbacks_.push_back(callback);
02364 pos->second.status = InterestingRemote::DOES_NOT_EXIST;
02365 }
02366 }
02367 c.release();
02368 g.release();
02369 while(true) {
02370 c.acquire();
02371 if (writerDoesNotExistCallbacks_.empty()) {
02372 break;
02373 }
02374 OPENDDS_VECTOR(CallbackType)::iterator iter = writerDoesNotExistCallbacks_.begin();
02375 const RepoId& rid = iter->first;
02376 const InterestingRemote& remote = iter->second;
02377 writerDoesNotExistCallbacks_.erase(iter);
02378 c.release();
02379 remote.listener->writer_does_not_exist(rid, remote.localid);
02380 }
02381 }
02382
02383 void
02384 RtpsUdpDataLink::send_heartbeats_manual(const TransportSendControlElement* tsce)
02385 {
02386 using namespace OpenDDS::RTPS;
02387
02388 const RepoId pub_id = tsce->publication_id();
02389
02390
02391 OPENDDS_SET(ACE_INET_Addr) recipients;
02392 get_locators (pub_id, recipients);
02393 if (recipients.empty()) {
02394 return;
02395 }
02396
02397
02398
02399 SequenceNumber firstSN, lastSN;
02400 CORBA::Long counter;
02401 RtpsWriterMap::iterator pos = writers_.find (pub_id);
02402 if (pos != writers_.end ()) {
02403
02404 const bool has_data = !pos->second.send_buff_.is_nil() && !pos->second.send_buff_->empty();
02405 SequenceNumber durable_max;
02406 const ACE_Time_Value now = ACE_OS::gettimeofday();
02407 for (ReaderInfoMap::const_iterator ri = pos->second.remote_readers_.begin(), end = pos->second.remote_readers_.end();
02408 ri != end;
02409 ++ri) {
02410 if (!ri->second.durable_data_.empty()) {
02411 const ACE_Time_Value expiration = ri->second.durable_timestamp_ + config_->durable_data_timeout_;
02412 if (now <= expiration &&
02413 ri->second.durable_data_.rbegin()->first > durable_max) {
02414 durable_max = ri->second.durable_data_.rbegin()->first;
02415 }
02416 }
02417 }
02418 firstSN = (pos->second.durable_ || !has_data) ? 1 : pos->second.send_buff_->low();
02419 lastSN = std::max(durable_max, has_data ? pos->second.send_buff_->high() : 1);
02420 counter = ++heartbeat_counts_[pos->first];
02421 } else {
02422
02423 firstSN = 1;
02424 lastSN = tsce->sequence();
02425 counter = ++this->best_effort_heartbeat_count_;
02426 }
02427
02428 const HeartBeatSubmessage hb = {
02429 {HEARTBEAT,
02430 CORBA::Octet(1 | 2 | 4 ),
02431 HEARTBEAT_SZ},
02432 ENTITYID_UNKNOWN,
02433 pub_id.entityId,
02434 {firstSN.getHigh(), firstSN.getLow()},
02435 {lastSN.getHigh(), lastSN.getLow()},
02436 {counter}
02437 };
02438
02439 ACE_Message_Block mb((HEARTBEAT_SZ + SMHDR_SZ) * 1);
02440
02441 Serializer ser(&mb, false, Serializer::ALIGN_CDR);
02442 if ((ser << hb)) {
02443 send_strategy_->send_rtps_control(mb, recipients);
02444 }
02445 else {
02446 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::send_heartbeats_manual() - "
02447 "failed to serialize HEARTBEAT submessage\n"));
02448 }
02449 }
02450
02451 RtpsUdpDataLink::ReaderInfo::~ReaderInfo()
02452 {
02453 expire_durable_data();
02454 }
02455
02456 void
02457 RtpsUdpDataLink::ReaderInfo::expire_durable_data()
02458 {
02459 typedef OPENDDS_MAP(SequenceNumber, TransportQueueElement*)::iterator iter_t;
02460 for (iter_t it = durable_data_.begin(); it != durable_data_.end(); ++it) {
02461 it->second->data_dropped();
02462 }
02463 }
02464
02465 bool
02466 RtpsUdpDataLink::ReaderInfo::expecting_durable_data() const
02467 {
02468 return durable_ &&
02469 (durable_timestamp_ == ACE_Time_Value::zero
02470 || !durable_data_.empty());
02471 }
02472
02473 RtpsUdpDataLink::RtpsWriter::~RtpsWriter()
02474 {
02475 if (!to_deliver_.empty()) {
02476 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left to deliver\n", to_deliver_.size()));
02477 }
02478 if (!elems_not_acked_.empty()) {
02479 ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: RtpsWriter::~RtpsWriter - deleting with %d elements left not fully acknowledged\n", elems_not_acked_.size()));
02480 }
02481 }
02482
02483 SequenceNumber
02484 RtpsUdpDataLink::RtpsWriter::heartbeat_high(const ReaderInfo& ri) const
02485 {
02486 const SequenceNumber durable_max =
02487 ri.durable_data_.empty() ? 0 : ri.durable_data_.rbegin()->first;
02488 const SequenceNumber data_max =
02489 send_buff_.is_nil() ? 0 : (send_buff_->empty() ? 0 : send_buff_->high());
02490 return std::max(durable_max, data_max);
02491 }
02492
02493 void
02494 RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack(TransportQueueElement* element)
02495 {
02496 elems_not_acked_.insert(SnToTqeMap::value_type(element->sequence(), element));
02497 }
02498
02499
02500
02501
02502 void
02503 RtpsUdpDataLink::TimedDelay::schedule()
02504 {
02505 if (!scheduled_) {
02506 const long timer = outer_->get_reactor()->schedule_timer(this, 0, timeout_);
02507
02508 if (timer == -1) {
02509 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::TimedDelay::schedule "
02510 "failed to schedule timer %p\n", ACE_TEXT("")));
02511 } else {
02512 scheduled_ = true;
02513 }
02514 }
02515 }
02516
02517 void
02518 RtpsUdpDataLink::TimedDelay::cancel()
02519 {
02520 if (scheduled_) {
02521 outer_->get_reactor()->cancel_timer(this);
02522 scheduled_ = false;
02523 }
02524 }
02525
02526 void
02527 RtpsUdpDataLink::HeartBeat::enable()
02528 {
02529 if (!enabled_) {
02530 const ACE_Time_Value& per = outer_->config_->heartbeat_period_;
02531 const long timer =
02532 outer_->get_reactor()->schedule_timer(this, 0, ACE_Time_Value::zero, per);
02533
02534 if (timer == -1) {
02535 ACE_DEBUG((LM_ERROR, "(%P|%t) RtpsUdpDataLink::HeartBeat::enable"
02536 " failed to schedule timer %p\n", ACE_TEXT("")));
02537 } else {
02538 enabled_ = true;
02539 }
02540 }
02541 }
02542
02543 void
02544 RtpsUdpDataLink::HeartBeat::disable()
02545 {
02546 if (enabled_) {
02547 outer_->get_reactor()->cancel_timer(this);
02548 enabled_ = false;
02549 }
02550 }
02551
02552 void
02553 RtpsUdpDataLink::send_final_acks (const RepoId& readerid)
02554 {
02555 RtpsReaderMap::iterator rr = readers_.find (readerid);
02556 if (rr != readers_.end ()) {
02557 send_ack_nacks (rr, true);
02558 }
02559 }
02560
02561 }
02562 }