DataReaderImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_DATAREADER_H
00009 #define OPENDDS_DCPS_DATAREADER_H
00010 
00011 #include "dcps_export.h"
00012 #include "EntityImpl.h"
00013 #include "dds/DdsDcpsTopicC.h"
00014 #include "dds/DdsDcpsSubscriptionExtC.h"
00015 #include "dds/DdsDcpsDomainC.h"
00016 #include "dds/DdsDcpsTopicC.h"
00017 #include "Definitions.h"
00018 #include "dds/DCPS/DataReaderCallbacks.h"
00019 #include "dds/DCPS/transport/framework/ReceivedDataSample.h"
00020 #include "dds/DCPS/transport/framework/TransportReceiveListener.h"
00021 #include "dds/DCPS/transport/framework/TransportClient.h"
00022 #include "DisjointSequence.h"
00023 #include "SubscriptionInstance.h"
00024 #include "InstanceState.h"
00025 #include "Cached_Allocator_With_Overflow_T.h"
00026 #include "ZeroCopyInfoSeq_T.h"
00027 #include "Stats_T.h"
00028 #include "OwnershipManager.h"
00029 #include "ContentFilteredTopicImpl.h"
00030 #include "GroupRakeData.h"
00031 #include "CoherentChangeControl.h"
00032 #include "AssociationData.h"
00033 #include "dds/DdsDcpsInfrastructureC.h"
00034 #include "RcHandle_T.h"
00035 #include "RcObject.h"
00036 #include "WriterInfo.h"
00037 #include "ReactorInterceptor.h"
00038 #include "Service_Participant.h"
00039 #include "PoolAllocator.h"
00040 #include "RemoveAssociationSweeper.h"
00041 #include "RcEventHandler.h"
00042 #include "TopicImpl.h"
00043 #include "DomainParticipantImpl.h"
00044 
00045 #include "ace/String_Base.h"
00046 #include "ace/Reverse_Lock_T.h"
00047 #include "ace/Atomic_Op.h"
00048 #include "ace/Reactor.h"
00049 
00050 #include "dds/DCPS/PoolAllocator.h"
00051 #include <memory>
00052 
00053 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00054 #pragma once
00055 #endif /* ACE_LACKS_PRAGMA_ONCE */
00056 
00057 class DDS_TEST;
00058 
00059 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00060 
00061 namespace OpenDDS {
00062 namespace DCPS {
00063 
00064 class SubscriberImpl;
00065 class DomainParticipantImpl;
00066 class SubscriptionInstance;
00067 class TopicImpl;
00068 class TopicDescriptionImpl;
00069 class RequestedDeadlineWatchdog;
00070 class Monitor;
00071 class DataReaderImpl;
00072 class FilterEvaluator;
00073 
00074 typedef Cached_Allocator_With_Overflow<OpenDDS::DCPS::ReceivedDataElementMemoryBlock, ACE_Null_Mutex>
00075 ReceivedDataAllocator;
00076 
00077 enum MarshalingType {
00078   FULL_MARSHALING,
00079   KEY_ONLY_MARSHALING
00080 };
00081 
00082 /// Elements stored for managing statistical data.
00083 class OpenDDS_Dcps_Export WriterStats {
00084 public:
00085   /// Default constructor.
00086   WriterStats(
00087     int amount = 0,
00088     DataCollector<double>::OnFull type = DataCollector<double>::KeepOldest);
00089 
00090   /// Add a datum to the latency statistics.
00091   void add_stat(const ACE_Time_Value& delay);
00092 
00093   /// Extract the current latency statistics for this writer.
00094   LatencyStatistics get_stats() const;
00095 
00096   /// Reset the latency statistics for this writer.
00097   void reset_stats();
00098 
00099 #ifndef OPENDDS_SAFETY_PROFILE
00100   /// Dump any raw data.
00101   std::ostream& raw_data(std::ostream& str) const;
00102 #endif
00103 
00104 private:
00105   /// Latency statistics for the DataWriter to this DataReader.
00106   Stats<double> stats_;
00107 };
00108 
00109 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00110 
00111 class OpenDDS_Dcps_Export AbstractSamples
00112 {
00113 public:
00114   virtual ~AbstractSamples(){}
00115   virtual void reserve(CORBA::ULong size)=0;
00116   virtual void push_back(const DDS::SampleInfo& info, const void* sample)=0;
00117 };
00118 
00119 #endif
00120 
00121 
00122 // Class to cleanup in case EndHistoricSamples is missed
00123 class EndHistoricSamplesMissedSweeper : public ReactorInterceptor {
00124 public:
00125   EndHistoricSamplesMissedSweeper(ACE_Reactor* reactor,
00126                                   ACE_thread_t owner,
00127                                   DataReaderImpl* reader);
00128 
00129   void schedule_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00130   void cancel_timer(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info);
00131 
00132   // Arg will be PublicationId
00133   int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00134 
00135   virtual bool reactor_is_shut_down() const
00136   {
00137     return TheServiceParticipant->is_shut_down();
00138   }
00139 
00140 private:
00141   ~EndHistoricSamplesMissedSweeper();
00142 
00143   WeakRcHandle<DataReaderImpl> reader_;
00144   OPENDDS_SET(RcHandle<OpenDDS::DCPS::WriterInfo>) info_set_;
00145 
00146   class CommandBase : public Command {
00147   public:
00148     CommandBase(EndHistoricSamplesMissedSweeper* sweeper,
00149                 OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00150       : sweeper_ (sweeper)
00151       , info_(info)
00152     { }
00153 
00154   protected:
00155     EndHistoricSamplesMissedSweeper* sweeper_;
00156     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo> info_;
00157   };
00158 
00159   class ScheduleCommand : public CommandBase {
00160   public:
00161     ScheduleCommand(EndHistoricSamplesMissedSweeper* sweeper,
00162                     OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00163       : CommandBase(sweeper, info)
00164     { }
00165     virtual void execute();
00166   };
00167 
00168   class CancelCommand : public CommandBase {
00169   public:
00170     CancelCommand(EndHistoricSamplesMissedSweeper* sweeper,
00171                   OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::WriterInfo>& info)
00172       : CommandBase(sweeper, info)
00173     { }
00174     virtual void execute();
00175   };
00176 };
00177 
00178 /**
00179 * @class DataReaderImpl
00180 *
00181 * @brief Implements the DDS::DataReader interface.
00182 *
00183 * See the DDS specification, OMG formal/04-12-02, for a description of
00184 * the interface this class is implementing.
00185 *
00186 * This class must be inherited by the type-specific datareader which
00187 * is specific to the data-type associated with the topic.
00188 *
00189 */
00190 class OpenDDS_Dcps_Export DataReaderImpl
00191   : public virtual LocalObject<DataReaderEx>,
00192     public virtual DataReaderCallbacks,
00193     public virtual EntityImpl,
00194     public virtual TransportClient,
00195     public virtual TransportReceiveListener,
00196     private WriterInfoListener {
00197 public:
00198   friend class RequestedDeadlineWatchdog;
00199   friend class QueryConditionImpl;
00200   friend class SubscriberImpl;
00201 
00202   typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType;
00203 
00204   /// Type of collection of statistics for writers to this reader.
00205   typedef OPENDDS_MAP_CMP(PublicationId, WriterStats, GUID_tKeyLessThan) StatsMapType;
00206 
00207   DataReaderImpl();
00208 
00209   virtual ~DataReaderImpl();
00210 
00211   virtual DDS::InstanceHandle_t get_instance_handle();
00212 
00213   virtual void add_association(const RepoId& yourId,
00214                                const WriterAssociation& writer,
00215                                bool active);
00216 
00217   virtual void transport_assoc_done(int flags, const RepoId& remote_id);
00218 
00219   virtual void association_complete(const RepoId& remote_id);
00220 
00221   virtual void remove_associations(const WriterIdSeq& writers, bool callback);
00222 
00223   virtual void update_incompatible_qos(const IncompatibleQosStatus& status);
00224 
00225   virtual void inconsistent_topic();
00226 
00227   virtual void signal_liveliness(const RepoId& remote_participant);
00228 
00229   /**
00230   * This is used to retrieve the listener for a certain status change.
00231   * If this datareader has a registered listener and the status kind
00232   * is in the listener mask then the listener is returned.
00233   * Otherwise, the query for the listener is propagated up to the
00234   * factory/subscriber.
00235   */
00236   DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind);
00237 
00238   /// tell instances when a DataWriter transitions to being alive
00239   /// The writer state is inout parameter, it has to be set ALIVE before
00240   /// handle_timeout is called since some subroutine use the state.
00241   void writer_became_alive(WriterInfo& info,
00242                            const ACE_Time_Value& when);
00243 
00244   /// tell instances when a DataWriter transitions to DEAD
00245   /// The writer state is inout parameter, the state is set to DEAD
00246   /// when it returns.
00247   void writer_became_dead(WriterInfo& info,
00248                           const ACE_Time_Value& when);
00249 
00250   /// tell instance when a DataWriter is removed.
00251   /// The liveliness status need update.
00252   void writer_removed(WriterInfo& info);
00253 
00254   virtual void cleanup();
00255 
00256   void init(
00257     TopicDescriptionImpl* a_topic_desc,
00258     const DDS::DataReaderQos &  qos,
00259     DDS::DataReaderListener_ptr a_listener,
00260     const DDS::StatusMask &     mask,
00261     DomainParticipantImpl*        participant,
00262     SubscriberImpl*               subscriber);
00263 
00264   virtual DDS::ReadCondition_ptr create_readcondition(
00265     DDS::SampleStateMask sample_states,
00266     DDS::ViewStateMask view_states,
00267     DDS::InstanceStateMask instance_states);
00268 
00269 #ifndef OPENDDS_NO_QUERY_CONDITION
00270   virtual DDS::QueryCondition_ptr create_querycondition(
00271     DDS::SampleStateMask sample_states,
00272     DDS::ViewStateMask view_states,
00273     DDS::InstanceStateMask instance_states,
00274     const char * query_expression,
00275     const DDS::StringSeq & query_parameters);
00276 #endif
00277 
00278   virtual DDS::ReturnCode_t delete_readcondition(
00279     DDS::ReadCondition_ptr a_condition);
00280 
00281   virtual DDS::ReturnCode_t delete_contained_entities();
00282 
00283   virtual DDS::ReturnCode_t set_qos(
00284     const DDS::DataReaderQos & qos);
00285 
00286   virtual DDS::ReturnCode_t get_qos(
00287     DDS::DataReaderQos & qos);
00288 
00289   virtual DDS::ReturnCode_t set_listener(
00290     DDS::DataReaderListener_ptr a_listener,
00291     DDS::StatusMask mask);
00292 
00293   virtual DDS::DataReaderListener_ptr get_listener();
00294 
00295   virtual DDS::TopicDescription_ptr get_topicdescription();
00296 
00297   virtual DDS::Subscriber_ptr get_subscriber();
00298 
00299   virtual DDS::ReturnCode_t get_sample_rejected_status(
00300     DDS::SampleRejectedStatus & status);
00301 
00302   virtual DDS::ReturnCode_t get_liveliness_changed_status(
00303     DDS::LivelinessChangedStatus & status);
00304 
00305   virtual DDS::ReturnCode_t get_requested_deadline_missed_status(
00306     DDS::RequestedDeadlineMissedStatus & status);
00307 
00308   virtual DDS::ReturnCode_t get_requested_incompatible_qos_status(
00309     DDS::RequestedIncompatibleQosStatus & status);
00310 
00311   virtual DDS::ReturnCode_t get_subscription_matched_status(
00312     DDS::SubscriptionMatchedStatus & status);
00313 
00314   virtual DDS::ReturnCode_t get_sample_lost_status(
00315     DDS::SampleLostStatus & status);
00316 
00317   virtual DDS::ReturnCode_t wait_for_historical_data(
00318     const DDS::Duration_t & max_wait);
00319 
00320   virtual DDS::ReturnCode_t get_matched_publications(
00321     DDS::InstanceHandleSeq & publication_handles);
00322 
00323 #if !defined (DDS_HAS_MINIMUM_BIT)
00324   virtual DDS::ReturnCode_t get_matched_publication_data(
00325     DDS::PublicationBuiltinTopicData & publication_data,
00326     DDS::InstanceHandle_t publication_handle);
00327 #endif // !defined (DDS_HAS_MINIMUM_BIT)
00328 
00329   virtual DDS::ReturnCode_t enable();
00330 
00331 #ifndef OPENDDS_SAFETY_PROFILE
00332   virtual void get_latency_stats(
00333     OpenDDS::DCPS::LatencyStatisticsSeq & stats);
00334 #endif
00335 
00336   virtual void reset_latency_stats();
00337 
00338   virtual CORBA::Boolean statistics_enabled();
00339 
00340   virtual void statistics_enabled(
00341     CORBA::Boolean statistics_enabled);
00342 
00343   /// @name Raw Latency Statistics Interfaces
00344   /// @{
00345 
00346   /// Expose the statistics container.
00347   const StatsMapType& raw_latency_statistics() const;
00348 
00349   /// Configure the size of the raw data collection buffer.
00350   unsigned int& raw_latency_buffer_size();
00351 
00352   /// Configure the type of the raw data collection buffer.
00353   DataCollector<double>::OnFull& raw_latency_buffer_type();
00354 
00355   /// @}
00356 
00357   /// update liveliness info for this writer.
00358   void writer_activity(const DataSampleHeader& header);
00359 
00360   /// process a message that has been received - could be control or a data sample.
00361   virtual void data_received(const ReceivedDataSample& sample);
00362 
00363   virtual bool check_transport_qos(const TransportInst& inst);
00364 
00365   RepoId get_subscription_id() const;
00366 
00367   bool have_sample_states(DDS::SampleStateMask sample_states) const;
00368   bool have_view_states(DDS::ViewStateMask view_states) const;
00369   bool have_instance_states(DDS::InstanceStateMask instance_states) const;
00370   bool contains_sample(DDS::SampleStateMask sample_states,
00371                        DDS::ViewStateMask view_states,
00372                        DDS::InstanceStateMask instance_states);
00373 
00374 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00375   virtual bool contains_sample_filtered(DDS::SampleStateMask sample_states,
00376                                         DDS::ViewStateMask view_states,
00377                                         DDS::InstanceStateMask instance_states,
00378                                         const FilterEvaluator& evaluator,
00379                                         const DDS::StringSeq& params) = 0;
00380 #endif
00381 
00382   virtual void dds_demarshal(const ReceivedDataSample& sample,
00383                              SubscriptionInstance_rch& instance,
00384                              bool & is_new_instance,
00385                              bool & filtered,
00386                              MarshalingType marshaling_type)= 0;
00387 
00388   virtual void dispose_unregister(const ReceivedDataSample& sample,
00389                                   SubscriptionInstance_rch& instance);
00390 
00391   void process_latency(const ReceivedDataSample& sample);
00392   void notify_latency(PublicationId writer);
00393 
00394   CORBA::Long get_depth() const {
00395     return depth_;
00396   }
00397   size_t get_n_chunks() const {
00398     return n_chunks_;
00399   }
00400 
00401   void liveliness_lost();
00402 
00403   void remove_all_associations();
00404 
00405   void notify_subscription_disconnected(const WriterIdSeq& pubids);
00406   void notify_subscription_reconnected(const WriterIdSeq& pubids);
00407   void notify_subscription_lost(const WriterIdSeq& pubids);
00408   void notify_liveliness_change();
00409 
00410   bool is_bit() const;
00411 
00412   /**
00413    * This method is used for a precondition check of delete_datareader.
00414    *
00415    * @retval true We have zero-copy samples loaned out
00416    * @retval false We have no zero-copy samples loaned out
00417    */
00418   bool has_zero_copies();
00419 
00420   /// Release the instance with the handle.
00421   void release_instance(DDS::InstanceHandle_t handle);
00422 
00423   // Reset time interval for each instance.
00424   void reschedule_deadline();
00425 
00426   ACE_Reactor_Timer_Interface* get_reactor();
00427 
00428   RepoId get_topic_id();
00429   RepoId get_dp_id();
00430 
00431   typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
00432   void get_instance_handles(InstanceHandleVec& instance_handles);
00433 
00434   typedef std::pair<PublicationId, WriterInfo::WriterState> WriterStatePair;
00435   typedef OPENDDS_VECTOR(WriterStatePair) WriterStatePairVec;
00436   void get_writer_states(WriterStatePairVec& writer_states);
00437 
00438 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00439   void update_ownership_strength (const PublicationId& pub_id,
00440                                   const CORBA::Long& ownership_strength);
00441 
00442   // Access to OwnershipManager is only valid when the domain participant is valid;
00443   // therefore, we must lock the domain pariticipant when using  OwnershipManager.
00444   class OwnershipManagerPtr
00445   {
00446   public:
00447     OwnershipManagerPtr(DataReaderImpl* reader)
00448       : participant_( reader->is_exclusive_ownership_ ? reader->participant_servant_.lock() : RcHandle<DomainParticipantImpl>())
00449     {
00450     }
00451     operator bool() const { return participant_.in(); }
00452     OwnershipManager* operator->() const
00453     {
00454       return participant_->ownership_manager();
00455     }
00456 
00457   private:
00458     RcHandle<DomainParticipantImpl> participant_;
00459   };
00460   friend class OwnershipManagerPtr;
00461 
00462   OwnershipManagerPtr ownership_manager() { return OwnershipManagerPtr(this); }
00463 #endif
00464 
00465   virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
00466                                OpenDDS::DCPS::SubscriptionInstance_rch& instance) = 0;
00467 
00468 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
00469 
00470 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00471 
00472   void enable_filtering(ContentFilteredTopicImpl* cft);
00473 
00474   DDS::ContentFilteredTopic_ptr get_cf_topic() const;
00475 
00476 #endif
00477 
00478   void update_subscription_params(const DDS::StringSeq& params) const;
00479 
00480   typedef OPENDDS_VECTOR(void*) GenericSeq;
00481 
00482   struct GenericBundle {
00483     GenericSeq samples_;
00484     DDS::SampleInfoSeq info_;
00485   };
00486 
00487   virtual DDS::ReturnCode_t read_generic(GenericBundle& gen,
00488     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00489     DDS::InstanceStateMask instance_states, bool adjust_ref_count ) = 0;
00490 
00491   virtual DDS::ReturnCode_t take(
00492     AbstractSamples& samples,
00493     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00494     DDS::InstanceStateMask instance_states)=0;
00495 
00496   virtual DDS::InstanceHandle_t lookup_instance_generic(const void* data) = 0;
00497 
00498   virtual DDS::ReturnCode_t read_instance_generic(void*& data,
00499     DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
00500     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00501     DDS::InstanceStateMask instance_states) = 0;
00502 
00503   virtual DDS::ReturnCode_t read_next_instance_generic(void*& data,
00504     DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
00505     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00506     DDS::InstanceStateMask instance_states) = 0;
00507 
00508   virtual void set_instance_state(DDS::InstanceHandle_t instance,
00509                                   DDS::InstanceStateKind state) = 0;
00510 
00511 #endif
00512 
00513 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00514   void begin_access();
00515   void end_access();
00516   void get_ordered_data(GroupRakeData& data,
00517                         DDS::SampleStateMask sample_states,
00518                         DDS::ViewStateMask view_states,
00519                         DDS::InstanceStateMask instance_states);
00520 
00521   void accept_coherent (PublicationId& writer_id,
00522                         RepoId& publisher_id);
00523   void reject_coherent (PublicationId& writer_id,
00524                         RepoId& publisher_id);
00525   void coherent_change_received (RepoId publisher_id, Coherent_State& result);
00526 
00527   void coherent_changes_completed (DataReaderImpl* reader);
00528 
00529   void reset_coherent_info (const PublicationId& writer_id,
00530                             const RepoId& publisher_id);
00531 #endif
00532 
00533   // Called upon subscriber qos change to update the local cache.
00534   void set_subscriber_qos(const DDS::SubscriberQos & qos);
00535 
00536   // Set the instance related writers to reevaluate the owner.
00537   void reset_ownership (DDS::InstanceHandle_t instance);
00538 
00539   virtual RcHandle<EntityImpl> parent() const;
00540 
00541   void disable_transport();
00542 
00543   virtual void register_for_writer(const RepoId& /*participant*/,
00544                                    const RepoId& /*readerid*/,
00545                                    const RepoId& /*writerid*/,
00546                                    const TransportLocatorSeq& /*locators*/,
00547                                    DiscoveryListener* /*listener*/);
00548 
00549   virtual void unregister_for_writer(const RepoId& /*participant*/,
00550                                      const RepoId& /*readerid*/,
00551                                      const RepoId& /*writerid*/);
00552 
00553 protected:
00554   virtual void remove_associations_i(const WriterIdSeq& writers, bool callback);
00555   void remove_publication(const PublicationId& pub_id);
00556 
00557   void prepare_to_delete();
00558 
00559   RcHandle<SubscriberImpl> get_subscriber_servant();
00560 
00561   void post_read_or_take();
00562 
00563   // type specific DataReader's part of enable.
00564   virtual DDS::ReturnCode_t enable_specific() = 0;
00565 
00566   void sample_info(DDS::SampleInfo & sample_info,
00567                    const ReceivedDataElement *ptr);
00568 
00569   CORBA::Long total_samples() const;
00570 
00571   void set_sample_lost_status(const DDS::SampleLostStatus& status);
00572   void set_sample_rejected_status(
00573     const DDS::SampleRejectedStatus& status);
00574 
00575 //remove document this!
00576   SubscriptionInstance_rch get_handle_instance(
00577     DDS::InstanceHandle_t handle);
00578 
00579   /**
00580   * Get an instance handle for a new instance.
00581   */
00582   DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t& key);
00583 
00584   virtual void purge_data(SubscriptionInstance_rch instance) = 0;
00585 
00586   virtual void release_instance_i(DDS::InstanceHandle_t handle) = 0;
00587 
00588   bool has_readcondition(DDS::ReadCondition_ptr a_condition);
00589 
00590   /// @TODO: document why the instances_ container is mutable.
00591   mutable SubscriptionInstanceMapType instances_;
00592 
00593   /// Assume since the container is mutable(?!!?) it may need to use the
00594   /// lock while const.
00595   /// @TODO: remove the recursive nature of the instances_lock if not needed.
00596   mutable ACE_Recursive_Thread_Mutex instances_lock_;
00597 
00598   /// Check if the received data sample or instance should
00599   /// be filtered.
00600   /**
00601    * @note Filtering will only occur if the application
00602    *       configured a finite duration in the Topic's LIFESPAN
00603    *       QoS policy or DataReader's TIME_BASED_FILTER QoS policy.
00604    */
00605   bool filter_sample(const DataSampleHeader& header);
00606 
00607   bool ownership_filter_instance(const SubscriptionInstance_rch& instance,
00608                                  const PublicationId& pubid);
00609   bool time_based_filter_instance(const SubscriptionInstance_rch& instance,
00610                                   ACE_Time_Value& filter_time_expired);
00611 
00612   void accept_sample_processing(const SubscriptionInstance_rch& instance, const DataSampleHeader& header, bool is_new_instance);
00613 
00614   virtual void qos_change(const DDS::DataReaderQos& qos);
00615 
00616   /// Data has arrived into the cache, unblock waiting ReadConditions
00617   void notify_read_conditions();
00618 
00619   unique_ptr<ReceivedDataAllocator>  rd_allocator_;
00620   DDS::DataReaderQos           qos_;
00621 
00622   // Status conditions accessible by subclasses.
00623   DDS::SampleRejectedStatus sample_rejected_status_;
00624   DDS::SampleLostStatus sample_lost_status_;
00625 
00626   /// lock protecting sample container as well as statuses.
00627   ACE_Recursive_Thread_Mutex   sample_lock_;
00628 
00629   typedef ACE_Reverse_Lock<ACE_Recursive_Thread_Mutex> Reverse_Lock_t;
00630   Reverse_Lock_t reverse_sample_lock_;
00631 
00632   WeakRcHandle<DomainParticipantImpl> participant_servant_;
00633   TopicDescriptionPtr<TopicImpl>      topic_servant_;
00634 
00635 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00636   bool is_exclusive_ownership_;
00637 
00638 #endif
00639 
00640 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00641   TopicDescriptionPtr<ContentFilteredTopicImpl> content_filtered_topic_;
00642 #endif
00643 
00644 
00645   /// Is accessing to Group coherent changes ?
00646   bool coherent_;
00647 
00648   /// Ordered group samples.
00649   GroupRakeData group_coherent_ordered_data_;
00650 
00651   DDS::SubscriberQos subqos_;
00652 
00653 protected:
00654   virtual void add_link(const DataLink_rch& link, const RepoId& peer);
00655 
00656 private:
00657 
00658   void notify_subscription_lost(const DDS::InstanceHandleSeq& handles);
00659 
00660   /// Lookup the instance handles by the publication repo ids
00661   void lookup_instance_handles(const WriterIdSeq& ids,
00662                                DDS::InstanceHandleSeq& hdls);
00663 
00664   void instances_liveliness_update(WriterInfo& info,
00665                                    const ACE_Time_Value& when);
00666 
00667 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00668   bool verify_coherent_changes_completion(WriterInfo* writer);
00669   bool coherent_change_received(WriterInfo* writer);
00670 #endif
00671 
00672   const RepoId& get_repo_id() const { return this->subscription_id_; }
00673   DDS::DomainId_t domain_id() const { return this->domain_id_; }
00674 
00675   Priority get_priority_value(const AssociationData& data) const {
00676     return data.publication_transport_priority_;
00677   }
00678 
00679 #if defined(OPENDDS_SECURITY)
00680   DDS::Security::ParticipantCryptoHandle get_crypto_handle() const;
00681 #endif
00682 
00683   /// when done handling historic samples, resume
00684   void resume_sample_processing(const PublicationId& pub_id);
00685 
00686   /// collect samples received before END_HISTORIC_SAMPLES
00687   /// returns false if normal processing of this sample should be skipped
00688   bool check_historic(const ReceivedDataSample& sample);
00689 
00690   /// deliver samples that were held by check_historic()
00691   void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)& samples);
00692 
00693   friend class InstanceState;
00694   friend class EndHistoricSamplesMissedSweeper;
00695   friend class RemoveAssociationSweeper<DataReaderImpl>;
00696 
00697   friend class ::DDS_TEST; //allows tests to get at private data
00698 
00699   DDS::TopicDescription_var    topic_desc_;
00700   DDS::StatusMask              listener_mask_;
00701   DDS::DataReaderListener_var  listener_;
00702   DDS::DomainId_t              domain_id_;
00703   RepoId                       dp_id_;
00704   // subscriber_servant_ has to be a weak pinter because it may be used from the
00705   // transport reactor thread and that thread doesn't have the owenership of the
00706   // the subscriber_servant_ object.
00707   WeakRcHandle<SubscriberImpl>              subscriber_servant_;
00708   RcHandle<EndHistoricSamplesMissedSweeper> end_historic_sweeper_;
00709   RcHandle<RemoveAssociationSweeper<DataReaderImpl> > remove_association_sweeper_;
00710 
00711   CORBA::Long                  depth_;
00712   size_t                       n_chunks_;
00713 
00714   //Used to protect access to id_to_handle_map_
00715   ACE_Recursive_Thread_Mutex   publication_handle_lock_;
00716   Reverse_Lock_t reverse_pub_handle_lock_;
00717 
00718   typedef OPENDDS_MAP_CMP(RepoId, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap;
00719   RepoIdToHandleMap            id_to_handle_map_;
00720 
00721   // Status conditions.
00722   DDS::LivelinessChangedStatus         liveliness_changed_status_;
00723   DDS::RequestedDeadlineMissedStatus   requested_deadline_missed_status_;
00724   DDS::RequestedIncompatibleQosStatus  requested_incompatible_qos_status_;
00725   DDS::SubscriptionMatchedStatus       subscription_match_status_;
00726 
00727   // OpenDDS extended status.  This is only available via listener.
00728   BudgetExceededStatus                 budget_exceeded_status_;
00729 
00730   /**
00731    * @todo The subscription_lost_status_ and
00732    *       subscription_reconnecting_status_ are left here for
00733    *       future use when we add get_subscription_lost_status()
00734    *       and get_subscription_reconnecting_status() methods.
00735    */
00736   // Statistics of the lost subscriptions due to lost connection.
00737   SubscriptionLostStatus               subscription_lost_status_;
00738   // Statistics of the subscriptions that are associated with a
00739   // reconnecting datalink.
00740   // SubscriptionReconnectingStatus      subscription_reconnecting_status_;
00741 
00742   /// The orb's reactor to be used to register the liveliness
00743   /// timer.
00744   ACE_Reactor_Timer_Interface* reactor_;
00745 
00746   class LivelinessTimer : public ReactorInterceptor {
00747   public:
00748     LivelinessTimer(ACE_Reactor* reactor,
00749                     ACE_thread_t owner,
00750                     DataReaderImpl* data_reader)
00751       : ReactorInterceptor(reactor, owner)
00752       , data_reader_(*data_reader)
00753       , liveliness_timer_id_(-1)
00754     { }
00755 
00756     void check_liveliness()
00757     {
00758       CheckLivelinessCommand c(this);
00759       execute_or_enqueue(c);
00760     }
00761 
00762     void cancel_timer()
00763     {
00764       CancelCommand c(this);
00765       execute_or_enqueue(c);
00766     }
00767 
00768     virtual bool reactor_is_shut_down() const
00769     {
00770       return TheServiceParticipant->is_shut_down();
00771     }
00772 
00773   private:
00774     ~LivelinessTimer() { }
00775 
00776     WeakRcHandle<DataReaderImpl> data_reader_;
00777 
00778     /// liveliness timer id; -1 if no timer is set
00779     long liveliness_timer_id_;
00780     void check_liveliness_i(bool cancel, const ACE_Time_Value& current_time);
00781 
00782     int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
00783 
00784     class CommandBase : public Command {
00785     public:
00786       CommandBase(LivelinessTimer* timer)
00787         : timer_(timer)
00788       { }
00789 
00790     protected:
00791       LivelinessTimer* timer_;
00792     };
00793 
00794     class CheckLivelinessCommand : public CommandBase {
00795     public:
00796       CheckLivelinessCommand(LivelinessTimer* timer)
00797         : CommandBase(timer)
00798       { }
00799       virtual void execute()
00800       {
00801         timer_->check_liveliness_i(true, ACE_OS::gettimeofday());
00802       }
00803     };
00804 
00805     class CancelCommand : public CommandBase {
00806     public:
00807       CancelCommand(LivelinessTimer* timer)
00808         : CommandBase(timer)
00809       { }
00810       virtual void execute()
00811       {
00812         if (timer_->liveliness_timer_id_ != -1) {
00813           timer_->reactor()->cancel_timer(timer_);
00814         }
00815       }
00816     };
00817   };
00818   RcHandle<LivelinessTimer> liveliness_timer_;
00819 
00820   CORBA::Long last_deadline_missed_total_count_;
00821   /// Watchdog responsible for reporting missed offered
00822   /// deadlines.
00823   RcHandle<RequestedDeadlineWatchdog> watchdog_;
00824 
00825   /// Flag indicates that this datareader is a builtin topic
00826   /// datareader.
00827   bool is_bit_;
00828 
00829   bool always_get_history_;
00830 
00831   /// Flag indicating status of statistics gathering.
00832   bool statistics_enabled_;
00833 
00834   /// publications writing to this reader.
00835   typedef OPENDDS_MAP_CMP(PublicationId, RcHandle<WriterInfo>,
00836                    GUID_tKeyLessThan) WriterMapType;
00837 
00838   WriterMapType writers_;
00839 
00840   /// RW lock for reading/writing publications.
00841   ACE_RW_Thread_Mutex writers_lock_;
00842 
00843   /// Statistics for this reader, collected for each writer.
00844   StatsMapType statistics_;
00845 
00846   /// Bound (or initial reservation) of raw latency buffer.
00847   unsigned int raw_latency_buffer_size_;
00848 
00849   /// Type of raw latency data buffer.
00850   DataCollector<double>::OnFull raw_latency_buffer_type_;
00851 
00852   typedef VarLess<DDS::ReadCondition> RCCompLess;
00853   typedef OPENDDS_SET_CMP(DDS::ReadCondition_var,  RCCompLess) ReadConditionSet;
00854   ReadConditionSet read_conditions_;
00855 
00856   /// Monitor object for this entity
00857   Monitor* monitor_;
00858 
00859   /// Periodic Monitor object for this entity
00860   Monitor* periodic_monitor_;
00861 
00862   bool transport_disabled_;
00863 };
00864 
00865 typedef RcHandle<DataReaderImpl> DataReaderImpl_rch;
00866 
00867 } // namespace DCPS
00868 } // namespace OpenDDS
00869 
00870 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00871 
00872 #if defined (__ACE_INLINE__)
00873 # include "DataReaderImpl.inl"
00874 #endif  /* __ACE_INLINE__ */
00875 
00876 #endif /* OPENDDS_DCPS_DATAREADER_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1