00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_SUBSCRIBER_H
00009 #define OPENDDS_DCPS_SUBSCRIBER_H
00010
00011 #include "dds/DCPS/DataReaderCallbacks.h"
00012 #include "dds/DdsDcpsInfoUtilsC.h"
00013 #include "EntityImpl.h"
00014 #include "Definitions.h"
00015 #include "DataCollector_T.h"
00016 #include "DataReaderImpl.h"
00017 #include "ace/Synch.h"
00018 #include "dds/DCPS/PoolAllocator.h"
00019
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif
00023
00024 namespace OpenDDS {
00025 namespace DCPS {
00026
// Forward declarations. Within this header these two types are only used
// through pointers (DomainParticipantImpl* and Monitor* in SubscriberImpl),
// so their full definitions are not required here.
00027 class DomainParticipantImpl;
00028 class Monitor;
00029
// MultiTopicImpl is forward-declared only when multi-topic support has not
// been compiled out via OPENDDS_NO_MULTI_TOPIC.
// NOTE(review): the name is not referenced anywhere else in the visible part
// of this header -- confirm whether the declaration is still needed.
00030 #ifndef OPENDDS_NO_MULTI_TOPIC
00031 class MultiTopicImpl;
00032 #endif
00033
/**
 * SubscriberImpl: exported (OpenDDS_Dcps_Export) class that derives virtually
 * from LocalObject<DDS::Subscriber> and from EntityImpl.
 *
 * Only declarations are visible in this header. Remarks below that are
 * phrased as "presumably" or marked NOTE(review) are inferred from names and
 * signatures and must be confirmed against the implementation file.
 */
00034 class OpenDDS_Dcps_Export SubscriberImpl
00035 : public virtual OpenDDS::DCPS::LocalObject<DDS::Subscriber>
00036 , public virtual EntityImpl {
00037 public:
00038
/// Constructs a subscriber from an instance handle, its QoS, a listener with
/// the associated status mask, and a pointer to a DomainParticipantImpl.
/// NOTE(review): ownership/lifetime of @a participant is not visible here;
/// presumably it is kept in participant_ as a non-owning pointer -- confirm.
00039 SubscriberImpl(DDS::InstanceHandle_t handle,
00040 const DDS::SubscriberQos& qos,
00041 DDS::SubscriberListener_ptr a_listener,
00042 const DDS::StatusMask& mask,
00043 DomainParticipantImpl* participant);
00044
/// Virtual destructor.
00045 virtual ~SubscriberImpl();
00046
/// Returns a DDS::InstanceHandle_t; presumably the value of handle_ -- confirm.
00047 virtual DDS::InstanceHandle_t get_instance_handle();
00048
/// Non-virtual query taking a reader instance handle.
/// NOTE(review): presumably true when a reader with @a a_handle belongs to
/// this subscriber -- confirm.
00049 bool contains_reader(DDS::InstanceHandle_t a_handle);
00050
// ---------------------------------------------------------------------------
// Virtual operations using DDS:: types. Presumably these implement the
// DDS::Subscriber interface inherited through LocalObject<DDS::Subscriber>;
// their exact contracts are defined by that interface, not by this header.
// ---------------------------------------------------------------------------

/// Creates a data reader for @a a_topic_desc with the given QoS, listener and
/// status mask; returns a DDS::DataReader_ptr.
00051 virtual DDS::DataReader_ptr create_datareader(
00052 DDS::TopicDescription_ptr a_topic_desc,
00053 const DDS::DataReaderQos& qos,
00054 DDS::DataReaderListener_ptr a_listener,
00055 DDS::StatusMask mask);
00056
/// Deletes @a a_datareader; the outcome is reported as a DDS::ReturnCode_t.
00057 virtual DDS::ReturnCode_t delete_datareader(
00058 DDS::DataReader_ptr a_datareader);
00059
/// Deletes the entities contained in this subscriber (per its name).
00060 virtual DDS::ReturnCode_t delete_contained_entities();
00061
/// Looks up a data reader by @a topic_name.
/// NOTE(review): the result when no reader matches is not visible here.
00062 virtual DDS::DataReader_ptr lookup_datareader(
00063 const char* topic_name);
00064
/// Fills @a readers (out-parameter) using the three state masks as filters.
/// NOTE(review): exact selection rules are not visible in this header.
00065 virtual DDS::ReturnCode_t get_datareaders(
00066 DDS::DataReaderSeq& readers,
00067 DDS::SampleStateMask sample_states,
00068 DDS::ViewStateMask view_states,
00069 DDS::InstanceStateMask instance_states);
00070
/// NOTE(review): presumably notifies listeners of readers that have data --
/// confirm against the implementation.
00071 virtual DDS::ReturnCode_t notify_datareaders();
00072
/// Sets the subscriber QoS from @a qos.
00073 virtual DDS::ReturnCode_t set_qos(
00074 const DDS::SubscriberQos& qos);
00075
/// Writes the subscriber QoS into @a qos (out-parameter).
00076 virtual DDS::ReturnCode_t get_qos(
00077 DDS::SubscriberQos& qos);
00078
/// Installs @a a_listener together with the status @a mask.
00079 virtual DDS::ReturnCode_t set_listener(
00080 DDS::SubscriberListener_ptr a_listener,
00081 DDS::StatusMask mask);
00082
/// Returns the subscriber listener.
/// NOTE(review): whether the returned pointer is duplicated (caller must
/// release) is not visible here.
00083 virtual DDS::SubscriberListener_ptr get_listener();
00084
// begin_access()/end_access() are declared only when the object model profile
// has not been compiled out via OPENDDS_NO_OBJECT_MODEL_PROFILE.
00085 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00086
00087 virtual DDS::ReturnCode_t begin_access();
00088
00089 virtual DDS::ReturnCode_t end_access();
00090
00091 #endif
00092
/// Returns the participant as a DDS::DomainParticipant_ptr.
00093 virtual DDS::DomainParticipant_ptr get_participant();
00094
/// Sets the default QoS used for data readers.
00095 virtual DDS::ReturnCode_t set_default_datareader_qos(
00096 const DDS::DataReaderQos& qos);
00097
/// Writes the default data reader QoS into @a qos (out-parameter).
00098 virtual DDS::ReturnCode_t get_default_datareader_qos(
00099 DDS::DataReaderQos& qos);
00100
/// Updates @a a_datareader_qos (in/out) from @a a_topic_qos.
/// NOTE(review): which policies are copied is not visible in this header.
00101 virtual DDS::ReturnCode_t copy_from_topic_qos(
00102 DDS::DataReaderQos& a_datareader_qos,
00103 const DDS::TopicQos& a_topic_qos);
00104
/// Enables this entity.
00105 virtual DDS::ReturnCode_t enable();
00106
00107
00108
00109
00110
// ---------------------------------------------------------------------------
// Non-virtual members below. Presumably OpenDDS-internal helpers rather than
// part of the DDS::Subscriber interface -- confirm.
// ---------------------------------------------------------------------------

/// Const query returning bool.
/// NOTE(review): what "clean" means (e.g. no remaining readers) is not visible
/// in this header.
00111 bool is_clean() const;
00112
00113
/// Takes the DataReaderImpl concerned.
/// NOTE(review): presumably called when @a reader has received data.
00114 void data_received(DataReaderImpl* reader);
00115
/// Takes a topic name and the DataReaderImpl concerned.
/// NOTE(review): presumably called when @a reader becomes enabled.
00116 DDS::ReturnCode_t reader_enabled(const char* topic_name,
00117 DataReaderImpl* reader);
00118
// Multi-topic helpers; declared only when OPENDDS_NO_MULTI_TOPIC is not defined.
00119 #ifndef OPENDDS_NO_MULTI_TOPIC
00120 DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader);
00121 void remove_from_datareader_set(DataReaderImpl* reader);
00122 #endif
00123
/// Returns a subscriber listener for the given status @a kind.
/// NOTE(review): fallback behaviour (e.g. deferring to the participant when
/// this subscriber's mask does not cover @a kind) is not visible here.
00124 DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind);
00125
00126
00127
00128
00129
/// Returns a mutable reference to an unsigned int, so callers can both read
/// and assign through it; presumably refers to raw_latency_buffer_size_.
00130 unsigned int& raw_latency_buffer_size();
00131
00132
/// Returns a mutable reference to a DataCollector<double>::OnFull value;
/// presumably refers to raw_latency_buffer_type_.
00133 DataCollector<double>::OnFull& raw_latency_buffer_type();
00134
00135
00136
/// Vector-of-RepoId type used by get_subscription_ids().
00137 typedef OPENDDS_VECTOR(RepoId) SubscriptionIdVec;
00138
00139
/// Out-parameter @a subs receives subscription ids.
/// NOTE(review): presumably the ids of this subscriber's data readers, and
/// whether @a subs is cleared first is not visible here.
00140 void get_subscription_ids(SubscriptionIdVec& subs);
00141
// Declared only when exclusive ownership support has not been compiled out via
// OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE.
00142 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00143 void update_ownership_strength (const PublicationId& pub_id,
00144 const CORBA::Long& ownership_strength);
00145 #endif
00146
// Declared only when OPENDDS_NO_OBJECT_MODEL_PROFILE is not defined.
// All three parameters are passed so that the callee may modify
// publisher_id and group_state (non-const references).
00147 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00148 void coherent_change_received(RepoId& publisher_id,
00149 DataReaderImpl* reader,
00150 Coherent_State& group_state);
00151 #endif
00152
/// Returns the parent entity as an EntityImpl*.
/// NOTE(review): presumably the owning participant -- confirm.
00153 virtual EntityImpl* parent() const;
00154
/// Static helper: takes a requested QoS, a default QoS and a topic, writes the
/// QoS to use into @a result_qos, and returns bool.
/// NOTE(review): the meaning of @a mt is not visible here (presumably
/// "multi-topic"), nor is the meaning of the bool result -- confirm.
00155 static bool validate_datareader_qos(const DDS::DataReaderQos & qos,
00156 const DDS::DataReaderQos & default_qos,
00157 DDS::Topic_ptr a_topic,
00158 DDS::DataReaderQos & result_qos, bool mt);
00159
00160 private:
00161
00162
00163
/// Multimap from string to DataReaderImpl*.
/// NOTE(review): presumably keyed by topic name (cf. lookup_datareader and
/// reader_enabled, which both take a topic_name) -- confirm.
00164 typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataReaderImpl*) DataReaderMap;
00165
00166
00167
00168
00169
/// Set of DataReaderImpl pointers.
00170 typedef OPENDDS_SET(DataReaderImpl*) DataReaderSet;
00171
00172
/// Map from RepoId to DDS::DataReaderQos, ordered by GUID_tKeyLessThan.
/// Not used by any other declaration visible in this header.
00173 typedef OPENDDS_MAP_CMP(RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap;
00174
// Instance handle of this subscriber (a handle is passed to the constructor).
00175 DDS::InstanceHandle_t handle_;
00176
// Subscriber QoS and the default QoS for data readers (see the
// set_/get_ accessors declared above).
00177 DDS::SubscriberQos qos_;
00178 DDS::DataReaderQos default_datareader_qos_;
00179
// Listener and its status mask (see set_listener()/get_listener()).
00180 DDS::StatusMask listener_mask_;
00181 DDS::SubscriberListener_var listener_;
00182
// Reader bookkeeping containers.
// NOTE(review): how the map and the set differ in purpose (e.g. "all readers"
// versus "readers with data") is not visible in this header.
00183 DataReaderMap datareader_map_;
00184 DataReaderSet datareader_set_;
00185
// Map from string to DDS::DataReader_var; present only with multi-topic
// support compiled in.
00186 #ifndef OPENDDS_NO_MULTI_TOPIC
00187 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_;
00188 #endif
00189
// The participant, held both as a raw implementation pointer and as a
// reference-counted DDS object reference (_var).
00190 DomainParticipantImpl* participant_;
00191 DDS::DomainParticipant_var participant_objref_;
00192
00193 DDS::DomainId_t domain_id_;
00194
00195
// Presumably the storage behind raw_latency_buffer_size().
00196 unsigned int raw_latency_buffer_size_;
00197
00198
// Presumably the storage behind raw_latency_buffer_type().
00199 DataCollector<double>::OnFull raw_latency_buffer_type_;
00200
00201
// Recursive mutex.
// NOTE(review): which members it guards is not visible in this header.
00202 ACE_Recursive_Thread_Mutex si_lock_;
00203
00204
// Raw pointer to a Monitor.
// NOTE(review): ownership (who deletes it) is not visible in this header.
00205 Monitor* monitor_;
00206
// NOTE(review): presumably the nesting depth of begin_access()/end_access()
// calls -- confirm.
00207 int access_depth_;
00208 };
00209
00210 }
00211 }
00212
00213 #endif