SubscriberImpl.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "debug.h"
00010 #include "SubscriberImpl.h"
00011 #include "FeatureDisabledQosCheck.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014 #include "GuidConverter.h"
00015 #include "TopicImpl.h"
00016 #include "MonitorFactory.h"
00017 #include "DataReaderImpl.h"
00018 #include "Service_Participant.h"
00019 #include "dds/DdsDcpsTypeSupportExtC.h"
00020 #include "TopicDescriptionImpl.h"
00021 #include "Marked_Default_Qos.h"
00022 #include "Transient_Kludge.h"
00023 #include "ContentFilteredTopicImpl.h"
00024 #include "MultiTopicImpl.h"
00025 #include "GroupRakeData.h"
00026 #include "MultiTopicDataReaderBase.h"
00027 #include "Util.h"
00028 #include "dds/DCPS/transport/framework/TransportImpl.h"
00029 #include "dds/DCPS/transport/framework/DataLinkSet.h"
00030 
00031 #include "tao/debug.h"
00032 
00033 #include "ace/Auto_Ptr.h"
00034 #include "ace/Vector_T.h"
00035 
00036 #include <stdexcept>
00037 
00038 namespace OpenDDS {
00039 namespace DCPS {
00040 
00041 // Implementation skeleton constructor
00042 SubscriberImpl::SubscriberImpl(DDS::InstanceHandle_t       handle,
00043                                const DDS::SubscriberQos &  qos,
00044                                DDS::SubscriberListener_ptr a_listener,
00045                                const DDS::StatusMask&      mask,
00046                                DomainParticipantImpl*      participant)
00047   : handle_(handle),
00048   qos_(qos),
00049   default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
00050   listener_mask_(mask),
00051   participant_(participant),
00052   domain_id_(participant->get_domain_id()),
00053   raw_latency_buffer_size_(0),
00054   raw_latency_buffer_type_(DataCollector<double>::KeepOldest),
00055   monitor_(0),
00056   access_depth_ (0)
00057 {
00058   //Note: OK to duplicate a nil.
00059   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00060 
00061   monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this);
00062 }
00063 
00064 // Implementation skeleton destructor
00065 SubscriberImpl::~SubscriberImpl()
00066 {
00067   //
00068   // The datareders should be deleted already before calling delete
00069   // subscriber.
00070   if (!is_clean()) {
00071     ACE_ERROR((LM_ERROR,
00072                ACE_TEXT("(%P|%t) ERROR: ")
00073                ACE_TEXT("SubscriberImpl::~SubscriberImpl, ")
00074                ACE_TEXT("some datareaders still exist.\n")));
00075   }
00076 }
00077 
00078 DDS::InstanceHandle_t
00079 SubscriberImpl::get_instance_handle()
00080 {
00081   return handle_;
00082 }
00083 
00084 bool
00085 SubscriberImpl::contains_reader(DDS::InstanceHandle_t a_handle)
00086 {
00087   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00088                    guard,
00089                    this->si_lock_,
00090                    false);
00091 
00092   for (DataReaderSet::iterator it(datareader_set_.begin());
00093        it != datareader_set_.end(); ++it) {
00094     if (a_handle == (*it)->get_instance_handle())
00095       return true;
00096   }
00097 
00098   return false;
00099 }
00100 
00101 DDS::DataReader_ptr
00102 SubscriberImpl::create_datareader(
00103   DDS::TopicDescription_ptr   a_topic_desc,
00104   const DDS::DataReaderQos &  qos,
00105   DDS::DataReaderListener_ptr a_listener,
00106   DDS::StatusMask             mask)
00107 {
00108   if (CORBA::is_nil(a_topic_desc)) {
00109     ACE_ERROR((LM_ERROR,
00110                ACE_TEXT("(%P|%t) ERROR: ")
00111                ACE_TEXT("SubscriberImpl::create_datareader, ")
00112                ACE_TEXT("topic desc is nil.\n")));
00113     return DDS::DataReader::_nil();
00114   }
00115 
00116   DDS::DataReaderQos dr_qos;
00117 
00118   TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
00119 
00120 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00121   ContentFilteredTopicImpl* cft = 0;
00122 #endif
00123 #ifndef OPENDDS_NO_MULTI_TOPIC
00124   MultiTopicImpl* mt = 0;
00125 #else
00126   bool mt = false;
00127 #endif
00128 
00129   if (!topic_servant) {
00130 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00131     cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
00132     if (cft) {
00133       DDS::Topic_var related;
00134       related = cft->get_related_topic();
00135       topic_servant = dynamic_cast<TopicImpl*>(related.in());
00136     }
00137     else
00138 #endif
00139     {
00140 #ifndef OPENDDS_NO_MULTI_TOPIC
00141       mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
00142 #endif
00143     }
00144   }
00145 
00146   if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
00147     return DDS::DataReader::_nil();
00148 
00149 #ifndef OPENDDS_NO_MULTI_TOPIC
00150   if (mt) {
00151     try {
00152       DDS::DataReader_var dr =
00153         mt->get_type_support()->create_multitopic_datareader();
00154       MultiTopicDataReaderBase* mtdr =
00155         dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
00156       mtdr->init(dr_qos, a_listener, mask, this, mt);
00157       if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) {
00158         if (dr->enable() != DDS::RETCODE_OK) {
00159           ACE_ERROR((LM_ERROR,
00160                      ACE_TEXT("(%P|%t) ERROR: ")
00161                      ACE_TEXT("SubscriberImpl::create_datareader, ")
00162                      ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
00163           return DDS::DataReader::_nil();
00164         }
00165         multitopic_reader_enabled(dr);
00166       }
00167       return dr._retn();
00168     } catch (const std::exception& e) {
00169       ACE_ERROR((LM_ERROR,
00170                  ACE_TEXT("(%P|%t) ERROR: ")
00171                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00172                  ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
00173                  e.what()));
00174     }
00175     return DDS::DataReader::_nil();
00176   }
00177 #endif
00178 
00179   OpenDDS::DCPS::TypeSupport_ptr typesupport =
00180     topic_servant->get_type_support();
00181 
00182   if (0 == typesupport) {
00183     CORBA::String_var name = a_topic_desc->get_name();
00184     ACE_ERROR((LM_ERROR,
00185                ACE_TEXT("(%P|%t) ERROR: ")
00186                ACE_TEXT("SubscriberImpl::create_datareader, ")
00187                ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
00188                name.in()));
00189     return DDS::DataReader::_nil();
00190   }
00191 
00192   DDS::DataReader_var dr_obj = typesupport->create_datareader();
00193 
00194   DataReaderImpl* dr_servant =
00195     dynamic_cast<DataReaderImpl*>(dr_obj.in());
00196 
00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00198   if (cft) {
00199     dr_servant->enable_filtering(cft);
00200   }
00201 #endif
00202 
00203   // Propagate the latency buffer data collection configuration.
00204   // @TODO: Determine whether we want to exclude the Builtin Topic
00205   //        readers from data gathering.
00206   dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
00207   dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
00208 
00209   dr_servant->init(topic_servant,
00210                    dr_qos,
00211                    a_listener,
00212                    mask,
00213                    participant_,
00214                    this,
00215                    dr_obj.in());
00216 
00217   if ((this->enabled_ == true)
00218       && (qos_.entity_factory.autoenable_created_entities == 1)) {
00219     DDS::ReturnCode_t ret
00220       = dr_servant->enable();
00221 
00222     if (ret != DDS::RETCODE_OK) {
00223       ACE_ERROR((LM_ERROR,
00224                  ACE_TEXT("(%P|%t) ERROR: ")
00225                  ACE_TEXT("SubscriberImpl::create_datareader, ")
00226                  ACE_TEXT("enable failed.\n")));
00227       return DDS::DataReader::_nil();
00228     }
00229   }
00230 
00231   // add created data reader to this' data reader container -
00232   // done in enable_reader
00233   return DDS::DataReader::_duplicate(dr_obj.in());
00234 
00235 }
00236 
00237 DDS::ReturnCode_t
00238 SubscriberImpl::delete_datareader(::DDS::DataReader_ptr a_datareader)
00239 {
00240   DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
00241 
00242   DataReaderImpl* dr_servant = dynamic_cast<DataReaderImpl*>(a_datareader);
00243 
00244   if (dr_servant) { // for MultiTopic this will be false
00245     DDS::Subscriber_var dr_subscriber(dr_servant->get_subscriber());
00246 
00247     if (dr_subscriber.in() != this) {
00248       RepoId id = dr_servant->get_subscription_id();
00249       GuidConverter converter(id);
00250       ACE_ERROR((LM_ERROR,
00251                  ACE_TEXT("(%P|%t) SubscriberImpl::delete_datareader: ")
00252                  ACE_TEXT("data reader %C doesn't belong to this subscriber.\n"),
00253                  OPENDDS_STRING(converter).c_str()));
00254       return DDS::RETCODE_PRECONDITION_NOT_MET;
00255     }
00256 
00257     int loans = dr_servant->num_zero_copies();
00258 
00259     if (0 != loans) {
00260       return DDS::RETCODE_PRECONDITION_NOT_MET;
00261     }
00262   }
00263   if (dr_servant) {
00264     // marks entity as deleted and stops future associating
00265     dr_servant->prepare_to_delete();
00266   }
00267 
00268   {
00269     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00270                      guard,
00271                      this->si_lock_,
00272                      DDS::RETCODE_ERROR);
00273 
00274     DataReaderMap::iterator it;
00275 
00276     for (it = datareader_map_.begin();
00277          it != datareader_map_.end();
00278          ++it) {
00279       if (it->second == dr_servant) {
00280         break;
00281       }
00282     }
00283 
00284     if (it == datareader_map_.end()) {
00285       DDS::TopicDescription_var td = a_datareader->get_topicdescription();
00286       CORBA::String_var topic_name = td->get_name();
00287 #ifndef OPENDDS_NO_MULTI_TOPIC
00288       OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00289         multitopic_reader_map_.find(topic_name.in());
00290       if (mt_iter != multitopic_reader_map_.end()) {
00291         DDS::DataReader_ptr ptr = mt_iter->second;
00292         dynamic_cast<MultiTopicDataReaderBase*>(ptr)->cleanup();
00293         multitopic_reader_map_.erase(mt_iter);
00294         return DDS::RETCODE_OK;
00295       }
00296 #endif
00297       RepoId id = dr_servant->get_subscription_id();
00298       GuidConverter converter(id);
00299       ACE_ERROR_RETURN((LM_ERROR,
00300                         ACE_TEXT("(%P|%t) ERROR: ")
00301                         ACE_TEXT("SubscriberImpl::delete_datareader: ")
00302                         ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
00303                         topic_name.in(),
00304                         OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00305     }
00306 
00307     datareader_map_.erase(it);
00308     datareader_set_.erase(dr_servant);
00309   }
00310 
00311   if (this->monitor_) {
00312     this->monitor_->report();
00313   }
00314 
00315   RepoId subscription_id  = dr_servant->get_subscription_id();
00316   Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00317   if (!disco->remove_subscription(this->domain_id_,
00318                                   participant_->get_id(),
00319                                   subscription_id)) {
00320     ACE_ERROR_RETURN((LM_ERROR,
00321                       ACE_TEXT("(%P|%t) ERROR: ")
00322                       ACE_TEXT("SubscriberImpl::delete_datareader: ")
00323                       ACE_TEXT(" could not remove subscription from discovery.\n")),
00324                      ::DDS::RETCODE_ERROR);
00325   }
00326 
00327   // Call remove association before unregistering the datareader from the transport,
00328   // otherwise some callbacks resulted from remove_association may be lost.
00329   dr_servant->remove_all_associations();
00330   dr_servant->cleanup();
00331   // Decrease the ref count after the servant is removed
00332   // from the datareader map.
00333   dr_servant->_remove_ref();
00334   return DDS::RETCODE_OK;
00335 }
00336 
00337 DDS::ReturnCode_t
00338 SubscriberImpl::delete_contained_entities()
00339 {
00340   // mark that the entity is being deleted
00341   set_deleted(true);
00342 
00343   ACE_Vector<DDS::DataReader_ptr> drs;
00344 
00345 #ifndef OPENDDS_NO_MULTI_TOPIC
00346   {
00347     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00348                      guard,
00349                      this->si_lock_,
00350                      DDS::RETCODE_ERROR);
00351     for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00352            multitopic_reader_map_.begin();
00353          mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
00354       drs.push_back(mt_iter->second);
00355     }
00356   }
00357 
00358   for (size_t i = 0; i < drs.size(); ++i) {
00359     const DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00360     if (ret != DDS::RETCODE_OK) {
00361       ACE_ERROR_RETURN((LM_ERROR,
00362                         ACE_TEXT("(%P|%t) ERROR: ")
00363                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00364                         ACE_TEXT("failed to delete datareader\n")),
00365                        ret);
00366     }
00367   }
00368   drs.clear();
00369 #endif
00370 
00371   {
00372     ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00373                      guard,
00374                      this->si_lock_,
00375                      DDS::RETCODE_ERROR);
00376     DataReaderMap::iterator it;
00377     DataReaderMap::iterator itEnd = datareader_map_.end();
00378 
00379     for (it = datareader_map_.begin(); it != itEnd; ++it) {
00380       drs.push_back(it->second);
00381     }
00382   }
00383 
00384   size_t num_rds = drs.size();
00385 
00386   for (size_t i = 0; i < num_rds; ++i) {
00387     DDS::ReturnCode_t ret = delete_datareader(drs[i]);
00388 
00389     if (ret != DDS::RETCODE_OK) {
00390       ACE_ERROR_RETURN((LM_ERROR,
00391                         ACE_TEXT("(%P|%t) ERROR: ")
00392                         ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
00393                         ACE_TEXT("failed to delete datareader\n")),
00394                        ret);
00395     }
00396   }
00397 
00398   // the subscriber can now start creating new publications
00399   set_deleted(false);
00400 
00401   return DDS::RETCODE_OK;
00402 }
00403 
00404 DDS::DataReader_ptr
00405 SubscriberImpl::lookup_datareader(
00406   const char * topic_name)
00407 {
00408   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00409                    guard,
00410                    this->si_lock_,
00411                    DDS::DataReader::_nil());
00412 
00413   // If multiple entries whose key is "topic_name" then which one is
00414   // returned ? Spec does not limit which one should give.
00415   DataReaderMap::iterator it = datareader_map_.find(topic_name);
00416 
00417   if (it == datareader_map_.end()) {
00418 #ifndef OPENDDS_NO_MULTI_TOPIC
00419     OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter =
00420       multitopic_reader_map_.find(topic_name);
00421     if (mt_iter != multitopic_reader_map_.end()) {
00422       return DDS::DataReader::_duplicate(mt_iter->second);
00423     }
00424 #endif
00425 
00426     if (DCPS_debug_level >= 2) {
00427       ACE_DEBUG((LM_DEBUG,
00428                  ACE_TEXT("(%P|%t) ")
00429                  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
00430                  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
00431                  topic_name));
00432     }
00433 
00434     return DDS::DataReader::_nil();
00435 
00436   } else {
00437     return DDS::DataReader::_duplicate(it->second);
00438   }
00439 }
00440 
00441 DDS::ReturnCode_t
00442 SubscriberImpl::get_datareaders(
00443   DDS::DataReaderSeq &   readers,
00444   DDS::SampleStateMask   sample_states,
00445   DDS::ViewStateMask     view_states,
00446   DDS::InstanceStateMask instance_states)
00447 {
00448   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00449                    guard,
00450                    this->si_lock_,
00451                    DDS::RETCODE_ERROR);
00452 
00453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00454   // If access_scope is GROUP and ordered_access is true then return readers as
00455   // list which may contain same readers multiple times. Otherwise return readers
00456   // as set.
00457   if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) {
00458     if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
00459       return ::DDS::RETCODE_PRECONDITION_NOT_MET;
00460     }
00461 
00462     if (this->qos_.presentation.ordered_access) {
00463 
00464       GroupRakeData data;
00465       for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00466            pos != datareader_set_.end(); ++pos) {
00467         (*pos)->get_ordered_data (data, sample_states, view_states, instance_states);
00468       }
00469 
00470       // Return list of readers in the order of the source timestamp of the received
00471       // samples from readers.
00472       data.get_datareaders (readers);
00473 
00474       return DDS::RETCODE_OK;
00475     }
00476   }
00477 #endif
00478 
00479   // Return set of datareaders.
00480   int count(0);
00481   readers.length(count);
00482 
00483   for (DataReaderSet::const_iterator pos = datareader_set_.begin();
00484        pos != datareader_set_.end(); ++pos) {
00485     if ((*pos)->have_sample_states(sample_states) &&
00486         (*pos)->have_view_states(view_states) &&
00487         (*pos)->have_instance_states(instance_states)) {
00488       push_back(readers, (*pos)->get_dr_obj_ref());
00489       ++count;
00490     }
00491   }
00492 
00493   return DDS::RETCODE_OK;
00494 }
00495 
00496 DDS::ReturnCode_t
00497 SubscriberImpl::notify_datareaders()
00498 {
00499   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00500                    guard,
00501                    this->si_lock_,
00502                    DDS::RETCODE_ERROR);
00503 
00504   DataReaderMap::iterator it;
00505 
00506   for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
00507     if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00508       DDS::DataReaderListener_var listener = it->second->get_listener();
00509       if (!CORBA::is_nil (listener)) {
00510         listener->on_data_available(it->second);
00511       }
00512 
00513       it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00514     }
00515   }
00516 
00517 #ifndef OPENDDS_NO_MULTI_TOPIC
00518   for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it =
00519          multitopic_reader_map_.begin(); it != multitopic_reader_map_.end();
00520        ++it) {
00521     MultiTopicDataReaderBase* dri =
00522       dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
00523     if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
00524       DDS::DataReaderListener_var listener = dri->get_listener();
00525       if (!CORBA::is_nil(listener)) {
00526         listener->on_data_available(dri);
00527       }
00528       dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
00529     }
00530   }
00531 #endif
00532 
00533   return DDS::RETCODE_OK;
00534 }
00535 
00536 DDS::ReturnCode_t
00537 SubscriberImpl::set_qos(
00538   const DDS::SubscriberQos & qos)
00539 {
00540 
00541   OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED);
00542 
00543   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00544     if (qos_ == qos)
00545       return DDS::RETCODE_OK;
00546 
00547     // for the not changeable qos, it can be changed before enable
00548     if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) {
00549       return DDS::RETCODE_IMMUTABLE_POLICY;
00550 
00551     } else {
00552       qos_ = qos;
00553 
00554       DrIdToQosMap idToQosMap;
00555       {
00556         ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00557                          guard,
00558                          this->si_lock_,
00559                          DDS::RETCODE_ERROR);
00560         // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx
00561         DataReaderMap::iterator endIter = datareader_map_.end();
00562 
00563         for (DataReaderMap::iterator iter = datareader_map_.begin();
00564              iter != endIter; ++iter) {
00565           DataReaderImpl* reader = iter->second;
00566           reader->set_subscriber_qos (qos);
00567           DDS::DataReaderQos qos;
00568           reader->get_qos(qos);
00569           RepoId id = reader->get_subscription_id();
00570           std::pair<DrIdToQosMap::iterator, bool> pair
00571             = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
00572 
00573           if (pair.second == false) {
00574             GuidConverter converter(id);
00575             ACE_ERROR_RETURN((LM_ERROR,
00576                               ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
00577                               ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
00578                               OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR);
00579           }
00580         }
00581       }
00582 
00583       DrIdToQosMap::iterator iter = idToQosMap.begin();
00584 
00585       while (iter != idToQosMap.end()) {
00586         Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
00587         const bool status
00588           = disco->update_subscription_qos(this->domain_id_,
00589                                            participant_->get_id(),
00590                                            iter->first,
00591                                            iter->second,
00592                                            this->qos_);
00593 
00594         if (!status) {
00595           ACE_ERROR_RETURN((LM_ERROR,
00596                             ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
00597                             ACE_TEXT("failed. \n")),
00598                            DDS::RETCODE_ERROR);
00599         }
00600 
00601         ++iter;
00602       }
00603     }
00604 
00605     return DDS::RETCODE_OK;
00606 
00607   } else {
00608     return DDS::RETCODE_INCONSISTENT_POLICY;
00609   }
00610 }
00611 
00612 DDS::ReturnCode_t
00613 SubscriberImpl::get_qos(
00614   DDS::SubscriberQos & qos)
00615 {
00616   qos = qos_;
00617   return DDS::RETCODE_OK;
00618 }
00619 
00620 DDS::ReturnCode_t
00621 SubscriberImpl::set_listener(
00622   DDS::SubscriberListener_ptr a_listener,
00623   DDS::StatusMask             mask)
00624 {
00625   listener_mask_ = mask;
00626   //note: OK to duplicate  a nil object ref
00627   listener_ = DDS::SubscriberListener::_duplicate(a_listener);
00628   return DDS::RETCODE_OK;
00629 }
00630 
00631 DDS::SubscriberListener_ptr
00632 SubscriberImpl::get_listener()
00633 {
00634   return DDS::SubscriberListener::_duplicate(listener_.in());
00635 }
00636 
00637 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00638 
00639 DDS::ReturnCode_t
00640 SubscriberImpl::begin_access()
00641 {
00642   if (enabled_ == false) {
00643     ACE_ERROR_RETURN((LM_ERROR,
00644                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
00645                       ACE_TEXT(" Subscriber is not enabled!\n")),
00646                      DDS::RETCODE_NOT_ENABLED);
00647   }
00648 
00649   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00650     return DDS::RETCODE_OK;
00651   }
00652 
00653   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00654                    guard,
00655                    this->si_lock_,
00656                    DDS::RETCODE_ERROR);
00657 
00658   ++this->access_depth_;
00659 
00660   // We should only notify subscription on the first
00661   // and last change to the current change set:
00662   if (this->access_depth_ == 1) {
00663     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00664          it != this->datareader_set_.end(); ++it) {
00665       (*it)->begin_access();
00666     }
00667   }
00668 
00669   return DDS::RETCODE_OK;
00670 }
00671 
00672 DDS::ReturnCode_t
00673 SubscriberImpl::end_access()
00674 {
00675   if (enabled_ == false) {
00676     ACE_ERROR_RETURN((LM_ERROR,
00677                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00678                       ACE_TEXT(" Publisher is not enabled!\n")),
00679                      DDS::RETCODE_NOT_ENABLED);
00680   }
00681 
00682   if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) {
00683     return DDS::RETCODE_OK;
00684   }
00685 
00686   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00687                    guard,
00688                    this->si_lock_,
00689                    DDS::RETCODE_ERROR);
00690 
00691   if (this->access_depth_ == 0) {
00692     ACE_ERROR_RETURN((LM_ERROR,
00693                       ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
00694                       ACE_TEXT(" No matching call to begin_coherent_changes!\n")),
00695                      DDS::RETCODE_PRECONDITION_NOT_MET);
00696   }
00697 
00698   --this->access_depth_;
00699 
00700   // We should only notify subscription on the first
00701   // and last change to the current change set:
00702   if (this->access_depth_ == 0) {
00703     for (DataReaderSet::iterator it = this->datareader_set_.begin();
00704          it != this->datareader_set_.end(); ++it) {
00705       (*it)->end_access();
00706     }
00707   }
00708 
00709   return DDS::RETCODE_OK;
00710 }
00711 
00712 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
00713 
00714 DDS::DomainParticipant_ptr
00715 SubscriberImpl::get_participant()
00716 {
00717   return DDS::DomainParticipant::_duplicate(participant_);
00718 }
00719 
00720 DDS::ReturnCode_t
00721 SubscriberImpl::set_default_datareader_qos(
00722   const DDS::DataReaderQos & qos)
00723 {
00724   if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
00725     default_datareader_qos_ = qos;
00726     return DDS::RETCODE_OK;
00727 
00728   } else {
00729     return DDS::RETCODE_INCONSISTENT_POLICY;
00730   }
00731 }
00732 
00733 DDS::ReturnCode_t
00734 SubscriberImpl::get_default_datareader_qos(
00735   DDS::DataReaderQos & qos)
00736 {
00737   qos = default_datareader_qos_;
00738   return DDS::RETCODE_OK;
00739 }
00740 
00741 DDS::ReturnCode_t
00742 SubscriberImpl::copy_from_topic_qos(
00743   DDS::DataReaderQos &  a_datareader_qos,
00744   const DDS::TopicQos & a_topic_qos)
00745 {
00746   if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
00747     return DDS::RETCODE_OK;
00748 
00749   } else {
00750     return DDS::RETCODE_INCONSISTENT_POLICY;
00751   }
00752 }
00753 
00754 DDS::ReturnCode_t
00755 SubscriberImpl::enable()
00756 {
00757   //According spec:
00758   // - Calling enable on an already enabled Entity returns OK and has no
00759   // effect.
00760   // - Calling enable on an Entity whose factory is not enabled will fail
00761   // and return PRECONDITION_NOT_MET.
00762 
00763   if (this->is_enabled()) {
00764     return DDS::RETCODE_OK;
00765   }
00766 
00767   if (this->participant_->is_enabled() == false) {
00768     return DDS::RETCODE_PRECONDITION_NOT_MET;
00769   }
00770 
00771   if (this->monitor_) {
00772     this->monitor_->report();
00773   }
00774 
00775   this->set_enabled();
00776   return DDS::RETCODE_OK;
00777 }
00778 
00779 bool
00780 SubscriberImpl::is_clean() const
00781 {
00782   bool sub_is_clean = datareader_map_.empty();
00783 
00784   if (!sub_is_clean && !TheTransientKludge->is_enabled()) {
00785     // Four BIT datareaders.
00786     return datareader_map_.size() == 4;
00787   }
00788 
00789   return sub_is_clean;
00790 }
00791 
00792 void
00793 SubscriberImpl::data_received(DataReaderImpl* reader)
00794 {
00795   ACE_GUARD(ACE_Recursive_Thread_Mutex,
00796             guard,
00797             this->si_lock_);
00798   datareader_set_.insert(reader);
00799 }
00800 
00801 DDS::ReturnCode_t
00802 SubscriberImpl::reader_enabled(const char*     topic_name,
00803                                DataReaderImpl* reader)
00804 {
00805   if (DCPS_debug_level >= 4) {
00806     ACE_DEBUG((LM_DEBUG,
00807                ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
00808                ACE_TEXT("datareader(topic_name=%C) enabled\n"),
00809                topic_name));
00810   }
00811 
00812   this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
00813 
00814   // Increase the ref count when the servant is referenced
00815   // by the datareader map.
00816   reader->_add_ref();
00817 
00818   if (this->monitor_) {
00819     this->monitor_->report();
00820   }
00821 
00822   return DDS::RETCODE_OK;
00823 }
00824 
00825 #ifndef OPENDDS_NO_MULTI_TOPIC
00826 DDS::ReturnCode_t
00827 SubscriberImpl::multitopic_reader_enabled(DDS::DataReader_ptr reader)
00828 {
00829   DDS::TopicDescription_var td = reader->get_topicdescription();
00830   CORBA::String_var topic = td->get_name();
00831   multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
00832   return DDS::RETCODE_OK;
00833 }
00834 
00835 void
00836 SubscriberImpl::remove_from_datareader_set(DataReaderImpl* reader)
00837 {
00838   ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_);
00839   datareader_set_.erase(reader);
00840 }
00841 #endif
00842 
00843 DDS::SubscriberListener_ptr
00844 SubscriberImpl::listener_for(::DDS::StatusKind kind)
00845 {
00846   // per 2.1.4.3.1 Listener Access to Plain Communication Status
00847   // use this entities factory if listener is mask not enabled
00848   // for this kind.
00849   if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
00850     return participant_->listener_for(kind);
00851 
00852   } else {
00853     return DDS::SubscriberListener::_duplicate(listener_.in());
00854   }
00855 }
00856 
00857 unsigned int&
00858 SubscriberImpl::raw_latency_buffer_size()
00859 {
00860   return this->raw_latency_buffer_size_;
00861 }
00862 
00863 DataCollector<double>::OnFull&
00864 SubscriberImpl::raw_latency_buffer_type()
00865 {
00866   return this->raw_latency_buffer_type_;
00867 }
00868 
00869 void
00870 SubscriberImpl::get_subscription_ids(SubscriptionIdVec& subs)
00871 {
00872   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00873                    guard,
00874                    this->si_lock_,
00875                    );
00876 
00877   subs.reserve(datareader_map_.size());
00878   for (DataReaderMap::iterator iter = datareader_map_.begin();
00879        iter != datareader_map_.end();
00880        ++iter) {
00881     subs.push_back(iter->second->get_subscription_id());
00882   }
00883 }
00884 
00885 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00886 void
00887 SubscriberImpl::update_ownership_strength (const PublicationId& pub_id,
00888                                            const CORBA::Long&   ownership_strength)
00889 {
00890   ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
00891                    guard,
00892                    this->si_lock_,
00893                    );
00894 
00895   for (DataReaderMap::iterator iter = datareader_map_.begin();
00896        iter != datareader_map_.end();
00897        ++iter) {
00898     if (!iter->second->is_bit()) {
00899       iter->second->update_ownership_strength(pub_id, ownership_strength);
00900     }
00901   }
00902 }
00903 #endif
00904 
00905 
00906 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
00907 void
00908 SubscriberImpl::coherent_change_received (RepoId&         publisher_id,
00909                                           DataReaderImpl* reader,
00910                                           Coherent_State& group_state)
00911 {
00912   // Verify if all readers complete the coherent changes. The result
00913   // is either COMPLETED or REJECTED.
00914   group_state = COMPLETED;
00915   DataReaderSet::const_iterator endIter = datareader_set_.end();
00916   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00917        iter != endIter; ++iter) {
00918 
00919     Coherent_State state = COMPLETED;
00920     (*iter)->coherent_change_received (publisher_id, state);
00921     if (state == NOT_COMPLETED_YET) {
00922       group_state = state;
00923       return;
00924     }
00925     else if (state == REJECTED) {
00926       group_state = REJECTED;
00927     }
00928   }
00929 
00930   PublicationId writerId = GUID_UNKNOWN;
00931   for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00932        iter != endIter; ++iter) {
00933     if (group_state == COMPLETED) {
00934       (*iter)->accept_coherent (writerId, publisher_id);
00935     }
00936     else {   //REJECTED
00937       (*iter)->reject_coherent (writerId, publisher_id);
00938     }
00939   }
00940 
00941   if (group_state == COMPLETED) {
00942     for (DataReaderSet::const_iterator iter = datareader_set_.begin();
00943          iter != endIter; ++iter) {
00944       (*iter)->coherent_changes_completed (reader);
00945       (*iter)->reset_coherent_info (writerId, publisher_id);
00946     }
00947   }
00948 }
00949 #endif
00950 
00951 EntityImpl*
00952 SubscriberImpl::parent() const
00953 {
00954   return this->participant_;
00955 }
00956 
00957 bool
00958 SubscriberImpl::validate_datareader_qos(const DDS::DataReaderQos & qos,
00959                                         const DDS::DataReaderQos & default_qos,
00960                                         DDS::Topic_ptr             a_topic,
00961                                         DDS::DataReaderQos &       dr_qos,
00962                                         bool                       mt)
00963 {
00964 
00965 
00966   if (qos == DATAREADER_QOS_DEFAULT) {
00967     dr_qos = default_qos;
00968 
00969   } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
00970 
00971 #ifndef OPENDDS_NO_MULTI_TOPIC
00972     if (mt) {
00973       if (DCPS_debug_level) {
00974         ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00975                    ACE_TEXT("SubscriberImpl::create_datareader, ")
00976                    ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
00977                    ACE_TEXT("to create a MultiTopic DataReader.\n")));
00978       }
00979       return DDS::DataReader::_nil();
00980     }
00981 #else
00982     ACE_UNUSED_ARG(mt);
00983 #endif
00984     DDS::TopicQos topic_qos;
00985     a_topic->get_qos(topic_qos);
00986 
00987     dr_qos = default_qos;
00988 
00989     Qos_Helper::copy_from_topic_qos(dr_qos,
00990                                     topic_qos);
00991 
00992   } else {
00993     dr_qos = qos;
00994   }
00995 
00996   OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false);
00997   OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false);
00998   OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false);
00999 
01000   if (!Qos_Helper::valid(dr_qos)) {
01001     ACE_ERROR((LM_ERROR,
01002                ACE_TEXT("(%P|%t) ERROR: ")
01003                ACE_TEXT("SubscriberImpl::create_datareader, ")
01004                ACE_TEXT("invalid qos.\n")));
01005     return false;
01006   }
01007 
01008   if (!Qos_Helper::consistent(dr_qos)) {
01009     ACE_ERROR((LM_ERROR,
01010                ACE_TEXT("(%P|%t) ERROR: ")
01011                ACE_TEXT("SubscriberImpl::create_datareader, ")
01012                ACE_TEXT("inconsistent qos.\n")));
01013     return false;
01014   }
01015 
01016 
01017   return true;
01018 }
01019 
01020 
01021 } // namespace DCPS
01022 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7