SubscriberImpl.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_SUBSCRIBER_H
00009 #define OPENDDS_DCPS_SUBSCRIBER_H
00010
00011 #include "dds/DCPS/DataReaderCallbacks.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "EntityImpl.h"
00014 #include "Definitions.h"
00015 #include "DataCollector_T.h"
00016 #include "DataReaderImpl.h"
00017 #include "ace/Recursive_Thread_Mutex.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif
00023
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 class DomainParticipantImpl;
00030 class Monitor;
00031
00032 #ifndef OPENDDS_NO_MULTI_TOPIC
00033 class MultiTopicImpl;
00034 #endif
00035
00036 class OpenDDS_Dcps_Export SubscriberImpl
00037 : public virtual OpenDDS::DCPS::LocalObject<DDS::Subscriber>
00038 , public virtual EntityImpl {
00039 public:
00040
00041 SubscriberImpl(DDS::InstanceHandle_t handle,
00042 const DDS::SubscriberQos& qos,
00043 DDS::SubscriberListener_ptr a_listener,
00044 const DDS::StatusMask& mask,
00045 DomainParticipantImpl* participant);
00046
00047 virtual ~SubscriberImpl();
00048
00049 virtual DDS::InstanceHandle_t get_instance_handle();
00050
00051 bool contains_reader(DDS::InstanceHandle_t a_handle);
00052
00053 virtual DDS::DataReader_ptr create_datareader(
00054 DDS::TopicDescription_ptr a_topic_desc,
00055 const DDS::DataReaderQos& qos,
00056 DDS::DataReaderListener_ptr a_listener,
00057 DDS::StatusMask mask);
00058
00059 virtual DDS::ReturnCode_t delete_datareader(
00060 DDS::DataReader_ptr a_datareader);
00061
00062 virtual DDS::ReturnCode_t delete_contained_entities();
00063
00064 virtual DDS::DataReader_ptr lookup_datareader(
00065 const char* topic_name);
00066
00067 virtual DDS::ReturnCode_t get_datareaders(
00068 DDS::DataReaderSeq& readers,
00069 DDS::SampleStateMask sample_states,
00070 DDS::ViewStateMask view_states,
00071 DDS::InstanceStateMask instance_states);
00072
00073 virtual DDS::ReturnCode_t notify_datareaders();
00074
00075 virtual DDS::ReturnCode_t set_qos(
00076 const DDS::SubscriberQos& qos);
00077
00078 virtual DDS::ReturnCode_t get_qos(
00079 DDS::SubscriberQos& qos);
00080
00081 virtual DDS::ReturnCode_t set_listener(
00082 DDS::SubscriberListener_ptr a_listener,
00083 DDS::StatusMask mask);
00084
00085 virtual DDS::SubscriberListener_ptr get_listener();
00086
00087 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00088
00089 virtual DDS::ReturnCode_t begin_access();
00090
00091 virtual DDS::ReturnCode_t end_access();
00092
00093 #endif
00094
00095 virtual DDS::DomainParticipant_ptr get_participant();
00096
00097 virtual DDS::ReturnCode_t set_default_datareader_qos(
00098 const DDS::DataReaderQos& qos);
00099
00100 virtual DDS::ReturnCode_t get_default_datareader_qos(
00101 DDS::DataReaderQos& qos);
00102
00103 virtual DDS::ReturnCode_t copy_from_topic_qos(
00104 DDS::DataReaderQos& a_datareader_qos,
00105 const DDS::TopicQos& a_topic_qos);
00106
00107 virtual DDS::ReturnCode_t enable();
00108
00109
00110
00111
00112
00113 bool is_clean() const;
00114
00115
00116 void data_received(DataReaderImpl* reader);
00117
00118 DDS::ReturnCode_t reader_enabled(const char* topic_name,
00119 DataReaderImpl* reader);
00120
00121 #ifndef OPENDDS_NO_MULTI_TOPIC
00122 DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader);
00123 void remove_from_datareader_set(DataReaderImpl* reader);
00124 #endif
00125
00126 DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind);
00127
00128
00129
00130
00131
00132 unsigned int& raw_latency_buffer_size();
00133
00134
00135 DataCollector<double>::OnFull& raw_latency_buffer_type();
00136
00137
00138
00139 typedef OPENDDS_VECTOR(RepoId) SubscriptionIdVec;
00140
00141
00142 void get_subscription_ids(SubscriptionIdVec& subs);
00143
00144 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00145 void update_ownership_strength (const PublicationId& pub_id,
00146 const CORBA::Long& ownership_strength);
00147 #endif
00148
00149 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00150 void coherent_change_received(RepoId& publisher_id,
00151 DataReaderImpl* reader,
00152 Coherent_State& group_state);
00153 #endif
00154
00155 virtual RcHandle<EntityImpl> parent() const;
00156
00157 static bool validate_datareader_qos(const DDS::DataReaderQos & qos,
00158 const DDS::DataReaderQos & default_qos,
00159 DDS::Topic_ptr a_topic,
00160 DDS::DataReaderQos & result_qos, bool mt);
00161
00162 private:
00163
00164
00165
00166 typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap;
00167
00168
00169
00170
00171
00172 typedef OPENDDS_SET(DataReaderImpl_rch) DataReaderSet;
00173
00174
00175 typedef OPENDDS_MAP_CMP(RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap;
00176
00177 DDS::InstanceHandle_t handle_;
00178
00179 DDS::SubscriberQos qos_;
00180 DDS::DataReaderQos default_datareader_qos_;
00181
00182 DDS::StatusMask listener_mask_;
00183 DDS::SubscriberListener_var listener_;
00184
00185 DataReaderSet readers_not_enabled_;
00186 DataReaderMap datareader_map_;
00187 DataReaderSet datareader_set_;
00188
00189 #ifndef OPENDDS_NO_MULTI_TOPIC
00190 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_;
00191 #endif
00192
00193 WeakRcHandle<DomainParticipantImpl> participant_;
00194
00195 DDS::DomainId_t domain_id_;
00196 RepoId dp_id_;
00197
00198
00199 unsigned int raw_latency_buffer_size_;
00200
00201
00202 DataCollector<double>::OnFull raw_latency_buffer_type_;
00203
00204
00205 ACE_Recursive_Thread_Mutex si_lock_;
00206
00207
00208 Monitor* monitor_;
00209
00210 int access_depth_;
00211 };
00212
00213 }
00214 }
00215
00216 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00217
00218 #endif