LCOV - code coverage report
Current view: top level - DCPS - InternalDataReader.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 249 272 91.5 %
Date: 2023-04-30 01:32:43 Functions: 26 36 72.2 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_INTERNAL_DATA_READER_H
       9             : #define OPENDDS_DCPS_INTERNAL_DATA_READER_H
      10             : 
      11             : #include "dcps_export.h"
      12             : 
      13             : #ifndef ACE_LACKS_PRAGMA_ONCE
      14             : #  pragma once
      15             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      16             : 
      17             : #include "RcObject.h"
      18             : #include "PoolAllocator.h"
      19             : #include "InternalDataReaderListener.h"
      20             : #include "Time_Helper.h"
      21             : #include "TimeTypes.h"
      22             : 
      23             : #include <dds/DdsDcpsCoreC.h>
      24             : #include <dds/DdsDcpsInfrastructureC.h>
      25             : 
      26             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      27             : 
      28             : namespace OpenDDS {
      29             : namespace DCPS {
      30             : 
      31             : class InternalEntity : public virtual RcObject {};
      32             : typedef WeakRcHandle<InternalEntity> InternalEntity_wrch;
      33             : 
      34             : typedef OPENDDS_VECTOR(DDS::SampleInfo) InternalSampleInfoSequence;
      35             : 
      36          56 : inline DDS::SampleInfo make_sample_info(DDS::SampleStateKind sample_state,
      37             :                                         DDS::ViewStateKind view_state,
      38             :                                         DDS::InstanceStateKind instance_state,
      39             :                                         CORBA::Long disposed_generation_count,
      40             :                                         CORBA::Long no_writers_generation_count,
      41             :                                         CORBA::Long sample_rank,
      42             :                                         CORBA::Long generation_rank,
      43             :                                         CORBA::Long absolute_generation_rank,
      44             :                                         bool valid_data)
      45             : {
      46             :   DDS::SampleInfo si;
      47          56 :   si.sample_state = sample_state;
      48          56 :   si.view_state = view_state;
      49          56 :   si.instance_state = instance_state;
      50          56 :   si.source_timestamp = make_time_t(0, 0); // TODO
      51          56 :   si.instance_handle = DDS::HANDLE_NIL; // TODO
      52          56 :   si.publication_handle = DDS::HANDLE_NIL; // TODO
      53          56 :   si.disposed_generation_count = disposed_generation_count;
      54          56 :   si.no_writers_generation_count = no_writers_generation_count;
      55          56 :   si.sample_rank = sample_rank;
      56          56 :   si.generation_rank = generation_rank;
      57          56 :   si.absolute_generation_rank = absolute_generation_rank;
      58          56 :   si.valid_data = valid_data;
      59          56 :   return si;
      60             : }
      61             : 
      62             : #ifndef OPENDDS_SAFETY_PROFILE
      63          28 : inline bool operator==(const DDS::SampleInfo& x, const DDS::SampleInfo& y)
      64             : {
      65          56 :   return x.sample_state == y.sample_state &&
      66          28 :     x.view_state == y.view_state &&
      67          28 :     x.instance_state == y.instance_state &&
      68          28 :     x.source_timestamp == y.source_timestamp &&
      69          28 :     x.instance_handle == y.instance_handle &&
      70          28 :     x.publication_handle == y.publication_handle &&
      71          28 :     x.disposed_generation_count == y.disposed_generation_count &&
      72          28 :     x.no_writers_generation_count == y.no_writers_generation_count &&
      73          28 :     x.sample_rank == y.sample_rank &&
      74          28 :     x.generation_rank == y.generation_rank &&
      75          84 :     x.absolute_generation_rank == y.absolute_generation_rank &&
      76          56 :     x.valid_data == y.valid_data;
      77             : }
      78             : #endif
      79             : 
      80             : class SampleInfoWrapper {
      81             : public:
      82          56 :   SampleInfoWrapper(const DDS::SampleInfo& sample_info)
      83          56 :     : si(sample_info)
      84          56 :   {}
      85             : 
      86          28 :   bool operator==(const SampleInfoWrapper& other) const
      87             :   {
      88          28 :     return si == other.si;
      89             :   }
      90             : 
      91             :   DDS::SampleInfo si;
      92             : };
      93             : 
      94             : template <typename T>
      95             : class InternalDataReader : public InternalEntity {
      96             : public:
      97             :   typedef OPENDDS_VECTOR(T) SampleSequence;
      98             :   typedef RcHandle<InternalDataReaderListener<T> > Listener_rch;
      99             :   typedef WeakRcHandle<InternalDataReaderListener<T> > Listener_wrch;
     100             : 
     101          29 :   explicit InternalDataReader(const DDS::DataReaderQos qos,
     102             :                               Listener_rch listener = Listener_rch())
     103          29 :     : qos_(qos)
     104          29 :     , listener_(listener)
     105          29 :   {}
     106             : 
     107             :   /// @name InternalTopic and InternalWriter Interface
     108             :   /// @{
     109           4 :   bool durable() const { return qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS; }
     110             : 
     111           5 :   void remove_publication(InternalEntity_wrch publication_handle, bool autodispose_unregistered_instances)
     112             :   {
     113           5 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     114             : 
     115             :     // FUTURE: Index by publication_handle to avoid the loop.
     116           5 :     bool schedule = false;
     117           8 :     for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ++pos) {
     118           3 :       if (autodispose_unregistered_instances && pos->second.dispose(publication_handle, qos_)) {
     119           2 :         schedule = true;
     120             :       }
     121           3 :       if (pos->second.unregister_instance(publication_handle, qos_)) {
     122           1 :         schedule = true;
     123             :       }
     124             :     }
     125             : 
     126           5 :     if (schedule) {
     127           3 :       const Listener_rch listener = listener_.lock();
     128           3 :       if (listener) {
     129           0 :         listener->schedule(rchandle_from(this));
     130             :         // TODO: If the listener doesn't do anything, then clean up then possibly clean up the instance.
     131             :       }
     132           3 :     }
     133           5 :   }
     134             : 
     135          29 :   void write(InternalEntity_wrch publication_handle, const T& sample)
     136             :   {
     137          29 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     138             : 
     139          29 :     const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, Instance()));
     140          29 :     p.first->second.write(publication_handle, sample, qos_);
     141             : 
     142          29 :     const Listener_rch listener = listener_.lock();
     143          29 :     if (listener) {
     144           1 :       listener->schedule(rchandle_from(this));
     145             :     }
     146          29 :   }
     147             : 
     148           5 :   void dispose(InternalEntity_wrch publication_handle, const T& sample)
     149             :   {
     150           5 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     151             : 
     152           5 :     typename InstanceMap::iterator pos = instance_map_.find(sample);
     153           5 :     if (pos == instance_map_.end()) {
     154           0 :       return;
     155             :     }
     156             : 
     157           5 :     if (pos->second.dispose(publication_handle, qos_)) {
     158           5 :       const Listener_rch listener = listener_.lock();
     159           5 :       if (listener) {
     160           0 :         listener->schedule(rchandle_from(this));
     161             :       }
     162           5 :     }
     163           5 :   }
     164             : 
     165           5 :   void unregister_instance(InternalEntity_wrch publication_handle, const T& sample)
     166             :   {
     167           5 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     168             : 
     169           5 :     typename InstanceMap::iterator pos = instance_map_.find(sample);
     170           5 :     if (pos == instance_map_.end()) {
     171           0 :       return;
     172             :     }
     173             : 
     174           5 :     if (pos->second.unregister_instance(publication_handle, qos_)) {
     175           4 :       const Listener_rch listener = listener_.lock();
     176           4 :       if (listener) {
     177           0 :         listener->schedule(rchandle_from(this));
     178             :       }
     179           4 :     }
     180           5 :   }
     181             :   /// @}
     182             : 
     183             :   /// @name User Interface
     184             :   /// @{
     185           1 :   void set_listener(Listener_rch listener)
     186             :   {
     187           1 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     188           1 :     listener_ = listener;
     189           1 :   }
     190             : 
     191           1 :   Listener_rch get_listener() const
     192             :   {
     193           1 :     ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, Listener_rch());
     194           1 :     return listener_.lock();
     195           1 :   }
     196             : 
     197          10 :   void read(SampleSequence& samples,
     198             :             InternalSampleInfoSequence& infos,
     199             :             CORBA::Long max_samples,
     200             :             DDS::SampleStateMask sample_states,
     201             :             DDS::ViewStateMask view_states,
     202             :             DDS::InstanceStateMask instance_states)
     203             :   {
     204          10 :     samples.clear();
     205          10 :     infos.clear();
     206             : 
     207             :     // TODO: Index to avoid the loop.
     208          10 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     209          20 :     for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
     210          10 :       pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
     211          10 :       pos->second.purge_samples(qos_);
     212          10 :       if (pos->second.can_purge_instance(qos_)) {
     213           0 :         instance_map_.erase(pos++);
     214             :       } else {
     215          10 :         ++pos;
     216             :       }
     217             :     }
     218          10 :   }
     219             : 
     220          23 :   void take(SampleSequence& samples,
     221             :             InternalSampleInfoSequence& infos,
     222             :             CORBA::Long max_samples,
     223             :             DDS::SampleStateMask sample_states,
     224             :             DDS::ViewStateMask view_states,
     225             :             DDS::InstanceStateMask instance_states)
     226             :   {
     227          23 :     samples.clear();
     228          23 :     infos.clear();
     229             : 
     230             :     // TODO: Index to avoid the loop.
     231          23 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     232          48 :     for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end(); pos != limit; ) {
     233          25 :       pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
     234          25 :       pos->second.purge_samples(qos_);
     235          25 :       if (pos->second.can_purge_instance(qos_)) {
     236           7 :         instance_map_.erase(pos++);
     237             :       } else {
     238          18 :         ++pos;
     239             :       }
     240             :     }
     241          23 :   }
     242             : 
     243           2 :   void read_instance(SampleSequence& samples,
     244             :                      InternalSampleInfoSequence& infos,
     245             :                      CORBA::Long max_samples,
     246             :                      const T& key,
     247             :                      DDS::SampleStateMask sample_states,
     248             :                      DDS::ViewStateMask view_states,
     249             :                      DDS::InstanceStateMask instance_states)
     250             :   {
     251           2 :     samples.clear();
     252           2 :     infos.clear();
     253             : 
     254           2 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     255           2 :     typename InstanceMap::iterator pos = instance_map_.find(key);
     256           2 :     if (pos != instance_map_.end()) {
     257           1 :       pos->second.read(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
     258           1 :       pos->second.purge_samples(qos_);
     259           1 :       if (pos->second.can_purge_instance(qos_)) {
     260           0 :         instance_map_.erase(pos);
     261             :       }
     262             :     }
     263           2 :   }
     264             : 
     265           2 :   void take_instance(SampleSequence& samples,
     266             :                      InternalSampleInfoSequence& infos,
     267             :                      CORBA::Long max_samples,
     268             :                      const T& key,
     269             :                      DDS::SampleStateMask sample_states,
     270             :                      DDS::ViewStateMask view_states,
     271             :                      DDS::InstanceStateMask instance_states)
     272             :   {
     273           2 :     samples.clear();
     274           2 :     infos.clear();
     275             : 
     276           2 :     ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
     277           2 :     typename InstanceMap::iterator pos = instance_map_.find(key);
     278           2 :     if (pos != instance_map_.end()) {
     279           1 :       pos->second.take(pos->first, samples, infos, max_samples, sample_states, view_states, instance_states);
     280           1 :       pos->second.purge_samples(qos_);
     281           1 :       if (pos->second.can_purge_instance(qos_)) {
     282           0 :         instance_map_.erase(pos);
     283             :       }
     284             :     }
     285           2 :   }
     286             : /// @}
     287             : 
     288             : private:
     289             :   const DDS::DataReaderQos qos_;
     290             :   // Often, the listener will have the reader as a member.  Use a weak
     291             :   // pointer to prevent a cycle that prevents the listener from being
     292             :   // destroyed.
     293             :   Listener_wrch listener_;
     294             : 
     295             :   typedef OPENDDS_SET(InternalEntity_wrch) PublicationSet;
     296             : 
     297             :   class Instance {
     298             :   public:
     299             : 
     300          29 :     Instance()
     301          29 :       : view_state_(DDS::NEW_VIEW_STATE)
     302          29 :       , instance_state_(DDS::ALIVE_INSTANCE_STATE)
     303          29 :       , disposed_generation_count_(0)
     304          29 :       , no_writers_generation_count_(0)
     305          29 :       , informed_of_not_alive_(false)
     306             :     {
     307          29 :       disposed_expiration_date_.sec = 0;
     308          29 :       disposed_expiration_date_.nanosec = 0;
     309          29 :       no_writers_expiration_date_.sec = 0;
     310          29 :       no_writers_expiration_date_.nanosec = 0;
     311          29 :     }
     312             : 
     313             :     DDS::ViewStateKind view_state() const { return view_state_; }
     314             : 
     315             :     DDS::InstanceStateKind instance_state() const { return instance_state_; }
     316             : 
     317          37 :     void purge_samples(const DDS::DataReaderQos& qos)
     318             :     {
     319          83 :       if (instance_state_ == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE &&
     320          37 :           !is_infinite(qos.reader_data_lifecycle.autopurge_disposed_samples_delay) &&
     321          37 :           SystemTimePoint::now().to_dds_time() > disposed_expiration_date_) {
     322           0 :         not_read_samples_.clear();
     323           0 :         read_samples_.clear();
     324             :       }
     325          37 :     }
     326             : 
     327          37 :     bool can_purge_instance(const DDS::DataReaderQos& qos) const
     328             :     {
     329          16 :       if (instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
     330          30 :           not_read_samples_.empty() &&
     331          67 :           read_samples_.empty() &&
     332          11 :           publication_set_.empty()) {
     333           7 :         return true;
     334             :       }
     335             : 
     336          63 :       if (instance_state_ == DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE &&
     337          30 :           !is_infinite(qos.reader_data_lifecycle.autopurge_nowriter_samples_delay) &&
     338          30 :           SystemTimePoint::now().to_dds_time() > no_writers_expiration_date_) {
     339           0 :         return true;
     340             :       }
     341             : 
     342          30 :       return false;
     343             :     }
     344             : 
     345          11 :     void read(const T& key,
     346             :               SampleSequence& samples,
     347             :               InternalSampleInfoSequence& infos,
     348             :               CORBA::Long max_samples,
     349             :               DDS::SampleStateMask sample_states,
     350             :               DDS::ViewStateMask view_states,
     351             :               DDS::InstanceStateMask instance_states)
     352             :     {
     353          11 :       if (!((view_states & view_state_) && (instance_states & instance_state_))) {
     354           4 :         return;
     355             :       }
     356             : 
     357           7 :       CORBA::Long sample_count = 0;
     358             : 
     359           7 :       if (sample_states & DDS::READ_SAMPLE_STATE) {
     360           6 :         for (typename SampleList::const_iterator pos = read_samples_.begin(), limit = read_samples_.end();
     361           8 :              pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
     362           2 :           samples.push_back(pos->sample);
     363           2 :           infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
     364           2 :           ++sample_count;
     365             :         }
     366             :       }
     367             : 
     368           7 :       if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
     369           6 :         typename SampleList::iterator pos = not_read_samples_.begin();
     370           6 :         for (typename SampleList::iterator limit = not_read_samples_.end();
     371          10 :              pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
     372           4 :           samples.push_back(pos->sample);
     373           4 :           infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
     374           4 :           ++sample_count;
     375             :         }
     376           6 :         read_samples_.splice(read_samples_.end(), not_read_samples_, not_read_samples_.begin(), pos);
     377             : 
     378             :         // Generate a synthetic sample for not alive states.
     379           6 :         if (sample_count == 0 &&
     380           0 :             instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
     381           0 :             !informed_of_not_alive_) {
     382           0 :           samples.push_back(key);
     383           0 :           infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
     384           0 :           ++sample_count;
     385             :         }
     386             :       }
     387             : 
     388           7 :       compute_ranks(sample_count, infos);
     389             : 
     390           7 :       if (sample_count) {
     391           6 :         view_state_ = DDS::NOT_NEW_VIEW_STATE;
     392           6 :         informed_of_not_alive_ = true;
     393             :       }
     394             :     }
     395             : 
     396          26 :     void take(const T& key,
     397             :               SampleSequence& samples,
     398             :               InternalSampleInfoSequence& infos,
     399             :               CORBA::Long max_samples,
     400             :               DDS::SampleStateMask sample_states,
     401             :               DDS::ViewStateMask view_states,
     402             :               DDS::InstanceStateMask instance_states)
     403             :     {
     404          26 :       if (!((view_states & view_state_) && (instance_states & instance_state_))) {
     405           4 :         return;
     406             :       }
     407             : 
     408          22 :       CORBA::Long sample_count = 0;
     409             : 
     410          22 :       if (sample_states & DDS::READ_SAMPLE_STATE) {
     411          21 :         typename SampleList::iterator pos = read_samples_.begin();
     412          21 :         for (typename SampleList::iterator limit = read_samples_.end();
     413          21 :              pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
     414           0 :           samples.push_back(pos->sample);
     415           0 :           infos.push_back(make_sample_info(DDS::READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
     416           0 :           ++sample_count;
     417             :         }
     418          21 :         read_samples_.erase(read_samples_.begin(), pos);
     419             :       }
     420             : 
     421          22 :       if (sample_states & DDS::NOT_READ_SAMPLE_STATE) {
     422          21 :         typename SampleList::iterator pos = not_read_samples_.begin();
     423          21 :         for (typename SampleList::iterator limit = not_read_samples_.end();
     424          42 :              pos != limit && (max_samples == DDS::LENGTH_UNLIMITED || sample_count < max_samples); ++pos) {
     425          21 :           samples.push_back(pos->sample);
     426          21 :           infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, pos->disposed_generation_count, pos->no_writers_generation_count, 0, 0, 0, true));
     427          21 :           ++sample_count;
     428             :         }
     429          21 :         not_read_samples_.erase(not_read_samples_.begin(), pos);
     430             : 
     431             :         // Generate a synthetic sample for not alive states.
     432          21 :         if (sample_count == 0 &&
     433           2 :             instance_state_ != DDS::ALIVE_INSTANCE_STATE &&
     434           1 :             !informed_of_not_alive_) {
     435           1 :           samples.push_back(key);
     436           1 :           infos.push_back(make_sample_info(DDS::NOT_READ_SAMPLE_STATE, view_state_, instance_state_, disposed_generation_count_, no_writers_generation_count_, 0, 0, 0, false));
     437           1 :           ++sample_count;
     438             :         }
     439             :       }
     440             : 
     441          22 :       compute_ranks(sample_count, infos);
     442             : 
     443          22 :       if (sample_count) {
     444          20 :         view_state_ = DDS::NOT_NEW_VIEW_STATE;
     445          20 :         informed_of_not_alive_ = true;
     446             :       }
     447             :     }
     448             : 
     449          29 :     void write(InternalEntity_wrch publication_handle,
     450             :                const T& sample,
     451             :                const DDS::DataReaderQos& qos)
     452             :     {
     453          29 :       publication_set_.insert(publication_handle);
     454             : 
     455          29 :       if (view_state_ == DDS::NOT_NEW_VIEW_STATE && instance_state_ != DDS::ALIVE_INSTANCE_STATE) {
     456           2 :         view_state_ = DDS::NEW_VIEW_STATE;
     457             :       }
     458             : 
     459          29 :       switch (instance_state_) {
     460          27 :       case DDS::ALIVE_INSTANCE_STATE:
     461          27 :         break;
     462           2 :       case DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE:
     463           2 :         ++disposed_generation_count_;
     464           2 :         break;
     465           0 :       case DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE:
     466           0 :         ++no_writers_generation_count_;
     467           0 :         break;
     468             :       }
     469             : 
     470          29 :       instance_state_ = DDS::ALIVE_INSTANCE_STATE;
     471             : 
     472          29 :       if (qos.history.kind == DDS::KEEP_LAST_HISTORY_QOS) {
     473          26 :         while (read_samples_.size() + not_read_samples_.size() >= static_cast<size_t>(qos.history.depth)) {
     474           2 :           if (!read_samples_.empty()) {
     475           1 :             read_samples_.pop_front();
     476             :           } else {
     477           1 :             not_read_samples_.pop_front();
     478             :           }
     479             :         }
     480             :       }
     481             : 
     482          29 :       not_read_samples_.push_back(SampleHolder(sample, disposed_generation_count_, no_writers_generation_count_));
     483          29 :     }
     484             : 
     485           7 :     bool dispose(InternalEntity_wrch publication_handle,
     486             :                  const DDS::DataReaderQos& qos)
     487             :     {
     488           7 :       publication_set_.insert(publication_handle);
     489             : 
     490           7 :       if (instance_state_ != DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE) {
     491           7 :         instance_state_ = DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE;
     492           7 :         disposed_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_disposed_samples_delay;
     493           7 :         informed_of_not_alive_ = false;
     494           7 :         return true;
     495             :       }
     496             : 
     497           0 :       return false;
     498             :     }
     499             : 
     500           8 :     bool unregister_instance(InternalEntity_wrch publication_handle,
     501             :                              const DDS::DataReaderQos& qos)
     502             :     {
     503           8 :       publication_set_.erase(publication_handle);
     504             : 
     505           8 :       if (publication_set_.empty() && instance_state_ == DDS::ALIVE_INSTANCE_STATE) {
     506           5 :         instance_state_ = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
     507           5 :         no_writers_expiration_date_ = SystemTimePoint::now().to_dds_time() + qos.reader_data_lifecycle.autopurge_nowriter_samples_delay;
     508           5 :         informed_of_not_alive_ = false;
     509           5 :         return true;
     510             :       }
     511             : 
     512           3 :       return false;
     513             :     }
     514             : 
     515             :   private:
     516             :     struct SampleHolder {
     517             :       T sample;
     518             :       CORBA::Long disposed_generation_count;
     519             :       CORBA::Long no_writers_generation_count;
     520             : 
     521          29 :       SampleHolder(const T& s,
     522             :                    CORBA::Long dgc,
     523             :                    CORBA::Long nwgc)
     524          29 :         : sample(s)
     525          29 :         , disposed_generation_count(dgc)
     526          29 :         , no_writers_generation_count(nwgc)
     527          29 :       {}
     528             :     };
     529             : 
     530             :     typedef OPENDDS_LIST(SampleHolder) SampleList;
     531             :     SampleList read_samples_;
     532             :     SampleList not_read_samples_;
     533             : 
     534             :     PublicationSet publication_set_;
     535             : 
     536             :     DDS::ViewStateKind view_state_;
     537             :     DDS::InstanceStateKind instance_state_;
     538             :     DDS::Time_t disposed_expiration_date_;
     539             :     DDS::Time_t no_writers_expiration_date_;
     540             :     CORBA::Long disposed_generation_count_;
     541             :     CORBA::Long no_writers_generation_count_;
     542             :     bool informed_of_not_alive_;
     543             : 
     544          29 :     void compute_ranks(CORBA::Long sample_count, InternalSampleInfoSequence& infos)
     545             :     {
     546          29 :       if (sample_count == 0) {
     547           3 :         return;
     548             :       }
     549             : 
     550          26 :       typename InternalSampleInfoSequence::reverse_iterator pos = infos.rbegin();
     551          26 :       const CORBA::Long mrsic = pos->disposed_generation_count + pos->no_writers_generation_count;
     552          26 :       const CORBA::Long mrs = disposed_generation_count_ + no_writers_generation_count_;
     553             : 
     554          54 :       for (CORBA::Long rank = 0; rank != sample_count; ++rank, ++pos) {
     555          28 :         pos->sample_rank = rank;
     556          28 :         pos->generation_rank = mrsic - (pos->disposed_generation_count + pos->no_writers_generation_count);
     557          28 :         pos->absolute_generation_rank = mrs - (pos->disposed_generation_count + pos->no_writers_generation_count);
     558             :       }
     559             :     }
     560             :   };
     561             : 
     562             :   typedef OPENDDS_MAP_T(T, Instance) InstanceMap;
     563             :   InstanceMap instance_map_;
     564             : 
     565             :   mutable ACE_Thread_Mutex mutex_;
     566             : };
     567             : 
     568             : } // namespace DCPS
     569             : } // namespace OpenDDS
     570             : 
     571             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     572             : 
     573             : #endif /* OPENDDS_DCPS_INTERNAL_DATA_READER_H */

Generated by: LCOV version 1.16