LCOV - code coverage report
Current view: top level - DCPS - WriterInfo.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 167 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 26 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       8             : #include "dcps_export.h"
       9             : 
      10             : #include "DataReaderImpl.h"
      11             : #include "GuidConverter.h"
      12             : #include "Service_Participant.h"
      13             : #include "Time_Helper.h"
      14             : #include "WriterInfo.h"
      15             : 
      16             : #include "ace/OS_NS_sys_time.h"
      17             : 
      18             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      19             : 
      20             : namespace OpenDDS {
      21             : namespace DCPS {
      22             : 
      23           0 : WriterInfoListener::WriterInfoListener()
      24           0 :   : subscription_id_(GUID_UNKNOWN)
      25             : {
      26           0 : }
      27             : 
      28           0 : WriterInfoListener::~WriterInfoListener()
      29             : {
      30           0 : }
      31             : 
      32             : /// tell instances when a DataWriter transitions to being alive
      33             : /// The writer state is inout parameter, it has to be set ALIVE before
      34             : /// handle_timeout is called since some subroutine use the state.
      35             : void
      36           0 : WriterInfoListener::writer_became_alive(WriterInfo&,
      37             :                                         const MonotonicTimePoint&)
      38             : {
      39           0 : }
      40             : 
      41             : /// tell instances when a DataWriter transitions to DEAD
      42             : /// The writer state is inout parameter, the state is set to DEAD
      43             : /// when it returns.
      44             : void
      45           0 : WriterInfoListener::writer_became_dead(WriterInfo&)
      46             : {
      47           0 : }
      48             : 
      49             : /// tell instance when a DataWriter is removed.
      50             : /// The liveliness status need update.
      51             : void
      52           0 : WriterInfoListener::writer_removed(WriterInfo&)
      53             : {
      54           0 : }
      55             : 
      56           0 : WriterInfo::WriterInfo(const WriterInfoListener_rch& reader,
      57             :                        const GUID_t& writer_id,
      58           0 :                        const ::DDS::DataWriterQos& writer_qos)
      59           0 :   : last_liveliness_activity_time_(MonotonicTimePoint::now())
      60           0 :   , historic_samples_timer_(NO_TIMER)
      61           0 :   , last_historic_seq_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
      62           0 :   , waiting_for_end_historic_samples_(false)
      63           0 :   , delivering_historic_samples_(false)
      64           0 :   , delivering_historic_samples_cv_(mutex_)
      65           0 :   , state_(NOT_SET)
      66           0 :   , reader_(reader)
      67           0 :   , writer_id_(writer_id)
      68           0 :   , writer_qos_(writer_qos)
      69           0 :   , handle_(DDS::HANDLE_NIL)
      70             : {
      71             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
      72           0 :   reset_coherent_info();
      73             : #endif
      74             : 
      75           0 :   if (DCPS_debug_level >= 5) {
      76           0 :     ACE_DEBUG((LM_DEBUG,
      77             :                ACE_TEXT("(%P|%t) WriterInfo::WriterInfo: ")
      78             :                ACE_TEXT("writer %C added to reader %C.\n"),
      79             :                LogGuid(writer_id).c_str(),
      80             :                LogGuid(reader->subscription_id_).c_str()));
      81             :   }
      82           0 : }
      83             : 
      84           0 : const char* WriterInfo::get_state_str() const
      85             : {
      86           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      87           0 :   switch (state_) {
      88           0 :   case NOT_SET:
      89           0 :     return "NOT_SET";
      90           0 :   case ALIVE:
      91           0 :     return "ALIVE";
      92           0 :   case DEAD:
      93           0 :     return "DEAD";
      94           0 :   default:
      95           0 :     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: WriterInfo::get_state_str: ")
      96             :       ACE_TEXT("%d is either invalid or not recognized.\n"),
      97             :       state_));
      98           0 :     return "Invalid state";
      99             :   }
     100           0 : }
     101             : 
     102             : void
     103           0 : WriterInfo::schedule_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper, const ACE_Time_Value& ten_seconds)
     104             : {
     105           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     106           0 :   const void* arg = reinterpret_cast<const void*>(this);
     107           0 :   historic_samples_timer_ = sweeper->reactor()->schedule_timer(sweeper, arg, ten_seconds);
     108           0 : }
     109             : 
     110             : void
     111           0 : WriterInfo::cancel_historic_samples_timer(EndHistoricSamplesMissedSweeper* sweeper)
     112             : {
     113           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     114           0 :   if (historic_samples_timer_ != WriterInfo::NO_TIMER) {
     115           0 :     sweeper->reactor()->cancel_timer(historic_samples_timer_);
     116           0 :     historic_samples_timer_ = WriterInfo::NO_TIMER;
     117             :   }
     118           0 : }
     119             : 
     120             : bool
     121           0 : WriterInfo::check_end_historic_samples(EndHistoricSamplesMissedSweeper* sweeper, OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& to_deliver)
     122             : {
     123           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     124           0 :   while (delivering_historic_samples_) {
     125           0 :     delivering_historic_samples_cv_.wait(TheServiceParticipant->get_thread_status_manager());
     126             :   }
     127           0 :   if (waiting_for_end_historic_samples_) {
     128           0 :     bool result = false;
     129           0 :     RcHandle<WriterInfo> info = rchandle_from(this);
     130           0 :     if (!historic_samples_.empty()) {
     131           0 :       last_historic_seq_ = historic_samples_.rbegin()->first;
     132           0 :       delivering_historic_samples_ = true;
     133           0 :       to_deliver.swap(historic_samples_);
     134           0 :       result = true;
     135             :     }
     136           0 :     guard.release();
     137           0 :     sweeper->cancel_timer(info);
     138           0 :     return result;
     139           0 :   }
     140           0 :   return false;
     141           0 : }
     142             : 
     143             : bool
     144           0 : WriterInfo::check_historic(const SequenceNumber& seq, const ReceivedDataSample& sample, SequenceNumber& last_historic_seq)
     145             : {
     146           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     147           0 :   last_historic_seq = last_historic_seq_;
     148           0 :   if (waiting_for_end_historic_samples_) {
     149           0 :     historic_samples_.insert(std::make_pair(seq, sample));
     150           0 :     return true;
     151             :   }
     152           0 :   return false;
     153           0 : }
     154             : 
     155             : void
     156           0 : WriterInfo::finished_delivering_historic()
     157             : {
     158           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     159           0 :   delivering_historic_samples_ = false;
     160           0 :   delivering_historic_samples_cv_.notify_all();
     161           0 : }
     162             : 
     163             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     164             : void
     165           0 : WriterInfo::add_coherent_samples(const SequenceNumber& seq)
     166             : {
     167           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     168           0 :   if (coherent_samples_ == 0) {
     169           0 :     static const SequenceNumber defaultSN;
     170           0 :     const SequenceRange resetRange(defaultSN, seq);
     171           0 :     coherent_sample_sequence_.reset();
     172           0 :     coherent_sample_sequence_.insert(resetRange);
     173             :   }
     174             :   else {
     175           0 :     coherent_sample_sequence_.insert(seq);
     176             :   }
     177           0 : }
     178             : 
     179             : void
     180           0 : WriterInfo::coherent_change(bool group_coherent, const GUID_t& publisher_id)
     181             : {
     182           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     183           0 :   group_coherent_ = group_coherent;
     184           0 :   publisher_id_ = publisher_id;
     185           0 :   ++coherent_samples_;
     186           0 : }
     187             : #endif
     188             : 
     189             : void
     190           0 : WriterInfo::clear_owner_evaluated()
     191             : {
     192           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     193           0 :   owner_evaluated_.clear();
     194           0 : }
     195             : 
     196             : void
     197           0 : WriterInfo::set_owner_evaluated(::DDS::InstanceHandle_t instance, bool flag)
     198             : {
     199           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     200           0 :   if (flag ||
     201           0 :       (!flag && owner_evaluated_.find(instance) != owner_evaluated_.end())) {
     202           0 :     owner_evaluated_[instance] = flag;
     203             :   }
     204           0 : }
     205             : 
     206             : bool
     207           0 : WriterInfo::is_owner_evaluated(::DDS::InstanceHandle_t instance)
     208             : {
     209           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     210           0 :   OwnerEvaluateFlags::iterator iter = owner_evaluated_.find(instance);
     211           0 :   if (iter == owner_evaluated_.end()) {
     212           0 :     owner_evaluated_.insert(OwnerEvaluateFlags::value_type(instance, false));
     213           0 :     return false;
     214             :   }
     215             :   else
     216           0 :     return iter->second;
     217           0 : }
     218             : 
     219             : MonotonicTimePoint
     220           0 : WriterInfo::check_activity(const MonotonicTimePoint& now)
     221             : {
     222           0 :   MonotonicTimePoint expires_at(MonotonicTimePoint::max_value);
     223             : 
     224           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     225             : 
     226           0 :   WriterInfoListener_rch reader = reader_.lock();
     227             : 
     228             :   // We only need check the liveliness with the non-zero liveliness_lease_duration_.
     229           0 :   if (state_ == ALIVE && reader && !reader->liveliness_lease_duration_.is_zero()) {
     230           0 :     expires_at = last_liveliness_activity_time_ + reader->liveliness_lease_duration_;
     231             : 
     232           0 :     if (expires_at <= now) {
     233             :       // let all instances know this write is not alive.
     234           0 :       guard.release();
     235           0 :       reader->writer_became_dead(*this);
     236           0 :       expires_at = MonotonicTimePoint::max_value;
     237             :     }
     238             :   }
     239             : 
     240           0 :   return expires_at;
     241           0 : }
     242             : 
     243             : void
     244           0 : WriterInfo::removed()
     245             : {
     246           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     247           0 :   WriterInfoListener_rch reader = reader_.lock();
     248           0 :   guard.release();
     249           0 :   if (reader) {
     250           0 :     reader->writer_removed(*this);
     251             :   }
     252           0 : }
     253             : 
     254             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     255             : Coherent_State
     256           0 : WriterInfo::coherent_change_received()
     257             : {
     258           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     259           0 :   if (writer_coherent_samples_.num_samples_ == 0) {
     260           0 :     return NOT_COMPLETED_YET;
     261             :   }
     262             : 
     263           0 :   if (!coherent_sample_sequence_.disjoint()
     264           0 :       && (coherent_sample_sequence_.high()
     265           0 :           == writer_coherent_samples_.last_sample_)) {
     266           0 :     return COMPLETED;
     267             :   }
     268             : 
     269           0 :   if (coherent_sample_sequence_.high() >
     270           0 :       writer_coherent_samples_.last_sample_) {
     271           0 :     return REJECTED;
     272             :   }
     273             : 
     274           0 :   return NOT_COMPLETED_YET;
     275           0 : }
     276             : 
     277             : void
     278           0 : WriterInfo::reset_coherent_info()
     279             : {
     280           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     281           0 :   coherent_samples_ = 0;
     282           0 :   group_coherent_ = false;
     283           0 :   publisher_id_ = GUID_UNKNOWN;
     284           0 :   coherent_sample_sequence_.reset();
     285           0 :   writer_coherent_samples_.reset();
     286           0 :   group_coherent_samples_.clear();
     287           0 : }
     288             : 
     289             : 
     290             : void
     291           0 : WriterInfo::set_group_info(const CoherentChangeControl& info)
     292             : {
     293           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     294           0 :   if (!(publisher_id_ == info.publisher_id_)
     295           0 :       || group_coherent_ != info.group_coherent_) {
     296           0 :     WriterInfoListener_rch reader = reader_.lock();
     297           0 :     ACE_ERROR((LM_ERROR,
     298             :                ACE_TEXT("(%P|%t) ERROR: WriterInfo::set_group_info()")
     299             :                ACE_TEXT(" reader %C writer %C incorrect coherent info !\n"),
     300             :                reader ? LogGuid(reader->subscription_id_).c_str() : "",
     301             :                LogGuid(writer_id_).c_str()));
     302           0 :   }
     303             : 
     304           0 :   writer_coherent_samples_ = info.coherent_samples_;
     305           0 :   group_coherent_samples_ = info.group_coherent_samples_;
     306           0 : }
     307             : 
     308             : #endif  // OPENDDS_NO_OBJECT_MODEL_PROFILE
     309             : 
     310             : }
     311             : }
     312             : 
     313             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16