LCOV - code coverage report
Current view: top level - DCPS/transport/framework - DataLink.inl (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 96 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 17 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "TransportSendStrategy.h"
       9             : #include "TransportStrategy.h"
      10             : #include "ThreadPerConnectionSendTask.h"
      11             : #include "EntryExit.h"
      12             : #include "dds/DCPS/GuidConverter.h"
      13             : 
      14             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      15             : 
      16             : namespace OpenDDS {
      17             : namespace DCPS {
      18             : 
      19             : ACE_INLINE  // Non-const overload: exposes the transport_priority_ member itself.
      20             : Priority&
      21           0 : DataLink::transport_priority()
      22             : {
      23           0 :   return this->transport_priority_;  // by reference, so the caller can assign through it
      24             : }
      25             : 
      26             : ACE_INLINE  // Const overload: read-only access to the transport priority.
      27             : Priority
      28             : DataLink::transport_priority() const
      29             : {
      30             :   return this->transport_priority_;  // returned by value (a copy)
      31             : }
      32             : 
      33             : 
      34             : ACE_INLINE  // Non-const overload: exposes the is_loopback_ flag itself.
      35             : bool& DataLink::is_loopback()
      36             : {
      37             :   return this->is_loopback_;  // by reference, so the caller can set the flag
      38             : }
      39             : 
      40             : 
      41             : ACE_INLINE  // Const overload: reads the is_loopback_ flag.
      42             : bool  DataLink::is_loopback() const
      43             : {
      44             :   return this->is_loopback_;  // returned by value
      45             : }
      46             : 
      47             : 
      48             : ACE_INLINE  // Non-const overload: exposes the is_active_ flag itself.
      49             : bool& DataLink::is_active()
      50             : {
      51             :   return this->is_active_;  // by reference, so the caller can set the flag
      52             : }
      53             : 
      54             : 
      55             : ACE_INLINE  // Const overload: reads the is_active_ flag.
      56             : bool  DataLink::is_active() const
      57             : {
      58             :   return this->is_active_;  // returned by value
      59             : }
      60             : 
      61             : ACE_INLINE const TimeDuration&  // Read-only access to datalink_release_delay_.
      62             : DataLink::datalink_release_delay() const
      63             : {
      64             :   return this->datalink_release_delay_;  // const reference to the member, no copy made
      65             : }
      66             : 
      67             : ACE_INLINE void  // Starts a send, via thr_per_con_send_task_ when set, else directly.
      68           0 : DataLink::send_start()
      69             : {
      70             :   DBG_ENTRY_LVL("DataLink","send_start",6);
      71             : 
      72           0 :   if (this->thr_per_con_send_task_ != 0) {
      73           0 :     this->thr_per_con_send_task_->add_request(SEND_START);  // NOTE(review): result is not checked here (send() does check it)
      74             : 
      75             :   } else
      76           0 :     this->send_start_i();  // no per-connection task: do the work on the calling thread
      77           0 : }
      78             : 
      79             : ACE_INLINE void
      80           0 : DataLink::send_start_i()
      81             : {
      82             :   DBG_ENTRY_LVL("DataLink","send_start_i",6);
      83             :   // Forward send_start() to the strategy returned by get_send_strategy();
      84             :   // nothing is forwarded when that handle is nil.
      85             : 
      86           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
      87             : 
      88           0 :   if (!strategy.is_nil()) {
      89           0 :     strategy->send_start();
      90             :   }
      91           0 : }
      92             : 
      93             : ACE_INLINE void  // Sends one element: via thr_per_con_send_task_ when set, else through send_i().
      94           0 : DataLink::send(TransportQueueElement* element)
      95             : {
      96             :   DBG_ENTRY_LVL("DataLink","send",6);
      97             : 
      98           0 :   if (element->is_request_ack() && handle_send_request_ack(element)) {
      99           0 :     return;  // handle_send_request_ack() returned true for a request-ack element: stop here
     100             :   }
     101             : 
     102           0 :   element = this->customize_queue_element(element);  // the returned pointer replaces the original one
     103           0 :   if (!element) {
     104           0 :     return;  // customization yielded a null element: nothing left to send
     105             :   }
     106             : 
     107           0 :   if (this->thr_per_con_send_task_ != 0) {
     108           0 :     if (this->thr_per_con_send_task_->add_request(SEND, element) == -1) {
     109           0 :       element->data_dropped(true);  // add_request() returned -1: notify the element via data_dropped(true)
     110             :     }
     111             : 
     112             :   } else {
     113           0 :     this->send_i(element);  // relink argument is omitted here
     114             : 
     115             :   }
     116             : }
     117             : 
     118             : ACE_INLINE void
     119           0 : DataLink::send_i(TransportQueueElement* element, bool relink)
     120             : {
     121             :   DBG_ENTRY_LVL("DataLink","send_i",6);
     122             :   // Forward the element (and the relink flag) to the strategy returned
     123             :   // by get_send_strategy().
     124             : 
     125           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     126             : 
     127           0 :   if (strategy) {
     128           0 :     strategy->send(element, relink);
     129             :   } else {
     130           0 :     element->data_dropped(true);  // no send strategy: notify the element via data_dropped(true)
     131             :   }
     132           0 : }
     133             : 
     134             : ACE_INLINE void  // Stops a send, via thr_per_con_send_task_ when set, else directly.
     135           0 : DataLink::send_stop(GUID_t repoId)
     136             : {
     137             :   DBG_ENTRY_LVL("DataLink","send_stop",6);
     138             : 
     139           0 :   if (this->thr_per_con_send_task_ != 0) {
     140           0 :     this->thr_per_con_send_task_->add_request(SEND_STOP);  // NOTE(review): repoId is not passed on this path and the result is unchecked - confirm intended
     141             : 
     142             :   } else
     143           0 :     this->send_stop_i(repoId);  // no per-connection task: do the work on the calling thread
     144           0 : }
     145             : 
     146             : ACE_INLINE void
     147           0 : DataLink::send_stop_i(GUID_t repoId)
     148             : {
     149             :   DBG_ENTRY_LVL("DataLink","send_stop_i",6);
     150             :   // Forward send_stop(repoId) to the strategy returned by
     151             :   // get_send_strategy(); nothing is forwarded when that handle is nil.
     152             : 
     153           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     154             : 
     155           0 :   if (!strategy.is_nil()) {
     156           0 :     strategy->send_stop(repoId);
     157             :   }
     158           0 : }
     159             : 
     160             : ACE_INLINE  // Setter for the scheduling_release_ flag.
     161           0 : void  DataLink::set_scheduling_release(bool scheduling_release)
     162             : {
     163           0 :   this->scheduling_release_ = scheduling_release;  // plain assignment; no lock is taken in this function
     164           0 : }
     165             : 
     166             : ACE_INLINE RemoveResult  // Tries thr_per_con_send_task_ first, then the send strategy; REMOVE_NOT_FOUND if neither applies.
     167           0 : DataLink::remove_sample(const DataSampleElement* sample)
     168             : {
     169             :   DBG_ENTRY_LVL("DataLink", "remove_sample", 6);
     170             : 
     171           0 :   if (this->thr_per_con_send_task_ != 0) {
     172           0 :     const RemoveResult rr = this->thr_per_con_send_task_->remove_sample(sample);
     173           0 :     if (rr == REMOVE_RELEASED || rr == REMOVE_FOUND) {  // any other result falls through to the strategy below
     174           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     175             :             "Removed sample from ThreadPerConnection queue.\n"));
     176           0 :       return rr;
     177             :     }
     178             :   }
     179             : 
     180           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     181             : 
     182           0 :   if (!strategy.is_nil()) {
     183           0 :     return strategy->remove_sample(sample);  // the strategy's result is returned unchanged
     184             :   }
     185             : 
     186           0 :   return REMOVE_NOT_FOUND;  // no send strategy to ask
     187           0 : }
     188             : 
     189             : ACE_INLINE void
     190           0 : DataLink::remove_all_msgs(const GUID_t& pub_id)
     191             : {
     192             :   DBG_ENTRY_LVL("DataLink","remove_all_msgs",6);
     193             : 
     194             :   // Forward remove_all_msgs(pub_id) to the strategy returned by
     195             :   // get_send_strategy(); nothing is forwarded when that handle is nil.
     196             : 
     197           0 :   TransportSendStrategy_rch strategy = get_send_strategy();
     198             : 
     199           0 :   if (!strategy.is_nil()) {
     200           0 :     strategy->remove_all_msgs(pub_id);
     201             :   }
     202           0 : }
     203             : 
     204             : ACE_INLINE DataLinkIdType  // Returns this link's id_ by value.
     205           0 : DataLink::id() const
     206             : {
     207             :   DBG_ENTRY_LVL("DataLink","id",6);
     208           0 :   return id_;
     209             : }
     210             : 
     211             : ACE_INLINE int  // Starts both strategies and stores them; returns 0 on success, -1 on failure.
     212             : DataLink::start(const TransportSendStrategy_rch& send_strategy,
     213             :                 const TransportStrategy_rch& receive_strategy, bool invoke_all)
     214             : {
     215             :   DBG_ENTRY_LVL("DataLink","start",6);
     216             : 
     217             :   // send_strategy is dereferenced without a nil check, so it must not be
     218             :   // nil; receive_strategy is allowed to be nil.
     219             : 
     220             :   // Attempt to start the strategies, and if there is a start() failure,
     221             :   // make sure to stop() any strategy that was already start()'ed.
     222             :   if (send_strategy->start() != 0) {
     223             :     // Failed to start the TransportSendStrategy.
     224             :     invoke_on_start_callbacks(false);  // failure is reported regardless of invoke_all
     225             :     return -1;
     226             :   }
     227             : 
     228             :   if ((!receive_strategy.is_nil()) && (receive_strategy->start() != 0)) {
     229             :     // Failed to start the TransportReceiveStrategy.
     230             : 
     231             :     // Remember to stop() the TransportSendStrategy since we did start it,
     232             :     // and now need to "undo" that action.
     233             :     send_strategy->stop();
     234             :     invoke_on_start_callbacks(false);  // failure is reported regardless of invoke_all
     235             :     return -1;
     236             :   }
     237             : 
     238             :   // Both strategy objects started.  Save them to data members (under
     239             :   // strategy_lock_) since we now take ownership of them.
     240             :   {
     241             :     GuardType guard(this->strategy_lock_);
     242             : 
     243             :     this->send_strategy_    = send_strategy;
     244             :     this->receive_strategy_ = receive_strategy;
     245             :   }
     246             :   if (invoke_all) {
     247             :     invoke_on_start_callbacks(true);  // first round, run after the guard above has been released
     248             :   }
     249             :   {
     250             :     // Catch any associations added during the initial invoke_on_start_callbacks.
     251             :     // Only after the first use_datalink has resolved does the datalink's state truly
     252             :     // change to started, so pending associations cannot proceed normally before this.
     253             :     GuardType guard(this->strategy_lock_);
     254             :     this->started_ = true;
     255             :   }
     256             :   // The state has now transitioned to started, so no new on_start_callbacks will
     257             :   // be added; resolve any that were added during the transition to started.
     258             :   if (invoke_all) {
     259             :     invoke_on_start_callbacks(true);  // second round, for callbacks added during the transition
     260             :   }
     261             :   return 0;
     262             : }
     263             : 
     264             : 
     265             : ACE_INLINE  // Maps a ConnectionNotice to its name by indexing a fixed three-entry table.
     266             : const char*
     267           0 : DataLink::connection_notice_as_str(ConnectionNotice notice)
     268             : {
     269             :   static const char* NoticeStr[] = { "DISCONNECTED",
     270             :                                      "RECONNECTED",
     271             :                                      "LOST"
     272             :                                    };
     273             : 
     274           0 :   return NoticeStr [notice];  // NOTE(review): no range check; assumes notice is 0..2 in table order - confirm against the enum
     275             : }
     276             : 
     277             : ACE_INLINE  // Calls terminate_send(false) on the current send strategy, if there is one.
     278             : void
     279             : DataLink::terminate_send()
     280             : {
     281             :   TransportSendStrategy_rch strategy = get_send_strategy();
     282             :   if (strategy) {
     283             :     strategy->terminate_send(false);  // meaning of the false argument is defined by the strategy, not visible here
     284             :   }
     285             : }
     286             : 
     287             : ACE_INLINE  // Erases local_id from send_listeners_, or failing that from recv_listeners_.
     288             : void
     289           0 : DataLink::remove_listener(const GUID_t& local_id)
     290             : {
     291           0 :   GuardType guard(pub_sub_maps_lock_);  // function-scope guard: held across both lookups below
     292             :   {
     293           0 :     IdToSendListenerMap::iterator pos = send_listeners_.find(local_id);
     294           0 :     if (pos != send_listeners_.end()) {
     295           0 :       send_listeners_.erase(pos);
     296           0 :       if (Transport_debug_level > 5) {
     297           0 :         LogGuid logger(local_id);
     298           0 :         ACE_DEBUG((LM_DEBUG,
     299             :                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
     300             :                    ACE_TEXT("removed %C from send_listeners\n"),
     301             :                    logger.c_str()));
     302           0 :       }
     303           0 :       return;  // found as a send listener: recv_listeners_ is not searched
     304             :     }
     305             :   }
     306             :   {
     307           0 :     IdToRecvListenerMap::iterator pos = recv_listeners_.find(local_id);
     308           0 :     if (pos != recv_listeners_.end()) {
     309           0 :       recv_listeners_.erase(pos);
     310           0 :       if (Transport_debug_level > 5) {
     311           0 :         LogGuid logger(local_id);
     312           0 :         ACE_DEBUG((LM_DEBUG,
     313             :                    ACE_TEXT("(%P|%t) DataLink::remove_listener: ")
     314             :                    ACE_TEXT("removed %C from recv_listeners\n"),
     315             :                    logger.c_str()));
     316           0 :       }
     317           0 :       return;
     318             :     }
     319             :   }
     320           0 : }  // reaching here means local_id was in neither map; nothing is logged in that case
     321             : 
     322             : ACE_INLINE  // Looks up the send listener registered for pub_id.
     323             : TransportSendListener_rch
     324           0 : DataLink::send_listener_for(const GUID_t& pub_id) const
     325             : {
     326             :   // The caller must already hold the lock protecting pub_map_ and
     327             :   // send_listeners_; this private method takes no lock itself.
     328             :   IdToSendListenerMap::const_iterator found =
     329           0 :     this->send_listeners_.find(pub_id);
     330           0 :   if (found == this->send_listeners_.end()) {
     331           0 :     return TransportSendListener_rch();  // not registered: default-constructed handle
     332             :   }
     333           0 :   return found->second.lock();  // presumably promotes a stored weak handle; result may be nil - confirm
     334             : }
     335             : 
     336             : ACE_INLINE  // Looks up the receive listener registered for sub_id.
     337             : TransportReceiveListener_rch
     338           0 : DataLink::recv_listener_for(const GUID_t& sub_id) const
     339             : {
     340             :   // The caller must already hold the lock protecting sub_map_ and
     341             :   // recv_listeners_; this private method takes no lock itself.
     342             :   IdToRecvListenerMap::const_iterator found =
     343           0 :     this->recv_listeners_.find(sub_id);
     344           0 :   if (found == this->recv_listeners_.end()) {
     345           0 :     return TransportReceiveListener_rch();  // not registered: default-constructed handle
     346             :   }
     347           0 :   return found->second.lock();  // presumably promotes a stored weak handle; result may be nil - confirm
     348             : }
     349             : 
     350             : ACE_INLINE  // Setter: replaces default_listener_ with trl.
     351             : void
     352             : DataLink::default_listener(const TransportReceiveListener_wrch& trl)
     353             : {
     354             :   GuardType guard(this->pub_sub_maps_lock_);  // assignment is done while holding pub_sub_maps_lock_
     355             :   this->default_listener_ = trl;
     356             : }
     357             : 
     358             : ACE_INLINE  // Getter: returns a copy of default_listener_.
     359             : TransportReceiveListener_wrch
     360             : DataLink::default_listener() const
     361             : {
     362             :   GuardType guard(this->pub_sub_maps_lock_);  // the copy is made while holding pub_sub_maps_lock_
     363             :   return this->default_listener_;
     364             : }
     365             : 
     366             : ACE_INLINE  // Empty implementation: the reader id argument is ignored.
     367             : void
     368           0 : DataLink::send_final_acks (const GUID_t& /*readerid*/)
     369             : {
     370           0 : }  // NOTE(review): presumably a hook that derived links override - confirm in DataLink.h
     371             : 
     372             : ACE_INLINE  // Returns a copy of the send_strategy_ handle; it may be nil, and callers in this file check for that.
     373             : TransportSendStrategy_rch
     374           0 : DataLink::get_send_strategy()
     375             : {
     376           0 :   GuardType guard(strategy_lock_);  // the handle is copied while holding strategy_lock_
     377           0 :   return send_strategy_;
     378           0 : }
     379             : 
     380             : }
     381             : }
     382             : 
     383             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16