00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "DataLink.h"
00010
00011 #include "ReceivedDataSample.h"
00012
00013 #include "TransportImpl.h"
00014 #include "TransportInst.h"
00015 #include "TransportClient.h"
00016
00017 #include "dds/DCPS/DataWriterImpl.h"
00018 #include "dds/DCPS/DataReaderImpl.h"
00019 #include "dds/DCPS/Service_Participant.h"
00020 #include "dds/DCPS/GuidConverter.h"
00021 #include "dds/DdsDcpsGuidTypeSupportImpl.h"
00022 #include "dds/DCPS/Util.h"
00023 #include "dds/DCPS/Definitions.h"
00024 #include "dds/DCPS/SafetyProfileStreams.h"
00025
00026 #include "EntryExit.h"
00027 #include "tao/ORB_Core.h"
00028 #include "tao/debug.h"
00029 #include "ace/Reactor.h"
00030 #include "ace/SOCK.h"
00031
00032
00033 #if !defined (__ACE_INLINE__)
00034 #include "DataLink.inl"
00035 #endif
00036
00037 namespace OpenDDS {
00038 namespace DCPS {
00039
00040
00041 DataLink::DataLink(TransportImpl* impl, Priority priority, bool is_loopback,
00042 bool is_active)
00043 : stopped_(false),
00044 scheduled_to_stop_at_(ACE_Time_Value::zero),
00045 default_listener_(0),
00046 thr_per_con_send_task_(0),
00047 transport_priority_(priority),
00048 scheduling_release_(false),
00049 send_control_allocator_(0),
00050 mb_allocator_(0),
00051 db_allocator_(0),
00052 is_loopback_(is_loopback),
00053 is_active_(is_active),
00054 started_(false),
00055 send_response_listener_("DataLink")
00056 {
00057 DBG_ENTRY_LVL("DataLink", "DataLink", 6);
00058
00059 impl->_add_ref();
00060 this->impl_ = impl;
00061
00062 datalink_release_delay_.sec(this->impl_->config_->datalink_release_delay_ / 1000);
00063 datalink_release_delay_.usec(this->impl_->config_->datalink_release_delay_ % 1000 * 1000);
00064
00065 id_ = DataLink::get_next_datalink_id();
00066
00067 if (this->impl_->config_->thread_per_connection_) {
00068 this->thr_per_con_send_task_ = new ThreadPerConnectionSendTask(this);
00069
00070 if (this->thr_per_con_send_task_->open() == -1) {
00071 ACE_ERROR((LM_ERROR,
00072 ACE_TEXT("(%P|%t) DataLink::DataLink: ")
00073 ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
00074
00075 } else if (DCPS_debug_level > 4) {
00076 ACE_DEBUG((LM_DEBUG,
00077 ACE_TEXT("(%P|%t) DataLink::DataLink - ")
00078 ACE_TEXT("started new thread to send data with.\n")));
00079 }
00080 }
00081
00082
00083 size_t control_chunks = this->impl_->config_->datalink_control_chunks_;
00084
00085 this->send_control_allocator_ =
00086 new TransportSendControlElementAllocator(control_chunks);
00087
00088 this->mb_allocator_ = new MessageBlockAllocator(control_chunks);
00089 this->db_allocator_ = new DataBlockAllocator(control_chunks);
00090 }
00091
00092 DataLink::~DataLink()
00093 {
00094 DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
00095
00096 if (!assoc_by_local_.empty()) {
00097 ACE_DEBUG((LM_WARNING,
00098 ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
00099 ACE_TEXT("link still in use by %d entities when deleted!\n"),
00100 this, assoc_by_local_.size()));
00101 }
00102
00103 delete this->db_allocator_;
00104 delete this->mb_allocator_;
00105
00106 delete this->send_control_allocator_;
00107
00108 if (this->thr_per_con_send_task_ != 0) {
00109 this->thr_per_con_send_task_->close(1);
00110 delete this->thr_per_con_send_task_;
00111 }
00112 }
00113
00114 TransportImpl_rch
00115 DataLink::impl() const
00116 {
00117 return impl_;
00118 }
00119
00120 void
00121 DataLink::invoke_on_start_callbacks(bool success)
00122 {
00123 const DataLink_rch link(success ? this : 0, false);
00124
00125 while (true) {
00126 GuardType guard(strategy_lock_);
00127
00128 if (on_start_callbacks_.empty()) {
00129 break;
00130 }
00131
00132 OnStartCallback last_callback = on_start_callbacks_.back();
00133 on_start_callbacks_.pop_back();
00134
00135 guard.release();
00136 last_callback.first->use_datalink(last_callback.second, link);
00137 }
00138 }
00139
00140
00141 int
00142 DataLink::handle_exception(ACE_HANDLE )
00143 {
00144 if(this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00145 if (DCPS_debug_level > 0) {
00146 ACE_DEBUG((LM_DEBUG,
00147 ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
00148 }
00149 if (this->impl_ != 0) {
00150 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00151 if (reactor->cancel_timer(this) > 0) {
00152 if (DCPS_debug_level > 0) {
00153 ACE_DEBUG((LM_DEBUG,
00154 ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
00155 }
00156 }
00157 } else {
00158 if (DCPS_debug_level > 0) {
00159 ACE_DEBUG((LM_DEBUG,
00160 ACE_TEXT("(%P|%t) DataLink::handle_exception() - impl_ == 0\n")));
00161 }
00162 }
00163 this->_remove_ref();
00164 return 0;
00165 } else if (this->scheduled_to_stop_at_ <= ACE_OS::gettimeofday()) {
00166 if (this->scheduling_release_) {
00167 if (DCPS_debug_level > 0) {
00168 ACE_DEBUG((LM_DEBUG,
00169 ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
00170 }
00171 this->handle_timeout(ACE_Time_Value::zero, 0);
00172 return 0;
00173 }
00174 if (DCPS_debug_level > 0) {
00175 ACE_DEBUG((LM_DEBUG,
00176 ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
00177 }
00178 this->stop();
00179 this->_remove_ref();
00180 return 0;
00181 } else {
00182 if (DCPS_debug_level > 0) {
00183 ACE_DEBUG((LM_DEBUG,
00184 ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
00185 }
00186 this->_add_ref();
00187 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00188 ACE_Time_Value future_release_time = this->scheduled_to_stop_at_ - ACE_OS::gettimeofday();
00189 reactor->schedule_timer(this, 0, future_release_time);
00190 }
00191 return 0;
00192 }
00193
00194
00195
00196
00197 void
00198 DataLink::schedule_stop(ACE_Time_Value& schedule_to_stop_at)
00199 {
00200 if (!this->stopped_ && this->scheduled_to_stop_at_ == ACE_Time_Value::zero) {
00201
00202
00203 this->_add_ref();
00204 this->scheduled_to_stop_at_ = schedule_to_stop_at;
00205 TransportReactorTask_rch reactor(this->impl_->reactor_task());
00206 reactor->get_reactor()->notify(this);
00207
00208 } else {
00209 if (DCPS_debug_level > 0) {
00210 ACE_DEBUG((LM_DEBUG,
00211 ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
00212 }
00213 }
00214 }
00215
00216 void
00217 DataLink::stop()
00218 {
00219 this->pre_stop_i();
00220
00221 TransportSendStrategy_rch send_strategy;
00222 TransportStrategy_rch recv_strategy;
00223
00224 {
00225 GuardType guard(this->strategy_lock_);
00226
00227 if (this->stopped_) return;
00228
00229 send_strategy = this->send_strategy_;
00230 this->send_strategy_ = 0;
00231
00232 recv_strategy = this->receive_strategy_;
00233 this->receive_strategy_ = 0;
00234 }
00235
00236 if (!send_strategy.is_nil()) {
00237 send_strategy->stop();
00238 }
00239
00240 if (!recv_strategy.is_nil()) {
00241 recv_strategy->stop();
00242 }
00243
00244 this->stop_i();
00245 this->stopped_ = true;
00246 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00247 }
00248
00249 void
00250 DataLink::resume_send()
00251 {
00252 if (!this->send_strategy_->isDirectMode())
00253 this->send_strategy_->resume_send();
00254 }
00255
00256 int
00257 DataLink::make_reservation(const RepoId& remote_subscription_id,
00258 const RepoId& local_publication_id,
00259 TransportSendListener* send_listener)
00260 {
00261 DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00262
00263 if (DCPS_debug_level > 9) {
00264 GuidConverter local(local_publication_id), remote(remote_subscription_id);
00265 ACE_DEBUG((LM_DEBUG,
00266 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00267 ACE_TEXT("creating association local publication %C ")
00268 ACE_TEXT("<--> with remote subscription %C.\n"),
00269 OPENDDS_STRING(local).c_str(),
00270 OPENDDS_STRING(remote).c_str()));
00271 }
00272
00273 {
00274 GuardType guard(strategy_lock_);
00275
00276 if (!send_strategy_.is_nil()) {
00277 send_strategy_->link_released(false);
00278 }
00279 }
00280 {
00281 GuardType guard(pub_sub_maps_lock_);
00282
00283 assoc_by_local_[local_publication_id].insert(remote_subscription_id);
00284 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
00285
00286 if (rls.is_nil())
00287 rls = new ReceiveListenerSet;
00288 rls->insert(local_publication_id, 0);
00289
00290 send_listeners_[local_publication_id] = send_listener;
00291 }
00292 return 0;
00293 }
00294
00295 int
00296 DataLink::make_reservation(const RepoId& remote_publication_id,
00297 const RepoId& local_subscription_id,
00298 TransportReceiveListener* receive_listener)
00299 {
00300 DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
00301
00302 if (DCPS_debug_level > 9) {
00303 GuidConverter local(local_subscription_id), remote(remote_publication_id);
00304 ACE_DEBUG((LM_DEBUG,
00305 ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
00306 ACE_TEXT("creating association local subscription %C ")
00307 ACE_TEXT("<--> with remote publication %C.\n"),
00308 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str()));
00309 }
00310
00311 {
00312 GuardType guard(strategy_lock_);
00313
00314 if (!send_strategy_.is_nil()) {
00315 send_strategy_->link_released(false);
00316 }
00317 }
00318 {
00319 GuardType guard(pub_sub_maps_lock_);
00320
00321 assoc_by_local_[local_subscription_id].insert(remote_publication_id);
00322 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
00323
00324 if (rls.is_nil())
00325 rls = new ReceiveListenerSet;
00326 rls->insert(local_subscription_id, receive_listener);
00327
00328 recv_listeners_[local_subscription_id] = receive_listener;
00329 }
00330 return 0;
00331 }
00332
00333 template <typename Seq>
00334 void set_to_seq(const RepoIdSet& rids, Seq& seq)
00335 {
00336 seq.length(static_cast<CORBA::ULong>(rids.size()));
00337 CORBA::ULong i = 0;
00338 for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
00339 seq[i++] = *iter;
00340 }
00341 }
00342
00343 GUIDSeq*
00344 DataLink::peer_ids(const RepoId& local_id) const
00345 {
00346 GuardType guard(pub_sub_maps_lock_);
00347
00348 const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
00349
00350 if (iter == assoc_by_local_.end())
00351 return 0;
00352
00353 GUIDSeq_var result = new GUIDSeq;
00354 set_to_seq(iter->second, static_cast<GUIDSeq&>(result));
00355 return result._retn();
00356 }
00357
00358
00359
00360
00361
00362
00363
00364
00365 void
00366 DataLink::release_reservations(RepoId remote_id, RepoId local_id,
00367 DataLinkSetMap& released_locals)
00368 {
00369 DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
00370
00371 if (DCPS_debug_level > 9) {
00372 GuidConverter local(local_id);
00373 GuidConverter remote(remote_id);
00374 ACE_DEBUG((LM_DEBUG,
00375 ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
00376 ACE_TEXT("releasing association local: %C ")
00377 ACE_TEXT("<--> with remote %C.\n"),
00378 OPENDDS_STRING(local).c_str(),
00379 OPENDDS_STRING(remote).c_str()));
00380 }
00381
00382
00383
00384
00385
00386
00387
00388 this->release_reservations_i(remote_id, local_id);
00389
00390 GuardType guard(this->pub_sub_maps_lock_);
00391
00392 ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
00393 if (rls->size() == 1) {
00394 assoc_by_remote_.erase(remote_id);
00395 release_remote_i(remote_id);
00396 } else {
00397 rls->remove(local_id);
00398 }
00399 RepoIdSet& ris = assoc_by_local_[local_id];
00400 if (ris.size() == 1) {
00401 DataLinkSet_rch& links = released_locals[local_id];
00402 if (links.is_nil())
00403 links = new DataLinkSet;
00404 links->insert_link(this);
00405 {
00406 GuardType guard(this->released_assoc_by_local_lock_);
00407 released_assoc_by_local_[local_id].insert(remote_id);
00408 }
00409 assoc_by_local_.erase(local_id);
00410 } else {
00411 ris.erase(remote_id);
00412 }
00413
00414 if (assoc_by_local_.empty()) {
00415 VDBG_LVL((LM_DEBUG,
00416 ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
00417 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
00418 this->impl_->release_datalink(this);
00419 }
00420 }
00421
00422 void
00423 DataLink::schedule_delayed_release()
00424 {
00425 DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
00426
00427 VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
00428
00429
00430
00431
00432 if (!this->send_strategy_.is_nil()) {
00433 this->send_strategy_->clear();
00434 }
00435
00436 ACE_Time_Value future_release_time = ACE_OS::gettimeofday() + this->datalink_release_delay_;
00437 this->schedule_stop(future_release_time);
00438 }
00439
00440 bool
00441 DataLink::cancel_release()
00442 {
00443 DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
00444 if (stopped_) {
00445 if (DCPS_debug_level > 0) {
00446 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
00447 }
00448 return false;
00449 }
00450 if (scheduling_release_) {
00451 if (DCPS_debug_level > 0) {
00452 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
00453 }
00454 this->set_scheduling_release(false);
00455 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00456 TransportReactorTask_rch reactor(this->impl_->reactor_task());
00457 reactor->get_reactor()->notify(this);
00458 }
00459 return true;
00460 }
00461
00462 void
00463 DataLink::stop_i()
00464 {
00465 DBG_ENTRY_LVL("DataLink", "stop_i", 6);
00466 }
00467
00468 ACE_Message_Block*
00469 DataLink::create_control(char submessage_id,
00470 DataSampleHeader& header,
00471 ACE_Message_Block* data)
00472 {
00473 DBG_ENTRY_LVL("DataLink", "create_control", 6);
00474
00475 header.byte_order_ = ACE_CDR_BYTE_ORDER;
00476 header.message_id_ = TRANSPORT_CONTROL;
00477 header.submessage_id_ = submessage_id;
00478 header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
00479
00480 ACE_Message_Block* message;
00481 ACE_NEW_MALLOC_RETURN(message,
00482 static_cast<ACE_Message_Block*>(
00483 this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
00484 ACE_Message_Block(header.max_marshaled_size(),
00485 ACE_Message_Block::MB_DATA,
00486 data,
00487 0,
00488 0,
00489 0,
00490 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00491 ACE_Time_Value::zero,
00492 ACE_Time_Value::max_time,
00493 this->db_allocator_,
00494 this->mb_allocator_),
00495 0);
00496
00497 *message << header;
00498
00499 return message;
00500 }
00501
00502 SendControlStatus
00503 DataLink::send_control(const DataSampleHeader& header, ACE_Message_Block* message)
00504 {
00505 DBG_ENTRY_LVL("DataLink", "send_control", 6);
00506
00507 TransportSendControlElement* const elem =
00508 TransportSendControlElement::alloc(1,
00509 GUID_UNKNOWN, &send_response_listener_,
00510 header, message, send_control_allocator_);
00511 if (!elem) return SEND_CONTROL_ERROR;
00512
00513 send_response_listener_.track_message();
00514
00515 RepoId senderId(header.publication_id_);
00516 send_start();
00517 send(elem);
00518 send_stop(senderId);
00519
00520 return SEND_CONTROL_OK;
00521 }
00522
00523
00524
00525
00526 int
00527 DataLink::data_received(ReceivedDataSample& sample,
00528 const RepoId& readerId )
00529 {
00530 data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
00531 return 0;
00532 }
00533
00534 void
00535 DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
00536 {
00537 data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
00538 }
00539
00540 void
00541 DataLink::data_received_i(ReceivedDataSample& sample,
00542 const RepoId& readerId,
00543 const RepoIdSet& incl_excl,
00544 ReceiveListenerSet::ConstrainReceiveSet constrain)
00545 {
00546 DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
00547
00548 const RepoId& publication_id = sample.header_.publication_id_;
00549
00550
00551
00552
00553 if (DCPS_debug_level > 9) {
00554 const GuidConverter converter(publication_id);
00555 const GuidConverter reader(readerId);
00556 ACE_DEBUG((LM_DEBUG,
00557 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00558 ACE_TEXT("from publication %C received sample: %C to readerId %C (%s).\n"),
00559 OPENDDS_STRING(converter).c_str(),
00560 to_string(sample.header_).c_str(),
00561 OPENDDS_STRING(reader).c_str(),
00562 constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
00563 }
00564
00565 if (Transport_debug_level > 9) {
00566 const GuidConverter converter(publication_id);
00567 ACE_DEBUG((LM_DEBUG,
00568 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00569 ACE_TEXT("from publication %C received sample: %C.\n"),
00570 OPENDDS_STRING(converter).c_str(),
00571 to_string(sample.header_).c_str()));
00572 }
00573
00574 ReceiveListenerSet_rch listener_set;
00575 {
00576 GuardType guard(this->pub_sub_maps_lock_);
00577 AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
00578 if (iter != assoc_by_remote_.end())
00579 listener_set = iter->second;
00580
00581 if (listener_set.is_nil() && this->default_listener_) {
00582 this->default_listener_->data_received(sample);
00583 return;
00584 }
00585 }
00586
00587 if (listener_set.is_nil()) {
00588
00589 if (Transport_debug_level > 4) {
00590 const GuidConverter converter(publication_id);
00591 ACE_DEBUG((LM_DEBUG,
00592 ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
00593 ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
00594 OPENDDS_STRING(converter).c_str()));
00595 }
00596
00597 return;
00598 }
00599
00600 if (readerId != GUID_UNKNOWN) {
00601 listener_set->data_received(sample, readerId);
00602 return;
00603 }
00604
00605 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00606
00607 if (sample.header_.content_filter_
00608 && sample.header_.content_filter_entries_.length()) {
00609 ReceiveListenerSet subset(*listener_set.in());
00610 subset.remove_all(sample.header_.content_filter_entries_);
00611 subset.data_received(sample, incl_excl, constrain);
00612
00613 } else {
00614 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00615
00616 if (DCPS_debug_level > 9) {
00617
00618
00619
00620 OPENDDS_STRING included_ids;
00621 bool first = true;
00622 RepoIdSet::const_iterator iter = incl_excl.begin();
00623 while(iter != incl_excl.end()) {
00624 included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
00625 first = false;
00626 ++iter;
00627 }
00628 ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %s ids:%C\n",
00629 constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
00630 }
00631 listener_set->data_received(sample, incl_excl, constrain);
00632 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00633 }
00634
00635 #endif // OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00636 }
00637
00638
00639 ACE_UINT64
00640 DataLink::get_next_datalink_id()
00641 {
00642 static ACE_UINT64 next_id = 0;
00643 static LockType lock;
00644
00645 ACE_UINT64 id;
00646 {
00647 GuardType guard(lock);
00648 id = next_id++;
00649
00650 if (0 == next_id) {
00651 ACE_ERROR((LM_ERROR,
00652 ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
00653 ACE_TEXT("has rolled over and is reusing ids!\n")));
00654 }
00655 }
00656
00657 return id;
00658 }
00659
00660 void
00661 DataLink::transport_shutdown()
00662 {
00663 DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
00664
00665 if (!this->send_strategy_.is_nil()) {
00666 this->send_strategy_->transport_shutdown();
00667 }
00668
00669
00670 this->set_scheduling_release(false);
00671 this->scheduled_to_stop_at_ = ACE_Time_Value::zero;
00672 ACE_Reactor_Timer_Interface* reactor = this->impl_->timer();
00673 reactor->cancel_timer(this);
00674
00675 this->stop();
00676
00677
00678 this->impl_ = 0;
00679 }
00680
00681 void
00682 DataLink::notify(ConnectionNotice notice)
00683 {
00684 DBG_ENTRY_LVL("DataLink", "notify", 6);
00685
00686 VDBG((LM_DEBUG,
00687 ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
00688 this,
00689 connection_notice_as_str(notice)));
00690
00691 GuardType guard(this->pub_sub_maps_lock_);
00692
00693
00694
00695 for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
00696 itr != send_listeners_.end(); ++itr) {
00697
00698 TransportSendListener* tsl = itr->second;
00699
00700 if (tsl != 0) {
00701 if (Transport_debug_level > 0) {
00702 GuidConverter converter(itr->first);
00703 ACE_DEBUG((LM_DEBUG,
00704 ACE_TEXT("(%P|%t) DataLink::notify: ")
00705 ACE_TEXT("notify pub %C %C.\n"),
00706 OPENDDS_STRING(converter).c_str(),
00707 connection_notice_as_str(notice)));
00708 }
00709 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00710 if (local_it == assoc_by_local_.end()) {
00711 if (Transport_debug_level) {
00712 GuidConverter converter(itr->first);
00713 ACE_DEBUG((LM_DEBUG,
00714 ACE_TEXT("(%P|%t) DataLink::notify: ")
00715 ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
00716 OPENDDS_STRING(converter).c_str(),
00717 connection_notice_as_str(notice)));
00718 }
00719 break;
00720 }
00721 const RepoIdSet& rids = local_it->second;
00722
00723 ReaderIdSeq subids;
00724 set_to_seq(rids, subids);
00725
00726 switch (notice) {
00727 case DISCONNECTED:
00728 tsl->notify_publication_disconnected(subids);
00729 break;
00730
00731 case RECONNECTED:
00732 tsl->notify_publication_reconnected(subids);
00733 break;
00734
00735 case LOST:
00736 tsl->notify_publication_lost(subids);
00737 break;
00738
00739 default:
00740 ACE_ERROR((LM_ERROR,
00741 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00742 ACE_TEXT("unknown notice to TransportSendListener\n")));
00743 break;
00744 }
00745
00746 } else {
00747 if (Transport_debug_level > 0) {
00748 GuidConverter converter(itr->first);
00749 ACE_DEBUG((LM_DEBUG,
00750 ACE_TEXT("(%P|%t) DataLink::notify: ")
00751 ACE_TEXT("not notify pub %C %C \n"),
00752 OPENDDS_STRING(converter).c_str(),
00753 connection_notice_as_str(notice)));
00754 }
00755 }
00756
00757 }
00758
00759
00760
00761 for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
00762 itr != recv_listeners_.end(); ++itr) {
00763
00764 TransportReceiveListener* trl = itr->second;
00765
00766 if (trl != 0) {
00767 if (Transport_debug_level > 0) {
00768 GuidConverter converter(itr->first);
00769 ACE_DEBUG((LM_DEBUG,
00770 ACE_TEXT("(%P|%t) DataLink::notify: ")
00771 ACE_TEXT("notify sub %C %C.\n"),
00772 OPENDDS_STRING(converter).c_str(),
00773 connection_notice_as_str(notice)));
00774 }
00775 AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
00776 if (local_it == assoc_by_local_.end()) {
00777 if (Transport_debug_level) {
00778 GuidConverter converter(itr->first);
00779 ACE_DEBUG((LM_DEBUG,
00780 ACE_TEXT("(%P|%t) DataLink::notify: ")
00781 ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
00782 OPENDDS_STRING(converter).c_str(),
00783 connection_notice_as_str(notice)));
00784 }
00785 break;
00786 }
00787 const RepoIdSet& rids = local_it->second;
00788
00789 WriterIdSeq pubids;
00790 set_to_seq(rids, pubids);
00791
00792 switch (notice) {
00793 case DISCONNECTED:
00794 trl->notify_subscription_disconnected(pubids);
00795 break;
00796
00797 case RECONNECTED:
00798 trl->notify_subscription_reconnected(pubids);
00799 break;
00800
00801 case LOST:
00802 trl->notify_subscription_lost(pubids);
00803 break;
00804
00805 default:
00806 ACE_ERROR((LM_ERROR,
00807 ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
00808 ACE_TEXT("unknown notice to datareader.\n")));
00809 break;
00810 }
00811
00812 } else {
00813 if (Transport_debug_level > 0) {
00814 GuidConverter converter(itr->first);
00815 ACE_DEBUG((LM_DEBUG,
00816 ACE_TEXT("(%P|%t) DataLink::notify: ")
00817 ACE_TEXT("not notify sub %C subscription lost.\n"),
00818 OPENDDS_STRING(converter).c_str()));
00819 }
00820
00821 }
00822 }
00823 }
00824
00825 void DataLink::notify_connection_deleted()
00826 {
00827 GuardType guard(this->released_assoc_by_local_lock_);
00828 for (AssocByLocal::iterator iter = released_assoc_by_local_.begin();
00829 iter != released_assoc_by_local_.end(); ++iter) {
00830 TransportSendListener* const tsl = send_listener_for(iter->first);
00831 if (tsl) {
00832 for (RepoIdSet::iterator ris_it = iter->second.begin();
00833 ris_it != iter->second.end(); ++ris_it) {
00834 tsl->notify_connection_deleted(*ris_it);
00835 }
00836 iter->second.clear();
00837 continue;
00838 }
00839 TransportReceiveListener* const trl = recv_listener_for(iter->first);
00840 if (trl) {
00841 for (RepoIdSet::iterator ris_it = iter->second.begin();
00842 ris_it != iter->second.end(); ++ris_it) {
00843 trl->notify_connection_deleted(*ris_it);
00844 }
00845 iter->second.clear();
00846 }
00847 }
00848 }
00849
00850 void
00851 DataLink::pre_stop_i()
00852 {
00853 if (this->thr_per_con_send_task_ != 0) {
00854 this->thr_per_con_send_task_->close(1);
00855 }
00856 }
00857
00858 bool
00859 DataLink::release_resources()
00860 {
00861 DBG_ENTRY_LVL("DataLink", "release_resources", 6);
00862
00863 this->prepare_release();
00864
00865 return impl_->release_link_resources(this);
00866 }
00867
00868 bool
00869 DataLink::is_target(const RepoId& remote_sub_id)
00870 {
00871 GuardType guard(this->pub_sub_maps_lock_);
00872 return assoc_by_remote_.count(remote_sub_id);
00873 }
00874
00875 GUIDSeq*
00876 DataLink::target_intersection(const RepoId& pub_id, const GUIDSeq& in,
00877 size_t& n_subs)
00878 {
00879 GUIDSeq_var res;
00880 GuardType guard(this->pub_sub_maps_lock_);
00881 AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
00882
00883 if (iter != assoc_by_local_.end()) {
00884 n_subs = iter->second.size();
00885 const CORBA::ULong len = in.length();
00886
00887 for (CORBA::ULong i(0); i < len; ++i) {
00888 if (iter->second.count(in[i])) {
00889 if (res.ptr() == 0) {
00890 res = new GUIDSeq;
00891 }
00892
00893 push_back(res.inout(), in[i]);
00894 }
00895 }
00896 }
00897
00898 return res._retn();
00899 }
00900
00901 void DataLink::prepare_release()
00902 {
00903 GuardType guard(this->pub_sub_maps_lock_);
00904
00905 if (!assoc_releasing_.empty()) {
00906 ACE_ERROR((LM_ERROR,
00907 ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
00908 ACE_TEXT("already prepared for release.\n")));
00909 return;
00910 }
00911
00912 assoc_releasing_ = assoc_by_local_;
00913 }
00914
00915 void DataLink::clear_associations()
00916 {
00917 for (AssocByLocal::iterator iter = assoc_releasing_.begin();
00918 iter != assoc_releasing_.end(); ++iter) {
00919 TransportSendListener* const tsl = send_listener_for(iter->first);
00920 if (tsl) {
00921 ReaderIdSeq sub_ids;
00922 set_to_seq(iter->second, sub_ids);
00923 tsl->remove_associations(sub_ids, false);
00924 continue;
00925 }
00926 TransportReceiveListener* const trl = recv_listener_for(iter->first);
00927 if (trl) {
00928 WriterIdSeq pub_ids;
00929 set_to_seq(iter->second, pub_ids);
00930 trl->remove_associations(pub_ids, false);
00931 }
00932 }
00933 assoc_releasing_.clear();
00934 }
00935
00936 int
00937 DataLink::handle_timeout(const ACE_Time_Value& , const void* )
00938 {
00939 if (this->scheduled_to_stop_at_ != ACE_Time_Value::zero) {
00940 VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
00941 this->impl_->unbind_link(this);
00942
00943 if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
00944 this->stop();
00945 }
00946 }
00947 this->_remove_ref();
00948 return 0;
00949 }
00950
00951 int
00952 DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
00953 {
00954 if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
00955
00956
00957 handle_timeout(ACE_Time_Value::zero, 0);
00958 }
00959
00960 return 0;
00961 }
00962
00963 void
00964 DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
00965 {
00966
00967
00968
00969
00970 int result = 0;
00971
00972
00973 int tos = cp << 2;
00974
00975 const char* which = "IPV4 TOS";
00976 #if defined (ACE_HAS_IPV6)
00977 ACE_INET_Addr local_address;
00978
00979 if (socket.get_local_addr(local_address) == -1) {
00980 return;
00981
00982 } else if (local_address.get_type() == AF_INET6)
00983 #if !defined (IPV6_TCLASS)
00984 {
00985 if (DCPS_debug_level > 0) {
00986 ACE_ERROR((LM_ERROR,
00987 ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
00988 ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
00989 cp));
00990 }
00991
00992 return;
00993 }
00994
00995 #else
00996 {
00997 which = "IPV6 TCLASS";
00998 result = socket.set_option(
00999 IPPROTO_IPV6,
01000 IPV6_TCLASS,
01001 &tos,
01002 sizeof(tos));
01003
01004 } else
01005
01006 #endif
01007 #endif
01008
01009 #ifdef IP_TOS
01010 result = socket.set_option(
01011 IPPROTO_IP,
01012 IP_TOS,
01013 &tos,
01014 sizeof(tos));
01015
01016 if ((result == -1) && (errno != ENOTSUP)
01017 #ifdef WSAEINVAL
01018 && (errno != WSAEINVAL)
01019 #endif
01020 ) {
01021 #endif // IP_TOS
01022 ACE_DEBUG((LM_DEBUG,
01023 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01024 ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
01025 ACE_TEXT("try running as superuser.\n"),
01026 which,
01027 cp));
01028 #ifdef IP_TOS
01029 } else if (DCPS_debug_level > 4) {
01030 ACE_DEBUG((LM_DEBUG,
01031 ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
01032 ACE_TEXT("set %C codepoint to %d.\n"),
01033 which,
01034 cp));
01035 }
01036 #endif
01037 }
01038
01039 #ifndef OPENDDS_SAFETY_PROFILE
01040 std::ostream&
01041 operator<<(std::ostream& str, const DataLink& value)
01042 {
01043 str << " There are " << value.assoc_by_local_.size()
01044 << " local entities currently using this link comprising following associations:"
01045 << std::endl;
01046
01047 for (DataLink::AssocByLocal::const_iterator
01048 localId = value.assoc_by_local_.begin();
01049 localId != value.assoc_by_local_.end();
01050 ++localId) {
01051 for (RepoIdSet::const_iterator
01052 remoteId = localId->second.begin();
01053 remoteId != localId->second.end();
01054 ++remoteId) {
01055 str << GuidConverter(localId->first) << " --> "
01056 << GuidConverter(*remoteId) << " " << std::endl;
01057 }
01058 }
01059 return str;
01060 }
01061 #endif
01062
01063 }
01064 }