SubscriberImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_SUBSCRIBER_H
00009 #define OPENDDS_DCPS_SUBSCRIBER_H
00010 
00011 #include "dds/DCPS/DataReaderCallbacks.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "EntityImpl.h"
00014 #include "Definitions.h"
00015 #include "DataCollector_T.h"
00016 #include "DataReaderImpl.h"
00017 #include "ace/Recursive_Thread_Mutex.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif /* ACE_LACKS_PRAGMA_ONCE */
00023 
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class DomainParticipantImpl;
00030 class Monitor;
00031 
00032 #ifndef OPENDDS_NO_MULTI_TOPIC
00033 class MultiTopicImpl;
00034 #endif
00035 
00036 class OpenDDS_Dcps_Export SubscriberImpl
00037   : public virtual OpenDDS::DCPS::LocalObject<DDS::Subscriber>
00038   , public virtual EntityImpl {
00039 public:
00040 
00041   SubscriberImpl(DDS::InstanceHandle_t handle,
00042                  const DDS::SubscriberQos& qos,
00043                  DDS::SubscriberListener_ptr a_listener,
00044                  const DDS::StatusMask& mask,
00045                  DomainParticipantImpl* participant);
00046 
00047   virtual ~SubscriberImpl();
00048 
00049   virtual DDS::InstanceHandle_t get_instance_handle();
00050 
00051   bool contains_reader(DDS::InstanceHandle_t a_handle);
00052 
00053   virtual DDS::DataReader_ptr create_datareader(
00054     DDS::TopicDescription_ptr a_topic_desc,
00055     const DDS::DataReaderQos& qos,
00056     DDS::DataReaderListener_ptr a_listener,
00057     DDS::StatusMask mask);
00058 
00059   virtual DDS::ReturnCode_t delete_datareader(
00060     DDS::DataReader_ptr a_datareader);
00061 
00062   virtual DDS::ReturnCode_t delete_contained_entities();
00063 
00064   virtual DDS::DataReader_ptr lookup_datareader(
00065     const char* topic_name);
00066 
00067   virtual DDS::ReturnCode_t get_datareaders(
00068     DDS::DataReaderSeq& readers,
00069     DDS::SampleStateMask sample_states,
00070     DDS::ViewStateMask view_states,
00071     DDS::InstanceStateMask instance_states);
00072 
00073   virtual DDS::ReturnCode_t notify_datareaders();
00074 
00075   virtual DDS::ReturnCode_t set_qos(
00076     const DDS::SubscriberQos& qos);
00077 
00078   virtual DDS::ReturnCode_t get_qos(
00079     DDS::SubscriberQos& qos);
00080 
00081   virtual DDS::ReturnCode_t set_listener(
00082     DDS::SubscriberListener_ptr a_listener,
00083     DDS::StatusMask mask);
00084 
00085   virtual DDS::SubscriberListener_ptr get_listener();
00086 
00087 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00088 
00089   virtual DDS::ReturnCode_t begin_access();
00090 
00091   virtual DDS::ReturnCode_t end_access();
00092 
00093 #endif
00094 
00095   virtual DDS::DomainParticipant_ptr get_participant();
00096 
00097   virtual DDS::ReturnCode_t set_default_datareader_qos(
00098     const DDS::DataReaderQos& qos);
00099 
00100   virtual DDS::ReturnCode_t get_default_datareader_qos(
00101     DDS::DataReaderQos& qos);
00102 
00103   virtual DDS::ReturnCode_t copy_from_topic_qos(
00104     DDS::DataReaderQos& a_datareader_qos,
00105     const DDS::TopicQos& a_topic_qos);
00106 
00107   virtual DDS::ReturnCode_t enable();
00108 
00109   /** This method is not defined in the IDL and is defined for
00110   *  internal use.
00111   *  Check if there is any datareader associated with it.
00112   */
00113   bool is_clean() const;
00114 
00115   // called by DataReaderImpl::data_received
00116   void data_received(DataReaderImpl* reader);
00117 
00118   DDS::ReturnCode_t reader_enabled(const char* topic_name,
00119                                    DataReaderImpl* reader);
00120 
00121 #ifndef OPENDDS_NO_MULTI_TOPIC
00122   DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader);
00123   void remove_from_datareader_set(DataReaderImpl* reader);
00124 #endif
00125 
00126   DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind);
00127 
00128   /// @name Raw Latency Statistics Configuration Interfaces
00129   /// @{
00130 
00131   /// Configure the size of the raw data collection buffer.
00132   unsigned int& raw_latency_buffer_size();
00133 
00134   /// Configure the type of the raw data collection buffer.
00135   DataCollector<double>::OnFull& raw_latency_buffer_type();
00136 
00137   /// @}
00138 
00139   typedef OPENDDS_VECTOR(RepoId) SubscriptionIdVec;
00140   /// Populates a std::vector with the SubscriptionIds (GUIDs)
00141   /// of this Subscriber's Data Readers
00142   void get_subscription_ids(SubscriptionIdVec& subs);
00143 
00144 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00145   void update_ownership_strength (const PublicationId& pub_id,
00146                                   const CORBA::Long& ownership_strength);
00147 #endif
00148 
00149 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00150   void coherent_change_received(RepoId& publisher_id,
00151                                 DataReaderImpl* reader,
00152                                 Coherent_State& group_state);
00153 #endif
00154 
00155   virtual RcHandle<EntityImpl> parent() const;
00156 
00157   static bool validate_datareader_qos(const DDS::DataReaderQos & qos,
00158                                       const DDS::DataReaderQos & default_qos,
00159                                       DDS::Topic_ptr a_topic,
00160                                       DDS::DataReaderQos & result_qos, bool mt);
00161 
00162 private:
00163 
00164   /// Keep track of all the DataReaders attached to this
00165   /// Subscriber: key is the topic_name
00166   typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap;
00167 
00168   /// Keep track of DataReaders with data
00169   /// std::set for now, want to encapsulate
00170   /// this so we can switch between a set or
00171   /// list depending on Presentation QoS.
00172   typedef OPENDDS_SET(DataReaderImpl_rch) DataReaderSet;
00173 
00174   /// DataReader id to qos map.
00175   typedef OPENDDS_MAP_CMP(RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap;
00176 
00177   DDS::InstanceHandle_t        handle_;
00178 
00179   DDS::SubscriberQos           qos_;
00180   DDS::DataReaderQos           default_datareader_qos_;
00181 
00182   DDS::StatusMask              listener_mask_;
00183   DDS::SubscriberListener_var  listener_;
00184 
00185   DataReaderSet                readers_not_enabled_;
00186   DataReaderMap                datareader_map_;
00187   DataReaderSet                datareader_set_;
00188 
00189 #ifndef OPENDDS_NO_MULTI_TOPIC
00190   OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_;
00191 #endif
00192 
00193   WeakRcHandle<DomainParticipantImpl> participant_;
00194 
00195   DDS::DomainId_t              domain_id_;
00196   RepoId                       dp_id_;
00197 
00198   /// Bound (or initial reservation) of raw latency buffers.
00199   unsigned int raw_latency_buffer_size_;
00200 
00201   /// Type of raw latency data buffers.
00202   DataCollector<double>::OnFull raw_latency_buffer_type_;
00203 
00204   /// this lock protects the data structures in this class.
00205   ACE_Recursive_Thread_Mutex   si_lock_;
00206 
00207   /// Monitor object for this entity
00208   Monitor* monitor_;
00209 
00210   int access_depth_;
00211 };
00212 
00213 } // namespace DCPS
00214 } // namespace OpenDDS
00215 
00216 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00217 
00218 #endif /* OPENDDS_DCPS_SUBSCRIBER_H  */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1