SubscriberImpl.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_SUBSCRIBER_H
00009 #define OPENDDS_DCPS_SUBSCRIBER_H
00010 
00011 #include "dds/DCPS/DataReaderCallbacks.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "EntityImpl.h"
00014 #include "Definitions.h"
00015 #include "DataCollector_T.h"
00016 #include "DataReaderImpl.h"
00017 #include "ace/Synch.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019 
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif /* ACE_LACKS_PRAGMA_ONCE */
00023 
00024 namespace OpenDDS {
00025 namespace DCPS {
00026 
00027 class DomainParticipantImpl;  // forward declaration: used below only as a pointer (ctor argument, participant_)
00028 class Monitor;                // forward declaration: used below only as a pointer (monitor_)
00029   // The next forward declaration is compiled only when OPENDDS_NO_MULTI_TOPIC is not defined.
00030 #ifndef OPENDDS_NO_MULTI_TOPIC
00031 class MultiTopicImpl;         // NOTE(review): not referenced by name in the class declaration in this listing
00032 #endif
00033 
00034 class OpenDDS_Dcps_Export SubscriberImpl  // implements DDS::Subscriber as a LocalObject and is also an EntityImpl (see bases)
00035   : public virtual OpenDDS::DCPS::LocalObject<DDS::Subscriber>
00036   , public virtual EntityImpl {
00037 public:
00038   /// Constructor. NOTE(review): arguments presumably initialize handle_, qos_, listener_, listener_mask_ and participant_ -- confirm in the .cpp.
00039   SubscriberImpl(DDS::InstanceHandle_t handle,
00040                  const DDS::SubscriberQos& qos,
00041                  DDS::SubscriberListener_ptr a_listener,
00042                  const DDS::StatusMask& mask,
00043                  DomainParticipantImpl* participant);
00044 
00045   virtual ~SubscriberImpl();
00046   /// NOTE(review): presumably returns handle_ -- confirm in the .cpp.
00047   virtual DDS::InstanceHandle_t get_instance_handle();
00048   /// NOTE(review): presumably reports whether a DataReader with handle a_handle belongs to this subscriber -- confirm in the .cpp.
00049   bool contains_reader(DDS::InstanceHandle_t a_handle);
00050 
00051   virtual DDS::DataReader_ptr create_datareader(
00052     DDS::TopicDescription_ptr a_topic_desc,
00053     const DDS::DataReaderQos& qos,
00054     DDS::DataReaderListener_ptr a_listener,
00055     DDS::StatusMask mask);
00056 
00057   virtual DDS::ReturnCode_t delete_datareader(
00058     DDS::DataReader_ptr a_datareader);
00059 
00060   virtual DDS::ReturnCode_t delete_contained_entities();
00061 
00062   virtual DDS::DataReader_ptr lookup_datareader(
00063     const char* topic_name);
00064   /// Output goes to the caller-supplied `readers` sequence; the three masks are presumably filters -- confirm in the .cpp.
00065   virtual DDS::ReturnCode_t get_datareaders(
00066     DDS::DataReaderSeq& readers,
00067     DDS::SampleStateMask sample_states,
00068     DDS::ViewStateMask view_states,
00069     DDS::InstanceStateMask instance_states);
00070 
00071   virtual DDS::ReturnCode_t notify_datareaders();
00072 
00073   virtual DDS::ReturnCode_t set_qos(
00074     const DDS::SubscriberQos& qos);
00075 
00076   virtual DDS::ReturnCode_t get_qos(
00077     DDS::SubscriberQos& qos);
00078 
00079   virtual DDS::ReturnCode_t set_listener(
00080     DDS::SubscriberListener_ptr a_listener,
00081     DDS::StatusMask mask);
00082 
00083   virtual DDS::SubscriberListener_ptr get_listener();
00084 
00085 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00086   // begin_access()/end_access() are compiled only when OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
00087   virtual DDS::ReturnCode_t begin_access();
00088 
00089   virtual DDS::ReturnCode_t end_access();
00090 
00091 #endif
00092 
00093   virtual DDS::DomainParticipant_ptr get_participant();
00094 
00095   virtual DDS::ReturnCode_t set_default_datareader_qos(
00096     const DDS::DataReaderQos& qos);
00097 
00098   virtual DDS::ReturnCode_t get_default_datareader_qos(
00099     DDS::DataReaderQos& qos);
00100 
00101   virtual DDS::ReturnCode_t copy_from_topic_qos(
00102     DDS::DataReaderQos& a_datareader_qos,
00103     const DDS::TopicQos& a_topic_qos);
00104 
00105   virtual DDS::ReturnCode_t enable();
00106 
00107   /** Not defined in the IDL; provided for internal use.
00108   *  Checks whether any DataReader is associated with this subscriber.
00109   *  NOTE(review): presumably returns true when there are none -- confirm in the .cpp.
00110   */
00111   bool is_clean() const;
00112 
00113   // Called by DataReaderImpl::data_received.
00114   void data_received(DataReaderImpl* reader);
00115   /// NOTE(review): presumably records an enabled reader under topic_name (cf. DataReaderMap, keyed by topic name) -- confirm in the .cpp.
00116   DDS::ReturnCode_t reader_enabled(const char* topic_name,
00117                                    DataReaderImpl* reader);
00118   // The two declarations below are compiled only when OPENDDS_NO_MULTI_TOPIC is not defined.
00119 #ifndef OPENDDS_NO_MULTI_TOPIC
00120   DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader);
00121   void remove_from_datareader_set(DataReaderImpl* reader);
00122 #endif
00123   /// NOTE(review): presumably returns the listener that should handle status `kind` -- confirm in the .cpp.
00124   DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind);
00125 
00126   /// @name Raw Latency Statistics Configuration Interfaces
00127   /// @{
00128 
00129   /// Configure the size of the raw data collection buffer (returned by mutable reference).
00130   unsigned int& raw_latency_buffer_size();
00131 
00132   /// Configure the type of the raw data collection buffer (returned by mutable reference).
00133   DataCollector<double>::OnFull& raw_latency_buffer_type();
00134 
00135   /// @}
00136   /// Sequence of RepoId values; the container type is chosen by the OPENDDS_VECTOR macro.
00137   typedef OPENDDS_VECTOR(RepoId) SubscriptionIdVec;
00138   /// Populates subs (a SubscriptionIdVec) with the SubscriptionIds (GUIDs)
00139   /// of this Subscriber's Data Readers
00140   void get_subscription_ids(SubscriptionIdVec& subs);
00141   // update_ownership_strength() is compiled only when OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE is not defined.
00142 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00143   void update_ownership_strength (const PublicationId& pub_id,
00144                                   const CORBA::Long& ownership_strength);
00145 #endif
00146   // coherent_change_received() is compiled only when OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
00147 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00148   void coherent_change_received(RepoId& publisher_id,
00149                                 DataReaderImpl* reader,
00150                                 Coherent_State& group_state);
00151 #endif
00152   /// NOTE(review): presumably the owning participant viewed as an EntityImpl -- confirm in the .cpp.
00153   virtual EntityImpl* parent() const;
00154   /// Static helper; writes into result_qos. NOTE(review): `mt` presumably flags a multi-topic reader -- confirm in the .cpp.
00155   static bool validate_datareader_qos(const DDS::DataReaderQos & qos,
00156                                       const DDS::DataReaderQos & default_qos,
00157                                       DDS::Topic_ptr a_topic,
00158                                       DDS::DataReaderQos & result_qos, bool mt);
00159 
00160 private:
00161 
00162   /// Keep track of all the DataReaders attached to this
00163   /// Subscriber: key is the topic_name (a multimap, so a key may repeat)
00164   typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataReaderImpl*) DataReaderMap;
00165 
00166   /// Keep track of DataReaders with data.
00167   /// A set for now; the intent is to encapsulate
00168   /// this so we can switch between a set or
00169   /// list depending on Presentation Qos.
00170   typedef OPENDDS_SET(DataReaderImpl*) DataReaderSet;
00171 
00172   /// DataReader id to qos map, ordered by GUID_tKeyLessThan.
00173   typedef OPENDDS_MAP_CMP(RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap;
00174 
00175   DDS::InstanceHandle_t        handle_;
00176 
00177   DDS::SubscriberQos           qos_;
00178   DDS::DataReaderQos           default_datareader_qos_;
00179 
00180   DDS::StatusMask              listener_mask_;
00181   DDS::SubscriberListener_var  listener_;
00182 
00183   DataReaderMap                datareader_map_;
00184   DataReaderSet                datareader_set_;
00185   /// String-keyed map of DataReader_var; present only when OPENDDS_NO_MULTI_TOPIC is not defined. Key is presumably the topic name -- confirm.
00186 #ifndef OPENDDS_NO_MULTI_TOPIC
00187   OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_;
00188 #endif
00189   /// Participant held both as a raw implementation pointer and as a _var object reference; ownership of the raw pointer is not visible here.
00190   DomainParticipantImpl*       participant_;
00191   DDS::DomainParticipant_var   participant_objref_;
00192 
00193   DDS::DomainId_t              domain_id_;
00194 
00195   /// Bound (or initial reservation) of raw latency buffers.
00196   unsigned int raw_latency_buffer_size_;
00197 
00198   /// Type of raw latency data buffers.
00199   DataCollector<double>::OnFull raw_latency_buffer_type_;
00200 
00201   /// This lock (a recursive mutex) protects the data structures in this class.
00202   ACE_Recursive_Thread_Mutex   si_lock_;
00203 
00204   /// Monitor object for this entity
00205   Monitor* monitor_;
00206   /// NOTE(review): presumably the begin_access()/end_access() nesting depth -- confirm; declared regardless of OPENDDS_NO_OBJECT_MODEL_PROFILE.
00207   int access_depth_;
00208 };
00209 
00210 } // namespace DCPS
00211 } // namespace OpenDDS
00212 
00213 #endif /* OPENDDS_DCPS_SUBSCRIBER_H  */

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7