12 #include "DCPSInfo_i.h"
14 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h" 28 #include "tao/debug.h"
30 #include "ace/Read_Buffer.h"
31 #include "ace/OS_NS_stdio.h"
47 , federation_(federation)
48 , participantIdGenerator_(federation.id())
50 , reincarnate_(reincarnate)
52 , reassociate_timer_id_(-1)
53 , dispatch_check_timer_id_(-1)
54 #ifndef DDS_HAS_MINIMUM_BIT
55 , in_cleanup_all_built_in_topics_(false)
87 for (DCPS_IR_Domain_Map::const_iterator dom(this->
domains_.begin());
88 dom != this->
domains_.end(); ++dom) {
91 for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
92 part != participants.end(); ++part) {
95 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
96 sub != subscriptions.end(); ++sub) {
97 sub->second->reevaluate_defunct_associations();
101 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
102 pub != publications.end(); ++pub) {
103 pub->second->reevaluate_defunct_associations();
131 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
133 if (where == this->
domains_.end()) {
139 = where->second->participant(participantId);
141 if (0 == participant) {
161 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
163 if (where == this->
domains_.end()) {
169 = where->second->participant(participantId);
171 if (0 == participant) {
181 OpenDDS::DCPS::GUID_t_out topicId,
184 const char * topicName,
185 const char * dataTypeName,
191 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
193 if (where == this->
domains_.end()) {
199 = where->second->participant(participantId);
201 if (0 == participantPtr) {
206 = where->second->add_topic(
215 , topicName, dataTypeName
216 , const_cast<DDS::TopicQos &>(qos));
222 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
223 ACE_TEXT(
"pushing creation of topic %C in domain %d.\n"),
224 std::string(converter).c_str(),
235 const char* topicName,
236 const char* dataTypeName,
242 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
244 if (where == this->
domains_.end()) {
247 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
257 = where->second->participant(participantId);
259 if (0 == participantPtr) {
263 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
264 ACE_TEXT(
"invalid participant %C.\n"),
265 std::string(converter).c_str()));
272 = where->second->force_add_topic(topicId, topicName, dataTypeName,
273 qos, participantPtr);
293 const char * topicName,
295 DDS::TopicQos_out qos,
296 OpenDDS::DCPS::GUID_t_out topicId)
301 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
303 if (where == this->
domains_.end()) {
312 status = where->second->find_topic(topicName, topic);
319 topicId = topic->
get_id();
333 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
335 if (where == this->
domains_.end()) {
341 = where->second->participant(participantId);
356 && (partPtr->
isOwner() ==
true)
364 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
365 ACE_TEXT(
"pushing deletion of topic %C in domain %d.\n"),
366 std::string(converter).c_str(),
371 return removedStatus;
378 OpenDDS::DCPS::DataWriterRemote_ptr publication,
387 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
388 ACE_TEXT(
"invalid publication reference.\n")));
396 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
398 if (where == this->
domains_.end()) {
404 = where->second->participant(participantId);
420 OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
421 OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
430 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
431 ACE_TEXT(
"failure marshalling publication on dispatching orb.\n")));
436 dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
444 dispatchingPublication.in(),
449 serializedTypeInfo));
469 , const_cast<DDS::DataWriterQos&>(qos)
471 , transportContextDefault, csi
472 , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
478 ACE_TEXT(
"(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_publication: ")
479 ACE_TEXT(
"pushing creation of publication %C in domain %d.\n"),
480 std::string(converter).c_str(),
485 where->second->remove_dead_participants();
505 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
507 if (where == this->
domains_.end()) {
510 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
520 = where->second->participant(participantId);
526 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
527 ACE_TEXT(
"invalid participant %C in domain %d.\n"),
528 std::string(converter).c_str(),
540 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
541 ACE_TEXT(
"invalid topic %C in domain %d.\n"),
542 std::string(converter).c_str(),
553 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
554 ACE_TEXT(
"failure converting string %C to objref\n"),
560 OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.
in());
572 serializedTypeInfo));
579 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
580 ACE_TEXT(
"failed to add publication to participant %C.\n"),
581 std::string(converter).c_str()));
596 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
597 ACE_TEXT(
"failed to add publication to participant %C topic list.\n"),
598 std::string(converter).c_str()));
638 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
640 if (where == this->
domains_.end()) {
650 const bool in_cleanup =
651 #ifdef DDS_HAS_MINIMUM_BIT 658 where->second->remove_dead_participants(in_cleanup);
664 where->second->remove_dead_participants(in_cleanup);
673 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
674 ACE_TEXT(
"pushing deletion of publication %C in domain %d.\n"),
675 std::string(converter).c_str(),
685 OpenDDS::DCPS::DataReaderRemote_ptr subscription,
689 const char* filterClassName,
690 const char* filterExpression,
697 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
698 ACE_TEXT(
"invalid subscription reference.\n")));
712 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
714 if (where == this->
domains_.end()) {
719 domainPtr = where->second.get();
726 topic = where->second->find_topic(topicId);
736 OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
737 OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
746 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
747 ACE_TEXT(
"failure marshalling subscription on dispatching orb.\n")));
751 dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
759 dispatchingSubscription.in(),
767 serializedTypeInfo));
790 , const_cast<DDS::DataReaderQos&>(qos)
793 , const_cast<DDS::OctetSeq&>(serializedTypeInfo));
800 ACE_TEXT(
"(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_subscription: ")
801 ACE_TEXT(
"pushing creation of subscription %C in domain %d.\n"),
802 std::string(converter).c_str(),
823 const char* filterClassName,
824 const char* filterExpression,
832 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
834 if (where == this->
domains_.end()) {
837 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
847 = where->second->participant(participantId);
853 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
854 ACE_TEXT(
"invalid participant %C in domain %d.\n"),
855 std::string(converter).c_str(),
868 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
869 ACE_TEXT(
"invalid topic %C in domain %d.\n"),
870 std::string(converter).c_str(),
881 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
882 ACE_TEXT(
"failure converting string %C to objref\n"),
888 OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.
in());
903 serializedTypeInfo));
910 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
911 ACE_TEXT(
"failed to add subscription to participant %C.\n"),
912 std::string(converter).c_str()));
928 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
929 ACE_TEXT(
"failed to add subscription to participant %C topic list.\n"),
930 std::string(converter).c_str()));
970 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
972 if (where == this->
domains_.end()) {
987 where->second->remove_dead_participants(
988 #ifdef DDS_HAS_MINIMUM_BIT
1002 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
1003 ACE_TEXT(
"pushing deletion of subscription %C in domain %d.\n"),
1004 std::string(converter).c_str(),
1024 if (0 == domainPtr) {
1036 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
1040 qos,
um_, isBitPart);
1043 value.
id = participantId;
1046 participant->isBitPublisher() =
true;
1051 ACE_TEXT(
"(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1052 ACE_TEXT(
"participant %C in domain %d is BIT publisher for this domain.\n"),
1053 std::string(converter).c_str(),
1059 participant->takeOwnership();
1061 int status = domainPtr->add_participant(participant);
1068 }
else if (this->um_) {
1070 if (participant->isBitPublisher() ==
false) {
1074 participant->owner(),
1077 this->um_->create(updateParticipant);
1081 ACE_TEXT(
"(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1082 ACE_TEXT(
"pushing creation of participant %C in domain %d.\n"),
1083 std::string(converter).c_str(),
1095 ACE_TEXT(
"(%P|%t) (GUID_t)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1096 ACE_TEXT(
"domain %d loaded participant %C at 0x%x.\n"),
1098 std::string(converter).c_str(),
1099 participant.
get()));
1114 if (0 == domainPtr) {
1117 ACE_TEXT(
"(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1118 ACE_TEXT(
"invalid domain Id: %d\n"),
1138 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1139 ACE_TEXT(
"participant id %C already exists.\n"),
1140 std::string(converter).c_str()));
1147 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->
federation_,
1150 qos,
um_, isBitPart);
1152 switch (domainPtr->add_participant(participant)) {
1155 ACE_TEXT(
"(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1156 ACE_TEXT(
"failed to load participant %C in domain %d.\n"),
1157 std::string(converter).c_str(),
1166 ACE_TEXT(
"(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1167 ACE_TEXT(
"attempt to load duplicate participant %C in domain %d.\n"),
1168 std::string(converter).c_str(),
1181 if (converter.
federationId() == this->federation_.id()) {
1187 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1188 ACE_TEXT(
"Adjusting highest participant Id value to at least %d.\n"),
1195 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
1196 ACE_TEXT(
"loaded participant %C at 0x%x in domain %d.\n"),
1197 std::string(converter).c_str(),
1213 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domain);
1215 if (where == this->
domains_.end()) {
1219 std::vector<OpenDDS::DCPS::GUID_t> candidates;
1221 for (DCPS_IR_Participant_Map::const_iterator
1222 current = where->second->participants().begin();
1223 current != where->second->participants().end();
1225 if (current->second->owner() == owner) {
1226 candidates.push_back(current->second->get_id());
1232 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1233 ACE_TEXT(
"%d participants to remove from domain %d.\n"),
1240 for (
unsigned int index = 0; index < candidates.size(); ++index) {
1242 = where->second->participant(candidates[index]);
1244 std::vector<OpenDDS::DCPS::GUID_t> keylist;
1247 for (DCPS_IR_Subscription_Map::const_iterator
1251 keylist.push_back(current->second->get_id());
1257 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1258 ACE_TEXT(
"%d subscriptions to remove from participant %C.\n"),
1260 std::string(converter).c_str()));
1263 for (
unsigned int key = 0;
key < keylist.size(); ++
key) {
1272 for (DCPS_IR_Publication_Map::const_iterator
1276 keylist.push_back(current->second->get_id());
1282 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1283 ACE_TEXT(
"%d publications to remove from participant %C.\n"),
1285 std::string(converter).c_str()));
1288 for (
unsigned int key = 0;
key < keylist.size(); ++
key) {
1297 for (DCPS_IR_Topic_Map::const_iterator
1298 current = participant->
topics().begin();
1299 current != participant->
topics().end();
1301 keylist.push_back(current->second->get_id());
1307 ACE_TEXT(
"(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
1308 ACE_TEXT(
"%d topics to remove from participant %C.\n"),
1310 std::string(converter).c_str()));
1313 for (
unsigned int key = 0;
key < keylist.size(); ++
key) {
1337 DCPS_IR_Domain_Map::iterator it(this->
domains_.find(domainId));
1343 if (participant == 0) {
1349 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
1350 sub != subscriptions.end(); ++sub) {
1351 sub->second->disassociate_participant(remote_id,
true);
1355 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
1356 pub != publications.end(); ++pub) {
1357 pub->second->disassociate_participant(remote_id,
true);
1360 it->second->remove_dead_participants();
1372 DCPS_IR_Domain_Map::iterator it(this->
domains_.find(domainId));
1378 if (participant == 0) {
1388 != 0 || subscription == 0) {
1392 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
1393 ACE_TEXT(
"participant %C could not find subscription %C.\n"),
1394 std::string(part_converter).c_str(),
1395 std::string(sub_converter).c_str()));
1402 it->second->remove_dead_participants();
1414 DCPS_IR_Domain_Map::iterator it(this->
domains_.find(domainId));
1420 if (participant == 0) {
1430 != 0 || publication == 0) {
1434 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
1435 ACE_TEXT(
"participant %C could not find publication %C.\n"),
1436 std::string(part_converter).c_str(),
1437 std::string(pub_converter).c_str()));
1444 it->second->remove_dead_participants();
1454 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1456 if (where == this->
domains_.end()) {
1464 ACE_TEXT(
"(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1465 ACE_TEXT(
"failed to locate participant %C in domain %d.\n"),
1466 std::string(converter).c_str(),
1473 bool sendUpdate = participant->isOwner() && !participant->isBitPublisher();
1476 int status = where->second->remove_participant(participantId, dont_notify_lost);
1484 if (this->
um_ && sendUpdate) {
1486 where->second->get_id(),
1494 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
1495 ACE_TEXT(
"pushing deletion of participant %C in domain %d.\n"),
1496 std::string(converter).c_str(),
1501 if (where->second->participants().empty()
1502 #ifndef DDS_HAS_MINIMUM_BIT 1512 #ifndef DDS_HAS_MINIMUM_BIT 1513 else if (where->second->useBIT() &&
1514 where->second->participants().size() == 1) {
1528 status = eh_impl->
cv_.
wait(thread_status_manager);
1534 #ifndef DDS_HAS_MINIMUM_BIT 1539 const DCPS_IR_Domain_Map::iterator where = parent_->domains_.find(domain_);
1541 if (where != parent_->domains_.end() && where->second->participants().size() == 1) {
1542 where->second->cleanup_built_in_topics();
1560 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1562 if (where == this->
domains_.end()) {
1568 = where->second->participant(myParticipantId);
1576 where->second->remove_dead_participants();
1587 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1589 if (where == this->
domains_.end()) {
1595 = where->second->participant(myParticipantId);
1603 where->second->remove_dead_participants();
1614 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1616 if (where == this->
domains_.end()) {
1622 = where->second->participant(myParticipantId);
1630 where->second->remove_dead_participants();
1641 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1643 if (where == this->
domains_.end()) {
1649 = where->second->participant(myParticipantId);
1657 where->second->remove_dead_participants();
1670 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1672 if (where == this->
domains_.end()) {
1678 = where->second->participant(partId);
1694 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1695 ACE_TEXT(
"participant %C could not find publication %C.\n"),
1696 std::string(part_converter).c_str(),
1697 std::string(pub_converter).c_str()));
1703 if (pub->
set_qos(qos, publisherQos, qosType) ==
false)
1726 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1727 ACE_TEXT(
"pushing update of publication %C in domain %d.\n"),
1728 std::string(converter).c_str(),
1746 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1748 if (where == this->
domains_.end()) {
1754 = where->second->participant(partId);
1770 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1771 ACE_TEXT(
"participant %C could not find publication %C.\n"),
1772 std::string(part_converter).c_str(),
1773 std::string(pub_converter).c_str()));
1790 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1792 if (where == this->
domains_.end()) {
1798 = where->second->participant(partId);
1814 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
1815 ACE_TEXT(
"participant %C could not find publication %C.\n"),
1816 std::string(part_converter).c_str(),
1817 std::string(pub_converter).c_str()));
1834 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1836 if (where == this->
domains_.end()) {
1842 = where->second->participant(partId);
1858 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1859 ACE_TEXT(
"participant %C could not find subscription %C.\n"),
1860 std::string(part_converter).c_str(),
1861 std::string(sub_converter).c_str()));
1867 if (sub->
set_qos(qos, subscriberQos, qosType) ==
false)
1890 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1891 ACE_TEXT(
"pushing update of subscription %C in domain %d.\n"),
1892 std::string(converter).c_str(),
1910 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1912 if (where == this->
domains_.end()) {
1918 = where->second->participant(partId);
1934 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1935 ACE_TEXT(
"participant %C could not find subscription %C.\n"),
1936 std::string(part_converter).c_str(),
1937 std::string(sub_converter).c_str()));
1954 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
1956 if (where == this->
domains_.end()) {
1962 = where->second->participant(partId);
1978 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
1979 ACE_TEXT(
"participant %C could not find subscription %C.\n"),
1980 std::string(part_converter).c_str(),
1981 std::string(sub_converter).c_str()));
1997 DCPS_IR_Domain_Map::iterator
domain = this->
domains_.find(domainId);
1998 if (domain == this->
domains_.end()) {
2016 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
2017 ACE_TEXT(
"participant %C could not find subscription %C.\n"),
2018 std::string(part_converter).c_str(),
2019 std::string(sub_converter).c_str()));
2042 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
2044 if (where == this->
domains_.end()) {
2050 = where->second->participant(participantId);
2066 && (partPtr->
isOwner() ==
true)
2074 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
2075 ACE_TEXT(
"pushing update of topic %C in domain %d.\n"),
2076 std::string(converter).c_str(),
2092 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domainId);
2094 if (where == this->
domains_.end()) {
2100 = where->second->participant(participantId);
2106 if (partPtr->
set_qos(qos) ==
false)
2110 && (partPtr->
isOwner() ==
true)
2118 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
2119 ACE_TEXT(
"pushing update of participant %C in domain %d.\n"),
2120 std::string(converter).c_str(),
2133 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2134 ACE_TEXT(
"ANY_DOMAIN not supported for operations.\n")));
2139 DCPS_IR_Domain_Map::iterator where = this->
domains_.find(domain);
2141 if (where == this->
domains_.end()) {
2154 #ifndef DDS_HAS_MINIMUM_BIT 2159 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
2160 ACE_TEXT(
"failed to initialize the Built-In Topics ")
2161 ACE_TEXT(
"when loading domain %d.\n"),
2170 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
2171 ACE_TEXT(
"successfully loaded domain %d at %x.\n"),
2178 return where->second.get();
2183 const char* listen_str)
2187 #ifndef DDS_HAS_MINIMUM_BIT 2190 #ifndef ACE_AS_STATIC_LIBS 2194 ACE_TEXT(
"dynamic OpenDDS_Tcp Service_Object * ")
2195 ACE_TEXT(
"OpenDDS_Tcp:_make_TcpLoader()");
2200 const std::string config_name =
2202 + std::string(
"InfoRepoBITTransportConfig");
2206 const std::string inst_name =
2208 + std::string(
"InfoRepoBITTCPTransportInst");
2218 tcp_inst->conn_retry_attempts_ = 0;
2220 if (listen_address_given) {
2221 tcp_inst->local_address(listen_str);
2231 ACE_UNUSED_ARG(listen_address_given);
2232 ACE_UNUSED_ARG(listen_str);
2243 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2244 ACE_TEXT(
"processing persistent data.\n")));
2248 #ifndef DDS_HAS_MINIMUM_BIT 2250 for (Update::UImage::ParticipantSeq::const_iterator
2257 ACE_TEXT(
"(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
2258 ACE_TEXT(
"invalid domain Id: %d\n"),
2270 for (Update::UImage::ParticipantSeq::const_iterator
2279 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2280 ACE_TEXT(
"failed to add participant %C to domain %d.\n"),
2281 std::string(converter).c_str(),
2288 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2289 ACE_TEXT(
"added participant %C to domain %d.\n"),
2290 std::string(converter).c_str(),
2295 for (Update::UImage::TopicSeq::const_iterator iter = image.
topics.begin();
2296 iter != image.
topics.end(); iter++) {
2305 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2306 ACE_TEXT(
"failed to add topic %C to participant %C.\n"),
2307 std::string(topic_converter).c_str(),
2308 std::string(part_converter).c_str()));
2315 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2316 ACE_TEXT(
"added topic %C to participant %C.\n"),
2317 std::string(topic_converter).c_str(),
2318 std::string(part_converter).c_str()));
2322 for (Update::UImage::ReaderSeq::const_iterator iter = image.
actors.begin();
2323 iter != image.
actors.end(); iter++) {
2339 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2340 ACE_TEXT(
"failed to add subscription %C to participant %C.\n"),
2341 std::string(sub_converter).c_str(),
2342 std::string(part_converter).c_str()));
2349 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2350 ACE_TEXT(
"added subscription %C to participant %C.\n"),
2351 std::string(sub_converter).c_str(),
2352 std::string(part_converter).c_str()));
2356 for (Update::UImage::WriterSeq::const_iterator iter = image.
wActors.begin();
2357 iter != image.
wActors.end(); iter++) {
2372 ACE_TEXT(
"(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
2373 ACE_TEXT(
"failed to add publication %C to participant %C.\n"),
2374 std::string(pub_converter).c_str(),
2375 std::string(part_converter).c_str()));
2382 ACE_TEXT(
"(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
2383 ACE_TEXT(
"added publication %C to participant %C.\n"),
2384 std::string(pub_converter).c_str(),
2385 std::string(part_converter).c_str()));
2389 #ifndef DDS_HAS_MINIMUM_BIT 2391 for (DCPS_IR_Domain_Map::const_iterator currentDomain =
domains_.begin();
2394 currentDomain->second->reassociate_built_in_topic_pubs();
2414 (
"UpdateManagerSvc");
2426 ACE_TEXT(
"UpdateManagerSvc.\n")),
false);
2483 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 2484 std::string indent (
" ");
2486 for (DCPS_IR_Domain_Map::const_iterator dm =
domains_.begin();
2490 dump += dm->second->dump_to_string(indent, 0);
2492 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT) 2499 #ifndef DDS_HAS_MINIMUM_BIT 2510 for (DCPS_IR_Domain_Map::iterator it = copy.begin(); it != copy.end(); ++it) {
2511 it->second->cleanup_built_in_topics();
virtual void remove_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId)
virtual void disassociate_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
int find_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool set_qos(const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos, Update::SpecificQos &specificQos)
const LogLevel::Value value
virtual ::CORBA::Boolean update_subscription_params(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &subscriptionId, const DDS::StringSeq &params)
int add_subscription_reference(DCPS_IR_Subscription *subscription, bool associate=true)
virtual void disassociate_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
ACE_CDR::ULong transportContext
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL typedef std::map< DDS::DomainId_t, OpenDDS::DCPS::RcHandle< DCPS_IR_Domain > > DCPS_IR_Domain_Map
OpenDDS::DCPS::GUID_t get_next_participant_id()
Next Entity Id value in sequence.
virtual void shutdown()
Cause the entire repository to exit.
bool set_topic_qos(const DDS::TopicQos &qos)
virtual CORBA::Boolean attach_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
int add_subscription(OpenDDS::DCPS::unique_ptr< DCPS_IR_Subscription > sub)
virtual ~TAO_DDS_DCPSInfo_i()
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
void remove_dead_participants(bool part_of_cleanup=false)
Remove any participants currently marked as dead.
const ACE_CDR::ULong transportContextDefault
const GUID_t GUID_UNKNOWN
Nil value for GUID.
const DCPS_IR_Domain_Map & domains() const
Expose a readable reference of the domain map.
OpenDDS::DCPS::GUID_t get_id() const
static int process_directive(const ACE_TCHAR directive[])
const DCPS_IR_Topic_Map & topics() const
Expose a readable reference to the topic map.
CSP contentSubscriptionProfile
ParticipantId participantId() const
Get the participant id from the GUID.
void destroy(const IdPath &id, ItemType type, ActorType actor=DataWriter)
DCPS_IR_Domain_Map domains_
CORBA::ORB_ptr orb()
Expose the ORB.
static TYPE * instance(const ACE_TCHAR *name)
bool receive_image(const Update::UImage &image)
OpenDDS::DCPS::GUID_t get_next_publication_id(bool builtin)
DCPS_IR_Topic_Description * get_topic_description()
virtual OpenDDS::DCPS::TopicStatus assert_topic(OpenDDS::DCPS::GUID_t_out topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
void ignore_participant(OpenDDS::DCPS::GUID_t id)
Ignore the participant with the id.
sequence< TransportLocator > TransportLocatorSeq
OpenDDS::Federator::RepoKey federationId() const
Get the federation id from the GUID.
OpenDDS::DCPS::GUID_t get_next_subscription_id(bool builtin)
T::rv_reference move(T &p)
int remove_subscription(OpenDDS::DCPS::GUID_t subId)
int init_built_in_topics(bool federated, bool persistent)
CORBA::ORB_var dispatchingOrb_
bool init_dispatchChecking(const ACE_Time_Value &delay)
bool set_qos(const DDS::DomainParticipantQos &qos)
void create(const UType &info)
void ignore_publication(OpenDDS::DCPS::GUID_t id)
Ignore the publication with the id.
DCPS_IR_Participant * participant(const OpenDDS::DCPS::GUID_t &id) const
Find the participant with the id.
DDS::TopicQos * get_topic_qos()
void disassociate_publication(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any publications with id.
OpenDDS::DCPS::ConditionVariable< ACE_Recursive_Thread_Mutex > cv_
void last_topic_key(long key)
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Publication >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Publication_Map
const char * get_dataTypeName() const
virtual OpenDDS::DCPS::GUID_t add_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataReaderRemote_ptr subscription, const DDS::DataReaderQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpression, const DDS::StringSeq &exprParams, const DDS::OctetSeq &serializedTypeInfo)
const DCPS_IR_Publication_Map & publications() const
Expose a readable reference to the publication map.
DOMAINID_TYPE_NATIVE DomainId_t
virtual OpenDDS::DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId)
char * string_dup(const char *)
ACE_Event_Handler * handler(void) const
void finalize()
Cleanup state for shutdown.
void add(TAO_DDS_DCPSInfo_i *info)
long entityKey() const
Extract the EntityKey value.
void changeOwner(long sender, long owner)
Process an incoming update that changes ownership.
bool remove_by_owner(DDS::DomainId_t domain, long owner)
virtual CORBA::Boolean update_domain_participant_qos(DDS::DomainId_t domain, const OpenDDS::DCPS::GUID_t &participantId, const DDS::DomainParticipantQos &qos)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
virtual void ignore_subscription(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
Representative of a Topic.
bool add_topic(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos)
Add a previously existing topic to the repository.
void requestImage()
Force a clean shutdown.
Representative of the Domain Participant.
virtual void remove_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &publicationId)
bool set_qos(const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos, Update::SpecificQos &specificQos)
PartIdType lastPartId
What the last participant id is/was.
void cleanup_all_built_in_topics()
void add(Update::Updater *updater)
Add an additional Updater interface.
int add_publication_reference(DCPS_IR_Publication *publication, bool associate=true)
ParticipantSeq participants
sequence< octet > OctetSeq
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
static ACE_Service_Gestalt * current(void)
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
virtual void ignore_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
int remove_topic_reference(OpenDDS::DCPS::GUID_t topicId, DCPS_IR_Topic *&topic)
virtual ACE_Reactor * reactor(void) const
virtual OpenDDS::DCPS::GUID_t add_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const OpenDDS::DCPS::GUID_t &topicId, OpenDDS::DCPS::DataWriterRemote_ptr publication, const DDS::DataWriterQos &qos, const OpenDDS::DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const DDS::OctetSeq &serializedTypeInfo)
ShutdownInterface * shutdown_
Interface to effect shutdown of the process.
int find(const PG_Property_Set &decoder, const ACE_CString &key, TYPE &value)
long dispatch_check_timer_id_
virtual void disassociate_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &local_id, const OpenDDS::DCPS::GUID_t &remote_id)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos)
virtual OpenDDS::DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, OpenDDS::DCPS::GUID_t_out topicId)
TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb, bool reincarnate, ShutdownInterface *shutdown, const TAO_DDS_DCPSFederationId &federation)
int find_publication_reference(OpenDDS::DCPS::GUID_t pubId, DCPS_IR_Publication *&pub)
Return the publication object.
virtual char * dump_to_string()
Dump the Repos state to string.
ACE_recursive_thread_mutex_t lock_
void update(const IdPath &id, const QosType &qos)
long datalink_release_delay_
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
virtual void ignore_publication(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
DCPS_IR_Domain * domain(DDS::DomainId_t domain)
Convert a domain Id into a reference to a DCPS_IR_Domain object.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
const DCPS_IR_Participant_Map & participants() const
Expose a readable reference to the participant map.
virtual void shutdown()=0
void update_expr_params(const DDS::StringSeq &params)
Calls associated Publications.
ORB_ptr ORB_init(int &argc, char *argv[], const char *orb_name=0)
void disassociate_subscription(OpenDDS::DCPS::GUID_t id, bool reassociate=false)
Remove any subscriptions with the id.
bool in_cleanup_all_built_in_topics_
int handle_exception(ACE_HANDLE fd)
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)
virtual CORBA::Boolean update_subscription_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
void ignore_subscription(OpenDDS::DCPS::GUID_t id)
Ignore the subscription with the id.
int remove_publication(OpenDDS::DCPS::GUID_t pubId)
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
bool isOwner() const
Indication of whether the current repository is the owner of this participant.
std::map< OpenDDS::DCPS::GUID_t, OpenDDS::DCPS::container_supported_unique_ptr< DCPS_IR_Subscription >, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Subscription_Map
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static const char DEFAULT_INST_PREFIX[]
virtual int handle_timeout(const ACE_Time_Value &now, const void *arg)
static TransportRegistry * instance()
Return a singleton instance of this class.
Representative of a Subscription.
int add_publication(OpenDDS::DCPS::unique_ptr< DCPS_IR_Publication > pub)
Representation of a Domain in the system.
bool changeOwnership(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, long sender, long owner)
Assert new ownership for a participant and its contained entities.
const TAO_DDS_DCPSFederationId & federation_
int find_subscription_reference(OpenDDS::DCPS::GUID_t subId, DCPS_IR_Subscription *&sub)
Return the subscription object.
#define ACE_ERROR_RETURN(X, Y)
The wait has returned because it was woken up.
const character_type * in(void) const
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
int init_transport(int listen_address_given, const char *listen_str)
#define TheServiceParticipant
void last_subscription_key(long key)
void ignore_topic(OpenDDS::DCPS::GUID_t id)
Ignore the topic with the id.
long reassociate_timer_id_
virtual void remove_domain_participant(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId)
void takeOwnership()
Take local ownership of this participant and publish an update.
Representative of a Topic Description.
bool init_reassociation(const ACE_Time_Value &delay)
std::map< OpenDDS::DCPS::GUID_t, DCPS_IR_Participant_rch, OpenDDS::DCPS::GUID_tKeyLessThan > DCPS_IR_Participant_Map
virtual CORBA::Boolean update_topic_qos(const OpenDDS::DCPS::GUID_t &topicId, DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &participantId, const DDS::TopicQos &qos)
const DCPS_IR_Subscription_Map & subscriptions() const
Expose a readable reference to the subscription map.
Representative of a Publication.
OpenDDS::DCPS::RepoIdGenerator participantIdGenerator_
virtual void ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &myParticipantId, const OpenDDS::DCPS::GUID_t &ignoreId)
sequence< string > StringSeq
ACE_Recursive_Thread_Mutex lock_
virtual CORBA::Boolean update_publication_qos(DDS::DomainId_t domainId, const OpenDDS::DCPS::GUID_t &partId, const OpenDDS::DCPS::GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
void last_publication_key(long key)