#include <SubscriberImpl.h>
Public Member Functions | |
SubscriberImpl (DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant) | |
virtual | ~SubscriberImpl () |
virtual DDS::InstanceHandle_t | get_instance_handle () |
bool | contains_reader (DDS::InstanceHandle_t a_handle) |
virtual DDS::DataReader_ptr | create_datareader (DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::ReturnCode_t | delete_datareader (DDS::DataReader_ptr a_datareader) |
virtual DDS::ReturnCode_t | delete_contained_entities () |
virtual DDS::DataReader_ptr | lookup_datareader (const char *topic_name) |
virtual DDS::ReturnCode_t | get_datareaders (DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) |
virtual DDS::ReturnCode_t | notify_datareaders () |
virtual DDS::ReturnCode_t | set_qos (const DDS::SubscriberQos &qos) |
virtual DDS::ReturnCode_t | get_qos (DDS::SubscriberQos &qos) |
virtual DDS::ReturnCode_t | set_listener (DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::SubscriberListener_ptr | get_listener () |
virtual DDS::ReturnCode_t | begin_access () |
virtual DDS::ReturnCode_t | end_access () |
virtual DDS::DomainParticipant_ptr | get_participant () |
virtual DDS::ReturnCode_t | set_default_datareader_qos (const DDS::DataReaderQos &qos) |
virtual DDS::ReturnCode_t | get_default_datareader_qos (DDS::DataReaderQos &qos) |
virtual DDS::ReturnCode_t | copy_from_topic_qos (DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos) |
virtual DDS::ReturnCode_t | enable () |
bool | is_clean () const |
void | data_received (DataReaderImpl *reader) |
DDS::ReturnCode_t | reader_enabled (const char *topic_name, DataReaderImpl *reader) |
DDS::ReturnCode_t | multitopic_reader_enabled (DDS::DataReader_ptr reader) |
void | remove_from_datareader_set (DataReaderImpl *reader) |
DDS::SubscriberListener_ptr | listener_for (DDS::StatusKind kind) |
typedef | OPENDDS_VECTOR (RepoId) SubscriptionIdVec |
void | get_subscription_ids (SubscriptionIdVec &subs) |
void | update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength) |
void | coherent_change_received (RepoId &publisher_id, DataReaderImpl *reader, Coherent_State &group_state) |
virtual RcHandle< EntityImpl > | parent () const |
Raw Latency Statistics Configuration Interfaces | |
unsigned int & | raw_latency_buffer_size () |
Configure the size of the raw data collection buffer. | |
DataCollector< double >::OnFull & | raw_latency_buffer_type () |
Configure the type of the raw data collection buffer. | |
Static Public Member Functions | |
static bool | validate_datareader_qos (const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt) |
Private Member Functions | |
typedef | OPENDDS_MULTIMAP (OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap |
typedef | OPENDDS_SET (DataReaderImpl_rch) DataReaderSet |
typedef | OPENDDS_MAP_CMP (RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap |
DataReader id to qos map. | |
OPENDDS_MAP (OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_ | |
Private Attributes | |
DDS::InstanceHandle_t | handle_ |
DDS::SubscriberQos | qos_ |
DDS::DataReaderQos | default_datareader_qos_ |
DDS::StatusMask | listener_mask_ |
DDS::SubscriberListener_var | listener_ |
DataReaderSet | readers_not_enabled_ |
DataReaderMap | datareader_map_ |
DataReaderSet | datareader_set_ |
WeakRcHandle < DomainParticipantImpl > | participant_ |
DDS::DomainId_t | domain_id_ |
RepoId | dp_id_ |
unsigned int | raw_latency_buffer_size_ |
Bound (or initial reservation) of raw latency buffers. | |
DataCollector< double >::OnFull | raw_latency_buffer_type_ |
Type of raw latency data buffers. | |
ACE_Recursive_Thread_Mutex | si_lock_ |
this lock protects the data structures in this class. | |
Monitor * | monitor_ |
Monitor object for this entity. | |
int | access_depth_ |
Definition at line 36 of file SubscriberImpl.h.
OpenDDS::DCPS::SubscriberImpl::SubscriberImpl | ( | DDS::InstanceHandle_t | handle, | |
const DDS::SubscriberQos & | qos, | |||
DDS::SubscriberListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant | |||
) |
Definition at line 43 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), listener_, monitor_, and TheServiceParticipant.
00048 : handle_(handle), 00049 qos_(qos), 00050 default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()), 00051 listener_mask_(mask), 00052 participant_(*participant), 00053 domain_id_(participant->get_domain_id()), 00054 raw_latency_buffer_size_(0), 00055 raw_latency_buffer_type_(DataCollector<double>::KeepOldest), 00056 monitor_(0), 00057 access_depth_ (0) 00058 { 00059 //Note: OK to duplicate a nil. 00060 listener_ = DDS::SubscriberListener::_duplicate(a_listener); 00061 00062 monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this); 00063 }
OpenDDS::DCPS::SubscriberImpl::~SubscriberImpl | ( | ) | [virtual] |
Definition at line 65 of file SubscriberImpl.cpp.
References ACE_TEXT(), datareader_map_, is_clean(), and LM_ERROR.
00066 { 00067 // The datareaders should be deleted already before calling delete 00068 // subscriber. 00069 if (!is_clean()) { 00070 ACE_ERROR((LM_ERROR, 00071 ACE_TEXT("(%P|%t) ERROR: ") 00072 ACE_TEXT("SubscriberImpl::~SubscriberImpl, ") 00073 ACE_TEXT("%B datareaders still exist.\n"), 00074 datareader_map_.size ())); 00075 } 00076 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::begin_access | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 676 of file SubscriberImpl.cpp.
References access_depth_, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and si_lock_.
00677 { 00678 if (enabled_ == false) { 00679 ACE_ERROR_RETURN((LM_ERROR, 00680 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:") 00681 ACE_TEXT(" Subscriber is not enabled!\n")), 00682 DDS::RETCODE_NOT_ENABLED); 00683 } 00684 00685 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) { 00686 return DDS::RETCODE_OK; 00687 } 00688 00689 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00690 guard, 00691 this->si_lock_, 00692 DDS::RETCODE_ERROR); 00693 00694 ++this->access_depth_; 00695 00696 // We should only notify subscription on the first 00697 // and last change to the current change set: 00698 if (this->access_depth_ == 1) { 00699 for (DataReaderSet::iterator it = this->datareader_set_.begin(); 00700 it != this->datareader_set_.end(); ++it) { 00701 (*it)->begin_access(); 00702 } 00703 } 00704 00705 return DDS::RETCODE_OK; 00706 }
void OpenDDS::DCPS::SubscriberImpl::coherent_change_received | ( | RepoId & | publisher_id, | |
DataReaderImpl * | reader, | |||
Coherent_State & | group_state | |||
) |
Definition at line 961 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::COMPLETED, datareader_set_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, si_lock_, and state.
00964 { 00965 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00966 guard, 00967 this->si_lock_); 00968 00969 // Verify if all readers complete the coherent changes. The result 00970 // is either COMPLETED or REJECTED. 00971 group_state = COMPLETED; 00972 DataReaderSet::const_iterator endIter = datareader_set_.end(); 00973 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 00974 iter != endIter; ++iter) { 00975 00976 Coherent_State state = COMPLETED; 00977 (*iter)->coherent_change_received (publisher_id, state); 00978 if (state == NOT_COMPLETED_YET) { 00979 group_state = NOT_COMPLETED_YET; 00980 return; 00981 } 00982 else if (state == REJECTED) { 00983 group_state = REJECTED; 00984 } 00985 } 00986 00987 PublicationId writerId = GUID_UNKNOWN; 00988 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 00989 iter != endIter; ++iter) { 00990 if (group_state == COMPLETED) { 00991 (*iter)->accept_coherent (writerId, publisher_id); 00992 } 00993 else { //REJECTED 00994 (*iter)->reject_coherent (writerId, publisher_id); 00995 } 00996 } 00997 00998 if (group_state == COMPLETED) { 00999 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 01000 iter != endIter; ++iter) { 01001 (*iter)->coherent_changes_completed (reader); 01002 (*iter)->reset_coherent_info (writerId, publisher_id); 01003 } 01004 } 01005 }
bool OpenDDS::DCPS::SubscriberImpl::contains_reader | ( | DDS::InstanceHandle_t | a_handle | ) |
Definition at line 85 of file SubscriberImpl.cpp.
References datareader_set_, and si_lock_.
00086 { 00087 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00088 guard, 00089 this->si_lock_, 00090 false); 00091 00092 for (DataReaderSet::iterator it(datareader_set_.begin()); 00093 it != datareader_set_.end(); ++it) { 00094 if (a_handle == (*it)->get_instance_handle()) 00095 return true; 00096 } 00097 00098 return false; 00099 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::copy_from_topic_qos | ( | DDS::DataReaderQos & | a_datareader_qos, | |
const DDS::TopicQos & | a_topic_qos | |||
) | [virtual] |
Definition at line 778 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.
Referenced by validate_datareader_qos().
00781 { 00782 if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) { 00783 return DDS::RETCODE_OK; 00784 00785 } else { 00786 return DDS::RETCODE_INCONSISTENT_POLICY; 00787 } 00788 }
DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::create_datareader | ( | DDS::TopicDescription_ptr | a_topic_desc, | |
const DDS::DataReaderQos & | qos, | |||
DDS::DataReaderListener_ptr | a_listener, | |||
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 102 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), default_datareader_qos_, OpenDDS::DCPS::DataReaderImpl::enable(), OpenDDS::DCPS::DataReaderImpl::enable_filtering(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::ContentFilteredTopicImpl::get_related_topic(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::DataReaderImpl::init(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), CORBA::is_nil(), LM_ERROR, LM_WARNING, OpenDDS::DCPS::WeakRcHandle< T >::lock(), multitopic_reader_enabled(), participant_, qos_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), raw_latency_buffer_size_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type(), raw_latency_buffer_type_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, DDS::RETCODE_OK, si_lock_, validate_datareader_qos(), and ACE_Atomic_Op< ACE_LOCK, TYPE >::value().
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00107 { 00108 if (CORBA::is_nil(a_topic_desc)) { 00109 ACE_ERROR((LM_ERROR, 00110 ACE_TEXT("(%P|%t) ERROR: ") 00111 ACE_TEXT("SubscriberImpl::create_datareader, ") 00112 ACE_TEXT("topic desc is nil.\n"))); 00113 return DDS::DataReader::_nil(); 00114 } 00115 00116 DDS::DataReaderQos dr_qos; 00117 RcHandle<DomainParticipantImpl> participant = this->participant_.lock(); 00118 if (!participant) 00119 return DDS::DataReader::_nil(); 00120 00121 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc); 00122 00123 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00124 ContentFilteredTopicImpl* cft = 0; 00125 #endif 00126 #ifndef OPENDDS_NO_MULTI_TOPIC 00127 MultiTopicImpl* mt = 0; 00128 #else 00129 bool mt = false; 00130 #endif 00131 00132 if (!topic_servant) { 00133 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00134 cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc); 00135 if (cft) { 00136 DDS::Topic_var related; 00137 related = cft->get_related_topic(); 00138 topic_servant = dynamic_cast<TopicImpl*>(related.in()); 00139 } 00140 else 00141 #endif 00142 { 00143 #ifndef OPENDDS_NO_MULTI_TOPIC 00144 mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc); 00145 #endif 00146 } 00147 } 00148 00149 if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt)) 00150 return DDS::DataReader::_nil(); 00151 00152 #ifndef OPENDDS_NO_MULTI_TOPIC 00153 if (mt) { 00154 try { 00155 DDS::DataReader_var dr = 00156 mt->get_type_support()->create_multitopic_datareader(); 00157 MultiTopicDataReaderBase* mtdr = 00158 dynamic_cast<MultiTopicDataReaderBase*>(dr.in()); 00159 mtdr->init(dr_qos, a_listener, mask, this, mt); 00160 if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) { 00161 if (dr->enable() != DDS::RETCODE_OK) { 00162 ACE_ERROR((LM_ERROR, 00163 ACE_TEXT("(%P|%t) ERROR: ") 00164 ACE_TEXT("SubscriberImpl::create_datareader, ") 00165 ACE_TEXT("enable of MultiTopicDataReader failed.\n"))); 00166 return DDS::DataReader::_nil(); 00167 } 00168 multitopic_reader_enabled(dr); 00169 } 00170 return dr._retn(); 00171 } catch (const std::exception& e) { 00172 ACE_ERROR((LM_ERROR, 00173 ACE_TEXT("(%P|%t) ERROR: ") 00174 ACE_TEXT("SubscriberImpl::create_datareader, ") 00175 ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"), 00176 e.what())); 00177 } 00178 return DDS::DataReader::_nil(); 00179 } 00180 #endif 00181 00182 OpenDDS::DCPS::TypeSupport_ptr typesupport = 00183 topic_servant->get_type_support(); 00184 00185 if (0 == typesupport) { 00186 CORBA::String_var name = a_topic_desc->get_name(); 00187 ACE_ERROR((LM_ERROR, 00188 ACE_TEXT("(%P|%t) ERROR: ") 00189 ACE_TEXT("SubscriberImpl::create_datareader, ") 00190 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"), 00191 name.in())); 00192 return DDS::DataReader::_nil(); 00193 } 00194 00195 DDS::DataReader_var dr_obj = typesupport->create_datareader(); 00196 00197 DataReaderImpl* dr_servant = 00198 dynamic_cast<DataReaderImpl*>(dr_obj.in()); 00199 00200 if (dr_servant == 0) { 00201 ACE_ERROR((LM_ERROR, 00202 ACE_TEXT("(%P|%t) ERROR: ") 00203 ACE_TEXT("SubscriberImpl::create_datareader, ") 00204 ACE_TEXT("servant is nil.\n"))); 00205 return DDS::DataReader::_nil(); 00206 } 00207 00208 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00209 if (cft) { 00210 dr_servant->enable_filtering(cft); 00211 } 00212 #endif 00213 00214 // Propagate the latency buffer data collection configuration. 00215 // @TODO: Determine whether we want to exclude the Builtin Topic 00216 // readers from data gathering. 00217 dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_; 00218 dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_; 00219 00220 00221 dr_servant->init(topic_servant, 00222 dr_qos, 00223 a_listener, 00224 mask, 00225 participant.in(), 00226 this); 00227 00228 if ((this->enabled_ == true) && (qos_.entity_factory.autoenable_created_entities)) { 00229 const DDS::ReturnCode_t ret = dr_servant->enable(); 00230 00231 if (ret != DDS::RETCODE_OK) { 00232 ACE_ERROR((LM_WARNING, 00233 ACE_TEXT("(%P|%t) WARNING: ") 00234 ACE_TEXT("SubscriberImpl::create_datareader, ") 00235 ACE_TEXT("enable failed.\n"))); 00236 return DDS::DataReader::_nil(); 00237 } 00238 } else { 00239 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, 0); 00240 readers_not_enabled_.insert(rchandle_from(dr_servant)); 00241 } 00242 00243 // add created data reader to this' data reader container - 00244 // done in enable_reader 00245 return DDS::DataReader::_duplicate(dr_obj.in()); 00246 }
void OpenDDS::DCPS::SubscriberImpl::data_received | ( | DataReaderImpl * | reader | ) |
Definition at line 842 of file SubscriberImpl.cpp.
References datareader_set_, OpenDDS::DCPS::rchandle_from(), and si_lock_.
00843 { 00844 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00845 guard, 00846 this->si_lock_); 00847 datareader_set_.insert(rchandle_from(reader)); 00848 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_contained_entities | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 370 of file SubscriberImpl.cpp.
References ACE_TEXT(), ACE_Vector< T, DEFAULT_SIZE >::clear(), datareader_map_, delete_datareader(), LM_ERROR, OPENDDS_MAP(), OPENDDS_STRING, ACE_Vector< T, DEFAULT_SIZE >::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_deleted(), si_lock_, and ACE_Vector< T, DEFAULT_SIZE >::size().
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber().
00371 { 00372 // mark that the entity is being deleted 00373 set_deleted(true); 00374 00375 ACE_Vector<DDS::DataReader_ptr> drs; 00376 00377 #ifndef OPENDDS_NO_MULTI_TOPIC 00378 { 00379 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00380 guard, 00381 this->si_lock_, 00382 DDS::RETCODE_ERROR); 00383 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter = 00384 multitopic_reader_map_.begin(); 00385 mt_iter != multitopic_reader_map_.end(); ++mt_iter) { 00386 drs.push_back(mt_iter->second); 00387 } 00388 } 00389 00390 for (size_t i = 0; i < drs.size(); ++i) { 00391 const DDS::ReturnCode_t ret = delete_datareader(drs[i]); 00392 if (ret != DDS::RETCODE_OK) { 00393 ACE_ERROR_RETURN((LM_ERROR, 00394 ACE_TEXT("(%P|%t) ERROR: ") 00395 ACE_TEXT("SubscriberImpl::delete_contained_entities, ") 00396 ACE_TEXT("failed to delete datareader\n")), 00397 ret); 00398 } 00399 } 00400 drs.clear(); 00401 #endif 00402 00403 { 00404 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00405 guard, 00406 this->si_lock_, 00407 DDS::RETCODE_ERROR); 00408 DataReaderMap::iterator it; 00409 DataReaderMap::iterator itEnd = datareader_map_.end(); 00410 00411 for (it = datareader_map_.begin(); it != itEnd; ++it) { 00412 drs.push_back(it->second.in()); 00413 } 00414 } 00415 00416 for (size_t i = 0; i < drs.size(); ++i) { 00417 DDS::ReturnCode_t ret = delete_datareader(drs[i]); 00418 00419 if (ret != DDS::RETCODE_OK) { 00420 ACE_ERROR_RETURN((LM_ERROR, 00421 ACE_TEXT("(%P|%t) ERROR: ") 00422 ACE_TEXT("SubscriberImpl::delete_contained_entities, ") 00423 ACE_TEXT("failed to delete datareader\n")), 00424 ret); 00425 } 00426 } 00427 00428 // the subscriber can now start creating new publications 00429 set_deleted(false); 00430 00431 return DDS::RETCODE_OK; 00432 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_datareader | ( | DDS::DataReader_ptr | a_datareader | ) | [virtual] |
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 791 of file SubscriberImpl.cpp.
References dp_id_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), monitor_, participant_, qos_, readers_not_enabled_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and si_lock_.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_subscriber().
00792 { 00793 //According spec: 00794 // - Calling enable on an already enabled Entity returns OK and has no 00795 // effect. 00796 // - Calling enable on an Entity whose factory is not enabled will fail 00797 // and return PRECONDITION_NOT_MET. 00798 00799 if (this->is_enabled()) { 00800 return DDS::RETCODE_OK; 00801 } 00802 00803 RcHandle<DomainParticipantImpl> participant = this->participant_.lock(); 00804 if (!participant || participant->is_enabled() == false) { 00805 return DDS::RETCODE_PRECONDITION_NOT_MET; 00806 } 00807 00808 dp_id_ = participant->get_id(); 00809 00810 if (this->monitor_) { 00811 this->monitor_->report(); 00812 } 00813 00814 this->set_enabled(); 00815 00816 if (qos_.entity_factory.autoenable_created_entities) { 00817 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR); 00818 DataReaderSet readers; 00819 readers_not_enabled_.swap(readers); 00820 for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) { 00821 (*it)->enable(); 00822 } 00823 } 00824 00825 return DDS::RETCODE_OK; 00826 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::end_access | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 709 of file SubscriberImpl.cpp.
References access_depth_, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and si_lock_.
00710 { 00711 if (enabled_ == false) { 00712 ACE_ERROR_RETURN((LM_ERROR, 00713 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:") 00714 ACE_TEXT(" Publisher is not enabled!\n")), 00715 DDS::RETCODE_NOT_ENABLED); 00716 } 00717 00718 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) { 00719 return DDS::RETCODE_OK; 00720 } 00721 00722 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00723 guard, 00724 this->si_lock_, 00725 DDS::RETCODE_ERROR); 00726 00727 if (this->access_depth_ == 0) { 00728 ACE_ERROR_RETURN((LM_ERROR, 00729 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:") 00730 ACE_TEXT(" No matching call to begin_coherent_changes!\n")), 00731 DDS::RETCODE_PRECONDITION_NOT_MET); 00732 } 00733 00734 --this->access_depth_; 00735 00736 // We should only notify subscription on the first 00737 // and last change to the current change set: 00738 if (this->access_depth_ == 0) { 00739 for (DataReaderSet::iterator it = this->datareader_set_.begin(); 00740 it != this->datareader_set_.end(); ++it) { 00741 (*it)->end_access(); 00742 } 00743 } 00744 00745 return DDS::RETCODE_OK; 00746 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_datareaders | ( | DDS::DataReaderSeq & | readers, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [virtual] |
Definition at line 472 of file SubscriberImpl.cpp.
References access_depth_, datareader_set_, OpenDDS::DCPS::GroupRakeData::get_datareaders(), DDS::GROUP_PRESENTATION_QOS, DDS::SubscriberQos::presentation, OpenDDS::DCPS::push_back(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and si_lock_.
00477 { 00478 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00479 guard, 00480 this->si_lock_, 00481 DDS::RETCODE_ERROR); 00482 00483 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00484 // If access_scope is GROUP and ordered_access is true then return readers as 00485 // list which may contain same readers multiple times. Otherwise return readers 00486 // as set. 00487 if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) { 00488 if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) { 00489 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00490 } 00491 00492 if (this->qos_.presentation.ordered_access) { 00493 00494 GroupRakeData data; 00495 for (DataReaderSet::const_iterator pos = datareader_set_.begin(); 00496 pos != datareader_set_.end(); ++pos) { 00497 (*pos)->get_ordered_data (data, sample_states, view_states, instance_states); 00498 } 00499 00500 // Return list of readers in the order of the source timestamp of the received 00501 // samples from readers. 00502 data.get_datareaders (readers); 00503 00504 return DDS::RETCODE_OK; 00505 } 00506 } 00507 #endif 00508 00509 // Return set of datareaders. 00510 readers.length(0); 00511 00512 for (DataReaderSet::const_iterator pos = datareader_set_.begin(); 00513 pos != datareader_set_.end(); ++pos) { 00514 if ((*pos)->have_sample_states(sample_states) && 00515 (*pos)->have_view_states(view_states) && 00516 (*pos)->have_instance_states(instance_states)) { 00517 push_back(readers, DDS::DataReader::_duplicate(pos->in())); 00518 } 00519 } 00520 00521 return DDS::RETCODE_OK; 00522 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_default_datareader_qos | ( | DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 770 of file SubscriberImpl.cpp.
References default_datareader_qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::init_bit().
00772 { 00773 qos = default_datareader_qos_; 00774 return DDS::RETCODE_OK; 00775 }
DDS::InstanceHandle_t OpenDDS::DCPS::SubscriberImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 79 of file SubscriberImpl.cpp.
References handle_.
Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().
00080 { 00081 return handle_; 00082 }
DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::get_listener | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 668 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), and listener_.
00669 { 00670 return DDS::SubscriberListener::_duplicate(listener_.in()); 00671 }
DDS::DomainParticipant_ptr OpenDDS::DCPS::SubscriberImpl::get_participant | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 751 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::RcHandle< T >::_retn(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.
Referenced by OpenDDS::DCPS::PeerDiscovery< Spdp >::create_bit_dr(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), and OpenDDS::DCPS::SubscriberMonitorImpl::report().
00752 { 00753 return participant_.lock()._retn(); 00754 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_qos | ( | DDS::SubscriberQos & | qos | ) | [virtual] |
Definition at line 649 of file SubscriberImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::DataReaderImpl::init().
00651 { 00652 qos = qos_; 00653 return DDS::RETCODE_OK; 00654 }
void OpenDDS::DCPS::SubscriberImpl::get_subscription_ids | ( | SubscriptionIdVec & | subs | ) |
Populates a std::vector with the SubscriptionIds (GUIDs) of this Subscriber's Data Readers
Definition at line 923 of file SubscriberImpl.cpp.
References datareader_map_, and si_lock_.
Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().
00924 { 00925 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00926 guard, 00927 this->si_lock_, 00928 ); 00929 00930 subs.reserve(datareader_map_.size()); 00931 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00932 iter != datareader_map_.end(); 00933 ++iter) { 00934 subs.push_back(iter->second->get_subscription_id()); 00935 } 00936 }
bool OpenDDS::DCPS::SubscriberImpl::is_clean | ( | ) | const |
This method is not defined in the IDL and is defined for internal use. Check if there is any datareader associated with it.
Definition at line 829 of file SubscriberImpl.cpp.
References datareader_map_, and TheTransientKludge.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber(), and ~SubscriberImpl().
00830 { 00831 const bool sub_is_clean = datareader_map_.empty(); 00832 00833 if (!sub_is_clean && !TheTransientKludge->is_enabled()) { 00834 // Four BIT datareaders. 00835 return datareader_map_.size() == 4; 00836 } 00837 00838 return sub_is_clean; 00839 }
DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::listener_for | ( | DDS::StatusKind | kind | ) |
DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::lookup_datareader | ( | const char * | topic_name | ) | [virtual] |
Definition at line 435 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), CORBA::LocalObject::_nil(), ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OPENDDS_MAP(), OPENDDS_STRING, and si_lock_.
00437 { 00438 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00439 guard, 00440 this->si_lock_, 00441 DDS::DataReader::_nil()); 00442 00443 // If multiple entries whose key is "topic_name" then which one is 00444 // returned ? Spec does not limit which one should give. 00445 DataReaderMap::iterator it = datareader_map_.find(topic_name); 00446 00447 if (it == datareader_map_.end()) { 00448 #ifndef OPENDDS_NO_MULTI_TOPIC 00449 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter = 00450 multitopic_reader_map_.find(topic_name); 00451 if (mt_iter != multitopic_reader_map_.end()) { 00452 return DDS::DataReader::_duplicate(mt_iter->second); 00453 } 00454 #endif 00455 00456 if (DCPS_debug_level >= 2) { 00457 ACE_DEBUG((LM_DEBUG, 00458 ACE_TEXT("(%P|%t) ") 00459 ACE_TEXT("SubscriberImpl::lookup_datareader, ") 00460 ACE_TEXT("The datareader(topic_name=%C) is not found\n"), 00461 topic_name)); 00462 } 00463 00464 return DDS::DataReader::_nil(); 00465 00466 } else { 00467 return DDS::DataReader::_duplicate(it->second.in()); 00468 } 00469 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::multitopic_reader_enabled | ( | DDS::DataReader_ptr | reader | ) |
Definition at line 876 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), and DDS::RETCODE_OK.
Referenced by create_datareader().
00877 { 00878 DDS::TopicDescription_var td = reader->get_topicdescription(); 00879 CORBA::String_var topic = td->get_name(); 00880 multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader); 00881 return DDS::RETCODE_OK; 00882 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::notify_datareaders | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 525 of file SubscriberImpl.cpp.
References ACE_TEXT(), DDS::DATA_AVAILABLE_STATUS, datareader_map_, OpenDDS::DCPS::MultiTopicDataReaderBase::get_listener(), OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states(), CORBA::is_nil(), LM_ERROR, DDS::NOT_READ_SAMPLE_STATE, OPENDDS_MAP(), OPENDDS_STRING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::MultiTopicDataReaderBase::set_status_changed_flag(), and si_lock_.
00526 { 00527 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00528 guard, 00529 this->si_lock_, 00530 DDS::RETCODE_ERROR); 00531 00532 DataReaderMap::iterator it; 00533 00534 for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) { 00535 if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) { 00536 DDS::DataReaderListener_var listener = it->second->get_listener(); 00537 if (!CORBA::is_nil (listener)) { 00538 listener->on_data_available(it->second.in()); 00539 } 00540 00541 it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 00542 } 00543 } 00544 00545 #ifndef OPENDDS_NO_MULTI_TOPIC 00546 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it = 00547 multitopic_reader_map_.begin(); it != multitopic_reader_map_.end(); 00548 ++it) { 00549 MultiTopicDataReaderBase* dri = 00550 dynamic_cast<MultiTopicDataReaderBase*>(it->second.in()); 00551 00552 if (!dri) { 00553 ACE_ERROR_RETURN((LM_ERROR, 00554 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ") 00555 ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")), 00556 ::DDS::RETCODE_ERROR); 00557 } 00558 00559 if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) { 00560 DDS::DataReaderListener_var listener = dri->get_listener(); 00561 if (!CORBA::is_nil(listener)) { 00562 listener->on_data_available(dri); 00563 } 00564 dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 00565 } 00566 } 00567 #endif 00568 00569 return DDS::RETCODE_OK; 00570 }
OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP | ( | OPENDDS_STRING | , | |
DDS::DataReader_var | ||||
) | [private] |
Referenced by delete_contained_entities(), lookup_datareader(), and notify_datareaders().
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::DataReaderQos | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
DataReader id to qos map.
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MULTIMAP | ( | OPENDDS_STRING | , | |
DataReaderImpl_rch | ||||
) | [private] |
Keep track of all the DataReaders attached to this Subscriber: key is the topic_name
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_SET | ( | DataReaderImpl_rch | ) | [private] |
Keep track of DataReaders with data std::set for now, want to encapsulate this so we can switch between a set or list depending on Presentation QoS.
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_VECTOR | ( | RepoId | ) |
RcHandle< EntityImpl > OpenDDS::DCPS::SubscriberImpl::parent | ( | void | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 1009 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and participant_.
01010 { 01011 return this->participant_.lock(); 01012 }
unsigned int & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size | ( | ) |
Configure the size of the raw data collection buffer.
Definition at line 911 of file SubscriberImpl.cpp.
References raw_latency_buffer_size_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00912 { 00913 return this->raw_latency_buffer_size_; 00914 }
DataCollector< double >::OnFull & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type | ( | ) |
Configure the type of the raw data collection buffer.
Definition at line 917 of file SubscriberImpl.cpp.
References raw_latency_buffer_type_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00918 { 00919 return this->raw_latency_buffer_type_; 00920 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::reader_enabled | ( | const char * | topic_name, | |
DataReaderImpl * | reader | |||
) |
Definition at line 851 of file SubscriberImpl.cpp.
References ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, monitor_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and si_lock_.
00853 { 00854 if (DCPS_debug_level >= 4) { 00855 ACE_DEBUG((LM_DEBUG, 00856 ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ") 00857 ACE_TEXT("datareader(topic_name=%C) enabled\n"), 00858 topic_name)); 00859 } 00860 00861 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, si_lock_, DDS::RETCODE_ERROR); 00862 DataReaderImpl_rch reader = rchandle_from(reader_ptr); 00863 readers_not_enabled_.erase(reader); 00864 00865 this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader)); 00866 00867 if (this->monitor_) { 00868 this->monitor_->report(); 00869 } 00870 00871 return DDS::RETCODE_OK; 00872 }
void OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set | ( | DataReaderImpl * | reader | ) |
Definition at line 885 of file SubscriberImpl.cpp.
References datareader_set_, OpenDDS::DCPS::rchandle_from(), and si_lock_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().
00886 { 00887 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_); 00888 datareader_set_.erase(rchandle_from(reader)); 00889 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_default_datareader_qos | ( | const DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 757 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), default_datareader_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().
00759 { 00760 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00761 default_datareader_qos_ = qos; 00762 return DDS::RETCODE_OK; 00763 00764 } else { 00765 return DDS::RETCODE_INCONSISTENT_POLICY; 00766 } 00767 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_listener | ( | DDS::SubscriberListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 657 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_duplicate(), listener_, listener_mask_, and DDS::RETCODE_OK.
00660 { 00661 listener_mask_ = mask; 00662 //note: OK to duplicate a nil object ref 00663 listener_ = DDS::SubscriberListener::_duplicate(a_listener); 00664 return DDS::RETCODE_OK; 00665 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_qos | ( | const DDS::SubscriberQos & | qos | ) | [virtual] |
Definition at line 573 of file SubscriberImpl.cpp.
References ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), datareader_map_, domain_id_, dp_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, si_lock_, status, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00575 { 00576 00577 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00578 00579 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00580 if (qos_ == qos) 00581 return DDS::RETCODE_OK; 00582 00583 // for the not changeable qos, it can be changed before enable 00584 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00585 return DDS::RETCODE_IMMUTABLE_POLICY; 00586 00587 } else { 00588 qos_ = qos; 00589 00590 DrIdToQosMap idToQosMap; 00591 { 00592 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00593 guard, 00594 this->si_lock_, 00595 DDS::RETCODE_ERROR); 00596 // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx 00597 DataReaderMap::iterator endIter = datareader_map_.end(); 00598 00599 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00600 iter != endIter; ++iter) { 00601 DataReaderImpl_rch reader = iter->second; 00602 reader->set_subscriber_qos (qos); 00603 DDS::DataReaderQos qos; 00604 reader->get_qos(qos); 00605 RepoId id = reader->get_subscription_id(); 00606 std::pair<DrIdToQosMap::iterator, bool> pair 00607 = idToQosMap.insert(DrIdToQosMap::value_type(id, qos)); 00608 00609 if (pair.second == false) { 00610 GuidConverter converter(id); 00611 ACE_ERROR_RETURN((LM_ERROR, 00612 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ") 00613 ACE_TEXT("insert %C to DrIdToQosMap failed.\n"), 00614 OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR); 00615 } 00616 } 00617 } 00618 00619 DrIdToQosMap::iterator iter = idToQosMap.begin(); 00620 00621 while (iter != idToQosMap.end()) { 00622 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00623 const bool status 00624 = disco->update_subscription_qos(this->domain_id_, 00625 this->dp_id_, 00626 iter->first, 00627 iter->second, 00628 this->qos_); 00629 00630 if (!status) { 00631 ACE_ERROR_RETURN((LM_ERROR, 00632 ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ") 00633 ACE_TEXT("failed. \n")), 00634 DDS::RETCODE_ERROR); 00635 } 00636 00637 ++iter; 00638 } 00639 } 00640 00641 return DDS::RETCODE_OK; 00642 00643 } else { 00644 return DDS::RETCODE_INCONSISTENT_POLICY; 00645 } 00646 }
void OpenDDS::DCPS::SubscriberImpl::update_ownership_strength | ( | const PublicationId & | pub_id, | |
const CORBA::Long & | ownership_strength | |||
) |
Definition at line 940 of file SubscriberImpl.cpp.
References datareader_map_, and si_lock_.
00942 { 00943 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00944 guard, 00945 this->si_lock_, 00946 ); 00947 00948 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00949 iter != datareader_map_.end(); 00950 ++iter) { 00951 if (!iter->second->is_bit()) { 00952 iter->second->update_ownership_strength(pub_id, ownership_strength); 00953 } 00954 } 00955 }
bool OpenDDS::DCPS::SubscriberImpl::validate_datareader_qos | ( | const DDS::DataReaderQos & | qos, | |
const DDS::DataReaderQos & | default_qos, | |||
DDS::Topic_ptr | a_topic, | |||
DDS::DataReaderQos & | result_qos, | |||
bool | mt | |||
) | [static] |
Definition at line 1015 of file SubscriberImpl.cpp.
References CORBA::LocalObject::_nil(), ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), copy_from_topic_qos(), DATAREADER_QOS_DEFAULT, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().
Referenced by create_datareader(), and OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
01020 { 01021 01022 01023 if (qos == DATAREADER_QOS_DEFAULT) { 01024 dr_qos = default_qos; 01025 01026 } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) { 01027 01028 #ifndef OPENDDS_NO_MULTI_TOPIC 01029 if (mt) { 01030 if (DCPS_debug_level) { 01031 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 01032 ACE_TEXT("SubscriberImpl::create_datareader, ") 01033 ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ") 01034 ACE_TEXT("to create a MultiTopic DataReader.\n"))); 01035 } 01036 return DDS::DataReader::_nil(); 01037 } 01038 #else 01039 ACE_UNUSED_ARG(mt); 01040 #endif 01041 DDS::TopicQos topic_qos; 01042 a_topic->get_qos(topic_qos); 01043 01044 dr_qos = default_qos; 01045 01046 Qos_Helper::copy_from_topic_qos(dr_qos, 01047 topic_qos); 01048 01049 } else { 01050 dr_qos = qos; 01051 } 01052 01053 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false); 01054 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false); 01055 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false); 01056 01057 if (!Qos_Helper::valid(dr_qos)) { 01058 ACE_ERROR((LM_ERROR, 01059 ACE_TEXT("(%P|%t) ERROR: ") 01060 ACE_TEXT("SubscriberImpl::create_datareader, ") 01061 ACE_TEXT("invalid qos.\n"))); 01062 return false; 01063 } 01064 01065 if (!Qos_Helper::consistent(dr_qos)) { 01066 ACE_ERROR((LM_ERROR, 01067 ACE_TEXT("(%P|%t) ERROR: ") 01068 ACE_TEXT("SubscriberImpl::create_datareader, ") 01069 ACE_TEXT("inconsistent qos.\n"))); 01070 return false; 01071 } 01072 01073 01074 return true; 01075 }
int OpenDDS::DCPS::SubscriberImpl::access_depth_ [private] |
Definition at line 210 of file SubscriberImpl.h.
Referenced by begin_access(), end_access(), and get_datareaders().
DataReaderMap OpenDDS::DCPS::SubscriberImpl::datareader_map_ [private] |
Definition at line 186 of file SubscriberImpl.h.
Referenced by delete_contained_entities(), get_subscription_ids(), is_clean(), lookup_datareader(), notify_datareaders(), reader_enabled(), set_qos(), update_ownership_strength(), and ~SubscriberImpl().
DataReaderSet OpenDDS::DCPS::SubscriberImpl::datareader_set_ [private] |
Definition at line 187 of file SubscriberImpl.h.
Referenced by begin_access(), coherent_change_received(), contains_reader(), data_received(), end_access(), get_datareaders(), and remove_from_datareader_set().
Definition at line 180 of file SubscriberImpl.h.
Referenced by create_datareader(), get_default_datareader_qos(), and set_default_datareader_qos().
Definition at line 195 of file SubscriberImpl.h.
Referenced by set_qos().
RepoId OpenDDS::DCPS::SubscriberImpl::dp_id_ [private] |
Definition at line 196 of file SubscriberImpl.h.
Definition at line 177 of file SubscriberImpl.h.
Referenced by get_instance_handle().
DDS::SubscriberListener_var OpenDDS::DCPS::SubscriberImpl::listener_ [private] |
Definition at line 183 of file SubscriberImpl.h.
Referenced by get_listener(), set_listener(), and SubscriberImpl().
Definition at line 182 of file SubscriberImpl.h.
Referenced by set_listener().
Monitor* OpenDDS::DCPS::SubscriberImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 208 of file SubscriberImpl.h.
Referenced by enable(), reader_enabled(), and SubscriberImpl().
Definition at line 193 of file SubscriberImpl.h.
Referenced by create_datareader(), enable(), get_participant(), and parent().
Definition at line 179 of file SubscriberImpl.h.
Referenced by begin_access(), create_datareader(), enable(), end_access(), get_datareaders(), get_qos(), and set_qos().
unsigned int OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size_ [private] |
Bound (or initial reservation) of raw latency buffers.
Definition at line 199 of file SubscriberImpl.h.
Referenced by create_datareader(), and raw_latency_buffer_size().
DataCollector<double>::OnFull OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type_ [private] |
Type of raw latency data buffers.
Definition at line 202 of file SubscriberImpl.h.
Referenced by create_datareader(), and raw_latency_buffer_type().
DataReaderSet OpenDDS::DCPS::SubscriberImpl::readers_not_enabled_ [private] |
Definition at line 185 of file SubscriberImpl.h.
Referenced by create_datareader(), enable(), and reader_enabled().
this lock protects the data structures in this class.
Definition at line 205 of file SubscriberImpl.h.
Referenced by begin_access(), coherent_change_received(), contains_reader(), create_datareader(), data_received(), delete_contained_entities(), enable(), end_access(), get_datareaders(), get_subscription_ids(), lookup_datareader(), notify_datareaders(), reader_enabled(), remove_from_datareader_set(), set_qos(), and update_ownership_strength().