00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "debug.h"
00010 #include "SubscriberImpl.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "MonitorFactory.h"
00017 #include "DataReaderImpl.h"
00018 #include "Service_Participant.h"
00019 #include "dds/DdsDcpsTypeSupportExtC.h"
00020 #include "TopicDescriptionImpl.h"
00021 #include "Marked_Default_Qos.h"
00022 #include "Transient_Kludge.h"
00023 #include "ContentFilteredTopicImpl.h"
00024 #include "MultiTopicImpl.h"
00025 #include "GroupRakeData.h"
00026 #include "MultiTopicDataReaderBase.h"
00027 #include "Util.h"
00028 #include "dds/DCPS/transport/framework/TransportImpl.h"
00029 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00030
00031 #include "tao/debug.h"
00032
00033 #include "ace/Auto_Ptr.h"
00034 #include "ace/Vector_T.h"
00035
00036 #include <stdexcept>
00037
00038 namespace OpenDDS {
00039 namespace DCPS {
00040
00041
00042 SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t handle,
00043 const DDS::SubscriberQos & qos,
00044 DDS::SubscriberListener_ptr a_listener,
00045 const DDS::StatusMask& mask,
00046 DomainParticipantImpl* participant)
00047 : handle_(handle),
00048 qos_(qos),
00049 default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
00050 listener_mask_(mask),
00051 participant_(participant),
00052 domain_id_(participant->get_domain_id()),
00053 raw_latency_buffer_size_(0),
00054 raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00055 monitor_(0),
00056 access_depth_ (0)
00057 {
00058
00059 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00060
00061 monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this);
00062 }
00063
00064
00065 SubscriberImpl::~SubscriberImpl()
00066 {
00067
00068
00069
00070 if (!is_clean()) {
00071 ACE_ERROR((LM_ERROR,
00072 ACE_TEXT("(%P|%t) ERROR: ")
00073 ACE_TEXT("SubscriberImpl::~SubscriberImpl, ")
00074 ACE_TEXT("some datareaders still exist.\n")));
00075 }
00076 }
00077
00078 DDS::InstanceHandle_t
00079 SubscriberImpl::get_instance_handle()
00080 {
00081 return handle_;
00082 }
00083
00084 bool
00085 SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
00086 {
00087 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00088 guard,
00089 this->si_lock_,
00090 false);
00091
00092 for (DataReaderSet::iterator it(datareader_set_.begin());
00093 it != datareader_set_.end(); ++it) {
00094 if (a_handle == (*it)->get_instance_handle())
00095 return true;
00096 }
00097
00098 return false;
00099 }
00100
00101 DDS::DataReader_ptr
00102 SubscriberImpl::create_datareader(
00103 DDS::TopicDescription_ptr a_topic_desc,
00104 const DDS::DataReaderQos & qos,
00105 DDS::DataReaderListener_ptr a_listener,
00106 DDS::StatusMask mask)
00107 {
00108 if (CORBA::is_nil(a_topic_desc)) {
00109 ACE_ERROR((LM_ERROR,
00110 ACE_TEXT("(%P|%t) ERROR: ")
00111 ACE_TEXT("SubscriberImpl::create_datareader, ")
00112 ACE_TEXT("topic desc is nil.\n")));
00113 return DDS::DataReader::_nil();
00114 }
00115
00116 DDS::DataReaderQos dr_qos;
00117
00118 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
00119
00120 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00121 ContentFilteredTopicImpl* cft = 0;
00122 #endif
00123 #ifndef OPENDDS_NO_MULTI_TOPIC
00124 MultiTopicImpl* mt = 0;
00125 #else
00126 bool mt = false;
00127 #endif
00128
00129 if (!topic_servant) {
00130 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00131 cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
00132 if (cft) {
00133 DDS::Topic_var related;
00134 related = cft->get_related_topic();
00135 topic_servant = dynamic_cast<TopicImpl*>(related.in());
00136 }
00137 else
00138 #endif
00139 {
00140 #ifndef OPENDDS_NO_MULTI_TOPIC
00141 mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
00142 #endif
00143 }
00144 }
00145
00146 if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
00147 return DDS::DataReader::_nil();
00148
00149 #ifndef OPENDDS_NO_MULTI_TOPIC
00150 if (mt) {
00151 try {
00152 DDS::DataReader_var dr =
00153 mt->get_type_support()->create_multitopic_datareader();
00154 MultiTopicDataReaderBase* mtdr =
00155 dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
00156 mtdr->init(dr_qos, a_listener, mask, this, mt);
00157 if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) {
00158 if (dr->enable() != DDS::RETCODE_OK) {
00159 ACE_ERROR((LM_ERROR,
00160 ACE_TEXT("(%P|%t) ERROR: ")
00161 ACE_TEXT("SubscriberImpl::create_datareader, ")
00162 ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
00163 return DDS::DataReader::_nil();
00164 }
00165 multitopic_reader_enabled(dr);
00166 }
00167 return dr._retn();
00168 } catch (const std::exception& e) {
00169 ACE_ERROR((LM_ERROR,
00170 ACE_TEXT("(%P|%t) ERROR: ")
00171 ACE_TEXT("SubscriberImpl::create_datareader, ")
00172 ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
00173 e.what()));
00174 }
00175 return DDS::DataReader::_nil();
00176 }
00177 #endif
00178
00179 OpenDDS::DCPS::TypeSupport_ptr typesupport =
00180 topic_servant->get_type_support();
00181
00182 if (0 == typesupport) {
00183 CORBA::String_var name = a_topic_desc->get_name();
00184 ACE_ERROR((LM_ERROR,
00185 ACE_TEXT("(%P|%t) ERROR: ")
00186 ACE_TEXT("SubscriberImpl::create_datareader, ")
00187 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00188 name.in()));
00189 return DDS::DataReader::_nil();
00190 }
00191
00192 DDS::DataReader_var dr_obj = typesupport->create_datareader();
00193
00194 DataReaderImpl* dr_servant =
00195 dynamic_cast<DataReaderImpl*>(dr_obj.in());
00196
00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00198 if (cft) {
00199 dr_servant->enable_filtering(cft);
00200 }
00201 #endif
00202
00203
00204
00205
00206 dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
00207 dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
00208
00209 dr_servant->init(topic_servant,
00210 dr_qos,
00211 a_listener,
00212 mask,
00213 participant_,
00214 this,
00215 dr_obj.in());
00216
00217 if ((this->enabled_ == true)
00218 && (qos_.entity_factory.autoenable_created_entities == 1)) {
00219 DDS::ReturnCode_t ret
00220 = dr_servant->enable();
00221
00222 if (ret != DDS::RETCODE_OK) {
00223 ACE_ERROR((LM_ERROR,
00224 ACE_TEXT("(%P|%t) ERROR: ")
00225 ACE_TEXT("SubscriberImpl::create_datareader, ")
00226 ACE_TEXT("enable failed.\n")));
00227 return DDS::DataReader::_nil();
00228 }
00229 }
00230
00231
00232
00233 return DDS::DataReader::_duplicate(dr_obj.in());
00234
00235 }
00236
00237 DDS::ReturnCode_t
00238 SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
00239 {
00240 DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
00241
00242 DataReaderImpl* dr_servant = dynamic_cast<DataReaderImpl*>(a_datareader);
00243
00244 if (dr_servant) {
00245 DDS::Subscriber_var dr_subscriber(dr_servant->get_subscriber());
00246
00247 if (dr_subscriber.in() != this) {
00248 RepoId id = dr_servant->get_subscription_id();
00249 GuidConverter converter(id);
00250 ACE_ERROR((LM_ERROR,
00251 ACE_TEXT("(%P|%t) SubscriberImpl::delete_datareader: ")
00252 ACE_TEXT("data reader %C doesn't belong to this subscriber.\n"),
00253 OPENDDS_STRING(converter).c_str()));
00254 return DDS::RETCODE_PRECONDITION_NOT_MET;
00255 }
00256
00257 int loans = dr_servant->num_zero_copies();
00258
00259 if (0 != loans) {
00260 return DDS::RETCODE_PRECONDITION_NOT_MET;
00261 }
00262 }
00263 if (dr_servant) {
00264
00265 dr_servant->prepare_to_delete();
00266 }
00267
00268 {
00269 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00270 guard,
00271 this->si_lock_,
00272 DDS::RETCODE_ERROR);
00273
00274 DataReaderMap::iterator it;
00275
00276 for (it = datareader_map_.begin();
00277 it != datareader_map_.end();
00278 ++it) {
00279 if (it->second == dr_servant) {
00280 break;
00281 }
00282 }
00283
00284 if (it == datareader_map_.end()) {
00285 DDS::TopicDescription_var td = a_datareader->get_topicdescription();
00286 CORBA::String_var topic_name = td->get_name();
00287 #ifndef OPENDDS_NO_MULTI_TOPIC
00288 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00289 multitopic_reader_map_.find(topic_name.in());
00290 if (mt_iter != multitopic_reader_map_.end()) {
00291 DDS::DataReader_ptr ptr = mt_iter->second;
00292 dynamic_cast<MultiTopicDataReaderBase*>(ptr)->cleanup();
00293 multitopic_reader_map_.erase(mt_iter);
00294 return DDS::RETCODE_OK;
00295 }
00296 #endif
00297 RepoId id = dr_servant->get_subscription_id();
00298 GuidConverter converter(id);
00299 ACE_ERROR_RETURN((LM_ERROR,
00300 ACE_TEXT("(%P|%t) ERROR: ")
00301 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00302 ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
00303 topic_name.in(),
00304 OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00305 }
00306
00307 datareader_map_.erase(it);
00308 datareader_set_.erase(dr_servant);
00309 }
00310
00311 if (this->monitor_) {
00312 this->monitor_->report();
00313 }
00314
00315 RepoId subscription_id = dr_servant->get_subscription_id();
00316 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00317 if (!disco->remove_subscription(this->domain_id_,
00318 participant_->get_id(),
00319 subscription_id)) {
00320 ACE_ERROR_RETURN((LM_ERROR,
00321 ACE_TEXT("(%P|%t) ERROR: ")
00322 ACE_TEXT("SubscriberImpl::delete_datareader: ")
00323 ACE_TEXT(" could not remove subscription from discovery.\n")),
00324 ::DDS::RETCODE_ERROR);
00325 }
00326
00327
00328
00329 dr_servant->remove_all_associations();
00330 dr_servant->cleanup();
00331
00332
00333 dr_servant->_remove_ref();
00334 return DDS::RETCODE_OK;
00335 }
00336
00337 DDS::ReturnCode_t
00338 SubscriberImpl::delete_contained_entities()
00339 {
00340
00341 set_deleted(true);
00342
00343 ACE_Vector<DDS::DataReader_ptr> drs;
00344
00345 #ifndef OPENDDS_NO_MULTI_TOPIC
00346 {
00347 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00348 guard,
00349 this->si_lock_,
00350 DDS::RETCODE_ERROR);
00351 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00352 multitopic_reader_map_.begin();
00353 mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
00354 drs.push_back(mt_iter->second);
00355 }
00356 }
00357
00358 for (size_t i = 0; i < drs.size(); ++i) {
00359 const DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00360 if (ret != DDS::RETCODE_OK) {
00361 ACE_ERROR_RETURN((LM_ERROR,
00362 ACE_TEXT("(%P|%t) ERROR: ")
00363 ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00364 ACE_TEXT("failed to delete datareader\n")),
00365 ret);
00366 }
00367 }
00368 drs.clear();
00369 #endif
00370
00371 {
00372 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00373 guard,
00374 this->si_lock_,
00375 DDS::RETCODE_ERROR);
00376 DataReaderMap::iterator it;
00377 DataReaderMap::iterator itEnd = datareader_map_.end();
00378
00379 for (it = datareader_map_.begin(); it != itEnd; ++it) {
00380 drs.push_back(it->second);
00381 }
00382 }
00383
00384 size_t num_rds = drs.size();
00385
00386 for (size_t i = 0; i < num_rds; ++i) {
00387 DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00388
00389 if (ret != DDS::RETCODE_OK) {
00390 ACE_ERROR_RETURN((LM_ERROR,
00391 ACE_TEXT("(%P|%t) ERROR: ")
00392 ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00393 ACE_TEXT("failed to delete datareader\n")),
00394 ret);
00395 }
00396 }
00397
00398
00399 set_deleted(false);
00400
00401 return DDS::RETCODE_OK;
00402 }
00403
00404 DDS::DataReader_ptr
00405 SubscriberImpl::lookup_datareader(
00406 const char * topic_name)
00407 {
00408 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00409 guard,
00410 this->si_lock_,
00411 DDS::DataReader::_nil());
00412
00413
00414
00415 DataReaderMap::iterator it = datareader_map_.find(topic_name);
00416
00417 if (it == datareader_map_.end()) {
00418 #ifndef OPENDDS_NO_MULTI_TOPIC
00419 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00420 multitopic_reader_map_.find(topic_name);
00421 if (mt_iter != multitopic_reader_map_.end()) {
00422 return DDS::DataReader::_duplicate(mt_iter->second);
00423 }
00424 #endif
00425
00426 if (DCPS_debug_level >= 2) {
00427 ACE_DEBUG((LM_DEBUG,
00428 ACE_TEXT("(%P|%t) ")
00429 ACE_TEXT("SubscriberImpl::lookup_datareader, ")
00430 ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
00431 topic_name));
00432 }
00433
00434 return DDS::DataReader::_nil();
00435
00436 } else {
00437 return DDS::DataReader::_duplicate(it->second);
00438 }
00439 }
00440
00441 DDS::ReturnCode_t
00442 SubscriberImpl::get_datareaders(
00443 DDS::DataReaderSeq & readers,
00444 DDS::SampleStateMask sample_states,
00445 DDS::ViewStateMask view_states,
00446 DDS::InstanceStateMask instance_states)
00447 {
00448 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00449 guard,
00450 this->si_lock_,
00451 DDS::RETCODE_ERROR);
00452
00453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00454
00455
00456
00457 if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
00458 if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
00459 return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00460 }
00461
00462 if (this->qos_.presentation.ordered_access) {
00463
00464 GroupRakeData data;
00465 for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00466 pos != datareader_set_.end(); ++pos) {
00467 (*pos)->get_ordered_data (data, sample_states, view_states, instance_states);
00468 }
00469
00470
00471
00472 data.get_datareaders (readers);
00473
00474 return DDS::RETCODE_OK;
00475 }
00476 }
00477 #endif
00478
00479
00480 int count(0);
00481 readers.length(count);
00482
00483 for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00484 pos != datareader_set_.end(); ++pos) {
00485 if ((*pos)->have_sample_states(sample_states) &&
00486 (*pos)->have_view_states(view_states) &&
00487 (*pos)->have_instance_states(instance_states)) {
00488 push_back(readers, (*pos)->get_dr_obj_ref());
00489 ++count;
00490 }
00491 }
00492
00493 return DDS::RETCODE_OK;
00494 }
00495
00496 DDS::ReturnCode_t
00497 SubscriberImpl::notify_datareaders()
00498 {
00499 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00500 guard,
00501 this->si_lock_,
00502 DDS::RETCODE_ERROR);
00503
00504 DataReaderMap::iterator it;
00505
00506 for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
00507 if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00508 DDS::DataReaderListener_var listener = it->second->get_listener();
00509 if (!CORBA::is_nil (listener)) {
00510 listener->on_data_available(it->second);
00511 }
00512
00513 it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00514 }
00515 }
00516
00517 #ifndef OPENDDS_NO_MULTI_TOPIC
00518 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it =
00519 multitopic_reader_map_.begin(); it != multitopic_reader_map_.end();
00520 ++it) {
00521 MultiTopicDataReaderBase* dri =
00522 dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
00523 if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00524 DDS::DataReaderListener_var listener = dri->get_listener();
00525 if (!CORBA::is_nil(listener)) {
00526 listener->on_data_available(dri);
00527 }
00528 dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00529 }
00530 }
00531 #endif
00532
00533 return DDS::RETCODE_OK;
00534 }
00535
00536 DDS::ReturnCode_t
00537 SubscriberImpl::set_qos(
00538 const DDS::SubscriberQos & qos)
00539 {
00540
00541 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00542
00543 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00544 if (qos_ == qos)
00545 return DDS::RETCODE_OK;
00546
00547
00548 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00549 return DDS::RETCODE_IMMUTABLE_POLICY;
00550
00551 } else {
00552 qos_ = qos;
00553
00554 DrIdToQosMap idToQosMap;
00555 {
00556 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00557 guard,
00558 this->si_lock_,
00559 DDS::RETCODE_ERROR);
00560
00561 DataReaderMap::iterator endIter = datareader_map_.end();
00562
00563 for (DataReaderMap::iterator iter = datareader_map_.begin();
00564 iter != endIter; ++iter) {
00565 DataReaderImpl* reader = iter->second;
00566 reader->set_subscriber_qos (qos);
00567 DDS::DataReaderQos qos;
00568 reader->get_qos(qos);
00569 RepoId id = reader->get_subscription_id();
00570 std::pair<DrIdToQosMap::iterator, bool> pair
00571 = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
00572
00573 if (pair.second == false) {
00574 GuidConverter converter(id);
00575 ACE_ERROR_RETURN((LM_ERROR,
00576 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
00577 ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
00578 OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00579 }
00580 }
00581 }
00582
00583 DrIdToQosMap::iterator iter = idToQosMap.begin();
00584
00585 while (iter != idToQosMap.end()) {
00586 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00587 const bool status
00588 = disco->update_subscription_qos(this->domain_id_,
00589 participant_->get_id(),
00590 iter->first,
00591 iter->second,
00592 this->qos_);
00593
00594 if (!status) {
00595 ACE_ERROR_RETURN((LM_ERROR,
00596 ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
00597 ACE_TEXT("failed. \n")),
00598 DDS::RETCODE_ERROR);
00599 }
00600
00601 ++iter;
00602 }
00603 }
00604
00605 return DDS::RETCODE_OK;
00606
00607 } else {
00608 return DDS::RETCODE_INCONSISTENT_POLICY;
00609 }
00610 }
00611
00612 DDS::ReturnCode_t
00613 SubscriberImpl::get_qos(
00614 DDS::SubscriberQos & qos)
00615 {
00616 qos = qos_;
00617 return DDS::RETCODE_OK;
00618 }
00619
00620 DDS::ReturnCode_t
00621 SubscriberImpl::set_listener(
00622 DDS::SubscriberListener_ptr a_listener,
00623 DDS::StatusMask mask)
00624 {
00625 listener_mask_ = mask;
00626
00627 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00628 return DDS::RETCODE_OK;
00629 }
00630
00631 DDS::SubscriberListener_ptr
00632 SubscriberImpl::get_listener()
00633 {
00634 return DDS::SubscriberListener::_duplicate(listener_.in());
00635 }
00636
00637 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00638
00639 DDS::ReturnCode_t
00640 SubscriberImpl::begin_access()
00641 {
00642 if (enabled_ == false) {
00643 ACE_ERROR_RETURN((LM_ERROR,
00644 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
00645 ACE_TEXT(" Subscriber is not enabled!\n")),
00646 DDS::RETCODE_NOT_ENABLED);
00647 }
00648
00649 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00650 return DDS::RETCODE_OK;
00651 }
00652
00653 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00654 guard,
00655 this->si_lock_,
00656 DDS::RETCODE_ERROR);
00657
00658 ++this->access_depth_;
00659
00660
00661
00662 if (this->access_depth_ == 1) {
00663 for (DataReaderSet::iterator it = this->datareader_set_.begin();
00664 it != this->datareader_set_.end(); ++it) {
00665 (*it)->begin_access();
00666 }
00667 }
00668
00669 return DDS::RETCODE_OK;
00670 }
00671
00672 DDS::ReturnCode_t
00673 SubscriberImpl::end_access()
00674 {
00675 if (enabled_ == false) {
00676 ACE_ERROR_RETURN((LM_ERROR,
00677 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00678 ACE_TEXT(" Publisher is not enabled!\n")),
00679 DDS::RETCODE_NOT_ENABLED);
00680 }
00681
00682 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00683 return DDS::RETCODE_OK;
00684 }
00685
00686 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00687 guard,
00688 this->si_lock_,
00689 DDS::RETCODE_ERROR);
00690
00691 if (this->access_depth_ == 0) {
00692 ACE_ERROR_RETURN((LM_ERROR,
00693 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00694 ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00695 DDS::RETCODE_PRECONDITION_NOT_MET);
00696 }
00697
00698 --this->access_depth_;
00699
00700
00701
00702 if (this->access_depth_ == 0) {
00703 for (DataReaderSet::iterator it = this->datareader_set_.begin();
00704 it != this->datareader_set_.end(); ++it) {
00705 (*it)->end_access();
00706 }
00707 }
00708
00709 return DDS::RETCODE_OK;
00710 }
00711
00712 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00713
00714 DDS::DomainParticipant_ptr
00715 SubscriberImpl::get_participant()
00716 {
00717 return DDS::DomainParticipant::_duplicate(participant_);
00718 }
00719
00720 DDS::ReturnCode_t
00721 SubscriberImpl::set_default_datareader_qos(
00722 const DDS::DataReaderQos & qos)
00723 {
00724 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00725 default_datareader_qos_ = qos;
00726 return DDS::RETCODE_OK;
00727
00728 } else {
00729 return DDS::RETCODE_INCONSISTENT_POLICY;
00730 }
00731 }
00732
00733 DDS::ReturnCode_t
00734 SubscriberImpl::get_default_datareader_qos(
00735 DDS::DataReaderQos & qos)
00736 {
00737 qos = default_datareader_qos_;
00738 return DDS::RETCODE_OK;
00739 }
00740
00741 DDS::ReturnCode_t
00742 SubscriberImpl::copy_from_topic_qos(
00743 DDS::DataReaderQos & a_datareader_qos,
00744 const DDS::TopicQos & a_topic_qos)
00745 {
00746 if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
00747 return DDS::RETCODE_OK;
00748
00749 } else {
00750 return DDS::RETCODE_INCONSISTENT_POLICY;
00751 }
00752 }
00753
00754 DDS::ReturnCode_t
00755 SubscriberImpl::enable()
00756 {
00757
00758
00759
00760
00761
00762
00763 if (this->is_enabled()) {
00764 return DDS::RETCODE_OK;
00765 }
00766
00767 if (this->participant_->is_enabled() == false) {
00768 return DDS::RETCODE_PRECONDITION_NOT_MET;
00769 }
00770
00771 if (this->monitor_) {
00772 this->monitor_->report();
00773 }
00774
00775 this->set_enabled();
00776 return DDS::RETCODE_OK;
00777 }
00778
00779 bool
00780 SubscriberImpl::is_clean() const
00781 {
00782 bool sub_is_clean = datareader_map_.empty();
00783
00784 if (!sub_is_clean && !TheTransientKludge->is_enabled()) {
00785
00786 return datareader_map_.size() == 4;
00787 }
00788
00789 return sub_is_clean;
00790 }
00791
00792 void
00793 SubscriberImpl::data_received(DataReaderImpl* reader)
00794 {
00795 ACE_GUARD(ACE_Recursive_Thread_Mutex,
00796 guard,
00797 this->si_lock_);
00798 datareader_set_.insert(reader);
00799 }
00800
00801 DDS::ReturnCode_t
00802 SubscriberImpl::reader_enabled(const char* topic_name,
00803 DataReaderImpl* reader)
00804 {
00805 if (DCPS_debug_level >= 4) {
00806 ACE_DEBUG((LM_DEBUG,
00807 ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
00808 ACE_TEXT("datareader(topic_name=%C) enabled\n"),
00809 topic_name));
00810 }
00811
00812 this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
00813
00814
00815
00816 reader->_add_ref();
00817
00818 if (this->monitor_) {
00819 this->monitor_->report();
00820 }
00821
00822 return DDS::RETCODE_OK;
00823 }
00824
00825 #ifndef OPENDDS_NO_MULTI_TOPIC
00826 DDS::ReturnCode_t
00827 SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
00828 {
00829 DDS::TopicDescription_var td = reader->get_topicdescription();
00830 CORBA::String_var topic = td->get_name();
00831 multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
00832 return DDS::RETCODE_OK;
00833 }
00834
00835 void
00836 SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
00837 {
00838 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_);
00839 datareader_set_.erase(reader);
00840 }
00841 #endif
00842
00843 DDS::SubscriberListener_ptr
00844 SubscriberImpl::listener_for(::DDS::StatusKind kind)
00845 {
00846
00847
00848
00849 if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00850 return participant_->listener_for(kind);
00851
00852 } else {
00853 return DDS::SubscriberListener::_duplicate(listener_.in());
00854 }
00855 }
00856
00857 unsigned int&
00858 SubscriberImpl::raw_latency_buffer_size()
00859 {
00860 return this->raw_latency_buffer_size_;
00861 }
00862
00863 DataCollector<double>::OnFull&
00864 SubscriberImpl::raw_latency_buffer_type()
00865 {
00866 return this->raw_latency_buffer_type_;
00867 }
00868
00869 void
00870 SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
00871 {
00872 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00873 guard,
00874 this->si_lock_,
00875 );
00876
00877 subs.reserve(datareader_map_.size());
00878 for (DataReaderMap::iterator iter = datareader_map_.begin();
00879 iter != datareader_map_.end();
00880 ++iter) {
00881 subs.push_back(iter->second->get_subscription_id());
00882 }
00883 }
00884
00885 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00886 void
00887 SubscriberImpl::update_ownership_strength (const PublicationId& pub_id,
00888 const CORBA::Long& ownership_strength)
00889 {
00890 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00891 guard,
00892 this->si_lock_,
00893 );
00894
00895 for (DataReaderMap::iterator iter = datareader_map_.begin();
00896 iter != datareader_map_.end();
00897 ++iter) {
00898 if (!iter->second->is_bit()) {
00899 iter->second->update_ownership_strength(pub_id, ownership_strength);
00900 }
00901 }
00902 }
00903 #endif
00904
00905
00906 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00907 void
00908 SubscriberImpl::coherent_change_received (RepoId& publisher_id,
00909 DataReaderImpl* reader,
00910 Coherent_State& group_state)
00911 {
00912
00913
00914 group_state = COMPLETED;
00915 DataReaderSet::const_iterator endIter = datareader_set_.end();
00916 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00917 iter != endIter; ++iter) {
00918
00919 Coherent_State state = COMPLETED;
00920 (*iter)->coherent_change_received (publisher_id, state);
00921 if (state == NOT_COMPLETED_YET) {
00922 group_state = state;
00923 return;
00924 }
00925 else if (state == REJECTED) {
00926 group_state = REJECTED;
00927 }
00928 }
00929
00930 PublicationId writerId = GUID_UNKNOWN;
00931 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00932 iter != endIter; ++iter) {
00933 if (group_state == COMPLETED) {
00934 (*iter)->accept_coherent (writerId, publisher_id);
00935 }
00936 else {
00937 (*iter)->reject_coherent (writerId, publisher_id);
00938 }
00939 }
00940
00941 if (group_state == COMPLETED) {
00942 for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00943 iter != endIter; ++iter) {
00944 (*iter)->coherent_changes_completed (reader);
00945 (*iter)->reset_coherent_info (writerId, publisher_id);
00946 }
00947 }
00948 }
00949 #endif
00950
00951 EntityImpl*
00952 SubscriberImpl::parent() const
00953 {
00954 return this->participant_;
00955 }
00956
00957 bool
00958 SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
00959 const DDS::DataReaderQos & default_qos,
00960 DDS::Topic_ptr a_topic,
00961 DDS::DataReaderQos & dr_qos,
00962 bool mt)
00963 {
00964
00965
00966 if (qos == DATAREADER_QOS_DEFAULT) {
00967 dr_qos = default_qos;
00968
00969 } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
00970
00971 #ifndef OPENDDS_NO_MULTI_TOPIC
00972 if (mt) {
00973 if (DCPS_debug_level) {
00974 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00975 ACE_TEXT("SubscriberImpl::create_datareader, ")
00976 ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
00977 ACE_TEXT("to create a MultiTopic DataReader.\n")));
00978 }
00979 return DDS::DataReader::_nil();
00980 }
00981 #else
00982 ACE_UNUSED_ARG(mt);
00983 #endif
00984 DDS::TopicQos topic_qos;
00985 a_topic->get_qos(topic_qos);
00986
00987 dr_qos = default_qos;
00988
00989 Qos_Helper::copy_from_topic_qos(dr_qos,
00990 topic_qos);
00991
00992 } else {
00993 dr_qos = qos;
00994 }
00995
00996 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
00997 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
00998 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
00999
01000 if (!Qos_Helper::valid(dr_qos)) {
01001 ACE_ERROR((LM_ERROR,
01002 ACE_TEXT("(%P|%t) ERROR: ")
01003 ACE_TEXT("SubscriberImpl::create_datareader, ")
01004 ACE_TEXT("invalid qos.\n")));
01005 return false;
01006 }
01007
01008 if (!Qos_Helper::consistent(dr_qos)) {
01009 ACE_ERROR((LM_ERROR,
01010 ACE_TEXT("(%P|%t) ERROR: ")
01011 ACE_TEXT("SubscriberImpl::create_datareader, ")
01012 ACE_TEXT("inconsistent qos.\n")));
01013 return false;
01014 }
01015
01016
01017 return true;
01018 }
01019
01020
01021 }
01022 }