00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataLink.h"
00010
00011 #include "ReceivedDataSample.h"
00012
00013 #include "TransportImpl.h"
00014 #include "TransportInst.h"
00015 #include "TransportClient.h"
00016
00017 #include "dds/DCPS/DataWriterImpl.h"
00018 #include "dds/DCPS/DataReaderImpl.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00022 #include "dds/DCPS/Util.h"
00023 #include "dds/DCPS/Definitions.h"
00024 #include "dds/DCPS/SafetyProfileStreams.h"
00025
00026 #include "EntryExit.h"
00027 #include "tao/debug.h"
00028 #include "ace/Reactor.h"
00029 #include "ace/SOCK.h"
00030
00031
00032 #if !defined (__ACE_INLINE__)
00033 #include "DataLink.inl"
00034 #endif
00035
00036 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00037
00038 namespace OpenDDS {
00039 namespace DCPS {
00040
00041
00042 DataLink::DataLink(TransportImpl& impl, Priority priority, bool is_loopback,
00043 bool is_active)
00044 : stopped_(false),
00045 scheduled_to_stop_at_(ACE_Time_Value::zero),
00046 impl_(impl),
00047 transport_priority_(priority),
00048 scheduling_release_(false),
00049 is_loopback_(is_loopback),
00050 is_active_(is_active),
00051 started_(false),
00052 send_response_listener_("DataLink")
00053 {
00054 DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00055
00056 datalink_release_delay_.sec(impl.config().datalink_release_delay_ / 1000);
00057 datalink_release_delay_.usec(impl.config().datalink_release_delay_ % 1000 * 1000);
00058
00059 id_ = DataLink::get_next_datalink_id();
00060
00061 if (impl.config().thread_per_connection_) {
00062 this->thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this));
00063
00064 if (this->thr_per_con_send_task_->open() == -1) {
00065 ACE_ERROR((LM_ERROR,
00066 ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00067 ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00068
00069 } else if (DCPS_debug_level > 4) {
00070 ACE_DEBUG((LM_DEBUG,
00071 ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00072 ACE_TEXT("started new thread to send data with.\n")));
00073 }
00074 }
00075
00076
00077 size_t control_chunks = impl.config().datalink_control_chunks_;
00078
00079 this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
00080 this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
00081 }
00082
00083 DataLink::~DataLink()
00084 {
00085 DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00086
00087 if (!assoc_by_local_.empty()) {
00088 ACE_DEBUG((LM_WARNING,
00089 ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00090 ACE_TEXT("link still in use by %d entities when deleted!\n"),
00091 this, assoc_by_local_.size()));
00092 }
00093
00094 if (this->thr_per_con_send_task_ != 0) {
00095 this->thr_per_con_send_task_->close(1);
00096 }
00097 }
00098
00099 TransportImpl&
00100 DataLink::impl() const
00101 {
00102 return impl_;
00103 }
00104
00105
00106 bool
00107 DataLink::add_on_start_callback(const TransportClient_wrch& client, const RepoId& remote)
00108 {
00109 GuardType guard(strategy_lock_);
00110
00111 if (started_ && !send_strategy_.is_nil()) {
00112 return false;
00113 }
00114 on_start_callbacks_.push_back(std::make_pair(client, remote));
00115 return true;
00116 }
00117
00118 void
00119 DataLink::remove_on_start_callback(const TransportClient_wrch& client, const RepoId& remote)
00120 {
00121 GuardType guard(strategy_lock_);
00122 on_start_callbacks_.erase(
00123 std::remove(on_start_callbacks_.begin(),
00124 on_start_callbacks_.end(),
00125 std::make_pair(client, remote)),
00126 on_start_callbacks_.end());
00127 }
00128
00129
00130 void
00131 DataLink::invoke_on_start_callbacks(bool success)
00132 {
00133 const DataLink_rch link(success ? this : 0, inc_count());
00134
00135 while (true) {
00136 GuardType guard(strategy_lock_);
00137
00138 if (on_start_callbacks_.empty()) {
00139 break;
00140 }
00141
00142 OnStartCallback last_callback = on_start_callbacks_.back();
00143 on_start_callbacks_.pop_back();
00144
00145 guard.release();
00146 TransportClient_rch client = last_callback.first.lock();
00147 if (client)
00148 client->use_datalink(last_callback.second, link);
00149 }
00150 }
00151
00152
00153 int
00154 DataLink::handle_exception(ACE_HANDLE )
00155 {
00156 if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00157 if (DCPS_debug_level > 0) {
00158 ACE_DEBUG((LM_DEBUG,
00159 ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00160 }
00161 ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00162 if (reactor->cancel_timer(this) > 0) {
00163 if (DCPS_debug_level > 0) {
00164 ACE_DEBUG((LM_DEBUG,
00165 ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00166 }
00167 }
00168 return 0;
00169 } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00170 if (this->scheduling_release_) {
00171 if (DCPS_debug_level > 0) {
00172 ACE_DEBUG((LM_DEBUG,
00173 ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00174 }
00175 this->handle_timeout(ACE_Time_Value::zero, 0);
00176 return 0;
00177 }
00178 if (DCPS_debug_level > 0) {
00179 ACE_DEBUG((LM_DEBUG,
00180 ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00181 }
00182 this->stop();
00183 return 0;
00184 } else {
00185 if (DCPS_debug_level > 0) {
00186 ACE_DEBUG((LM_DEBUG,
00187 ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00188 }
00189 ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00190 ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00191 reactor->schedule_timer(this, 0, future_release_time);
00192 }
00193 return 0;
00194 }
00195
00196
00197
00198
00199 void
00200 DataLink::schedule_stop(const ACE_Time_Value& schedule_to_stop_at)
00201 {
00202 if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00203 this->scheduled_to_stop_at_ = schedule_to_stop_at;
00204 notify_reactor();
00205
00206 } else {
00207 if (DCPS_debug_level > 0) {
00208 ACE_DEBUG((LM_DEBUG,
00209 ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00210 }
00211 }
00212 }
00213
00214 void
00215 DataLink::notify_reactor()
00216 {
00217 TransportReactorTask_rch reactor(impl_.reactor_task());
00218 reactor->get_reactor()->notify(this);
00219 }
00220
00221 void
00222 DataLink::stop()
00223 {
00224 this->pre_stop_i();
00225
00226 TransportSendStrategy_rch send_strategy;
00227 TransportStrategy_rch recv_strategy;
00228
00229 {
00230 GuardType guard(this->strategy_lock_);
00231
00232 if (this->stopped_) return;
00233
00234 send_strategy = this->send_strategy_;
00235 this->send_strategy_.reset();
00236
00237 recv_strategy = this->receive_strategy_;
00238 this->receive_strategy_.reset();
00239 }
00240
00241 if (!send_strategy.is_nil()) {
00242 send_strategy->stop();
00243 }
00244
00245 if (!recv_strategy.is_nil()) {
00246 recv_strategy->stop();
00247 }
00248
00249 this->stop_i();
00250 this->stopped_ = true;
00251 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00252 }
00253
00254 void
00255 DataLink::resume_send()
00256 {
00257 if (!this->send_strategy_->isDirectMode())
00258 this->send_strategy_->resume_send();
00259 }
00260
00261 int
00262 DataLink::make_reservation(const RepoId& remote_subscription_id,
00263 const RepoId& local_publication_id,
00264 const TransportSendListener_wrch& send_listener)
00265 {
00266 DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00267
00268 if (DCPS_debug_level > 9) {
00269 GuidConverter local(local_publication_id), remote(remote_subscription_id);
00270 ACE_DEBUG((LM_DEBUG,
00271 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00272 ACE_TEXT("creating association local publication %C ")
00273 ACE_TEXT("<--> with remote subscription %C.\n"),
00274 OPENDDS_STRING(local).c_str(),
00275 OPENDDS_STRING(remote).c_str()));
00276 }
00277
00278 {
00279 GuardType guard(strategy_lock_);
00280
00281 if (!send_strategy_.is_nil()) {
00282 send_strategy_->link_released(false);
00283 }
00284 }
00285 {
00286 GuardType guard(pub_sub_maps_lock_);
00287
00288 assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00289 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00290
00291 if (rls.is_nil())
00292 rls = make_rch<ReceiveListenerSet>();
00293 rls->insert(local_publication_id, TransportReceiveListener_rch());
00294
00295 send_listeners_.insert(std::make_pair(local_publication_id,send_listener));
00296 }
00297 return 0;
00298 }
00299
00300 int
00301 DataLink::make_reservation(const RepoId& remote_publication_id,
00302 const RepoId& local_subscription_id,
00303 const TransportReceiveListener_wrch& receive_listener)
00304 {
00305 DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00306
00307 if (DCPS_debug_level > 9) {
00308 GuidConverter local(local_subscription_id), remote(remote_publication_id);
00309 ACE_DEBUG((LM_DEBUG,
00310 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00311 ACE_TEXT("creating association local subscription %C ")
00312 ACE_TEXT("<--> with remote publication %C.\n"),
00313 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00314 }
00315
00316 {
00317 GuardType guard(strategy_lock_);
00318
00319 if (!send_strategy_.is_nil()) {
00320 send_strategy_->link_released(false);
00321 }
00322 }
00323 {
00324 GuardType guard(pub_sub_maps_lock_);
00325
00326 assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00327 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00328
00329 if (rls.is_nil())
00330 rls = make_rch<ReceiveListenerSet>();
00331 rls->insert(local_subscription_id, receive_listener);
00332
00333 recv_listeners_.insert(std::make_pair(local_subscription_id,
00334 receive_listener));
00335 }
00336 return 0;
00337 }
00338
00339 template <typename Seq>
00340 void set_to_seq(const RepoIdSet& rids, Seq& seq)
00341 {
00342 seq.length(static_cast<CORBA::ULong>(rids.size()));
00343 CORBA::ULong i = 0;
00344 for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
00345 seq[i++] = *iter;
00346 }
00347 }
00348
00349 GUIDSeq*
00350 DataLink::peer_ids(const RepoId& local_id) const
00351 {
00352 GuardType guard(pub_sub_maps_lock_);
00353
00354 const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00355
00356 if (iter == assoc_by_local_.end())
00357 return 0;
00358
00359 GUIDSeq_var result = new GUIDSeq;
00360 set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00361 return result._retn();
00362 }
00363
00364
00365
00366
00367
00368
00369
00370
00371 void
00372 DataLink::release_reservations(RepoId remote_id, RepoId local_id,
00373 DataLinkSetMap& released_locals)
00374 {
00375 DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00376
00377 if (DCPS_debug_level > 9) {
00378 GuidConverter local(local_id);
00379 GuidConverter remote(remote_id);
00380 ACE_DEBUG((LM_DEBUG,
00381 ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00382 ACE_TEXT("releasing association local: %C ")
00383 ACE_TEXT("<--> with remote %C.\n"),
00384 OPENDDS_STRING(local).c_str(),
00385 OPENDDS_STRING(remote).c_str()));
00386 }
00387
00388
00389
00390
00391
00392
00393
00394 this->release_reservations_i(remote_id, local_id);
00395
00396 bool release_remote_required = false;
00397 {
00398 GuardType guard(this->pub_sub_maps_lock_);
00399
00400 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
00401 if (rls->size() == 1) {
00402 assoc_by_remote_.erase(remote_id);
00403 release_remote_required = true;
00404 } else {
00405 rls->remove(local_id);
00406 }
00407 RepoIdSet& ris = assoc_by_local_[local_id];
00408 if (ris.size() == 1) {
00409 DataLinkSet_rch& links = released_locals[local_id];
00410 if (links.is_nil())
00411 links = make_rch<DataLinkSet>();
00412 links->insert_link(rchandle_from(this));
00413 assoc_by_local_.erase(local_id);
00414 } else {
00415 ris.erase(remote_id);
00416 }
00417
00418 if (assoc_by_local_.empty()) {
00419 VDBG_LVL((LM_DEBUG,
00420 ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00421 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00422
00423 impl_.release_datalink(this);
00424 }
00425 }
00426 if (release_remote_required)
00427 release_remote_i(remote_id);
00428 }
00429
00430 void
00431 DataLink::schedule_delayed_release()
00432 {
00433 DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00434
00435 VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00436
00437
00438
00439
00440 if (!this->send_strategy_.is_nil()) {
00441 this->send_strategy_->clear();
00442 }
00443
00444 ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00445 this->schedule_stop(future_release_time);
00446 }
00447
00448 bool
00449 DataLink::cancel_release()
00450 {
00451 DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00452 if (stopped_) {
00453 if (DCPS_debug_level > 0) {
00454 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00455 }
00456 return false;
00457 }
00458 if (scheduling_release_) {
00459 if (DCPS_debug_level > 0) {
00460 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00461 }
00462 this->set_scheduling_release(false);
00463 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00464 notify_reactor();
00465 }
00466 return true;
00467 }
00468
00469 void
00470 DataLink::stop_i()
00471 {
00472 DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00473 }
00474
00475 ACE_Message_Block*
00476 DataLink::create_control(char submessage_id,
00477 DataSampleHeader& header,
00478 Message_Block_Ptr data)
00479 {
00480 DBG_ENTRY_LVL("DataLink", "create_control", 6);
00481
00482 header.byte_order_ = ACE_CDR_BYTE_ORDER;
00483 header.message_id_ = TRANSPORT_CONTROL;
00484 header.submessage_id_ = submessage_id;
00485 header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00486
00487 ACE_Message_Block* message = 0;
00488 ACE_NEW_MALLOC_RETURN(message,
00489 static_cast<ACE_Message_Block*>(
00490 this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00491 ACE_Message_Block(header.max_marshaled_size(),
00492 ACE_Message_Block::MB_DATA,
00493 data.release(),
00494 0,
00495 0,
00496 0,
00497 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00498 ACE_Time_Value::zero,
00499 ACE_Time_Value::max_time,
00500 this->db_allocator_.get(),
00501 this->mb_allocator_.get()),
00502 0);
00503
00504 if (!(*message << header)) {
00505 ACE_ERROR((LM_ERROR,
00506 ACE_TEXT("(%P|%t) DataLink::create_control: ")
00507 ACE_TEXT("cannot put header in message\n")));
00508 ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
00509 message = 0;
00510 }
00511
00512 return message;
00513 }
00514
00515 SendControlStatus
00516 DataLink::send_control(const DataSampleHeader& header, Message_Block_Ptr message)
00517 {
00518 DBG_ENTRY_LVL("DataLink", "send_control", 6);
00519
00520 TransportSendControlElement* const elem = new TransportSendControlElement(1,
00521 GUID_UNKNOWN, &send_response_listener_,
00522 header, move(message));
00523
00524 send_response_listener_.track_message();
00525
00526 RepoId senderId(header.publication_id_);
00527 send_start();
00528 send(elem);
00529 send_stop(senderId);
00530
00531 return SEND_CONTROL_OK;
00532 }
00533
00534
00535
00536
00537 int
00538 DataLink::data_received(ReceivedDataSample& sample,
00539 const RepoId& readerId )
00540 {
00541 data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00542 return 0;
00543 }
00544
00545 void
00546 DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
00547 {
00548 data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00549 }
00550
00551 void
00552 DataLink::data_received_i(ReceivedDataSample& sample,
00553 const RepoId& readerId,
00554 const RepoIdSet& incl_excl,
00555 ReceiveListenerSet::ConstrainReceiveSet constrain)
00556 {
00557 DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00558
00559 const RepoId& publication_id = sample.header_.publication_id_;
00560
00561
00562
00563
00564 if (DCPS_debug_level > 9) {
00565 const GuidConverter converter(publication_id);
00566 const GuidConverter reader(readerId);
00567 ACE_DEBUG((LM_DEBUG,
00568 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569 ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
00570 OPENDDS_STRING(converter).c_str(),
00571 to_string(sample.header_).c_str(),
00572 OPENDDS_STRING(reader).c_str(),
00573 constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00574 }
00575
00576 if (Transport_debug_level > 9) {
00577 const GuidConverter converter(publication_id);
00578 ACE_DEBUG((LM_DEBUG,
00579 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00580 ACE_TEXT("from publication %C received sample: %C.\n"),
00581 OPENDDS_STRING(converter).c_str(),
00582 to_string(sample.header_).c_str()));
00583 }
00584
00585 ReceiveListenerSet_rch listener_set;
00586 {
00587 GuardType guard(this->pub_sub_maps_lock_);
00588 AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00589 if (iter != assoc_by_remote_.end())
00590 listener_set = iter->second;
00591
00592 if (listener_set.is_nil()) {
00593 TransportReceiveListener_rch listener = this->default_listener_.lock();
00594 if (listener)
00595 listener->data_received(sample);
00596 return;
00597 }
00598 }
00599
00600 if (listener_set.is_nil()) {
00601
00602 if (Transport_debug_level > 4) {
00603 const GuidConverter converter(publication_id);
00604 ACE_DEBUG((LM_DEBUG,
00605 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00606 ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00607 OPENDDS_STRING(converter).c_str()));
00608 }
00609
00610 return;
00611 }
00612
00613 if (readerId != GUID_UNKNOWN) {
00614 listener_set->data_received(sample, readerId);
00615 return;
00616 }
00617
00618 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00619
00620 if (sample.header_.content_filter_
00621 && sample.header_.content_filter_entries_.length()) {
00622 ReceiveListenerSet subset(*listener_set.in());
00623 subset.remove_all(sample.header_.content_filter_entries_);
00624 subset.data_received(sample, incl_excl, constrain);
00625
00626 } else {
00627 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00628
00629 if (DCPS_debug_level > 9) {
00630
00631
00632
00633 OPENDDS_STRING included_ids;
00634 bool first = true;
00635 RepoIdSet::const_iterator iter = incl_excl.begin();
00636 while(iter != incl_excl.end()) {
00637 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00638 first = false;
00639 ++iter;
00640 }
00641 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
00642 constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00643 }
00644 listener_set->data_received(sample, incl_excl, constrain);
00645 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00646 }
00647
00648 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00649 }
00650
00651
00652 ACE_UINT64
00653 DataLink::get_next_datalink_id()
00654 {
00655 static ACE_UINT64 next_id = 0;
00656 static LockType lock;
00657
00658 ACE_UINT64 id;
00659 {
00660 GuardType guard(lock);
00661 id = next_id++;
00662
00663 if (0 == next_id) {
00664 ACE_ERROR((LM_ERROR,
00665 ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00666 ACE_TEXT("has rolled over and is reusing ids!\n")));
00667 }
00668 }
00669
00670 return id;
00671 }
00672
00673 void
00674 DataLink::transport_shutdown()
00675 {
00676 DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00677
00678
00679 this->set_scheduling_release(false);
00680 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00681
00682 ACE_Reactor_Timer_Interface* reactor = impl_.timer();
00683 reactor->cancel_timer(this);
00684
00685 this->stop();
00686
00687
00688
00689 }
00690
00691 void
00692 DataLink::notify(ConnectionNotice notice)
00693 {
00694 DBG_ENTRY_LVL("DataLink", "notify", 6);
00695
00696 VDBG((LM_DEBUG,
00697 ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00698 this,
00699 connection_notice_as_str(notice)));
00700
00701 GuardType guard(this->pub_sub_maps_lock_);
00702
00703
00704
00705 for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00706 itr != send_listeners_.end(); ++itr) {
00707
00708 TransportSendListener_rch tsl = itr->second.lock();
00709
00710 if (tsl) {
00711 if (Transport_debug_level > 0) {
00712 GuidConverter converter(itr->first);
00713 ACE_DEBUG((LM_DEBUG,
00714 ACE_TEXT("(%P|%t) DataLink::notify: ")
00715 ACE_TEXT("notify pub %C %C.\n"),
00716 OPENDDS_STRING(converter).c_str(),
00717 connection_notice_as_str(notice)));
00718 }
00719 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00720 if (local_it == assoc_by_local_.end()) {
00721 if (Transport_debug_level) {
00722 GuidConverter converter(itr->first);
00723 ACE_DEBUG((LM_DEBUG,
00724 ACE_TEXT("(%P|%t) DataLink::notify: ")
00725 ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00726 OPENDDS_STRING(converter).c_str(),
00727 connection_notice_as_str(notice)));
00728 }
00729 break;
00730 }
00731 const RepoIdSet& rids = local_it->second;
00732
00733 ReaderIdSeq subids;
00734 set_to_seq(rids, subids);
00735
00736 switch (notice) {
00737 case DISCONNECTED:
00738 tsl->notify_publication_disconnected(subids);
00739 break;
00740
00741 case RECONNECTED:
00742 tsl->notify_publication_reconnected(subids);
00743 break;
00744
00745 case LOST:
00746 tsl->notify_publication_lost(subids);
00747 break;
00748
00749 default:
00750 ACE_ERROR((LM_ERROR,
00751 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00752 ACE_TEXT("unknown notice to TransportSendListener\n")));
00753 break;
00754 }
00755
00756 } else {
00757 if (Transport_debug_level > 0) {
00758 GuidConverter converter(itr->first);
00759 ACE_DEBUG((LM_DEBUG,
00760 ACE_TEXT("(%P|%t) DataLink::notify: ")
00761 ACE_TEXT("not notify pub %C %C \n"),
00762 OPENDDS_STRING(converter).c_str(),
00763 connection_notice_as_str(notice)));
00764 }
00765 }
00766
00767 }
00768
00769
00770
00771 for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00772 itr != recv_listeners_.end(); ++itr) {
00773
00774 TransportReceiveListener_rch trl = itr->second.lock();
00775
00776 if (trl) {
00777 if (Transport_debug_level > 0) {
00778 GuidConverter converter(itr->first);
00779 ACE_DEBUG((LM_DEBUG,
00780 ACE_TEXT("(%P|%t) DataLink::notify: ")
00781 ACE_TEXT("notify sub %C %C.\n"),
00782 OPENDDS_STRING(converter).c_str(),
00783 connection_notice_as_str(notice)));
00784 }
00785 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00786 if (local_it == assoc_by_local_.end()) {
00787 if (Transport_debug_level) {
00788 GuidConverter converter(itr->first);
00789 ACE_DEBUG((LM_DEBUG,
00790 ACE_TEXT("(%P|%t) DataLink::notify: ")
00791 ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00792 OPENDDS_STRING(converter).c_str(),
00793 connection_notice_as_str(notice)));
00794 }
00795 break;
00796 }
00797 const RepoIdSet& rids = local_it->second;
00798
00799 WriterIdSeq pubids;
00800 set_to_seq(rids, pubids);
00801
00802 switch (notice) {
00803 case DISCONNECTED:
00804 trl->notify_subscription_disconnected(pubids);
00805 break;
00806
00807 case RECONNECTED:
00808 trl->notify_subscription_reconnected(pubids);
00809 break;
00810
00811 case LOST:
00812 trl->notify_subscription_lost(pubids);
00813 break;
00814
00815 default:
00816 ACE_ERROR((LM_ERROR,
00817 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00818 ACE_TEXT("unknown notice to datareader.\n")));
00819 break;
00820 }
00821
00822 } else {
00823 if (Transport_debug_level > 0) {
00824 GuidConverter converter(itr->first);
00825 ACE_DEBUG((LM_DEBUG,
00826 ACE_TEXT("(%P|%t) DataLink::notify: ")
00827 ACE_TEXT("not notify sub %C subscription lost.\n"),
00828 OPENDDS_STRING(converter).c_str()));
00829 }
00830
00831 }
00832 }
00833 }
00834
00835
00836
00837 void
00838 DataLink::pre_stop_i()
00839 {
00840 if (this->thr_per_con_send_task_ != 0) {
00841 this->thr_per_con_send_task_->close(1);
00842 }
00843 }
00844
00845 bool
00846 DataLink::release_resources()
00847 {
00848 DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00849
00850 this->prepare_release();
00851 return impl_.release_link_resources(this);
00852 }
00853
00854 bool
00855 DataLink::is_target(const RepoId& remote_sub_id)
00856 {
00857 GuardType guard(this->pub_sub_maps_lock_);
00858 return assoc_by_remote_.count(remote_sub_id);
00859 }
00860
00861 GUIDSeq*
00862 DataLink::target_intersection(const RepoId& pub_id, const GUIDSeq& in,
00863 size_t& n_subs)
00864 {
00865 GUIDSeq_var res;
00866 GuardType guard(this->pub_sub_maps_lock_);
00867 AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00868
00869 if (iter != assoc_by_local_.end()) {
00870 n_subs = iter->second.size();
00871 const CORBA::ULong len = in.length();
00872
00873 for (CORBA::ULong i(0); i < len; ++i) {
00874 if (iter->second.count(in[i])) {
00875 if (res.ptr() == 0) {
00876 res = new GUIDSeq;
00877 }
00878
00879 push_back(res.inout(), in[i]);
00880 }
00881 }
00882 }
00883
00884 return res._retn();
00885 }
00886
00887 void DataLink::prepare_release()
00888 {
00889 GuardType guard(this->pub_sub_maps_lock_);
00890
00891 if (!assoc_releasing_.empty()) {
00892 ACE_ERROR((LM_ERROR,
00893 ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00894 ACE_TEXT("already prepared for release.\n")));
00895 return;
00896 }
00897
00898 assoc_releasing_ = assoc_by_local_;
00899 }
00900
00901 void DataLink::clear_associations()
00902 {
00903 for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00904 iter != assoc_releasing_.end(); ++iter) {
00905 TransportSendListener_rch tsl = send_listener_for(iter->first);
00906 if (tsl) {
00907 ReaderIdSeq sub_ids;
00908 set_to_seq(iter->second, sub_ids);
00909 tsl->remove_associations(sub_ids, false);
00910 continue;
00911 }
00912 TransportReceiveListener_rch trl = recv_listener_for(iter->first);
00913 if (trl) {
00914 WriterIdSeq pub_ids;
00915 set_to_seq(iter->second, pub_ids);
00916 trl->remove_associations(pub_ids, false);
00917 }
00918 }
00919 assoc_releasing_.clear();
00920 }
00921
00922 int
00923 DataLink::handle_timeout(const ACE_Time_Value& , const void* )
00924 {
00925 if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00926 VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00927 impl_.unbind_link(this);
00928
00929 if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00930 this->stop();
00931 }
00932 }
00933 return 0;
00934 }
00935
00936 int
00937 DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
00938 {
00939 if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00940
00941
00942 handle_timeout(ACE_Time_Value::zero, 0);
00943 }
00944
00945 return 0;
00946 }
00947
00948 void
00949 DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
00950 {
00951
00952
00953
00954
00955 int result = 0;
00956
00957
00958 int tos = cp << 2;
00959
00960 const char* which = "IPV4 TOS";
00961 #if defined (ACE_HAS_IPV6)
00962 ACE_INET_Addr local_address;
00963
00964 if (socket.get_local_addr(local_address) == -1) {
00965 return;
00966
00967 } else if (local_address.get_type() == AF_INET6)
00968 #if !defined (IPV6_TCLASS)
00969 {
00970 if (DCPS_debug_level > 0) {
00971 ACE_ERROR((LM_ERROR,
00972 ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00973 ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00974 cp));
00975 }
00976
00977 return;
00978 }
00979
00980 #else
00981 {
00982 which = "IPV6 TCLASS";
00983 result = socket.set_option(
00984 IPPROTO_IPV6,
00985 IPV6_TCLASS,
00986 &tos,
00987 sizeof(tos));
00988
00989 } else
00990
00991 #endif
00992 #endif
00993
00994 #ifdef IP_TOS
00995 result = socket.set_option(
00996 IPPROTO_IP,
00997 IP_TOS,
00998 &tos,
00999 sizeof(tos));
01000
01001 if ((result == -1) && (errno != ENOTSUP)
01002 #ifdef WSAEINVAL
01003 && (errno != WSAEINVAL)
01004 #endif
01005 ) {
01006 #endif // IP_TOS
01007 ACE_DEBUG((LM_DEBUG,
01008 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01009 ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01010 ACE_TEXT("try running as superuser.\n"),
01011 which,
01012 cp));
01013 #ifdef IP_TOS
01014 } else if (DCPS_debug_level > 4) {
01015 ACE_DEBUG((LM_DEBUG,
01016 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01017 ACE_TEXT("set %C codepoint to %d.\n"),
01018 which,
01019 cp));
01020 }
01021 #endif
01022 }
01023
01024 bool
01025 DataLink::handle_send_request_ack(TransportQueueElement* element)
01026 {
01027 element->data_delivered();
01028 return true;
01029 }
01030
01031 #ifndef OPENDDS_SAFETY_PROFILE
01032 std::ostream&
01033 operator<<(std::ostream& str, const DataLink& value)
01034 {
01035 str << " There are " << value.assoc_by_local_.size()
01036 << " local entities currently using this link comprising following associations:"
01037 << std::endl;
01038
01039 for (DataLink::AssocByLocal::const_iterator
01040 localId = value.assoc_by_local_.begin();
01041 localId != value.assoc_by_local_.end();
01042 ++localId) {
01043 for (RepoIdSet::const_iterator
01044 remoteId = localId->second.begin();
01045 remoteId != localId->second.end();
01046 ++remoteId) {
01047 str << GuidConverter(localId->first) << " --> "
01048 << GuidConverter(*remoteId) << " " << std::endl;
01049 }
01050 }
01051 return str;
01052 }
01053 #endif
01054 }
01055 }
01056
01057 OPENDDS_END_VERSIONED_NAMESPACE_DECL