Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
7 :
8 : #include "debug.h"
9 : #include "SubscriberImpl.h"
10 : #include "FeatureDisabledQosCheck.h"
11 : #include "DomainParticipantImpl.h"
12 : #include "Qos_Helper.h"
13 : #include "GuidConverter.h"
14 : #include "BuiltInTopicUtils.h"
15 : #include "TopicImpl.h"
16 : #include "MonitorFactory.h"
17 : #include "DataReaderImpl.h"
18 : #include "Service_Participant.h"
19 : #include "TopicDescriptionImpl.h"
20 : #include "Marked_Default_Qos.h"
21 : #include "Transient_Kludge.h"
22 : #include "ContentFilteredTopicImpl.h"
23 : #include "MultiTopicImpl.h"
24 : #include "GroupRakeData.h"
25 : #include "MultiTopicDataReaderBase.h"
26 : #include "Util.h"
27 : #include "transport/framework/TransportImpl.h"
28 : #include "transport/framework/DataLinkSet.h"
29 : #include "DCPS_Utils.h"
30 : #include "PoolAllocator.h"
31 :
32 : #include <dds/DdsDcpsTypeSupportExtC.h>
33 :
34 : #include <stdexcept>
35 :
36 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
37 :
38 : namespace OpenDDS {
39 : namespace DCPS {
40 :
41 0 : SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t handle,
42 : const DDS::SubscriberQos & qos,
43 : DDS::SubscriberListener_ptr a_listener,
44 : const DDS::StatusMask& mask,
45 0 : DomainParticipantImpl* participant)
46 0 : : handle_(handle),
47 0 : qos_(qos),
48 0 : default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
49 0 : listener_mask_(mask),
50 0 : participant_(*participant),
51 0 : domain_id_(participant->get_domain_id()),
52 0 : raw_latency_buffer_size_(0),
53 0 : raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
54 0 : access_depth_ (0)
55 : {
56 : //Note: OK to duplicate a nil.
57 0 : listener_ = DDS::SubscriberListener::_duplicate(a_listener);
58 :
59 0 : monitor_.reset(TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this));
60 0 : }
61 :
62 0 : SubscriberImpl::~SubscriberImpl()
63 : {
64 0 : const RcHandle<DomainParticipantImpl> participant = participant_.lock();
65 0 : if (participant) {
66 0 : participant->return_handle(handle_);
67 : }
68 :
69 : // The datareaders should be deleted already before calling delete
70 : // subscriber.
71 0 : String leftover_entities;
72 0 : if (!is_clean(&leftover_entities)) {
73 0 : if (log_level >= LogLevel::Warning) {
74 0 : ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: SubscriberImpl::~SubscriberImpl: "
75 : "%C still exist\n", leftover_entities.c_str()));
76 : }
77 : }
78 0 : }
79 :
80 : DDS::InstanceHandle_t
81 0 : SubscriberImpl::get_instance_handle()
82 : {
83 0 : return handle_;
84 : }
85 :
86 : bool
87 0 : SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
88 : {
89 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
90 : guard,
91 : this->si_lock_,
92 : false);
93 :
94 0 : for (DataReaderMap::iterator it(datareader_map_.begin());
95 0 : it != datareader_map_.end(); ++it) {
96 0 : if (a_handle == it->second->get_instance_handle()) {
97 0 : return true;
98 : }
99 : }
100 :
101 0 : return false;
102 0 : }
103 :
104 : DDS::DataReader_ptr
105 0 : SubscriberImpl::create_datareader(
106 : DDS::TopicDescription_ptr a_topic_desc,
107 : const DDS::DataReaderQos & qos,
108 : DDS::DataReaderListener_ptr a_listener,
109 : DDS::StatusMask mask)
110 : {
111 0 : if (CORBA::is_nil(a_topic_desc)) {
112 0 : if (DCPS_debug_level > 0) {
113 0 : ACE_ERROR((LM_ERROR,
114 : ACE_TEXT("(%P|%t) ERROR: ")
115 : ACE_TEXT("SubscriberImpl::create_datareader, ")
116 : ACE_TEXT("topic desc is nil.\n")));
117 : }
118 0 : return DDS::DataReader::_nil();
119 : }
120 :
121 0 : DDS::DataReaderQos dr_qos;
122 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
123 0 : if (!participant)
124 0 : return DDS::DataReader::_nil();
125 :
126 0 : TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
127 :
128 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
129 0 : ContentFilteredTopicImpl* cft = 0;
130 : #endif
131 : #ifndef OPENDDS_NO_MULTI_TOPIC
132 0 : MultiTopicImpl* mt = 0;
133 : #else
134 : bool mt = false;
135 : #endif
136 :
137 0 : if (!topic_servant) {
138 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
139 0 : cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
140 0 : if (cft) {
141 0 : DDS::Topic_var related;
142 0 : related = cft->get_related_topic();
143 0 : topic_servant = dynamic_cast<TopicImpl*>(related.in());
144 0 : }
145 : else
146 : #endif
147 : {
148 : #ifndef OPENDDS_NO_MULTI_TOPIC
149 0 : mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
150 : #endif
151 : }
152 : }
153 :
154 0 : if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
155 0 : return DDS::DataReader::_nil();
156 :
157 : #ifndef OPENDDS_NO_MULTI_TOPIC
158 0 : if (mt) {
159 : try {
160 : DDS::DataReader_var dr =
161 0 : mt->get_type_support()->create_multitopic_datareader();
162 : MultiTopicDataReaderBase* mtdr =
163 0 : dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
164 0 : mtdr->init(dr_qos, a_listener, mask, this, mt);
165 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
166 0 : if (dr->enable() != DDS::RETCODE_OK) {
167 0 : if (DCPS_debug_level > 0) {
168 0 : ACE_ERROR((LM_ERROR,
169 : ACE_TEXT("(%P|%t) ERROR: ")
170 : ACE_TEXT("SubscriberImpl::create_datareader, ")
171 : ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
172 : }
173 0 : return DDS::DataReader::_nil();
174 : }
175 0 : multitopic_reader_enabled(dr);
176 : }
177 0 : return dr._retn();
178 0 : } catch (const std::exception& e) {
179 0 : if (DCPS_debug_level > 0) {
180 0 : ACE_ERROR((LM_ERROR,
181 : ACE_TEXT("(%P|%t) ERROR: ")
182 : ACE_TEXT("SubscriberImpl::create_datareader, ")
183 : ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
184 : e.what()));
185 : }
186 0 : }
187 0 : return DDS::DataReader::_nil();
188 : }
189 : #endif
190 :
191 : OpenDDS::DCPS::TypeSupport_ptr typesupport =
192 0 : topic_servant->get_type_support();
193 :
194 0 : if (0 == typesupport) {
195 0 : CORBA::String_var name = a_topic_desc->get_name();
196 0 : if (DCPS_debug_level > 0) {
197 0 : ACE_ERROR((LM_ERROR,
198 : ACE_TEXT("(%P|%t) ERROR: ")
199 : ACE_TEXT("SubscriberImpl::create_datareader, ")
200 : ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
201 : name.in()));
202 : }
203 0 : return DDS::DataReader::_nil();
204 0 : }
205 :
206 0 : DDS::DataReader_var dr_obj = typesupport->create_datareader();
207 :
208 : DataReaderImpl* dr_servant =
209 0 : dynamic_cast<DataReaderImpl*>(dr_obj.in());
210 :
211 0 : if (dr_servant == 0) {
212 0 : if (DCPS_debug_level > 0) {
213 0 : ACE_ERROR((LM_ERROR,
214 : ACE_TEXT("(%P|%t) ERROR: ")
215 : ACE_TEXT("SubscriberImpl::create_datareader, ")
216 : ACE_TEXT("servant is nil.\n")));
217 : }
218 0 : return DDS::DataReader::_nil();
219 : }
220 :
221 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
222 0 : if (cft) {
223 0 : dr_servant->enable_filtering(cft);
224 : }
225 : #endif
226 :
227 : // Propagate the latency buffer data collection configuration.
228 : // @TODO: Determine whether we want to exclude the Builtin Topic
229 : // readers from data gathering.
230 0 : dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
231 0 : dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
232 :
233 :
234 0 : dr_servant->init(topic_servant,
235 : dr_qos,
236 : a_listener,
237 : mask,
238 : participant.in(),
239 : this);
240 :
241 0 : if (enabled_ && qos_.entity_factory.autoenable_created_entities) {
242 0 : const DDS::ReturnCode_t ret = dr_servant->enable();
243 :
244 0 : if (ret != DDS::RETCODE_OK) {
245 0 : if (DCPS_debug_level > 0) {
246 0 : ACE_ERROR((LM_WARNING,
247 : ACE_TEXT("(%P|%t) WARNING: ")
248 : ACE_TEXT("SubscriberImpl::create_datareader, ")
249 : ACE_TEXT("enable failed.\n")));
250 : }
251 0 : return DDS::DataReader::_nil();
252 : }
253 : } else {
254 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0);
255 0 : readers_not_enabled_.insert(rchandle_from(dr_servant));
256 0 : }
257 :
258 : // add created data reader to this' data reader container -
259 : // done in enable_reader
260 0 : return DDS::DataReader::_duplicate(dr_obj.in());
261 0 : }
262 :
263 : DDS::ReturnCode_t
264 0 : SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
265 : {
266 : DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
267 :
268 0 : DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
269 :
270 0 : if (dr_servant) { // for MultiTopic this will be false
271 0 : const char* reason = " (ERROR: unknown reason)";
272 0 : DDS::ReturnCode_t rc = DDS::RETCODE_OK;
273 0 : RcHandle<SubscriberImpl> dr_subscriber = dr_servant->get_subscriber_servant();
274 0 : if (dr_subscriber.get() != this) {
275 0 : reason = "doesn't belong to this subscriber.";
276 0 : rc = DDS::RETCODE_PRECONDITION_NOT_MET;
277 0 : } else if (dr_servant->has_zero_copies()) {
278 0 : reason = "has outstanding zero-copy samples loaned out.";
279 0 : rc = DDS::RETCODE_PRECONDITION_NOT_MET;
280 0 : } else if (!dr_servant->read_conditions_.empty()) {
281 0 : reason = "has read conditions attached.";
282 0 : rc = DDS::RETCODE_PRECONDITION_NOT_MET;
283 : }
284 0 : if (rc != DDS::RETCODE_OK) {
285 0 : if (log_level >= LogLevel::Notice) {
286 0 : DDS::TopicDescription_var topic = a_datareader->get_topicdescription();
287 0 : CORBA::String_var topic_name = topic->get_name();
288 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: SubscriberImpl::delete_datareader: "
289 : "on reader %C (topic \"%C\") will return \"%C\" because it %C\n",
290 : LogGuid(dr_servant->get_id()).c_str(), topic_name.in(),
291 : retcode_to_string(rc), reason));
292 0 : }
293 0 : return rc;
294 : }
295 :
296 : // marks entity as deleted and stops future associating
297 0 : dr_servant->prepare_to_delete();
298 0 : }
299 :
300 : {
301 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
302 : si_guard,
303 : this->si_lock_,
304 : DDS::RETCODE_ERROR);
305 :
306 0 : DataReaderMap::iterator it;
307 0 : for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
308 0 : if (it->second == dr_servant) {
309 0 : break;
310 : }
311 : }
312 :
313 0 : if (it == datareader_map_.end()) {
314 0 : DDS::TopicDescription_var td = a_datareader->get_topicdescription();
315 0 : CORBA::String_var topic_name = td->get_name();
316 : #ifndef OPENDDS_NO_MULTI_TOPIC
317 0 : MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name.in());
318 0 : if (mt_iter != multitopic_reader_map_.end()) {
319 0 : DDS::DataReader_ptr ptr = mt_iter->second;
320 0 : MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
321 0 : if (!mtdrb) {
322 0 : if (DCPS_debug_level > 0) {
323 0 : ACE_ERROR((LM_ERROR,
324 : ACE_TEXT("(%P|%t) ERROR: ")
325 : ACE_TEXT("SubscriberImpl::delete_datareader: ")
326 : ACE_TEXT("datareader(topic_name=%C)")
327 : ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
328 : topic_name.in()));
329 : }
330 0 : return ::DDS::RETCODE_ERROR;
331 : }
332 0 : mtdrb->cleanup();
333 0 : multitopic_reader_map_.erase(mt_iter);
334 0 : return DDS::RETCODE_OK;
335 : }
336 : #endif
337 0 : if (!dr_servant) {
338 0 : if (DCPS_debug_level > 0) {
339 0 : ACE_ERROR((LM_ERROR,
340 : ACE_TEXT("(%P|%t) ERROR: ")
341 : ACE_TEXT("SubscriberImpl::delete_datareader: ")
342 : ACE_TEXT("datareader(topic_name=%C)")
343 : ACE_TEXT("for unknown repo id not found.\n"),
344 : topic_name.in()));
345 : }
346 0 : return ::DDS::RETCODE_ERROR;
347 : }
348 0 : if (DCPS_debug_level > 0) {
349 0 : GUID_t id = dr_servant->get_guid();
350 0 : ACE_ERROR((LM_ERROR,
351 : ACE_TEXT("(%P|%t) ERROR: ")
352 : ACE_TEXT("SubscriberImpl::delete_datareader: ")
353 : ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
354 : topic_name.in(),
355 : LogGuid(id).c_str()));
356 : }
357 0 : return ::DDS::RETCODE_ERROR;
358 0 : }
359 :
360 0 : datareader_map_.erase(it);
361 :
362 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
363 : dr_set_guard,
364 : this->dr_set_lock_,
365 : DDS::RETCODE_ERROR);
366 0 : datareader_set_.erase(dr_servant);
367 0 : }
368 :
369 0 : if (this->monitor_) {
370 0 : this->monitor_->report();
371 : }
372 :
373 0 : const GUID_t subscription_id = dr_servant->get_guid();
374 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
375 0 : if (!disco->remove_subscription(this->domain_id_,
376 0 : this->dp_id_,
377 : subscription_id)) {
378 0 : if (DCPS_debug_level > 0) {
379 0 : ACE_ERROR((LM_ERROR,
380 : ACE_TEXT("(%P|%t) ERROR: ")
381 : ACE_TEXT("SubscriberImpl::delete_datareader: ")
382 : ACE_TEXT(" could not remove subscription from discovery.\n")));
383 : }
384 0 : return ::DDS::RETCODE_ERROR;
385 : }
386 :
387 : // Call remove association before unregistering the datareader from the transport,
388 : // otherwise some callbacks resulted from remove_association may be lost.
389 0 : dr_servant->remove_all_associations();
390 0 : dr_servant->cleanup();
391 0 : return DDS::RETCODE_OK;
392 0 : }
393 :
394 : DDS::ReturnCode_t
395 0 : SubscriberImpl::delete_contained_entities()
396 : {
397 : // mark that the entity is being deleted
398 0 : set_deleted(true);
399 :
400 0 : OPENDDS_VECTOR(DDS::DataReader*) drs;
401 :
402 : #ifndef OPENDDS_NO_MULTI_TOPIC
403 : {
404 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
405 : guard,
406 : this->si_lock_,
407 : DDS::RETCODE_ERROR);
408 0 : for (MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.begin();
409 0 : mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
410 0 : drs.push_back(mt_iter->second);
411 : }
412 0 : }
413 :
414 0 : for (size_t i = 0; i < drs.size(); ++i) {
415 0 : DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
416 0 : if (ret == DDS::RETCODE_OK) {
417 0 : ret = delete_datareader(drs[i]);
418 : }
419 0 : if (ret != DDS::RETCODE_OK) {
420 0 : if (DCPS_debug_level > 0) {
421 0 : ACE_ERROR((LM_ERROR,
422 : ACE_TEXT("(%P|%t) ERROR: ")
423 : ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
424 : ACE_TEXT("failed to delete datareader\n")));
425 : }
426 0 : return ret;
427 : }
428 : }
429 0 : drs.clear();
430 : #endif
431 :
432 : {
433 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
434 : guard,
435 : this->si_lock_,
436 : DDS::RETCODE_ERROR);
437 0 : DataReaderMap::iterator it;
438 0 : DataReaderMap::iterator itEnd = datareader_map_.end();
439 :
440 0 : for (it = datareader_map_.begin(); it != itEnd; ++it) {
441 0 : drs.push_back(it->second.in());
442 : }
443 0 : }
444 :
445 0 : for (size_t i = 0; i < drs.size(); ++i) {
446 0 : DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
447 0 : if (ret == DDS::RETCODE_OK) {
448 0 : ret = delete_datareader(drs[i]);
449 : }
450 0 : if (ret != DDS::RETCODE_OK) {
451 0 : if (DCPS_debug_level > 0) {
452 0 : ACE_ERROR((LM_ERROR,
453 : ACE_TEXT("(%P|%t) ERROR: ")
454 : ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
455 : ACE_TEXT("failed to delete datareader\n")));
456 : }
457 0 : return ret;
458 : }
459 : }
460 :
461 : // the subscriber can now start creating new publications
462 0 : set_deleted(false);
463 :
464 0 : return DDS::RETCODE_OK;
465 0 : }
466 :
467 : DDS::DataReader_ptr
468 0 : SubscriberImpl::lookup_datareader(
469 : const char * topic_name)
470 : {
471 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
472 : guard,
473 : this->si_lock_,
474 : DDS::DataReader::_nil());
475 :
476 : // If multiple entries whose key is "topic_name" then which one is
477 : // returned ? Spec does not limit which one should give.
478 0 : DataReaderMap::iterator it = datareader_map_.find(topic_name);
479 :
480 0 : if (it == datareader_map_.end()) {
481 : #ifndef OPENDDS_NO_MULTI_TOPIC
482 0 : MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name);
483 0 : if (mt_iter != multitopic_reader_map_.end()) {
484 0 : return DDS::DataReader::_duplicate(mt_iter->second);
485 : }
486 : #endif
487 :
488 0 : if (DCPS_debug_level >= 2) {
489 0 : ACE_DEBUG((LM_DEBUG,
490 : ACE_TEXT("(%P|%t) ")
491 : ACE_TEXT("SubscriberImpl::lookup_datareader, ")
492 : ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
493 : topic_name));
494 : }
495 :
496 0 : return DDS::DataReader::_nil();
497 :
498 : } else {
499 0 : return DDS::DataReader::_duplicate(it->second.in());
500 : }
501 0 : }
502 :
503 : DDS::ReturnCode_t
504 0 : SubscriberImpl::get_datareaders(
505 : DDS::DataReaderSeq & readers,
506 : DDS::SampleStateMask sample_states,
507 : DDS::ViewStateMask view_states,
508 : DDS::InstanceStateMask instance_states)
509 : {
510 0 : DataReaderSet localreaders;
511 : {
512 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
513 : guard,
514 : this->dr_set_lock_,
515 : DDS::RETCODE_ERROR);
516 0 : localreaders = datareader_set_;
517 0 : }
518 :
519 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
520 : // If access_scope is GROUP and ordered_access is true then return readers as
521 : // list which may contain same readers multiple times. Otherwise return readers
522 : // as set.
523 0 : if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
524 0 : if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
525 0 : return ::DDS::RETCODE_PRECONDITION_NOT_MET;
526 : }
527 0 : if (this->qos_.presentation.ordered_access) {
528 :
529 0 : GroupRakeData data;
530 0 : for (DataReaderSet::const_iterator pos = localreaders.begin();
531 0 : pos != localreaders.end(); ++pos) {
532 0 : (*pos)->get_ordered_data(data, sample_states, view_states, instance_states);
533 : }
534 :
535 : // Return list of readers in the order of the source timestamp of the received
536 : // samples from readers.
537 0 : data.get_datareaders(readers);
538 0 : return DDS::RETCODE_OK;
539 0 : }
540 : }
541 : #endif
542 :
543 : // Return set of datareaders.
544 0 : readers.length(0);
545 0 : for (DataReaderSet::const_iterator pos = localreaders.begin();
546 0 : pos != localreaders.end(); ++pos) {
547 0 : if ((*pos)->have_sample_states(sample_states) &&
548 0 : (*pos)->have_view_states(view_states) &&
549 0 : (*pos)->have_instance_states(instance_states)) {
550 0 : push_back(readers, DDS::DataReader::_duplicate(pos->in()));
551 : }
552 : }
553 :
554 0 : return DDS::RETCODE_OK;
555 0 : }
556 :
557 : DDS::ReturnCode_t
558 0 : SubscriberImpl::notify_datareaders()
559 : {
560 0 : DataReaderMap localreadermap;
561 : {
562 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
563 : guard,
564 : this->si_lock_,
565 : DDS::RETCODE_ERROR);
566 0 : localreadermap = datareader_map_;
567 0 : }
568 0 : for (DataReaderMap::iterator it = localreadermap.begin(); it != localreadermap.end(); ++it) {
569 0 : if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
570 0 : DDS::DataReaderListener_var listener = it->second->get_listener();
571 0 : if (!it->second->is_bit()) {
572 0 : it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
573 0 : if (listener) {
574 0 : listener->on_data_available(it->second.in());
575 : }
576 : } else {
577 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(listener, it->second, listener, true, false));
578 : }
579 0 : }
580 : }
581 :
582 : #ifndef OPENDDS_NO_MULTI_TOPIC
583 0 : MultitopicReaderMap localmtr;
584 : {
585 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
586 : guard,
587 : this->si_lock_,
588 : DDS::RETCODE_ERROR);
589 0 : localmtr = multitopic_reader_map_;
590 0 : }
591 :
592 0 : for (MultitopicReaderMap::iterator it = localmtr.begin();
593 0 : it != localmtr.end(); ++it) {
594 : MultiTopicDataReaderBase* dri =
595 0 : dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
596 :
597 0 : if (!dri) {
598 0 : if (DCPS_debug_level > 0) {
599 0 : ACE_ERROR((LM_ERROR,
600 : ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
601 : ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")));
602 : }
603 0 : return ::DDS::RETCODE_ERROR;
604 : }
605 :
606 0 : if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
607 0 : DDS::DataReaderListener_var listener = dri->get_listener();
608 0 : dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
609 0 : if (!CORBA::is_nil(listener)) {
610 0 : listener->on_data_available(dri);
611 : }
612 0 : }
613 : }
614 : #endif
615 :
616 0 : return DDS::RETCODE_OK;
617 0 : }
618 :
619 : DDS::ReturnCode_t
620 0 : SubscriberImpl::set_qos(
621 : const DDS::SubscriberQos & qos)
622 : {
623 :
624 : OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
625 :
626 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
627 0 : if (qos_ == qos)
628 0 : return DDS::RETCODE_OK;
629 :
630 : // for the not changeable qos, it can be changed before enable
631 0 : if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
632 0 : return DDS::RETCODE_IMMUTABLE_POLICY;
633 :
634 : } else {
635 0 : qos_ = qos;
636 :
637 0 : DrIdToQosMap idToQosMap;
638 : {
639 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
640 : guard,
641 : this->si_lock_,
642 : DDS::RETCODE_ERROR);
643 : // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx
644 0 : DataReaderMap::iterator endIter = datareader_map_.end();
645 :
646 0 : for (DataReaderMap::iterator iter = datareader_map_.begin();
647 0 : iter != endIter; ++iter) {
648 0 : DataReaderImpl_rch reader = iter->second;
649 0 : reader->set_subscriber_qos (qos);
650 0 : DDS::DataReaderQos qos = reader->qos_;
651 0 : GUID_t id = reader->get_guid();
652 : std::pair<DrIdToQosMap::iterator, bool> pair
653 0 : = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
654 :
655 0 : if (!pair.second) {
656 0 : if (DCPS_debug_level > 0) {
657 0 : ACE_ERROR((LM_ERROR,
658 : ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
659 : ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
660 : LogGuid(id).c_str()));
661 : }
662 0 : return ::DDS::RETCODE_ERROR;
663 : }
664 0 : }
665 0 : }
666 :
667 0 : DrIdToQosMap::iterator iter = idToQosMap.begin();
668 :
669 0 : while (iter != idToQosMap.end()) {
670 0 : Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
671 : const bool status
672 0 : = disco->update_subscription_qos(this->domain_id_,
673 0 : this->dp_id_,
674 0 : iter->first,
675 0 : iter->second,
676 0 : this->qos_);
677 :
678 0 : if (!status) {
679 0 : if (DCPS_debug_level > 0) {
680 0 : ACE_ERROR((LM_ERROR,
681 : ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
682 : ACE_TEXT("failed.\n")));
683 : }
684 0 : return DDS::RETCODE_ERROR;
685 : }
686 :
687 0 : ++iter;
688 0 : }
689 0 : }
690 :
691 0 : return DDS::RETCODE_OK;
692 :
693 : } else {
694 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
695 : }
696 : }
697 :
698 : DDS::ReturnCode_t
699 0 : SubscriberImpl::get_qos(
700 : DDS::SubscriberQos & qos)
701 : {
702 0 : qos = qos_;
703 0 : return DDS::RETCODE_OK;
704 : }
705 :
706 : DDS::ReturnCode_t
707 0 : SubscriberImpl::set_listener(
708 : DDS::SubscriberListener_ptr a_listener,
709 : DDS::StatusMask mask)
710 : {
711 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
712 0 : listener_mask_ = mask;
713 : //note: OK to duplicate a nil object ref
714 0 : listener_ = DDS::SubscriberListener::_duplicate(a_listener);
715 0 : return DDS::RETCODE_OK;
716 0 : }
717 :
718 : DDS::SubscriberListener_ptr
719 0 : SubscriberImpl::get_listener()
720 : {
721 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
722 0 : return DDS::SubscriberListener::_duplicate(listener_.in());
723 0 : }
724 :
725 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
726 :
727 : DDS::ReturnCode_t
728 0 : SubscriberImpl::begin_access()
729 : {
730 0 : DataReaderSet to_call;
731 : {
732 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
733 : si_guard,
734 : si_lock_,
735 : DDS::RETCODE_ERROR);
736 0 : if (!enabled_) {
737 0 : if (DCPS_debug_level > 0) {
738 0 : ACE_ERROR((LM_ERROR,
739 : ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
740 : ACE_TEXT(" Subscriber is not enabled!\n")));
741 : }
742 0 : return DDS::RETCODE_NOT_ENABLED;
743 : }
744 :
745 0 : if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
746 0 : return DDS::RETCODE_OK;
747 : }
748 :
749 0 : ++access_depth_;
750 : // We should only notify subscription on the first
751 : // and last change to the current change set:
752 0 : if (access_depth_ == 1) {
753 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
754 : dr_set_guard,
755 : dr_set_lock_,
756 : DDS::RETCODE_ERROR);
757 0 : to_call = datareader_set_;
758 0 : }
759 0 : }
760 :
761 0 : for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
762 0 : (*it)->begin_access();
763 : }
764 0 : return DDS::RETCODE_OK;
765 0 : }
766 :
767 : DDS::ReturnCode_t
768 0 : SubscriberImpl::end_access()
769 : {
770 0 : DataReaderSet to_call;
771 : {
772 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
773 : si_guard,
774 : si_lock_,
775 : DDS::RETCODE_ERROR);
776 0 : if (!enabled_) {
777 0 : if (DCPS_debug_level > 0) {
778 0 : ACE_ERROR((LM_ERROR,
779 : ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
780 : ACE_TEXT(" Publisher is not enabled!\n")));
781 : }
782 0 : return DDS::RETCODE_NOT_ENABLED;
783 : }
784 :
785 0 : if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
786 0 : return DDS::RETCODE_OK;
787 : }
788 :
789 0 : if (access_depth_ == 0) {
790 0 : if (DCPS_debug_level > 0) {
791 0 : ACE_ERROR((LM_ERROR,
792 : ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
793 : ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
794 : }
795 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
796 : }
797 :
798 0 : --access_depth_;
799 : // We should only notify subscription on the first
800 : // and last change to the current change set:
801 0 : if (access_depth_ == 0) {
802 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
803 : dr_set_guard,
804 : dr_set_lock_,
805 : DDS::RETCODE_ERROR);
806 0 : to_call = datareader_set_;
807 0 : }
808 0 : }
809 :
810 0 : for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
811 0 : (*it)->end_access();
812 : }
813 0 : return DDS::RETCODE_OK;
814 0 : }
815 :
816 : #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
817 :
818 : DDS::DomainParticipant_ptr
819 0 : SubscriberImpl::get_participant()
820 : {
821 0 : return participant_.lock()._retn();
822 : }
823 :
824 : DDS::ReturnCode_t
825 0 : SubscriberImpl::set_default_datareader_qos(
826 : const DDS::DataReaderQos & qos)
827 : {
828 0 : if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
829 0 : default_datareader_qos_ = qos;
830 0 : return DDS::RETCODE_OK;
831 :
832 : } else {
833 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
834 : }
835 : }
836 :
837 : DDS::ReturnCode_t
838 0 : SubscriberImpl::get_default_datareader_qos(
839 : DDS::DataReaderQos & qos)
840 : {
841 0 : qos = default_datareader_qos_;
842 0 : return DDS::RETCODE_OK;
843 : }
844 :
845 : DDS::ReturnCode_t
846 0 : SubscriberImpl::copy_from_topic_qos(
847 : DDS::DataReaderQos & a_datareader_qos,
848 : const DDS::TopicQos & a_topic_qos)
849 : {
850 0 : if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
851 0 : return DDS::RETCODE_OK;
852 :
853 : } else {
854 0 : return DDS::RETCODE_INCONSISTENT_POLICY;
855 : }
856 : }
857 :
858 : DDS::ReturnCode_t
859 0 : SubscriberImpl::enable()
860 : {
861 : //According spec:
862 : // - Calling enable on an already enabled Entity returns OK and has no
863 : // effect.
864 : // - Calling enable on an Entity whose factory is not enabled will fail
865 : // and return PRECONDITION_NOT_MET.
866 :
867 0 : if (this->is_enabled()) {
868 0 : return DDS::RETCODE_OK;
869 : }
870 :
871 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
872 0 : if (!participant || !participant->is_enabled()) {
873 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
874 : }
875 :
876 0 : dp_id_ = participant->get_id();
877 :
878 0 : if (this->monitor_) {
879 0 : this->monitor_->report();
880 : }
881 :
882 0 : this->set_enabled();
883 :
884 0 : if (qos_.entity_factory.autoenable_created_entities) {
885 0 : DataReaderSet readers;
886 : {
887 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
888 0 : readers_not_enabled_.swap(readers);
889 0 : }
890 0 : for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
891 0 : (*it)->enable();
892 : }
893 0 : }
894 :
895 0 : return DDS::RETCODE_OK;
896 0 : }
897 :
898 0 : bool SubscriberImpl::is_clean(String* leftover_entities) const
899 : {
900 0 : if (leftover_entities) {
901 0 : leftover_entities->clear();
902 : }
903 :
904 0 : size_t reader_count = datareader_map_.size();
905 0 : if (reader_count && !TheTransientKludge->is_enabled()) {
906 : // BIT datareaders.
907 0 : reader_count = reader_count == NUMBER_OF_BUILT_IN_TOPICS ? 0 : reader_count;
908 : }
909 0 : if (leftover_entities && reader_count) {
910 0 : *leftover_entities += to_dds_string(reader_count) + " reader(s)";
911 : }
912 :
913 0 : return reader_count == 0;
914 : }
915 :
916 : void
917 0 : SubscriberImpl::data_received(DataReaderImpl* reader)
918 : {
919 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
920 : guard,
921 : this->dr_set_lock_);
922 0 : datareader_set_.insert(rchandle_from(reader));
923 0 : }
924 :
925 : DDS::ReturnCode_t
926 0 : SubscriberImpl::reader_enabled(const char* topic_name,
927 : DataReaderImpl* reader_ptr)
928 : {
929 0 : if (DCPS_debug_level >= 4) {
930 0 : ACE_DEBUG((LM_DEBUG,
931 : ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
932 : ACE_TEXT("datareader(topic_name=%C) enabled\n"),
933 : topic_name));
934 : }
935 :
936 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
937 0 : DataReaderImpl_rch reader = rchandle_from(reader_ptr);
938 0 : readers_not_enabled_.erase(reader);
939 :
940 0 : this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
941 :
942 0 : if (this->monitor_) {
943 0 : this->monitor_->report();
944 : }
945 :
946 0 : return DDS::RETCODE_OK;
947 0 : }
948 :
949 : #ifndef OPENDDS_NO_MULTI_TOPIC
950 : DDS::ReturnCode_t
951 0 : SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
952 : {
953 0 : DDS::TopicDescription_var td = reader->get_topicdescription();
954 0 : CORBA::String_var topic = td->get_name();
955 0 : multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
956 0 : return DDS::RETCODE_OK;
957 0 : }
958 :
959 : void
960 0 : SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
961 : {
962 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, dr_set_lock_);
963 0 : datareader_set_.erase(rchandle_from(reader));
964 0 : }
965 : #endif
966 :
967 : DDS::SubscriberListener_ptr
968 0 : SubscriberImpl::listener_for(::DDS::StatusKind kind)
969 : {
970 : // per 2.1.4.3.1 Listener Access to Plain Communication Status
971 : // use this entities factory if listener is mask not enabled
972 : // for this kind.
973 0 : RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
974 0 : if (! participant)
975 0 : return 0;
976 :
977 0 : ACE_Guard<ACE_Thread_Mutex> g(listener_mutex_);
978 0 : if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
979 0 : g.release();
980 0 : return participant->listener_for(kind);
981 :
982 : } else {
983 0 : return DDS::SubscriberListener::_duplicate(listener_.in());
984 : }
985 0 : }
986 :
987 : unsigned int&
988 0 : SubscriberImpl::raw_latency_buffer_size()
989 : {
990 0 : return this->raw_latency_buffer_size_;
991 : }
992 :
993 : DataCollector<double>::OnFull&
994 0 : SubscriberImpl::raw_latency_buffer_type()
995 : {
996 0 : return this->raw_latency_buffer_type_;
997 : }
998 :
999 : void
1000 0 : SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
1001 : {
1002 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1003 : guard,
1004 : this->si_lock_,
1005 : );
1006 :
1007 0 : subs.reserve(datareader_map_.size());
1008 0 : for (DataReaderMap::iterator iter = datareader_map_.begin();
1009 0 : iter != datareader_map_.end();
1010 0 : ++iter) {
1011 0 : subs.push_back(iter->second->get_guid());
1012 : }
1013 0 : }
1014 :
1015 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1016 : void
1017 0 : SubscriberImpl::update_ownership_strength (const GUID_t& pub_id,
1018 : const CORBA::Long& ownership_strength)
1019 : {
1020 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
1021 : guard,
1022 : this->si_lock_,
1023 : );
1024 :
1025 0 : for (DataReaderMap::iterator iter = datareader_map_.begin();
1026 0 : iter != datareader_map_.end();
1027 0 : ++iter) {
1028 0 : if (!iter->second->is_bit()) {
1029 0 : iter->second->update_ownership_strength(pub_id, ownership_strength);
1030 : }
1031 : }
1032 0 : }
1033 : #endif
1034 :
1035 :
1036 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1037 : void
1038 0 : SubscriberImpl::coherent_change_received (const GUID_t& publisher_id,
1039 : DataReaderImpl* reader,
1040 : Coherent_State& group_state)
1041 : {
1042 0 : DataReaderSet localdrs;
1043 : {
1044 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex,
1045 : guard,
1046 : this->dr_set_lock_);
1047 0 : localdrs = datareader_set_;
1048 0 : }
1049 : // Verify if all readers complete the coherent changes. The result
1050 : // is either COMPLETED or REJECTED.
1051 0 : group_state = COMPLETED;
1052 0 : for (DataReaderSet::const_iterator iter = localdrs.begin();
1053 0 : iter != localdrs.end(); ++iter) {
1054 :
1055 0 : Coherent_State state = COMPLETED;
1056 0 : (*iter)->coherent_change_received (publisher_id, state);
1057 0 : if (state == NOT_COMPLETED_YET) {
1058 0 : group_state = NOT_COMPLETED_YET;
1059 0 : return;
1060 : }
1061 0 : else if (state == REJECTED) {
1062 0 : group_state = REJECTED;
1063 : }
1064 : }
1065 :
1066 0 : GUID_t writerId = GUID_UNKNOWN;
1067 0 : for (DataReaderSet::const_iterator iter = localdrs.begin();
1068 0 : iter != localdrs.end(); ++iter) {
1069 0 : if (group_state == COMPLETED) {
1070 0 : (*iter)->accept_coherent (writerId, publisher_id);
1071 : }
1072 : else { //REJECTED
1073 0 : (*iter)->reject_coherent (writerId, publisher_id);
1074 : }
1075 : }
1076 :
1077 0 : if (group_state == COMPLETED) {
1078 0 : for (DataReaderSet::const_iterator iter = localdrs.begin();
1079 0 : iter != localdrs.end(); ++iter) {
1080 0 : (*iter)->coherent_changes_completed (reader);
1081 0 : (*iter)->reset_coherent_info (writerId, publisher_id);
1082 : }
1083 : }
1084 0 : }
1085 : #endif
1086 :
1087 : RcHandle<EntityImpl>
1088 0 : SubscriberImpl::parent() const
1089 : {
1090 0 : return this->participant_.lock();
1091 : }
1092 :
1093 : bool
1094 0 : SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
1095 : const DDS::DataReaderQos & default_qos,
1096 : DDS::Topic_ptr a_topic,
1097 : DDS::DataReaderQos & dr_qos,
1098 : bool mt)
1099 : {
1100 :
1101 :
1102 0 : if (qos == DATAREADER_QOS_DEFAULT) {
1103 0 : dr_qos = default_qos;
1104 :
1105 0 : } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
1106 :
1107 : #ifndef OPENDDS_NO_MULTI_TOPIC
1108 0 : if (mt) {
1109 0 : if (DCPS_debug_level > 0) {
1110 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1111 : ACE_TEXT("SubscriberImpl::create_datareader, ")
1112 : ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
1113 : ACE_TEXT("to create a MultiTopic DataReader.\n")));
1114 : }
1115 0 : return false;
1116 : }
1117 : #else
1118 : ACE_UNUSED_ARG(mt);
1119 : #endif
1120 0 : DDS::TopicQos topic_qos;
1121 0 : a_topic->get_qos(topic_qos);
1122 :
1123 0 : dr_qos = default_qos;
1124 :
1125 0 : Qos_Helper::copy_from_topic_qos(dr_qos,
1126 : topic_qos);
1127 :
1128 0 : } else {
1129 0 : dr_qos = qos;
1130 : }
1131 :
1132 : OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
1133 : OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
1134 : OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
1135 :
1136 0 : if (!Qos_Helper::valid(dr_qos)) {
1137 0 : if (DCPS_debug_level > 0) {
1138 0 : ACE_ERROR((LM_ERROR,
1139 : ACE_TEXT("(%P|%t) ERROR: ")
1140 : ACE_TEXT("SubscriberImpl::create_datareader, ")
1141 : ACE_TEXT("invalid qos.\n")));
1142 : }
1143 0 : return false;
1144 : }
1145 :
1146 0 : if (!Qos_Helper::consistent(dr_qos)) {
1147 0 : if (DCPS_debug_level > 0) {
1148 0 : ACE_ERROR((LM_ERROR,
1149 : ACE_TEXT("(%P|%t) ERROR: ")
1150 : ACE_TEXT("SubscriberImpl::create_datareader, ")
1151 : ACE_TEXT("inconsistent qos.\n")));
1152 : }
1153 0 : return false;
1154 : }
1155 :
1156 0 : return true;
1157 : }
1158 :
1159 :
1160 : } // namespace DCPS
1161 : } // namespace OpenDDS
1162 :
1163 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|