LCOV - code coverage report
Current view: top level - DCPS - InstanceState.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 218 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 27 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : #include "ace/Event_Handler.h"
      10             : #include "ace/Reactor.h"
      11             : #include "InstanceState.h"
      12             : #include "DataReaderImpl.h"
      13             : #include "SubscriptionInstance.h"
      14             : #include "ReceivedDataElementList.h"
      15             : #include "Time_Helper.h"
      16             : #include "DomainParticipantImpl.h"
      17             : #include "GuidConverter.h"
      18             : 
      19             : #if !defined (__ACE_INLINE__)
      20             : # include "InstanceState.inl"
      21             : #endif /* !__ACE_INLINE__ */
      22             : 
      23             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      24             : 
      25             : namespace OpenDDS {
      26             : namespace DCPS {
      27             : 
      28           0 : InstanceState::InstanceState(const DataReaderImpl_rch& reader,
      29             :                              ACE_Recursive_Thread_Mutex& lock,
      30           0 :                              DDS::InstanceHandle_t handle)
      31             :   : ReactorInterceptor(TheServiceParticipant->reactor(),
      32             :                        TheServiceParticipant->reactor_owner()),
      33           0 :     lock_(lock),
      34           0 :     instance_state_(0),
      35           0 :     view_state_(0),
      36           0 :     disposed_generation_count_(0),
      37           0 :     no_writers_generation_count_(0),
      38           0 :     empty_(true),
      39           0 :     release_pending_(false),
      40           0 :     release_timer_id_(-1),
      41           0 :     reader_(reader),
      42           0 :     handle_(handle),
      43           0 :     owner_(GUID_UNKNOWN),
      44             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
      45           0 :     exclusive_(reader->qos_.ownership.kind == DDS::EXCLUSIVE_OWNERSHIP_QOS),
      46             : #endif
      47           0 :     registered_(false)
      48           0 : {}
      49             : 
      50           0 : InstanceState::~InstanceState()
      51             : {
      52             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
      53           0 :   if (registered_) {
      54           0 :     RcHandle<DataReaderImpl> reader = reader_.lock();
      55           0 :     if (reader) {
      56           0 :       DataReaderImpl::OwnershipManagerPtr om = reader->ownership_manager();
      57           0 :       if (om) om->remove_instance(this);
      58           0 :     }
      59           0 :   }
      60             : #endif
      61           0 : }
      62             : 
      63           0 : void InstanceState::sample_info(DDS::SampleInfo& si, const ReceivedDataElement* de)
      64             : {
      65           0 :   si.sample_state = de->sample_state_;
      66           0 :   si.view_state = view_state_;
      67           0 :   si.instance_state = instance_state_;
      68           0 :   si.disposed_generation_count =
      69           0 :     static_cast<CORBA::Long>(disposed_generation_count_);
      70           0 :   si.no_writers_generation_count =
      71           0 :     static_cast<CORBA::Long>(no_writers_generation_count_);
      72           0 :   si.source_timestamp = de->source_timestamp_;
      73           0 :   si.instance_handle = handle_;
      74           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
      75           0 :   if (reader) {
      76           0 :     RcHandle<DomainParticipantImpl> participant = reader->participant_servant_.lock();
      77           0 :     si.publication_handle = participant ? participant->lookup_handle(de->pub_) : DDS::HANDLE_NIL;
      78           0 :   } else {
      79           0 :     si.publication_handle = DDS::HANDLE_NIL;
      80             :   }
      81           0 :   si.valid_data = de->valid_data_;
      82             :   /*
      83             :    * These are actually calculated later...
      84             :    */
      85           0 :   si.sample_rank = 0;
      86             : 
      87             :   // these aren't the real value, they're being saved
      88             :   // for a later calculation. the actual value is
      89             :   // calculated in DataReaderImpl::sample_info using
      90             :   // these values.
      91           0 :   si.generation_rank =
      92           0 :     static_cast<CORBA::Long>(de->disposed_generation_count_ +
      93           0 :                              de->no_writers_generation_count_);
      94           0 :   si.absolute_generation_rank =
      95           0 :     static_cast<CORBA::Long>(de->disposed_generation_count_ +
      96           0 :                              de->no_writers_generation_count_);
      97             : 
      98           0 :   si.opendds_reserved_publication_seq = de->sequence_.getValue();
      99           0 : }
     100             : 
     101             : // cannot ACE_INLINE because of #include loop
     102             : 
     103           0 : int InstanceState::handle_timeout(const ACE_Time_Value&, const void*)
     104             : {
     105           0 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
     106             : 
     107           0 :   if (DCPS_debug_level) {
     108           0 :     ACE_DEBUG((LM_NOTICE,
     109             :                ACE_TEXT("(%P|%t) NOTICE:")
     110             :                ACE_TEXT(" InstanceState::handle_timeout:")
     111             :                ACE_TEXT(" autopurging samples with instance handle 0x%x!\n"),
     112             :                handle_));
     113             :   }
     114           0 :   release();
     115             : 
     116           0 :   return 0;
     117           0 : }
     118             : 
     119           0 : bool InstanceState::dispose_was_received(const GUID_t& writer_id)
     120             : {
     121           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, false);
     122           0 :   writers_.erase(writer_id);
     123             : 
     124             :   //
     125             :   // Manage the instance state on disposal here.
     126             :   //
     127             :   // If disposed by owner then the owner is not re-elected, it can
     128             :   // resume if the writer sends message again.
     129           0 :   if (instance_state_ & DDS::ALIVE_INSTANCE_STATE) {
     130             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     131           0 :     RcHandle<DataReaderImpl> reader = reader_.lock();
     132           0 :     if (reader) {
     133           0 :       DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
     134           0 :       if (! exclusive_
     135           0 :         || (owner_manager && owner_manager->is_owner (handle_, writer_id))) {
     136             : #endif
     137           0 :         instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
     138           0 :         state_updated();
     139           0 :         schedule_release();
     140           0 :         return true;
     141             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     142             :       }
     143           0 :     }
     144             : #endif
     145           0 :   }
     146             : 
     147           0 :   return false;
     148           0 : }
     149             : 
     150           0 : bool InstanceState::unregister_was_received(const GUID_t& writer_id)
     151             : {
     152           0 :   if (DCPS_debug_level > 1) {
     153           0 :     ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) InstanceState::unregister_was_received on %C\n"),
     154             :       LogGuid(writer_id).c_str()
     155             :     ));
     156             :   }
     157             : 
     158           0 :   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, lock_, false);
     159           0 :   writers_.erase(writer_id);
     160             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     161           0 :   if (exclusive_) {
     162             :     // If unregistered by owner then the ownership should be transferred to another
     163             :     // writer.
     164           0 :     RcHandle<DataReaderImpl> reader = reader_.lock();
     165           0 :     if (reader) {
     166           0 :       DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
     167           0 :       if (owner_manager)
     168           0 :         owner_manager->remove_writer (handle_, writer_id);
     169           0 :     }
     170           0 :   }
     171             : #endif
     172             : 
     173           0 :   if (writers_.empty() && (instance_state_ & DDS::ALIVE_INSTANCE_STATE)) {
     174           0 :     instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
     175           0 :     state_updated();
     176           0 :     schedule_release();
     177           0 :     return true;
     178             :   }
     179             : 
     180           0 :   return false;
     181           0 : }
     182             : 
     183           0 : void InstanceState::schedule_pending()
     184             : {
     185           0 :   release_pending_ = true;
     186           0 : }
     187             : 
     188           0 : void OpenDDS::DCPS::InstanceState::state_updated() const
     189             : {
     190           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
     191           0 :   if (reader) {
     192           0 :     reader->state_updated(handle_);
     193             :   }
     194           0 : }
     195             : 
     196           0 : void InstanceState::schedule_release()
     197             : {
     198           0 :   DDS::DataReaderQos qos;
     199           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
     200           0 :   if (reader) {
     201           0 :     qos = reader->qos_;
     202             :   } else {
     203           0 :     cancel_release();
     204           0 :     return;
     205             :   }
     206             : 
     207             :   DDS::Duration_t delay;
     208             : 
     209           0 :   switch (instance_state_) {
     210           0 :   case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
     211           0 :     delay = qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
     212           0 :     break;
     213             : 
     214           0 :   case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
     215           0 :     delay = qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
     216           0 :     break;
     217             : 
     218           0 :   default:
     219           0 :     ACE_ERROR((LM_ERROR,
     220             :                ACE_TEXT("(%P|%t) ERROR: InstanceState::schedule_release:")
     221             :                ACE_TEXT(" Unsupported instance state: %d!\n"),
     222             :                instance_state_));
     223           0 :     return;
     224             :   }
     225             : 
     226           0 :   if (delay.sec != DDS::DURATION_INFINITE_SEC &&
     227           0 :       delay.nanosec != DDS::DURATION_INFINITE_NSEC) {
     228             : 
     229           0 :     execute_or_enqueue(make_rch<ScheduleCommand>(this, TimeDuration(delay)));
     230             : 
     231             :   } else {
     232             :     // N.B. instance transitions are always followed by a non-valid
     233             :     // sample being queued to the ReceivedDataElementList; marking
     234             :     // the release as pending prevents this sample from being lost
     235             :     // if all samples have been already removed from the instance.
     236           0 :     schedule_pending();
     237             :   }
     238           0 : }
     239             : 
     240           0 : void InstanceState::cancel_release()
     241             : {
     242           0 :   release_pending_ = false;
     243           0 :   execute_or_enqueue(make_rch<CancelCommand>(this));
     244           0 : }
     245             : 
     246           0 : bool InstanceState::release_if_empty()
     247             : {
     248           0 :   bool released = false;
     249           0 :   if (empty_ && writers_.empty()) {
     250           0 :     release();
     251           0 :     released = true;
     252             :   } else {
     253           0 :     schedule_pending();
     254             :   }
     255           0 :   return released;
     256             : }
     257             : 
     258           0 : void InstanceState::release()
     259             : {
     260           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
     261           0 :   if (reader) {
     262           0 :     reader->release_instance(handle_);
     263             :   }
     264           0 : }
     265             : 
     266           0 : void InstanceState::set_owner(const GUID_t& owner)
     267             : {
     268           0 :   ACE_Guard<ACE_Thread_Mutex> guard(owner_lock_);
     269           0 :   owner_ = owner;
     270           0 : }
     271             : 
     272           0 : GUID_t InstanceState::get_owner()
     273             : {
     274           0 :   ACE_Guard<ACE_Thread_Mutex> guard(owner_lock_);
     275           0 :   return owner_;
     276           0 : }
     277             : 
     278           0 : bool InstanceState::is_exclusive() const
     279             : {
     280           0 :   return exclusive_;
     281             : }
     282             : 
     283           0 : bool InstanceState::registered()
     284             : {
     285           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     286           0 :   const bool ret = registered_;
     287           0 :   registered_ = true;
     288           0 :   return ret;
     289           0 : }
     290             : 
     291           0 : void InstanceState::registered(bool flag)
     292             : {
     293           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     294           0 :   registered_ = flag;
     295           0 : }
     296             : 
     297           0 : void InstanceState::reset_ownership(DDS::InstanceHandle_t instance)
     298             : {
     299           0 :   ACE_Guard<ACE_Recursive_Thread_Mutex> guard(lock_);
     300           0 :   set_owner(GUID_UNKNOWN);
     301           0 :   registered_ = false;
     302             : 
     303           0 :   RcHandle<DataReaderImpl> reader = reader_.lock();
     304           0 :   if (reader) {
     305           0 :     reader->reset_ownership(instance);
     306             :   }
     307           0 : }
     308             : 
     309           0 : bool InstanceState::most_recent_generation(ReceivedDataElement* item) const
     310             : {
     311           0 :   return item->disposed_generation_count_ == disposed_generation_count_
     312           0 :     && item->no_writers_generation_count_ == no_writers_generation_count_;
     313             : }
     314             : 
     315           0 : bool InstanceState::reactor_is_shut_down() const
     316             : {
     317           0 :   return TheServiceParticipant->is_shut_down();
     318             : }
     319             : 
     320           0 : void InstanceState::CancelCommand::execute()
     321             : {
     322           0 :   if (instance_state_->release_timer_id_ != -1) {
     323           0 :     instance_state_->reactor()->cancel_timer(instance_state_);
     324           0 :     instance_state_->release_timer_id_ = -1;
     325             :   }
     326           0 : }
     327             : 
     328           0 : void InstanceState::ScheduleCommand::execute()
     329             : {
     330           0 :   if (instance_state_->release_timer_id_ != -1) {
     331           0 :     instance_state_->reactor()->cancel_timer(instance_state_);
     332             :   }
     333             : 
     334           0 :   instance_state_->release_timer_id_ =
     335           0 :     instance_state_->reactor()->schedule_timer(instance_state_, 0, delay_.value());
     336             : 
     337           0 :   if (instance_state_->release_timer_id_ == -1) {
     338           0 :     ACE_ERROR((LM_ERROR,
     339             :                ACE_TEXT("(%P|%t) ERROR: InstanceState::ScheduleCommand::execute:")
     340             :                ACE_TEXT(" Unable to schedule timer!\n")));
     341             :   }
     342           0 : }
     343             : 
     344           0 : const char* InstanceState::instance_state_string(DDS::InstanceStateKind value)
     345             : {
     346           0 :   switch (value) {
     347           0 :   case DDS::ALIVE_INSTANCE_STATE:
     348           0 :     return "ALIVE_INSTANCE_STATE";
     349           0 :   case DDS::NOT_ALIVE_INSTANCE_STATE:
     350           0 :     return "NOT_ALIVE_INSTANCE_STATE";
     351           0 :   case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
     352           0 :     return "NOT_ALIVE_DISPOSED_INSTANCE_STATE";
     353           0 :   case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
     354           0 :     return "NOT_ALIVE_NO_WRITERS_INSTANCE_STATE";
     355           0 :   case DDS::ANY_INSTANCE_STATE:
     356           0 :     return "ANY_INSTANCE_STATE";
     357           0 :   default:
     358           0 :     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: InstanceState::instance_state_string: ")
     359             :       ACE_TEXT("%d is either invalid or not recognized.\n"),
     360             :       value));
     361             : 
     362           0 :     return "Invalid instance state";
     363             :   }
     364             : }
     365             : 
     366           0 : OPENDDS_STRING InstanceState::instance_state_mask_string(DDS::InstanceStateMask mask)
     367             : {
     368           0 :   if (mask == DDS::ANY_INSTANCE_STATE) {
     369           0 :     return instance_state_string(DDS::ANY_INSTANCE_STATE);
     370             :   }
     371           0 :   if (mask == DDS::NOT_ALIVE_INSTANCE_STATE) {
     372           0 :     return instance_state_string(DDS::NOT_ALIVE_INSTANCE_STATE);
     373             :   }
     374           0 :   OPENDDS_STRING str;
     375           0 :   if (mask & DDS::ALIVE_INSTANCE_STATE) {
     376           0 :     str = instance_state_string(DDS::ALIVE_INSTANCE_STATE);
     377             :   }
     378           0 :   if (mask & DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
     379           0 :     if (!str.empty()) str += " | ";
     380           0 :     str += instance_state_string(DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
     381             :   }
     382           0 :   if (mask & DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE) {
     383           0 :     if (!str.empty()) str += " | ";
     384           0 :     str += instance_state_string(DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE);
     385             :   }
     386           0 :   return str;
     387           0 : }
     388             : 
     389             : }
     390             : }
     391             : 
     392             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16