LCOV - code coverage report
Current view: top level - DCPS/transport/framework - DataLink.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 629 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 50 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : #include "DataLink.h"
      10             : 
      11             : #include "ReceivedDataSample.h"
      12             : 
      13             : #include "TransportImpl.h"
      14             : #include "TransportInst.h"
      15             : #include "TransportClient.h"
      16             : 
      17             : #include "dds/DCPS/DataWriterImpl.h"
      18             : #include "dds/DCPS/DataReaderImpl.h"
      19             : #include "dds/DCPS/Service_Participant.h"
      20             : #include "dds/DCPS/GuidConverter.h"
      21             : #include "dds/DdsDcpsGuidTypeSupportImpl.h"
      22             : #include "dds/DCPS/Util.h"
      23             : #include "dds/DCPS/Definitions.h"
      24             : #include "dds/DCPS/SafetyProfileStreams.h"
      25             : 
      26             : #include "EntryExit.h"
      27             : #include "tao/debug.h"
      28             : #include "ace/Reactor.h"
      29             : #include "ace/SOCK.h"
      30             : 
      31             : 
      32             : #if !defined (__ACE_INLINE__)
      33             : #include "DataLink.inl"
      34             : #endif /* __ACE_INLINE__ */
      35             : 
      36             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      37             : 
      38             : namespace OpenDDS {
      39             : namespace DCPS {
      40             : 
      41             : /// Only called by our TransportImpl object.
      42           0 : DataLink::DataLink(const TransportImpl_rch& impl, Priority priority, bool is_loopback,
      43           0 :                    bool is_active)
      44           0 :   : stopped_(false),
      45           0 :     impl_(impl),
      46           0 :     transport_priority_(priority),
      47           0 :     scheduling_release_(false),
      48           0 :     is_loopback_(is_loopback),
      49           0 :     is_active_(is_active),
      50           0 :     started_(false),
      51           0 :     send_response_listener_("DataLink"),
      52           0 :     interceptor_(impl->reactor(), impl->reactor_owner())
      53             : {
      54             :   DBG_ENTRY_LVL("DataLink", "DataLink", 6);
      55             : 
      56           0 :   id_ = DataLink::get_next_datalink_id();
      57             : 
      58           0 :   long datalink_release_delay = TransportInst::DEFAULT_DATALINK_RELEASE_DELAY;
      59           0 :   size_t control_chunks = TransportInst::DEFAULT_DATALINK_CONTROL_CHUNKS;
      60             : 
      61           0 :   TransportInst_rch cfg = impl->config();
      62           0 :   if (cfg) {
      63           0 :     datalink_release_delay = cfg->datalink_release_delay_;
      64           0 :     if (cfg->thread_per_connection_) {
      65           0 :       thr_per_con_send_task_.reset(new ThreadPerConnectionSendTask(this));
      66             : 
      67           0 :       if (thr_per_con_send_task_->open() == -1) {
      68           0 :         ACE_ERROR((LM_ERROR,
      69             :                    ACE_TEXT("(%P|%t) DataLink::DataLink: ")
      70             :                    ACE_TEXT("failed to open ThreadPerConnectionSendTask\n")));
      71             : 
      72           0 :       } else if (DCPS_debug_level > 4) {
      73           0 :         ACE_DEBUG((LM_DEBUG,
      74             :                    ACE_TEXT("(%P|%t) DataLink::DataLink - ")
      75             :                    ACE_TEXT("started new thread to send data with.\n")));
      76             :       }
      77             :     }
      78           0 :     control_chunks = cfg->datalink_control_chunks_;
      79             :   }
      80             : 
      81             :   // Initialize transport control sample allocators:
      82           0 :   datalink_release_delay_ = TimeDuration::from_msec(datalink_release_delay);
      83             : 
      84           0 :   this->mb_allocator_.reset(new MessageBlockAllocator(control_chunks));
      85           0 :   this->db_allocator_.reset(new DataBlockAllocator(control_chunks));
      86           0 : }
      87             : 
      88           0 : DataLink::~DataLink()
      89             : {
      90             :   DBG_ENTRY_LVL("DataLink", "~DataLink", 6);
      91             : 
      92           0 :   if (!assoc_by_local_.empty()) {
      93           0 :     ACE_DEBUG((LM_WARNING,
      94             :                ACE_TEXT("(%P|%t) WARNING: DataLink[%@]::~DataLink() - ")
      95             :                ACE_TEXT("link still in use by %d entities when deleted!\n"),
      96             :                this, assoc_by_local_.size()));
      97             :   }
      98             : 
      99           0 :   if (this->thr_per_con_send_task_ != 0) {
     100           0 :     this->thr_per_con_send_task_->close(1);
     101             :   }
     102           0 : }
     103             : 
     104             : TransportImpl_rch
     105           0 : DataLink::impl() const
     106             : {
     107           0 :   return impl_.lock();
     108             : }
     109             : 
     110             : bool
     111           0 : DataLink::add_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote)
     112             : {
     113           0 :   const DataLink_rch link(this, inc_count());
     114             : 
     115           0 :   TransportClient_rch client_lock = client.lock();
     116           0 :   const GUID_t client_id = client_lock ? client_lock->get_guid() : GUID_UNKNOWN;
     117             : 
     118           0 :   GuardType guard(strategy_lock_);
     119             : 
     120           0 :   if (client_lock) {
     121           0 :     PendingOnStartsMap::iterator it = pending_on_starts_.find(remote);
     122           0 :     if (it != pending_on_starts_.end()) {
     123           0 :       RepoIdSet::iterator it2 = it->second.find(client_id);
     124           0 :       if (it2 != it->second.end()) {
     125           0 :         it->second.erase(it2);
     126           0 :         if (it->second.empty()) {
     127           0 :           pending_on_starts_.erase(it);
     128             :         }
     129           0 :         guard.release();
     130           0 :         interceptor_.execute_or_enqueue(make_rch<ImmediateStart>(link, client, remote));
     131             :       } else {
     132           0 :         on_start_callbacks_[remote][client_id] = client;
     133             :       }
     134             :     } else {
     135           0 :       on_start_callbacks_[remote][client_id] = client;
     136             :     }
     137             :   }
     138             : 
     139           0 :   if (started_ && !send_strategy_.is_nil()) {
     140           0 :     return false; // link already started
     141             :   }
     142           0 :   return true;
     143           0 : }
     144             : 
     145             : void
     146           0 : DataLink::remove_startup_callbacks(const GUID_t& local, const GUID_t& remote)
     147             : {
     148           0 :   GuardType guard(strategy_lock_);
     149             : 
     150           0 :   OnStartCallbackMap::iterator oit = on_start_callbacks_.find(remote);
     151           0 :   if (oit != on_start_callbacks_.end()) {
     152           0 :     RepoToClientMap::iterator oit2 = oit->second.find(local);
     153           0 :     if (oit2 != oit->second.end()) {
     154           0 :       oit->second.erase(oit2);
     155           0 :       if (oit->second.empty()) {
     156           0 :         on_start_callbacks_.erase(oit);
     157             :       }
     158             :     }
     159             :   }
     160           0 :   PendingOnStartsMap::iterator pit = pending_on_starts_.find(remote);
     161           0 :   if (pit != pending_on_starts_.end()) {
     162           0 :     RepoIdSet::iterator pit2 = pit->second.find(local);
     163           0 :     if (pit2 != pit->second.end()) {
     164           0 :       pit->second.erase(pit2);
     165           0 :       if (pit->second.empty()) {
     166           0 :         pending_on_starts_.erase(pit);
     167             :       }
     168             :     }
     169             :   }
     170           0 : }
     171             : 
     172             : void
     173           0 : DataLink::remove_on_start_callback(const TransportClient_wrch& client, const GUID_t& remote)
     174             : {
     175           0 :   TransportClient_rch client_lock = client.lock();
     176           0 :   if (client_lock) {
     177           0 :     const GUID_t id = client_lock->get_guid();
     178             : 
     179           0 :     GuardType guard(strategy_lock_);
     180           0 :     OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
     181           0 :     if (it != on_start_callbacks_.end()) {
     182           0 :       RepoToClientMap::iterator it2 = it->second.find(id);
     183           0 :       if (it2 != it->second.end()) {
     184           0 :         it->second.erase(it2);
     185           0 :         if (it->second.empty()) {
     186           0 :           on_start_callbacks_.erase(it);
     187             :         }
     188             :       }
     189             :     }
     190           0 :   }
     191           0 : }
     192             : 
     193             : void
     194           0 : DataLink::invoke_on_start_callbacks(bool success)
     195             : {
     196           0 :   const DataLink_rch link(success ? this : 0, inc_count());
     197             : 
     198             :   while (true) {
     199           0 :     GuardType guard(strategy_lock_);
     200             : 
     201           0 :     if (on_start_callbacks_.empty()) {
     202           0 :       break;
     203             :     }
     204             : 
     205           0 :     GUID_t remote = GUID_UNKNOWN;
     206           0 :     TransportClient_wrch client;
     207           0 :     OnStartCallbackMap::iterator it = on_start_callbacks_.begin();
     208           0 :     if (it != on_start_callbacks_.end()) {
     209           0 :       remote = it->first;
     210           0 :       RepoToClientMap::iterator it2 = it->second.begin();
     211           0 :       if (it2 != it->second.end()) {
     212           0 :         client = it2->second;
     213           0 :         it->second.erase(it2);
     214           0 :         if (it->second.empty()) {
     215           0 :           on_start_callbacks_.erase(it);
     216             :         }
     217             :       }
     218             :     }
     219             : 
     220           0 :     guard.release();
     221           0 :     if (success) {
     222           0 :       TransportClient_rch client_lock = client.lock();
     223           0 :       if (client_lock) {
     224           0 :         client_lock->use_datalink(remote, link);
     225             :       }
     226           0 :     }
     227           0 :   }
     228           0 : }
     229             : 
     230           0 : bool DataLink::invoke_on_start_callbacks(const GUID_t& local, const GUID_t& remote, bool success)
     231             : {
     232           0 :   const DataLink_rch link(success ? this : 0, inc_count());
     233             : 
     234           0 :   TransportClient_wrch client;
     235           0 :   bool made_callback = false;
     236             : 
     237             :   {
     238           0 :     GuardType guard(strategy_lock_);
     239             : 
     240           0 :     OnStartCallbackMap::iterator it = on_start_callbacks_.find(remote);
     241           0 :     if (it != on_start_callbacks_.end()) {
     242           0 :       RepoToClientMap::iterator it2 = it->second.find(local);
     243           0 :       if (it2 != it->second.end()) {
     244           0 :         client = it2->second;
     245           0 :         it->second.erase(it2);
     246           0 :         if (it->second.empty()) {
     247           0 :           on_start_callbacks_.erase(it);
     248             :         }
     249             :       } else {
     250           0 :         pending_on_starts_[remote].insert(local);
     251             :       }
     252             :     } else {
     253           0 :       pending_on_starts_[remote].insert(local);
     254             :     }
     255           0 :   }
     256             : 
     257           0 :   if (success) {
     258           0 :     TransportClient_rch client_lock = client.lock();
     259           0 :     if (client_lock) {
     260           0 :       client_lock->use_datalink(remote, link);
     261           0 :       made_callback = true;
     262             :     }
     263           0 :   }
     264             : 
     265           0 :   return made_callback;
     266           0 : }
     267             : 
     268             : //Reactor invokes this after being notified in schedule_stop or cancel_release
     269             : int
     270           0 : DataLink::handle_exception(ACE_HANDLE /* fd */)
     271             : {
     272           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
     273             : 
     274           0 :   const MonotonicTimePoint now = MonotonicTimePoint::now();
     275           0 :   if (scheduled_to_stop_at_.is_zero()) {
     276           0 :     if (DCPS_debug_level > 0) {
     277           0 :       ACE_DEBUG((LM_DEBUG,
     278             :                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - not scheduling or stopping\n")));
     279             :     }
     280           0 :     TransportImpl_rch impl = impl_.lock();
     281           0 :     if (impl) {
     282           0 :       ACE_Reactor_Timer_Interface* reactor = impl->timer();
     283           0 :       if (reactor && reactor->cancel_timer(this) > 0) {
     284           0 :         if (DCPS_debug_level > 0) {
     285           0 :           ACE_DEBUG((LM_DEBUG,
     286             :                      ACE_TEXT("(%P|%t) DataLink::handle_exception() - cancelled future release timer\n")));
     287             :         }
     288             :       }
     289             :     }
     290           0 :     return 0;
     291           0 :   } else if (scheduled_to_stop_at_ <= now) {
     292           0 :     if (this->scheduling_release_) {
     293           0 :       if (DCPS_debug_level > 0) {
     294           0 :         ACE_DEBUG((LM_DEBUG,
     295             :                    ACE_TEXT("(%P|%t) DataLink::handle_exception() - delay already elapsed so handle_timeout now\n")));
     296             :       }
     297           0 :       this->handle_timeout(ACE_Time_Value::zero, 0);
     298           0 :       return 0;
     299             :     }
     300           0 :     if (DCPS_debug_level > 0) {
     301           0 :       ACE_DEBUG((LM_DEBUG,
     302             :                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - stopping now\n")));
     303             :     }
     304           0 :     this->stop();
     305           0 :     return 0;
     306             :   } else /* SCHEDULE TO STOP IN THE FUTURE*/ {
     307           0 :     if (DCPS_debug_level > 0) {
     308           0 :       ACE_DEBUG((LM_DEBUG,
     309             :                  ACE_TEXT("(%P|%t) DataLink::handle_exception() - (delay) scheduling timer for future release\n")));
     310             :     }
     311           0 :     TransportImpl_rch impl = impl_.lock();
     312           0 :     if (impl) {
     313           0 :       ACE_Reactor_Timer_Interface* reactor = impl->timer();
     314           0 :       const TimeDuration future_release_time = scheduled_to_stop_at_ - now;
     315           0 :       reactor->schedule_timer(this, 0, future_release_time.value());
     316           0 :     }
     317           0 :   }
     318           0 :   return 0;
     319           0 : }
     320             : 
     321             : //Allows DataLink::stop to be done on the reactor thread so that
     322             : //this thread avoids possibly deadlocking trying to access reactor
     323             : //to stop strategies or schedule timers
     324             : void
     325           0 : DataLink::schedule_stop(const MonotonicTimePoint& schedule_to_stop_at)
     326             : {
     327           0 :   if (!stopped_ && scheduled_to_stop_at_.is_zero()) {
     328           0 :     this->scheduled_to_stop_at_ = schedule_to_stop_at;
     329           0 :     notify_reactor();
     330             :     // reactor will invoke our DataLink::handle_exception()
     331             :   } else {
     332           0 :     if (DCPS_debug_level > 0) {
     333           0 :       ACE_DEBUG((LM_DEBUG,
     334             :                  ACE_TEXT("(%P|%t) DataLink::schedule_stop() - Already stopped or already scheduled for stop\n")));
     335             :     }
     336             :   }
     337           0 : }
     338             : 
     339             : void
     340           0 : DataLink::notify_reactor()
     341             : {
     342           0 :   TransportImpl_rch impl = impl_.lock();
     343           0 :   if (impl) {
     344           0 :     ReactorTask_rch rt(impl->reactor_task());
     345           0 :     if (rt) {
     346           0 :       ACE_Reactor* reactor = rt->get_reactor();
     347           0 :       if (reactor) {
     348           0 :         reactor->notify(this);
     349             :       }
     350             :     }
     351           0 :   }
     352           0 : }
     353             : 
     354             : void
     355           0 : DataLink::stop()
     356             : {
     357           0 :   pre_stop_i();
     358             : 
     359           0 :   TransportSendStrategy_rch send_strategy;
     360           0 :   TransportStrategy_rch recv_strategy;
     361             : 
     362             :   {
     363           0 :     GuardType guard(strategy_lock_);
     364             : 
     365           0 :     if (stopped_) return;
     366             : 
     367           0 :     send_strategy = send_strategy_;
     368           0 :     send_strategy_.reset();
     369             : 
     370           0 :     recv_strategy = receive_strategy_;
     371           0 :     receive_strategy_.reset();
     372           0 :   }
     373             : 
     374           0 :   if (!send_strategy.is_nil()) {
     375           0 :     send_strategy->stop();
     376             :   }
     377             : 
     378           0 :   if (!recv_strategy.is_nil()) {
     379           0 :     recv_strategy->stop();
     380             :   }
     381             : 
     382           0 :   stop_i();
     383           0 :   stopped_ = true;
     384           0 :   scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
     385           0 : }
     386             : 
     387             : void
     388           0 : DataLink::resume_send()
     389             : {
     390           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     391             : 
     392           0 :   if (strategy && strategy->isDirectMode()) {
     393           0 :     strategy->resume_send();
     394             :   }
     395           0 : }
     396             : 
     397             : int
     398           0 : DataLink::make_reservation(const GUID_t& remote_subscription_id,
     399             :                            const GUID_t& local_publication_id,
     400             :                            const TransportSendListener_wrch& send_listener,
     401             :                            bool reliable)
     402             : {
     403             :   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
     404             : 
     405           0 :   if (DCPS_debug_level > 9) {
     406           0 :     LogGuid local_log(local_publication_id), remote_log(remote_subscription_id);
     407           0 :     ACE_DEBUG((LM_DEBUG,
     408             :         ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
     409             :         ACE_TEXT("creating association local publication %C ")
     410             :         ACE_TEXT("<--> with remote subscription %C.\n"),
     411             :         local_log .c_str(),
     412             :         remote_log.c_str()));
     413           0 :   }
     414             : 
     415           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     416             : 
     417           0 :   if (strategy) {
     418           0 :     strategy->link_released(false);
     419             :   }
     420             : 
     421             :   {
     422           0 :     GuardType guard(pub_sub_maps_lock_);
     423             : 
     424           0 :     LocalAssociationInfo& info = assoc_by_local_[local_publication_id];
     425           0 :     info.reliable_ = reliable;
     426           0 :     info.associated_.insert(remote_subscription_id);
     427           0 :     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_subscription_id];
     428             : 
     429           0 :     if (rls.is_nil())
     430           0 :       rls = make_rch<ReceiveListenerSet>();
     431           0 :     rls->insert(local_publication_id, TransportReceiveListener_rch());
     432             : 
     433           0 :     send_listeners_.insert(std::make_pair(local_publication_id, send_listener));
     434           0 :   }
     435           0 :   return 0;
     436           0 : }
     437             : 
     438             : int
     439           0 : DataLink::make_reservation(const GUID_t& remote_publication_id,
     440             :                            const GUID_t& local_subscription_id,
     441             :                            const TransportReceiveListener_wrch& receive_listener,
     442             :                            bool reliable)
     443             : {
     444             :   DBG_ENTRY_LVL("DataLink", "make_reservation", 6);
     445             : 
     446           0 :   if (DCPS_debug_level > 9) {
     447           0 :     LogGuid local(local_subscription_id), remote(remote_publication_id);
     448           0 :     ACE_DEBUG((LM_DEBUG,
     449             :                ACE_TEXT("(%P|%t) DataLink::make_reservation() - ")
     450             :                ACE_TEXT("creating association local subscription %C ")
     451             :                ACE_TEXT("<--> with remote publication %C.\n"),
     452             :                local.c_str(), remote.c_str()));
     453           0 :   }
     454             : 
     455           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     456             : 
     457           0 :   if (strategy) {
     458           0 :     strategy->link_released(false);
     459             :   }
     460             : 
     461             :   {
     462           0 :     GuardType guard(pub_sub_maps_lock_);
     463             : 
     464           0 :     LocalAssociationInfo& info = assoc_by_local_[local_subscription_id];
     465           0 :     info.reliable_ = reliable;
     466           0 :     info.associated_.insert(remote_publication_id);
     467           0 :     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_publication_id];
     468             : 
     469           0 :     if (rls.is_nil())
     470           0 :       rls = make_rch<ReceiveListenerSet>();
     471           0 :     rls->insert(local_subscription_id, receive_listener);
     472             : 
     473           0 :     recv_listeners_.insert(std::make_pair(local_subscription_id,
     474             :                                           receive_listener));
     475           0 :   }
     476           0 :   return 0;
     477           0 : }
     478             : 
     479             : template <typename Seq>
     480           0 : void set_to_seq(const RepoIdSet& rids, Seq& seq)
     481             : {
     482           0 :   seq.length(static_cast<CORBA::ULong>(rids.size()));
     483           0 :   CORBA::ULong i = 0;
     484           0 :   for (RepoIdSet::const_iterator iter = rids.begin(); iter != rids.end(); ++iter) {
     485           0 :     seq[i++] = *iter;
     486             :   }
     487           0 : }
     488             : 
     489             : GUIDSeq*
     490           0 : DataLink::peer_ids(const GUID_t& local_id) const
     491             : {
     492           0 :   GuardType guard(pub_sub_maps_lock_);
     493             : 
     494           0 :   const AssocByLocal::const_iterator iter = assoc_by_local_.find(local_id);
     495             : 
     496           0 :   if (iter == assoc_by_local_.end())
     497           0 :     return 0;
     498             : 
     499           0 :   GUIDSeq_var result = new GUIDSeq;
     500           0 :   set_to_seq(iter->second.associated_, static_cast<GUIDSeq&>(result));
     501           0 :   return result._retn();
     502           0 : }
     503             : 
     504             : /// This gets invoked when a TransportClient::remove_associations()
     505             : /// call has been made.  Because this DataLink can be shared amongst
     506             : /// different TransportClient objects, and different threads could
     507             : /// be "managing" the different TransportClient objects, we need
     508             : /// to make sure that this release_reservations() works in conjunction
     509             : /// with a simultaneous call (in another thread) to one of this
     510             : /// DataLink's make_reservation() methods.
     511             : void
     512           0 : DataLink::release_reservations(GUID_t remote_id, GUID_t local_id,
     513             :                                DataLinkSetMap& released_locals)
     514             : {
     515             :   DBG_ENTRY_LVL("DataLink", "release_reservations", 6);
     516             : 
     517           0 :   if (DCPS_debug_level > 9) {
     518           0 :     GuidConverter local(local_id);
     519           0 :     GuidConverter remote(remote_id);
     520           0 :     ACE_DEBUG((LM_DEBUG,
     521             :                ACE_TEXT("(%P|%t) DataLink::release_reservations() - ")
     522             :                ACE_TEXT("releasing association local: %C ")
     523             :                ACE_TEXT("<--> with remote %C.\n"),
     524             :                OPENDDS_STRING(local).c_str(),
     525             :                OPENDDS_STRING(remote).c_str()));
     526           0 :   }
     527             : 
     528           0 :   remove_startup_callbacks(local_id, remote_id);
     529             : 
     530             :   //let the specific class release its reservations
     531             :   //done this way to prevent deadlock of holding pub_sub_maps_lock_
     532             :   //then obtaining a specific class lock in release_reservations_i
     533             :   //which reverses lock ordering of the active send logic of needing
     534             :   //the specific class lock before obtaining the over arching DataLink
     535             :   //pub_sub_maps_lock_
     536           0 :   this->release_reservations_i(remote_id, local_id);
     537             : 
     538           0 :   bool release_remote_required = false;
     539             :   {
     540           0 :     GuardType guard(this->pub_sub_maps_lock_);
     541             : 
     542           0 :     if (this->stopped_) return;
     543             : 
     544           0 :     ReceiveListenerSet_rch& rls = assoc_by_remote_[remote_id];
     545           0 :     if (rls->size() == 1) {
     546           0 :       assoc_by_remote_.erase(remote_id);
     547           0 :       release_remote_required = true;
     548             :     } else {
     549           0 :       rls->remove(local_id);
     550             :     }
     551           0 :     RepoIdSet& ris = assoc_by_local_[local_id].associated_;
     552           0 :     if (ris.size() == 1) {
     553           0 :       DataLinkSet_rch& links = released_locals[local_id];
     554           0 :       if (links.is_nil()) {
     555           0 :         links = make_rch<DataLinkSet>();
     556             :       }
     557           0 :       links->insert_link(rchandle_from(this));
     558           0 :       assoc_by_local_.erase(local_id);
     559             :     } else {
     560           0 :       ris.erase(remote_id);
     561             :     }
     562             : 
     563           0 :     if (assoc_by_local_.empty()) {
     564           0 :       VDBG_LVL((LM_DEBUG,
     565             :                 ACE_TEXT("(%P|%t) DataLink::release_reservations: ")
     566             :                 ACE_TEXT("release_datalink due to no remaining pubs or subs.\n")), 5);
     567             : 
     568           0 :       guard.release();
     569           0 :       TransportImpl_rch impl = impl_.lock();
     570           0 :       if (impl) {
     571           0 :         impl->release_datalink(this);
     572             :       }
     573           0 :     }
     574           0 :   }
     575           0 :   if (release_remote_required) {
     576           0 :     release_remote_i(remote_id);
     577             :   }
     578             : }
     579             : 
     580             : void
     581           0 : DataLink::schedule_delayed_release()
     582             : {
     583             :   DBG_ENTRY_LVL("DataLink", "schedule_delayed_release", 6);
     584             : 
     585           0 :   VDBG((LM_DEBUG, "(%P|%t) DataLink[%@]::schedule_delayed_release\n", this));
     586             : 
     587             :   // The samples have to be removed at this point, otherwise the samples
     588             :   // can not be delivered when new association is added and still use
     589             :   // this connection/datalink.
     590           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     591             : 
     592           0 :   if (strategy) {
     593           0 :     strategy->clear(TransportSendStrategy::MODE_DIRECT);
     594             :   }
     595             : 
     596           0 :   const MonotonicTimePoint future_release_time(MonotonicTimePoint::now() + datalink_release_delay_);
     597           0 :   schedule_stop(future_release_time);
     598           0 : }
     599             : 
     600             : bool
     601           0 : DataLink::cancel_release()
     602             : {
     603             :   DBG_ENTRY_LVL("DataLink", "cancel_release", 6);
     604           0 :   if (stopped_) {
     605           0 :     if (DCPS_debug_level > 0) {
     606           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] already stopped_ cannot cancel release\n", this));
     607             :     }
     608           0 :     return false;
     609             :   }
     610           0 :   if (scheduling_release_) {
     611           0 :     if (DCPS_debug_level > 0) {
     612           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::cancel_release - link[%@] currently scheduling release, notify reactor of cancel\n", this));
     613             :     }
     614           0 :     this->set_scheduling_release(false);
     615           0 :     scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
     616           0 :     notify_reactor();
     617             :   }
     618           0 :   return true;
     619             : }
     620             : 
     621             : void
     622           0 : DataLink::stop_i()
     623             : {
     624             :   DBG_ENTRY_LVL("DataLink", "stop_i", 6);
     625           0 : }
     626             : 
     627             : ACE_Message_Block*
     628           0 : DataLink::create_control(char submessage_id,
     629             :                          DataSampleHeader& header,
     630             :                          Message_Block_Ptr data)
     631             : {
     632             :   DBG_ENTRY_LVL("DataLink", "create_control", 6);
     633             : 
     634           0 :   header.byte_order_ = ACE_CDR_BYTE_ORDER;
     635           0 :   header.message_id_ = TRANSPORT_CONTROL;
     636           0 :   header.submessage_id_ = submessage_id;
     637           0 :   header.message_length_ = static_cast<ACE_UINT32>(data->total_length());
     638             : 
     639           0 :   ACE_Message_Block* message = 0;
     640           0 :   ACE_NEW_MALLOC_RETURN(message,
     641             :                         static_cast<ACE_Message_Block*>(
     642             :                           this->mb_allocator_->malloc(sizeof(ACE_Message_Block))),
     643             :                         ACE_Message_Block(header.get_max_serialized_size(),
     644             :                                           ACE_Message_Block::MB_DATA,
     645             :                                           data.release(),
     646             :                                           0,  // data
     647             :                                           0,  // allocator_strategy
     648             :                                           0,  // locking_strategy
     649             :                                           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
     650             :                                           ACE_Time_Value::zero,
     651             :                                           ACE_Time_Value::max_time,
     652             :                                           this->db_allocator_.get(),
     653             :                                           this->mb_allocator_.get()),
     654             :                         0);
     655             : 
     656           0 :   if (!(*message << header)) {
     657           0 :     ACE_ERROR((LM_ERROR,
     658             :                ACE_TEXT("(%P|%t) DataLink::create_control: ")
     659             :                ACE_TEXT("cannot put header in message\n")));
     660           0 :     ACE_DES_FREE(message, this->mb_allocator_->free, ACE_Message_Block);
     661           0 :     message = 0;
     662             :   }
     663             : 
     664           0 :   return message;
     665             : }
     666             : 
     667             : SendControlStatus
     668           0 : DataLink::send_control(const DataSampleHeader& header, Message_Block_Ptr message)
     669             : {
     670             :   DBG_ENTRY_LVL("DataLink", "send_control", 6);
     671             : 
     672             :   TransportSendControlElement* const elem = new TransportSendControlElement(1, // initial_count
     673             :                                        GUID_UNKNOWN, &send_response_listener_,
     674           0 :                                        header, move(message));
     675             : 
     676           0 :   send_response_listener_.track_message();
     677             : 
     678           0 :   GUID_t senderId(header.publication_id_);
     679           0 :   send_start();
     680           0 :   send(elem);
     681           0 :   send_stop(senderId);
     682             : 
     683           0 :   return SEND_CONTROL_OK;
     684             : }
     685             : 
     686             : /// This method will "deliver" the sample to all TransportReceiveListeners
     687             : /// within this DataLink that are interested in the (remote) publisher id
     688             : /// that sent the sample.
     689             : int
     690           0 : DataLink::data_received(ReceivedDataSample& sample,
     691             :                         const GUID_t& readerId /* = GUID_UNKNOWN */)
     692             : {
     693           0 :   data_received_i(sample, readerId, RepoIdSet(), ReceiveListenerSet::SET_EXCLUDED);
     694           0 :   return 0;
     695             : }
     696             : 
     697             : void
     698           0 : DataLink::data_received_include(ReceivedDataSample& sample, const RepoIdSet& incl)
     699             : {
     700           0 :   data_received_i(sample, GUID_UNKNOWN, incl, ReceiveListenerSet::SET_INCLUDED);
     701           0 : }
     702             : 
     703             : void
     704           0 : DataLink::data_received_i(ReceivedDataSample& sample,
     705             :                           const GUID_t& readerId,
     706             :                           const RepoIdSet& incl_excl,
     707             :                           ReceiveListenerSet::ConstrainReceiveSet constrain)
     708             : {
     709             :   DBG_ENTRY_LVL("DataLink", "data_received_i", 6);
     710             :   // Which remote publication sent this message?
     711           0 :   const GUID_t& publication_id = sample.header_.publication_id_;
     712             : 
     713             :   // Locate the set of TransportReceiveListeners associated with this
     714             :   // DataLink that are interested in hearing about any samples received
     715             :   // from the remote publisher_id.
     716           0 :   if (DCPS_debug_level > 9) {
     717           0 :     const GuidConverter converter(publication_id);
     718           0 :     const GuidConverter reader(readerId);
     719           0 :     ACE_DEBUG((LM_DEBUG,
     720             :                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
     721             :                ACE_TEXT("from publication %C received sample: %C to readerId %C (%C).\n"),
     722             :                OPENDDS_STRING(converter).c_str(),
     723             :                to_string(sample.header_).c_str(),
     724             :                OPENDDS_STRING(reader).c_str(),
     725             :                constrain == ReceiveListenerSet::SET_EXCLUDED ? "SET_EXCLUDED" : "SET_INCLUDED"));
     726           0 :   }
     727             : 
     728           0 :   if (Transport_debug_level > 9) {
     729           0 :     const GuidConverter converter(publication_id);
     730           0 :     ACE_DEBUG((LM_DEBUG,
     731             :                ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
     732             :                ACE_TEXT("from publication %C received sample: %C.\n"),
     733             :                OPENDDS_STRING(converter).c_str(),
     734             :                to_string(sample.header_).c_str()));
     735           0 :   }
     736             : 
     737           0 :   ReceiveListenerSet_rch listener_set;
     738           0 :   TransportReceiveListener_rch listener;
     739             :   {
     740           0 :     GuardType guard(this->pub_sub_maps_lock_);
     741           0 :     AssocByRemote::iterator iter = assoc_by_remote_.find(publication_id);
     742           0 :     if (iter != assoc_by_remote_.end()) {
     743           0 :       listener_set = iter->second;
     744             :     } else {
     745           0 :       listener = this->default_listener_.lock();
     746             :     }
     747           0 :   }
     748             : 
     749           0 :   if (listener_set.is_nil()) {
     750           0 :     if (listener) {
     751           0 :       listener->data_received(sample);
     752             :     } else {
     753             :       // Nobody has any interest in this message.  Drop it on the floor.
     754           0 :       if (Transport_debug_level > 4) {
     755           0 :         const GuidConverter converter(publication_id);
     756           0 :         ACE_DEBUG((LM_DEBUG,
     757             :                    ACE_TEXT("(%P|%t) DataLink::data_received_i: ")
     758             :                    ACE_TEXT(" discarding sample from publication %C due to no listeners.\n"),
     759             :                    OPENDDS_STRING(converter).c_str()));
     760           0 :       }
     761             :     }
     762           0 :     return;
     763             :   }
     764             : 
     765           0 :   if (readerId != GUID_UNKNOWN) {
     766           0 :     listener_set->data_received(sample, readerId);
     767           0 :     return;
     768             :   }
     769             : 
     770             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     771             : 
     772           0 :   if (sample.header_.content_filter_
     773           0 :       && sample.header_.content_filter_entries_.length()) {
     774           0 :     ReceiveListenerSet subset(*listener_set.in());
     775           0 :     subset.remove_all(sample.header_.content_filter_entries_);
     776           0 :     subset.data_received(sample, incl_excl, constrain);
     777             : 
     778           0 :   } else {
     779             : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
     780             : 
     781           0 :     if (DCPS_debug_level > 9) {
     782             :       // Just get the set to do our dirty work by having it iterate over its
     783             :       // collection of TransportReceiveListeners, and invoke the data_received()
     784             :       // method on each one.
     785           0 :       OPENDDS_STRING included_ids;
     786           0 :       bool first = true;
     787           0 :       RepoIdSet::const_iterator iter = incl_excl.begin();
     788           0 :       while(iter != incl_excl.end()) {
     789           0 :         included_ids += (first ? "" : "\n") + OPENDDS_STRING(GuidConverter(*iter));
     790           0 :         first = false;
     791           0 :         ++iter;
     792             :       }
     793           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DataLink::data_received_i - normal data received to each subscription in listener_set %C ids:%C\n",
     794             :                  constrain == ReceiveListenerSet::SET_EXCLUDED ? "exclude" : "include", included_ids.c_str()));
     795           0 :     }
     796           0 :     listener_set->data_received(sample, incl_excl, constrain);
     797             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     798             :   }
     799             : 
     800             : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
     801           0 : }
     802             : 
     803             : // static
     804             : ACE_UINT64
     805           0 : DataLink::get_next_datalink_id()
     806             : {
     807             :   static ACE_UINT64 next_id = 0;
     808           0 :   static LockType lock;
     809             : 
     810             :   ACE_UINT64 id;
     811             :   {
     812           0 :     GuardType guard(lock);
     813           0 :     id = next_id++;
     814             : 
     815           0 :     if (0 == next_id) {
     816           0 :       ACE_ERROR((LM_ERROR,
     817             :                  ACE_TEXT("ERROR: DataLink::get_next_datalink_id: ")
     818             :                  ACE_TEXT("has rolled over and is reusing ids!\n")));
     819             :     }
     820           0 :   }
     821             : 
     822           0 :   return id;
     823             : }
     824             : 
     825             : void
     826           0 : DataLink::transport_shutdown()
     827             : {
     828             :   DBG_ENTRY_LVL("DataLink", "transport_shutdown", 6);
     829             : 
     830             :   //this->cancel_release();
     831           0 :   this->set_scheduling_release(false);
     832           0 :   scheduled_to_stop_at_ = MonotonicTimePoint::zero_value;
     833             : 
     834             :   {
     835           0 :     TransportImpl_rch impl = impl_.lock();
     836           0 :     if (impl) {
     837           0 :       ACE_Reactor_Timer_Interface* reactor = impl->timer();
     838           0 :       reactor->cancel_timer(this);
     839             :     }
     840           0 :   }
     841           0 :   this->stop();
     842             :   // this->send_listeners_.clear();
     843             :   // this->recv_listeners_.clear();
     844             :   // Drop our reference to the TransportImpl object
     845           0 : }
     846             : 
     847             : void
     848           0 : DataLink::notify(ConnectionNotice notice)
     849             : {
     850             :   DBG_ENTRY_LVL("DataLink", "notify", 6);
     851             : 
     852           0 :   VDBG((LM_DEBUG,
     853             :         ACE_TEXT("(%P|%t) DataLink::notify: this(%X) notify %C\n"),
     854             :         this,
     855             :         connection_notice_as_str(notice)));
     856             : 
     857           0 :   GuardType guard(this->pub_sub_maps_lock_);
     858             : 
     859             :   // Notify the datawriters
     860             :   // the lost publications due to a connection problem.
     861           0 :   for (IdToSendListenerMap::iterator itr = send_listeners_.begin();
     862           0 :        itr != send_listeners_.end(); ++itr) {
     863             : 
     864           0 :     TransportSendListener_rch tsl = itr->second.lock();
     865             : 
     866           0 :     if (tsl) {
     867           0 :       if (Transport_debug_level > 0) {
     868           0 :         GuidConverter converter(itr->first);
     869           0 :         ACE_DEBUG((LM_DEBUG,
     870             :                    ACE_TEXT("(%P|%t) DataLink::notify: ")
     871             :                    ACE_TEXT("notify pub %C %C.\n"),
     872             :                    OPENDDS_STRING(converter).c_str(),
     873             :                    connection_notice_as_str(notice)));
     874           0 :       }
     875           0 :       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
     876           0 :       if (local_it == assoc_by_local_.end()) {
     877           0 :         if (Transport_debug_level) {
     878           0 :           GuidConverter converter(itr->first);
     879           0 :           ACE_DEBUG((LM_DEBUG,
     880             :                      ACE_TEXT("(%P|%t) DataLink::notify: ")
     881             :                      ACE_TEXT("try to notify pub %C %C - no associations to notify.\n"),
     882             :                      OPENDDS_STRING(converter).c_str(),
     883             :                      connection_notice_as_str(notice)));
     884           0 :         }
     885           0 :         break;
     886             :       }
     887           0 :       const RepoIdSet& rids = local_it->second.associated_;
     888             : 
     889           0 :       ReaderIdSeq subids;
     890           0 :       set_to_seq(rids, subids);
     891             : 
     892           0 :       switch (notice) {
     893           0 :       case DISCONNECTED:
     894           0 :         tsl->notify_publication_disconnected(subids);
     895           0 :         break;
     896             : 
     897           0 :       case RECONNECTED:
     898           0 :         tsl->notify_publication_reconnected(subids);
     899           0 :         break;
     900             : 
     901           0 :       case LOST:
     902           0 :         tsl->notify_publication_lost(subids);
     903           0 :         break;
     904             : 
     905           0 :       default:
     906           0 :         ACE_ERROR((LM_ERROR,
     907             :                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
     908             :                    ACE_TEXT("unknown notice to TransportSendListener\n")));
     909           0 :         break;
     910             :       }
     911             : 
     912           0 :     } else {
     913           0 :       if (Transport_debug_level > 0) {
     914           0 :         GuidConverter converter(itr->first);
     915           0 :         ACE_DEBUG((LM_DEBUG,
     916             :                    ACE_TEXT("(%P|%t) DataLink::notify: ")
     917             :                    ACE_TEXT("not notify pub %C %C\n"),
     918             :                    OPENDDS_STRING(converter).c_str(),
     919             :                    connection_notice_as_str(notice)));
     920           0 :       }
     921             :     }
     922           0 :   }
     923             : 
     924             :   // Notify the datareaders registered with TransportImpl
     925             :   // the lost subscriptions due to a connection problem.
     926           0 :   for (IdToRecvListenerMap::iterator itr = recv_listeners_.begin();
     927           0 :        itr != recv_listeners_.end(); ++itr) {
     928             : 
     929           0 :     TransportReceiveListener_rch trl = itr->second.lock();
     930             : 
     931           0 :     if (trl) {
     932           0 :       if (Transport_debug_level > 0) {
     933           0 :         GuidConverter converter(itr->first);
     934           0 :         ACE_DEBUG((LM_DEBUG,
     935             :                    ACE_TEXT("(%P|%t) DataLink::notify: ")
     936             :                    ACE_TEXT("notify sub %C %C.\n"),
     937             :                    OPENDDS_STRING(converter).c_str(),
     938             :                    connection_notice_as_str(notice)));
     939           0 :       }
     940           0 :       AssocByLocal::iterator local_it = assoc_by_local_.find(itr->first);
     941           0 :       if (local_it == assoc_by_local_.end()) {
     942           0 :         if (Transport_debug_level) {
     943           0 :           GuidConverter converter(itr->first);
     944           0 :           ACE_DEBUG((LM_DEBUG,
     945             :                      ACE_TEXT("(%P|%t) DataLink::notify: ")
     946             :                      ACE_TEXT("try to notify sub %C %C - no associations to notify.\n"),
     947             :                      OPENDDS_STRING(converter).c_str(),
     948             :                      connection_notice_as_str(notice)));
     949           0 :         }
     950           0 :         break;
     951             :       }
     952           0 :       const RepoIdSet& rids = local_it->second.associated_;
     953             : 
     954           0 :       WriterIdSeq pubids;
     955           0 :       set_to_seq(rids, pubids);
     956             : 
     957           0 :       switch (notice) {
     958           0 :       case DISCONNECTED:
     959           0 :         trl->notify_subscription_disconnected(pubids);
     960           0 :         break;
     961             : 
     962           0 :       case RECONNECTED:
     963           0 :         trl->notify_subscription_reconnected(pubids);
     964           0 :         break;
     965             : 
     966           0 :       case LOST:
     967           0 :         trl->notify_subscription_lost(pubids);
     968           0 :         break;
     969             : 
     970           0 :       default:
     971           0 :         ACE_ERROR((LM_ERROR,
     972             :                    ACE_TEXT("(%P|%t) ERROR: DataLink::notify: ")
     973             :                    ACE_TEXT("unknown notice to datareader.\n")));
     974           0 :         break;
     975             :       }
     976             : 
     977           0 :     } else {
     978           0 :       if (Transport_debug_level > 0) {
     979           0 :         GuidConverter converter(itr->first);
     980           0 :         ACE_DEBUG((LM_DEBUG,
     981             :                    ACE_TEXT("(%P|%t) DataLink::notify: ")
     982             :                    ACE_TEXT("not notify sub %C subscription lost.\n"),
     983             :                    OPENDDS_STRING(converter).c_str()));
     984           0 :       }
     985             : 
     986             :     }
     987           0 :   }
     988           0 : }
     989             : 
     990             : 
     991             : 
     992             : void
     993           0 : DataLink::pre_stop_i()
     994             : {
     995           0 :   if (this->thr_per_con_send_task_ != 0) {
     996           0 :     this->thr_per_con_send_task_->close(1);
     997             :   }
     998           0 : }
     999             : 
    1000             : void
    1001           0 : DataLink::release_resources()
    1002             : {
    1003             :   DBG_ENTRY_LVL("DataLink", "release_resources", 6);
    1004             : 
    1005           0 :   this->prepare_release();
    1006           0 :   TransportImpl_rch impl = impl_.lock();
    1007           0 :   if (impl) {
    1008           0 :     impl->release_link_resources(this);
    1009             :   }
    1010           0 : }
    1011             : 
    1012             : bool
    1013           0 : DataLink::is_target(const GUID_t& remote_id)
    1014             : {
    1015           0 :   GuardType guard(this->pub_sub_maps_lock_);
    1016           0 :   return assoc_by_remote_.count(remote_id);
    1017           0 : }
    1018             : 
    1019             : GUIDSeq*
    1020           0 : DataLink::target_intersection(const GUID_t& pub_id, const GUIDSeq& in,
    1021             :                               size_t& n_subs)
    1022             : {
    1023           0 :   GUIDSeq_var res;
    1024           0 :   GuardType guard(this->pub_sub_maps_lock_);
    1025           0 :   AssocByLocal::const_iterator iter = assoc_by_local_.find(pub_id);
    1026             : 
    1027           0 :   if (iter != assoc_by_local_.end()) {
    1028           0 :     n_subs = iter->second.associated_.size();
    1029           0 :     const CORBA::ULong len = in.length();
    1030             : 
    1031           0 :     for (CORBA::ULong i(0); i < len; ++i) {
    1032           0 :       if (iter->second.associated_.count(in[i])) {
    1033           0 :         if (res.ptr() == 0) {
    1034           0 :           res = new GUIDSeq;
    1035             :         }
    1036             : 
    1037           0 :         push_back(res.inout(), in[i]);
    1038             :       }
    1039             :     }
    1040             :   }
    1041             : 
    1042           0 :   return res._retn();
    1043           0 : }
    1044             : 
    1045           0 : void DataLink::prepare_release()
    1046             : {
    1047           0 :   GuardType guard(this->pub_sub_maps_lock_);
    1048             : 
    1049           0 :   if (!assoc_releasing_.empty()) {
    1050           0 :     ACE_ERROR((LM_ERROR,
    1051             :                ACE_TEXT("(%P|%t) DataLink::prepare_release: ")
    1052             :                ACE_TEXT("already prepared for release.\n")));
    1053           0 :     return;
    1054             :   }
    1055             : 
    1056           0 :   assoc_releasing_ = assoc_by_local_;
    1057           0 : }
    1058             : 
    1059           0 : void DataLink::clear_associations()
    1060             : {
    1061           0 :   for (AssocByLocal::iterator iter = assoc_releasing_.begin();
    1062           0 :        iter != assoc_releasing_.end(); ++iter) {
    1063           0 :     TransportSendListener_rch tsl = send_listener_for(iter->first);
    1064           0 :     if (tsl) {
    1065           0 :       ReaderIdSeq sub_ids;
    1066           0 :       set_to_seq(iter->second.associated_, sub_ids);
    1067           0 :       tsl->remove_associations(sub_ids, false);
    1068           0 :       continue;
    1069           0 :     }
    1070           0 :     TransportReceiveListener_rch trl = recv_listener_for(iter->first);
    1071           0 :     if (trl) {
    1072           0 :       WriterIdSeq pub_ids;
    1073           0 :       set_to_seq(iter->second.associated_, pub_ids);
    1074           0 :       trl->remove_associations(pub_ids, false);
    1075           0 :     }
    1076           0 :   }
    1077           0 :   assoc_releasing_.clear();
    1078           0 : }
    1079             : 
    1080             : int
    1081           0 : DataLink::handle_timeout(const ACE_Time_Value& /*tv*/, const void* /*arg*/)
    1082             : {
    1083           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    1084             : 
    1085           0 :   if (!scheduled_to_stop_at_.is_zero()) {
    1086           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DataLink::handle_timeout called\n"), 4);
    1087             :     {
    1088           0 :       TransportImpl_rch impl = impl_.lock();
    1089           0 :       if (impl) {
    1090           0 :         impl->unbind_link(this);
    1091             :       }
    1092           0 :     }
    1093           0 :     if (assoc_by_remote_.empty() && assoc_by_local_.empty()) {
    1094           0 :       this->stop();
    1095             :     }
    1096             :   }
    1097           0 :   return 0;
    1098           0 : }
    1099             : 
    1100             : int
    1101           0 : DataLink::handle_close(ACE_HANDLE h, ACE_Reactor_Mask m)
    1102             : {
    1103           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
    1104             : 
    1105           0 :   if (h == ACE_INVALID_HANDLE && m == TIMER_MASK) {
    1106             :     // Reactor is shutting down with this timer still pending.
    1107             :     // Take the same cleanup actions as if the timeout had expired.
    1108           0 :     handle_timeout(ACE_Time_Value::zero, 0);
    1109             :   }
    1110             : 
    1111           0 :   return 0;
    1112           0 : }
    1113             : 
    1114             : void
    1115           0 : DataLink::set_dscp_codepoint(int cp, ACE_SOCK& socket)
    1116             : {
    1117             :   /**
    1118             :    * The following IPV6 code was lifted in spirit from the RTCORBA
    1119             :    * implementation of setting the DiffServ codepoint.
    1120             :    */
    1121           0 :   int result = 0;
    1122             : 
    1123             :   // Shift the code point up to bits, so that we only use the DS field
    1124           0 :   int tos = cp << 2;
    1125             : 
    1126           0 :   const char* which = "IPV4 TOS";
    1127             : #if defined (ACE_HAS_IPV6)
    1128             :   ACE_INET_Addr local_address;
    1129             : 
    1130             :   if (socket.get_local_addr(local_address) == -1) {
    1131             :     return;
    1132             : 
    1133             :   } else if (local_address.get_type() == AF_INET6)
    1134             : #if !defined (IPV6_TCLASS)
    1135             :   {
    1136             :     if (DCPS_debug_level > 0) {
    1137             :       ACE_ERROR((LM_ERROR,
    1138             :                  ACE_TEXT("(%P|%t) ERROR: DataLink::set_dscp_codepoint() - ")
    1139             :                  ACE_TEXT("IPV6 TCLASS not supported yet, not setting codepoint %d.\n"),
    1140             :                  cp));
    1141             :     }
    1142             : 
    1143             :     return;
    1144             :   }
    1145             : 
    1146             : #else /* IPV6_TCLASS */
    1147             :   {
    1148             :     which = "IPV6 TCLASS";
    1149             :     result = socket.set_option(
    1150             :                IPPROTO_IPV6,
    1151             :                IPV6_TCLASS,
    1152             :                &tos,
    1153             :                sizeof(tos));
    1154             : 
    1155             :   } else // This is a bit tricky and might be hard to follow...
    1156             : 
    1157             : #endif /* IPV6_TCLASS */
    1158             : #endif /* ACE_HAS_IPV6 */
    1159             : 
    1160             : #ifdef IP_TOS
    1161           0 :   result = socket.set_option(
    1162             :              IPPROTO_IP,
    1163             :              IP_TOS,
    1164             :              &tos,
    1165             :              sizeof(tos));
    1166             : 
    1167           0 :   if ((result == -1) && (errno != ENOTSUP)
    1168             : #ifdef WSAEINVAL
    1169             :       && (errno != WSAEINVAL)
    1170             : #endif /* WSAINVAL */
    1171             :      ) {
    1172             : #endif /* IP_TOS */
    1173           0 :     ACE_DEBUG((LM_DEBUG,
    1174             :                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
    1175             :                ACE_TEXT("failed to set the %C codepoint to %d: %m, ")
    1176             :                ACE_TEXT("try running as superuser.\n"),
    1177             :                which,
    1178             :                cp));
    1179             : #ifdef IP_TOS
    1180           0 :   } else if (DCPS_debug_level > 4) {
    1181           0 :     ACE_DEBUG((LM_DEBUG,
    1182             :                ACE_TEXT("(%P|%t) DataLink::set_dscp_codepoint() - ")
    1183             :                ACE_TEXT("set %C codepoint to %d.\n"),
    1184             :                which,
    1185             :                cp));
    1186             :   }
    1187             : #endif /* IP_TOS */
    1188           0 : }
    1189             : 
    1190             : bool
    1191           0 : DataLink::handle_send_request_ack(TransportQueueElement* element)
    1192             : {
    1193           0 :   element->data_delivered();
    1194           0 :   return true;
    1195             : }
    1196             : 
    1197             : bool
    1198           0 : DataLink::Interceptor::reactor_is_shut_down() const {
    1199           0 :   return false;
    1200             : }
    1201             : 
    1202             : void
    1203           0 : DataLink::ImmediateStart::execute() {
    1204           0 :   TransportClient_rch client_lock = client_.lock();
    1205           0 :   if (client_lock) {
    1206           0 :     client_lock->use_datalink(remote_, link_);
    1207             :   }
    1208           0 : }
    1209             : 
    1210             : 
    1211             : void
    1212           0 : DataLink::network_change() const
    1213             : {
    1214           0 :   IdToSendListenerMap send_listeners;
    1215           0 :   IdToRecvListenerMap recv_listeners;
    1216             :   {
    1217           0 :     GuardType guard(pub_sub_maps_lock_);
    1218           0 :     send_listeners = send_listeners_;
    1219           0 :     recv_listeners = recv_listeners_;
    1220           0 :   }
    1221           0 :   for (IdToSendListenerMap::const_iterator itr = send_listeners.begin();
    1222           0 :        itr != send_listeners.end(); ++itr) {
    1223           0 :     TransportSendListener_rch tsl = itr->second.lock();
    1224           0 :     if (tsl) {
    1225           0 :       tsl->transport_discovery_change();
    1226             :     }
    1227           0 :   }
    1228             : 
    1229           0 :   for (IdToRecvListenerMap::const_iterator itr = recv_listeners.begin();
    1230           0 :        itr != recv_listeners.end(); ++itr) {
    1231           0 :     TransportReceiveListener_rch trl = itr->second.lock();
    1232           0 :     if (trl) {
    1233           0 :       trl->transport_discovery_change();
    1234             :     }
    1235           0 :   }
    1236           0 : }
    1237             : 
    1238             : void
    1239           0 : DataLink::replay_durable_data(const GUID_t& local_pub_id, const GUID_t& remote_sub_id) const
    1240             : {
    1241           0 :   GuidConverter local(local_pub_id);
    1242           0 :   GuidConverter remote(remote_sub_id);
    1243           0 :   TransportSendListener_rch send_listener = send_listener_for(local_pub_id);
    1244           0 :   if (send_listener) {
    1245           0 :     send_listener->replay_durable_data_for(remote_sub_id);
    1246             :   }
    1247           0 : }
    1248             : 
    1249             : #ifndef OPENDDS_SAFETY_PROFILE
    1250             : std::ostream&
    1251           0 : operator<<(std::ostream& str, const DataLink& value)
    1252             : {
    1253           0 :   str << "   There are " << value.assoc_by_local_.size()
    1254           0 :       << " local entities currently using this link";
    1255             : 
    1256           0 :   if (!value.assoc_by_local_.empty()) {
    1257           0 :     str << " comprising following associations:";
    1258             :   }
    1259           0 :   str << std::endl;
    1260             : 
    1261             :   typedef DataLink::AssocByLocal::const_iterator assoc_iter_t;
    1262           0 :   const DataLink::AssocByLocal& abl = value.assoc_by_local_;
    1263           0 :   for (assoc_iter_t ait = abl.begin(); ait != abl.end(); ++ait) {
    1264           0 :     const RepoIdSet& set = ait->second.associated_;
    1265           0 :     for (RepoIdSet::const_iterator rit = set.begin(); rit != set.end(); ++rit) {
    1266           0 :       str << GuidConverter(ait->first) << " --> "
    1267           0 :           << GuidConverter(*rit) << "   " << std::endl;
    1268             :     }
    1269             :   }
    1270           0 :   return str;
    1271             : }
    1272             : #endif
    1273             : 
    1274             : void
    1275           0 : DataLink::terminate_send_if_suspended()
    1276             : {
    1277           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
    1278             : 
    1279           0 :   if (strategy) {
    1280           0 :     strategy->terminate_send_if_suspended();
    1281             :   }
    1282           0 : }
    1283             : 
    1284             : }
    1285             : }
    1286             : 
    1287             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16