LCOV - code coverage report
Current view: top level - DCPS/transport/framework - DataLinkSet.inl (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 130 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 10 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "EntryExit.h"
       9             : #include "DataLink.h"
      10             : #include "TransportSendElement.h"
      11             : #include "dds/DCPS/DataSampleHeader.h"
      12             : #include "dds/DCPS/Util.h"
      13             : #include "dds/DCPS/Definitions.h"
      14             : #include "dds/DCPS/GuidConverter.h"
      15             : 
      16             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
      17             : #include "dds/DdsDcpsGuidTypeSupportImpl.h"
      18             : #include "TransportCustomizedElement.h"
      19             : #endif
      20             : /**
                     :  * Send a data sample to every DataLink currently in this set.
                     :  *
                     :  * map_ is copied while lock_ is held; the links are then used from the
                     :  * copy after the guard has been released.  One TransportSendElement,
                     :  * constructed with the number of links in the copy, is passed to each
                     :  * link.  When OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE is not defined and
                     :  * the sample has CONTENT_FILTER_FLAG set, each link instead gets its own
                     :  * TransportCustomizedElement wrapping that send element, carrying a
                     :  * duplicate of the message to which the link's entry from
                     :  * sample->get_filter_per_link() (or null if there is none) was applied
                     :  * via DataSampleHeader::add_cfentries().
                     :  *
                     :  * If the set is empty and the sample has a send listener, the listener's
                     :  * data_dropped(sample, true) is called instead.
                     :  *
                     :  * @param sample  Element to send; it is dereferenced without a null check.
                     :  */
      21             : ACE_INLINE void
      22           0 : OpenDDS::DCPS::DataLinkSet::send(DataSampleElement* sample)
      23             : {
      24             :   DBG_ENTRY_LVL("DataLinkSet", "send", 6);
      25           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send element %@.\n",
      26             :             sample), 5);
      27             : 
      28             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
      29             :   const bool customHeader =
      30           0 :     DataSampleHeader::test_flag(CONTENT_FILTER_FLAG, sample->get_sample());
      31             : #endif
      32             : 
      33           0 :   MapType map_copy;
      34             :   {
      35           0 :     GuardType guard(lock_);
      36           0 :     map_copy = map_;
      37           0 :   } // guard released; the links below are called without lock_ held
      38             : 
      39           0 :   if (map_copy.size()) {
      40           0 :     TransportSendElement* send_element = new TransportSendElement(static_cast<int>(map_copy.size()), sample); // one element shared by all links
      41           0 :     for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
      42             : 
      43             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
      44           0 :       if (customHeader) {
      45             :         typedef std::map<DataLinkIdType, GUIDSeq_var>::iterator FilterIter;
      46           0 :         FilterIter fi = sample->get_filter_per_link().find(itr->first);
      47           0 :         GUIDSeq* guids = 0; // stays null when this link has no filter entry
      48           0 :         if (fi != sample->get_filter_per_link().end()) {
      49           0 :           guids = fi->second.ptr();
      50             :         }
      51             : 
      52           0 :         VDBG_LVL((LM_DEBUG,
      53             :                   "(%P|%t) DBG: DataLink %@ filtering %d subscribers.\n",
      54             :                   itr->second.in(), guids ? guids->length() : 0), 5);
      55             : 
      56           0 :         Message_Block_Ptr mb (send_element->msg()->duplicate()); // per-link duplicate of the shared element's message
      57             : 
      58           0 :         DataSampleHeader::add_cfentries(guids, mb.get());
      59             : 
      60           0 :         TransportCustomizedElement* tce = new TransportCustomizedElement(send_element);
      61           0 :         tce->set_msg(move(mb)); // tce now owns ACE_Message_Block chain
      62             : 
      63           0 :         itr->second->send(tce);
      64             : 
      65           0 :       } else {
      66             : #endif /* OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE */
      67             : 
      68             :         // No content filtering: hand the shared element directly to the DataLink.
      69           0 :         itr->second->send(send_element);
      70             : 
      71             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
      72             :       }
      73             : #endif
      74             :     }
      75           0 :   } else if (sample->get_send_listener()) { // no links at all: report the sample as dropped
      76           0 :     sample->get_send_listener()->data_dropped(sample, true);
      77             :   }
      78           0 : }
      79             : /**
                     :  * Send a control sample element to every DataLink in a snapshot of
                     :  * this set.
                     :  *
                     :  * map_ is copied while lock_ is held; a single
                     :  * TransportSendControlElement, constructed with the number of links in
                     :  * the copy, is then passed to each link's send() after the guard has
                     :  * been released.
                     :  *
                     :  * NOTE(review): the element is created even when the snapshot is empty,
                     :  * and in that case nothing in this function sends or deletes it --
                     :  * confirm whether that is a leak or is handled by the element itself.
                     :  *
                     :  * @param sample  Control sample passed to the element's constructor.
                     :  */
      80             : ACE_INLINE void
      81           0 : OpenDDS::DCPS::DataLinkSet::send_control(DataSampleElement* sample)
      82             : {
      83             :   DBG_ENTRY_LVL("DataLinkSet", "send_control", 6);
      84           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG: DataLinkSet::send_control %@.\n", sample));
      85           0 :   MapType map_copy;
      86             :   {
      87           0 :     GuardType guard(lock_);
      88           0 :     map_copy = map_;
      89           0 :   } // guard released; the links below are called without lock_ held
      90             : 
      91             :   TransportSendControlElement* send_element =
      92           0 :     new TransportSendControlElement(static_cast<int>(map_copy.size()), sample);
      93             : 
      94           0 :   for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
      95           0 :     itr->second->send(send_element); // same element instance for every link
      96             :   }
      97           0 : }
      98             : /**
                     :  * Send a control message for publication pub_id to every DataLink in a
                     :  * snapshot of this set (taken with copy_map_to()).
                     :  *
                     :  * If the snapshot is empty, a debug message is logged when
                     :  * DCPS_debug_level > 4, listener->control_delivered(msg) is called, and
                     :  * the function returns without creating a send element.  Otherwise one
                     :  * TransportSendControlElement is created (msg is moved into it) and, for
                     :  * each link in turn, send_start(), send(element) and send_stop(pub_id)
                     :  * are called in that order.
                     :  *
                     :  * @param pub_id    Publication id given to the element and to send_stop().
                     :  * @param listener  Send listener; dereferenced without a null check.
                     :  * @param header    Header passed to the element's constructor.
                     :  * @param msg       Control message; handed to the listener (no links) or
                     :  *                  moved into the element (one or more links).
                     :  * @return SEND_CONTROL_OK on both return paths visible here.
                     :  */
      99             : ACE_INLINE OpenDDS::DCPS::SendControlStatus
     100           0 : OpenDDS::DCPS::DataLinkSet::send_control(GUID_t                           pub_id,
     101             :                                          const TransportSendListener_rch& listener,
     102             :                                          const DataSampleHeader&          header,
     103             :                                          Message_Block_Ptr                msg)
     104             : {
     105             :   DBG_ENTRY_LVL("DataLinkSet","send_control",6);
     106             :   // Original note: "Optimized - use cached allocator."  NOTE(review): only a plain new expression is visible below; confirm the allocator claim.
     107             : 
     108           0 :   MapType dup_map;
     109           0 :   copy_map_to(dup_map);
     110             : 
     111           0 :   if (dup_map.empty()) {
     112             :     // similar to the "no links" case in TransportClient::send()
     113           0 :     if (DCPS_debug_level > 4) {
     114           0 :       const LogGuid logger(pub_id);
     115           0 :       ACE_DEBUG((LM_DEBUG,
     116             :                  ACE_TEXT("(%P|%t) DataLinkSet::send_control: ")
     117             :                  ACE_TEXT("no links for publication %C, ")
     118             :                  ACE_TEXT("not sending control message.\n"),
     119             :                  logger.c_str()));
     120           0 :     }
     121           0 :     listener->control_delivered(msg); // nothing was sent, but the listener is still told "delivered"
     122           0 :     return SEND_CONTROL_OK;
     123             :   }
     124             : 
     125             :   TransportSendControlElement* const send_element =
     126           0 :     new TransportSendControlElement(static_cast<int>(dup_map.size()), pub_id,
     127           0 :                                        listener.in(), header, move(msg));
     128             : 
     129           0 :   for (MapType::iterator itr = dup_map.begin();
     130           0 :        itr != dup_map.end();
     131           0 :        ++itr) {
     132           0 :     itr->second->send_start();
     133           0 :     itr->second->send(send_element);
     134           0 :     itr->second->send_stop(pub_id);
     135             :   }
     136             : 
     137           0 :   return SEND_CONTROL_OK;
     138           0 : }
     139             : /**
                     :  * Send a response message to every DataLink in this set.
                     :  *
                     :  * Unlike the other send methods in this file, no snapshot is taken:
                     :  * lock_ is acquired at the top and map_ is iterated directly, so the
                     :  * lock is held while each link's send_start(), send(element) and
                     :  * send_stop(pub_id) are called.  The element is created with
                     :  * send_response_listener_ as its listener, and
                     :  * send_response_listener_.track_message() is called once before the
                     :  * links are visited.
                     :  *
                     :  * NOTE(review): when map_ is empty the element is still created and
                     :  * track_message() is still called, but nothing is sent -- confirm that
                     :  * the listener and the element are cleaned up in that case.
                     :  *
                     :  * @param pub_id    Publication id given to the element and to send_stop().
                     :  * @param header    Header passed to the element's constructor.
                     :  * @param response  Response message; moved into the element.
                     :  */
     140             : ACE_INLINE void
     141           0 : OpenDDS::DCPS::DataLinkSet::send_response(
     142             :   GUID_t pub_id,
     143             :   const DataSampleHeader& header,
     144             :   Message_Block_Ptr response)
     145             : {
     146             :   DBG_ENTRY_LVL("DataLinkSet","send_response",6);
     147           0 :   GuardType guard(this->lock_); // held for the whole call, including the link sends below
     148             : 
     149             :   TransportSendControlElement* const send_element =
     150           0 :     new TransportSendControlElement(static_cast<int>(map_.size()), pub_id,
     151             :                                        &send_response_listener_, header,
     152           0 :                                        move(response));
     153           0 :   if (!send_element) return; // NOTE(review): new normally throws instead of returning null; looks unreachable unless operator new is customized - confirm
     154           0 :   send_response_listener_.track_message();
     155             : 
     156           0 :   for (MapType::iterator itr = map_.begin();
     157           0 :        itr != map_.end();
     158           0 :        ++itr) {
     159           0 :     itr->second->send_start();
     160           0 :     itr->second->send(send_element);
     161           0 :     itr->second->send_stop(pub_id);
     162             :   }
     163           0 : }
     164             : /**
                     :  * Ask the DataLinks in a snapshot of this set to remove a sample.
                     :  *
                     :  * map_ is copied while lock_ is held and the copy is walked after the
                     :  * guard has been released.  The walk stops at the first link whose
                     :  * remove_sample() returns REMOVE_RELEASED; later links are not asked.
                     :  *
                     :  * @param sample  Sample to remove; passed through to each link.
                     :  * @return true if some link returned REMOVE_RELEASED, false otherwise
                     :  *         (including when the set is empty).
                     :  */
     165             : ACE_INLINE bool
     166           0 : OpenDDS::DCPS::DataLinkSet::remove_sample(const DataSampleElement* sample)
     167             : {
     168             :   DBG_ENTRY_LVL("DataLinkSet", "remove_sample", 6);
     169           0 :   MapType map_copy;
     170             :   {
     171           0 :     GuardType guard(lock_);
     172           0 :     map_copy = map_;
     173           0 :   } // guard released; the links below are called without lock_ held
     174           0 :   const MapType::iterator end = map_copy.end();
     175           0 :   for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
     176           0 :     if (itr->second->remove_sample(sample) == REMOVE_RELEASED) {
     177           0 :       return true; // stop at the first link that released the sample
     178             :     }
     179             :   }
     180             : 
     181           0 :   return false;
     182           0 : }
     183             : /**
                     :  * Call remove_all_msgs(pub_id) on every DataLink in a snapshot of this
                     :  * set.
                     :  *
                     :  * map_ is copied while lock_ is held and the copy is walked after the
                     :  * guard has been released.  Whatever the individual links return is
                     :  * ignored.
                     :  *
                     :  * @param pub_id  Publication id passed through to each link.
                     :  * @return Always true; this is the only return statement in the body.
                     :  */
     184             : ACE_INLINE bool
     185           0 : OpenDDS::DCPS::DataLinkSet::remove_all_msgs(const GUID_t& pub_id)
     186             : {
     187             :   DBG_ENTRY_LVL("DataLinkSet", "remove_all_msgs", 6);
     188           0 :   MapType map_copy;
     189             :   {
     190           0 :     GuardType guard(lock_);
     191           0 :     map_copy = map_;
     192           0 :   } // guard released; the links below are called without lock_ held
     193           0 :   const MapType::iterator end = map_copy.end();
     194           0 :   for (MapType::iterator itr = map_copy.begin(); itr != end; ++itr) {
     195           0 :     itr->second->remove_all_msgs(pub_id); // per-link result is discarded
     196             :   }
     197             : 
     198           0 :   return true;
     199           0 : }
     200             : /**
                     :  * Merge the DataLinks of another set into this one and call
                     :  * send_start() on each link that was newly added.
                     :  *
                     :  * Both this set's lock_ and in->lock_ are held (acquired in that order)
                     :  * while in->map_ is walked and each entry is bound into map_.  Links
                     :  * that were added are collected in a local vector and their
                     :  * send_start() is called only after both guards have been released.
                     :  * A bind result of -1 is logged as an error; a result of 1 (already
                     :  * present) is skipped silently.
                     :  *
                     :  * NOTE(review): if in == this the second guard acquires the same lock
                     :  * again -- confirm the lock type tolerates that or that callers never
                     :  * pass this set to itself.
                     :  *
                     :  * @param in  Set whose links are merged in; dereferenced without a null
                     :  *            check.
                     :  */
     201             : ACE_INLINE void
     202           0 : OpenDDS::DCPS::DataLinkSet::send_start(DataLinkSet* in)
     203             : {
     204             :   DBG_ENTRY_LVL("DataLinkSet","send_start",6);
     205             : 
     206             :   typedef OPENDDS_VECTOR(DataLink_rch) SendStartVec;
     207           0 :   SendStartVec send_start_vec;
     208             : 
     209             :   {
     210           0 :     GuardType guard1(lock_);
     211           0 :     GuardType guard2(in->lock_);
     212           0 :     for (MapType::iterator itr = in->map_.begin(); itr != in->map_.end(); ++itr) {
     213             :       // Attempt to add the current DataLink to this set.
     214           0 :       int result = OpenDDS::DCPS::bind(map_, itr->first, itr->second);
     215             : 
     216           0 :       if (result == 0) {
     217             :         // We successfully added the current DataLink to this set,
     218             :         // meaning that it wasn't already a member.  We should tell
     219             :         // the DataLink about the send_start() event.
     220           0 :         send_start_vec.push_back(itr->second);
     221             : 
     222           0 :       } else if (result == -1) {
     223           0 :         ACE_ERROR((LM_ERROR,
     224             :                    "(%P|%t) ERROR: Failed to bind data link into set.\n"));
     225             :       }
     226             : 
     227             :       // Note that there is a possibility that the result == 1, which
     228             :       // means that the DataLink already exists in our map_.  We skip
     229             :       // all of these cases.
     230             :     }
     231           0 :   } // both guards released before any link is notified
     232             : 
     233           0 :   for (SendStartVec::iterator it = send_start_vec.begin(); it != send_start_vec.end(); ++it) {
     234           0 :     (*it)->send_start();
     235             :   }
     236           0 : }
     237             : /**
                     :  * Empty this set and call send_stop(repoId) on every DataLink it held.
                     :  *
                     :  * While lock_ is held, map_ is copied and then cleared.  The links in
                     :  * the copy are notified after the guard has been released.
                     :  *
                     :  * @param repoId  Id passed through to each link's send_stop().
                     :  */
     238             : ACE_INLINE void
     239           0 : OpenDDS::DCPS::DataLinkSet::send_stop(GUID_t repoId)
     240             : {
     241             :   DBG_ENTRY_LVL("DataLinkSet","send_stop",6);
     242             :   // Snapshot and empty map_ under the lock, then tell each DataLink about the send_stop() event.
     243           0 :   MapType map_copy;
     244             :   {
     245           0 :     GuardType guard(lock_);
     246           0 :     map_copy = map_;
     247           0 :     map_.clear(); // the set is left empty after this call
     248           0 :   }
     249             : 
     250           0 :   for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
     251           0 :     itr->second->send_stop(repoId);
     252             :   }
     253           0 : }
     254             : /**
                     :  * Replace the contents of target with a copy of this set's map_.
                     :  *
                     :  * target is cleared first, before lock_ is acquired; lock_ is then
                     :  * held for the rest of the function while every entry of map_ is
                     :  * inserted into target.
                     :  *
                     :  * @param target  Map to overwrite; its previous contents are discarded.
                     :  */
     255             : ACE_INLINE void
     256           0 : OpenDDS::DCPS::DataLinkSet::copy_map_to(MapType& target)
     257             : {
     258           0 :   target.clear();
     259             : 
     260             :   // Lock the existing map until the function returns
     261           0 :   GuardType guard(this->lock_);
     262             : 
     263             : 
     264             :   // Copy every entry to target
     265           0 :   for (MapType::iterator itr = map_.begin();
     266           0 :        itr != map_.end();
     267           0 :        ++itr) {
     268           0 :     target.insert(*itr);
     269             :   }
     270           0 : }
     271             : /**
                     :  * Empty this set and call send_final_acks(readerid) on every DataLink
                     :  * it held.
                     :  *
                     :  * While lock_ is held, map_ is copied and then cleared.  The links in
                     :  * the copy are called after the guard has been released.
                     :  *
                     :  * @param readerid  Id passed through to each link's send_final_acks().
                     :  */
     272             : ACE_INLINE void
     273           0 : OpenDDS::DCPS::DataLinkSet::send_final_acks(const GUID_t& readerid)
     274             : {
     275           0 :   MapType map_copy;
     276             :   {
     277           0 :     GuardType guard(lock_);
     278           0 :     map_copy = map_;
     279           0 :     map_.clear(); // the set is left empty after this call
     280           0 :   }
     281             : 
     282           0 :   for (MapType::iterator itr = map_copy.begin(); itr != map_copy.end(); ++itr) {
     283           0 :     itr->second->send_final_acks(readerid);
     284             :   }
     285           0 : }

Generated by: LCOV version 1.16