#include <SubscriberImpl.h>
Inheritance diagram for OpenDDS::DCPS::SubscriberImpl:
Public Member Functions | |
SubscriberImpl (DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant) | |
virtual | ~SubscriberImpl () |
virtual DDS::InstanceHandle_t | get_instance_handle () |
bool | contains_reader (DDS::InstanceHandle_t a_handle) |
virtual DDS::DataReader_ptr | create_datareader (DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::ReturnCode_t | delete_datareader (DDS::DataReader_ptr a_datareader) |
virtual DDS::ReturnCode_t | delete_contained_entities () |
virtual DDS::DataReader_ptr | lookup_datareader (const char *topic_name) |
virtual DDS::ReturnCode_t | get_datareaders (DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states) |
virtual DDS::ReturnCode_t | notify_datareaders () |
virtual DDS::ReturnCode_t | set_qos (const DDS::SubscriberQos &qos) |
virtual DDS::ReturnCode_t | get_qos (DDS::SubscriberQos &qos) |
virtual DDS::ReturnCode_t | set_listener (DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask) |
virtual DDS::SubscriberListener_ptr | get_listener () |
virtual DDS::ReturnCode_t | begin_access () |
virtual DDS::ReturnCode_t | end_access () |
virtual DDS::DomainParticipant_ptr | get_participant () |
virtual DDS::ReturnCode_t | set_default_datareader_qos (const DDS::DataReaderQos &qos) |
virtual DDS::ReturnCode_t | get_default_datareader_qos (DDS::DataReaderQos &qos) |
virtual DDS::ReturnCode_t | copy_from_topic_qos (DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos) |
virtual DDS::ReturnCode_t | enable () |
bool | is_clean () const |
void | data_received (DataReaderImpl *reader) |
DDS::ReturnCode_t | reader_enabled (const char *topic_name, DataReaderImpl *reader) |
DDS::ReturnCode_t | multitopic_reader_enabled (DDS::DataReader_ptr reader) |
void | remove_from_datareader_set (DataReaderImpl *reader) |
DDS::SubscriberListener_ptr | listener_for (DDS::StatusKind kind) |
typedef | OPENDDS_VECTOR (RepoId) SubscriptionIdVec |
void | get_subscription_ids (SubscriptionIdVec &subs) |
void | update_ownership_strength (const PublicationId &pub_id, const CORBA::Long &ownership_strength) |
void | coherent_change_received (RepoId &publisher_id, DataReaderImpl *reader, Coherent_State &group_state) |
virtual EntityImpl * | parent () const |
Raw Latency Statistics Configuration Interfaces | |
unsigned int & | raw_latency_buffer_size () |
Configure the size of the raw data collection buffer. | |
DataCollector< double >::OnFull & | raw_latency_buffer_type () |
Configure the type of the raw data collection buffer. | |
Static Public Member Functions | |
static bool | validate_datareader_qos (const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt) |
Private Member Functions | |
typedef | OPENDDS_MULTIMAP (OPENDDS_STRING, DataReaderImpl *) DataReaderMap |
typedef | OPENDDS_SET (DataReaderImpl *) DataReaderSet |
typedef | OPENDDS_MAP_CMP (RepoId, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap |
DataReader id to qos map. | |
OPENDDS_MAP (OPENDDS_STRING, DDS::DataReader_var) multitopic_reader_map_ | |
Private Attributes | |
DDS::InstanceHandle_t | handle_ |
DDS::SubscriberQos | qos_ |
DDS::DataReaderQos | default_datareader_qos_ |
DDS::StatusMask | listener_mask_ |
DDS::SubscriberListener_var | listener_ |
DataReaderMap | datareader_map_ |
DataReaderSet | datareader_set_ |
DomainParticipantImpl * | participant_ |
DDS::DomainParticipant_var | participant_objref_ |
DDS::DomainId_t | domain_id_ |
unsigned int | raw_latency_buffer_size_ |
Bound (or initial reservation) of raw latency buffers. | |
DataCollector< double >::OnFull | raw_latency_buffer_type_ |
Type of raw latency data buffers. | |
ACE_Recursive_Thread_Mutex | si_lock_ |
This lock protects the data structures in this class. | |
Monitor * | monitor_ |
Monitor object for this entity. | |
int | access_depth_ |
Definition at line 34 of file SubscriberImpl.h.
OpenDDS::DCPS::SubscriberImpl::SubscriberImpl | ( | DDS::InstanceHandle_t | handle, | |
const DDS::SubscriberQos & | qos, | |||
DDS::SubscriberListener_ptr | a_listener, | |||
const DDS::StatusMask & | mask, | |||
DomainParticipantImpl * | participant | |||
) |
Definition at line 42 of file SubscriberImpl.cpp.
References listener_, monitor_, and TheServiceParticipant.
00047 : handle_(handle), 00048 qos_(qos), 00049 default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()), 00050 listener_mask_(mask), 00051 participant_(participant), 00052 domain_id_(participant->get_domain_id()), 00053 raw_latency_buffer_size_(0), 00054 raw_latency_buffer_type_(DataCollector<double>::KeepOldest), 00055 monitor_(0), 00056 access_depth_ (0) 00057 { 00058 //Note: OK to duplicate a nil. 00059 listener_ = DDS::SubscriberListener::_duplicate(a_listener); 00060 00061 monitor_ = TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this); 00062 }
OpenDDS::DCPS::SubscriberImpl::~SubscriberImpl | ( | ) | [virtual] |
Definition at line 65 of file SubscriberImpl.cpp.
References is_clean().
00066 { 00067 // 00068 // The datareders should be deleted already before calling delete 00069 // subscriber. 00070 if (!is_clean()) { 00071 ACE_ERROR((LM_ERROR, 00072 ACE_TEXT("(%P|%t) ERROR: ") 00073 ACE_TEXT("SubscriberImpl::~SubscriberImpl, ") 00074 ACE_TEXT("some datareaders still exist.\n"))); 00075 } 00076 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::begin_access | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 640 of file SubscriberImpl.cpp.
References access_depth_, datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.
00641 { 00642 if (enabled_ == false) { 00643 ACE_ERROR_RETURN((LM_ERROR, 00644 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:") 00645 ACE_TEXT(" Subscriber is not enabled!\n")), 00646 DDS::RETCODE_NOT_ENABLED); 00647 } 00648 00649 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) { 00650 return DDS::RETCODE_OK; 00651 } 00652 00653 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00654 guard, 00655 this->si_lock_, 00656 DDS::RETCODE_ERROR); 00657 00658 ++this->access_depth_; 00659 00660 // We should only notify subscription on the first 00661 // and last change to the current change set: 00662 if (this->access_depth_ == 1) { 00663 for (DataReaderSet::iterator it = this->datareader_set_.begin(); 00664 it != this->datareader_set_.end(); ++it) { 00665 (*it)->begin_access(); 00666 } 00667 } 00668 00669 return DDS::RETCODE_OK; 00670 }
void OpenDDS::DCPS::SubscriberImpl::coherent_change_received | ( | RepoId & | publisher_id, | |
DataReaderImpl * | reader, | |||
Coherent_State & | group_state | |||
) |
Definition at line 908 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::COMPLETED, datareader_set_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, and state.
Referenced by OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion().
00911 { 00912 // Verify if all readers complete the coherent changes. The result 00913 // is either COMPLETED or REJECTED. 00914 group_state = COMPLETED; 00915 DataReaderSet::const_iterator endIter = datareader_set_.end(); 00916 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 00917 iter != endIter; ++iter) { 00918 00919 Coherent_State state = COMPLETED; 00920 (*iter)->coherent_change_received (publisher_id, state); 00921 if (state == NOT_COMPLETED_YET) { 00922 group_state = state; 00923 return; 00924 } 00925 else if (state == REJECTED) { 00926 group_state = REJECTED; 00927 } 00928 } 00929 00930 PublicationId writerId = GUID_UNKNOWN; 00931 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 00932 iter != endIter; ++iter) { 00933 if (group_state == COMPLETED) { 00934 (*iter)->accept_coherent (writerId, publisher_id); 00935 } 00936 else { //REJECTED 00937 (*iter)->reject_coherent (writerId, publisher_id); 00938 } 00939 } 00940 00941 if (group_state == COMPLETED) { 00942 for (DataReaderSet::const_iterator iter = datareader_set_.begin(); 00943 iter != endIter; ++iter) { 00944 (*iter)->coherent_changes_completed (reader); 00945 (*iter)->reset_coherent_info (writerId, publisher_id); 00946 } 00947 } 00948 }
bool OpenDDS::DCPS::SubscriberImpl::contains_reader | ( | DDS::InstanceHandle_t | a_handle | ) |
Definition at line 85 of file SubscriberImpl.cpp.
References datareader_set_.
00086 { 00087 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00088 guard, 00089 this->si_lock_, 00090 false); 00091 00092 for (DataReaderSet::iterator it(datareader_set_.begin()); 00093 it != datareader_set_.end(); ++it) { 00094 if (a_handle == (*it)->get_instance_handle()) 00095 return true; 00096 } 00097 00098 return false; 00099 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::copy_from_topic_qos | ( | DDS::DataReaderQos & | a_datareader_qos, | |
const DDS::TopicQos & | a_topic_qos | |||
) | [virtual] |
Definition at line 742 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.
00745 { 00746 if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) { 00747 return DDS::RETCODE_OK; 00748 00749 } else { 00750 return DDS::RETCODE_INCONSISTENT_POLICY; 00751 } 00752 }
DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::create_datareader | ( | DDS::TopicDescription_ptr | a_topic_desc, | |
const DDS::DataReaderQos & | qos, | |||
DDS::DataReaderListener_ptr | a_listener, | |||
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 102 of file SubscriberImpl.cpp.
References default_datareader_qos_, OpenDDS::DCPS::DataReaderImpl::enable(), OpenDDS::DCPS::DataReaderImpl::enable_filtering(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::DataReaderImpl::init(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), multitopic_reader_enabled(), participant_, qos_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), raw_latency_buffer_size_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type(), raw_latency_buffer_type_, DDS::RETCODE_OK, and validate_datareader_qos().
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00107 { 00108 if (CORBA::is_nil(a_topic_desc)) { 00109 ACE_ERROR((LM_ERROR, 00110 ACE_TEXT("(%P|%t) ERROR: ") 00111 ACE_TEXT("SubscriberImpl::create_datareader, ") 00112 ACE_TEXT("topic desc is nil.\n"))); 00113 return DDS::DataReader::_nil(); 00114 } 00115 00116 DDS::DataReaderQos dr_qos; 00117 00118 TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc); 00119 00120 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00121 ContentFilteredTopicImpl* cft = 0; 00122 #endif 00123 #ifndef OPENDDS_NO_MULTI_TOPIC 00124 MultiTopicImpl* mt = 0; 00125 #else 00126 bool mt = false; 00127 #endif 00128 00129 if (!topic_servant) { 00130 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00131 cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc); 00132 if (cft) { 00133 DDS::Topic_var related; 00134 related = cft->get_related_topic(); 00135 topic_servant = dynamic_cast<TopicImpl*>(related.in()); 00136 } 00137 else 00138 #endif 00139 { 00140 #ifndef OPENDDS_NO_MULTI_TOPIC 00141 mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc); 00142 #endif 00143 } 00144 } 00145 00146 if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt)) 00147 return DDS::DataReader::_nil(); 00148 00149 #ifndef OPENDDS_NO_MULTI_TOPIC 00150 if (mt) { 00151 try { 00152 DDS::DataReader_var dr = 00153 mt->get_type_support()->create_multitopic_datareader(); 00154 MultiTopicDataReaderBase* mtdr = 00155 dynamic_cast<MultiTopicDataReaderBase*>(dr.in()); 00156 mtdr->init(dr_qos, a_listener, mask, this, mt); 00157 if (enabled_.value() && qos_.entity_factory.autoenable_created_entities) { 00158 if (dr->enable() != DDS::RETCODE_OK) { 00159 ACE_ERROR((LM_ERROR, 00160 ACE_TEXT("(%P|%t) ERROR: ") 00161 ACE_TEXT("SubscriberImpl::create_datareader, ") 00162 ACE_TEXT("enable of MultiTopicDataReader failed.\n"))); 00163 return DDS::DataReader::_nil(); 00164 } 00165 multitopic_reader_enabled(dr); 00166 } 00167 return dr._retn(); 00168 } catch (const std::exception& e) { 00169 ACE_ERROR((LM_ERROR, 00170 
ACE_TEXT("(%P|%t) ERROR: ") 00171 ACE_TEXT("SubscriberImpl::create_datareader, ") 00172 ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"), 00173 e.what())); 00174 } 00175 return DDS::DataReader::_nil(); 00176 } 00177 #endif 00178 00179 OpenDDS::DCPS::TypeSupport_ptr typesupport = 00180 topic_servant->get_type_support(); 00181 00182 if (0 == typesupport) { 00183 CORBA::String_var name = a_topic_desc->get_name(); 00184 ACE_ERROR((LM_ERROR, 00185 ACE_TEXT("(%P|%t) ERROR: ") 00186 ACE_TEXT("SubscriberImpl::create_datareader, ") 00187 ACE_TEXT("typesupport(topic_name=%C) is nil.\n"), 00188 name.in())); 00189 return DDS::DataReader::_nil(); 00190 } 00191 00192 DDS::DataReader_var dr_obj = typesupport->create_datareader(); 00193 00194 DataReaderImpl* dr_servant = 00195 dynamic_cast<DataReaderImpl*>(dr_obj.in()); 00196 00197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 00198 if (cft) { 00199 dr_servant->enable_filtering(cft); 00200 } 00201 #endif 00202 00203 // Propagate the latency buffer data collection configuration. 00204 // @TODO: Determine whether we want to exclude the Builtin Topic 00205 // readers from data gathering. 
00206 dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_; 00207 dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_; 00208 00209 dr_servant->init(topic_servant, 00210 dr_qos, 00211 a_listener, 00212 mask, 00213 participant_, 00214 this, 00215 dr_obj.in()); 00216 00217 if ((this->enabled_ == true) 00218 && (qos_.entity_factory.autoenable_created_entities == 1)) { 00219 DDS::ReturnCode_t ret 00220 = dr_servant->enable(); 00221 00222 if (ret != DDS::RETCODE_OK) { 00223 ACE_ERROR((LM_ERROR, 00224 ACE_TEXT("(%P|%t) ERROR: ") 00225 ACE_TEXT("SubscriberImpl::create_datareader, ") 00226 ACE_TEXT("enable failed.\n"))); 00227 return DDS::DataReader::_nil(); 00228 } 00229 } 00230 00231 // add created data reader to this' data reader container - 00232 // done in enable_reader 00233 return DDS::DataReader::_duplicate(dr_obj.in()); 00234 00235 }
void OpenDDS::DCPS::SubscriberImpl::data_received | ( | DataReaderImpl * | reader | ) |
Definition at line 793 of file SubscriberImpl.cpp.
References datareader_set_.
Referenced by OpenDDS::DCPS::DataReaderImpl::data_received().
00794 { 00795 ACE_GUARD(ACE_Recursive_Thread_Mutex, 00796 guard, 00797 this->si_lock_); 00798 datareader_set_.insert(reader); 00799 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_contained_entities | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 338 of file SubscriberImpl.cpp.
References datareader_map_, delete_datareader(), OPENDDS_MAP(), OPENDDS_STRING, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EntityImpl::set_deleted().
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber().
00339 { 00340 // mark that the entity is being deleted 00341 set_deleted(true); 00342 00343 ACE_Vector<DDS::DataReader_ptr> drs; 00344 00345 #ifndef OPENDDS_NO_MULTI_TOPIC 00346 { 00347 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00348 guard, 00349 this->si_lock_, 00350 DDS::RETCODE_ERROR); 00351 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter = 00352 multitopic_reader_map_.begin(); 00353 mt_iter != multitopic_reader_map_.end(); ++mt_iter) { 00354 drs.push_back(mt_iter->second); 00355 } 00356 } 00357 00358 for (size_t i = 0; i < drs.size(); ++i) { 00359 const DDS::ReturnCode_t ret = delete_datareader(drs[i]); 00360 if (ret != DDS::RETCODE_OK) { 00361 ACE_ERROR_RETURN((LM_ERROR, 00362 ACE_TEXT("(%P|%t) ERROR: ") 00363 ACE_TEXT("SubscriberImpl::delete_contained_entities, ") 00364 ACE_TEXT("failed to delete datareader\n")), 00365 ret); 00366 } 00367 } 00368 drs.clear(); 00369 #endif 00370 00371 { 00372 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00373 guard, 00374 this->si_lock_, 00375 DDS::RETCODE_ERROR); 00376 DataReaderMap::iterator it; 00377 DataReaderMap::iterator itEnd = datareader_map_.end(); 00378 00379 for (it = datareader_map_.begin(); it != itEnd; ++it) { 00380 drs.push_back(it->second); 00381 } 00382 } 00383 00384 size_t num_rds = drs.size(); 00385 00386 for (size_t i = 0; i < num_rds; ++i) { 00387 DDS::ReturnCode_t ret = delete_datareader(drs[i]); 00388 00389 if (ret != DDS::RETCODE_OK) { 00390 ACE_ERROR_RETURN((LM_ERROR, 00391 ACE_TEXT("(%P|%t) ERROR: ") 00392 ACE_TEXT("SubscriberImpl::delete_contained_entities, ") 00393 ACE_TEXT("failed to delete datareader\n")), 00394 ret); 00395 } 00396 } 00397 00398 // the subscriber can now start creating new publications 00399 set_deleted(false); 00400 00401 return DDS::RETCODE_OK; 00402 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_datareader | ( | DDS::DataReader_ptr | a_datareader | ) | [virtual] |
Referenced by delete_contained_entities().
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::enable | ( | ) | [virtual] |
Implements DDS::Entity.
Definition at line 755 of file SubscriberImpl.cpp.
References monitor_, OpenDDS::DCPS::Monitor::report(), DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and OpenDDS::DCPS::EntityImpl::set_enabled().
Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_subscriber().
00756 { 00757 //According spec: 00758 // - Calling enable on an already enabled Entity returns OK and has no 00759 // effect. 00760 // - Calling enable on an Entity whose factory is not enabled will fail 00761 // and return PRECONDITION_NOT_MET. 00762 00763 if (this->is_enabled()) { 00764 return DDS::RETCODE_OK; 00765 } 00766 00767 if (this->participant_->is_enabled() == false) { 00768 return DDS::RETCODE_PRECONDITION_NOT_MET; 00769 } 00770 00771 if (this->monitor_) { 00772 this->monitor_->report(); 00773 } 00774 00775 this->set_enabled(); 00776 return DDS::RETCODE_OK; 00777 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::end_access | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 673 of file SubscriberImpl.cpp.
References access_depth_, datareader_set_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00674 { 00675 if (enabled_ == false) { 00676 ACE_ERROR_RETURN((LM_ERROR, 00677 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:") 00678 ACE_TEXT(" Publisher is not enabled!\n")), 00679 DDS::RETCODE_NOT_ENABLED); 00680 } 00681 00682 if (qos_.presentation.access_scope != DDS::GROUP_PRESENTATION_QOS) { 00683 return DDS::RETCODE_OK; 00684 } 00685 00686 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00687 guard, 00688 this->si_lock_, 00689 DDS::RETCODE_ERROR); 00690 00691 if (this->access_depth_ == 0) { 00692 ACE_ERROR_RETURN((LM_ERROR, 00693 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:") 00694 ACE_TEXT(" No matching call to begin_coherent_changes!\n")), 00695 DDS::RETCODE_PRECONDITION_NOT_MET); 00696 } 00697 00698 --this->access_depth_; 00699 00700 // We should only notify subscription on the first 00701 // and last change to the current change set: 00702 if (this->access_depth_ == 0) { 00703 for (DataReaderSet::iterator it = this->datareader_set_.begin(); 00704 it != this->datareader_set_.end(); ++it) { 00705 (*it)->end_access(); 00706 } 00707 } 00708 00709 return DDS::RETCODE_OK; 00710 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_datareaders | ( | DDS::DataReaderSeq & | readers, | |
DDS::SampleStateMask | sample_states, | |||
DDS::ViewStateMask | view_states, | |||
DDS::InstanceStateMask | instance_states | |||
) | [virtual] |
Definition at line 442 of file SubscriberImpl.cpp.
References datareader_set_, OpenDDS::DCPS::GroupRakeData::get_datareaders(), DDS::GROUP_PRESENTATION_QOS, OpenDDS::DCPS::push_back(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.
00447 { 00448 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00449 guard, 00450 this->si_lock_, 00451 DDS::RETCODE_ERROR); 00452 00453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 00454 // If access_scope is GROUP and ordered_access is true then return readers as 00455 // list which may contain same readers multiple times. Otherwise return readers 00456 // as set. 00457 if (this->qos_.presentation.access_scope == ::DDS::GROUP_PRESENTATION_QOS) { 00458 if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) { 00459 return ::DDS::RETCODE_PRECONDITION_NOT_MET; 00460 } 00461 00462 if (this->qos_.presentation.ordered_access) { 00463 00464 GroupRakeData data; 00465 for (DataReaderSet::const_iterator pos = datareader_set_.begin(); 00466 pos != datareader_set_.end(); ++pos) { 00467 (*pos)->get_ordered_data (data, sample_states, view_states, instance_states); 00468 } 00469 00470 // Return list of readers in the order of the source timestamp of the received 00471 // samples from readers. 00472 data.get_datareaders (readers); 00473 00474 return DDS::RETCODE_OK; 00475 } 00476 } 00477 #endif 00478 00479 // Return set of datareaders. 00480 int count(0); 00481 readers.length(count); 00482 00483 for (DataReaderSet::const_iterator pos = datareader_set_.begin(); 00484 pos != datareader_set_.end(); ++pos) { 00485 if ((*pos)->have_sample_states(sample_states) && 00486 (*pos)->have_view_states(view_states) && 00487 (*pos)->have_instance_states(instance_states)) { 00488 push_back(readers, (*pos)->get_dr_obj_ref()); 00489 ++count; 00490 } 00491 } 00492 00493 return DDS::RETCODE_OK; 00494 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_default_datareader_qos | ( | DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 734 of file SubscriberImpl.cpp.
References default_datareader_qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::init_bit().
00736 { 00737 qos = default_datareader_qos_; 00738 return DDS::RETCODE_OK; 00739 }
DDS::InstanceHandle_t OpenDDS::DCPS::SubscriberImpl::get_instance_handle | ( | ) | [virtual] |
Implements OpenDDS::DCPS::EntityImpl.
Definition at line 79 of file SubscriberImpl.cpp.
References handle_.
Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().
00080 { 00081 return handle_; 00082 }
DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::get_listener | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 632 of file SubscriberImpl.cpp.
References listener_.
00633 { 00634 return DDS::SubscriberListener::_duplicate(listener_.in()); 00635 }
DDS::DomainParticipant_ptr OpenDDS::DCPS::SubscriberImpl::get_participant | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 715 of file SubscriberImpl.cpp.
References participant_.
Referenced by OpenDDS::DCPS::PeerDiscovery< OpenDDS::DCPS::StaticParticipant >::create_bit_dr(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), and OpenDDS::DCPS::SubscriberMonitorImpl::report().
00716 { 00717 return DDS::DomainParticipant::_duplicate(participant_); 00718 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_qos | ( | DDS::SubscriberQos & | qos | ) | [virtual] |
Definition at line 613 of file SubscriberImpl.cpp.
References qos_, and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::DataReaderImpl::enable(), and OpenDDS::DCPS::DataReaderImpl::set_qos().
00615 { 00616 qos = qos_; 00617 return DDS::RETCODE_OK; 00618 }
void OpenDDS::DCPS::SubscriberImpl::get_subscription_ids | ( | SubscriptionIdVec & | subs | ) |
Populates a std::vector with the SubscriptionIds (GUIDs) of this Subscriber's Data Readers.
Definition at line 870 of file SubscriberImpl.cpp.
References datareader_map_.
Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().
00871 { 00872 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00873 guard, 00874 this->si_lock_, 00875 ); 00876 00877 subs.reserve(datareader_map_.size()); 00878 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00879 iter != datareader_map_.end(); 00880 ++iter) { 00881 subs.push_back(iter->second->get_subscription_id()); 00882 } 00883 }
bool OpenDDS::DCPS::SubscriberImpl::is_clean | ( | ) | const |
This method is not defined in the IDL and is defined for internal use. Check if there is any datareader associated with it.
Definition at line 780 of file SubscriberImpl.cpp.
References datareader_map_, and TheTransientKludge.
Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber(), and ~SubscriberImpl().
00781 { 00782 bool sub_is_clean = datareader_map_.empty(); 00783 00784 if (!sub_is_clean && !TheTransientKludge->is_enabled()) { 00785 // Four BIT datareaders. 00786 return datareader_map_.size() == 4; 00787 } 00788 00789 return sub_is_clean; 00790 }
DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::listener_for | ( | DDS::StatusKind | kind | ) |
DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::lookup_datareader | ( | const char * | topic_name | ) | [virtual] |
Definition at line 405 of file SubscriberImpl.cpp.
References datareader_map_, OpenDDS::DCPS::DCPS_debug_level, OPENDDS_MAP(), and OPENDDS_STRING.
00407 { 00408 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00409 guard, 00410 this->si_lock_, 00411 DDS::DataReader::_nil()); 00412 00413 // If multiple entries whose key is "topic_name" then which one is 00414 // returned ? Spec does not limit which one should give. 00415 DataReaderMap::iterator it = datareader_map_.find(topic_name); 00416 00417 if (it == datareader_map_.end()) { 00418 #ifndef OPENDDS_NO_MULTI_TOPIC 00419 OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator mt_iter = 00420 multitopic_reader_map_.find(topic_name); 00421 if (mt_iter != multitopic_reader_map_.end()) { 00422 return DDS::DataReader::_duplicate(mt_iter->second); 00423 } 00424 #endif 00425 00426 if (DCPS_debug_level >= 2) { 00427 ACE_DEBUG((LM_DEBUG, 00428 ACE_TEXT("(%P|%t) ") 00429 ACE_TEXT("SubscriberImpl::lookup_datareader, ") 00430 ACE_TEXT("The datareader(topic_name=%C) is not found\n"), 00431 topic_name)); 00432 } 00433 00434 return DDS::DataReader::_nil(); 00435 00436 } else { 00437 return DDS::DataReader::_duplicate(it->second); 00438 } 00439 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::multitopic_reader_enabled | ( | DDS::DataReader_ptr | reader | ) |
Definition at line 827 of file SubscriberImpl.cpp.
References DDS::RETCODE_OK.
Referenced by create_datareader().
00828 { 00829 DDS::TopicDescription_var td = reader->get_topicdescription(); 00830 CORBA::String_var topic = td->get_name(); 00831 multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader); 00832 return DDS::RETCODE_OK; 00833 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::notify_datareaders | ( | ) | [virtual] |
Implements DDS::Subscriber.
Definition at line 497 of file SubscriberImpl.cpp.
References DDS::DATA_AVAILABLE_STATUS, datareader_map_, DDS::NOT_READ_SAMPLE_STATE, OPENDDS_MAP(), OPENDDS_STRING, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.
00498 { 00499 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00500 guard, 00501 this->si_lock_, 00502 DDS::RETCODE_ERROR); 00503 00504 DataReaderMap::iterator it; 00505 00506 for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) { 00507 if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) { 00508 DDS::DataReaderListener_var listener = it->second->get_listener(); 00509 if (!CORBA::is_nil (listener)) { 00510 listener->on_data_available(it->second); 00511 } 00512 00513 it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 00514 } 00515 } 00516 00517 #ifndef OPENDDS_NO_MULTI_TOPIC 00518 for (OPENDDS_MAP(OPENDDS_STRING, DDS::DataReader_var)::iterator it = 00519 multitopic_reader_map_.begin(); it != multitopic_reader_map_.end(); 00520 ++it) { 00521 MultiTopicDataReaderBase* dri = 00522 dynamic_cast<MultiTopicDataReaderBase*>(it->second.in()); 00523 if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) { 00524 DDS::DataReaderListener_var listener = dri->get_listener(); 00525 if (!CORBA::is_nil(listener)) { 00526 listener->on_data_available(dri); 00527 } 00528 dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false); 00529 } 00530 } 00531 #endif 00532 00533 return DDS::RETCODE_OK; 00534 }
OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP | ( | OPENDDS_STRING | , | |
DDS::DataReader_var | ||||
) | [private] |
Referenced by delete_contained_entities(), lookup_datareader(), and notify_datareaders().
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP_CMP | ( | RepoId | , | |
DDS::DataReaderQos | , | |||
GUID_tKeyLessThan | ||||
) | [private] |
DataReader id to qos map.
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MULTIMAP | ( | OPENDDS_STRING | , | |
DataReaderImpl * | ||||
) | [private] |
Keep track of all the DataReaders attached to this Subscriber: key is the topic_name
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_SET | ( | DataReaderImpl * | ) | [private] |
Keep track of DataReaders with data. This is a std::set for now; we want to encapsulate it so we can switch between a set or a list depending on the Presentation QoS.
typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_VECTOR | ( | RepoId | ) |
EntityImpl * OpenDDS::DCPS::SubscriberImpl::parent | ( | ) | const [virtual] |
Reimplemented from OpenDDS::DCPS::EntityImpl.
Definition at line 952 of file SubscriberImpl.cpp.
References participant_.
00953 { 00954 return this->participant_; 00955 }
unsigned int & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size | ( | ) |
Configure the size of the raw data collection buffer.
Definition at line 858 of file SubscriberImpl.cpp.
References raw_latency_buffer_size_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00859 { 00860 return this->raw_latency_buffer_size_; 00861 }
DataCollector< double >::OnFull & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type | ( | ) |
Configure the type of the raw data collection buffer.
Definition at line 864 of file SubscriberImpl.cpp.
References raw_latency_buffer_type_.
Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().
00865 { 00866 return this->raw_latency_buffer_type_; 00867 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::reader_enabled | ( | const char * | topic_name, | |
DataReaderImpl * | reader | |||
) |
Definition at line 802 of file SubscriberImpl.cpp.
References datareader_map_, OpenDDS::DCPS::DCPS_debug_level, monitor_, OpenDDS::DCPS::Monitor::report(), and DDS::RETCODE_OK.
Referenced by OpenDDS::DCPS::DataReaderImpl::enable().
00804 { 00805 if (DCPS_debug_level >= 4) { 00806 ACE_DEBUG((LM_DEBUG, 00807 ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ") 00808 ACE_TEXT("datareader(topic_name=%C) enabled\n"), 00809 topic_name)); 00810 } 00811 00812 this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader)); 00813 00814 // Increase the ref count when the servant is referenced 00815 // by the datareader map. 00816 reader->_add_ref(); 00817 00818 if (this->monitor_) { 00819 this->monitor_->report(); 00820 } 00821 00822 return DDS::RETCODE_OK; 00823 }
void OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set | ( | DataReaderImpl * | reader | ) |
Definition at line 836 of file SubscriberImpl.cpp.
References datareader_set_, and si_lock_.
00837 { 00838 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, si_lock_); 00839 datareader_set_.erase(reader); 00840 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_default_datareader_qos | ( | const DDS::DataReaderQos & | qos | ) | [virtual] |
Definition at line 721 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), default_datareader_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().
00723 { 00724 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00725 default_datareader_qos_ = qos; 00726 return DDS::RETCODE_OK; 00727 00728 } else { 00729 return DDS::RETCODE_INCONSISTENT_POLICY; 00730 } 00731 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_listener | ( | DDS::SubscriberListener_ptr | a_listener, | |
DDS::StatusMask | mask | |||
) | [virtual] |
Definition at line 621 of file SubscriberImpl.cpp.
References listener_, listener_mask_, and DDS::RETCODE_OK.
00624 { 00625 listener_mask_ = mask; 00626 //note: OK to duplicate a nil object ref 00627 listener_ = DDS::SubscriberListener::_duplicate(a_listener); 00628 return DDS::RETCODE_OK; 00629 }
DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_qos | ( | const DDS::SubscriberQos & | qos | ) | [virtual] |
Definition at line 537 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), datareader_map_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_id(), OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_STRING, participant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().
00539 { 00540 00541 OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, DDS::RETCODE_UNSUPPORTED); 00542 00543 if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) { 00544 if (qos_ == qos) 00545 return DDS::RETCODE_OK; 00546 00547 // for the not changeable qos, it can be changed before enable 00548 if (!Qos_Helper::changeable(qos_, qos) && enabled_ == true) { 00549 return DDS::RETCODE_IMMUTABLE_POLICY; 00550 00551 } else { 00552 qos_ = qos; 00553 00554 DrIdToQosMap idToQosMap; 00555 { 00556 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00557 guard, 00558 this->si_lock_, 00559 DDS::RETCODE_ERROR); 00560 // after FaceCTS bug 619 is fixed, make endIter and iter const iteratorsx 00561 DataReaderMap::iterator endIter = datareader_map_.end(); 00562 00563 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00564 iter != endIter; ++iter) { 00565 DataReaderImpl* reader = iter->second; 00566 reader->set_subscriber_qos (qos); 00567 DDS::DataReaderQos qos; 00568 reader->get_qos(qos); 00569 RepoId id = reader->get_subscription_id(); 00570 std::pair<DrIdToQosMap::iterator, bool> pair 00571 = idToQosMap.insert(DrIdToQosMap::value_type(id, qos)); 00572 00573 if (pair.second == false) { 00574 GuidConverter converter(id); 00575 ACE_ERROR_RETURN((LM_ERROR, 00576 ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ") 00577 ACE_TEXT("insert %C to DrIdToQosMap failed.\n"), 00578 OPENDDS_STRING(converter).c_str()),::DDS::RETCODE_ERROR); 00579 } 00580 } 00581 } 00582 00583 DrIdToQosMap::iterator iter = idToQosMap.begin(); 00584 00585 while (iter != idToQosMap.end()) { 00586 Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_); 00587 const bool status 00588 = disco->update_subscription_qos(this->domain_id_, 00589 participant_->get_id(), 00590 iter->first, 00591 iter->second, 00592 this->qos_); 00593 00594 if (!status) { 00595 ACE_ERROR_RETURN((LM_ERROR, 00596 ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ") 00597 ACE_TEXT("failed. \n")), 00598 DDS::RETCODE_ERROR); 00599 } 00600 00601 ++iter; 00602 } 00603 } 00604 00605 return DDS::RETCODE_OK; 00606 00607 } else { 00608 return DDS::RETCODE_INCONSISTENT_POLICY; 00609 } 00610 }
void OpenDDS::DCPS::SubscriberImpl::update_ownership_strength | ( | const PublicationId & | pub_id, | |
const CORBA::Long & | ownership_strength | |||
) |
Definition at line 887 of file SubscriberImpl.cpp.
References datareader_map_.
00889 { 00890 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, 00891 guard, 00892 this->si_lock_, 00893 ); 00894 00895 for (DataReaderMap::iterator iter = datareader_map_.begin(); 00896 iter != datareader_map_.end(); 00897 ++iter) { 00898 if (!iter->second->is_bit()) { 00899 iter->second->update_ownership_strength(pub_id, ownership_strength); 00900 } 00901 } 00902 }
bool OpenDDS::DCPS::SubscriberImpl::validate_datareader_qos | ( | const DDS::DataReaderQos & | qos, | |
const DDS::DataReaderQos & | default_qos, | |||
DDS::Topic_ptr | a_topic, | |||
DDS::DataReaderQos & | result_qos, | |||
bool | mt | |||
) | [static] |
Definition at line 958 of file SubscriberImpl.cpp.
References OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DATAREADER_QOS_DEFAULT, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().
Referenced by create_datareader(), and OpenDDS::DCPS::DomainParticipantImpl::create_recorder().
00963 { 00964 00965 00966 if (qos == DATAREADER_QOS_DEFAULT) { 00967 dr_qos = default_qos; 00968 00969 } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) { 00970 00971 #ifndef OPENDDS_NO_MULTI_TOPIC 00972 if (mt) { 00973 if (DCPS_debug_level) { 00974 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00975 ACE_TEXT("SubscriberImpl::create_datareader, ") 00976 ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ") 00977 ACE_TEXT("to create a MultiTopic DataReader.\n"))); 00978 } 00979 return DDS::DataReader::_nil(); 00980 } 00981 #else 00982 ACE_UNUSED_ARG(mt); 00983 #endif 00984 DDS::TopicQos topic_qos; 00985 a_topic->get_qos(topic_qos); 00986 00987 dr_qos = default_qos; 00988 00989 Qos_Helper::copy_from_topic_qos(dr_qos, 00990 topic_qos); 00991 00992 } else { 00993 dr_qos = qos; 00994 } 00995 00996 OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dr_qos, false); 00997 OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dr_qos, false); 00998 OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(dr_qos, false); 00999 01000 if (!Qos_Helper::valid(dr_qos)) { 01001 ACE_ERROR((LM_ERROR, 01002 ACE_TEXT("(%P|%t) ERROR: ") 01003 ACE_TEXT("SubscriberImpl::create_datareader, ") 01004 ACE_TEXT("invalid qos.\n"))); 01005 return false; 01006 } 01007 01008 if (!Qos_Helper::consistent(dr_qos)) { 01009 ACE_ERROR((LM_ERROR, 01010 ACE_TEXT("(%P|%t) ERROR: ") 01011 ACE_TEXT("SubscriberImpl::create_datareader, ") 01012 ACE_TEXT("inconsistent qos.\n"))); 01013 return false; 01014 } 01015 01016 01017 return true; 01018 }
int OpenDDS::DCPS::SubscriberImpl::access_depth_ [private] |
DataReaderMap OpenDDS::DCPS::SubscriberImpl::datareader_map_ [private] |
Definition at line 183 of file SubscriberImpl.h.
Referenced by delete_contained_entities(), get_subscription_ids(), is_clean(), lookup_datareader(), notify_datareaders(), reader_enabled(), set_qos(), and update_ownership_strength().
DataReaderSet OpenDDS::DCPS::SubscriberImpl::datareader_set_ [private] |
Definition at line 184 of file SubscriberImpl.h.
Referenced by begin_access(), coherent_change_received(), contains_reader(), data_received(), end_access(), get_datareaders(), and remove_from_datareader_set().
DDS::DataReaderQos OpenDDS::DCPS::SubscriberImpl::default_datareader_qos_ [private] |
Definition at line 178 of file SubscriberImpl.h.
Referenced by create_datareader(), get_default_datareader_qos(), and set_default_datareader_qos().
Definition at line 193 of file SubscriberImpl.h.
DDS::SubscriberListener_var OpenDDS::DCPS::SubscriberImpl::listener_ [private] |
Definition at line 181 of file SubscriberImpl.h.
Referenced by get_listener(), set_listener(), and SubscriberImpl().
Monitor* OpenDDS::DCPS::SubscriberImpl::monitor_ [private] |
Monitor object for this entity.
Definition at line 205 of file SubscriberImpl.h.
Referenced by enable(), reader_enabled(), and SubscriberImpl().
DomainParticipantImpl* OpenDDS::DCPS::SubscriberImpl::participant_ [private] |
Definition at line 190 of file SubscriberImpl.h.
Referenced by create_datareader(), get_participant(), parent(), and set_qos().
DDS::DomainParticipant_var OpenDDS::DCPS::SubscriberImpl::participant_objref_ [private] |
Definition at line 191 of file SubscriberImpl.h.
DDS::SubscriberQos OpenDDS::DCPS::SubscriberImpl::qos_ [private] |
Definition at line 177 of file SubscriberImpl.h.
Referenced by begin_access(), create_datareader(), end_access(), get_qos(), and set_qos().
unsigned int OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size_ [private] |
Bound (or initial reservation) of raw latency buffers.
Definition at line 196 of file SubscriberImpl.h.
Referenced by create_datareader(), and raw_latency_buffer_size().
DataCollector<double>::OnFull OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type_ [private] |
Type of raw latency data buffers.
Definition at line 199 of file SubscriberImpl.h.
Referenced by create_datareader(), and raw_latency_buffer_type().
ACE_Recursive_Thread_Mutex OpenDDS::DCPS::SubscriberImpl::si_lock_ [private] |
This lock protects the data structures in this class.
Definition at line 202 of file SubscriberImpl.h.
Referenced by remove_from_datareader_set().