DataReaderImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATAREADER_H
00009 #define OPENDDS_DCPS_DATAREADER_H
00010 
00011 #include "dcps_export.h"
00012 #include "EntityImpl.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "DisjointSequence.h"
00023 #include "SubscriptionInstance.h"
00024 #include "InstanceState.h"
00025 #include "Cached_Allocator_With_Overflow_T.h"
00026 #include "ZeroCopyInfoSeq_T.h"
00027 #include "Stats_T.h"
00028 #include "OwnershipManager.h"
00029 #include "ContentFilteredTopicImpl.h"
00030 #include "GroupRakeData.h"
00031 #include "CoherentChangeControl.h"
00032 #include "AssociationData.h"
00033 #include "dds/DdsDcpsInfrastructureC.h"
00034 #include "RcHandle_T.h"
00035 #include "RcObject_T.h"
00036 #include "WriterInfo.h"
00037 #include "ReactorInterceptor.h"
00038 #include "Service_Participant.h"
00039 #include "PoolAllocator.h"
00040 #include "RemoveAssociationSweeper.h"
00041 
00042 #include "ace/String_Base.h"
00043 #include "ace/Reverse_Lock_T.h"
00044 #include "ace/Atomic_Op.h"
00045 #include "ace/Reactor.h"
00046 
00047 #include "dds/DCPS/PoolAllocator.h"
00048 #include <memory>
00049 
00050 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00051 #pragma once
00052 #endif /* ACE_LACKS_PRAGMA_ONCE */
00053 
00054 class DDS_TEST;
00055 
00056 namespace OpenDDS {
00057 namespace DCPS {
00058 
00059 class SubscriberImpl;
00060 class DomainParticipantImpl;
00061 class SubscriptionInstance;
00062 class TopicImpl;
00063 class TopicDescriptionImpl;
00064 class RequestedDeadlineWatchdog;
00065 class Monitor;
00066 class DataReaderImpl;
00067 class FilterEvaluator;
00068 
00069 typedef Cached_Allocator_With_Overflow<OpenDDS::DCPS::ReceivedDataElement, ACE_Null_Mutex>
00070 ReceivedDataAllocator;
00071 
00072 enum MarshalingType {
00073   FULL_MARSHALING,
00074   KEY_ONLY_MARSHALING
00075 };
00076 
00077 /// Elements stored for managing statistical data.
00078 class OpenDDS_Dcps_Export WriterStats {
00079 public:
00080   /// Default constructor.
00081   WriterStats(
00082     int amount = 0,
00083     DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
00084 
00085   /// Add a datum to the latency statistics.
00086   void add_stat(const ACE_Time_Value& delay);
00087 
00088   /// Extract the current latency statistics for this writer.
00089   LatencyStatistics get_stats() const;
00090 
00091   /// Reset the latency statistics for this writer.
00092   void reset_stats();
00093 
00094 #ifndef OPENDDS_SAFETY_PROFILE
00095   /// Dump any raw data.
00096   std::ostream& raw_data(std::ostream& str) const;
00097 #endif
00098 
00099 private:
00100   /// Latency statistics for the DataWriter to this DataReader.
00101   Stats<double> stats_;
00102 };
00103 
00104 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00105 
00106 class OpenDDS_Dcps_Export AbstractSamples
00107 {
00108 public:
00109   virtual ~AbstractSamples(){}
00110   virtual void reserve(CORBA::ULong size)=0;
00111   virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
00112 };
00113 
00114 #endif
00115 
00116 // Class to cleanup in case EndHistoricSamples is missed
00117 class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
00118 public:
00119   EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
00120                                   ACE_thread_t owner,
00121                                   DataReaderImpl* reader);
00122 
00123   void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00124   void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00125 
00126   // Arg will be PublicationId
00127   int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00128 
00129   virtual bool reactor_is_shut_down() const
00130   {
00131     return TheServiceParticipant->is_shut_down();
00132   }
00133 
00134 private:
00135   ~EndHistoricSamplesMissedSweeper();
00136 
00137   DataReaderImpl* reader_;
00138 
00139   class CommandBase : public Command {
00140   public:
00141     CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
00142                 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00143       : sweeper_ (sweeper)
00144       , info_(info)
00145     { }
00146 
00147   protected:
00148     EndHistoricSamplesMissedSweeper* sweeper_;
00149     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00150   };
00151 
00152   class ScheduleCommand : public CommandBase {
00153   public:
00154     ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
00155                     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00156       : CommandBase(sweeper, info)
00157     { }
00158     virtual void execute();
00159   };
00160 
00161   class CancelCommand : public CommandBase {
00162   public:
00163     CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
00164                   OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00165       : CommandBase(sweeper, info)
00166     { }
00167     virtual void execute();
00168   };
00169 };
00170 
00171 /**
00172 * @class DataReaderImpl
00173 *
00174 * @brief Implements the DDS::DataReader interface.
00175 *
00176 * See the DDS specification, OMG formal/04-12-02, for a description of
00177 * the interface this class is implementing.
00178 *
00179 * This class must be inherited by the type-specific datareader which
00180 * is specific to the data-type associated with the topic.
00181 *
00182 */
00183 class OpenDDS_Dcps_Export DataReaderImpl
00184   : public virtual LocalObject<DataReaderEx>,
00185     public virtual DataReaderCallbacks,
00186     public virtual EntityImpl,
00187     public virtual TransportClient,
00188     public virtual TransportReceiveListener,
00189     private WriterInfoListener {
00190 public:
00191   friend class RequestedDeadlineWatchdog;
00192   friend class QueryConditionImpl;
00193   friend class SubscriberImpl;
00194 
00195   typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance*) SubscriptionInstanceMapType;
00196 
00197   /// Type of collection of statistics for writers to this reader.
00198   typedef OPENDDS_MAP_CMP(PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType;
00199 
00200   //Constructor
00201   DataReaderImpl();
00202 
00203   //Destructor
00204   virtual ~DataReaderImpl();
00205 
00206   virtual DDS::InstanceHandle_t get_instance_handle();
00207 
00208   virtual void add_association(const RepoId& yourId,
00209                                const WriterAssociation& writer,
00210                                bool active);
00211 
00212   virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00213 
00214   virtual void association_complete(const RepoId& remote_id);
00215 
00216   virtual void remove_associations(const WriterIdSeq& writers, bool callback);
00217 
00218   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00219 
00220   virtual void inconsistent_topic();
00221 
00222   virtual void signal_liveliness(const RepoId& remote_participant);
00223 
00224   /**
00225   * This is used to retrieve the listener for a certain status change.
00226   * If this datareader has a registered listener and the status kind
00227   * is in the listener mask then the listener is returned.
00228   * Otherwise, the query for the listener is propagated up to the
00229   * factory/subscriber.
00230   */
00231   DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
00232 
00233   /// tell instances when a DataWriter transitions to being alive
00234   /// The writer state is inout parameter, it has to be set ALIVE before
00235   /// handle_timeout is called since some subroutine use the state.
00236   void writer_became_alive(WriterInfo& info,
00237                            const ACE_Time_Value& when);
00238 
00239   /// tell instances when a DataWriter transitions to DEAD
00240   /// The writer state is inout parameter, the state is set to DEAD
00241   /// when it returns.
00242   void writer_became_dead(WriterInfo& info,
00243                           const ACE_Time_Value& when);
00244 
00245   /// tell instance when a DataWriter is removed.
00246   /// The liveliness status need update.
00247   void writer_removed(WriterInfo& info);
00248 
00249   /**
00250    * cleanup the DataWriter.
00251    */
00252   void cleanup();
00253 
00254   void init(
00255     TopicDescriptionImpl* a_topic_desc,
00256     const DDS::DataReaderQos &  qos,
00257     DDS::DataReaderListener_ptr a_listener,
00258     const DDS::StatusMask &     mask,
00259     DomainParticipantImpl*        participant,
00260     SubscriberImpl*               subscriber,
00261     DDS::DataReader_ptr         dr_objref);
00262 
00263   virtual DDS::ReadCondition_ptr create_readcondition(
00264     DDS::SampleStateMask sample_states,
00265     DDS::ViewStateMask view_states,
00266     DDS::InstanceStateMask instance_states);
00267 
00268 #ifndef OPENDDS_NO_QUERY_CONDITION
00269   virtual DDS::QueryCondition_ptr create_querycondition(
00270     DDS::SampleStateMask sample_states,
00271     DDS::ViewStateMask view_states,
00272     DDS::InstanceStateMask instance_states,
00273     const char * query_expression,
00274     const DDS::StringSeq & query_parameters);
00275 #endif
00276 
00277   virtual DDS::ReturnCode_t delete_readcondition(
00278     DDS::ReadCondition_ptr a_condition);
00279 
00280   virtual DDS::ReturnCode_t delete_contained_entities();
00281 
00282   virtual DDS::ReturnCode_t set_qos(
00283     const DDS::DataReaderQos & qos);
00284 
00285   virtual DDS::ReturnCode_t get_qos(
00286     DDS::DataReaderQos & qos);
00287 
00288   virtual DDS::ReturnCode_t set_listener(
00289     DDS::DataReaderListener_ptr a_listener,
00290     DDS::StatusMask mask);
00291 
00292   virtual DDS::DataReaderListener_ptr get_listener();
00293 
00294   virtual DDS::TopicDescription_ptr get_topicdescription();
00295 
00296   virtual DDS::Subscriber_ptr get_subscriber();
00297 
00298   virtual DDS::ReturnCode_t get_sample_rejected_status(
00299     DDS::SampleRejectedStatus & status);
00300 
00301   virtual DDS::ReturnCode_t get_liveliness_changed_status(
00302     DDS::LivelinessChangedStatus & status);
00303 
00304   virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
00305     DDS::RequestedDeadlineMissedStatus & status);
00306 
00307   virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
00308     DDS::RequestedIncompatibleQosStatus & status);
00309 
00310   virtual DDS::ReturnCode_t get_subscription_matched_status(
00311     DDS::SubscriptionMatchedStatus & status);
00312 
00313   virtual DDS::ReturnCode_t get_sample_lost_status(
00314     DDS::SampleLostStatus & status);
00315 
00316   virtual DDS::ReturnCode_t wait_for_historical_data(
00317     const DDS::Duration_t & max_wait);
00318 
00319   virtual DDS::ReturnCode_t get_matched_publications(
00320     DDS::InstanceHandleSeq & publication_handles);
00321 
00322 #if !defined (DDS_HAS_MINIMUM_BIT)
00323   virtual DDS::ReturnCode_t get_matched_publication_data(
00324     DDS::PublicationBuiltinTopicData & publication_data,
00325     DDS::InstanceHandle_t publication_handle);
00326 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00327 
00328   virtual DDS::ReturnCode_t enable();
00329 
00330 #ifndef OPENDDS_SAFETY_PROFILE
00331   virtual void get_latency_stats(
00332     OpenDDS::DCPS::LatencyStatisticsSeq & stats);
00333 #endif
00334 
00335   virtual void reset_latency_stats();
00336 
00337   virtual CORBA::Boolean statistics_enabled();
00338 
00339   virtual void statistics_enabled(
00340     CORBA::Boolean statistics_enabled);
00341 
00342   /// @name Raw Latency Statistics Interfaces
00343   /// @{
00344 
00345   /// Expose the statistics container.
00346   const StatsMapType& raw_latency_statistics() const;
00347 
00348   /// Configure the size of the raw data collection buffer.
00349   unsigned int& raw_latency_buffer_size();
00350 
00351   /// Configure the type of the raw data collection buffer.
00352   DataCollector<double>::OnFull& raw_latency_buffer_type();
00353 
00354   /// @}
00355 
00356   /// update liveliness info for this writer.
00357   void writer_activity(const DataSampleHeader& header);
00358 
00359   /// process a message that has been received - could be control or a data sample.
00360   virtual void data_received(const ReceivedDataSample& sample);
00361 
00362   virtual bool check_transport_qos(const TransportInst& inst);
00363 
00364   RepoId get_subscription_id() const;
00365 
00366   DDS::DataReader_ptr get_dr_obj_ref();
00367 
00368   char *get_topic_name() const;
00369 
00370   bool have_sample_states(DDS::SampleStateMask sample_states) const;
00371   bool have_view_states(DDS::ViewStateMask view_states) const;
00372   bool have_instance_states(DDS::InstanceStateMask instance_states) const;
00373   bool contains_sample(DDS::SampleStateMask sample_states,
00374                        DDS::ViewStateMask view_states,
00375                        DDS::InstanceStateMask instance_states);
00376 
00377 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00378   virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00379                                         DDS::ViewStateMask view_states,
00380                                         DDS::InstanceStateMask instance_states,
00381                                         const FilterEvaluator& evaluator,
00382                                         const DDS::StringSeq& params) = 0;
00383 #endif
00384 
00385   virtual void dds_demarshal(const ReceivedDataSample& sample,
00386                              SubscriptionInstance*& instance,
00387                              bool & is_new_instance,
00388                              bool & filtered,
00389                              MarshalingType marshaling_type)= 0;
00390 
00391   virtual void dispose_unregister(const ReceivedDataSample& sample,
00392                                   SubscriptionInstance*& instance);
00393 
00394   void process_latency(const ReceivedDataSample& sample);
00395   void notify_latency(PublicationId writer);
00396 
00397   CORBA::Long get_depth() const {
00398     return depth_;
00399   }
00400   size_t get_n_chunks() const {
00401     return n_chunks_;
00402   }
00403 
00404   void liveliness_lost();
00405 
00406   void remove_all_associations();
00407 
00408   void notify_subscription_disconnected(const WriterIdSeq& pubids);
00409   void notify_subscription_reconnected(const WriterIdSeq& pubids);
00410   void notify_subscription_lost(const WriterIdSeq& pubids);
00411   virtual void notify_connection_deleted(const RepoId& peerId);
00412   void notify_liveliness_change();
00413 
00414   bool is_bit() const;
00415 
00416   /** This method provides virtual access to type specific code
00417    * that is used when loans are automatically returned.
00418    * The destructor of the sequence supporing zero-copy read calls this
00419    * method on the datareader that provided the loan.
00420    *
00421    * @param seq - The sequence of loaned values.
00422    *
00423    * @returns Always RETCODE_OK.
00424    *
00425    * thows NONE.
00426    */
00427   virtual DDS::ReturnCode_t auto_return_loan(void* seq) = 0;
00428 
00429   /** This method is used for a precondition check of delete_datareader.
00430    *
00431    * @returns the number of outstanding zero-copy samples loaned out.
00432    */
00433   virtual int num_zero_copies();
00434 
00435   virtual void dec_ref_data_element(ReceivedDataElement* r) = 0;
00436 
00437   /// Release the instance with the handle.
00438   void release_instance(DDS::InstanceHandle_t handle);
00439 
00440   // Reset time interval for each instance.
00441   void reschedule_deadline();
00442 
00443   ACE_Reactor_Timer_Interface* get_reactor();
00444 
00445   RepoId get_topic_id();
00446   RepoId get_dp_id();
00447 
00448   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00449   void get_instance_handles(InstanceHandleVec& instance_handles);
00450 
00451   typedef std::pair<PublicationId, WriterInfo::WriterState> WriterStatePair;
00452   typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
00453   void get_writer_states(WriterStatePairVec& writer_states);
00454 
00455 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00456   void update_ownership_strength (const PublicationId& pub_id,
00457                                   const CORBA::Long& ownership_strength);
00458   OwnershipManager* ownership_manager() const { return owner_manager_; }
00459 #endif
00460 
00461   virtual void delete_instance_map (void* map) = 0;
00462   virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00463                                OpenDDS::DCPS::SubscriptionInstance*& instance) = 0;
00464 
00465 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00466 
00467 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00468 
00469   void enable_filtering(ContentFilteredTopicImpl* cft);
00470 
00471   DDS::ContentFilteredTopic_ptr get_cf_topic() const;
00472 
00473 #endif
00474 
00475   void update_subscription_params(const DDS::StringSeq& params) const;
00476 
00477   typedef OPENDDS_VECTOR(void*) GenericSeq;
00478 
00479   struct GenericBundle {
00480     GenericSeq samples_;
00481     DDS::SampleInfoSeq info_;
00482   };
00483 
00484   virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
00485     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00486     DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
00487 
00488   virtual DDS::ReturnCode_t take(
00489     AbstractSamples& samples,
00490     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00491     DDS::InstanceStateMask instance_states)=0;
00492 
00493   virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
00494 
00495   virtual DDS::ReturnCode_t read_instance_generic(void*& data,
00496     DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00497     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00498     DDS::InstanceStateMask instance_states) = 0;
00499 
00500   virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
00501     DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00502     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00503     DDS::InstanceStateMask instance_states) = 0;
00504 
00505   virtual void set_instance_state(DDS::InstanceHandle_t instance,
00506                                   DDS::InstanceStateKind state) = 0;
00507 
00508 #endif
00509 
00510 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00511   void begin_access();
00512   void end_access();
00513   void get_ordered_data(GroupRakeData& data,
00514                         DDS::SampleStateMask sample_states,
00515                         DDS::ViewStateMask view_states,
00516                         DDS::InstanceStateMask instance_states);
00517 
00518   void accept_coherent (PublicationId& writer_id,
00519                         RepoId& publisher_id);
00520   void reject_coherent (PublicationId& writer_id,
00521                         RepoId& publisher_id);
00522   void coherent_change_received (RepoId publisher_id, Coherent_State& result);
00523 
00524   void coherent_changes_completed (DataReaderImpl* reader);
00525 
00526   void reset_coherent_info (const PublicationId& writer_id,
00527                             const RepoId& publisher_id);
00528 #endif
00529 
00530   // Called upon subscriber qos change to update the local cache.
00531   void set_subscriber_qos(const DDS::SubscriberQos & qos);
00532 
00533   // Set the instance related writers to reevaluate the owner.
00534   void reset_ownership (::DDS::InstanceHandle_t instance);
00535 
00536   virtual EntityImpl* parent() const;
00537 
00538   void disable_transport();
00539 
00540   virtual void register_for_writer(const RepoId& /*participant*/,
00541                                    const RepoId& /*readerid*/,
00542                                    const RepoId& /*writerid*/,
00543                                    const TransportLocatorSeq& /*locators*/,
00544                                    DiscoveryListener* /*listener*/);
00545 
00546   virtual void unregister_for_writer(const RepoId& /*participant*/,
00547                                      const RepoId& /*readerid*/,
00548                                      const RepoId& /*writerid*/);
00549 
00550 protected:
00551   virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00552   void remove_or_reschedule(const PublicationId& pub_id);
00553 
00554   void prepare_to_delete();
00555 
00556   SubscriberImpl* get_subscriber_servant();
00557 
00558   void post_read_or_take();
00559 
00560   // type specific DataReader's part of enable.
00561   virtual DDS::ReturnCode_t enable_specific() = 0;
00562 
00563   void sample_info(DDS::SampleInfo & sample_info,
00564                    const ReceivedDataElement *ptr);
00565 
00566   CORBA::Long total_samples() const;
00567 
00568   void set_sample_lost_status(const DDS::SampleLostStatus& status);
00569   void set_sample_rejected_status(
00570     const DDS::SampleRejectedStatus& status);
00571 
00572 //remove document this!
00573   SubscriptionInstance* get_handle_instance(
00574     DDS::InstanceHandle_t handle);
00575 
00576   /**
00577   * Get an instance handle for a new instance.
00578   */
00579   DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
00580 
00581   virtual void purge_data(SubscriptionInstance* instance) = 0;
00582 
00583   virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
00584 
00585   bool has_readcondition(DDS::ReadCondition_ptr a_condition);
00586 
00587   /// @TODO: document why the instances_ container is mutable.
00588   mutable SubscriptionInstanceMapType instances_;
00589 
00590   /// Assume since the container is mutable(?!!?) it may need to use the
00591   /// lock while const.
00592   /// @TODO: remove the recursive nature of the instances_lock if not needed.
00593   mutable ACE_Recursive_Thread_Mutex instances_lock_;
00594 
00595   /// Check if the received data sample or instance should
00596   /// be filtered.
00597   /**
00598    * @note Filtering will only occur if the application
00599    *       configured a finite duration in the Topic's LIFESPAN
00600    *       QoS policy or DataReader's TIME_BASED_FILTER QoS policy.
00601    */
00602   bool filter_sample(const DataSampleHeader& header);
00603   bool filter_instance(SubscriptionInstance* instance,
00604                        const PublicationId& pubid);
00605 
00606   /// Data has arrived into the cache, unblock waiting ReadConditions
00607   void notify_read_conditions();
00608 
00609   ReceivedDataAllocator        *rd_allocator_;
00610   DDS::DataReaderQos           qos_;
00611 
00612   // Status conditions accessible by subclasses.
00613   DDS::SampleRejectedStatus sample_rejected_status_;
00614   DDS::SampleLostStatus sample_lost_status_;
00615 
00616   /// lock protecting sample container as well as statuses.
00617   ACE_Recursive_Thread_Mutex   sample_lock_;
00618 
00619   typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
00620   Reverse_Lock_t reverse_sample_lock_;
00621 
00622   DomainParticipantImpl*       participant_servant_;
00623   TopicImpl*                   topic_servant_;
00624 
00625 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00626   bool is_exclusive_ownership_;
00627 
00628   OwnershipManager* owner_manager_;
00629 #endif
00630 
00631 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00632   DDS::ContentFilteredTopic_var content_filtered_topic_;
00633 #endif
00634 
00635 
00636   /// Is accessing to Group coherent changes ?
00637   bool coherent_;
00638 
00639   /// Ordered group samples.
00640   GroupRakeData group_coherent_ordered_data_;
00641 
00642   DDS::SubscriberQos subqos_;
00643 
00644 protected:
00645     virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00646 
00647 private:
00648 
00649   void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00650 
00651   /// Lookup the instance handles by the publication repo ids
00652   bool lookup_instance_handles(const WriterIdSeq& ids,
00653                                DDS::InstanceHandleSeq& hdls);
00654 
00655   void instances_liveliness_update(WriterInfo& info,
00656                                    const ACE_Time_Value& when);
00657 
00658 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00659   bool verify_coherent_changes_completion(WriterInfo* writer);
00660   bool coherent_change_received(WriterInfo* writer);
00661 #endif
00662 
00663   const RepoId& get_repo_id() const { return this->subscription_id_; }
00664   DDS::DomainId_t domain_id() const { return this->domain_id_; }
00665 
00666   Priority get_priority_value(const AssociationData& data) const {
00667     return data.publication_transport_priority_;
00668   }
00669 
00670   /// when done handling historic samples, resume
00671   void resume_sample_processing(const PublicationId& pub_id);
00672 
00673   /// collect samples received before END_HISTORIC_SAMPLES
00674   /// returns false if normal processing of this sample should be skipped
00675   bool check_historic(const ReceivedDataSample& sample);
00676 
00677   /// deliver samples that were held by check_historic()
00678   void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
00679 
00680   void listener_add_ref() { EntityImpl::_add_ref(); }
00681   void listener_remove_ref() { EntityImpl::_remove_ref(); }
00682 
00683   friend class InstanceState;
00684   friend class EndHistoricSamplesMissedSweeper;
00685   friend class RemoveAssociationSweeper<DataReaderImpl>;
00686 
00687   friend class ::DDS_TEST; //allows tests to get at private data
00688 
00689   DDS::TopicDescription_var    topic_desc_;
00690   DDS::StatusMask              listener_mask_;
00691   DDS::DataReaderListener_var  listener_;
00692   DDS::DomainId_t              domain_id_;
00693   SubscriberImpl*              subscriber_servant_;
00694   DDS::DataReader_var          dr_local_objref_;
00695   EndHistoricSamplesMissedSweeper* end_historic_sweeper_;
00696   RemoveAssociationSweeper<DataReaderImpl>*        remove_association_sweeper_;
00697 
00698   CORBA::Long                  depth_;
00699   size_t                       n_chunks_;
00700 
00701   //Used to protect access to id_to_handle_map_
00702   ACE_Recursive_Thread_Mutex   publication_handle_lock_;
00703   Reverse_Lock_t reverse_pub_handle_lock_;
00704 
00705   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00706   RepoIdToHandleMap            id_to_handle_map_;
00707 
00708   // Status conditions.
00709   DDS::LivelinessChangedStatus         liveliness_changed_status_;
00710   DDS::RequestedDeadlineMissedStatus   requested_deadline_missed_status_;
00711   DDS::RequestedIncompatibleQosStatus  requested_incompatible_qos_status_;
00712   DDS::SubscriptionMatchedStatus       subscription_match_status_;
00713 
00714   // OpenDDS extended status.  This is only available via listener.
00715   BudgetExceededStatus                 budget_exceeded_status_;
00716 
00717   /**
00718    * @todo The subscription_lost_status_ and
00719    *       subscription_reconnecting_status_ are left here for
00720    *       future use when we add get_subscription_lost_status()
00721    *       and get_subscription_reconnecting_status() methods.
00722    */
00723   // Statistics of the lost subscriptions due to lost connection.
00724   SubscriptionLostStatus               subscription_lost_status_;
00725   // Statistics of the subscriptions that are associated with a
00726   // reconnecting datalink.
00727   // SubscriptionReconnectingStatus      subscription_reconnecting_status_;
00728 
00729   /// The orb's reactor to be used to register the liveliness
00730   /// timer.
00731   ACE_Reactor_Timer_Interface* reactor_;
00732 
00733   class LivelinessTimer : public ReactorInterceptor {
00734   public:
00735     LivelinessTimer(ACE_Reactor* reactor,
00736                     ACE_thread_t owner,
00737                     DataReaderImpl* data_reader)
00738       : ReactorInterceptor(reactor, owner)
00739       , data_reader_(data_reader)
00740       , liveliness_timer_id_(-1)
00741     { }
00742 
00743     void check_liveliness()
00744     {
00745       CheckLivelinessCommand c(this);
00746       execute_or_enqueue(c);
00747     }
00748 
00749     void cancel_timer()
00750     {
00751       CancelCommand c(this);
00752       execute_or_enqueue(c);
00753     }
00754 
00755     virtual bool reactor_is_shut_down() const
00756     {
00757       return TheServiceParticipant->is_shut_down();
00758     }
00759 
00760   private:
00761     ~LivelinessTimer() { }
00762 
00763     DataReaderImpl* data_reader_;
00764 
00765     /// liveliness timer id; -1 if no timer is set
00766     long liveliness_timer_id_;
00767     void check_liveliness_i(bool cancel, const ACE_Time_Value& current_time);
00768 
00769     int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00770 
00771     class CommandBase : public Command {
00772     public:
00773       CommandBase(LivelinessTimer* timer)
00774         : timer_(timer)
00775       { }
00776 
00777     protected:
00778       LivelinessTimer* timer_;
00779     };
00780 
00781     class CheckLivelinessCommand : public CommandBase {
00782     public:
00783       CheckLivelinessCommand(LivelinessTimer* timer)
00784         : CommandBase(timer)
00785       { }
00786       virtual void execute()
00787       {
00788         timer_->check_liveliness_i(true, ACE_OS::gettimeofday());
00789       }
00790     };
00791 
00792     class CancelCommand : public CommandBase {
00793     public:
00794       CancelCommand(LivelinessTimer* timer)
00795         : CommandBase(timer)
00796       { }
00797       virtual void execute()
00798       {
00799         if (timer_->liveliness_timer_id_ != -1) {
00800           timer_->reactor()->cancel_timer(timer_);
00801         }
00802       }
00803     };
00804   };
00805   LivelinessTimer* liveliness_timer_;
00806 
00807   CORBA::Long last_deadline_missed_total_count_;
00808   /// Watchdog responsible for reporting missed offered
00809   /// deadlines.
00810   RequestedDeadlineWatchdog* watchdog_;
00811 
00812   /// Flag indicates that this datareader is a builtin topic
00813   /// datareader.
00814   bool is_bit_;
00815 
00816   /// Flag indicates that the init() is called.
00817   bool initialized_;
00818   bool always_get_history_;
00819 
00820   /// Flag indicating status of statistics gathering.
00821   bool statistics_enabled_;
00822 
00823   /// publications writing to this reader.
00824   typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00825                    GUID_tKeyLessThan) WriterMapType;
00826 
00827   WriterMapType writers_;
00828 
00829   /// RW lock for reading/writing publications.
00830   ACE_RW_Thread_Mutex writers_lock_;
00831 
00832   /// Statistics for this reader, collected for each writer.
00833   StatsMapType statistics_;
00834 
00835   /// Bound (or initial reservation) of raw latency buffer.
00836   unsigned int raw_latency_buffer_size_;
00837 
00838   /// Type of raw latency data buffer.
00839   DataCollector<double>::OnFull raw_latency_buffer_type_;
00840 
00841   typedef VarLess<DDS::ReadCondition> RCCompLess;
00842   typedef OPENDDS_SET_CMP(DDS::ReadCondition_var,  RCCompLess) ReadConditionSet;
00843   ReadConditionSet read_conditions_;
00844 
00845   /// Monitor object for this entity
00846   Monitor* monitor_;
00847 
00848   /// Periodic Monitor object for this entity
00849   Monitor* periodic_monitor_;
00850 
00851   bool transport_disabled_;
00852 };
00853 
00854 } // namespace DCPS
00855 } // namespace OpenDDS
00856 
00857 #if defined (__ACE_INLINE__)
00858 # include "DataReaderImpl.inl"
00859 #endif  /* __ACE_INLINE__ */
00860 
00861 #endif /* OPENDDS_DCPS_DATAREADER_H  */

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7