SubscriberImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "debug.h"
00010 #include "SubscriberImpl.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "MonitorFactory.h"
00017 #include "DataReaderImpl.h"
00018 #include "Service_Participant.h"
00019 #include "dds/DdsDcpsTypeSupportExtC.h"
00020 #include "TopicDescriptionImpl.h"
00021 #include "Marked_Default_Qos.h"
00022 #include "Transient_Kludge.h"
00023 #include "ContentFilteredTopicImpl.h"
00024 #include "MultiTopicImpl.h"
00025 #include "GroupRakeData.h"
00026 #include "MultiTopicDataReaderBase.h"
00027 #include "Util.h"
00028 #include "dds/DCPS/transport/framework/TransportImpl.h"
00029 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00030 
00031 #include "tao/debug.h"
00032 
00033 #include "ace/Auto_Ptr.h"
00034 #include "ace/Vector_T.h"
00035 
00036 #include <stdexcept>
00037 
00038 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00039 
00040 namespace OpenDDS {
00041 namespace DCPS {
00042 
00043 SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t       handle,
00044                                const DDS::SubscriberQos &  qos,
00045                                DDS::SubscriberListener_ptr a_listener,
00046                                const DDS::StatusMask&      mask,
00047                                DomainParticipantImpl*      participant)
00048   : handle_(handle),
00049   qos_(qos),
00050   default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
00051   listener_mask_(mask),
00052   participant_(*participant),
00053   domain_id_(participant->get_domain_id()),
00054   raw_latency_buffer_size_(0),
00055   raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00056   monitor_(0),
00057   access_depth_ (0)
00058 {
00059   //Note: OK to duplicate a nil.
00060   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00061 
00062   monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this);
00063 }
00064 
00065 SubscriberImpl::~SubscriberImpl()
00066 {
00067   // The datareaders should be deleted already before calling delete
00068   // subscriber.
00069   if (!is_clean()) {
00070     ACE_ERROR((LM_ERROR,
00071                ACE_TEXT("(%P|%t) ERROR: ")
00072                ACE_TEXT("SubscriberImpl::~SubscriberImpl, ")
00073                ACE_TEXT("%B datareaders still exist.\n"),
00074                datareader_map_.size ()));
00075   }
00076 }
00077 
00078 DDS::InstanceHandle_t
00079 SubscriberImpl::get_instance_handle()
00080 {
00081   return handle_;
00082 }
00083 
00084 bool
00085 SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
00086 {
00087   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00088                    guard,
00089                    this->si_lock_,
00090                    false);
00091 
00092   for (DataReaderSet::iterator it(datareader_set_.begin());
00093        it != datareader_set_.end(); ++it) {
00094     if (a_handle == (*it)->get_instance_handle())
00095       return true;
00096   }
00097 
00098   return false;
00099 }
00100 
00101 DDS::DataReader_ptr
00102 SubscriberImpl::create_datareader(
00103   DDS::TopicDescription_ptr   a_topic_desc,
00104   const DDS::DataReaderQos &  qos,
00105   DDS::DataReaderListener_ptr a_listener,
00106   DDS::StatusMask             mask)
00107 {
00108   if (CORBA::is_nil(a_topic_desc)) {
00109     ACE_ERROR((LM_ERROR,
00110                ACE_TEXT("(%P|%t) ERROR: ")
00111                ACE_TEXT("SubscriberImpl::create_datareader, ")
00112                ACE_TEXT("topic desc is nil.\n")));
00113     return DDS::DataReader::_nil();
00114   }
00115 
00116   DDS::DataReaderQos dr_qos;
00117   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00118   if (!participant)
00119     return DDS::DataReader::_nil();
00120 
00121   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
00122 
00123 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00124   ContentFilteredTopicImpl* cft = 0;
00125 #endif
00126 #ifndef OPENDDS_NO_MULTI_TOPIC
00127   MultiTopicImpl* mt = 0;
00128 #else
00129   bool mt = false;
00130 #endif
00131 
00132   if (!topic_servant) {
00133 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00134     cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
00135     if (cft) {
00136       DDS::Topic_var related;
00137       related = cft->get_related_topic();
00138       topic_servant = dynamic_cast<TopicImpl*>(related.in());
00139     }
00140     else
00141 #endif
00142     {
00143 #ifndef OPENDDS_NO_MULTI_TOPIC
00144       mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
00145 #endif
00146     }
00147   }
00148 
00149   if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
00150     return DDS::DataReader::_nil();
00151 
00152 #ifndef OPENDDS_NO_MULTI_TOPIC
00153   if (mt) {
00154     try {
00155       DDS::DataReader_var dr =
00156         mt->get_type_support()->create_multitopic_datareader();
00157       MultiTopicDataReaderBase* mtdr =
00158         dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
00159       mtdr->init(dr_qos, a_listener, mask, this, mt);
00160       if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) {
00161         if (dr->enable() != DDS::RETCODE_OK) {
00162           ACE_ERROR((LM_ERROR,
00163                      ACE_TEXT("(%P|%t) ERROR: ")
00164                      ACE_TEXT("SubscriberImpl::create_datareader, ")
00165                      ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
00166           return DDS::DataReader::_nil();
00167         }
00168         multitopic_reader_enabled(dr);
00169       }
00170       return dr._retn();
00171     } catch (const std::exception& e) {
00172       ACE_ERROR((LM_ERROR,
00173                  ACE_TEXT("(%P|%t) ERROR: ")
00174                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00175                  ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
00176                  e.what()));
00177     }
00178     return DDS::DataReader::_nil();
00179   }
00180 #endif
00181 
00182   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00183     topic_servant->get_type_support();
00184 
00185   if (0 == typesupport) {
00186     CORBA::String_var name = a_topic_desc->get_name();
00187     ACE_ERROR((LM_ERROR,
00188                ACE_TEXT("(%P|%t) ERROR: ")
00189                ACE_TEXT("SubscriberImpl::create_datareader, ")
00190                ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00191                name.in()));
00192     return DDS::DataReader::_nil();
00193   }
00194 
00195   DDS::DataReader_var dr_obj = typesupport->create_datareader();
00196 
00197   DataReaderImpl* dr_servant =
00198     dynamic_cast<DataReaderImpl*>(dr_obj.in());
00199 
00200   if (dr_servant == 0) {
00201     ACE_ERROR((LM_ERROR,
00202         ACE_TEXT("(%P|%t) ERROR: ")
00203         ACE_TEXT("SubscriberImpl::create_datareader, ")
00204         ACE_TEXT("servant is nil.\n")));
00205     return DDS::DataReader::_nil();
00206   }
00207 
00208 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00209   if (cft) {
00210     dr_servant->enable_filtering(cft);
00211   }
00212 #endif
00213 
00214   // Propagate the latency buffer data collection configuration.
00215   // @TODO: Determine whether we want to exclude the Builtin Topic
00216   //        readers from data gathering.
00217   dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
00218   dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
00219 
00220 
00221   dr_servant->init(topic_servant,
00222                    dr_qos,
00223                    a_listener,
00224                    mask,
00225                    participant.in(),
00226                    this);
00227 
00228   if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) {
00229     const DDS::ReturnCode_t ret = dr_servant->enable();
00230 
00231     if (ret != DDS::RETCODE_OK) {
00232       ACE_ERROR((LM_WARNING,
00233                  ACE_TEXT("(%P|%t) WARNING: ")
00234                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00235                  ACE_TEXT("enable failed.\n")));
00236       return DDS::DataReader::_nil();
00237     }
00238   } else {
00239     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0);
00240     readers_not_enabled_.insert(rchandle_from(dr_servant));
00241   }
00242 
00243   // add created data reader to this' data reader container -
00244   // done in enable_reader
00245   return DDS::DataReader::_duplicate(dr_obj.in());
00246 }
00247 
00248 DDS::ReturnCode_t
00249 SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
00250 {
00251   DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
00252 
00253   DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
00254 
00255   if (dr_servant) { // for MultiTopic this will be false
00256     DDS::Subscriber_var dr_subscriber(dr_servant->get_subscriber());
00257 
00258     if (dr_subscriber.in() != this) {
00259       RepoId id = dr_servant->get_subscription_id();
00260       GuidConverter converter(id);
00261       ACE_ERROR((LM_ERROR,
00262                  ACE_TEXT("(%P|%t) SubscriberImpl::delete_datareader: ")
00263                  ACE_TEXT("data reader %C doesn't belong to this subscriber.\n"),
00264                  OPENDDS_STRING(converter).c_str()));
00265       return DDS::RETCODE_PRECONDITION_NOT_MET;
00266     }
00267 
00268     if (dr_servant->has_zero_copies()) {
00269       return DDS::RETCODE_PRECONDITION_NOT_MET;
00270     }
00271   }
00272   if (dr_servant) {
00273     // marks entity as deleted and stops future associating
00274     dr_servant->prepare_to_delete();
00275   }
00276 
00277   {
00278     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00279                      guard,
00280                      this->si_lock_,
00281                      DDS::RETCODE_ERROR);
00282 
00283     DataReaderMap::iterator it;
00284 
00285     for (it = datareader_map_.begin();
00286          it != datareader_map_.end();
00287          ++it) {
00288       if (it->second == dr_servant) {
00289         break;
00290       }
00291     }
00292 
00293     if (it == datareader_map_.end()) {
00294       DDS::TopicDescription_var td = a_datareader->get_topicdescription();
00295       CORBA::String_var topic_name = td->get_name();
00296 #ifndef OPENDDS_NO_MULTI_TOPIC
00297       OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00298         multitopic_reader_map_.find(topic_name.in());
00299       if (mt_iter != multitopic_reader_map_.end()) {
00300         DDS::DataReader_ptr ptr = mt_iter->second;
00301         MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
00302         if (!mtdrb) {
00303           ACE_ERROR_RETURN((LM_ERROR,
00304             ACE_TEXT("(%P|%t) ERROR: ")
00305             ACE_TEXT("SubscriberImpl::delete_datareader: ")
00306             ACE_TEXT("datareader(topic_name=%C)")
00307             ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
00308             topic_name.in()), ::DDS::RETCODE_ERROR);
00309         }
00310         mtdrb->cleanup();
00311         multitopic_reader_map_.erase(mt_iter);
00312         return DDS::RETCODE_OK;
00313       }
00314 #endif
00315       if (!dr_servant) {
00316         ACE_ERROR_RETURN((LM_ERROR,
00317                           ACE_TEXT("(%P|%t) ERROR: ")
00318                           ACE_TEXT("SubscriberImpl::delete_datareader: ")
00319                           ACE_TEXT("datareader(topic_name=%C)")
00320                           ACE_TEXT("for unknown repo id not found.\n"),
00321                           topic_name.in()), ::DDS::RETCODE_ERROR);
00322       }
00323       RepoId id = dr_servant->get_subscription_id();
00324       GuidConverter converter(id);
00325       ACE_ERROR_RETURN((LM_ERROR,
00326                         ACE_TEXT("(%P|%t) ERROR: ")
00327                         ACE_TEXT("SubscriberImpl::delete_datareader: ")
00328                         ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
00329                         topic_name.in(),
00330                         OPENDDS_STRING(converter).c_str()),
00331                         ::DDS::RETCODE_ERROR);
00332     }
00333 
00334     datareader_map_.erase(it);
00335     datareader_set_.erase(dr_servant);
00336   }
00337 
00338   if (this->monitor_) {
00339     this->monitor_->report();
00340   }
00341 
00342   if (!dr_servant) {
00343     ACE_ERROR_RETURN((LM_ERROR,
00344                       ACE_TEXT("(%P|%t) ERROR: ")
00345                       ACE_TEXT("SubscriberImpl::delete_datareader: ")
00346                       ACE_TEXT("could not remove unknown subscription.\n")),
00347                       ::DDS::RETCODE_ERROR);
00348   }
00349 
00350   RepoId subscription_id = dr_servant->get_subscription_id();
00351   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00352   if (!disco->remove_subscription(this->domain_id_,
00353                                   this->dp_id_,
00354                                   subscription_id)) {
00355     ACE_ERROR_RETURN((LM_ERROR,
00356                       ACE_TEXT("(%P|%t) ERROR: ")
00357                       ACE_TEXT("SubscriberImpl::delete_datareader: ")
00358                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00359                       ::DDS::RETCODE_ERROR);
00360   }
00361 
00362   // Call remove association before unregistering the datareader from the transport,
00363   // otherwise some callbacks resulted from remove_association may be lost.
00364   dr_servant->remove_all_associations();
00365   dr_servant->cleanup();
00366   return DDS::RETCODE_OK;
00367 }
00368 
00369 DDS::ReturnCode_t
00370 SubscriberImpl::delete_contained_entities()
00371 {
00372   // mark that the entity is being deleted
00373   set_deleted(true);
00374 
00375   ACE_Vector<DDS::DataReader_ptr> drs;
00376 
00377 #ifndef OPENDDS_NO_MULTI_TOPIC
00378   {
00379     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00380                      guard,
00381                      this->si_lock_,
00382                      DDS::RETCODE_ERROR);
00383     for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00384            multitopic_reader_map_.begin();
00385          mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
00386       drs.push_back(mt_iter->second);
00387     }
00388   }
00389 
00390   for (size_t i = 0; i < drs.size(); ++i) {
00391     const DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00392     if (ret != DDS::RETCODE_OK) {
00393       ACE_ERROR_RETURN((LM_ERROR,
00394                         ACE_TEXT("(%P|%t) ERROR: ")
00395                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00396                         ACE_TEXT("failed to delete datareader\n")),
00397                        ret);
00398     }
00399   }
00400   drs.clear();
00401 #endif
00402 
00403   {
00404     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00405                      guard,
00406                      this->si_lock_,
00407                      DDS::RETCODE_ERROR);
00408     DataReaderMap::iterator it;
00409     DataReaderMap::iterator itEnd = datareader_map_.end();
00410 
00411     for (it = datareader_map_.begin(); it != itEnd; ++it) {
00412       drs.push_back(it->second.in());
00413     }
00414   }
00415 
00416   for (size_t i = 0; i < drs.size(); ++i) {
00417     DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00418 
00419     if (ret != DDS::RETCODE_OK) {
00420       ACE_ERROR_RETURN((LM_ERROR,
00421                         ACE_TEXT("(%P|%t) ERROR: ")
00422                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00423                         ACE_TEXT("failed to delete datareader\n")),
00424                        ret);
00425     }
00426   }
00427 
00428   // the subscriber can now start creating new publications
00429   set_deleted(false);
00430 
00431   return DDS::RETCODE_OK;
00432 }
00433 
00434 DDS::DataReader_ptr
00435 SubscriberImpl::lookup_datareader(
00436   const char * topic_name)
00437 {
00438   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00439                    guard,
00440                    this->si_lock_,
00441                    DDS::DataReader::_nil());
00442 
00443   // If multiple entries whose key is "topic_name" then which one is
00444   // returned ? Spec does not limit which one should give.
00445   DataReaderMap::iterator it = datareader_map_.find(topic_name);
00446 
00447   if (it == datareader_map_.end()) {
00448 #ifndef OPENDDS_NO_MULTI_TOPIC
00449     OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00450       multitopic_reader_map_.find(topic_name);
00451     if (mt_iter != multitopic_reader_map_.end()) {
00452       return DDS::DataReader::_duplicate(mt_iter->second);
00453     }
00454 #endif
00455 
00456     if (DCPS_debug_level >= 2) {
00457       ACE_DEBUG((LM_DEBUG,
00458                  ACE_TEXT("(%P|%t) ")
00459                  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
00460                  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
00461                  topic_name));
00462     }
00463 
00464     return DDS::DataReader::_nil();
00465 
00466   } else {
00467     return DDS::DataReader::_duplicate(it->second.in());
00468   }
00469 }
00470 
00471 DDS::ReturnCode_t
00472 SubscriberImpl::get_datareaders(
00473   DDS::DataReaderSeq &   readers,
00474   DDS::SampleStateMask   sample_states,
00475   DDS::ViewStateMask     view_states,
00476   DDS::InstanceStateMask instance_states)
00477 {
00478   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00479                    guard,
00480                    this->si_lock_,
00481                    DDS::RETCODE_ERROR);
00482 
00483 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00484   // If access_scope is GROUP and ordered_access is true then return readers as
00485   // list which may contain same readers multiple times. Otherwise return readers
00486   // as set.
00487   if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
00488     if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
00489       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00490     }
00491 
00492     if (this->qos_.presentation.ordered_access) {
00493 
00494       GroupRakeData data;
00495       for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00496            pos != datareader_set_.end(); ++pos) {
00497         (*pos)->get_ordered_data (data, sample_states, view_states, instance_states);
00498       }
00499 
00500       // Return list of readers in the order of the source timestamp of the received
00501       // samples from readers.
00502       data.get_datareaders (readers);
00503 
00504       return DDS::RETCODE_OK;
00505     }
00506   }
00507 #endif
00508 
00509   // Return set of datareaders.
00510   readers.length(0);
00511 
00512   for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00513        pos != datareader_set_.end(); ++pos) {
00514     if ((*pos)->have_sample_states(sample_states) &&
00515         (*pos)->have_view_states(view_states) &&
00516         (*pos)->have_instance_states(instance_states)) {
00517       push_back(readers, DDS::DataReader::_duplicate(pos->in()));
00518     }
00519   }
00520 
00521   return DDS::RETCODE_OK;
00522 }
00523 
00524 DDS::ReturnCode_t
00525 SubscriberImpl::notify_datareaders()
00526 {
00527   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00528                    guard,
00529                    this->si_lock_,
00530                    DDS::RETCODE_ERROR);
00531 
00532   DataReaderMap::iterator it;
00533 
00534   for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
00535     if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00536       DDS::DataReaderListener_var listener = it->second->get_listener();
00537       if (!CORBA::is_nil (listener)) {
00538         listener->on_data_available(it->second.in());
00539       }
00540 
00541       it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00542     }
00543   }
00544 
00545 #ifndef OPENDDS_NO_MULTI_TOPIC
00546   for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it =
00547          multitopic_reader_map_.begin(); it != multitopic_reader_map_.end();
00548        ++it) {
00549     MultiTopicDataReaderBase* dri =
00550       dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
00551 
00552     if (!dri) {
00553       ACE_ERROR_RETURN((LM_ERROR,
00554         ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
00555         ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")),
00556         ::DDS::RETCODE_ERROR);
00557     }
00558 
00559     if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00560       DDS::DataReaderListener_var listener = dri->get_listener();
00561       if (!CORBA::is_nil(listener)) {
00562         listener->on_data_available(dri);
00563       }
00564       dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00565     }
00566   }
00567 #endif
00568 
00569   return DDS::RETCODE_OK;
00570 }
00571 
00572 DDS::ReturnCode_t
00573 SubscriberImpl::set_qos(
00574   const DDS::SubscriberQos & qos)
00575 {
00576 
00577   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00578 
00579   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00580     if (qos_ == qos)
00581       return DDS::RETCODE_OK;
00582 
00583     // for the not changeable qos, it can be changed before enable
00584     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00585       return DDS::RETCODE_IMMUTABLE_POLICY;
00586 
00587     } else {
00588       qos_ = qos;
00589 
00590       DrIdToQosMap idToQosMap;
00591       {
00592         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00593                          guard,
00594                          this->si_lock_,
00595                          DDS::RETCODE_ERROR);
00596         // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx
00597         DataReaderMap::iterator endIter = datareader_map_.end();
00598 
00599         for (DataReaderMap::iterator iter = datareader_map_.begin();
00600              iter != endIter; ++iter) {
00601           DataReaderImpl_rch reader = iter->second;
00602           reader->set_subscriber_qos (qos);
00603           DDS::DataReaderQos qos;
00604           reader->get_qos(qos);
00605           RepoId id = reader->get_subscription_id();
00606           std::pair<DrIdToQosMap::iterator, bool> pair
00607             = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
00608 
00609           if (pair.second == false) {
00610             GuidConverter converter(id);
00611             ACE_ERROR_RETURN((LM_ERROR,
00612                               ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
00613                               ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
00614                               OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00615           }
00616         }
00617       }
00618 
00619       DrIdToQosMap::iterator iter = idToQosMap.begin();
00620 
00621       while (iter != idToQosMap.end()) {
00622         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00623         const bool status
00624           = disco->update_subscription_qos(this->domain_id_,
00625                                            this->dp_id_,
00626                                            iter->first,
00627                                            iter->second,
00628                                            this->qos_);
00629 
00630         if (!status) {
00631           ACE_ERROR_RETURN((LM_ERROR,
00632                             ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
00633                             ACE_TEXT("failed. \n")),
00634                            DDS::RETCODE_ERROR);
00635         }
00636 
00637         ++iter;
00638       }
00639     }
00640 
00641     return DDS::RETCODE_OK;
00642 
00643   } else {
00644     return DDS::RETCODE_INCONSISTENT_POLICY;
00645   }
00646 }
00647 
00648 DDS::ReturnCode_t
00649 SubscriberImpl::get_qos(
00650   DDS::SubscriberQos & qos)
00651 {
00652   qos = qos_;
00653   return DDS::RETCODE_OK;
00654 }
00655 
00656 DDS::ReturnCode_t
00657 SubscriberImpl::set_listener(
00658   DDS::SubscriberListener_ptr a_listener,
00659   DDS::StatusMask             mask)
00660 {
00661   listener_mask_ = mask;
00662   //note: OK to duplicate  a nil object ref
00663   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00664   return DDS::RETCODE_OK;
00665 }
00666 
00667 DDS::SubscriberListener_ptr
00668 SubscriberImpl::get_listener()
00669 {
00670   return DDS::SubscriberListener::_duplicate(listener_.in());
00671 }
00672 
00673 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00674 
00675 DDS::ReturnCode_t
00676 SubscriberImpl::begin_access()
00677 {
00678   if (enabled_ == false) {
00679     ACE_ERROR_RETURN((LM_ERROR,
00680                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
00681                       ACE_TEXT(" Subscriber is not enabled!\n")),
00682                      DDS::RETCODE_NOT_ENABLED);
00683   }
00684 
00685   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00686     return DDS::RETCODE_OK;
00687   }
00688 
00689   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00690                    guard,
00691                    this->si_lock_,
00692                    DDS::RETCODE_ERROR);
00693 
00694   ++this->access_depth_;
00695 
00696   // We should only notify subscription on the first
00697   // and last change to the current change set:
00698   if (this->access_depth_ == 1) {
00699     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00700          it != this->datareader_set_.end(); ++it) {
00701       (*it)->begin_access();
00702     }
00703   }
00704 
00705   return DDS::RETCODE_OK;
00706 }
00707 
00708 DDS::ReturnCode_t
00709 SubscriberImpl::end_access()
00710 {
00711   if (enabled_ == false) {
00712     ACE_ERROR_RETURN((LM_ERROR,
00713                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00714                       ACE_TEXT(" Publisher is not enabled!\n")),
00715                      DDS::RETCODE_NOT_ENABLED);
00716   }
00717 
00718   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00719     return DDS::RETCODE_OK;
00720   }
00721 
00722   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00723                    guard,
00724                    this->si_lock_,
00725                    DDS::RETCODE_ERROR);
00726 
00727   if (this->access_depth_ == 0) {
00728     ACE_ERROR_RETURN((LM_ERROR,
00729                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00730                       ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00731                      DDS::RETCODE_PRECONDITION_NOT_MET);
00732   }
00733 
00734   --this->access_depth_;
00735 
00736   // We should only notify subscription on the first
00737   // and last change to the current change set:
00738   if (this->access_depth_ == 0) {
00739     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00740          it != this->datareader_set_.end(); ++it) {
00741       (*it)->end_access();
00742     }
00743   }
00744 
00745   return DDS::RETCODE_OK;
00746 }
00747 
00748 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00749 
00750 DDS::DomainParticipant_ptr
00751 SubscriberImpl::get_participant()
00752 {
00753   return participant_.lock()._retn();
00754 }
00755 
00756 DDS::ReturnCode_t
00757 SubscriberImpl::set_default_datareader_qos(
00758   const DDS::DataReaderQos & qos)
00759 {
00760   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00761     default_datareader_qos_ = qos;
00762     return DDS::RETCODE_OK;
00763 
00764   } else {
00765     return DDS::RETCODE_INCONSISTENT_POLICY;
00766   }
00767 }
00768 
00769 DDS::ReturnCode_t
00770 SubscriberImpl::get_default_datareader_qos(
00771   DDS::DataReaderQos & qos)
00772 {
00773   qos = default_datareader_qos_;
00774   return DDS::RETCODE_OK;
00775 }
00776 
00777 DDS::ReturnCode_t
00778 SubscriberImpl::copy_from_topic_qos(
00779   DDS::DataReaderQos &  a_datareader_qos,
00780   const DDS::TopicQos & a_topic_qos)
00781 {
00782   if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
00783     return DDS::RETCODE_OK;
00784 
00785   } else {
00786     return DDS::RETCODE_INCONSISTENT_POLICY;
00787   }
00788 }
00789 
00790 DDS::ReturnCode_t
00791 SubscriberImpl::enable()
00792 {
00793   //According spec:
00794   // - Calling enable on an already enabled Entity returns OK and has no
00795   // effect.
00796   // - Calling enable on an Entity whose factory is not enabled will fail
00797   // and return PRECONDITION_NOT_MET.
00798 
00799   if (this->is_enabled()) {
00800     return DDS::RETCODE_OK;
00801   }
00802 
00803   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00804   if (!participant || participant->is_enabled() == false) {
00805     return DDS::RETCODE_PRECONDITION_NOT_MET;
00806   }
00807 
00808   dp_id_ = participant->get_id();
00809 
00810   if (this->monitor_) {
00811     this->monitor_->report();
00812   }
00813 
00814   this->set_enabled();
00815 
00816   if (qos_.entity_factory.autoenable_created_entities) {
00817     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00818     DataReaderSet readers;
00819     readers_not_enabled_.swap(readers);
00820     for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
00821       (*it)->enable();
00822     }
00823   }
00824 
00825   return DDS::RETCODE_OK;
00826 }
00827 
00828 bool
00829 SubscriberImpl::is_clean() const
00830 {
00831   const bool sub_is_clean = datareader_map_.empty();
00832 
00833   if (!sub_is_clean && !TheTransientKludge->is_enabled()) {
00834     // Four BIT datareaders.
00835     return datareader_map_.size() == 4;
00836   }
00837 
00838   return sub_is_clean;
00839 }
00840 
00841 void
00842 SubscriberImpl::data_received(DataReaderImpl* reader)
00843 {
00844   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00845             guard,
00846             this->si_lock_);
00847   datareader_set_.insert(rchandle_from(reader));
00848 }
00849 
00850 DDS::ReturnCode_t
00851 SubscriberImpl::reader_enabled(const char*     topic_name,
00852                                DataReaderImpl* reader_ptr)
00853 {
00854   if (DCPS_debug_level >= 4) {
00855     ACE_DEBUG((LM_DEBUG,
00856                ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
00857                ACE_TEXT("datareader(topic_name=%C) enabled\n"),
00858                topic_name));
00859   }
00860 
00861   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR);
00862   DataReaderImpl_rch reader = rchandle_from(reader_ptr);
00863   readers_not_enabled_.erase(reader);
00864 
00865   this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
00866 
00867   if (this->monitor_) {
00868     this->monitor_->report();
00869   }
00870 
00871   return DDS::RETCODE_OK;
00872 }
00873 
00874 #ifndef OPENDDS_NO_MULTI_TOPIC
00875 DDS::ReturnCode_t
00876 SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
00877 {
00878   DDS::TopicDescription_var td = reader->get_topicdescription();
00879   CORBA::String_var topic = td->get_name();
00880   multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
00881   return DDS::RETCODE_OK;
00882 }
00883 
00884 void
00885 SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
00886 {
00887   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_);
00888   datareader_set_.erase(rchandle_from(reader));
00889 }
00890 #endif
00891 
00892 DDS::SubscriberListener_ptr
00893 SubscriberImpl::listener_for(::DDS::StatusKind kind)
00894 {
00895   // per 2.1.4.3.1 Listener Access to Plain Communication Status
00896   // use this entities factory if listener is mask not enabled
00897   // for this kind.
00898   RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
00899   if (! participant)
00900     return 0;
00901 
00902   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00903     return participant->listener_for(kind);
00904 
00905   } else {
00906     return DDS::SubscriberListener::_duplicate(listener_.in());
00907   }
00908 }
00909 
00910 unsigned int&
00911 SubscriberImpl::raw_latency_buffer_size()
00912 {
00913   return this->raw_latency_buffer_size_;
00914 }
00915 
00916 DataCollector<double>::OnFull&
00917 SubscriberImpl::raw_latency_buffer_type()
00918 {
00919   return this->raw_latency_buffer_type_;
00920 }
00921 
00922 void
00923 SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
00924 {
00925   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00926                    guard,
00927                    this->si_lock_,
00928                    );
00929 
00930   subs.reserve(datareader_map_.size());
00931   for (DataReaderMap::iterator iter = datareader_map_.begin();
00932        iter != datareader_map_.end();
00933        ++iter) {
00934     subs.push_back(iter->second->get_subscription_id());
00935   }
00936 }
00937 
00938 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00939 void
00940 SubscriberImpl::update_ownership_strength (const PublicationId& pub_id,
00941                                            const CORBA::Long&   ownership_strength)
00942 {
00943   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00944                    guard,
00945                    this->si_lock_,
00946                    );
00947 
00948   for (DataReaderMap::iterator iter = datareader_map_.begin();
00949        iter != datareader_map_.end();
00950        ++iter) {
00951     if (!iter->second->is_bit()) {
00952       iter->second->update_ownership_strength(pub_id, ownership_strength);
00953     }
00954   }
00955 }
00956 #endif
00957 
00958 
00959 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00960 void
00961 SubscriberImpl::coherent_change_received (RepoId&         publisher_id,
00962                                           DataReaderImpl* reader,
00963                                           Coherent_State& group_state)
00964 {
00965   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00966             guard,
00967             this->si_lock_);
00968 
00969   // Verify if all readers complete the coherent changes. The result
00970   // is either COMPLETED or REJECTED.
00971   group_state = COMPLETED;
00972   DataReaderSet::const_iterator endIter = datareader_set_.end();
00973   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00974        iter != endIter; ++iter) {
00975 
00976     Coherent_State state = COMPLETED;
00977     (*iter)->coherent_change_received (publisher_id, state);
00978     if (state == NOT_COMPLETED_YET) {
00979       group_state = NOT_COMPLETED_YET;
00980       return;
00981     }
00982     else if (state == REJECTED) {
00983       group_state = REJECTED;
00984     }
00985   }
00986 
00987   PublicationId writerId = GUID_UNKNOWN;
00988   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00989        iter != endIter; ++iter) {
00990     if (group_state == COMPLETED) {
00991       (*iter)->accept_coherent (writerId, publisher_id);
00992     }
00993     else {   //REJECTED
00994       (*iter)->reject_coherent (writerId, publisher_id);
00995     }
00996   }
00997 
00998   if (group_state == COMPLETED) {
00999     for (DataReaderSet::const_iterator iter = datareader_set_.begin();
01000          iter != endIter; ++iter) {
01001       (*iter)->coherent_changes_completed (reader);
01002       (*iter)->reset_coherent_info (writerId, publisher_id);
01003     }
01004   }
01005 }
01006 #endif
01007 
01008 RcHandle<EntityImpl>
01009 SubscriberImpl::parent() const
01010 {
01011   return this->participant_.lock();
01012 }
01013 
01014 bool
01015 SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
01016                                         const DDS::DataReaderQos & default_qos,
01017                                         DDS::Topic_ptr             a_topic,
01018                                         DDS::DataReaderQos &       dr_qos,
01019                                         bool                       mt)
01020 {
01021 
01022 
01023   if (qos == DATAREADER_QOS_DEFAULT) {
01024     dr_qos = default_qos;
01025 
01026   } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
01027 
01028 #ifndef OPENDDS_NO_MULTI_TOPIC
01029     if (mt) {
01030       if (DCPS_debug_level) {
01031         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
01032                    ACE_TEXT("SubscriberImpl::create_datareader, ")
01033                    ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
01034                    ACE_TEXT("to create a MultiTopic DataReader.\n")));
01035       }
01036       return DDS::DataReader::_nil();
01037     }
01038 #else
01039     ACE_UNUSED_ARG(mt);
01040 #endif
01041     DDS::TopicQos topic_qos;
01042     a_topic->get_qos(topic_qos);
01043 
01044     dr_qos = default_qos;
01045 
01046     Qos_Helper::copy_from_topic_qos(dr_qos,
01047                                     topic_qos);
01048 
01049   } else {
01050     dr_qos = qos;
01051   }
01052 
01053   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
01054   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
01055   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
01056 
01057   if (!Qos_Helper::valid(dr_qos)) {
01058     ACE_ERROR((LM_ERROR,
01059                ACE_TEXT("(%P|%t) ERROR: ")
01060                ACE_TEXT("SubscriberImpl::create_datareader, ")
01061                ACE_TEXT("invalid qos.\n")));
01062     return false;
01063   }
01064 
01065   if (!Qos_Helper::consistent(dr_qos)) {
01066     ACE_ERROR((LM_ERROR,
01067                ACE_TEXT("(%P|%t) ERROR: ")
01068                ACE_TEXT("SubscriberImpl::create_datareader, ")
01069                ACE_TEXT("inconsistent qos.\n")));
01070     return false;
01071   }
01072 
01073 
01074   return true;
01075 }
01076 
01077 
01078 } // namespace DCPS
01079 } // namespace OpenDDS
01080 
01081 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1