Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "TopicImpl.h"
11 :
12 : #include "Qos_Helper.h"
13 : #include "FeatureDisabledQosCheck.h"
14 : #include "Definitions.h"
15 : #include "Service_Participant.h"
16 : #include "DomainParticipantImpl.h"
17 : #include "MonitorFactory.h"
18 : #include "DCPS_Utils.h"
19 : #include "transport/framework/TransportExceptions.h"
20 :
21 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
22 :
23 : namespace OpenDDS {
24 : namespace DCPS {
25 :
26 0 : TopicImpl::TopicImpl(const char* topic_name,
27 : const char* type_name,
28 : OpenDDS::DCPS::TypeSupport_ptr type_support,
29 : const DDS::TopicQos & qos,
30 : DDS::TopicListener_ptr a_listener,
31 : const DDS::StatusMask & mask,
32 0 : DomainParticipantImpl* participant)
33 : : TopicDescriptionImpl(topic_name,
34 : type_name,
35 : type_support,
36 : participant),
37 0 : qos_(qos),
38 0 : listener_mask_(mask),
39 0 : listener_(DDS::TopicListener::_duplicate(a_listener)),
40 0 : id_(GUID_UNKNOWN)
41 : {
42 0 : inconsistent_topic_status_.total_count = 0;
43 0 : inconsistent_topic_status_.total_count_change = 0;
44 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_topic_monitor(this));
45 0 : }
46 :
47 0 : TopicImpl::~TopicImpl()
48 : {
49 0 : }
50 :
51 0 : DDS::ReturnCode_t TopicImpl::set_qos(const DDS::TopicQos& qos_arg)
52 : {
53 0 : DDS::TopicQos qos = qos_arg;
54 :
55 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
56 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
57 : OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
58 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
59 :
60 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
61 0 : if (qos_ == qos)
62 0 : return DDS::RETCODE_OK;
63 :
64 : // for the not changeable qos, it can be changed before enable
65 0 : if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
66 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
67 :
68 : } else {
69 0 : qos_ = qos;
70 :
71 : Discovery_rch disco =
72 0 : TheServiceParticipant->get_discovery(participant_->get_domain_id());
73 : const bool status =
74 0 : disco->update_topic_qos(this->id_, participant_->get_domain_id(),
75 0 : participant_->get_id(), qos_);
76 :
77 0 : if (!status) {
78 0 : ACE_ERROR_RETURN((LM_ERROR,
79 : ACE_TEXT("(%P|%t) TopicImpl::set_qos, ")
80 : ACE_TEXT("failed on compatibility check.\n")),
81 : DDS::RETCODE_ERROR);
82 : }
83 0 : }
84 :
85 0 : return DDS::RETCODE_OK;
86 :
87 : } else {
88 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
89 : }
90 0 : }
91 :
92 : DDS::ReturnCode_t
93 0 : TopicImpl::get_qos(DDS::TopicQos& qos)
94 : {
95 0 : qos = qos_;
96 0 : return DDS::RETCODE_OK;
97 : }
98 :
99 : DDS::ReturnCode_t
100 0 : TopicImpl::set_listener(DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
101 : {
102 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
103 0 : listener_mask_ = mask;
104 : //note: OK to duplicate a nil object ref
105 0 : listener_ = DDS::TopicListener::_duplicate(a_listener);
106 0 : return DDS::RETCODE_OK;
107 0 : }
108 :
109 : DDS::TopicListener_ptr
110 0 : TopicImpl::get_listener()
111 : {
112 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
113 0 : return DDS::TopicListener::_duplicate(listener_.in());
114 0 : }
115 :
116 : DDS::ReturnCode_t
117 0 : TopicImpl::get_inconsistent_topic_status(DDS::InconsistentTopicStatus& a_status)
118 : {
119 0 : ACE_Guard<ACE_Thread_Mutex> g(status_mutex_);
120 0 : set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, false);
121 0 : a_status = inconsistent_topic_status_;
122 0 : inconsistent_topic_status_.total_count_change = 0;
123 0 : return DDS::RETCODE_OK;
124 0 : }
125 :
126 : DDS::ReturnCode_t
127 0 : TopicImpl::enable()
128 : {
129 : //According spec:
130 : // - Calling enable on an already enabled Entity returns OK and has no
131 : // effect.
132 : // - Calling enable on an Entity whose factory is not enabled will fail
133 : // and return PRECONDITION_NOT_MET.
134 :
135 0 : if (this->is_enabled()) {
136 0 : return DDS::RETCODE_OK;
137 : }
138 :
139 0 : if (!this->participant_->is_enabled()) {
140 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
141 : }
142 :
143 0 : if (id_ == GUID_UNKNOWN) {
144 0 : const DDS::DomainId_t dom_id = participant_->get_domain_id();
145 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(dom_id);
146 0 : TopicStatus status = disco->assert_topic(id_,
147 : dom_id,
148 0 : participant_->get_id(),
149 0 : topic_name_.c_str(),
150 0 : type_name_.c_str(),
151 0 : qos_,
152 0 : type_support_ ? type_support_->has_dcps_key() : false,
153 : this);
154 0 : if (status != CREATED && status != FOUND) {
155 0 : if (DCPS_debug_level >= 1) {
156 0 : ACE_ERROR((LM_ERROR,
157 : ACE_TEXT("(%P|%t) ERROR: TopicImpl::enable, ")
158 : ACE_TEXT("assert_topic failed with return value <%C>.\n"),
159 : topicstatus_to_string(status)));
160 : }
161 0 : return DDS::RETCODE_ERROR;
162 : }
163 0 : }
164 :
165 0 : if (this->monitor_) {
166 0 : monitor_->report();
167 : }
168 0 : return this->set_enabled();
169 : }
170 :
171 : GUID_t
172 0 : TopicImpl::get_id() const
173 : {
174 0 : return id_;
175 : }
176 :
177 : DDS::InstanceHandle_t
178 0 : TopicImpl::get_instance_handle()
179 : {
180 0 : return get_entity_instance_handle(id_, rchandle_from(participant_));
181 : }
182 :
183 : const char*
184 0 : TopicImpl::type_name() const
185 : {
186 0 : return this->type_name_.c_str();
187 : }
188 :
189 : const char*
190 0 : TopicImpl::topic_name() const
191 : {
192 0 : return this->topic_name_.c_str();
193 : }
194 :
195 :
196 : void
197 0 : TopicImpl::transport_config(const TransportConfig_rch&)
198 : {
199 0 : throw Transport::MiscProblem();
200 : }
201 :
202 : void
203 0 : TopicImpl::inconsistent_topic(int count)
204 : {
205 0 : ACE_Guard<ACE_Thread_Mutex> status_guard(status_mutex_);
206 0 : inconsistent_topic_status_.total_count_change += count - inconsistent_topic_status_.total_count;
207 0 : inconsistent_topic_status_.total_count = count;
208 :
209 0 : set_status_changed_flag(DDS::INCONSISTENT_TOPIC_STATUS, true);
210 :
211 0 : DDS::TopicListener_var listener;
212 : {
213 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
214 0 : listener = listener_;
215 0 : if (!listener || !(listener_mask_ & DDS::INCONSISTENT_TOPIC_STATUS)) {
216 0 : g.release();
217 0 : listener = participant_->listener_for(DDS::INCONSISTENT_TOPIC_STATUS);
218 : }
219 0 : }
220 0 : if (listener) {
221 0 : const DDS::InconsistentTopicStatus status = inconsistent_topic_status_;
222 0 : inconsistent_topic_status_.total_count_change = 0;
223 0 : status_guard.release();
224 0 : listener->on_inconsistent_topic(this, status);
225 : } else {
226 0 : status_guard.release();
227 : }
228 :
229 0 : notify_status_condition();
230 0 : }
231 :
232 0 : bool TopicImpl::check_data_representation(const DDS::DataRepresentationIdSeq& qos_ids, bool is_data_writer)
233 : {
234 0 : if (!type_support_) {
235 0 : return true;
236 : }
237 0 : DDS::DataRepresentationIdSeq type_allowed_reprs;
238 0 : type_support_->representations_allowed_by_type(type_allowed_reprs);
239 : //default for blank annotation is to allow all types of data representation
240 0 : if (type_allowed_reprs.length() == 0) {
241 0 : return true;
242 : }
243 0 : if (qos_ids.length() == 0) {
244 0 : if (log_level >= LogLevel::Notice) {
245 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: TopicImpl::check_data_representation: "
246 : "representation qos is blank.\n"));
247 : }
248 0 : return false;
249 : }
250 : //Data Writer will only use the 1st QoS declared
251 0 : if (is_data_writer) {
252 0 : DDS::DataRepresentationId_t id = qos_ids[0];
253 0 : for (CORBA::ULong j = 0; j < type_allowed_reprs.length(); ++j) {
254 0 : if (id == type_allowed_reprs[j]) {
255 0 : return true;
256 : }
257 : }
258 : } else { // if data reader compare both lists for a compatible QoS
259 0 : for (CORBA::ULong i = 0; i < qos_ids.length(); ++i) {
260 0 : for (CORBA::ULong j = 0; j < type_allowed_reprs.length(); ++j) {
261 0 : if (qos_ids[i] == type_allowed_reprs[j]) {
262 0 : return true;
263 : }
264 : }
265 : }
266 : }
267 0 : if (log_level >= LogLevel::Notice) {
268 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: TopicImpl::check_data_representation: "
269 : "none of the data representation QoS: %C is allowed by the "
270 : "topic type IDL annotations: %C\n", repr_seq_to_string(qos_ids, is_data_writer).c_str(), repr_seq_to_string(type_allowed_reprs).c_str()));
271 : }
272 0 : return false;
273 0 : }
274 :
275 : } // namespace DCPS
276 : } // namespace OpenDDS
277 :
278 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|