LCOV - code coverage report
Current view: top level - DCPS - BuiltInTopicUtils.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 138 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 20 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "BuiltInTopicUtils.h"
      11             : 
      12             : #include "BuiltInTopicDataReaderImpls.h"
      13             : #include "BitPubListenerImpl.h"
      14             : #include "Logging.h"
      15             : 
      16             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      17             : 
      18             : namespace OpenDDS {
      19             : namespace DCPS {
      20             : 
      21             : const char* const BUILT_IN_PARTICIPANT_TOPIC = "DCPSParticipant";
      22             : const char* const BUILT_IN_PARTICIPANT_TOPIC_TYPE = "PARTICIPANT_BUILT_IN_TOPIC_TYPE";
      23             : 
      24             : const char* const BUILT_IN_TOPIC_TOPIC = "DCPSTopic";
      25             : const char* const BUILT_IN_TOPIC_TOPIC_TYPE = "TOPIC_BUILT_IN_TOPIC_TYPE";
      26             : 
      27             : const char* const BUILT_IN_SUBSCRIPTION_TOPIC = "DCPSSubscription";
      28             : const char* const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE = "SUBSCRIPTION_BUILT_IN_TOPIC_TYPE";
      29             : 
      30             : const char* const BUILT_IN_PUBLICATION_TOPIC = "DCPSPublication";
      31             : const char* const BUILT_IN_PUBLICATION_TOPIC_TYPE = "PUBLICATION_BUILT_IN_TOPIC_TYPE";
      32             : 
      33             : const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC = "OpenDDSParticipantLocation";
      34             : const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE = "PARTICIPANT_LOCATION_BUILT_IN_TOPIC_TYPE";
      35             : 
      36             : const char* const BUILT_IN_CONNECTION_RECORD_TOPIC = "OpenDDSConnectionRecord";
      37             : const char* const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE = "CONNECTION_RECORD_BUILT_IN_TOPIC_TYPE";
      38             : 
      39             : const char* const BUILT_IN_INTERNAL_THREAD_TOPIC = "OpenDDSInternalThread";
      40             : const char* const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE = "INTERNAL_THREAD_BUILT_IN_TOPIC_TYPE";
      41             : 
      42           0 : DDS::InstanceHandle_t BitSubscriber::add_participant(const DDS::ParticipantBuiltinTopicData& part,
      43             :                                                      DDS::ViewStateKind view_state)
      44             : {
      45           0 :   return add_i<ParticipantBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_TOPIC, part, view_state);
      46             : }
      47             : 
      48           0 : void BitSubscriber::remove_participant(DDS::InstanceHandle_t part_ih,
      49             :                                        DDS::InstanceHandle_t loc_ih)
      50             : {
      51             : #ifndef DDS_HAS_MINIMUM_BIT
      52           0 :   remove_i(BUILT_IN_PARTICIPANT_TOPIC, part_ih);
      53           0 :   remove_i(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc_ih);
      54             : #else
      55             :   ACE_UNUSED_ARG(part_ih);
      56             :   ACE_UNUSED_ARG(loc_ih);
      57             : #endif
      58           0 : }
      59             : 
      60           0 : DDS::ReturnCode_t BitSubscriber::get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
      61             :                                                                  DDS::InstanceHandle_t participant_handle)
      62             : {
      63             : #ifndef DDS_HAS_MINIMUM_BIT
      64           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::RETCODE_NO_DATA);
      65             : 
      66           0 :   if (!bit_subscriber_) {
      67           0 :     return DDS::RETCODE_NO_DATA;
      68             :   }
      69             : 
      70           0 :   DDS::SampleInfoSeq info;
      71           0 :   DDS::ParticipantBuiltinTopicDataSeq data;
      72           0 :   DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
      73             :   DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
      74           0 :     DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
      75             : 
      76           0 :   const DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
      77             :                                                            info,
      78             :                                                            1,
      79             :                                                            participant_handle,
      80             :                                                            DDS::ANY_SAMPLE_STATE,
      81             :                                                            DDS::ANY_VIEW_STATE,
      82             :                                                            DDS::ANY_INSTANCE_STATE);
      83             : 
      84           0 :   if (ret == DDS::RETCODE_OK) {
      85           0 :     if (info[0].valid_data) {
      86           0 :       participant_data = data[0];
      87             :     } else {
      88           0 :       return DDS::RETCODE_NO_DATA;
      89             :     }
      90             :   }
      91             : 
      92           0 :   return ret;
      93             : #else
      94             :   ACE_UNUSED_ARG(participant_data);
      95             :   ACE_UNUSED_ARG(participant_handle);
      96             :   return DDS::RETCODE_NO_DATA;
      97             : #endif
      98           0 : }
      99             : 
     100           0 : DDS::ReturnCode_t BitSubscriber::get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
     101             :                                                            DDS::InstanceHandle_t topic_handle)
     102             : {
     103             : #ifndef DDS_HAS_MINIMUM_BIT
     104           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::RETCODE_NO_DATA);
     105             : 
     106           0 :   if (!bit_subscriber_) {
     107           0 :     return DDS::RETCODE_NO_DATA;
     108             :   }
     109             : 
     110           0 :   DDS::SampleInfoSeq info;
     111           0 :   DDS::TopicBuiltinTopicDataSeq data;
     112           0 :   DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
     113             :   DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
     114           0 :     DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
     115             : 
     116           0 :   const DDS::ReturnCode_t ret = bit_topic_dr->read_instance(data,
     117             :                                                             info,
     118             :                                                             1,
     119             :                                                             topic_handle,
     120             :                                                             DDS::ANY_SAMPLE_STATE,
     121             :                                                             DDS::ANY_VIEW_STATE,
     122             :                                                             DDS::ANY_INSTANCE_STATE);
     123             : 
     124           0 :   if (ret == DDS::RETCODE_OK) {
     125           0 :     if (info[0].valid_data) {
     126           0 :       topic_data = data[0];
     127             :     } else {
     128           0 :       return DDS::RETCODE_NO_DATA;
     129             :     }
     130             :   }
     131             : 
     132           0 :   return ret;
     133             : #else
     134             :   ACE_UNUSED_ARG(topic_data);
     135             :   ACE_UNUSED_ARG(topic_handle);
     136             :   return DDS::RETCODE_NO_DATA;
     137             : #endif
     138           0 : }
     139             : 
     140           0 : DDS::InstanceHandle_t BitSubscriber::add_publication(const DDS::PublicationBuiltinTopicData& pub,
     141             :                                                      DDS::ViewStateKind view_state)
     142             : {
     143           0 :   return add_i<PublicationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PUBLICATION_TOPIC, pub, view_state);
     144             : }
     145             : 
     146           0 : void BitSubscriber::remove_publication(DDS::InstanceHandle_t pub_ih)
     147             : {
     148           0 :   remove_i(BUILT_IN_PUBLICATION_TOPIC, pub_ih);
     149           0 : }
     150             : 
     151           0 : DDS::InstanceHandle_t BitSubscriber::add_subscription(const DDS::SubscriptionBuiltinTopicData& sub,
     152             :                                                       DDS::ViewStateKind view_state)
     153             : {
     154           0 :   return add_i<SubscriptionBuiltinTopicDataDataReaderImpl>(BUILT_IN_SUBSCRIPTION_TOPIC, sub, view_state);
     155             : }
     156             : 
     157           0 : void BitSubscriber::remove_subscription(DDS::InstanceHandle_t sub_ih)
     158             : {
     159           0 :   remove_i(BUILT_IN_SUBSCRIPTION_TOPIC, sub_ih);
     160           0 : }
     161             : 
     162           0 : DDS::InstanceHandle_t BitSubscriber::add_participant_location(const ParticipantLocationBuiltinTopicData& loc,
     163             :                                                               DDS::ViewStateKind view_state)
     164             : {
     165           0 :   return add_i<ParticipantLocationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc, view_state);
     166             : }
     167             : 
     168           0 : DDS::InstanceHandle_t BitSubscriber::add_connection_record(const ConnectionRecord& cr,
     169             :                                                            DDS::ViewStateKind view_state)
     170             : {
     171           0 :   return add_i<ConnectionRecordDataReaderImpl>(BUILT_IN_CONNECTION_RECORD_TOPIC, cr, view_state);
     172             : }
     173             : 
     174           0 : void BitSubscriber::remove_connection_record(const ConnectionRecord& cr)
     175             : {
     176             : #ifndef DDS_HAS_MINIMUM_BIT
     177           0 :   ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     178             : 
     179           0 :   if (!bit_subscriber_) {
     180           0 :     return;
     181             :   }
     182             : 
     183           0 :   DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_CONNECTION_RECORD_TOPIC);
     184             : 
     185           0 :   if (!d) {
     186           0 :     return;
     187             :   }
     188             : 
     189           0 :   ConnectionRecordDataReaderImpl* bit = dynamic_cast<ConnectionRecordDataReaderImpl*>(d.in());
     190           0 :   if (!bit) {
     191           0 :     return;
     192             :   }
     193             : 
     194           0 :   bit->set_instance_state(bit->lookup_instance(cr), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
     195             : #else
     196             :   ACE_UNUSED_ARG(cr);
     197             : #endif
     198           0 : }
     199             : 
     200           0 : DDS::InstanceHandle_t BitSubscriber::add_thread_status(const InternalThreadBuiltinTopicData& ts,
     201             :                                                        DDS::ViewStateKind view_state,
     202             :                                                        const SystemTimePoint& timestamp)
     203             : {
     204             : #ifndef DDS_HAS_MINIMUM_BIT
     205           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::HANDLE_NIL);
     206             : 
     207           0 :   if (!bit_subscriber_) {
     208           0 :     return DDS::HANDLE_NIL;
     209             :   }
     210             : 
     211           0 :   DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
     212           0 :   if (!d) {
     213           0 :     return DDS::HANDLE_NIL;
     214             :   }
     215             : 
     216           0 :   InternalThreadBuiltinTopicDataDataReaderImpl* bit = dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
     217           0 :   if (!bit) {
     218           0 :     return DDS::HANDLE_NIL;
     219             :   }
     220             : 
     221           0 :   return bit->store_synthetic_data(ts, view_state, timestamp);
     222             : #else
     223             :   ACE_UNUSED_ARG(ts);
     224             :   ACE_UNUSED_ARG(view_state);
     225             :   ACE_UNUSED_ARG(timestamp);
     226             :   return DDS::HANDLE_NIL;
     227             : #endif /* DDS_HAS_MINIMUM_BIT */
     228           0 : }
     229             : 
     230           0 : void BitSubscriber::remove_thread_status(const InternalThreadBuiltinTopicData& ts)
     231             : {
     232             : #ifndef DDS_HAS_MINIMUM_BIT
     233           0 :   ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     234             : 
     235           0 :   if (!bit_subscriber_) {
     236           0 :     return;
     237             :   }
     238             : 
     239           0 :   DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
     240             : 
     241           0 :   if (!d) {
     242           0 :     return;
     243             :   }
     244             : 
     245           0 :   InternalThreadBuiltinTopicDataDataReaderImpl* bit = dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
     246           0 :   if (!bit) {
     247           0 :     return;
     248             :   }
     249             : 
     250           0 :   bit->set_instance_state(bit->lookup_instance(ts), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
     251             : #else
     252             :   ACE_UNUSED_ARG(ts);
     253             : #endif
     254           0 : }
     255             : 
     256           0 : void BitSubscriber::bit_pub_listener_hack(DomainParticipantImpl* participant)
     257             : {
     258             : #ifndef DDS_HAS_MINIMUM_BIT
     259           0 :   ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     260             : 
     261           0 :   if (!bit_subscriber_) {
     262           0 :     return;
     263             :   }
     264             : 
     265             :   DDS::DataReader_var dr =
     266           0 :     bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
     267             :   DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
     268           0 :     DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
     269             : 
     270           0 :   if (bit_pub_dr) {
     271           0 :     DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
     272           0 :     if (!listener) {
     273             :       DDS::DataReaderListener_var bit_pub_listener =
     274           0 :         new BitPubListenerImpl(participant);
     275           0 :       bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
     276             :       // Must call on_data_available when attaching a listener late - samples may be waiting
     277           0 :       DataReaderImpl* reader = dynamic_cast<DataReaderImpl*>(bit_pub_dr.in());
     278           0 :       if (!reader) {
     279           0 :         return;
     280             :       }
     281           0 :       TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(bit_pub_listener, rchandle_from(reader), true, false, false));
     282           0 :     }
     283           0 :   }
     284             : #else
     285             :   ACE_UNUSED_ARG(participant);
     286             : #endif
     287           0 : }
     288             : 
     289             : template <typename DataReaderImpl, typename Sample>
     290           0 : DDS::InstanceHandle_t BitSubscriber::add_i(const char* topic_name,
     291             :                                            const Sample& sample,
     292             :                                            DDS::ViewStateKind view_state)
     293             : {
     294             : #ifndef DDS_HAS_MINIMUM_BIT
     295           0 :   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::HANDLE_NIL);
     296             : 
     297           0 :   if (!bit_subscriber_) {
     298           0 :     if (log_bits) {
     299           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ bit_subscriber_ is null for topic %C, returning nil\n", this, topic_name));
     300             :     }
     301           0 :     return DDS::HANDLE_NIL;
     302             :   }
     303             : 
     304           0 :   DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
     305           0 :   if (!d) {
     306           0 :     if (log_bits) {
     307           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DataReader is null for topic %C, returning nil\n", this, topic_name));
     308             :     }
     309           0 :     return DDS::HANDLE_NIL;
     310             :   }
     311             : 
     312           0 :   DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
     313           0 :   if (!bit) {
     314           0 :     if (log_bits) {
     315           0 :       ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ dynamic_cast failed for topic %C, returning nil\n", this, topic_name));
     316             :     }
     317           0 :     return DDS::HANDLE_NIL;
     318             :   }
     319             : 
     320           0 :   const DDS::InstanceHandle_t ih = bit->store_synthetic_data(sample, view_state);
     321           0 :   if (log_bits) {
     322           0 :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ returning instance handle %d for topic %C\n", this, ih, topic_name));
     323             :   }
     324           0 :   return ih;
     325             : #else
     326             :   if (log_bits) {
     327             :     ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DDS_HAS_MINIMUM_BIT is not defined, returning nil\n", this, topic_name));
     328             :   }
     329             :   ACE_UNUSED_ARG(sample);
     330             :   ACE_UNUSED_ARG(view_state);
     331             :   return DDS::HANDLE_NIL;
     332             : #endif /* DDS_HAS_MINIMUM_BIT */
     333           0 : }
     334             : 
     335           0 : void BitSubscriber::remove_i(const char* topic_name,
     336             :                              DDS::InstanceHandle_t ih)
     337             : {
     338             : #ifndef DDS_HAS_MINIMUM_BIT
     339           0 :   if (ih != DDS::HANDLE_NIL) {
     340           0 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     341             : 
     342           0 :     if (!bit_subscriber_) {
     343           0 :       return;
     344             :     }
     345             : 
     346           0 :     DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
     347           0 :     if (!d) {
     348           0 :       return;
     349             :     }
     350             : 
     351           0 :     DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
     352           0 :     if (!bit) {
     353           0 :       return;
     354             :     }
     355           0 :     bit->set_instance_state(ih, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
     356           0 :   }
     357             : #else
     358             :   ACE_UNUSED_ARG(topic_name);
     359             :   ACE_UNUSED_ARG(ih);
     360             : #endif
     361             : }
     362             : 
     363             : } // namespace DCPS
     364             : } // namespace OpenDDS
     365             : 
     366             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16