32 #include <dds/DdsDcpsTypeSupportExtC.h> 43 DDS::SubscriberListener_ptr a_listener,
50 participant_(*participant),
51 domain_id_(participant->get_domain_id()),
52 raw_latency_buffer_size_(0),
57 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
66 participant->return_handle(
handle_);
75 "%C still exist\n", leftover_entities.c_str()));
96 if (a_handle == it->second->get_instance_handle()) {
106 DDS::TopicDescription_ptr a_topic_desc,
108 DDS::DataReaderListener_ptr a_listener,
115 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
118 return DDS::DataReader::_nil();
124 return DDS::DataReader::_nil();
128 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 131 #ifndef OPENDDS_NO_MULTI_TOPIC 137 if (!topic_servant) {
138 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 141 DDS::Topic_var related;
143 topic_servant =
dynamic_cast<TopicImpl*
>(related.in());
148 #ifndef OPENDDS_NO_MULTI_TOPIC 155 return DDS::DataReader::_nil();
157 #ifndef OPENDDS_NO_MULTI_TOPIC 160 DDS::DataReader_var dr =
164 mtdr->
init(dr_qos, a_listener, mask,
this, mt);
170 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
171 ACE_TEXT(
"enable of MultiTopicDataReader failed.\n")));
173 return DDS::DataReader::_nil();
178 }
catch (
const std::exception& e) {
182 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
183 ACE_TEXT(
"creation of MultiTopicDataReader failed: %C.\n"),
187 return DDS::DataReader::_nil();
191 OpenDDS::DCPS::TypeSupport_ptr typesupport =
194 if (0 == typesupport) {
199 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
200 ACE_TEXT(
"typesupport(topic_name=%C) is nil.\n"),
203 return DDS::DataReader::_nil();
206 DDS::DataReader_var dr_obj = typesupport->create_datareader();
211 if (dr_servant == 0) {
215 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
218 return DDS::DataReader::_nil();
221 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 234 dr_servant->
init(topic_servant,
248 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
251 return DDS::DataReader::_nil();
260 return DDS::DataReader::_duplicate(dr_obj.in());
271 const char* reason =
" (ERROR: unknown reason)";
274 if (dr_subscriber.
get() !=
this) {
275 reason =
"doesn't belong to this subscriber.";
277 }
else if (dr_servant->has_zero_copies()) {
278 reason =
"has outstanding zero-copy samples loaned out.";
280 }
else if (!dr_servant->read_conditions_.empty()) {
281 reason =
"has read conditions attached.";
286 DDS::TopicDescription_var topic = a_datareader->get_topicdescription();
289 "on reader %C (topic \"%C\") will return \"%C\" because it %C\n",
290 LogGuid(dr_servant->get_id()).c_str(), topic_name.
in(),
297 dr_servant->prepare_to_delete();
306 DataReaderMap::iterator it;
308 if (it->second == dr_servant) {
314 DDS::TopicDescription_var td = a_datareader->get_topicdescription();
316 #ifndef OPENDDS_NO_MULTI_TOPIC 319 DDS::DataReader_ptr ptr = mt_iter->second;
325 ACE_TEXT(
"SubscriberImpl::delete_datareader: ")
326 ACE_TEXT(
"datareader(topic_name=%C)")
327 ACE_TEXT(
"failed to obtain MultiTopicDataReaderBase.\n"),
341 ACE_TEXT(
"SubscriberImpl::delete_datareader: ")
342 ACE_TEXT(
"datareader(topic_name=%C)")
343 ACE_TEXT(
"for unknown repo id not found.\n"),
349 GUID_t id = dr_servant->get_guid();
352 ACE_TEXT(
"SubscriberImpl::delete_datareader: ")
353 ACE_TEXT(
"datareader(topic_name=%C) %C not found.\n"),
373 const GUID_t subscription_id = dr_servant->get_guid();
375 if (!disco->remove_subscription(this->domain_id_,
381 ACE_TEXT(
"SubscriberImpl::delete_datareader: ")
382 ACE_TEXT(
" could not remove subscription from discovery.\n")));
389 dr_servant->remove_all_associations();
390 dr_servant->cleanup();
402 #ifndef OPENDDS_NO_MULTI_TOPIC 410 drs.push_back(mt_iter->second);
414 for (
size_t i = 0; i < drs.size(); ++i) {
423 ACE_TEXT(
"SubscriberImpl::delete_contained_entities, ")
424 ACE_TEXT(
"failed to delete datareader\n")));
437 DataReaderMap::iterator it;
441 drs.push_back(it->second.in());
445 for (
size_t i = 0; i < drs.size(); ++i) {
454 ACE_TEXT(
"SubscriberImpl::delete_contained_entities, ")
455 ACE_TEXT(
"failed to delete datareader\n")));
469 const char * topic_name)
474 DDS::DataReader::_nil());
481 #ifndef OPENDDS_NO_MULTI_TOPIC 484 return DDS::DataReader::_duplicate(mt_iter->second);
491 ACE_TEXT(
"SubscriberImpl::lookup_datareader, ")
492 ACE_TEXT(
"The datareader(topic_name=%C) is not found\n"),
496 return DDS::DataReader::_nil();
499 return DDS::DataReader::_duplicate(it->second.in());
510 DataReaderSet localreaders;
519 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 530 for (DataReaderSet::const_iterator pos = localreaders.begin();
531 pos != localreaders.end(); ++pos) {
532 (*pos)->get_ordered_data(data, sample_states, view_states, instance_states);
545 for (DataReaderSet::const_iterator pos = localreaders.begin();
546 pos != localreaders.end(); ++pos) {
547 if ((*pos)->have_sample_states(sample_states) &&
548 (*pos)->have_view_states(view_states) &&
549 (*pos)->have_instance_states(instance_states)) {
550 push_back(readers, DDS::DataReader::_duplicate(pos->in()));
560 DataReaderMap localreadermap;
568 for (DataReaderMap::iterator it = localreadermap.begin(); it != localreadermap.end(); ++it) {
570 DDS::DataReaderListener_var listener = it->second->get_listener();
571 if (!it->second->is_bit()) {
574 listener->on_data_available(it->second.in());
577 TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(listener, it->second, listener,
true,
false));
582 #ifndef OPENDDS_NO_MULTI_TOPIC 583 MultitopicReaderMap localmtr;
592 for (MultitopicReaderMap::iterator it = localmtr.begin();
593 it != localmtr.end(); ++it) {
600 ACE_TEXT(
"(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
601 ACE_TEXT(
"failed to obtain MultiTopicDataReaderBase.\n")));
607 DDS::DataReaderListener_var listener = dri->
get_listener();
610 listener->on_data_available(dri);
637 DrIdToQosMap idToQosMap;
647 iter != endIter; ++iter) {
649 reader->set_subscriber_qos (qos);
651 GUID_t id = reader->get_guid();
652 std::pair<DrIdToQosMap::iterator, bool> pair
653 = idToQosMap.insert(DrIdToQosMap::value_type(
id, qos));
658 ACE_TEXT(
"(%P|%t) ERROR: SubscriberImpl::set_qos: ")
659 ACE_TEXT(
"insert %C to DrIdToQosMap failed.\n"),
667 DrIdToQosMap::iterator iter = idToQosMap.begin();
669 while (iter != idToQosMap.end()) {
672 = disco->update_subscription_qos(this->
domain_id_,
681 ACE_TEXT(
"(%P|%t) SubscriberImpl::set_qos, ")
708 DDS::SubscriberListener_ptr a_listener,
714 listener_ = DDS::SubscriberListener::_duplicate(a_listener);
718 DDS::SubscriberListener_ptr
722 return DDS::SubscriberListener::_duplicate(
listener_.in());
725 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 730 DataReaderSet to_call;
739 ACE_TEXT(
"(%P|%t) ERROR: SubscriberImpl::begin_access:")
740 ACE_TEXT(
" Subscriber is not enabled!\n")));
761 for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
762 (*it)->begin_access();
770 DataReaderSet to_call;
779 ACE_TEXT(
"(%P|%t) ERROR: SubscriberImpl::end_access:")
780 ACE_TEXT(
" Publisher is not enabled!\n")));
792 ACE_TEXT(
"(%P|%t) ERROR: SubscriberImpl::end_access:")
793 ACE_TEXT(
" No matching call to begin_coherent_changes!\n")));
810 for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
816 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE 818 DDS::DomainParticipant_ptr
872 if (!participant || !participant->is_enabled()) {
876 dp_id_ = participant->get_id();
885 DataReaderSet readers;
890 for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
900 if (leftover_entities) {
901 leftover_entities->clear();
909 if (leftover_entities && reader_count) {
910 *leftover_entities +=
to_dds_string(reader_count) +
" reader(s)";
913 return reader_count == 0;
931 ACE_TEXT(
"(%P|%t) SubscriberImpl::reader_enabled, ")
932 ACE_TEXT(
"datareader(topic_name=%C) enabled\n"),
940 this->
datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
949 #ifndef OPENDDS_NO_MULTI_TOPIC 953 DDS::TopicDescription_var td = reader->get_topicdescription();
967 DDS::SubscriberListener_ptr
980 return participant->listener_for(kind);
983 return DDS::SubscriberListener::_duplicate(
listener_.in());
1011 subs.push_back(iter->second->get_guid());
1015 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 1028 if (!iter->second->is_bit()) {
1029 iter->second->update_ownership_strength(pub_id, ownership_strength);
1036 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE 1042 DataReaderSet localdrs;
1052 for (DataReaderSet::const_iterator iter = localdrs.begin();
1053 iter != localdrs.end(); ++iter) {
1056 (*iter)->coherent_change_received (publisher_id, state);
1067 for (DataReaderSet::const_iterator iter = localdrs.begin();
1068 iter != localdrs.end(); ++iter) {
1070 (*iter)->accept_coherent (writerId, publisher_id);
1073 (*iter)->reject_coherent (writerId, publisher_id);
1078 for (DataReaderSet::const_iterator iter = localdrs.begin();
1079 iter != localdrs.end(); ++iter) {
1080 (*iter)->coherent_changes_completed (reader);
1081 (*iter)->reset_coherent_info (writerId, publisher_id);
1096 DDS::Topic_ptr a_topic,
1103 dr_qos = default_qos;
1107 #ifndef OPENDDS_NO_MULTI_TOPIC 1111 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
1112 ACE_TEXT(
"DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
1113 ACE_TEXT(
"to create a MultiTopic DataReader.\n")));
1121 a_topic->get_qos(topic_qos);
1123 dr_qos = default_qos;
1140 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
1150 ACE_TEXT(
"SubscriberImpl::create_datareader, ")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
RcHandle< T > rchandle_from(T *pointer)
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
void set_status_changed_flag(DDS::StatusKind status, bool flag)
DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
void get_datareaders(DDS::DataReaderSeq &readers)
DataReaderSet datareader_set_
DDS::ReturnCode_t set_enabled()
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
virtual DDS::ReturnCode_t delete_datareader(DDS::DataReader_ptr a_datareader)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
bool have_sample_states(DDS::SampleStateMask sample_states) const
virtual DDS::ReturnCode_t get_datareaders(DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
SubscriberImpl(DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
DDS::DataReaderListener_ptr get_listener()
virtual DDS::ReturnCode_t begin_access()
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
const GUID_t GUID_UNKNOWN
Nil value for GUID.
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
String to_dds_string(unsigned short to_convert)
virtual DDS::ReturnCode_t enable()
virtual DDS::InstanceHandle_t get_instance_handle()
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
PresentationQosPolicyAccessScopeKind access_scope
void set_deleted(bool state)
virtual DDS::ReturnCode_t notify_datareaders()
const char * c_str() const
virtual DDS::ReturnCode_t set_qos(const DDS::SubscriberQos &qos)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
DDS::ReturnCode_t reader_enabled(const char *topic_name, DataReaderImpl *reader)
DDS::StatusMask listener_mask_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.
DDS::DataReaderQos default_datareader_qos_
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
unsigned long InstanceStateMask
EntityFactoryQosPolicy entity_factory
void enable_filtering(ContentFilteredTopicImpl *cft)
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
virtual DDS::ReturnCode_t copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
typedef OPENDDS_VECTOR(GUID_t) SubscriptionIdVec
virtual DDS::ReturnCode_t get_default_datareader_qos(DDS::DataReaderQos &qos)
#define DATAREADER_QOS_DEFAULT
virtual DDS::ReturnCode_t delete_contained_entities()
void init(const DDS::DataReaderQos &dr_qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask, SubscriberImpl *parent, MultiTopicImpl *multitopic)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Implements the DDS::Topic interface.
#define DATAREADER_QOS_USE_TOPIC_QOS
virtual DDS::ReturnCode_t set_listener(DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::DataReader_ptr lookup_datareader(const char *topic_name)
void get_subscription_ids(SubscriptionIdVec &subs)
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
#define TheTransientKludge
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const StatusKind DATA_AVAILABLE_STATUS
Implements the DDS::DataReader interface.
virtual RcHandle< EntityImpl > parent() const
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.
ACE_Recursive_Thread_Mutex si_lock_
DDS::SubscriberListener_var listener_
DDS::Topic_ptr get_related_topic()
DDS::InstanceHandle_t handle_
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
unsigned long SampleStateMask
virtual DDS::ReturnCode_t get_qos(DDS::SubscriberQos &qos)
HANDLE_TYPE_NATIVE InstanceHandle_t
DataReaderMap datareader_map_
const ReturnCode_t RETCODE_NOT_ENABLED
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
virtual DDS::ReturnCode_t end_access()
AtomicBool enabled_
The flag indicates the entity is enabled.
DataReaderSet readers_not_enabled_
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
virtual DDS::ReturnCode_t set_default_datareader_qos(const DDS::DataReaderQos &qos)
WeakRcHandle< DomainParticipantImpl > participant_
OpenDDS_Dcps_Export LogLevel log_level
virtual DDS::DomainParticipant_ptr get_participant()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual DDS::DataReader_ptr create_datareader(DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
boolean autoenable_created_entities
const char * retcode_to_string(DDS::ReturnCode_t value)
const ReturnCode_t RETCODE_ERROR
virtual DDS::SubscriberListener_ptr get_listener()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual DDS::ReturnCode_t enable()
const size_t NUMBER_OF_BUILT_IN_TOPICS
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
static bool validate_datareader_qos(const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
void data_received(DataReaderImpl *reader)
DDS::DomainId_t domain_id_
const character_type * in(void) const
bool contains_reader(DDS::InstanceHandle_t a_handle)
sequence< DataReader > DataReaderSeq
static bool valid(const DDS::UserDataQosPolicy &qos)
#define TheServiceParticipant
virtual ~SubscriberImpl()
void remove_from_datareader_set(DataReaderImpl *reader)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
void coherent_change_received(const GUID_t &publisher_id, DataReaderImpl *reader, Coherent_State &group_state)
The Internal API and Implementation of OpenDDS.
unsigned long ViewStateMask
bool is_clean(String *leftover_entities=0) const
PresentationQosPolicy presentation
ACE_Recursive_Thread_Mutex dr_set_lock_
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
ACE_Thread_Mutex listener_mutex_
MultitopicReaderMap multitopic_reader_map_