00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "debug.h"
00010 #include "SubscriberImpl.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "MonitorFactory.h"
00017 #include "DataReaderImpl.h"
00018 #include "Service_Participant.h"
00019 #include "dds/DdsDcpsTypeSupportExtC.h"
00020 #include "TopicDescriptionImpl.h"
00021 #include "Marked_Default_Qos.h"
00022 #include "Transient_Kludge.h"
00023 #include "ContentFilteredTopicImpl.h"
00024 #include "MultiTopicImpl.h"
00025 #include "GroupRakeData.h"
00026 #include "MultiTopicDataReaderBase.h"
00027 #include "Util.h"
00028 #include "dds/DCPS/transport/framework/TransportImpl.h"
00029 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00030
00031 #include "tao/debug.h"
00032
00033 #include "ace/Auto_Ptr.h"
00034 #include "ace/Vector_T.h"
00035
00036 #include <stdexcept>
00037
00038 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00039
00040 namespace OpenDDS {
00041 namespace DCPS {
00042
00043 SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t handle,
00044 const DDS::SubscriberQos & qos,
00045 DDS::SubscriberListener_ptr a_listener,
00046 const DDS::StatusMask& mask,
00047 DomainParticipantImpl* participant)
00048 : handle_(handle),
00049 qos_(qos),
00050 default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
00051 listener_mask_(mask),
00052 participant_(*participant),
00053 domain_id_(participant->get_domain_id()),
00054 raw_latency_buffer_size_(0),
00055 raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00056 monitor_(0),
00057 access_depth_ (0)
00058 {
00059
00060 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00061
00062 monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this);
00063 }
00064
00065 SubscriberImpl::~SubscriberImpl()
00066 {
00067
00068
00069 if (!is_clean()) {
00070 ACE_ERROR((LM_ERROR,
00071 ACE_TEXT("(%P|%t) ERROR: ")
00072 ACE_TEXT("SubscriberImpl::~SubscriberImpl, ")
00073 ACE_TEXT("%B datareaders still exist.\n"),
00074 datareader_map_.size ()));
00075 }
00076 }
00077
00078 DDS::InstanceHandle_t
00079 SubscriberImpl::get_instance_handle()
00080 {
00081 return handle_;
00082 }
00083
00084 bool
00085 SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
00086 {
00087 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00088 guard,
00089 this->si_lock_,
00090 false);
00091
00092 for (DataReaderSet::iterator it(datareader_set_.begin());
00093 it != datareader_set_.end(); ++it) {
00094 if (a_handle == (*it)->get_instance_handle())
00095 return true;
00096 }
00097
00098 return false;
00099 }
00100
00101 DDS::DataReader_ptr
00102 SubscriberImpl::create_datareader(
00103 DDS::TopicDescription_ptr a_topic_desc,
00104 const DDS::DataReaderQos & qos,
00105 DDS::DataReaderListener_ptr a_listener,
00106 DDS::StatusMask mask)
00107 {
00108 if (CORBA::is_nil(a_topic_desc)) {
00109 ACE_ERROR((LM_ERROR,
00110 ACE_TEXT("(%P|%t) ERROR: ")
00111 ACE_TEXT("SubscriberImpl::create_datareader, ")
00112 ACE_TEXT("topic desc is nil.\n")));
00113 return DDS::DataReader::_nil();
00114 }
00115
00116 DDS::DataReaderQos dr_qos;
00117 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00118 if (!participant)
00119 return DDS::DataReader::_nil();
00120
00121 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
00122
00123 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00124 ContentFilteredTopicImpl* cft = 0;
00125 #endif
00126 #ifndef OPENDDS_NO_MULTI_TOPIC
00127 MultiTopicImpl* mt = 0;
00128 #else
00129 bool mt = false;
00130 #endif
00131
00132 if (!topic_servant) {
00133 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00134 cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
00135 if (cft) {
00136 DDS::Topic_var related;
00137 related = cft->get_related_topic();
00138 topic_servant = dynamic_cast<TopicImpl*>(related.in());
00139 }
00140 else
00141 #endif
00142 {
00143 #ifndef OPENDDS_NO_MULTI_TOPIC
00144 mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
00145 #endif
00146 }
00147 }
00148
00149 if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
00150 return DDS::DataReader::_nil();
00151
00152 #ifndef OPENDDS_NO_MULTI_TOPIC
00153 if (mt) {
00154 try {
00155 DDS::DataReader_var dr =
00156 mt->get_type_support()->create_multitopic_datareader();
00157 MultiTopicDataReaderBase* mtdr =
00158 dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
00159 mtdr->init(dr_qos, a_listener, mask, this, mt);
00160 if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) {
00161 if (dr->enable() != DDS::RETCODE_OK) {
00162 ACE_ERROR((LM_ERROR,
00163 ACE_TEXT("(%P|%t) ERROR: ")
00164 ACE_TEXT("SubscriberImpl::create_datareader, ")
00165 ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
00166 return DDS::DataReader::_nil();
00167 }
00168 multitopic_reader_enabled(dr);
00169 }
00170 return dr._retn();
00171 } catch (const std::exception& e) {
00172 ACE_ERROR((LM_ERROR,
00173 ACE_TEXT("(%P|%t) ERROR: ")
00174 ACE_TEXT("SubscriberImpl::create_datareader, ")
00175 ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
00176 e.what()));
00177 }
00178 return DDS::DataReader::_nil();
00179 }
00180 #endif
00181
00182 OpenDDS::DCPS::TypeSupport_ptr typesupport =
00183 topic_servant->get_type_support();
00184
00185 if (0 == typesupport) {
00186 CORBA::String_var name = a_topic_desc->get_name();
00187 ACE_ERROR((LM_ERROR,
00188 ACE_TEXT("(%P|%t) ERROR: ")
00189 ACE_TEXT("SubscriberImpl::create_datareader, ")
00190 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00191 name.in()));
00192 return DDS::DataReader::_nil();
00193 }
00194
00195 DDS::DataReader_var dr_obj = typesupport->create_datareader();
00196
00197 DataReaderImpl* dr_servant =
00198 dynamic_cast<DataReaderImpl*>(dr_obj.in());
00199
00200 if (dr_servant == 0) {
00201 ACE_ERROR((LM_ERROR,
00202 ACE_TEXT("(%P|%t) ERROR: ")
00203 ACE_TEXT("SubscriberImpl::create_datareader, ")
00204 ACE_TEXT("servant is nil.\n")));
00205 return DDS::DataReader::_nil();
00206 }
00207
00208 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00209 if (cft) {
00210 dr_servant->enable_filtering(cft);
00211 }
00212 #endif
00213
00214
00215
00216
00217 dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
00218 dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
00219
00220
00221 dr_servant->init(topic_servant,
00222 dr_qos,
00223 a_listener,
00224 mask,
00225 participant.in(),
00226 this);
00227
00228 if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00229 const DDS::ReturnCode_t ret = dr_servant->enable();
00230
00231 if (ret != DDS::RETCODE_OK) {
00232 ACE_ERROR((LM_WARNING,
00233 ACE_TEXT("(%P|%t) WARNING: ")
00234 ACE_TEXT("SubscriberImpl::create_datareader, ")
00235 ACE_TEXT("enable failed.\n")));
00236 return DDS::DataReader::_nil();
00237 }
00238 } else {
00239 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0);
00240 readers_not_enabled_.insert(rchandle_from(dr_servant));
00241 }
00242
00243
00244
00245 return DDS::DataReader::_duplicate(dr_obj.in());
00246 }
00247
00248 DDS::ReturnCode_t
00249 SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
00250 {
00251 DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
00252
00253 DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
00254
00255 if (dr_servant) {
00256 DDS::Subscriber_var dr_subscriber(dr_servant->get_subscriber());
00257
00258 if (dr_subscriber.in() != this) {
00259 RepoId id = dr_servant->get_subscription_id();
00260 GuidConverter converter(id);
00261 ACE_ERROR((LM_ERROR,
00262 ACE_TEXT("(%P|%t) SubscriberImpl::delete_datareader: ")
00263 ACE_TEXT("data reader %C doesn't belong to this subscriber.\n"),
00264 OPENDDS_STRING(converter).c_str()));
00265 return DDS::RETCODE_PRECONDITION_NOT_MET;
00266 }
00267
00268 if (dr_servant->has_zero_copies()) {
00269 return DDS::RETCODE_PRECONDITION_NOT_MET;
00270 }
00271 }
00272 if (dr_servant) {
00273
00274 dr_servant->prepare_to_delete();
00275 }
00276
00277 {
00278 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00279 guard,
00280 this->si_lock_,
00281 DDS::RETCODE_ERROR);
00282
00283 DataReaderMap::iterator it;
00284
00285 for (it = datareader_map_.begin();
00286 it != datareader_map_.end();
00287 ++it) {
00288 if (it->second == dr_servant) {
00289 break;
00290 }
00291 }
00292
00293 if (it == datareader_map_.end()) {
00294 DDS::TopicDescription_var td = a_datareader->get_topicdescription();
00295 CORBA::String_var topic_name = td->get_name();
00296 #ifndef OPENDDS_NO_MULTI_TOPIC
00297 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00298 multitopic_reader_map_.find(topic_name.in());
00299 if (mt_iter != multitopic_reader_map_.end()) {
00300 DDS::DataReader_ptr ptr = mt_iter->second;
00301 MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
00302 if (!mtdrb) {
00303 ACE_ERROR_RETURN((LM_ERROR,
00304 ACE_TEXT("(%P|%t) ERROR: ")
00305 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00306 ACE_TEXT("datareader(topic_name=%C)")
00307 ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
00308 topic_name.in()), ::DDS::RETCODE_ERROR);
00309 }
00310 mtdrb->cleanup();
00311 multitopic_reader_map_.erase(mt_iter);
00312 return DDS::RETCODE_OK;
00313 }
00314 #endif
00315 if (!dr_servant) {
00316 ACE_ERROR_RETURN((LM_ERROR,
00317 ACE_TEXT("(%P|%t) ERROR: ")
00318 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00319 ACE_TEXT("datareader(topic_name=%C)")
00320 ACE_TEXT("for unknown repo id not found.\n"),
00321 topic_name.in()), ::DDS::RETCODE_ERROR);
00322 }
00323 RepoId id = dr_servant->get_subscription_id();
00324 GuidConverter converter(id);
00325 ACE_ERROR_RETURN((LM_ERROR,
00326 ACE_TEXT("(%P|%t) ERROR: ")
00327 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00328 ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
00329 topic_name.in(),
00330 OPENDDS_STRING(converter).c_str()),
00331 ::DDS::RETCODE_ERROR);
00332 }
00333
00334 datareader_map_.erase(it);
00335 datareader_set_.erase(dr_servant);
00336 }
00337
00338 if (this->monitor_) {
00339 this->monitor_->report();
00340 }
00341
00342 if (!dr_servant) {
00343 ACE_ERROR_RETURN((LM_ERROR,
00344 ACE_TEXT("(%P|%t) ERROR: ")
00345 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00346 ACE_TEXT("could not remove unknown subscription.\n")),
00347 ::DDS::RETCODE_ERROR);
00348 }
00349
00350 RepoId subscription_id = dr_servant->get_subscription_id();
00351 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00352 if (!disco->remove_subscription(this->domain_id_,
00353 this->dp_id_,
00354 subscription_id)) {
00355 ACE_ERROR_RETURN((LM_ERROR,
00356 ACE_TEXT("(%P|%t) ERROR: ")
00357 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00358 ACE_TEXT(" could not remove subscription from discovery.\n")),
00359 ::DDS::RETCODE_ERROR);
00360 }
00361
00362
00363
00364 dr_servant->remove_all_associations();
00365 dr_servant->cleanup();
00366 return DDS::RETCODE_OK;
00367 }
00368
00369 DDS::ReturnCode_t
00370 SubscriberImpl::delete_contained_entities()
00371 {
00372
00373 set_deleted(true);
00374
00375 ACE_Vector<DDS::DataReader_ptr> drs;
00376
00377 #ifndef OPENDDS_NO_MULTI_TOPIC
00378 {
00379 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00380 guard,
00381 this->si_lock_,
00382 DDS::RETCODE_ERROR);
00383 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00384 multitopic_reader_map_.begin();
00385 mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
00386 drs.push_back(mt_iter->second);
00387 }
00388 }
00389
00390 for (size_t i = 0; i < drs.size(); ++i) {
00391 const DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00392 if (ret != DDS::RETCODE_OK) {
00393 ACE_ERROR_RETURN((LM_ERROR,
00394 ACE_TEXT("(%P|%t) ERROR: ")
00395 ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00396 ACE_TEXT("failed to delete datareader\n")),
00397 ret);
00398 }
00399 }
00400 drs.clear();
00401 #endif
00402
00403 {
00404 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00405 guard,
00406 this->si_lock_,
00407 DDS::RETCODE_ERROR);
00408 DataReaderMap::iterator it;
00409 DataReaderMap::iterator itEnd = datareader_map_.end();
00410
00411 for (it = datareader_map_.begin(); it != itEnd; ++it) {
00412 drs.push_back(it->second.in());
00413 }
00414 }
00415
00416 for (size_t i = 0; i < drs.size(); ++i) {
00417 DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00418
00419 if (ret != DDS::RETCODE_OK) {
00420 ACE_ERROR_RETURN((LM_ERROR,
00421 ACE_TEXT("(%P|%t) ERROR: ")
00422 ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00423 ACE_TEXT("failed to delete datareader\n")),
00424 ret);
00425 }
00426 }
00427
00428
00429 set_deleted(false);
00430
00431 return DDS::RETCODE_OK;
00432 }
00433
00434 DDS::DataReader_ptr
00435 SubscriberImpl::lookup_datareader(
00436 const char * topic_name)
00437 {
00438 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00439 guard,
00440 this->si_lock_,
00441 DDS::DataReader::_nil());
00442
00443
00444
00445 DataReaderMap::iterator it = datareader_map_.find(topic_name);
00446
00447 if (it == datareader_map_.end()) {
00448 #ifndef OPENDDS_NO_MULTI_TOPIC
00449 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00450 multitopic_reader_map_.find(topic_name);
00451 if (mt_iter != multitopic_reader_map_.end()) {
00452 return DDS::DataReader::_duplicate(mt_iter->second);
00453 }
00454 #endif
00455
00456 if (DCPS_debug_level >= 2) {
00457 ACE_DEBUG((LM_DEBUG,
00458 ACE_TEXT("(%P|%t) ")
00459 ACE_TEXT("SubscriberImpl::lookup_datareader, ")
00460 ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
00461 topic_name));
00462 }
00463
00464 return DDS::DataReader::_nil();
00465
00466 } else {
00467 return DDS::DataReader::_duplicate(it->second.in());
00468 }
00469 }
00470
00471 DDS::ReturnCode_t
00472 SubscriberImpl::get_datareaders(
00473 DDS::DataReaderSeq & readers,
00474 DDS::SampleStateMask sample_states,
00475 DDS::ViewStateMask view_states,
00476 DDS::InstanceStateMask instance_states)
00477 {
00478 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00479 guard,
00480 this->si_lock_,
00481 DDS::RETCODE_ERROR);
00482
00483 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00484
00485
00486
00487 if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
00488 if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
00489 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00490 }
00491
00492 if (this->qos_.presentation.ordered_access) {
00493
00494 GroupRakeData data;
00495 for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00496 pos != datareader_set_.end(); ++pos) {
00497 (*pos)->get_ordered_data (data, sample_states, view_states, instance_states);
00498 }
00499
00500
00501
00502 data.get_datareaders (readers);
00503
00504 return DDS::RETCODE_OK;
00505 }
00506 }
00507 #endif
00508
00509
00510 readers.length(0);
00511
00512 for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00513 pos != datareader_set_.end(); ++pos) {
00514 if ((*pos)->have_sample_states(sample_states) &&
00515 (*pos)->have_view_states(view_states) &&
00516 (*pos)->have_instance_states(instance_states)) {
00517 push_back(readers, DDS::DataReader::_duplicate(pos->in()));
00518 }
00519 }
00520
00521 return DDS::RETCODE_OK;
00522 }
00523
00524 DDS::ReturnCode_t
00525 SubscriberImpl::notify_datareaders()
00526 {
00527 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00528 guard,
00529 this->si_lock_,
00530 DDS::RETCODE_ERROR);
00531
00532 DataReaderMap::iterator it;
00533
00534 for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
00535 if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00536 DDS::DataReaderListener_var listener = it->second->get_listener();
00537 if (!CORBA::is_nil (listener)) {
00538 listener->on_data_available(it->second.in());
00539 }
00540
00541 it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00542 }
00543 }
00544
00545 #ifndef OPENDDS_NO_MULTI_TOPIC
00546 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it =
00547 multitopic_reader_map_.begin(); it != multitopic_reader_map_.end();
00548 ++it) {
00549 MultiTopicDataReaderBase* dri =
00550 dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
00551
00552 if (!dri) {
00553 ACE_ERROR_RETURN((LM_ERROR,
00554 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
00555 ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")),
00556 ::DDS::RETCODE_ERROR);
00557 }
00558
00559 if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00560 DDS::DataReaderListener_var listener = dri->get_listener();
00561 if (!CORBA::is_nil(listener)) {
00562 listener->on_data_available(dri);
00563 }
00564 dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00565 }
00566 }
00567 #endif
00568
00569 return DDS::RETCODE_OK;
00570 }
00571
00572 DDS::ReturnCode_t
00573 SubscriberImpl::set_qos(
00574 const DDS::SubscriberQos & qos)
00575 {
00576
00577 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00578
00579 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00580 if (qos_ == qos)
00581 return DDS::RETCODE_OK;
00582
00583
00584 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00585 return DDS::RETCODE_IMMUTABLE_POLICY;
00586
00587 } else {
00588 qos_ = qos;
00589
00590 DrIdToQosMap idToQosMap;
00591 {
00592 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00593 guard,
00594 this->si_lock_,
00595 DDS::RETCODE_ERROR);
00596
00597 DataReaderMap::iterator endIter = datareader_map_.end();
00598
00599 for (DataReaderMap::iterator iter = datareader_map_.begin();
00600 iter != endIter; ++iter) {
00601 DataReaderImpl_rch reader = iter->second;
00602 reader->set_subscriber_qos (qos);
00603 DDS::DataReaderQos qos;
00604 reader->get_qos(qos);
00605 RepoId id = reader->get_subscription_id();
00606 std::pair<DrIdToQosMap::iterator, bool> pair
00607 = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
00608
00609 if (pair.second == false) {
00610 GuidConverter converter(id);
00611 ACE_ERROR_RETURN((LM_ERROR,
00612 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
00613 ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
00614 OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00615 }
00616 }
00617 }
00618
00619 DrIdToQosMap::iterator iter = idToQosMap.begin();
00620
00621 while (iter != idToQosMap.end()) {
00622 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00623 const bool status
00624 = disco->update_subscription_qos(this->domain_id_,
00625 this->dp_id_,
00626 iter->first,
00627 iter->second,
00628 this->qos_);
00629
00630 if (!status) {
00631 ACE_ERROR_RETURN((LM_ERROR,
00632 ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
00633 ACE_TEXT("failed. \n")),
00634 DDS::RETCODE_ERROR);
00635 }
00636
00637 ++iter;
00638 }
00639 }
00640
00641 return DDS::RETCODE_OK;
00642
00643 } else {
00644 return DDS::RETCODE_INCONSISTENT_POLICY;
00645 }
00646 }
00647
00648 DDS::ReturnCode_t
00649 SubscriberImpl::get_qos(
00650 DDS::SubscriberQos & qos)
00651 {
00652 qos = qos_;
00653 return DDS::RETCODE_OK;
00654 }
00655
00656 DDS::ReturnCode_t
00657 SubscriberImpl::set_listener(
00658 DDS::SubscriberListener_ptr a_listener,
00659 DDS::StatusMask mask)
00660 {
00661 listener_mask_ = mask;
00662
00663 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00664 return DDS::RETCODE_OK;
00665 }
00666
00667 DDS::SubscriberListener_ptr
00668 SubscriberImpl::get_listener()
00669 {
00670 return DDS::SubscriberListener::_duplicate(listener_.in());
00671 }
00672
00673 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00674
00675 DDS::ReturnCode_t
00676 SubscriberImpl::begin_access()
00677 {
00678 if (enabled_ == false) {
00679 ACE_ERROR_RETURN((LM_ERROR,
00680 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
00681 ACE_TEXT(" Subscriber is not enabled!\n")),
00682 DDS::RETCODE_NOT_ENABLED);
00683 }
00684
00685 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00686 return DDS::RETCODE_OK;
00687 }
00688
00689 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00690 guard,
00691 this->si_lock_,
00692 DDS::RETCODE_ERROR);
00693
00694 ++this->access_depth_;
00695
00696
00697
00698 if (this->access_depth_ == 1) {
00699 for (DataReaderSet::iterator it = this->datareader_set_.begin();
00700 it != this->datareader_set_.end(); ++it) {
00701 (*it)->begin_access();
00702 }
00703 }
00704
00705 return DDS::RETCODE_OK;
00706 }
00707
00708 DDS::ReturnCode_t
00709 SubscriberImpl::end_access()
00710 {
00711 if (enabled_ == false) {
00712 ACE_ERROR_RETURN((LM_ERROR,
00713 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00714 ACE_TEXT(" Publisher is not enabled!\n")),
00715 DDS::RETCODE_NOT_ENABLED);
00716 }
00717
00718 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00719 return DDS::RETCODE_OK;
00720 }
00721
00722 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00723 guard,
00724 this->si_lock_,
00725 DDS::RETCODE_ERROR);
00726
00727 if (this->access_depth_ == 0) {
00728 ACE_ERROR_RETURN((LM_ERROR,
00729 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00730 ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00731 DDS::RETCODE_PRECONDITION_NOT_MET);
00732 }
00733
00734 --this->access_depth_;
00735
00736
00737
00738 if (this->access_depth_ == 0) {
00739 for (DataReaderSet::iterator it = this->datareader_set_.begin();
00740 it != this->datareader_set_.end(); ++it) {
00741 (*it)->end_access();
00742 }
00743 }
00744
00745 return DDS::RETCODE_OK;
00746 }
00747
00748 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00749
00750 DDS::DomainParticipant_ptr
00751 SubscriberImpl::get_participant()
00752 {
00753 return participant_.lock()._retn();
00754 }
00755
00756 DDS::ReturnCode_t
00757 SubscriberImpl::set_default_datareader_qos(
00758 const DDS::DataReaderQos & qos)
00759 {
00760 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00761 default_datareader_qos_ = qos;
00762 return DDS::RETCODE_OK;
00763
00764 } else {
00765 return DDS::RETCODE_INCONSISTENT_POLICY;
00766 }
00767 }
00768
00769 DDS::ReturnCode_t
00770 SubscriberImpl::get_default_datareader_qos(
00771 DDS::DataReaderQos & qos)
00772 {
00773 qos = default_datareader_qos_;
00774 return DDS::RETCODE_OK;
00775 }
00776
00777 DDS::ReturnCode_t
00778 SubscriberImpl::copy_from_topic_qos(
00779 DDS::DataReaderQos & a_datareader_qos,
00780 const DDS::TopicQos & a_topic_qos)
00781 {
00782 if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
00783 return DDS::RETCODE_OK;
00784
00785 } else {
00786 return DDS::RETCODE_INCONSISTENT_POLICY;
00787 }
00788 }
00789
00790 DDS::ReturnCode_t
00791 SubscriberImpl::enable()
00792 {
00793
00794
00795
00796
00797
00798
00799 if (this->is_enabled()) {
00800 return DDS::RETCODE_OK;
00801 }
00802
00803 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00804 if (!participant || participant->is_enabled() == false) {
00805 return DDS::RETCODE_PRECONDITION_NOT_MET;
00806 }
00807
00808 dp_id_ = participant->get_id();
00809
00810 if (this->monitor_) {
00811 this->monitor_->report();
00812 }
00813
00814 this->set_enabled();
00815
00816 if (qos_.entity_factory.autoenable_created_entities) {
00817 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00818 DataReaderSet readers;
00819 readers_not_enabled_.swap(readers);
00820 for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
00821 (*it)->enable();
00822 }
00823 }
00824
00825 return DDS::RETCODE_OK;
00826 }
00827
00828 bool
00829 SubscriberImpl::is_clean() const
00830 {
00831 const bool sub_is_clean = datareader_map_.empty();
00832
00833 if (!sub_is_clean && !TheTransientKludge->is_enabled()) {
00834
00835 return datareader_map_.size() == 4;
00836 }
00837
00838 return sub_is_clean;
00839 }
00840
00841 void
00842 SubscriberImpl::data_received(DataReaderImpl* reader)
00843 {
00844 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00845 guard,
00846 this->si_lock_);
00847 datareader_set_.insert(rchandle_from(reader));
00848 }
00849
00850 DDS::ReturnCode_t
00851 SubscriberImpl::reader_enabled(const char* topic_name,
00852 DataReaderImpl* reader_ptr)
00853 {
00854 if (DCPS_debug_level >= 4) {
00855 ACE_DEBUG((LM_DEBUG,
00856 ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
00857 ACE_TEXT("datareader(topic_name=%C) enabled\n"),
00858 topic_name));
00859 }
00860
00861 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00862 DataReaderImpl_rch reader = rchandle_from(reader_ptr);
00863 readers_not_enabled_.erase(reader);
00864
00865 this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
00866
00867 if (this->monitor_) {
00868 this->monitor_->report();
00869 }
00870
00871 return DDS::RETCODE_OK;
00872 }
00873
00874 #ifndef OPENDDS_NO_MULTI_TOPIC
00875 DDS::ReturnCode_t
00876 SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
00877 {
00878 DDS::TopicDescription_var td = reader->get_topicdescription();
00879 CORBA::String_var topic = td->get_name();
00880 multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
00881 return DDS::RETCODE_OK;
00882 }
00883
00884 void
00885 SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
00886 {
00887 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_);
00888 datareader_set_.erase(rchandle_from(reader));
00889 }
00890 #endif
00891
00892 DDS::SubscriberListener_ptr
00893 SubscriberImpl::listener_for(::DDS::StatusKind kind)
00894 {
00895
00896
00897
00898 RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00899 if (! participant)
00900 return 0;
00901
00902 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00903 return participant->listener_for(kind);
00904
00905 } else {
00906 return DDS::SubscriberListener::_duplicate(listener_.in());
00907 }
00908 }
00909
00910 unsigned int&
00911 SubscriberImpl::raw_latency_buffer_size()
00912 {
00913 return this->raw_latency_buffer_size_;
00914 }
00915
00916 DataCollector<double>::OnFull&
00917 SubscriberImpl::raw_latency_buffer_type()
00918 {
00919 return this->raw_latency_buffer_type_;
00920 }
00921
00922 void
00923 SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
00924 {
00925 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00926 guard,
00927 this->si_lock_,
00928 );
00929
00930 subs.reserve(datareader_map_.size());
00931 for (DataReaderMap::iterator iter = datareader_map_.begin();
00932 iter != datareader_map_.end();
00933 ++iter) {
00934 subs.push_back(iter->second->get_subscription_id());
00935 }
00936 }
00937
00938 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00939 void
00940 SubscriberImpl::update_ownership_strength (const PublicationId& pub_id,
00941 const CORBA::Long& ownership_strength)
00942 {
00943 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00944 guard,
00945 this->si_lock_,
00946 );
00947
00948 for (DataReaderMap::iterator iter = datareader_map_.begin();
00949 iter != datareader_map_.end();
00950 ++iter) {
00951 if (!iter->second->is_bit()) {
00952 iter->second->update_ownership_strength(pub_id, ownership_strength);
00953 }
00954 }
00955 }
00956 #endif
00957
00958
00959 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00960 void
00961 SubscriberImpl::coherent_change_received (RepoId& publisher_id,
00962 DataReaderImpl* reader,
00963 Coherent_State& group_state)
00964 {
00965 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00966 guard,
00967 this->si_lock_);
00968
00969
00970
00971 group_state = COMPLETED;
00972 DataReaderSet::const_iterator endIter = datareader_set_.end();
00973 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00974 iter != endIter; ++iter) {
00975
00976 Coherent_State state = COMPLETED;
00977 (*iter)->coherent_change_received (publisher_id, state);
00978 if (state == NOT_COMPLETED_YET) {
00979 group_state = NOT_COMPLETED_YET;
00980 return;
00981 }
00982 else if (state == REJECTED) {
00983 group_state = REJECTED;
00984 }
00985 }
00986
00987 PublicationId writerId = GUID_UNKNOWN;
00988 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00989 iter != endIter; ++iter) {
00990 if (group_state == COMPLETED) {
00991 (*iter)->accept_coherent (writerId, publisher_id);
00992 }
00993 else {
00994 (*iter)->reject_coherent (writerId, publisher_id);
00995 }
00996 }
00997
00998 if (group_state == COMPLETED) {
00999 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
01000 iter != endIter; ++iter) {
01001 (*iter)->coherent_changes_completed (reader);
01002 (*iter)->reset_coherent_info (writerId, publisher_id);
01003 }
01004 }
01005 }
01006 #endif
01007
01008 RcHandle<EntityImpl>
01009 SubscriberImpl::parent() const
01010 {
01011 return this->participant_.lock();
01012 }
01013
01014 bool
01015 SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
01016 const DDS::DataReaderQos & default_qos,
01017 DDS::Topic_ptr a_topic,
01018 DDS::DataReaderQos & dr_qos,
01019 bool mt)
01020 {
01021
01022
01023 if (qos == DATAREADER_QOS_DEFAULT) {
01024 dr_qos = default_qos;
01025
01026 } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
01027
01028 #ifndef OPENDDS_NO_MULTI_TOPIC
01029 if (mt) {
01030 if (DCPS_debug_level) {
01031 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01032 ACE_TEXT("SubscriberImpl::create_datareader, ")
01033 ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
01034 ACE_TEXT("to create a MultiTopic DataReader.\n")));
01035 }
01036 return DDS::DataReader::_nil();
01037 }
01038 #else
01039 ACE_UNUSED_ARG(mt);
01040 #endif
01041 DDS::TopicQos topic_qos;
01042 a_topic->get_qos(topic_qos);
01043
01044 dr_qos = default_qos;
01045
01046 Qos_Helper::copy_from_topic_qos(dr_qos,
01047 topic_qos);
01048
01049 } else {
01050 dr_qos = qos;
01051 }
01052
01053 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
01054 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
01055 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
01056
01057 if (!Qos_Helper::valid(dr_qos)) {
01058 ACE_ERROR((LM_ERROR,
01059 ACE_TEXT("(%P|%t) ERROR: ")
01060 ACE_TEXT("SubscriberImpl::create_datareader, ")
01061 ACE_TEXT("invalid qos.\n")));
01062 return false;
01063 }
01064
01065 if (!Qos_Helper::consistent(dr_qos)) {
01066 ACE_ERROR((LM_ERROR,
01067 ACE_TEXT("(%P|%t) ERROR: ")
01068 ACE_TEXT("SubscriberImpl::create_datareader, ")
01069 ACE_TEXT("inconsistent qos.\n")));
01070 return false;
01071 }
01072
01073
01074 return true;
01075 }
01076
01077
01078 }
01079 }
01080
01081 OPENDDS_END_VERSIONED_NAMESPACE_DECL