LCOV - code coverage report
Current view: top level - DCPS - DataReaderImpl.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 115 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 142 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_DATAREADERIMPL_H
       9             : #define OPENDDS_DCPS_DATAREADERIMPL_H
      10             : 
      11             : #include "dcps_export.h"
      12             : 
      13             : #include "AssociationData.h"
      14             : #include "Cached_Allocator_With_Overflow_T.h"
      15             : #include "CoherentChangeControl.h"
      16             : #include "ContentFilteredTopicImpl.h"
      17             : #include "DataReaderCallbacks.h"
      18             : #include "Definitions.h"
      19             : #include "DisjointSequence.h"
      20             : #include "DomainParticipantImpl.h"
      21             : #include "EntityImpl.h"
      22             : #include "GroupRakeData.h"
      23             : #include "InstanceState.h"
      24             : #include "MultiTopicImpl.h"
      25             : #include "OwnershipManager.h"
      26             : #include "PoolAllocator.h"
      27             : #include "RcEventHandler.h"
      28             : #include "RcHandle_T.h"
      29             : #include "RcObject.h"
      30             : #include "ReactorInterceptor.h"
      31             : #include "Service_Participant.h"
      32             : #include "Stats_T.h"
      33             : #include "SubscriptionInstance.h"
      34             : #include "TimeTypes.h"
      35             : #include "TopicImpl.h"
      36             : #include "WriterInfo.h"
      37             : #include "ZeroCopyInfoSeq_T.h"
      38             : #include "AtomicBool.h"
      39             : #include "transport/framework/ReceivedDataSample.h"
      40             : #include "transport/framework/TransportClient.h"
      41             : #include "transport/framework/TransportReceiveListener.h"
      42             : 
      43             : #include <dds/DdsDcpsTopicC.h>
      44             : #include <dds/DdsDcpsSubscriptionExtC.h>
      45             : #include <dds/DdsDcpsDomainC.h>
      46             : #include <dds/DdsDcpsTopicC.h>
      47             : #include <dds/DdsDcpsInfrastructureC.h>
      48             : 
      49             : #include <ace/String_Base.h>
      50             : #include <ace/Reverse_Lock_T.h>
      51             : #include <ace/Reactor.h>
      52             : 
      53             : #include <memory>
      54             : 
      55             : #if !defined (ACE_LACKS_PRAGMA_ONCE)
      56             : #pragma once
      57             : #endif /* ACE_LACKS_PRAGMA_ONCE */
      58             : 
      59             : class DDS_TEST;
      60             : 
      61             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      62             : 
      63             : namespace OpenDDS {
      64             : namespace DCPS {
      65             : 
      66             : class SubscriberImpl;
      67             : class DomainParticipantImpl;
      68             : class SubscriptionInstance;
      69             : class TopicImpl;
      70             : class TopicDescriptionImpl;
      71             : class Monitor;
      72             : class DataReaderImpl;
      73             : class FilterEvaluator;
      74             : 
      75             : typedef Cached_Allocator_With_Overflow<ReceivedDataElementMemoryBlock, ACE_Thread_Mutex>
      76             : ReceivedDataAllocator;
      77             : 
      78             : enum MarshalingType {
      79             :   FULL_MARSHALING,
      80             :   KEY_ONLY_MARSHALING
      81             : };
      82             : 
      83             : /// Elements stored for managing statistical data.
      84             : class OpenDDS_Dcps_Export WriterStats {
      85             : public:
      86             :   /// Default constructor.
      87             :   WriterStats(
      88             :     int amount = 0,
      89             :     DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
      90             : #ifdef ACE_HAS_CPP11
      91           0 :   WriterStats(const WriterStats&) = default;
      92             : #endif
      93             : 
      94             :   /// Add a datum to the latency statistics.
      95             :   void add_stat(const TimeDuration& delay);
      96             : 
      97             :   /// Extract the current latency statistics for this writer.
      98             :   LatencyStatistics get_stats() const;
      99             : 
     100             :   /// Reset the latency statistics for this writer.
     101             :   void reset_stats();
     102             : 
     103             : #ifndef OPENDDS_SAFETY_PROFILE
     104             :   /// Dump any raw data.
     105             :   std::ostream& raw_data(std::ostream& str) const;
     106             : #endif
     107             : 
     108             : private:
     109             :   /// Latency statistics for the DataWriter to this DataReader.
     110             :   Stats<double> stats_;
     111             : };
     112             : 
     113             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     114             : 
     115             : class OpenDDS_Dcps_Export AbstractSamples
     116             : {
     117             : public:
     118             :   virtual ~AbstractSamples(){}
     119             :   virtual void reserve(CORBA::ULong size)=0;
     120             :   virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
     121             : };
     122             : 
     123             : #endif
     124             : 
     125             : // Class to cleanup in case EndHistoricSamples is missed
     126             : class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
     127             : public:
     128             :   EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
     129             :                                   ACE_thread_t owner,
     130             :                                   DataReaderImpl* reader);
     131             : 
     132             :   void schedule_timer(WriterInfo_rch& info);
     133             :   void cancel_timer(WriterInfo_rch& info);
     134             : 
     135             :   // Arg will be PublicationId
     136             :   int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
     137             : 
     138           0 :   virtual bool reactor_is_shut_down() const
     139             :   {
     140           0 :     return TheServiceParticipant->is_shut_down();
     141             :   }
     142             : 
     143             : private:
     144             :   ~EndHistoricSamplesMissedSweeper();
     145             : 
     146             :   WeakRcHandle<DataReaderImpl> reader_;
     147             :   OPENDDS_SET(WriterInfo_rch) info_set_;
     148             : 
     149             :   class CommandBase : public Command {
     150             :   public:
     151           0 :     CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
     152             :                 WriterInfo_rch& info)
     153           0 :       : sweeper_(sweeper)
     154           0 :       , info_(info)
     155           0 :     { }
     156             : 
     157             :   protected:
     158             :     EndHistoricSamplesMissedSweeper* sweeper_;
     159             :     WriterInfo_rch info_;
     160             :   };
     161             : 
     162             :   class ScheduleCommand : public CommandBase {
     163             :   public:
     164           0 :     ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
     165             :                     WriterInfo_rch& info)
     166           0 :       : CommandBase(sweeper, info)
     167           0 :     { }
     168             :     virtual void execute();
     169             :   };
     170             : 
     171             :   class CancelCommand : public CommandBase {
     172             :   public:
     173           0 :     CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
     174             :                   WriterInfo_rch& info)
     175           0 :       : CommandBase(sweeper, info)
     176           0 :     { }
     177             :     virtual void execute();
     178             :   };
     179             : };
     180             : 
     181             : class MessageHolder : public virtual RcObject {
     182             : public:
     183             :   virtual const void* get() const = 0;
     184             : };
     185             : 
     186             : template <typename T>
     187             : class MessageHolder_T : public MessageHolder {
     188             : public:
     189           0 :   MessageHolder_T(const T& v) : v_(v) {}
     190           0 :   const void* get() const { return &v_; }
     191             : private:
     192             :   T v_;
     193             : };
     194             : 
     195             : /**
     196             : * @class DataReaderImpl
     197             : *
     198             : * @brief Implements the DDS::DataReader interface.
     199             : *
     200             : * See the DDS specification, OMG formal/2015-04-10, for a description of
     201             : * the interface this class is implementing.
     202             : *
     203             : * This class must be inherited by the type-specific datareader which
     204             : * is specific to the data-type associated with the topic.
     205             : *
     206             : */
     207             : class OpenDDS_Dcps_Export DataReaderImpl
     208             :   : public virtual LocalObject<DataReaderEx>,
     209             :     public virtual DataReaderCallbacks,
     210             :     public virtual EntityImpl,
     211             :     public virtual TransportClient,
     212             :     public virtual TransportReceiveListener,
     213             :     private WriterInfoListener {
     214             : public:
     215             :   friend class RequestedDeadlineWatchdog;
     216             :   friend class QueryConditionImpl;
     217             :   friend class SubscriberImpl;
     218             : 
     219             :   typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType;
     220             :   typedef OPENDDS_SET(DDS::InstanceHandle_t) InstanceSet;
     221             :   typedef OPENDDS_SET(SubscriptionInstance_rch) SubscriptionInstanceSet;
     222             :   /// Type of collection of statistics for writers to this reader.
     223             :   typedef OPENDDS_MAP_CMP(GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType;
     224             : 
     225             :   DataReaderImpl();
     226             : 
     227             :   virtual ~DataReaderImpl();
     228             : 
     229             :   virtual DDS::InstanceHandle_t get_instance_handle();
     230             : 
     231             :   virtual void add_association(const GUID_t& yourId,
     232             :                                const WriterAssociation& writer,
     233             :                                bool active);
     234             : 
     235             :   virtual void transport_assoc_done(int flags, const GUID_t& remote_id);
     236             : 
     237             :   virtual void remove_associations(const WriterIdSeq& writers, bool callback);
     238             : 
     239             :   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
     240             : 
     241             :   virtual void signal_liveliness(const GUID_t& remote_participant);
     242             : 
     243             :   /**
     244             :   * This is used to retrieve the listener for a certain status change.
     245             :   * If this datareader has a registered listener and the status kind
     246             :   * is in the listener mask then the listener is returned.
     247             :   * Otherwise, the query for the listener is propagated up to the
     248             :   * factory/subscriber.
     249             :   */
     250             :   DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
     251             : 
     252             :   /// tell instances when a DataWriter transitions to being alive
     253             :   /// The writer state is inout parameter, it has to be set ALIVE before
     254             :   /// handle_timeout is called since some subroutine use the state.
     255             :   void writer_became_alive(WriterInfo& info,
     256             :                            const MonotonicTimePoint& when);
     257             : 
     258             :   /// tell instances when a DataWriter transitions to DEAD
     259             :   /// The writer state is inout parameter, the state is set to DEAD
     260             :   /// when it returns.
     261             :   void writer_became_dead(WriterInfo& info);
     262             : 
     263             :   /// tell instance when a DataWriter is removed.
     264             :   /// The liveliness status need update.
     265             :   void writer_removed(WriterInfo& info);
     266             : 
     267             :   virtual void cleanup();
     268             : 
     269             :   void init(
     270             :     TopicDescriptionImpl* a_topic_desc,
     271             :     const DDS::DataReaderQos& qos,
     272             :     DDS::DataReaderListener_ptr a_listener,
     273             :     const DDS::StatusMask& mask,
     274             :     DomainParticipantImpl* participant,
     275             :     SubscriberImpl* subscriber);
     276             : 
     277             :   virtual DDS::ReadCondition_ptr create_readcondition(
     278             :     DDS::SampleStateMask sample_states,
     279             :     DDS::ViewStateMask view_states,
     280             :     DDS::InstanceStateMask instance_states);
     281             : 
     282             : #ifndef OPENDDS_NO_QUERY_CONDITION
     283             :   virtual DDS::QueryCondition_ptr create_querycondition(
     284             :     DDS::SampleStateMask sample_states,
     285             :     DDS::ViewStateMask view_states,
     286             :     DDS::InstanceStateMask instance_states,
     287             :     const char * query_expression,
     288             :     const DDS::StringSeq & query_parameters);
     289             : #endif
     290             : 
     291             :   virtual DDS::ReturnCode_t delete_readcondition(
     292             :     DDS::ReadCondition_ptr a_condition);
     293             : 
     294             :   virtual DDS::ReturnCode_t delete_contained_entities();
     295             : 
     296             :   virtual DDS::ReturnCode_t set_qos(
     297             :     const DDS::DataReaderQos & qos);
     298             : 
     299             :   virtual DDS::ReturnCode_t get_qos(
     300             :     DDS::DataReaderQos & qos);
     301             : 
     302             :   virtual DDS::ReturnCode_t set_listener(
     303             :     DDS::DataReaderListener_ptr a_listener,
     304             :     DDS::StatusMask mask);
     305             : 
     306             :   virtual DDS::DataReaderListener_ptr get_listener();
     307             : 
     308             :   virtual DDS::TopicDescription_ptr get_topicdescription();
     309             : 
     310             :   virtual DDS::Subscriber_ptr get_subscriber();
     311             : 
     312             :   virtual DDS::ReturnCode_t get_sample_rejected_status(
     313             :     DDS::SampleRejectedStatus & status);
     314             : 
     315             :   virtual DDS::ReturnCode_t get_liveliness_changed_status(
     316             :     DDS::LivelinessChangedStatus & status);
     317             : 
     318             :   virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
     319             :     DDS::RequestedDeadlineMissedStatus & status);
     320             : 
     321             :   virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
     322             :     DDS::RequestedIncompatibleQosStatus & status);
     323             : 
     324             :   virtual DDS::ReturnCode_t get_subscription_matched_status(
     325             :     DDS::SubscriptionMatchedStatus & status);
     326             : 
     327             :   virtual DDS::ReturnCode_t get_sample_lost_status(
     328             :     DDS::SampleLostStatus & status);
     329             : 
     330             :   virtual DDS::ReturnCode_t wait_for_historical_data(
     331             :     const DDS::Duration_t & max_wait);
     332             : 
     333             :   virtual DDS::ReturnCode_t get_matched_publications(
     334             :     DDS::InstanceHandleSeq & publication_handles);
     335             : 
     336             : #if !defined (DDS_HAS_MINIMUM_BIT)
     337             :   virtual DDS::ReturnCode_t get_matched_publication_data(
     338             :     DDS::PublicationBuiltinTopicData & publication_data,
     339             :     DDS::InstanceHandle_t publication_handle);
     340             : #endif // !defined (DDS_HAS_MINIMUM_BIT)
     341             : 
     342             :   virtual DDS::ReturnCode_t enable();
     343             : 
     344             : #ifndef OPENDDS_SAFETY_PROFILE
     345             :   virtual void get_latency_stats(
     346             :     LatencyStatisticsSeq & stats);
     347             : #endif
     348             : 
     349             :   virtual void reset_latency_stats();
     350             : 
     351             :   virtual CORBA::Boolean statistics_enabled();
     352             : 
     353             :   virtual void statistics_enabled(
     354             :     CORBA::Boolean statistics_enabled);
     355             : 
     356             :   /// @name Raw Latency Statistics Interfaces
     357             :   /// @{
     358             : 
     359             :   /// Expose the statistics container.
     360             :   const StatsMapType& raw_latency_statistics() const;
     361             : 
     362             :   /// Configure the size of the raw data collection buffer.
     363             :   unsigned int& raw_latency_buffer_size();
     364             : 
     365             :   /// Configure the type of the raw data collection buffer.
     366             :   DataCollector<double>::OnFull& raw_latency_buffer_type();
     367             : 
     368             :   /// @}
     369             : 
     370             :   /// update liveliness info for this writer.
     371             :   void writer_activity(const DataSampleHeader& header);
     372             : 
     373             :   /// process a message that has been received - could be control or a data sample.
     374             :   virtual void data_received(const ReceivedDataSample& sample);
     375             : 
     376             :   void transport_discovery_change();
     377             : 
     378             :   virtual bool check_transport_qos(const TransportInst& inst);
     379             : 
     380             :   bool have_sample_states(DDS::SampleStateMask sample_states) const;
     381             :   bool have_view_states(DDS::ViewStateMask view_states) const;
     382             :   bool have_instance_states(DDS::InstanceStateMask instance_states) const;
     383             :   bool contains_sample(DDS::SampleStateMask sample_states,
     384             :                        DDS::ViewStateMask view_states,
     385             :                        DDS::InstanceStateMask instance_states);
     386             : 
     387             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     388             :   virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
     389             :                                         DDS::ViewStateMask view_states,
     390             :                                         DDS::InstanceStateMask instance_states,
     391             :                                         const FilterEvaluator& evaluator,
     392             :                                         const DDS::StringSeq& params) = 0;
     393             : #endif
     394             : 
     395             :   virtual RcHandle<MessageHolder> dds_demarshal(const ReceivedDataSample& sample,
     396             :                                                 DDS::InstanceHandle_t publication_handle,
     397             :                                                 SubscriptionInstance_rch& instance,
     398             :                                                 bool& is_new_instance,
     399             :                                                 bool& filtered,
     400             :                                                 MarshalingType marshaling_type,
     401             :                                                 bool full_copy) = 0;
     402             : 
     403             :   virtual void dispose_unregister(const ReceivedDataSample& sample,
     404             :                                   DDS::InstanceHandle_t publication_handle,
     405             :                                   SubscriptionInstance_rch& instance);
     406             : 
     407             :   void process_latency(const ReceivedDataSample& sample);
     408             :   void notify_latency(GUID_t writer);
     409             : 
     410           0 :   size_t get_depth() const
     411             :   {
     412           0 :     return static_cast<size_t>(depth_);
     413             :   }
     414           0 :   size_t get_n_chunks() const
     415             :   {
     416           0 :     return n_chunks_;
     417             :   }
     418             : 
     419             :   void liveliness_lost();
     420             : 
     421             :   void remove_all_associations();
     422             : 
     423             :   void notify_subscription_disconnected(const WriterIdSeq& pubids);
     424             :   void notify_subscription_reconnected(const WriterIdSeq& pubids);
     425             :   void notify_subscription_lost(const WriterIdSeq& pubids);
     426             :   void notify_liveliness_change();
     427             : 
     428             :   bool is_bit() const;
     429             : 
     430             :   /**
     431             :    * This method is used for a precondition check of delete_datareader.
     432             :    *
     433             :    * @retval true We have zero-copy samples loaned out
     434             :    * @retval false We have no zero-copy samples loaned out
     435             :    */
     436             :   bool has_zero_copies();
     437             : 
     438             :   /// Release the instance with the handle.
     439             :   void release_instance(DDS::InstanceHandle_t handle);
     440             : 
     441             :   // Take appropriate actions upon learning instance or view state has been updated
     442             :   void state_updated(DDS::InstanceHandle_t handle);
     443             : 
     444             :   /// Release all instances held by the reader.
     445             :   virtual void release_all_instances() = 0;
     446             : 
     447             :   ACE_Reactor_Timer_Interface* get_reactor();
     448             : 
     449             :   GUID_t get_topic_id();
     450             :   GUID_t get_dp_id();
     451             : 
     452             :   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
     453             :   void get_instance_handles(InstanceHandleVec& instance_handles);
     454             : 
     455             :   typedef std::pair<GUID_t, WriterInfo::WriterState> WriterStatePair;
     456             :   typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
     457             :   void get_writer_states(WriterStatePairVec& writer_states);
     458             : 
     459             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     460             :   void update_ownership_strength (const GUID_t& pub_id,
     461             :                                   const CORBA::Long& ownership_strength);
     462             : 
     463             :   // Access to OwnershipManager is only valid when the domain participant is valid;
     464             :   // therefore, we must lock the domain pariticipant when using  OwnershipManager.
     465             :   class OwnershipManagerPtr
     466             :   {
     467             :   public:
     468           0 :     OwnershipManagerPtr(DataReaderImpl* reader)
     469           0 :       : participant_( (reader && reader->is_exclusive_ownership_) ? reader->participant_servant_.lock() : RcHandle<DomainParticipantImpl>())
     470             :     {
     471           0 :     }
     472           0 :     operator bool() const { return participant_.in(); }
     473           0 :     OwnershipManager* operator->() const
     474             :     {
     475           0 :       return participant_ ? participant_->ownership_manager() : 0;
     476             :     }
     477             : 
     478             :   private:
     479             :     RcHandle<DomainParticipantImpl> participant_;
     480             :   };
     481             :   friend class OwnershipManagerPtr;
     482             : 
     483             :   struct OwnershipManagerScopedAccess {
     484           0 :     OwnershipManagerScopedAccess() : om_(0), lock_result_(0) {}
     485           0 :     explicit OwnershipManagerScopedAccess(DataReaderImpl::OwnershipManagerPtr om) : om_(om), lock_result_(om_ ? om_->instance_lock_acquire() : 0) {}
     486           0 :     ~OwnershipManagerScopedAccess() { release(); }
     487             : 
     488           0 :     void swap(OwnershipManagerScopedAccess& rhs)
     489             :     {
     490           0 :       if (&rhs != this) {
     491           0 :         std::swap(om_, rhs.om_);
     492           0 :         std::swap(lock_result_, rhs.lock_result_);
     493             :       }
     494           0 :     }
     495             : 
     496           0 :     int release()
     497             :     {
     498           0 :       int result = 0;
     499           0 :       if (om_ && !lock_result_) {
     500           0 :         result = om_->instance_lock_release();
     501             :       }
     502           0 :       om_ = 0;
     503           0 :       lock_result_ = 0;
     504           0 :       return result;
     505             :     }
     506             : 
     507             :     OwnershipManagerPtr om_;
     508             :     int lock_result_;
     509             :   };
     510             : 
     511           0 :   OwnershipManagerPtr ownership_manager() { return OwnershipManagerPtr(this); }
     512             : #endif
     513             : 
     514             :   virtual void lookup_instance(const ReceivedDataSample& sample,
     515             :                                SubscriptionInstance_rch& instance) = 0;
     516             : 
     517             : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
     518             : 
     519             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     520             : 
     521             :   void enable_filtering(ContentFilteredTopicImpl* cft);
     522             : 
     523             :   DDS::ContentFilteredTopic_ptr get_cf_topic() const;
     524             : 
     525             : #endif
     526             : 
     527             : #ifndef OPENDDS_NO_MULTI_TOPIC
     528             : 
     529             :   void enable_multi_topic(MultiTopicImpl* mt);
     530             : 
     531             : #endif
     532             : 
     533             :   void update_subscription_params(const DDS::StringSeq& params) const;
     534             : 
     535             :   typedef OPENDDS_VECTOR(void*) GenericSeq;
     536             : 
     537             :   struct GenericBundle {
     538             :     GenericSeq samples_;
     539             :     DDS::SampleInfoSeq info_;
     540             :   };
     541             : 
     542             :   virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
     543             :     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     544             :     DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
     545             : 
     546             :   virtual DDS::ReturnCode_t take(
     547             :     AbstractSamples& samples,
     548             :     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     549             :     DDS::InstanceStateMask instance_states)=0;
     550             : 
     551             :   virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
     552             : 
     553             :   virtual DDS::ReturnCode_t read_instance_generic(void*& data,
     554             :     DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
     555             :     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     556             :     DDS::InstanceStateMask instance_states) = 0;
     557             : 
     558             :   virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
     559             :     DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
     560             :     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
     561             :     DDS::InstanceStateMask instance_states) = 0;
     562             : 
     563             : #endif
     564             : 
     565           0 :   void set_instance_state(DDS::InstanceHandle_t instance,
     566             :                           DDS::InstanceStateKind state,
     567             :                           const SystemTimePoint& timestamp = SystemTimePoint::now(),
     568             :                           const GUID_t& guid = GUID_UNKNOWN)
     569             :   {
     570           0 :     DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
     571             :     {
     572           0 :       ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, publication_handle_lock_);
     573           0 :       RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(guid);
     574           0 :       if (pos != publication_id_to_handle_map_.end()) {
     575           0 :         publication_handle = pos->second;
     576             :       }
     577           0 :     }
     578             : 
     579           0 :     ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
     580           0 :     set_instance_state_i(instance, publication_handle, state, timestamp, guid);
     581           0 :   }
     582             : 
     583             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     584             :   void begin_access();
     585             :   void end_access();
     586             :   void get_ordered_data(GroupRakeData& data,
     587             :                         DDS::SampleStateMask sample_states,
     588             :                         DDS::ViewStateMask view_states,
     589             :                         DDS::InstanceStateMask instance_states);
     590             : 
     591             :   void accept_coherent (const GUID_t& writer_id,
     592             :                         const GUID_t& publisher_id);
     593             :   void reject_coherent (const GUID_t& writer_id,
     594             :                         const GUID_t& publisher_id);
     595             :   void coherent_change_received (const GUID_t& publisher_id, Coherent_State& result);
     596             : 
     597             :   void coherent_changes_completed (DataReaderImpl* reader);
     598             : 
     599             :   void reset_coherent_info (const GUID_t& writer_id,
     600             :                             const GUID_t& publisher_id);
     601             : #endif
     602             : 
     603             :   // Called upon subscriber qos change to update the local cache.
     604             :   void set_subscriber_qos(const DDS::SubscriberQos & qos);
     605             : 
     606             :   // Set the instance related writers to reevaluate the owner.
     607             :   void reset_ownership (DDS::InstanceHandle_t instance);
     608             : 
     609             :   virtual RcHandle<EntityImpl> parent() const;
     610             : 
     611             :   void disable_transport();
     612             : 
     613             :   virtual void register_for_writer(const GUID_t& /*participant*/,
     614             :                                    const GUID_t& /*readerid*/,
     615             :                                    const GUID_t& /*writerid*/,
     616             :                                    const TransportLocatorSeq& /*locators*/,
     617             :                                    DiscoveryListener* /*listener*/);
     618             : 
     619             :   virtual void unregister_for_writer(const GUID_t& /*participant*/,
     620             :                                      const GUID_t& /*readerid*/,
     621             :                                      const GUID_t& /*writerid*/);
     622             : 
     623             :   virtual void update_locators(const GUID_t& remote,
     624             :                                const TransportLocatorSeq& locators);
     625             : 
     626             :   virtual DCPS::WeakRcHandle<ICE::Endpoint> get_ice_endpoint();
     627             : 
     628           0 :   GUID_t get_guid() const
     629             :   {
     630           0 :     ACE_Guard<ACE_Thread_Mutex> guard(subscription_id_mutex_);
     631           0 :     ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
     632           0 :     while (!has_subscription_id_ && !get_deleted()) {
     633           0 :       subscription_id_condition_.wait(thread_status_manager);
     634             :     }
     635           0 :     return subscription_id_;
     636           0 :   }
     637             : 
     638             :   void return_handle(DDS::InstanceHandle_t handle);
     639             : 
     640           0 :   const ValueDispatcher* get_value_dispatcher() const
     641             :   {
     642           0 :     TopicDescriptionPtr<TopicImpl> temp(topic_servant_);
     643           0 :     return temp ? dynamic_cast<const ValueDispatcher*>(temp->get_type_support()) : 0;
     644           0 :   }
     645             : 
     646             : protected:
     647             : 
     648             :   // Update max flag if the spec ever changes
     649             :   static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE;
     650             :   static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1;
     651             :   static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u;
     652             : 
     653             :   // Update max flag if the spec ever changes
     654             :   static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE;
     655             :   static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1;
     656             :   static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u;
     657             : 
     658             :   // Update max flag if the spec ever changes
     659             :   static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE;
     660             :   static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1;
     661             :   static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u;
     662             : 
     663             :   // These may need to be updated if the spec ever changes
     664             :   static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS;
     665             :   static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS;
     666             : 
     667             :   typedef OPENDDS_SET(DDS::InstanceHandle_t) HandleSet;
     668             :   typedef OPENDDS_MAP(CORBA::ULong, HandleSet) LookupMap;
     669             : 
     670           0 :   static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
     671             :   {
     672           0 :     sample_states &= MAX_SAMPLE_STATE_MASK;
     673           0 :     view_states &= MAX_VIEW_STATE_MASK;
     674           0 :     instance_states &= MAX_INSTANCE_STATE_MASK;
     675           0 :     if (!(sample_states && view_states && instance_states)) {
     676             :       // catch-all for "bogus" lookups
     677           0 :       return 0;
     678             :     }
     679           0 :     return (sample_states << COMBINED_SAMPLE_STATE_SHIFT) | (view_states << COMBINED_VIEW_STATE_SHIFT) | instance_states;
     680             :   }
     681             : 
     682           0 :   static void split_combined_states(CORBA::ULong combined, CORBA::ULong& sample_states, CORBA::ULong& view_states, CORBA::ULong& instance_states)
     683             :   {
     684           0 :     sample_states = (combined >> COMBINED_SAMPLE_STATE_SHIFT) & MAX_SAMPLE_STATE_MASK;
     685           0 :     view_states = (combined >> COMBINED_VIEW_STATE_SHIFT) & MAX_VIEW_STATE_MASK;
     686           0 :     instance_states = combined & MAX_INSTANCE_STATE_MASK;
     687           0 :   }
     688             : 
     689             :   void initialize_lookup_maps();
     690             :   void update_lookup_maps(const SubscriptionInstanceMapType::iterator& input);
     691             :   void remove_from_lookup_maps(DDS::InstanceHandle_t handle);
     692             :   const HandleSet& lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const;
     693             : 
     694             :   LookupMap combined_state_lookup_;
     695             : 
     696             :   // Perform cast to get extended version of listener (otherwise nil)
     697             :   DataReaderListener_ptr get_ext_listener();
     698             : 
     699             :   virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
     700             : 
     701             :   void prepare_to_delete();
     702             : 
     703             :   /// Setup deserialization options
     704             :   DDS::ReturnCode_t setup_deserialization();
     705             : 
     706             :   RcHandle<SubscriberImpl> get_subscriber_servant();
     707             : 
     708             :   void post_read_or_take();
     709             : 
     710             :   // type specific DataReader's part of enable.
     711             :   virtual DDS::ReturnCode_t enable_specific() = 0;
     712             : 
     713             :   void sample_info(DDS::SampleInfo & sample_info,
     714             :                    const ReceivedDataElement *ptr);
     715             : 
     716             :   CORBA::Long total_samples() const;
     717             : 
     718             :   void set_sample_lost_status(const DDS::SampleLostStatus& status);
     719             :   void set_sample_rejected_status(
     720             :     const DDS::SampleRejectedStatus& status);
     721             : 
     722             :   SubscriptionInstance_rch get_handle_instance(
     723             :     DDS::InstanceHandle_t handle);
     724             : 
     725             :   /**
     726             :   * Get an instance handle for a new instance.
     727             :   */
     728             :   DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
     729             : 
     730             :   virtual void purge_data(SubscriptionInstance_rch instance) = 0;
     731             : 
     732             :   virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
     733             :   virtual void state_updated_i(DDS::InstanceHandle_t handle) = 0;
     734             : 
     735             :   bool has_readcondition(DDS::ReadCondition_ptr a_condition);
     736             : 
     737             :   /// @TODO: document why the instances_ container is mutable.
     738             :   mutable SubscriptionInstanceMapType instances_;
     739             : 
     740             :   /// Assume since the container is mutable(?!!?) it may need to use the
     741             :   /// lock while const.
     742             :   /// @TODO: remove the recursive nature of the instances_lock if not needed.
     743             :   mutable ACE_Recursive_Thread_Mutex instances_lock_;
     744             : 
     745             :   /// Check if the received data sample or instance should
     746             :   /// be filtered.
     747             :   /**
     748             :    * @note Filtering will only occur if the application
     749             :    *       configured a finite duration in the Topic's LIFESPAN
     750             :    *       QoS policy or DataReader's TIME_BASED_FILTER QoS policy.
     751             :    */
     752             :   bool filter_sample(const DataSampleHeader& header);
     753             : 
     754             :   bool ownership_filter_instance(const SubscriptionInstance_rch& instance,
     755             :                                  const GUID_t& pubid);
     756             :   bool time_based_filter_instance(const SubscriptionInstance_rch& instance,
     757             :                                   MonotonicTimePoint& now,
     758             :                                   MonotonicTimePoint& deadline);
     759             : 
     760             :   void accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance);
     761             : 
     762             :   virtual void qos_change(const DDS::DataReaderQos& qos);
     763             : 
     764             :   /// Data has arrived into the cache, unblock waiting ReadConditions
     765             :   void notify_read_conditions();
     766             : 
     767             :   bool has_subscription_id_;
     768             :   mutable ACE_Thread_Mutex subscription_id_mutex_;
     769             :   mutable ConditionVariable<ACE_Thread_Mutex> subscription_id_condition_;
     770             : 
     771             :   unique_ptr<ReceivedDataAllocator> rd_allocator_;
     772             :   DDS::DataReaderQos qos_;
     773             :   DDS::DataReaderQos passed_qos_;
     774             : 
     775             :   // Status conditions accessible by subclasses.
     776             :   DDS::SampleRejectedStatus sample_rejected_status_;
     777             :   DDS::SampleLostStatus sample_lost_status_;
     778             : 
     779             :   /// lock protecting sample container as well as statuses.
     780             :   ACE_Recursive_Thread_Mutex sample_lock_;
     781             : 
     782             :   typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
     783             :   Reverse_Lock_t reverse_sample_lock_;
     784             : 
     785             :   WeakRcHandle<DomainParticipantImpl> participant_servant_;
     786             :   TopicDescriptionPtr<TopicImpl> topic_servant_;
     787             :   TypeSupportImpl* type_support_;
     788             :   GUID_t topic_id_;
     789             : 
     790             : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
     791             :   bool is_exclusive_ownership_;
     792             : 
     793             : #endif
     794             : 
     795             : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
     796             :   mutable ACE_Thread_Mutex content_filtered_topic_mutex_;
     797             :   TopicDescriptionPtr<ContentFilteredTopicImpl> content_filtered_topic_;
     798             : #endif
     799             : 
     800             : #ifndef OPENDDS_NO_MULTI_TOPIC
     801             :   TopicDescriptionPtr<MultiTopicImpl> multi_topic_;
     802             : #endif
     803             : 
     804             :   /// Is accessing to Group coherent changes ?
     805             :   bool coherent_;
     806             : 
     807             :   /// Ordered group samples.
     808             :   GroupRakeData group_coherent_ordered_data_;
     809             : 
     810             :   DDS::SubscriberQos subqos_;
     811             : 
     812             :   virtual void add_link(const DataLink_rch& link, const GUID_t& peer);
     813             : 
     814             : private:
     815           0 :   virtual void install_type_support(TypeSupportImpl*) {}
     816             : 
     817             :   virtual void set_instance_state_i(DDS::InstanceHandle_t instance,
     818             :                                     DDS::InstanceHandle_t publication_handle,
     819             :                                     DDS::InstanceStateKind state,
     820             :                                     const SystemTimePoint& timestamp,
     821             :                                     const GUID_t& guid) = 0;
     822             : 
     823             :   void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
     824             : 
     825             :   /// Lookup the instance handles by the publication repo ids
     826             :   void lookup_instance_handles(const WriterIdSeq& ids,
     827             :                                DDS::InstanceHandleSeq& hdls);
     828             : 
     829             :   void instances_liveliness_update(const GUID_t& writer,
     830             :                                    DDS::InstanceHandle_t publication_handle);
     831             : 
     832             : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
     833             :   bool verify_coherent_changes_completion(WriterInfo* writer);
     834             :   bool coherent_change_received(WriterInfo* writer);
     835             : #endif
     836             : 
     837           0 :   RcHandle<BitSubscriber> get_builtin_subscriber_proxy() const
     838             :   {
     839           0 :     RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
     840           0 :     if (participant_servant) {
     841           0 :       return participant_servant->get_builtin_subscriber_proxy();
     842             :     }
     843             : 
     844           0 :     return RcHandle<BitSubscriber>();
     845           0 :   }
     846             : 
     847           0 :   DDS::DomainId_t domain_id() const { return this->domain_id_; }
     848             : 
     849           0 :   Priority get_priority_value(const AssociationData& data) const {
     850           0 :     return data.publication_transport_priority_;
     851             :   }
     852             : 
     853             : #if defined(OPENDDS_SECURITY)
     854             :   DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
     855             : #endif
     856             : 
     857             :   /// when done handling historic samples, resume
     858             :   void resume_sample_processing(const GUID_t& pub_id);
     859             : 
     860             :   /// collect samples received before END_HISTORIC_SAMPLES
     861             :   /// returns false if normal processing of this sample should be skipped
     862             :   bool check_historic(const ReceivedDataSample& sample);
     863             : 
     864             :   /// deliver samples that were held by check_historic()
     865             :   void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
     866             : 
     867             :   friend class InstanceState;
     868             :   friend class EndHistoricSamplesMissedSweeper;
     869             : 
     870             :   friend class ::DDS_TEST; //allows tests to get at private data
     871             : 
     872             :   DDS::TopicDescription_var    topic_desc_;
     873             :   ACE_Thread_Mutex             listener_mutex_;
     874             :   DDS::StatusMask              listener_mask_;
     875             :   DDS::DataReaderListener_var  listener_;
     876             :   DDS::DomainId_t              domain_id_;
     877             :   GUID_t                       dp_id_;
     878             :   // subscriber_servant_ has to be a weak pinter because it may be used from the
     879             :   // transport reactor thread and that thread doesn't have the owenership of the
     880             :   // the subscriber_servant_ object.
     881             :   WeakRcHandle<SubscriberImpl>              subscriber_servant_;
     882             :   RcHandle<EndHistoricSamplesMissedSweeper> end_historic_sweeper_;
     883             : 
     884             :   CORBA::Long                  depth_;
     885             :   size_t                       n_chunks_;
     886             : 
     887             :   //Used to protect access to id_to_handle_map_
     888             :   ACE_Recursive_Thread_Mutex   publication_handle_lock_;
     889             : 
     890             :   typedef OPENDDS_MAP_CMP(GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
     891             :   RepoIdToHandleMap            publication_id_to_handle_map_;
     892             : 
     893             :   // Status conditions.
     894             :   DDS::LivelinessChangedStatus         liveliness_changed_status_;
     895             :   DDS::RequestedDeadlineMissedStatus   requested_deadline_missed_status_;
     896             :   DDS::RequestedIncompatibleQosStatus  requested_incompatible_qos_status_;
     897             :   DDS::SubscriptionMatchedStatus       subscription_match_status_;
     898             : 
     899             :   // OpenDDS extended status.  This is only available via listener.
     900             :   BudgetExceededStatus                 budget_exceeded_status_;
     901             : 
     902             :   /**
     903             :    * @todo The subscription_lost_status_ and
     904             :    *       subscription_reconnecting_status_ are left here for
     905             :    *       future use when we add get_subscription_lost_status()
     906             :    *       and get_subscription_reconnecting_status() methods.
     907             :    */
     908             :   // Statistics of the lost subscriptions due to lost connection.
     909             :   SubscriptionLostStatus               subscription_lost_status_;
     910             :   // Statistics of the subscriptions that are associated with a
     911             :   // reconnecting datalink.
     912             :   // SubscriptionReconnectingStatus      subscription_reconnecting_status_;
     913             : 
     914             :   /// The orb's reactor to be used to register the liveliness
     915             :   /// timer.
     916             :   ACE_Reactor_Timer_Interface* reactor_;
     917             : 
     918             :   class LivelinessTimer : public ReactorInterceptor {
     919             :   public:
     920           0 :     LivelinessTimer(ACE_Reactor* reactor,
     921             :                     ACE_thread_t owner,
     922             :                     DataReaderImpl* data_reader)
     923           0 :       : ReactorInterceptor(reactor, owner)
     924           0 :       , data_reader_(*data_reader)
     925           0 :       , liveliness_timer_id_(-1)
     926           0 :     { }
     927             : 
     928             :     void check_liveliness();
     929             : 
     930             :     void cancel_timer()
     931             :     {
     932             :       execute_or_enqueue(make_rch<CancelCommand>(this));
     933             :     }
     934             : 
     935           0 :     virtual bool reactor_is_shut_down() const
     936             :     {
     937           0 :       return TheServiceParticipant->is_shut_down();
     938             :     }
     939             : 
     940             :   private:
     941           0 :     ~LivelinessTimer() { }
     942             : 
     943             :     WeakRcHandle<DataReaderImpl> data_reader_;
     944             : 
     945             :     /// liveliness timer id; -1 if no timer is set
     946             :     long liveliness_timer_id_;
     947             :     void check_liveliness_i(bool cancel, const MonotonicTimePoint& now);
     948             : 
     949             :     int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
     950             : 
     951             :     class CommandBase : public Command {
     952             :     public:
     953           0 :       CommandBase(LivelinessTimer* timer)
     954           0 :         : timer_(timer)
     955           0 :       { }
     956             : 
     957             :     protected:
     958             :       LivelinessTimer* timer_;
     959             :     };
     960             : 
     961             :     class CheckLivelinessCommand : public CommandBase {
     962             :     public:
     963           0 :       CheckLivelinessCommand(LivelinessTimer* timer)
     964           0 :         : CommandBase(timer)
     965           0 :       { }
     966           0 :       virtual void execute()
     967             :       {
     968           0 :         timer_->check_liveliness_i(true, MonotonicTimePoint::now());
     969           0 :       }
     970             :     };
     971             : 
     972             :     class CancelCommand : public CommandBase {
     973             :     public:
     974             :       CancelCommand(LivelinessTimer* timer)
     975             :         : CommandBase(timer)
     976             :       { }
     977             :       virtual void execute()
     978             :       {
     979             :         if (timer_->liveliness_timer_id_ != -1) {
     980             :           timer_->reactor()->cancel_timer(timer_);
     981             :         }
     982             :       }
     983             :     };
     984             :   };
     985             :   RcHandle<LivelinessTimer> liveliness_timer_;
     986             : 
     987             :   CORBA::Long last_deadline_missed_total_count_;
     988             :   /// Watchdog responsible for reporting missed offered
     989             :   /// deadlines.
     990             :   TimeDuration deadline_period_;
     991             :   typedef OPENDDS_MULTIMAP(MonotonicTimePoint, SubscriptionInstance_rch) DeadlineQueue;
     992             :   DeadlineQueue deadline_queue_;
     993             :   bool deadline_queue_enabled_;
     994             :   typedef PmfSporadicTask<DataReaderImpl> DRISporadicTask;
     995             :   RcHandle<DRISporadicTask> deadline_task_;
     996             : 
     997             :   void schedule_deadline(SubscriptionInstance_rch instance,
     998             :                          bool timer_called);
     999             :   void reset_deadline_period(const TimeDuration& deadline_period);
    1000             :   void reschedule_deadline(SubscriptionInstance_rch instance,
    1001             :                            const MonotonicTimePoint& now);
    1002             :   void cancel_deadline(SubscriptionInstance_rch instance);
    1003             :   void cancel_all_deadlines();
    1004             :   void deadline_task(const MonotonicTimePoint& now);
    1005             :   void process_deadline(SubscriptionInstance_rch instance,
    1006             :                         const MonotonicTimePoint& now,
    1007             :                         bool timer_called);
    1008             : 
    1009             :   /// Flag indicates that this datareader is a builtin topic
    1010             :   /// datareader.
    1011             :   bool is_bit_;
    1012             : 
    1013             :   bool always_get_history_;
    1014             : 
    1015             :   /// Flag indicating status of statistics gathering.
    1016             :   AtomicBool statistics_enabled_;
    1017             : 
    1018             :   /// publications writing to this reader.
    1019             :   typedef OPENDDS_MAP_CMP(GUID_t, WriterInfo_rch,
    1020             :                    GUID_tKeyLessThan) WriterMapType;
    1021             : 
    1022             :   WriterMapType writers_;
    1023             : 
    1024             :   /// RW lock for reading/writing publications.
    1025             :   ACE_RW_Thread_Mutex writers_lock_;
    1026             : 
    1027             :   /// Statistics for this reader, collected for each writer.
    1028             :   StatsMapType statistics_;
    1029             :   ACE_Recursive_Thread_Mutex statistics_lock_;
    1030             : 
    1031             :   /// Bound (or initial reservation) of raw latency buffer.
    1032             :   unsigned int raw_latency_buffer_size_;
    1033             : 
    1034             :   /// Type of raw latency data buffer.
    1035             :   DataCollector<double>::OnFull raw_latency_buffer_type_;
    1036             : 
    1037             :   typedef VarLess<DDS::ReadCondition> RCCompLess;
    1038             :   typedef OPENDDS_SET_CMP(DDS::ReadCondition_var,  RCCompLess) ReadConditionSet;
    1039             :   ReadConditionSet read_conditions_;
    1040             : 
    1041             :   /// Monitor object for this entity
    1042             :   unique_ptr<Monitor> monitor_;
    1043             : 
    1044             :   /// Periodic Monitor object for this entity
    1045             :   unique_ptr<Monitor>  periodic_monitor_;
    1046             : 
    1047             :   bool transport_disabled_;
    1048             : 
    1049             : protected:
    1050             :   typedef OPENDDS_SET(Encoding::Kind) EncodingKinds;
    1051             :   EncodingKinds decoding_modes_;
    1052             : 
    1053             : public:
    1054             :   class OpenDDS_Dcps_Export OnDataOnReaders : public Job {
    1055             :   public:
    1056           0 :     OnDataOnReaders(WeakRcHandle<SubscriberImpl> subscriber,
    1057             :                     DDS::SubscriberListener_var sub_listener,
    1058             :                     WeakRcHandle<DataReaderImpl> data_reader,
    1059             :                     bool call,
    1060             :                     bool set_reader_status)
    1061           0 :       : subscriber_(subscriber)
    1062           0 :       , sub_listener_(sub_listener)
    1063           0 :       , data_reader_(data_reader)
    1064           0 :       , call_(call)
    1065           0 :       , set_reader_status_(set_reader_status)
    1066           0 :     {}
    1067             : 
    1068             :   private:
    1069             :     virtual void execute();
    1070             : 
    1071             :     WeakRcHandle<SubscriberImpl> subscriber_;
    1072             :     DDS::SubscriberListener_var sub_listener_;
    1073             :     WeakRcHandle<DataReaderImpl> data_reader_;
    1074             :     const bool call_;
    1075             :     const bool set_reader_status_;
    1076             :   };
    1077             : 
    1078             :   class OpenDDS_Dcps_Export OnDataAvailable : public Job {
    1079             :   public:
    1080           0 :     OnDataAvailable(DDS::DataReaderListener_var listener,
    1081             :                     WeakRcHandle<DataReaderImpl> data_reader,
    1082             :                     bool call,
    1083             :                     bool set_reader_status,
    1084             :                     bool set_subscriber_status)
    1085           0 :       : listener_(listener)
    1086           0 :       , data_reader_(data_reader)
    1087           0 :       , call_(call)
    1088           0 :       , set_reader_status_(set_reader_status)
    1089           0 :       , set_subscriber_status_(set_subscriber_status)
    1090           0 :     {}
    1091             : 
    1092             :   private:
    1093             :     virtual void execute();
    1094             : 
    1095             :     DDS::DataReaderListener_var listener_;
    1096             :     WeakRcHandle<DataReaderImpl> data_reader_;
    1097             :     const bool call_;
    1098             :     const bool set_reader_status_;
    1099             :     const bool set_subscriber_status_;
    1100             :   };
    1101             : 
    1102             : protected:
    1103             : #ifdef OPENDDS_SECURITY
    1104             :   Security::SecurityConfig_rch security_config_;
    1105             :   DDS::DynamicType_var dynamic_type_;
    1106             : #endif
    1107             : 
    1108             :   TransportMessageBlockAllocator mb_alloc_;
    1109             : };
    1110             : 
    1111             : typedef RcHandle<DataReaderImpl> DataReaderImpl_rch;
    1112             : 
    1113             : } // namespace DCPS
    1114             : } // namespace OpenDDS
    1115             : 
    1116             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
    1117             : 
    1118             : #if defined (__ACE_INLINE__)
    1119             : # include "DataReaderImpl.inl"
    1120             : #endif  /* __ACE_INLINE__ */
    1121             : 
    1122             : #endif /* OPENDDS_DCPS_DATAREADER_H  */

Generated by: LCOV version 1.16