00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "tao/ORB_Core.h"
00011
00012 #include "DCPSInfo_i.h"
00013
00014 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h"
00015 #include "dds/DCPS/transport/tcp/TcpInst.h"
00016 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00017 #include "dds/DCPS/transport/framework/TransportInst.h"
00018 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00019 #include "dds/DCPS/transport/tcp/TcpInst.h"
00020 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00021 #include "UpdateManager.h"
00022 #include "ShutdownInterface.h"
00023
00024 #include "dds/DCPS/BuiltInTopicUtils.h"
00025 #include "dds/DCPS/GuidUtils.h"
00026 #include "dds/DCPS/RepoIdConverter.h"
00027
00028 #include "tao/debug.h"
00029
00030 #include "ace/Read_Buffer.h"
00031 #include "ace/OS_NS_stdio.h"
00032 #include "ace/Dynamic_Service.h"
00033 #include "ace/Reactor.h"
00034
00035 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00036
00037
00038 TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb
00039 , bool reincarnate
00040 , ShutdownInterface* shutdown
00041 , const TAO_DDS_DCPSFederationId& federation)
00042 : orb_(CORBA::ORB::_duplicate(orb))
00043 , federation_(federation)
00044 , participantIdGenerator_(federation.id())
00045 , um_(0)
00046 , reincarnate_(reincarnate)
00047 , shutdown_(shutdown)
00048 , reassociate_timer_id_(-1)
00049 , dispatch_check_timer_id_(-1)
00050 {
00051 if (!TheServiceParticipant->use_bidir_giop()) {
00052 int argc = 0;
00053 char** no_argv = 0;
00054 dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
00055 }
00056 }
00057
00058 TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i()
00059 {
00060 }
00061
00062 int
00063 TAO_DDS_DCPSInfo_i::handle_timeout(const ACE_Time_Value& ,
00064 const void* arg)
00065 {
00066 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00067
00068 if (arg == this) {
00069 if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00070 if (this->dispatchingOrb_->work_pending())
00071 {
00072
00073 ACE_Time_Value small(0,10);
00074 this->dispatchingOrb_->perform_work(small);
00075 }
00076 }
00077 }
00078 else {
00079
00080
00081
00082 for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00083 dom != this->domains_.end(); ++dom) {
00084
00085 const DCPS_IR_Participant_Map& participants(dom->second->participants());
00086 for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00087 part != participants.end(); ++part) {
00088
00089 const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00090 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00091 sub != subscriptions.end(); ++sub) {
00092 sub->second->reevaluate_defunct_associations();
00093 }
00094
00095 const DCPS_IR_Publication_Map& publications(part->second->publications());
00096 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00097 pub != publications.end(); ++pub) {
00098 pub->second->reevaluate_defunct_associations();
00099 }
00100 }
00101 }
00102 }
00103
00104 return 0;
00105 }
00106
00107 void
00108 TAO_DDS_DCPSInfo_i::shutdown()
00109 {
00110 this->shutdown_->shutdown();
00111 }
00112
00113 CORBA::ORB_ptr
00114 TAO_DDS_DCPSInfo_i::orb()
00115 {
00116 return CORBA::ORB::_duplicate(this->orb_.in());
00117 }
00118
00119 CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant(
00120 DDS::DomainId_t domainId,
00121 const OpenDDS::DCPS::RepoId& participantId)
00122 {
00123 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00124
00125
00126 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00127
00128 if (where == this->domains_.end()) {
00129 throw OpenDDS::DCPS::Invalid_Domain();
00130 }
00131
00132
00133 DCPS_IR_Participant* participant
00134 = where->second->participant(participantId);
00135
00136 if (0 == participant) {
00137 throw OpenDDS::DCPS::Invalid_Participant();
00138 }
00139
00140
00141 participant->takeOwnership();
00142
00143 return false;
00144 }
00145
00146 bool
00147 TAO_DDS_DCPSInfo_i::changeOwnership(
00148 DDS::DomainId_t domainId,
00149 const OpenDDS::DCPS::RepoId& participantId,
00150 long sender,
00151 long owner)
00152 {
00153 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00154
00155
00156 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00157
00158 if (where == this->domains_.end()) {
00159 return false;
00160 }
00161
00162
00163 DCPS_IR_Participant* participant
00164 = where->second->participant(participantId);
00165
00166 if (0 == participant) {
00167 return false;
00168 }
00169
00170
00171 participant->changeOwner(sender, owner);
00172 return true;
00173 }
00174
00175 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic(
00176 OpenDDS::DCPS::RepoId_out topicId,
00177 DDS::DomainId_t domainId,
00178 const OpenDDS::DCPS::RepoId& participantId,
00179 const char * topicName,
00180 const char * dataTypeName,
00181 const DDS::TopicQos & qos,
00182 bool )
00183 {
00184 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00185
00186 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00187
00188 if (where == this->domains_.end()) {
00189 throw OpenDDS::DCPS::Invalid_Domain();
00190 }
00191
00192
00193 DCPS_IR_Participant* participantPtr
00194 = where->second->participant(participantId);
00195
00196 if (0 == participantPtr) {
00197 throw OpenDDS::DCPS::Invalid_Participant();
00198 }
00199
00200 OpenDDS::DCPS::TopicStatus topicStatus
00201 = where->second->add_topic(
00202 topicId,
00203 topicName,
00204 dataTypeName,
00205 qos,
00206 participantPtr);
00207
00208 if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00209 Update::UTopic topic(domainId, topicId, participantId
00210 , topicName, dataTypeName
00211 , const_cast<DDS::TopicQos &>(qos));
00212 this->um_->create(topic);
00213
00214 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00215 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00216 ACE_DEBUG((LM_DEBUG,
00217 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00218 ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00219 std::string(converter).c_str(),
00220 domainId));
00221 }
00222 }
00223 return topicStatus;
00224 }
00225
00226 bool
00227 TAO_DDS_DCPSInfo_i::add_topic(const OpenDDS::DCPS::RepoId& topicId,
00228 DDS::DomainId_t domainId,
00229 const OpenDDS::DCPS::RepoId& participantId,
00230 const char* topicName,
00231 const char* dataTypeName,
00232 const DDS::TopicQos& qos)
00233 {
00234 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00235
00236
00237 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00238
00239 if (where == this->domains_.end()) {
00240 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00241 ACE_DEBUG((LM_WARNING,
00242 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00243 ACE_TEXT("invalid domain %d.\n"),
00244 domainId));
00245 }
00246
00247 return false;
00248 }
00249
00250
00251 DCPS_IR_Participant* participantPtr
00252 = where->second->participant(participantId);
00253
00254 if (0 == participantPtr) {
00255 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00256 OpenDDS::DCPS::RepoIdConverter converter(participantId);
00257 ACE_DEBUG((LM_WARNING,
00258 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00259 ACE_TEXT("invalid participant %C.\n"),
00260 std::string(converter).c_str()));
00261 }
00262
00263 return false;
00264 }
00265
00266 OpenDDS::DCPS::TopicStatus topicStatus
00267 = where->second->force_add_topic(topicId, topicName, dataTypeName,
00268 qos, participantPtr);
00269
00270 if (topicStatus != OpenDDS::DCPS::CREATED) {
00271 return false;
00272 }
00273
00274 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00275
00276
00277
00278 if (converter.federationId() == federation_.id()) {
00279
00280 participantPtr->last_topic_key(converter.entityKey());
00281 }
00282
00283 return true;
00284 }
00285
00286 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic(
00287 DDS::DomainId_t domainId,
00288 const char * topicName,
00289 CORBA::String_out dataTypeName,
00290 DDS::TopicQos_out qos,
00291 OpenDDS::DCPS::RepoId_out topicId)
00292 {
00293 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00294
00295
00296 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00297
00298 if (where == this->domains_.end()) {
00299 throw OpenDDS::DCPS::Invalid_Domain();
00300 }
00301
00302 OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00303
00304 DCPS_IR_Topic* topic = 0;
00305 qos = new DDS::TopicQos;
00306
00307 status = where->second->find_topic(topicName, topic);
00308
00309 if (0 != topic) {
00310 status = OpenDDS::DCPS::FOUND;
00311 const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00312 dataTypeName = desc->get_dataTypeName();
00313 *qos = *(topic->get_topic_qos());
00314 topicId = topic->get_id();
00315 }
00316
00317 return status;
00318 }
00319
00320 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic(
00321 DDS::DomainId_t domainId,
00322 const OpenDDS::DCPS::RepoId& participantId,
00323 const OpenDDS::DCPS::RepoId& topicId)
00324 {
00325 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00326
00327
00328 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00329
00330 if (where == this->domains_.end()) {
00331 throw OpenDDS::DCPS::Invalid_Domain();
00332 }
00333
00334
00335 DCPS_IR_Participant* partPtr
00336 = where->second->participant(participantId);
00337
00338 if (0 == partPtr) {
00339 throw OpenDDS::DCPS::Invalid_Participant();
00340 }
00341
00342 DCPS_IR_Topic* topic;
00343
00344 if (partPtr->find_topic_reference(topicId, topic) != 0) {
00345 throw OpenDDS::DCPS::Invalid_Topic();
00346 }
00347
00348 OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00349
00350 if (this->um_
00351 && (partPtr->isOwner() == true)
00352 && (partPtr->isBitPublisher() == false)) {
00353 Update::IdPath path(domainId, participantId, topicId);
00354 this->um_->destroy(path, Update::Topic);
00355
00356 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00357 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00358 ACE_DEBUG((LM_DEBUG,
00359 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00360 ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00361 std::string(converter).c_str(),
00362 domainId));
00363 }
00364 }
00365
00366 return removedStatus;
00367 }
00368
00369 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication(
00370 DDS::DomainId_t domainId,
00371 const OpenDDS::DCPS::RepoId& participantId,
00372 const OpenDDS::DCPS::RepoId& topicId,
00373 OpenDDS::DCPS::DataWriterRemote_ptr publication,
00374 const DDS::DataWriterQos & qos,
00375 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00376 const DDS::PublisherQos & publisherQos)
00377 {
00378 if (CORBA::is_nil(publication)) {
00379 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00380 ACE_DEBUG((LM_WARNING,
00381 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00382 ACE_TEXT("invalid publication reference.\n")));
00383 }
00384 return OpenDDS::DCPS::GUID_UNKNOWN;
00385 }
00386
00387 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00388
00389
00390 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00391
00392 if (where == this->domains_.end()) {
00393 throw OpenDDS::DCPS::Invalid_Domain();
00394 }
00395
00396
00397 DCPS_IR_Participant* partPtr
00398 = where->second->participant(participantId);
00399
00400 if (0 == partPtr) {
00401 throw OpenDDS::DCPS::Invalid_Participant();
00402 }
00403
00404 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00405
00406 if (topic == 0) {
00407 throw OpenDDS::DCPS::Invalid_Topic();
00408 }
00409
00410
00411 OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id(
00412 OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00413
00414 OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication =
00415 OpenDDS::DCPS::DataWriterRemote::_duplicate(publication);
00416
00417 if (dispatchingOrb_) {
00418
00419 CORBA::String_var pubStr = orb_->object_to_string(dispatchingPublication);
00420 CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr);
00421 if (CORBA::is_nil(pubObj)) {
00422 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00423 ACE_DEBUG((LM_WARNING,
00424 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00425 ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00426 }
00427 return OpenDDS::DCPS::GUID_UNKNOWN;
00428 }
00429
00430 dispatchingPublication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj);
00431 }
00432
00433 OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00434 new DCPS_IR_Publication(
00435 pubId,
00436 partPtr,
00437 topic,
00438 dispatchingPublication.in(),
00439 qos,
00440 transInfo,
00441 publisherQos));
00442
00443 DCPS_IR_Publication* pub = pubPtr.get();
00444 if (partPtr->add_publication(OpenDDS::DCPS::move(pubPtr)) != 0) {
00445
00446 pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00447 } else if (topic->add_publication_reference(pub) != 0) {
00448
00449
00450 partPtr->remove_publication(pubId);
00451 pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00452 }
00453
00454 if (this->um_ && (partPtr->isBitPublisher() == false)) {
00455 CORBA::String_var callback = orb_->object_to_string(publication);
00456 Update::ContentSubscriptionInfo csi;
00457
00458 Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00459 , callback.in()
00460 , const_cast<DDS::PublisherQos &>(publisherQos)
00461 , const_cast<DDS::DataWriterQos &>(qos)
00462 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00463 (transInfo), csi);
00464 this->um_->create(actor);
00465
00466 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00467 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00468 ACE_DEBUG((LM_DEBUG,
00469 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00470 ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00471 std::string(converter).c_str(),
00472 domainId));
00473 }
00474 }
00475
00476 where->second->remove_dead_participants();
00477 return pubId;
00478 }
00479
00480 bool
00481 TAO_DDS_DCPSInfo_i::add_publication(DDS::DomainId_t domainId,
00482 const OpenDDS::DCPS::RepoId& participantId,
00483 const OpenDDS::DCPS::RepoId& topicId,
00484 const OpenDDS::DCPS::RepoId& pubId,
00485 const char* pub_str,
00486 const DDS::DataWriterQos & qos,
00487 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00488 const DDS::PublisherQos & publisherQos,
00489 bool associate)
00490 {
00491 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00492
00493
00494 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00495
00496 if (where == this->domains_.end()) {
00497 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00498 ACE_DEBUG((LM_WARNING,
00499 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00500 ACE_TEXT("invalid domain %d.\n"),
00501 domainId));
00502 }
00503
00504 return false;
00505 }
00506
00507
00508 DCPS_IR_Participant* partPtr
00509 = where->second->participant(participantId);
00510
00511 if (0 == partPtr) {
00512 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00513 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00514 ACE_DEBUG((LM_WARNING,
00515 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00516 ACE_TEXT("invalid participant %C in domain %d.\n"),
00517 std::string(converter).c_str(),
00518 domainId));
00519 }
00520
00521 return false;
00522 }
00523
00524 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00525
00526 if (topic == 0) {
00527 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00528 ACE_DEBUG((LM_WARNING,
00529 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00530 ACE_TEXT("invalid topic %C in domain %d.\n"),
00531 std::string(converter).c_str(),
00532 domainId));
00533 return false;
00534 }
00535
00536
00537
00538 CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_)->string_to_object(pub_str);
00539 if (CORBA::is_nil(obj.in())) {
00540 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00541 ACE_DEBUG((LM_WARNING,
00542 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00543 ACE_TEXT("failure converting string %C to objref\n"),
00544 pub_str));
00545 }
00546 return false;
00547 }
00548
00549 OpenDDS::DCPS::DataWriterRemote_var publication = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00550
00551 OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pubPtr(
00552 new DCPS_IR_Publication(
00553 pubId,
00554 partPtr,
00555 topic,
00556 publication.in(),
00557 qos,
00558 transInfo,
00559 publisherQos));
00560
00561 DCPS_IR_Publication* pub = pubPtr.get();
00562 switch (partPtr->add_publication(move(pubPtr))) {
00563 case -1: {
00564 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00565 ACE_ERROR((LM_ERROR,
00566 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00567 ACE_TEXT("failed to add publication to participant %C.\n"),
00568 std::string(converter).c_str()));
00569 return false;
00570 }
00571
00572 case 1:
00573 return false;
00574 case 0:
00575 default:
00576 break;
00577 }
00578
00579 switch (topic->add_publication_reference(pub, associate)) {
00580 case -1: {
00581 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00582 ACE_ERROR((LM_ERROR,
00583 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00584 ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00585 std::string(converter).c_str()));
00586
00587
00588 partPtr->remove_publication(pubId);
00589
00590 }
00591 return false;
00592
00593 case 1:
00594
00595
00596
00597
00598 return false;
00599
00600 case 0:
00601 default:
00602 break;
00603 }
00604
00605 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00606
00607
00608
00609 if (converter.federationId() == federation_.id()) {
00610
00611 partPtr->last_publication_key(converter.entityKey());
00612 }
00613
00614 return true;
00615 }
00616
00617 void TAO_DDS_DCPSInfo_i::remove_publication(
00618 DDS::DomainId_t domainId,
00619 const OpenDDS::DCPS::RepoId& participantId,
00620 const OpenDDS::DCPS::RepoId& publicationId)
00621 {
00622 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00623
00624
00625 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00626
00627 if (where == this->domains_.end()) {
00628 throw OpenDDS::DCPS::Invalid_Domain();
00629 }
00630
00631
00632 DCPS_IR_Participant* partPtr
00633 = where->second->participant(participantId);
00634
00635 if (0 == partPtr) {
00636 throw OpenDDS::DCPS::Invalid_Participant();
00637 }
00638
00639 if (partPtr->remove_publication(publicationId) != 0) {
00640 where->second->remove_dead_participants();
00641
00642
00643 throw OpenDDS::DCPS::Invalid_Publication();
00644 }
00645
00646 where->second->remove_dead_participants();
00647
00648 if (this->um_
00649 && (partPtr->isOwner() == true)
00650 && (partPtr->isBitPublisher() == false)) {
00651 Update::IdPath path(domainId, participantId, publicationId);
00652 this->um_->destroy(path, Update::Actor, Update::DataWriter);
00653
00654 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00655 OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00656 ACE_DEBUG((LM_DEBUG,
00657 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00658 ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00659 std::string(converter).c_str(),
00660 domainId));
00661 }
00662 }
00663 }
00664
00665 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription(
00666 DDS::DomainId_t domainId,
00667 const OpenDDS::DCPS::RepoId& participantId,
00668 const OpenDDS::DCPS::RepoId& topicId,
00669 OpenDDS::DCPS::DataReaderRemote_ptr subscription,
00670 const DDS::DataReaderQos & qos,
00671 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00672 const DDS::SubscriberQos & subscriberQos,
00673 const char* filterClassName,
00674 const char* filterExpression,
00675 const DDS::StringSeq& exprParams)
00676 {
00677 if (CORBA::is_nil(subscription)) {
00678 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00679 ACE_DEBUG((LM_WARNING,
00680 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00681 ACE_TEXT("invalid subscription reference.\n")));
00682 }
00683 return OpenDDS::DCPS::GUID_UNKNOWN;
00684 }
00685
00686 DCPS_IR_Domain* domainPtr;
00687 DCPS_IR_Participant* partPtr;
00688 DCPS_IR_Topic* topic;
00689 OpenDDS::DCPS::RepoId subId;
00690 OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr;
00691 {
00692 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00693
00694
00695 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00696
00697 if (where == this->domains_.end()) {
00698 throw OpenDDS::DCPS::Invalid_Domain();
00699 }
00700
00701
00702 domainPtr = where->second.get();
00703 partPtr = domainPtr->participant(participantId);
00704
00705 if (0 == partPtr) {
00706 throw OpenDDS::DCPS::Invalid_Participant();
00707 }
00708
00709 topic = where->second->find_topic(topicId);
00710
00711 if (topic == 0) {
00712 throw OpenDDS::DCPS::Invalid_Topic();
00713 }
00714
00715
00716 subId = partPtr->get_next_subscription_id(
00717 OpenDDS::DCPS::RepoIdConverter(topicId).isBuiltinDomainEntity());
00718
00719 OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription (
00720 OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00721
00722 if (dispatchingOrb_) {
00723
00724 CORBA::String_var subStr = orb_->object_to_string(dispatchingSubscription);
00725 CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr);
00726 if (CORBA::is_nil(subObj.in())) {
00727 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00728 ACE_DEBUG((LM_WARNING,
00729 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00730 ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00731 }
00732 return OpenDDS::DCPS::GUID_UNKNOWN;
00733 }
00734 dispatchingSubscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj);
00735 }
00736
00737 subPtr.reset(
00738 new DCPS_IR_Subscription(
00739 subId,
00740 partPtr,
00741 topic,
00742 dispatchingSubscription.in(),
00743 qos,
00744 transInfo,
00745 subscriberQos,
00746 filterClassName,
00747 filterExpression,
00748 exprParams));
00749
00750
00751 }
00752
00753 DCPS_IR_Subscription* sub = subPtr.get();
00754 if (partPtr->add_subscription(move(subPtr)) != 0) {
00755
00756 subId = OpenDDS::DCPS::GUID_UNKNOWN;
00757 } else if (topic->add_subscription_reference(sub) != 0) {
00758 ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00759
00760 partPtr->remove_subscription(subId);
00761 subId = OpenDDS::DCPS::GUID_UNKNOWN;
00762 }
00763
00764 if (this->um_ && (partPtr->isBitPublisher() == false)) {
00765 CORBA::String_var callback = orb_->object_to_string(subscription);
00766 Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00767
00768 Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00769 , callback.in()
00770 , const_cast<DDS::SubscriberQos &>(subscriberQos)
00771 , const_cast<DDS::DataReaderQos &>(qos)
00772 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00773 (transInfo), csi);
00774
00775 this->um_->create(actor);
00776
00777 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00778 OpenDDS::DCPS::RepoIdConverter converter(subId);
00779 ACE_DEBUG((LM_DEBUG,
00780 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00781 ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00782 std::string(converter).c_str(),
00783 domainId));
00784 }
00785 }
00786
00787 domainPtr->remove_dead_participants();
00788
00789 return subId;
00790 }
00791
00792 bool
00793 TAO_DDS_DCPSInfo_i::add_subscription(
00794 DDS::DomainId_t domainId,
00795 const OpenDDS::DCPS::RepoId& participantId,
00796 const OpenDDS::DCPS::RepoId& topicId,
00797 const OpenDDS::DCPS::RepoId& subId,
00798 const char* sub_str,
00799 const DDS::DataReaderQos & qos,
00800 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00801 const DDS::SubscriberQos & subscriberQos,
00802 const char* filterClassName,
00803 const char* filterExpression,
00804 const DDS::StringSeq& exprParams,
00805 bool associate)
00806 {
00807 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00808
00809
00810 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00811
00812 if (where == this->domains_.end()) {
00813 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00814 ACE_DEBUG((LM_WARNING,
00815 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00816 ACE_TEXT("invalid domain %d.\n"),
00817 domainId));
00818 }
00819
00820 return false;
00821 }
00822
00823
00824 DCPS_IR_Participant* partPtr
00825 = where->second->participant(participantId);
00826
00827 if (0 == partPtr) {
00828 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00829 OpenDDS::DCPS::RepoIdConverter converter(participantId);
00830 ACE_DEBUG((LM_WARNING,
00831 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00832 ACE_TEXT("invalid participant %C in domain %d.\n"),
00833 std::string(converter).c_str(),
00834 domainId));
00835 }
00836
00837 return false;
00838 }
00839
00840 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00841
00842 if (topic == 0) {
00843 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00844 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00845 ACE_DEBUG((LM_WARNING,
00846 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00847 ACE_TEXT("invalid topic %C in domain %d.\n"),
00848 std::string(converter).c_str(),
00849 domainId));
00850 }
00851
00852 return false;
00853 }
00854
00855 CORBA::Object_var obj = (dispatchingOrb_ ? dispatchingOrb_ : orb_) ->string_to_object(sub_str);
00856 if (CORBA::is_nil(obj.in())) {
00857 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00858 ACE_DEBUG((LM_WARNING,
00859 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00860 ACE_TEXT("failure converting string %C to objref\n"),
00861 sub_str));
00862 }
00863 return false;
00864 }
00865
00866 OpenDDS::DCPS::DataReaderRemote_var subscription = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00867
00868 OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> subPtr(
00869 new DCPS_IR_Subscription(
00870 subId,
00871 partPtr,
00872 topic,
00873 subscription.in(),
00874 qos,
00875 transInfo,
00876 subscriberQos,
00877 filterClassName,
00878 filterExpression,
00879 exprParams));
00880
00881 DCPS_IR_Subscription* sub = subPtr.get();
00882 switch (partPtr->add_subscription(OpenDDS::DCPS::move(subPtr))) {
00883 case -1: {
00884 OpenDDS::DCPS::RepoIdConverter converter(subId);
00885 ACE_ERROR((LM_ERROR,
00886 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00887 ACE_TEXT("failed to add subscription to participant %C.\n"),
00888 std::string(converter).c_str()));
00889 return false;
00890 }
00891
00892 case 1:
00893 return false;
00894
00895 case 0:
00896 default:
00897 break;
00898 }
00899
00900 switch (topic->add_subscription_reference(sub, associate)) {
00901 case -1: {
00902 OpenDDS::DCPS::RepoIdConverter converter(subId);
00903 ACE_ERROR((LM_ERROR,
00904 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00905 ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00906 std::string(converter).c_str()));
00907
00908
00909 partPtr->remove_subscription(subId);
00910
00911 }
00912 return false;
00913
00914 case 1:
00915
00916
00917
00918
00919 return false;
00920
00921 case 0:
00922 default:
00923 break;
00924 }
00925
00926 OpenDDS::DCPS::RepoIdConverter converter(subId);
00927
00928
00929
00930 if (converter.federationId() == federation_.id()) {
00931
00932 partPtr->last_subscription_key(converter.entityKey());
00933 }
00934
00935 return true;
00936 }
00937
00938 void TAO_DDS_DCPSInfo_i::remove_subscription(
00939 DDS::DomainId_t domainId,
00940 const OpenDDS::DCPS::RepoId& participantId,
00941 const OpenDDS::DCPS::RepoId& subscriptionId)
00942 {
00943 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00944
00945
00946 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00947
00948 if (where == this->domains_.end()) {
00949 throw OpenDDS::DCPS::Invalid_Domain();
00950 }
00951
00952
00953 DCPS_IR_Participant* partPtr
00954 = where->second->participant(participantId);
00955
00956 if (0 == partPtr) {
00957 throw OpenDDS::DCPS::Invalid_Participant();
00958 }
00959
00960 if (partPtr->remove_subscription(subscriptionId) != 0) {
00961
00962 throw OpenDDS::DCPS::Invalid_Subscription();
00963 }
00964
00965 where->second->remove_dead_participants();
00966
00967 if (this->um_
00968 && (partPtr->isOwner() == true)
00969 && (partPtr->isBitPublisher() == false)) {
00970 Update::IdPath path(domainId, participantId, subscriptionId);
00971 this->um_->destroy(path, Update::Actor, Update::DataReader);
00972
00973 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00974 OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00975 ACE_DEBUG((LM_DEBUG,
00976 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00977 ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00978 std::string(converter).c_str(),
00979 domainId));
00980 }
00981 }
00982 }
00983
00984 OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant(
00985 DDS::DomainId_t domain,
00986 const DDS::DomainParticipantQos & qos)
00987 {
00988
00989 OpenDDS::DCPS::AddDomainStatus value;
00990 value.id = OpenDDS::DCPS::GUID_UNKNOWN;
00991 value.federated = this->federation_.overridden();
00992
00993 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00994
00995
00996 DCPS_IR_Domain* domainPtr = this->domain(domain);
00997
00998 if (0 == domainPtr) {
00999 throw OpenDDS::DCPS::Invalid_Domain();
01000 }
01001
01002
01003 OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
01004
01005
01006
01007 bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01008
01009 DCPS_IR_Participant_rch participant =
01010 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(
01011 this->federation_,
01012 participantId,
01013 domainPtr,
01014 qos, um_, isBitPart);
01015
01016
01017 value.id = participantId;
01018
01019 if (isBitPart) {
01020 participant->isBitPublisher() = true;
01021
01022 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01023 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01024 ACE_DEBUG((LM_DEBUG,
01025 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01026 ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01027 std::string(converter).c_str(),
01028 domain));
01029 }
01030 }
01031
01032
01033 participant->takeOwnership();
01034
01035 int status = domainPtr->add_participant(participant);
01036
01037 if (0 != status) {
01038
01039
01040 participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01041
01042 } else if (this->um_) {
01043 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01044 if (participant->isBitPublisher() == false) {
01045
01046 Update::UParticipant updateParticipant(
01047 domain,
01048 participant->owner(),
01049 participantId,
01050 const_cast<DDS::DomainParticipantQos &>(qos));
01051 this->um_->create(updateParticipant);
01052
01053 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01054 ACE_DEBUG((LM_DEBUG,
01055 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01056 ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01057 std::string(converter).c_str(),
01058 domain));
01059 }
01060 }
01061
01062
01063 um_->updateLastPartId(converter.participantId());
01064 }
01065
01066 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01067 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01068 ACE_DEBUG((LM_DEBUG,
01069 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01070 ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
01071 domain,
01072 std::string(converter).c_str(),
01073 participant.get()));
01074 }
01075 return value;
01076 }
01077
01078 bool
01079 TAO_DDS_DCPSInfo_i::add_domain_participant(DDS::DomainId_t domainId
01080 , const OpenDDS::DCPS::RepoId& participantId
01081 , const DDS::DomainParticipantQos & qos)
01082 {
01083 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01084
01085
01086 DCPS_IR_Domain* domainPtr = this->domain(domainId);
01087
01088 if (0 == domainPtr) {
01089 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01090 ACE_DEBUG((LM_WARNING,
01091 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01092 ACE_TEXT("invalid domain Id: %d\n"),
01093 domainId));
01094 }
01095
01096 return false;
01097 }
01098
01099
01100 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01101
01102
01103
01104 bool isBitPart = domainPtr->participants().empty() && TheServiceParticipant->get_BIT();
01105
01106
01107 DCPS_IR_Participant* partPtr = domainPtr->participant(participantId);
01108
01109 if (0 != partPtr) {
01110 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01111 ACE_ERROR((LM_ERROR,
01112 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01113 ACE_TEXT("participant id %C already exists.\n"),
01114 std::string(converter).c_str()));
01115 }
01116
01117 return false;
01118 }
01119
01120 DCPS_IR_Participant_rch participant =
01121 OpenDDS::DCPS::make_rch<DCPS_IR_Participant>(this->federation_,
01122 participantId,
01123 domainPtr,
01124 qos, um_, isBitPart);
01125
01126 switch (domainPtr->add_participant(participant)) {
01127 case -1: {
01128 ACE_ERROR((LM_ERROR,
01129 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01130 ACE_TEXT("failed to load participant %C in domain %d.\n"),
01131 std::string(converter).c_str(),
01132 domainId));
01133 }
01134 return false;
01135
01136 case 1:
01137
01138 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01139 ACE_DEBUG((LM_WARNING,
01140 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01141 ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01142 std::string(converter).c_str(),
01143 domainId));
01144 }
01145
01146 return false;
01147
01148 case 0:
01149 default:
01150 break;
01151 }
01152
01153
01154
01155 if (converter.federationId() == this->federation_.id()) {
01156
01157 domainPtr->last_participant_key(converter.participantId());
01158
01159 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01160 ACE_DEBUG((LM_DEBUG,
01161 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01162 ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01163 converter.participantId()));
01164 }
01165 }
01166
01167 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01168 ACE_DEBUG((LM_DEBUG,
01169 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01170 ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
01171 std::string(converter).c_str(),
01172 participant.in(),
01173 domainId));
01174 }
01175
01176 return true;
01177 }
01178
01179 bool
01180 TAO_DDS_DCPSInfo_i::remove_by_owner(
01181 DDS::DomainId_t domain,
01182 long owner)
01183 {
01184 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01185
01186
01187 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01188
01189 if (where == this->domains_.end()) {
01190 return false;
01191 }
01192
01193 std::vector<OpenDDS::DCPS::RepoId> candidates;
01194
01195 for (DCPS_IR_Participant_Map::const_iterator
01196 current = where->second->participants().begin();
01197 current != where->second->participants().end();
01198 ++current) {
01199 if (current->second->owner() == owner) {
01200 candidates.push_back(current->second->get_id());
01201 }
01202 }
01203
01204 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01205 ACE_DEBUG((LM_DEBUG,
01206 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01207 ACE_TEXT("%d participants to remove from domain %d.\n"),
01208 candidates.size(),
01209 domain));
01210 }
01211
01212 bool status = true;
01213
01214 for (unsigned int index = 0; index < candidates.size(); ++index) {
01215 DCPS_IR_Participant* participant
01216 = where->second->participant(candidates[index]);
01217 if (participant) {
01218 std::vector<OpenDDS::DCPS::RepoId> keylist;
01219
01220
01221 for (DCPS_IR_Subscription_Map::const_iterator
01222 current = participant->subscriptions().begin();
01223 current != participant->subscriptions().end();
01224 ++current) {
01225 keylist.push_back(current->second->get_id());
01226 }
01227
01228 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01229 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01230 ACE_DEBUG((LM_DEBUG,
01231 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01232 ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01233 keylist.size(),
01234 std::string(converter).c_str()));
01235 }
01236
01237 for (unsigned int key = 0; key < keylist.size(); ++key) {
01238 if (participant->remove_subscription(keylist[key]) != 0) {
01239 status = false;
01240 }
01241 }
01242
01243
01244 keylist.clear();
01245
01246 for (DCPS_IR_Publication_Map::const_iterator
01247 current = participant->publications().begin();
01248 current != participant->publications().end();
01249 ++current) {
01250 keylist.push_back(current->second->get_id());
01251 }
01252
01253 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01254 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01255 ACE_DEBUG((LM_DEBUG,
01256 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01257 ACE_TEXT("%d publications to remove from participant %C.\n"),
01258 keylist.size(),
01259 std::string(converter).c_str()));
01260 }
01261
01262 for (unsigned int key = 0; key < keylist.size(); ++key) {
01263 if (participant->remove_publication(keylist[key]) != 0) {
01264 status = false;
01265 }
01266 }
01267
01268
01269 keylist.clear();
01270
01271 for (DCPS_IR_Topic_Map::const_iterator
01272 current = participant->topics().begin();
01273 current != participant->topics().end();
01274 ++current) {
01275 keylist.push_back(current->second->get_id());
01276 }
01277
01278 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01279 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01280 ACE_DEBUG((LM_DEBUG,
01281 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01282 ACE_TEXT("%d topics to remove from participant %C.\n"),
01283 keylist.size(),
01284 std::string(converter).c_str()));
01285 }
01286
01287 for (unsigned int key = 0; key < keylist.size(); ++key) {
01288 DCPS_IR_Topic* discard;
01289
01290 if (participant->remove_topic_reference(keylist[key], discard) != 0) {
01291 status = false;
01292 }
01293 }
01294 }
01295
01296
01297 this->remove_domain_participant(domain, candidates[ index]);
01298 }
01299
01300 return status;
01301 }
01302
01303 void
01304 TAO_DDS_DCPSInfo_i::disassociate_participant(
01305 DDS::DomainId_t domainId,
01306 const OpenDDS::DCPS::RepoId& local_id,
01307 const OpenDDS::DCPS::RepoId& remote_id)
01308 {
01309 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01310
01311 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01312 if (it == this->domains_.end()) {
01313 throw OpenDDS::DCPS::Invalid_Domain();
01314 }
01315
01316 DCPS_IR_Participant* participant = it->second->participant(local_id);
01317 if (participant == 0) {
01318 throw OpenDDS::DCPS::Invalid_Participant();
01319 }
01320
01321
01322 const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01323 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01324 sub != subscriptions.end(); ++sub) {
01325 sub->second->disassociate_participant(remote_id, true);
01326 }
01327
01328 const DCPS_IR_Publication_Map& publications = participant->publications();
01329 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01330 pub != publications.end(); ++pub) {
01331 pub->second->disassociate_participant(remote_id, true);
01332 }
01333
01334 it->second->remove_dead_participants();
01335 }
01336
01337 void
01338 TAO_DDS_DCPSInfo_i::disassociate_subscription(
01339 DDS::DomainId_t domainId,
01340 const OpenDDS::DCPS::RepoId& participantId,
01341 const OpenDDS::DCPS::RepoId& local_id,
01342 const OpenDDS::DCPS::RepoId& remote_id)
01343 {
01344 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01345
01346 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01347 if (it == this->domains_.end()) {
01348 throw OpenDDS::DCPS::Invalid_Domain();
01349 }
01350
01351 DCPS_IR_Participant* participant = it->second->participant(participantId);
01352 if (participant == 0) {
01353 throw OpenDDS::DCPS::Invalid_Participant();
01354 }
01355
01356 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01357 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01358 }
01359
01360 DCPS_IR_Subscription* subscription;
01361 if (participant->find_subscription_reference(local_id, subscription)
01362 != 0 || subscription == 0) {
01363 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01364 OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01365 ACE_ERROR((LM_ERROR,
01366 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01367 ACE_TEXT("participant %C could not find subscription %C.\n"),
01368 std::string(part_converter).c_str(),
01369 std::string(sub_converter).c_str()));
01370 throw OpenDDS::DCPS::Invalid_Subscription();
01371 }
01372
01373
01374 subscription->disassociate_publication(remote_id, true);
01375
01376 it->second->remove_dead_participants();
01377 }
01378
01379 void
01380 TAO_DDS_DCPSInfo_i::disassociate_publication(
01381 DDS::DomainId_t domainId,
01382 const OpenDDS::DCPS::RepoId& participantId,
01383 const OpenDDS::DCPS::RepoId& local_id,
01384 const OpenDDS::DCPS::RepoId& remote_id)
01385 {
01386 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01387
01388 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01389 if (it == this->domains_.end()) {
01390 throw OpenDDS::DCPS::Invalid_Domain();
01391 }
01392
01393 DCPS_IR_Participant* participant = it->second->participant(participantId);
01394 if (participant == 0) {
01395 throw OpenDDS::DCPS::Invalid_Participant();
01396 }
01397
01398 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01399 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01400 }
01401
01402 DCPS_IR_Publication* publication;
01403 if (participant->find_publication_reference(local_id, publication)
01404 != 0 || publication == 0) {
01405 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01406 OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01407 ACE_ERROR((LM_ERROR,
01408 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01409 ACE_TEXT("participant %C could not find publication %C.\n"),
01410 std::string(part_converter).c_str(),
01411 std::string(pub_converter).c_str()));
01412 throw OpenDDS::DCPS::Invalid_Publication();
01413 }
01414
01415
01416 publication->disassociate_subscription(remote_id, true);
01417
01418 it->second->remove_dead_participants();
01419 }
01420
01421 void TAO_DDS_DCPSInfo_i::remove_domain_participant(
01422 DDS::DomainId_t domainId,
01423 const OpenDDS::DCPS::RepoId& participantId)
01424 {
01425 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01426
01427
01428 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01429
01430 if (where == this->domains_.end()) {
01431 throw OpenDDS::DCPS::Invalid_Domain();
01432 }
01433
01434 DCPS_IR_Participant* participant = where->second->participant(participantId);
01435
01436 if (participant == 0) {
01437 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01438 ACE_ERROR((LM_ERROR,
01439 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01440 ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01441 std::string(converter).c_str(),
01442 domainId));
01443 throw OpenDDS::DCPS::Invalid_Participant();
01444 }
01445
01446
01447
01448 bool sendUpdate = (participant->isOwner() == true)
01449 && (participant->isBitPublisher() == false);
01450
01451 CORBA::Boolean dont_notify_lost = 0;
01452 int status = where->second->remove_participant(participantId, dont_notify_lost);
01453
01454 if (0 != status) {
01455
01456 throw OpenDDS::DCPS::Invalid_Participant();
01457 }
01458
01459
01460 if (this->um_ && sendUpdate) {
01461 Update::IdPath path(
01462 where->second->get_id(),
01463 participantId,
01464 participantId);
01465 this->um_->destroy(path, Update::Participant);
01466
01467 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01468 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01469 ACE_DEBUG((LM_DEBUG,
01470 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01471 ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01472 std::string(converter).c_str(),
01473 domainId));
01474 }
01475 }
01476
01477 if (where->second->participants().empty()) {
01478 domains_.erase(where);
01479 }
01480
01481 #ifndef DDS_HAS_MINIMUM_BIT
01482 else if (where->second->useBIT() &&
01483 where->second->participants().size() == 1) {
01484
01485
01486
01487
01488 const ACE_Event_Handler_var eh = new BIT_Cleanup_Handler(this, domainId);
01489 TheServiceParticipant->reactor()->notify(eh.handler());
01490 }
01491 #endif
01492 }
01493
01494 #ifndef DDS_HAS_MINIMUM_BIT
01495 int TAO_DDS_DCPSInfo_i::BIT_Cleanup_Handler::handle_exception(ACE_HANDLE)
01496 {
01497 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, parent_->lock_, 0);
01498
01499 const DCPS_IR_Domain_Map::iterator where = parent_->domains_.find(domain_);
01500
01501 if (where == parent_->domains_.end()) {
01502 return 0;
01503 }
01504
01505 if (where->second->participants().size() == 1) {
01506 where->second->cleanup_built_in_topics();
01507 }
01508
01509 return 0;
01510 }
01511 #endif
01512
01513 void TAO_DDS_DCPSInfo_i::association_complete(DDS::DomainId_t domainId,
01514 const OpenDDS::DCPS::RepoId& participantId,
01515 const OpenDDS::DCPS::RepoId& localId,
01516 const OpenDDS::DCPS::RepoId& remoteId)
01517 {
01518 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519
01520 DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01521 if (dom_iter == this->domains_.end()) {
01522 return;
01523 }
01524
01525 DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01526 if (0 == partPtr) {
01527 return;
01528 }
01529
01530
01531
01532 DCPS_IR_Subscription* sub = 0;
01533 DCPS_IR_Publication* pub = 0;
01534 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01535 ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01536 }
01537 if (0 == partPtr->find_subscription_reference(localId, sub)) {
01538 sub->association_complete(remoteId);
01539 } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01540 pub->association_complete(remoteId);
01541 } else {
01542 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01543 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01544 OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01545 OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01546 ACE_DEBUG((LM_WARNING,
01547 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01548 ACE_TEXT("participant %C could not find subscription or publication %C ")
01549 ACE_TEXT("to complete association with remote %C.\n"),
01550 std::string(part_converter).c_str(),
01551 std::string(local_converter).c_str(),
01552 std::string(remote_converter).c_str()));
01553 }
01554 }
01555 }
01556
01557 void TAO_DDS_DCPSInfo_i::ignore_domain_participant(
01558 DDS::DomainId_t domainId,
01559 const OpenDDS::DCPS::RepoId& myParticipantId,
01560 const OpenDDS::DCPS::RepoId& ignoreId)
01561 {
01562 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01563
01564
01565 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01566
01567 if (where == this->domains_.end()) {
01568 throw OpenDDS::DCPS::Invalid_Domain();
01569 }
01570
01571
01572 DCPS_IR_Participant* partPtr
01573 = where->second->participant(myParticipantId);
01574
01575 if (0 == partPtr) {
01576 throw OpenDDS::DCPS::Invalid_Participant();
01577 }
01578
01579 partPtr->ignore_participant(ignoreId);
01580
01581 where->second->remove_dead_participants();
01582 }
01583
01584 void TAO_DDS_DCPSInfo_i::ignore_topic(
01585 DDS::DomainId_t domainId,
01586 const OpenDDS::DCPS::RepoId& myParticipantId,
01587 const OpenDDS::DCPS::RepoId& ignoreId)
01588 {
01589 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01590
01591
01592 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01593
01594 if (where == this->domains_.end()) {
01595 throw OpenDDS::DCPS::Invalid_Domain();
01596 }
01597
01598
01599 DCPS_IR_Participant* partPtr
01600 = where->second->participant(myParticipantId);
01601
01602 if (0 == partPtr) {
01603 throw OpenDDS::DCPS::Invalid_Participant();
01604 }
01605
01606 partPtr->ignore_topic(ignoreId);
01607
01608 where->second->remove_dead_participants();
01609 }
01610
01611 void TAO_DDS_DCPSInfo_i::ignore_subscription(
01612 DDS::DomainId_t domainId,
01613 const OpenDDS::DCPS::RepoId& myParticipantId,
01614 const OpenDDS::DCPS::RepoId& ignoreId)
01615 {
01616 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01617
01618
01619 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01620
01621 if (where == this->domains_.end()) {
01622 throw OpenDDS::DCPS::Invalid_Domain();
01623 }
01624
01625
01626 DCPS_IR_Participant* partPtr
01627 = where->second->participant(myParticipantId);
01628
01629 if (0 == partPtr) {
01630 throw OpenDDS::DCPS::Invalid_Participant();
01631 }
01632
01633 partPtr->ignore_subscription(ignoreId);
01634
01635 where->second->remove_dead_participants();
01636 }
01637
01638 void TAO_DDS_DCPSInfo_i::ignore_publication(
01639 DDS::DomainId_t domainId,
01640 const OpenDDS::DCPS::RepoId& myParticipantId,
01641 const OpenDDS::DCPS::RepoId& ignoreId)
01642 {
01643 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01644
01645
01646 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01647
01648 if (where == this->domains_.end()) {
01649 throw OpenDDS::DCPS::Invalid_Domain();
01650 }
01651
01652
01653 DCPS_IR_Participant* partPtr
01654 = where->second->participant(myParticipantId);
01655
01656 if (0 == partPtr) {
01657 throw OpenDDS::DCPS::Invalid_Participant();
01658 }
01659
01660 partPtr->ignore_publication(ignoreId);
01661
01662 where->second->remove_dead_participants();
01663 }
01664
01665 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos(
01666 DDS::DomainId_t domainId,
01667 const OpenDDS::DCPS::RepoId& partId,
01668 const OpenDDS::DCPS::RepoId& dwId,
01669 const DDS::DataWriterQos & qos,
01670 const DDS::PublisherQos & publisherQos)
01671 {
01672 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01673
01674
01675 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01676
01677 if (where == this->domains_.end()) {
01678 throw OpenDDS::DCPS::Invalid_Domain();
01679 }
01680
01681
01682 DCPS_IR_Participant* partPtr
01683 = where->second->participant(partId);
01684
01685 if (0 == partPtr) {
01686 throw OpenDDS::DCPS::Invalid_Participant();
01687 }
01688
01689 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01690 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n"));
01691 }
01692
01693 DCPS_IR_Publication* pub;
01694
01695 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01696 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01697 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01698 ACE_ERROR((LM_ERROR,
01699 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01700 ACE_TEXT("participant %C could not find publication %C.\n"),
01701 std::string(part_converter).c_str(),
01702 std::string(pub_converter).c_str()));
01703 throw OpenDDS::DCPS::Invalid_Publication();
01704 }
01705
01706 Update::SpecificQos qosType;
01707
01708 if (pub->set_qos(qos, publisherQos, qosType) == false)
01709 return 0;
01710
01711 if (this->um_ && (partPtr->isBitPublisher() == false)) {
01712 Update::IdPath path(domainId, partId, dwId);
01713
01714 switch (qosType) {
01715 case Update::DataWriterQos:
01716 this->um_->update(path, qos);
01717 break;
01718
01719 case Update::PublisherQos:
01720 this->um_->update(path, publisherQos);
01721 break;
01722
01723 case Update::NoQos:
01724 default:
01725 break;
01726 }
01727
01728 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01729 OpenDDS::DCPS::RepoIdConverter converter(dwId);
01730 ACE_DEBUG((LM_DEBUG,
01731 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732 ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01733 std::string(converter).c_str(),
01734 domainId));
01735 }
01736 }
01737
01738 return 1;
01739 }
01740
01741 void
01742 TAO_DDS_DCPSInfo_i::update_publication_qos(
01743 DDS::DomainId_t domainId,
01744 const OpenDDS::DCPS::RepoId& partId,
01745 const OpenDDS::DCPS::RepoId& dwId,
01746 const DDS::DataWriterQos& qos)
01747 {
01748 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749
01750
01751 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752
01753 if (where == this->domains_.end()) {
01754 throw OpenDDS::DCPS::Invalid_Domain();
01755 }
01756
01757
01758 DCPS_IR_Participant* partPtr
01759 = where->second->participant(partId);
01760
01761 if (0 == partPtr) {
01762 throw OpenDDS::DCPS::Invalid_Participant();
01763 }
01764
01765 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n"));
01767 }
01768
01769 DCPS_IR_Publication* pub;
01770
01771 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774 ACE_ERROR((LM_ERROR,
01775 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776 ACE_TEXT("participant %C could not find publication %C.\n"),
01777 std::string(part_converter).c_str(),
01778 std::string(pub_converter).c_str()));
01779 throw OpenDDS::DCPS::Invalid_Publication();
01780 }
01781
01782 pub->set_qos(qos);
01783 }
01784
01785 void
01786 TAO_DDS_DCPSInfo_i::update_publication_qos(
01787 DDS::DomainId_t domainId,
01788 const OpenDDS::DCPS::RepoId& partId,
01789 const OpenDDS::DCPS::RepoId& dwId,
01790 const DDS::PublisherQos& qos)
01791 {
01792 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01793
01794
01795 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796
01797 if (where == this->domains_.end()) {
01798 throw OpenDDS::DCPS::Invalid_Domain();
01799 }
01800
01801
01802 DCPS_IR_Participant* partPtr
01803 = where->second->participant(partId);
01804
01805 if (0 == partPtr) {
01806 throw OpenDDS::DCPS::Invalid_Participant();
01807 }
01808
01809 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01810 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n"));
01811 }
01812
01813 DCPS_IR_Publication* pub;
01814
01815 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01816 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01818 ACE_ERROR((LM_ERROR,
01819 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01820 ACE_TEXT("participant %C could not find publication %C.\n"),
01821 std::string(part_converter).c_str(),
01822 std::string(pub_converter).c_str()));
01823 throw OpenDDS::DCPS::Invalid_Publication();
01824 }
01825
01826 pub->set_qos(qos);
01827 }
01828
01829 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos(
01830 DDS::DomainId_t domainId,
01831 const OpenDDS::DCPS::RepoId& partId,
01832 const OpenDDS::DCPS::RepoId& drId,
01833 const DDS::DataReaderQos & qos,
01834 const DDS::SubscriberQos & subscriberQos)
01835 {
01836 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01837
01838
01839 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01840
01841 if (where == this->domains_.end()) {
01842 throw OpenDDS::DCPS::Invalid_Domain();
01843 }
01844
01845
01846 DCPS_IR_Participant* partPtr
01847 = where->second->participant(partId);
01848
01849 if (0 == partPtr) {
01850 throw OpenDDS::DCPS::Invalid_Participant();
01851 }
01852
01853 DCPS_IR_Subscription* sub;
01854
01855 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01856 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01857 }
01858
01859 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01860 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01861 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01862 ACE_ERROR((LM_ERROR,
01863 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01864 ACE_TEXT("participant %C could not find subscription %C.\n"),
01865 std::string(part_converter).c_str(),
01866 std::string(sub_converter).c_str()));
01867 throw OpenDDS::DCPS::Invalid_Subscription();
01868 }
01869
01870 Update::SpecificQos qosType;
01871
01872 if (sub->set_qos(qos, subscriberQos, qosType) == false)
01873 return 0;
01874
01875 if (this->um_ && (partPtr->isBitPublisher() == false)) {
01876 Update::IdPath path(domainId, partId, drId);
01877
01878 switch (qosType) {
01879 case Update::DataReaderQos:
01880 this->um_->update(path, qos);
01881 break;
01882
01883 case Update::SubscriberQos:
01884 this->um_->update(path, subscriberQos);
01885 break;
01886
01887 case Update::NoQos:
01888 default:
01889 break;
01890 }
01891
01892 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01893 OpenDDS::DCPS::RepoIdConverter converter(drId);
01894 ACE_DEBUG((LM_DEBUG,
01895 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896 ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01897 std::string(converter).c_str(),
01898 domainId));
01899 }
01900 }
01901
01902 return 1;
01903 }
01904
01905 void
01906 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01907 DDS::DomainId_t domainId,
01908 const OpenDDS::DCPS::RepoId& partId,
01909 const OpenDDS::DCPS::RepoId& drId,
01910 const DDS::DataReaderQos& qos)
01911 {
01912 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913
01914
01915 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916
01917 if (where == this->domains_.end()) {
01918 throw OpenDDS::DCPS::Invalid_Domain();
01919 }
01920
01921
01922 DCPS_IR_Participant* partPtr
01923 = where->second->participant(partId);
01924
01925 if (0 == partPtr) {
01926 throw OpenDDS::DCPS::Invalid_Participant();
01927 }
01928
01929 DCPS_IR_Subscription* sub;
01930
01931 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01933 }
01934
01935 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938 ACE_ERROR((LM_ERROR,
01939 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940 ACE_TEXT("participant %C could not find subscription %C.\n"),
01941 std::string(part_converter).c_str(),
01942 std::string(sub_converter).c_str()));
01943 throw OpenDDS::DCPS::Invalid_Subscription();
01944 }
01945
01946 sub->set_qos(qos);
01947 }
01948
01949 void
01950 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01951 DDS::DomainId_t domainId,
01952 const OpenDDS::DCPS::RepoId& partId,
01953 const OpenDDS::DCPS::RepoId& drId,
01954 const DDS::SubscriberQos& qos)
01955 {
01956 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01957
01958
01959 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01960
01961 if (where == this->domains_.end()) {
01962 throw OpenDDS::DCPS::Invalid_Domain();
01963 }
01964
01965
01966 DCPS_IR_Participant* partPtr
01967 = where->second->participant(partId);
01968
01969 if (0 == partPtr) {
01970 throw OpenDDS::DCPS::Invalid_Participant();
01971 }
01972
01973 DCPS_IR_Subscription* sub;
01974
01975 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01976 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01977 }
01978
01979 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01980 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01981 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01982 ACE_ERROR((LM_ERROR,
01983 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01984 ACE_TEXT("participant %C could not find subscription %C.\n"),
01985 std::string(part_converter).c_str(),
01986 std::string(sub_converter).c_str()));
01987 throw OpenDDS::DCPS::Invalid_Subscription();
01988 }
01989
01990 sub->set_qos(qos);
01991 }
01992
01993 CORBA::Boolean
01994 TAO_DDS_DCPSInfo_i::update_subscription_params(
01995 DDS::DomainId_t domainId,
01996 const OpenDDS::DCPS::RepoId& participantId,
01997 const OpenDDS::DCPS::RepoId& subscriptionId,
01998 const DDS::StringSeq& params)
01999 {
02000 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001
02002 DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
02003 if (domain == this->domains_.end()) {
02004 throw OpenDDS::DCPS::Invalid_Domain();
02005 }
02006
02007 DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
02008 if (0 == partPtr) {
02009 throw OpenDDS::DCPS::Invalid_Participant();
02010 }
02011
02012 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
02013 ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
02014 }
02015
02016 DCPS_IR_Subscription* sub;
02017 if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
02018 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
02019 OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
02020 ACE_ERROR((LM_ERROR,
02021 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
02022 ACE_TEXT("participant %C could not find subscription %C.\n"),
02023 std::string(part_converter).c_str(),
02024 std::string(sub_converter).c_str()));
02025 throw OpenDDS::DCPS::Invalid_Subscription();
02026 }
02027
02028 sub->update_expr_params(params);
02029
02030 if (this->um_ && !partPtr->isBitPublisher()) {
02031 Update::IdPath path(domainId, participantId, subscriptionId);
02032 this->um_->update(path, params);
02033 }
02034
02035 return true;
02036 }
02037
02038 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos(
02039 const OpenDDS::DCPS::RepoId& topicId,
02040 DDS::DomainId_t domainId,
02041 const OpenDDS::DCPS::RepoId& participantId,
02042 const DDS::TopicQos & qos)
02043 {
02044 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02045
02046
02047 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02048
02049 if (where == this->domains_.end()) {
02050 throw OpenDDS::DCPS::Invalid_Domain();
02051 }
02052
02053
02054 DCPS_IR_Participant* partPtr
02055 = where->second->participant(participantId);
02056
02057 if (0 == partPtr) {
02058 throw OpenDDS::DCPS::Invalid_Participant();
02059 }
02060
02061 DCPS_IR_Topic* topic;
02062
02063 if (partPtr->find_topic_reference(topicId, topic) != 0) {
02064 throw OpenDDS::DCPS::Invalid_Topic();
02065 }
02066
02067 if (topic->set_topic_qos(qos) == false)
02068 return 0;
02069
02070 if (this->um_
02071 && (partPtr->isOwner() == true)
02072 && (partPtr->isBitPublisher() == false)) {
02073 Update::IdPath path(domainId, participantId, topicId);
02074 this->um_->update(path, qos);
02075
02076 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077 OpenDDS::DCPS::RepoIdConverter converter(topicId);
02078 ACE_DEBUG((LM_DEBUG,
02079 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02080 ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02081 std::string(converter).c_str(),
02082 domainId));
02083 }
02084 }
02085
02086 return 1;
02087 }
02088
02089 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos(
02090 DDS::DomainId_t domainId,
02091 const OpenDDS::DCPS::RepoId& participantId,
02092 const DDS::DomainParticipantQos & qos)
02093 {
02094 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02095
02096
02097 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02098
02099 if (where == this->domains_.end()) {
02100 throw OpenDDS::DCPS::Invalid_Domain();
02101 }
02102
02103
02104 DCPS_IR_Participant* partPtr
02105 = where->second->participant(participantId);
02106
02107 if (0 == partPtr) {
02108 throw OpenDDS::DCPS::Invalid_Participant();
02109 }
02110
02111 if (partPtr->set_qos(qos) == false)
02112 return 0;
02113
02114 if (this->um_
02115 && (partPtr->isOwner() == true)
02116 && (partPtr->isBitPublisher() == false)) {
02117 Update::IdPath path(domainId, participantId, participantId);
02118 this->um_->update(path, qos);
02119
02120 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02121 OpenDDS::DCPS::RepoIdConverter converter(participantId);
02122 ACE_DEBUG((LM_DEBUG,
02123 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02124 ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02125 std::string(converter).c_str(),
02126 domainId));
02127 }
02128 }
02129
02130 return 1;
02131 }
02132
02133 DCPS_IR_Domain*
02134 TAO_DDS_DCPSInfo_i::domain(DDS::DomainId_t domain)
02135 {
02136 if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02137 ACE_ERROR((LM_ERROR,
02138 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02139 ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02140 return 0;
02141 }
02142
02143
02144 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02145
02146 if (where == this->domains_.end()) {
02147
02148 OpenDDS::DCPS::unique_ptr<DCPS_IR_Domain> domain_uptr( new
02149 DCPS_IR_Domain(domain, this->participantIdGenerator_));
02150
02151 DCPS_IR_Domain* domainPtr = domain_uptr.get();
02152
02153
02154
02155 this->domains_.insert(
02156 where,
02157 DCPS_IR_Domain_Map::value_type(domain, OpenDDS::DCPS::move(domain_uptr)));
02158
02159 #ifndef DDS_HAS_MINIMUM_BIT
02160 if (TheServiceParticipant->get_BIT() && !domainPtr->useBIT() &&
02161 domainPtr->init_built_in_topics(federation_.overridden(), reincarnate_)
02162 ) {
02163 ACE_ERROR((LM_ERROR,
02164 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02165 ACE_TEXT("failed to initialize the Built-In Topics ")
02166 ACE_TEXT("when loading domain %d.\n"),
02167 domain));
02168 this->domains_.erase(domain);
02169 return 0;
02170 }
02171 #endif
02172
02173 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02174 ACE_DEBUG((LM_DEBUG,
02175 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02176 ACE_TEXT("successfully loaded domain %d at %x.\n"),
02177 domain,
02178 domainPtr));
02179 }
02180 return domainPtr;
02181
02182 } else {
02183 return where->second.get();
02184 }
02185 }
02186
02187 int TAO_DDS_DCPSInfo_i::init_transport(int listen_address_given,
02188 const char* listen_str)
02189 {
02190 int status = 0;
02191
02192 try {
02193
02194 #ifndef ACE_AS_STATIC_LIBS
02195 if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02196 < 0 ) {
02197 static const ACE_TCHAR directive[] =
02198 ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02199 ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02200 ACE_Service_Config::process_directive(directive);
02201 }
02202 #endif
02203
02204 std::string config_name =
02205 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02206 + std::string("InfoRepoBITTransportConfig");
02207 OpenDDS::DCPS::TransportConfig_rch config =
02208 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02209
02210 std::string inst_name =
02211 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02212 + std::string("InfoRepoBITTCPTransportInst");
02213 OpenDDS::DCPS::TransportInst_rch inst =
02214 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02215 "tcp");
02216 config->instances_.push_back(inst);
02217
02218 OpenDDS::DCPS::TcpInst_rch tcp_inst =
02219 OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02220 inst->datalink_release_delay_ = 0;
02221
02222 tcp_inst->conn_retry_attempts_ = 0;
02223
02224 if (listen_address_given) {
02225 tcp_inst->local_address(listen_str);
02226 }
02227
02228 } catch (...) {
02229
02230
02231
02232 status = 1;
02233 }
02234 return status;
02235 }
02236
02237 bool
02238 TAO_DDS_DCPSInfo_i::receive_image(const Update::UImage& image)
02239 {
02240 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02241 ACE_DEBUG((LM_DEBUG,
02242 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02243 ACE_TEXT("processing persistent data.\n")));
02244 }
02245
02246
02247 #ifndef DDS_HAS_MINIMUM_BIT
02248 if (TheServiceParticipant->get_BIT()) {
02249 for (Update::UImage::ParticipantSeq::const_iterator
02250 iter = image.participants.begin();
02251 iter != image.participants.end(); iter++) {
02252 const Update::UParticipant* part = *iter;
02253 if (!domain(part->domainId)) {
02254 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02255 ACE_DEBUG((LM_WARNING,
02256 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::receive_image: ")
02257 ACE_TEXT("invalid domain Id: %d\n"),
02258 part->domainId));
02259 }
02260 return false;
02261 }
02262 }
02263 }
02264 #endif
02265
02266
02267 participantIdGenerator_.last(image.lastPartId);
02268
02269 for (Update::UImage::ParticipantSeq::const_iterator
02270 iter = image.participants.begin();
02271 iter != image.participants.end(); iter++) {
02272 const Update::UParticipant* part = *iter;
02273
02274 if (!this->add_domain_participant(part->domainId, part->participantId
02275 , part->participantQos)) {
02276 OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02277 ACE_ERROR((LM_ERROR,
02278 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02279 ACE_TEXT("failed to add participant %C to domain %d.\n"),
02280 std::string(converter).c_str(),
02281 part->domainId));
02282 return false;
02283
02284 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02285 OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02286 ACE_DEBUG((LM_DEBUG,
02287 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02288 ACE_TEXT("added participant %C to domain %d.\n"),
02289 std::string(converter).c_str(),
02290 part->domainId));
02291 }
02292 }
02293
02294 for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02295 iter != image.topics.end(); iter++) {
02296 const Update::UTopic* topic = *iter;
02297
02298 if (!this->add_topic(topic->topicId, topic->domainId
02299 , topic->participantId, topic->name.c_str()
02300 , topic->dataType.c_str(), topic->topicQos)) {
02301 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02302 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02303 ACE_ERROR((LM_ERROR,
02304 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02305 ACE_TEXT("failed to add topic %C to participant %C.\n"),
02306 std::string(topic_converter).c_str(),
02307 std::string(part_converter).c_str()));
02308 return false;
02309
02310 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02311 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02312 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02313 ACE_DEBUG((LM_DEBUG,
02314 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02315 ACE_TEXT("added topic %C to participant %C.\n"),
02316 std::string(topic_converter).c_str(),
02317 std::string(part_converter).c_str()));
02318 }
02319 }
02320
02321 for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02322 iter != image.actors.end(); iter++) {
02323 const Update::URActor* sub = *iter;
02324
02325
02326 if (!this->add_subscription(sub->domainId, sub->participantId
02327 , sub->topicId, sub->actorId
02328 , sub->callback.c_str(), sub->drdwQos
02329 , sub->transportInterfaceInfo
02330 , sub->pubsubQos
02331 , sub->contentSubscriptionProfile.filterClassName
02332 , sub->contentSubscriptionProfile.filterExpr
02333 , sub->contentSubscriptionProfile.exprParams)) {
02334 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02335 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02336 ACE_ERROR((LM_ERROR,
02337 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02338 ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02339 std::string(sub_converter).c_str(),
02340 std::string(part_converter).c_str()));
02341 return false;
02342
02343 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02344 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02345 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02346 ACE_DEBUG((LM_DEBUG,
02347 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02348 ACE_TEXT("added subscription %C to participant %C.\n"),
02349 std::string(sub_converter).c_str(),
02350 std::string(part_converter).c_str()));
02351 }
02352 }
02353
02354 for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02355 iter != image.wActors.end(); iter++) {
02356 const Update::UWActor* pub = *iter;
02357
02358
02359
02360 if (!this->add_publication(pub->domainId, pub->participantId
02361 , pub->topicId, pub->actorId
02362 , pub->callback.c_str() , pub->drdwQos
02363 , pub->transportInterfaceInfo
02364 , pub->pubsubQos
02365 , true)) {
02366 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02367 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02368 ACE_ERROR((LM_ERROR,
02369 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02370 ACE_TEXT("failed to add publication %C to participant %C.\n"),
02371 std::string(pub_converter).c_str(),
02372 std::string(part_converter).c_str()));
02373 return false;
02374
02375 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02376 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02377 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02378 ACE_DEBUG((LM_DEBUG,
02379 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02380 ACE_TEXT("added publication %C to participant %C.\n"),
02381 std::string(pub_converter).c_str(),
02382 std::string(part_converter).c_str()));
02383 }
02384 }
02385
02386 #ifndef DDS_HAS_MINIMUM_BIT
02387 if (TheServiceParticipant->get_BIT()) {
02388 for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02389 currentDomain != domains_.end();
02390 ++currentDomain) {
02391 currentDomain->second->reassociate_built_in_topic_pubs();
02392 }
02393 }
02394 #endif
02395
02396 return true;
02397 }
02398
02399 void
02400 TAO_DDS_DCPSInfo_i::add(Update::Updater* updater)
02401 {
02402 if (this->um_) {
02403 this->um_->add(updater);
02404 }
02405 }
02406
02407 bool
02408 TAO_DDS_DCPSInfo_i::init_persistence()
02409 {
02410 um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02411 ("UpdateManagerSvc");
02412
02413 if (um_ != 0) {
02414 um_->add(this);
02415
02416
02417 if (reincarnate_) {
02418 um_->requestImage();
02419 }
02420
02421 } else {
02422 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02423 ACE_TEXT("UpdateManagerSvc.\n")), false);
02424 }
02425
02426 return true;
02427 }
02428
02429 bool
02430 TAO_DDS_DCPSInfo_i::init_reassociation(const ACE_Time_Value& delay)
02431 {
02432 if (this->reassociate_timer_id_ != -1) return false;
02433
02434 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02435
02436 this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02437 return this->reassociate_timer_id_ != -1;
02438 }
02439
02440 bool
02441 TAO_DDS_DCPSInfo_i::init_dispatchChecking(const ACE_Time_Value& delay)
02442 {
02443 if (this->dispatch_check_timer_id_ != -1) return false;
02444
02445 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02446
02447 this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02448 return this->dispatch_check_timer_id_ != -1;
02449 }
02450
02451 void
02452 TAO_DDS_DCPSInfo_i::finalize()
02453 {
02454 if (reassociate_timer_id_ != -1) {
02455 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02456
02457 reactor->cancel_timer(this->reassociate_timer_id_);
02458 this->reassociate_timer_id_ = -1;
02459 }
02460
02461 if (dispatch_check_timer_id_ != -1) {
02462 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02463
02464 reactor->cancel_timer(this->dispatch_check_timer_id_);
02465 this->dispatch_check_timer_id_ = -1;
02466 }
02467 }
02468
02469 const DCPS_IR_Domain_Map&
02470 TAO_DDS_DCPSInfo_i::domains() const
02471 {
02472 return this->domains_;
02473 }
02474
02475
02476 char*
02477 TAO_DDS_DCPSInfo_i::dump_to_string()
02478 {
02479 std::string dump;
02480 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02481 std::string indent (" ");
02482
02483 for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02484 dm != domains_.end();
02485 dm++)
02486 {
02487 dump += dm->second->dump_to_string(indent, 0);
02488 }
02489 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02490 return CORBA::string_dup(dump.c_str());
02491
02492 }
02493
02494 OPENDDS_END_VERSIONED_NAMESPACE_DECL