32 #ifdef OPENDDS_SECURITY 39 #include <dds/DdsDcpsGuidC.h> 40 #ifndef DDS_HAS_MINIMUM_BIT 41 # include <dds/DdsDcpsCoreTypeSupportImpl.h> 49 template <
typename Key>
51 OpenDDS::DCPS::DomainParticipantImpl::TopicMap& c,
53 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type*&
value)
55 OpenDDS::DCPS::DomainParticipantImpl::TopicMap::iterator iter =
58 if (iter == c.end()) {
62 value = &iter->second;
69 result.length(properties.length());
70 unsigned int count = 0;
71 for (
unsigned int i = 0; i < properties.length(); ++i) {
72 if (std::string(properties[i].
name.in()).
find(prefix) == 0) {
73 result[count++] = properties[i];
92 DomainParticipantImpl::DomainParticipantImpl(
96 DDS::DomainParticipantListener_ptr a_listener,
102 #ifdef OPENDDS_SECURITY
107 , domain_id_(domain_id)
110 , handle_waiters_(handle_protector_)
111 , shutdown_condition_(shutdown_mutex_)
112 , shutdown_complete_(false)
113 , participant_handles_(handle_generator)
114 , pub_id_gen_(dp_id_)
119 automatic_liveliness_timer_,
125 participant_liveliness_timer_,
135 #ifdef OPENDDS_SECURITY 139 if (!access->return_permissions_handle(
perm_handle_, se)) {
142 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::~DomainParticipantImpl: ")
143 ACE_TEXT(
"Unable to return permissions handle. SecurityException[%d.%d]: %C\n"),
155 DDS::PublisherListener_ptr a_listener,
161 return DDS::Publisher::_nil();
176 DDS::Publisher::_nil());
182 DDS::Publisher_ptr pub_obj(pub);
190 DDS::Publisher::_nil());
195 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::create_publisher, ")
199 return DDS::Publisher::_nil();
202 return DDS::Publisher::_duplicate(pub_obj);
207 DDS::Publisher_ptr p)
216 "Failed to obtain PublisherImpl\n"));
229 "This publisher doesn't belong to this participant\n"));
236 if (!the_servant->
is_clean(&leftover_entities)) {
239 "The publisher is not empty. %C leftover\n",
240 leftover_entities.c_str()));
260 "publisher not found\n"));
272 DDS::SubscriberListener_ptr a_listener,
278 return DDS::Subscriber::_nil();
290 DDS::Subscriber::_nil());
296 DDS::Subscriber_ptr sub_obj(sub);
303 DDS::Subscriber::_nil());
308 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::create_subscriber, ")
312 return DDS::Subscriber::_nil();
315 return DDS::Subscriber::_duplicate(sub_obj);
320 DDS::Subscriber_ptr s)
329 "Failed to obtain SubscriberImpl\n"));
342 "This subscriber doesn't belong to this participant\n"));
349 if (!the_servant->
is_clean(&leftover_entities)) {
352 "The subscriber is not empty. %C leftover\n",
353 leftover_entities.c_str()));
373 "subscriber not found\n"));
396 const char * topic_name,
397 const char * type_name,
399 DDS::TopicListener_ptr a_listener,
412 const char * topic_name,
413 const char * type_name,
416 DDS::TopicListener_ptr a_listener,
432 const char * topic_name,
433 const char * type_name,
435 DDS::TopicListener_ptr a_listener,
457 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
460 return DDS::Topic::_nil();
467 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
470 return DDS::Topic::_nil();
474 TopicMap::mapped_type* entry = 0;
482 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC) 486 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
487 ACE_TEXT(
"can't create a Topic due to name \"%C\" already in use ")
488 ACE_TEXT(
"by a TopicDescription.\n"), topic_name));
507 entry->pair_.svt_->get_qos(found_qos);
509 if (topic_qos == found_qos) {
515 ++entry->client_refs_;
517 return DDS::Topic::_duplicate(entry->pair_.obj_.in());
522 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
523 ACE_TEXT(
"topic with name \"%C\" and type %C already exists, ")
524 ACE_TEXT(
"but the QoS doesn't match.\n"),
525 topic_name, type_name));
528 return DDS::Topic::_nil();
534 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::create_topic: ")
535 ACE_TEXT(
"topic with name \"%C\" already exists, but its type, %C ")
536 ACE_TEXT(
"is not the same as %C.\n"),
537 topic_name, found_type.
in(), type_name));
540 return DDS::Topic::_nil();
545 OpenDDS::DCPS::TypeSupport_var type_support;
547 if (0 == topic_mask) {
553 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
554 ACE_TEXT(
"can't create a topic=%C type_name=%C ")
556 topic_name, type_name));
558 return DDS::Topic::_nil();
573 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
574 ACE_TEXT(
"create_new_topic failed.\n")));
576 return DDS::Topic::_nil();
584 ACE_TEXT(
"DomainParticipantImpl::create_topic, ")
587 return DDS::Topic::_nil();
590 return new_topic._retn();
596 DDS::Topic_ptr a_topic)
602 DDS::Topic_ptr a_topic,
613 if (!the_topic_servant) {
616 "failed to obtain TopicImpl."));
626 if (the_dp_servant !=
this) {
629 "will return PRECONDITION_NOT_MET because this is not the " 630 "participant that owns this topic\n"));
639 "will return PRECONDITION_NOT_MET because there are still " 640 "outstanding references to this topic\n"));
652 TopicMap::mapped_type* entry = 0;
656 for (iter = iters.first; iter != iters.second; ++iter) {
657 if (iter->second.pair_.svt_ == the_topic_servant) {
658 entry = &iter->second;
664 ACE_ERROR((
LM_NOTICE,
"(%P|%t) NOTICE: DomainParticipantImpl::delete_topic_i: not found\n"));
671 if (remove_objref || 0 == client_refs) {
692 "Didn't remove topic from the map, remove_objref %d client_refs %d\n",
693 remove_objref, client_refs));
701 " Caught Unknown Exception\n"));
711 const char* topic_name,
716 bool first_time =
true;
724 DDS::TopicQos_var qos;
735 if (status ==
FOUND) {
736 OpenDDS::DCPS::TypeSupport_var type_support =
741 ACE_TEXT(
"DomainParticipantImpl::find_topic, ")
742 ACE_TEXT(
"can't create a Topic: type_name \"%C\" ")
743 ACE_TEXT(
"is not registered.\n"), type_name.
in()));
746 return DDS::Topic::_nil();
752 DDS::TopicListener::_nil(),
760 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::find_topic - ")
761 ACE_TEXT(
"topic not found, discovery returned INTERNAL_ERROR!\n")));
763 return DDS::Topic::_nil();
764 }
else if (now < timeout_at) {
779 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::find_topic, ")
783 return DDS::Topic::_nil();
786 DDS::TopicDescription_ptr
794 TopicMap::mapped_type* entry = 0;
797 #if !defined(OPENDDS_NO_CONTENT_FILTERED_TOPIC) || !defined(OPENDDS_NO_MULTI_TOPIC) 798 TopicDescriptionMap::iterator iter =
topic_descrs_.find(name);
800 return DDS::TopicDescription::_duplicate(iter->second);
803 return DDS::TopicDescription::_nil();
806 return DDS::TopicDescription::_duplicate(entry->pair_.obj_.in());
810 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 812 DDS::ContentFilteredTopic_ptr
815 DDS::Topic_ptr related_topic,
816 const char* filter_expression,
822 ACE_TEXT(
"DomainParticipantImpl::create_contentfilteredtopic, ")
823 ACE_TEXT(
"can't create a content-filtered topic due to null related ")
834 ACE_TEXT(
"DomainParticipantImpl::create_contentfilteredtopic, ")
835 ACE_TEXT(
"can't create a content-filtered topic due to name \"%C\" ")
836 ACE_TEXT(
"already in use by a Topic.\n"), name));
844 ACE_TEXT(
"DomainParticipantImpl::create_contentfilteredtopic, ")
845 ACE_TEXT(
"can't create a content-filtered topic due to name \"%C\" ")
846 ACE_TEXT(
"already in use by a TopicDescription.\n"), name));
851 DDS::ContentFilteredTopic_var cft;
856 if (cft->set_expression_parameters(expression_parameters) !=
DDS::RETCODE_OK) {
859 }
catch (
const std::exception& e) {
862 ACE_TEXT(
"DomainParticipantImpl::create_contentfilteredtopic, ")
863 ACE_TEXT(
"can't create a content-filtered topic due to runtime error: ")
868 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(cft);
874 DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
878 DDS::ContentFilteredTopic_var cft =
879 DDS::ContentFilteredTopic::_duplicate(a_contentfilteredtopic);
885 ACE_TEXT(
"DomainParticipantImpl::delete_contentfilteredtopic, ")
886 ACE_TEXT(
"can't delete a content-filtered topic \"%C\" ")
887 ACE_TEXT(
"because it is not in the set.\n"), name.
in ()));
897 ACE_TEXT(
"DomainParticipantImpl::delete_contentfilteredtopic, ")
898 ACE_TEXT(
"can't delete a content-filtered topic \"%C\" ")
899 ACE_TEXT(
"failed to obtain TopicDescriptionImpl\n"), name.
in()));
907 ACE_TEXT(
"DomainParticipantImpl::delete_contentfilteredtopic, ")
908 ACE_TEXT(
"can't delete a content-filtered topic \"%C\" ")
909 ACE_TEXT(
"because it is used by a datareader\n"), name.
in ()));
917 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC 919 #ifndef OPENDDS_NO_MULTI_TOPIC 922 const char*
name,
const char* type_name,
923 const char* subscription_expression,
931 ACE_TEXT(
"DomainParticipantImpl::create_multitopic, ")
932 ACE_TEXT(
"can't create a multi topic due to name \"%C\" ")
933 ACE_TEXT(
"already in use by a Topic.\n"), name));
941 ACE_TEXT(
"DomainParticipantImpl::create_multitopic, ")
942 ACE_TEXT(
"can't create a multi topic due to name \"%C\" ")
943 ACE_TEXT(
"already in use by a TopicDescription.\n"), name));
948 DDS::MultiTopic_var mt;
951 expression_parameters,
this);
952 }
catch (
const std::exception& e) {
955 ACE_TEXT(
"DomainParticipantImpl::create_multitopic, ")
956 ACE_TEXT(
"can't create a multi topic due to runtime error: ")
961 DDS::TopicDescription_var td = DDS::TopicDescription::_duplicate(mt);
967 DDS::MultiTopic_ptr a_multitopic)
971 DDS::MultiTopic_var mt = DDS::MultiTopic::_duplicate(a_multitopic);
977 ACE_TEXT(
"DomainParticipantImpl::delete_multitopic, ")
978 ACE_TEXT(
"can't delete a multitopic \"%C\" ")
979 ACE_TEXT(
"because it is not in the set.\n"), mt_name.
in ()));
989 ACE_TEXT(
"DomainParticipantImpl::delete_multitopic, ")
990 ACE_TEXT(
"can't delete a multitopic topic \"%C\" ")
991 ACE_TEXT(
"failed to obtain TopicDescriptionImpl.\n"),
1000 ACE_TEXT(
"DomainParticipantImpl::delete_multitopic, ")
1001 ACE_TEXT(
"can't delete a multitopic topic \"%C\" ")
1002 ACE_TEXT(
"because it is used by a datareader.\n"), mt_name.
in ()));
1010 #endif // OPENDDS_NO_MULTI_TOPIC 1012 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE 1023 result = make_rch<FilterEvaluator>(filter,
false);
1024 }
catch (
const std::exception& e) {
1025 filter_cache_.erase(filter);
1028 ACE_TEXT(
"DomainParticipantImpl::get_filter_eval, ")
1029 ACE_TEXT(
"can't create a writer-side content filter due to ")
1030 ACE_TEXT(
"runtime error: %C.\n"), e.what()));
1041 typedef std::map<OPENDDS_STRING, RcHandle<FilterEvaluator> > Map;
1042 Map::iterator iter = filter_cache_.find(filter);
1043 if (iter != filter_cache_.end()) {
1044 if (iter->second->ref_count() == 1) {
1045 filter_cache_.erase(iter);
1072 disc->fini_bit(
this);
1110 for (TopicMap::iterator it(
topics_.begin());
1112 if (a_handle == it->second.pair_.svt_->get_instance_handle())
1125 if (a_handle == it->svt_->get_instance_handle())
1136 for (PublisherSet::iterator it(
publishers_.begin());
1138 if (a_handle == it->svt_->get_instance_handle())
1147 if (it->svt_->contains_reader(a_handle))
1151 for (PublisherSet::iterator it(
publishers_.begin());
1153 if (it->svt_->contains_writer(a_handle))
1177 disco->update_domain_participant_qos(
domain_id_,
1184 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::set_qos, ")
1185 ACE_TEXT(
"failed on compatibility check.\n")));
1208 DDS::DomainParticipantListener_ptr a_listener,
1214 listener_ = DDS::DomainParticipantListener::_duplicate(a_listener);
1218 DDS::DomainParticipantListener_ptr
1222 return DDS::DomainParticipantListener::_duplicate(
listener_.in());
1229 #ifndef DDS_HAS_MINIMUM_BIT 1233 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1234 ACE_TEXT(
"Entity is not enabled.\n")));
1251 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::ignore_participant: ")
1252 ACE_TEXT(
"%C ignoring handle %x.\n"),
1258 if (!disco->ignore_domain_participant(
domain_id_,
1263 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_participant, ")
1264 ACE_TEXT(
"Could not ignore domain participant.\n")));
1272 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::ignore_participant: ")
1273 ACE_TEXT(
"%C repo call returned.\n"),
1279 ACE_UNUSED_ARG(handle);
1281 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1288 #ifndef DDS_HAS_MINIMUM_BIT 1292 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1293 ACE_TEXT(
" Entity is not enabled.\n")));
1299 HandleMap::const_iterator location = this->
ignored_topics_.find(ignoreId);
1310 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::ignore_topic: ")
1311 ACE_TEXT(
"%C ignoring handle %x.\n"),
1322 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_topic, ")
1323 ACE_TEXT(
" Could not ignore topic.\n")));
1329 ACE_UNUSED_ARG(handle);
1331 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1338 #ifndef DDS_HAS_MINIMUM_BIT 1342 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1343 ACE_TEXT(
" Entity is not enabled.\n")));
1350 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::ignore_publication: ")
1351 ACE_TEXT(
"%C ignoring handle %x.\n"),
1363 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_publication, ")
1364 ACE_TEXT(
" could not ignore publication in discovery.\n")));
1371 ACE_UNUSED_ARG(handle);
1373 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1380 #ifndef DDS_HAS_MINIMUM_BIT 1384 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1385 ACE_TEXT(
" Entity is not enabled.\n")));
1392 ACE_TEXT(
"(%P|%t) DomainParticipantImpl::ignore_subscription: ")
1393 ACE_TEXT(
"%C ignoring handle %d.\n"),
1405 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::ignore_subscription, ")
1406 ACE_TEXT(
" could not ignore subscription in discovery.\n")));
1413 ACE_UNUSED_ARG(handle);
1415 #endif // !defined (DDS_HAS_MINIMUM_BIT) 1440 for (PublisherSet::iterator it(
publishers_.begin());
1442 it->svt_->assert_liveliness_by_participant();
1520 #if !defined (DDS_HAS_MINIMUM_BIT) 1527 const CountedHandleMap::const_iterator itEnd =
handles_.end();
1528 for (CountedHandleMap::const_iterator iter =
handles_.begin(); iter != itEnd; ++iter) {
1537 push_back(participant_handles, iter->second.first);
1552 const CountedHandleMap::const_iterator itEnd =
handles_.end();
1553 for (CountedHandleMap::const_iterator iter =
handles_.begin(); iter != itEnd; ++iter) {
1556 if (participant_handle == iter->second.first
1567 return bit_subscriber_->get_discovered_participant_data(participant_data, participant_handle);
1575 const CountedHandleMap::const_iterator itEnd =
handles_.end();
1576 for (CountedHandleMap::const_iterator iter =
handles_.begin(); iter != itEnd; ++iter) {
1583 push_back(topic_handles, iter->second.first);
1598 const CountedHandleMap::const_iterator itEnd =
handles_.end();
1599 for (CountedHandleMap::const_iterator iter =
handles_.begin(); iter != itEnd; ++iter) {
1601 if (topic_handle == iter->second.first && converter.
isTopic()) {
1611 return bit_subscriber_->get_discovered_topic_data(topic_data, topic_handle);
1629 #ifdef OPENDDS_SECURITY 1644 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1650 #ifdef OPENDDS_SECURITY 1654 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1655 ACE_TEXT(
"DCPSSecurity flag is set, but unable to load security plugin configuration.\n")));
1663 #ifdef OPENDDS_SECURITY 1665 Security::Authentication_var auth =
security_config_->get_authentication();
1675 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1676 ACE_TEXT(
"Unable to validate local identity. SecurityException[%d.%d]: %C\n"),
1689 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1690 ACE_TEXT(
"Unable to validate local permissions. SecurityException[%d.%d]: %C\n"),
1697 if (!check_create) {
1700 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1701 ACE_TEXT(
"Unable to create participant. SecurityException[%d.%d]: %C\n"),
1708 const bool check_part_sec_attr = access->get_participant_sec_attributes(
perm_handle_, part_sec_attr, se);
1710 if (!check_part_sec_attr) {
1713 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable,")
1714 ACE_TEXT(
"Unable to get participant security attributes. SecurityException[%d.%d]: %C\n"),
1724 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1725 ACE_TEXT(
"allow_unauthenticated_participants is not possible with is_rtps_protected\n")));
1730 const Security::CryptoKeyFactory_var crypto =
security_config_->get_crypto_key_factory();
1736 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1737 ACE_TEXT(
"Unable to register local participant. SecurityException[%d.%d]: %C\n"),
1753 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1754 ACE_TEXT(
"add_domain_participant_secure returned invalid id.\n")));
1767 ACE_TEXT(
"(%P|%t) ERROR: DomainParticipantImpl::enable, ")
1768 ACE_TEXT(
"add_domain_participant returned invalid id.\n")));
1773 #ifdef OPENDDS_SECURITY 1792 ACE_TEXT(
"enabled participant %C in domain %d\n"),
1807 for (TopicMap::iterator it =
topics_.begin(); it !=
topics_.end(); ++it) {
1808 it->second.pair_.svt_->enable();
1850 "New unmapped InstanceHandle %d\n", ih));
1855 const CountedHandleMap::iterator location =
handles_.find(
id);
1861 "New mapped InstanceHandle %d for %C\n",
1862 handle,
LogGuid(
id).c_str()));
1864 handles_[id] = std::make_pair(handle, 1);
1874 "Incremented refcount for InstanceHandle %d to %d\n",
1875 mapped.first, mapped.second));
1877 return mapped.first;
1885 CountedHandleMap::const_iterator iter =
handles_.find(
id);
1898 const CountedHandleMap::const_iterator iter =
handles_.find(
id);
1905 const RepoIdMap::iterator r_iter =
repoIds_.find(handle);
1910 "Returned unmapped InstanceHandle %d\n", handle));
1915 const CountedHandleMap::iterator h_iter =
handles_.find(r_iter->second);
1922 "Returned mapped InstanceHandle %d refcount %d\n",
1923 handle, h_iter->second.second));
1927 if (--mapped.second == 0) {
1937 const RepoIdMap::const_iterator location =
repoIds_.find(handle);
1943 const char * topic_name,
1944 const char * type_name,
1946 DDS::TopicListener_ptr a_listener,
1948 OpenDDS::DCPS::TypeSupport_ptr type_support)
1953 DDS::Topic::_nil());
1955 #ifdef OPENDDS_SECURITY 1962 if (!access->get_topic_sec_attributes(
perm_handle_, topic_name, sec_attr, se)) {
1966 ACE_TEXT(
"DomainParticipantImpl::create_new_topic, ")
1967 ACE_TEXT(
"Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
1970 return DDS::Topic::_nil();
1978 ACE_TEXT(
"DomainParticipantImpl::create_new_topic, ")
1979 ACE_TEXT(
"Permissions check failed to create new topic '%C'. SecurityException[%d.%d]: %C\n"),
1982 return DDS::Topic::_nil();
1997 DDS::Topic::_nil());
2005 ACE_TEXT(
"DomainParticipantImpl::create_new_topic, ")
2007 return DDS::Topic::_nil();
2011 DDS::Topic_ptr obj(topic_servant);
2015 topics_.insert(std::make_pair(topic_name, refCounted_topic));
2023 return DDS::Topic::_duplicate(refCounted_topic.
pair_.
obj_.in());
2028 if (leftover_entities) {
2029 leftover_entities->clear();
2033 size_t topic_count = 0;
2034 for (TopicMap::const_iterator it =
topics_.begin(); it !=
topics_.end(); ++it) {
2035 if (!
topicIsBIT(it->second.pair_.svt_->topic_name(), it->second.pair_.svt_->type_name())) {
2040 *leftover_entities +=
to_dds_string(topic_count) +
" topic(s)";
2046 sub_count = sub_count <= 1 ? 0 : sub_count;
2048 if (leftover_entities && sub_count) {
2049 if (leftover_entities->size()) {
2050 *leftover_entities +=
", ";
2052 *leftover_entities +=
to_dds_string(sub_count) +
" subscriber(s)";
2056 if (leftover_entities && pub_count) {
2057 if (leftover_entities->size()) {
2058 *leftover_entities +=
", ";
2060 *leftover_entities +=
to_dds_string(pub_count) +
" publisher(s)";
2063 return topic_count == 0 && sub_count == 0 && pub_count == 0;
2066 DDS::DomainParticipantListener_ptr
2071 return DDS::DomainParticipantListener::_nil ();
2073 return DDS::DomainParticipantListener::_duplicate(
listener_.in());
2084 topics.reserve(
topics_.size());
2085 for (TopicMap::iterator it(
topics_.begin());
2087 topics.push_back(it->second.pair_.svt_->get_id());
2091 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2096 #if !defined (DDS_HAS_MINIMUM_BIT) 2102 "(%P|%t) WARNING: DomainParticipantImpl::ownership_manager: bit_subscriber_ is null"));
2120 for (SubscriberSet::iterator it(this->
subscribers_.begin());
2122 it->svt_->update_ownership_strength(pub_id, ownership_strength);
2126 #endif // OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 2159 ACE_TEXT(
"DomainParticipantImpl::validate_publisher_qos, ")
2181 ACE_TEXT(
"DomainParticipantImpl::validate_subscriber_qos, ")
2202 ACE_TEXT(
"DomainParticipantImpl::create_recorder, ")
2203 ACE_TEXT(
"topic desc is nil.\n")));
2222 recorder->
init(dynamic_cast<TopicDescriptionImpl*>(a_topic),
2224 mask,
this, sub_qos);
2233 return result.
_retn();
2247 ACE_TEXT(
"DomainParticipantImpl::create_replayer, ")
2248 ACE_TEXT(
"topic desc is nil.\n")));
2269 replayer->
init(a_topic, topic_servant, dw_qos, a_listener, mask,
this, pub_qos);
2278 ACE_TEXT(
"DomainParticipantImpl::create_replayer, ")
2287 return result.
_retn();
2325 , recalculate_interval_(false)
2423 tv = std::min(tv, it->svt_->liveliness_check_interval(kind));
2438 for (PublisherSet::iterator it(
publishers_.begin());
2440 if (it->svt_->participant_liveliness_activity_after(tv)) {
2471 PublisherSet::iterator pubIter =
publishers_.begin();
2472 DDS::Publisher_ptr pubPtr;
2475 while (pubsize > 0) {
2476 pubPtr = (*pubIter).obj_.in();
2502 SubscriberSet::iterator subIter =
subscribers_.begin();
2503 DDS::Subscriber_ptr subPtr;
2506 while (subsize > 0) {
2507 subPtr = (*subIter).obj_.in();
2532 RecorderSet::iterator it =
recorders_.begin();
2536 if (impl) result = impl->
cleanup();
2548 ReplayerSet::iterator it =
replayers_.begin();
2552 if (impl) result = impl->
cleanup();
2567 TopicMap::iterator topicIter =
topics_.begin();
2568 DDS::Topic_ptr topicPtr;
2569 size_t topicsize =
topics_.size();
2571 while (topicsize > 0) {
2572 topicPtr = topicIter->second.pair_.obj_.in();
2598 const PublisherSet::iterator end =
publishers_.end();
2599 for (PublisherSet::iterator i =
publishers_.begin(); i != end; ++i) {
2600 result &= i->svt_->prepare_to_delete_datawriters();
2609 const PublisherSet::iterator end =
publishers_.end();
2610 for (PublisherSet::iterator i =
publishers_.begin(); i != end; ++i) {
2611 result &= i->svt_->set_wait_pending_deadline(deadline);
2616 #ifndef OPENDDS_SAFETY_PROFILE 2623 "Can't get a DynamicType, no type lookup service\n"));
2632 "Can't get a DynamicType, type info is missing complete\n"));
2644 "requesting remote complete TypeObject from %C\n",
LogGuid(entity).c_str()));
2648 disco->request_remote_complete_type_objects(
domain_id_,
dp_id_, entity, ti, cond);
2661 "request_remote_complete_type_objects succeeded, but type lookup service still says it " 2662 "doesn't have the complete TypeObject?\n"));
2668 DDS::DynamicType_var got_type =
type_lookup_service_->type_identifier_to_dynamic(ctid, entity);
2672 "Got an invalid DynamicType\n"));
virtual CORBA::Boolean contains_entity(DDS::InstanceHandle_t a_handle)
virtual DDS::InstanceHandle_t get_instance_handle()
PublisherSet publishers_
Collection of publishers.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
virtual int handle_exception(ACE_HANDLE fd)
RcHandle< ParticipantLivelinessTimer > participant_liveliness_timer_
DDS::Topic_ptr create_topic_i(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask, int topic_mask)
RcHandle< T > rchandle_from(T *pointer)
sequence< InstanceHandle_t > InstanceHandleSeq
DDS::SubscriberQos default_subscriber_qos_
The default subscriber qos.
DDS::Security::ParticipantCryptoHandle part_crypto_handle_
This participant crypto handle given by crypto.
ACE_CDR::ULong typeobject_serialized_size
Send raw data samples in the system.
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual DDS::Topic_ptr find_topic(const char *topic_name, const DDS::Duration_t &timeout)
void delete_replayer(Replayer_ptr replayer)
ACE_Recursive_Thread_Mutex publishers_protector_
Protect the publisher collection.
DDS::Time_t to_dds_time() const
const LogLevel::Value value
virtual DDS::ReturnCode_t get_default_subscriber_qos(DDS::SubscriberQos &qos)
LivelinessTimer(DomainParticipantImpl &impl, DDS::LivelinessQosPolicyKind kind)
DDS::ReturnCode_t set_enabled()
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
virtual DDS::ReturnCode_t assert_liveliness()
const InstanceHandle_t HANDLE_NIL
virtual DDS::ReturnCode_t set_default_subscriber_qos(const DDS::SubscriberQos &qos)
EntityFactoryQosPolicy entity_factory
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
virtual void init(DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
virtual DDS::ReturnCode_t get_discovered_topics(DDS::InstanceHandleSeq &topic_handles)
void add_adjust_liveliness_timers(DataWriterImpl *writer)
DDS::TopicQos default_topic_qos_
The default topic qos.
DDS::PropertySeq filter_properties(const DDS::PropertySeq &properties, const std::string &prefix)
int access(const char *path, int amode)
DDS::InstanceHandle_t next()
void signal_liveliness(DDS::LivelinessQosPolicyKind kind)
void update_ownership_strength(const GUID_t &pub_id, const CORBA::Long &ownership_strength)
ENTITYKIND_BUILTIN_PARTICIPANT.
virtual DDS::ReturnCode_t delete_contained_entities()
static const int TOPIC_TYPELESS
const DDS::DomainId_t domain_id_
The id of the domain that creates this participant.
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
SubscriberSet subscribers_
Collection of subscribers.
DDS::PublisherQos default_publisher_qos_
The default publisher qos.
virtual DDS::ReturnCode_t delete_multitopic(DDS::MultiTopic_ptr a_multitopic)
boolean allow_unauthenticated_participants
bool participant_liveliness_activity_after(const MonotonicTimePoint &tv)
DDS::Security::IdentityHandle id_handle_
This participant id handle given by authentication.
String to_dds_string(unsigned short to_convert)
character_type *& out(void)
virtual void schedule(const TimeDuration &interval)=0
virtual DDS::ReturnCode_t enable()
virtual DDS::ReturnCode_t ignore_publication(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t set_qos(const DDS::DomainParticipantQos &qos)
ACE_Recursive_Thread_Mutex recorders_protector_
Protect the recorders collection.
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
virtual DDS::ReturnCode_t delete_contentfilteredtopic(DDS::ContentFilteredTopic_ptr a_contentfilteredtopic)
virtual ~LivelinessTimer()
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
void set_deleted(bool state)
HandleMap ignored_participants_
Collection of ignored participants.
bool dynamic_type_is_valid(DDS::DynamicType_ptr type)
Implementation of Recorder functionality.
OwnershipManager * ownership_manager()
virtual ~DomainParticipantImpl()
virtual DDS::ReturnCode_t ignore_participant(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t set_default_topic_qos(const DDS::TopicQos &qos)
DDS::StatusMask listener_mask_
virtual DDS::ReturnCode_t get_discovered_participants(DDS::InstanceHandleSeq &participant_handles)
virtual DDS::ReturnCode_t delete_publisher(DDS::Publisher_ptr p)
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
static const int TOPIC_TYPE_HAS_KEYS
Objref_Servant_Pair< TopicImpl, DDS::Topic, DDS::Topic_ptr, DDS::Topic_var > Topic_Pair
const ACE_Time_Value & value() const
void add(T lower, T upper)
Security::SecurityConfig_rch security_config_
reference_wrapper< T > ref(T &r)
bool topicIsBIT(const char *name, const char *type)
void return_handle(DDS::InstanceHandle_t handle)
void execute(const MonotonicTimePoint &now)
DDS::DomainParticipantListener_ptr listener_for(DDS::StatusKind kind)
DisjointSequence::OrderedRanges< DDS::InstanceHandle_t > reusable_handles_
Keep track of handles that can be reused (use handle_protector_)
ACE_Recursive_Thread_Mutex topics_protector_
Protect the topic collection.
EntityKind entityKind() const
Extract the EntityKind value.
Implementation of Replayer functionality.
GUID_t get_repoid(DDS::InstanceHandle_t id) const
void get_topic_ids(TopicIdVec &topics)
virtual void dispatch(const MonotonicTimePoint &tv)
ACE_Recursive_Thread_Mutex subscribers_protector_
Protect the subscriber collection.
Conversion processing and value testing utilities for RTPS GUID_t types.
const DDS::StatusMask DEFAULT_STATUS_MASK
virtual DDS::Topic_ptr create_topic(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
virtual DDS::ReturnCode_t delete_contained_entities()
DomainParticipantImpl & impl_
Implements the OpenDDS::DCPS::Publisher interfaces.
void deref_filter_eval(const char *filter)
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
virtual void dispatch(const MonotonicTimePoint &tv)
static TimePoint_T< MonotonicClock > now()
Implements the DDS::Topic interface.
RcHandle< DCPS::BitSubscriber > get_builtin_subscriber_proxy()
TimeDuration interval() const
bool validate_subscriber_qos(DDS::SubscriberQos &subscriber_qos)
DOMAINID_TYPE_NATIVE DomainId_t
CountedHandleMap handles_
Instance handles assigned which are mapped to GUIDs (use handle_protector_)
#define Registered_Data_Types
virtual DDS::ReturnCode_t delete_topic(DDS::Topic_ptr a_topic)
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
ACE_Recursive_Thread_Mutex replayers_protector_
Protect the replayers collection.
RcHandle< FilterEvaluator > get_filter_eval(const char *filter)
virtual DDS::ReturnCode_t get_current_time(DDS::Time_t ¤t_time)
static bool validate_datawriter_qos(const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
virtual DDS::ReturnCode_t get_default_publisher_qos(DDS::PublisherQos &qos)
TypeIdentifierWithSize typeid_with_size
RcHandle< AutomaticLivelinessTimer > automatic_liveliness_timer_
bool prepare_to_delete_datawriters()
OwnershipManager owner_man_
RepoIdSequence pub_id_gen_
Publisher ID generator.
TypeIdentifierWithDependencies complete
virtual DDS::DomainParticipantListener_ptr get_listener()
ACE_Thread_Mutex handle_protector_
Protect the handle collection.
void remove_adjust_liveliness_timers()
DDS::ReturnCode_t get_dynamic_type(DDS::DynamicType_var &type, const DDS::BuiltinTopicKey_t &key)
virtual DDS::ReturnCode_t set_default_publisher_qos(const DDS::PublisherQos &qos)
DDS::ReturnCode_t cleanup()
ConditionVariable< ACE_Thread_Mutex > shutdown_condition_
#define TheTransientKludge
TopicMap topics_
Collection of topics.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
static ACE_thread_t self(void)
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
virtual DDS::Subscriber_ptr get_builtin_subscriber()
DDS::ReturnCode_t shutdown_result_
virtual DDS::ReturnCode_t delete_contained_entities()
DDS::ReturnCode_t delete_topic_i(DDS::Topic_ptr a_topic, bool remove_objref)
DDS::ReturnCode_t enable()
virtual DDS::ReturnCode_t get_discovered_participant_data(DDS::ParticipantBuiltinTopicData &participant_data, DDS::InstanceHandle_t participant_handle)
GUID_t dp_id_
This participant id given by discovery.
std::pair< DDS::InstanceHandle_t, unsigned int > HandleWithCounter
ACE_Thread_Mutex shutdown_mutex_
Protect the shutdown.
#define TheSecurityRegistry
bool is_clean(String *leftover_entities=0) const
Replayer_ptr create_replayer(DDS::Topic_ptr a_topic, const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos, const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
DDS::ReturnCode_t enable()
ACE_Thread_Mutex filter_cache_lock_
RepoIdSequence(const GUID_t &base)
DDS::Topic_ptr create_typeless_topic(const char *topic_name, const char *type_name, bool type_has_keys, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
DDS::PropertySeq PropertySeq
DDS::InstanceHandle_t await_handle(const GUID_t &id, TimeDuration max_wait=TimeDuration::zero_value) const
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
#define PUBLISHER_QOS_DEFAULT
InstanceHandleGenerator & participant_handles_
Get instances handles from DomainParticipantFactory (use handle_protector_)
OPENDDS_STRING get_unique_id()
ConditionVariable< ACE_Thread_Mutex > handle_waiters_
virtual DDS::ReturnCode_t get_discovered_topic_data(DDS::TopicBuiltinTopicData &topic_data, DDS::InstanceHandle_t topic_handle)
bool is_clean(String *leftover_entities=0) const
void set_complete_type_identifier(const TypeIdentifier &ti)
const char * topicstatus_to_string(TopicStatus value)
virtual DDS::DomainId_t get_domain_id()
DDS::DomainParticipantQos qos_
The qos of this DomainParticipant.
bool isTopic() const
Returns true if the GUID represents a topic entity.
OPENDDS_STRING uniqueParticipantId() const
TopicMap::iterator TopicMapIterator
virtual DDS::ReturnCode_t enable()
virtual DDS::ReturnCode_t set_listener(DDS::DomainParticipantListener_ptr a_listener, DDS::StatusMask mask)
bool validate_publisher_qos(DDS::PublisherQos &publisher_qos)
int strcmp(const char *s, const char *t)
TopicDescriptionMap topic_descrs_
Collection of TopicDescriptions which are not also Topics.
bool notify_all()
Unblock all of the threads waiting on this condition.
virtual DDS::ReturnCode_t enable()
void entityKey(long entityKey)
MonotonicTimePoint last_liveliness_check_
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
HANDLE_TYPE_NATIVE InstanceHandle_t
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, RecorderListener_rch a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, DDS::SubscriberQos subqos)
bool has_entity_refs() const
XTypes::TypeLookupService_rch type_lookup_service_
const ReturnCode_t RETCODE_NOT_ENABLED
AutomaticLivelinessTimer(DomainParticipantImpl &impl)
#define SUBSCRIBER_QOS_DEFAULT
const ReturnCode_t RETCODE_NO_DATA
virtual DDS::ReturnCode_t ignore_topic(DDS::InstanceHandle_t handle)
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
AtomicBool enabled_
The flag indicates the entity is enabled.
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
OpenDDS_Dcps_Export LogLevel log_level
static Recorder_ptr _duplicate(Recorder_ptr obj)
static Replayer_ptr _duplicate(Replayer_ptr obj)
Topic_Pair pair_
The topic object reference.
void set_minimal_type_identifier(const TypeIdentifier &ti)
void delete_recorder(Recorder_ptr recorder)
DDS::ReturnCode_t cleanup()
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
boolean autoenable_created_entities
Implements the DDS::TopicDescription interface.
HandleMap ignored_topics_
Collection of ignored topics.
void set_preset_type_info(const TypeInformation &type_info)
const char * retcode_to_string(DDS::ReturnCode_t value)
RepoIdMap repoIds_
By-handle lookup of instance handles assigned to GUIDs (use handle_protector_)
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
const ReturnCode_t RETCODE_ERROR
virtual void dispatch(const MonotonicTimePoint &tv)=0
virtual DDS::MultiTopic_ptr create_multitopic(const char *name, const char *type_name, const char *subscription_expression, const DDS::StringSeq &expression_parameters)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int find(OpenDDS::DCPS::DomainParticipantImpl::TopicMap &c, const Key &key, OpenDDS::DCPS::DomainParticipantImpl::TopicMap::mapped_type *&value)
This struct holds both object reference and the corresponding servant.
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
const ReturnCode_t RETCODE_OK
boolean is_write_protected
virtual DDS::ReturnCode_t get_default_topic_qos(DDS::TopicQos &qos)
const ReturnCode_t RETCODE_UNSUPPORTED
static bool validate_datareader_qos(const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
RcHandle< BitSubscriber > bit_subscriber_
The built in topic subscriber.
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY
boolean is_rtps_protected
bool recalculate_interval_
virtual DDS::ReturnCode_t get_qos(DDS::DomainParticipantQos &qos)
Recorder_ptr create_recorder(DDS::Topic_ptr a_topic, const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos, const RecorderListener_rch &a_listener, DDS::StatusMask mask)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
virtual DDS::ReturnCode_t delete_subscriber(DDS::Subscriber_ptr s)
void add_adjust(OpenDDS::DCPS::DataWriterImpl *writer)
The wait has returned because it was woken up.
int insert(Container &c, const ValueType &v)
const character_type * in(void) const
OpenDDS_Dcps_Export GUID_t bit_key_to_guid(const DDS::BuiltinTopicKey_t &key)
virtual DDS::ReturnCode_t ignore_subscription(DDS::InstanceHandle_t handle)
DDS::Security::PermissionsHandle perm_handle_
This participant permissions handle given by access constrol.
static bool valid(const DDS::UserDataQosPolicy &qos)
std::pair< TopicMapIterator, TopicMapIterator > TopicMapIteratorPair
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define TheServiceParticipant
ParticipantLivelinessTimer(DomainParticipantImpl &impl)
virtual DDS::ReturnCode_t enable()
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
virtual DDS::Publisher_ptr create_publisher(const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
MonotonicTimePoint last_liveliness_activity_
unique_ptr< Monitor > monitor_
The Internal API and Implementation of OpenDDS.
bool is_clean(String *leftover_entities=0) const
boolean is_read_protected
virtual char * get_name()
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
virtual DDS::ContentFilteredTopic_ptr create_contentfilteredtopic(const char *name, DDS::Topic_ptr related_topic, const char *filter_expression, const DDS::StringSeq &expression_parameters)
TypeIdentifierWithDependencies minimal
bool set_wait_pending_deadline(const MonotonicTimePoint &deadline)
virtual DDS::DomainParticipant_ptr get_participant()
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
OpenDDS_Dcps_Export SecurityDebug security_debug
const DDS::LivelinessQosPolicyKind kind_
TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind)
DDS::DomainParticipantListener_var listener_
Used to notify the entity for relevant events.
PropertyQosPolicy property
sequence< string > StringSeq
DDS::Topic_ptr create_new_topic(const char *topic_name, const char *type_name, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::TypeSupport_ptr type_support)
#define TOPIC_QOS_DEFAULT
static const TimeDuration max_value