00001
00002
00003
00004
00005
00006
00007
00008 #include "DcpsInfo_pch.h"
00009
00010 #include "DCPSInfo_i.h"
00011
00012 #include "dds/DCPS/InfoRepoDiscovery/InfoC.h"
00013 #include "dds/DCPS/transport/tcp/TcpInst.h"
00014 #include "dds/DCPS/transport/framework/TransportRegistry.h"
00015 #include "dds/DCPS/transport/framework/TransportInst.h"
00016 #include "dds/DCPS/transport/framework/TransportInst_rch.h"
00017 #include "dds/DCPS/transport/tcp/TcpInst.h"
00018 #include "dds/DCPS/transport/tcp/TcpInst_rch.h"
00019 #include "UpdateManager.h"
00020 #include "ShutdownInterface.h"
00021
00022 #include "dds/DCPS/BuiltInTopicUtils.h"
00023 #include "dds/DCPS/GuidUtils.h"
00024 #include "dds/DCPS/RepoIdConverter.h"
00025
00026 #include "tao/debug.h"
00027
00028 #include "ace/Read_Buffer.h"
00029 #include "ace/OS_NS_stdio.h"
00030 #include "ace/Dynamic_Service.h"
00031 #include "ace/Reactor.h"
00032
00033
00034 TAO_DDS_DCPSInfo_i::TAO_DDS_DCPSInfo_i(CORBA::ORB_ptr orb
00035 , bool reincarnate
00036 , ShutdownInterface* shutdown
00037 , const TAO_DDS_DCPSFederationId& federation)
00038 : orb_(CORBA::ORB::_duplicate(orb))
00039 , federation_(federation)
00040 , participantIdGenerator_(federation.id())
00041 , um_(0)
00042 , reincarnate_(reincarnate)
00043 , shutdown_(shutdown)
00044 , reassociate_timer_id_(-1)
00045 , dispatch_check_timer_id_(-1)
00046 {
00047 int argc = 0;
00048 char** no_argv = 0;
00049 dispatchingOrb_ = CORBA::ORB_init(argc, no_argv, "dispatchingOnly");
00050 }
00051
00052
00053 TAO_DDS_DCPSInfo_i::~TAO_DDS_DCPSInfo_i()
00054 {
00055 }
00056
00057 int
00058 TAO_DDS_DCPSInfo_i::handle_timeout(const ACE_Time_Value& ,
00059 const void* arg)
00060 {
00061 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00062
00063 if (arg == this) {
00064 if ( !CORBA::is_nil(this->dispatchingOrb_.in())){
00065 if (this->dispatchingOrb_->work_pending())
00066 {
00067
00068 ACE_Time_Value small(0,10);
00069 this->dispatchingOrb_->perform_work(small);
00070 }
00071 }
00072 }
00073 else {
00074
00075
00076
00077 for (DCPS_IR_Domain_Map::const_iterator dom(this->domains_.begin());
00078 dom != this->domains_.end(); ++dom) {
00079
00080 const DCPS_IR_Participant_Map& participants(dom->second->participants());
00081 for (DCPS_IR_Participant_Map::const_iterator part(participants.begin());
00082 part != participants.end(); ++part) {
00083
00084 const DCPS_IR_Subscription_Map& subscriptions(part->second->subscriptions());
00085 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
00086 sub != subscriptions.end(); ++sub) {
00087 sub->second->reevaluate_defunct_associations();
00088 }
00089
00090 const DCPS_IR_Publication_Map& publications(part->second->publications());
00091 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
00092 pub != publications.end(); ++pub) {
00093 pub->second->reevaluate_defunct_associations();
00094 }
00095 }
00096 }
00097 }
00098
00099 return 0;
00100 }
00101
00102 void
00103 TAO_DDS_DCPSInfo_i::shutdown()
00104 {
00105 this->shutdown_->shutdown();
00106 }
00107
00108 CORBA::ORB_ptr
00109 TAO_DDS_DCPSInfo_i::orb()
00110 {
00111 return CORBA::ORB::_duplicate(this->orb_.in());
00112 }
00113
00114 CORBA::Boolean TAO_DDS_DCPSInfo_i::attach_participant(
00115 DDS::DomainId_t domainId,
00116 const OpenDDS::DCPS::RepoId& participantId)
00117 {
00118 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
00119
00120
00121 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00122
00123 if (where == this->domains_.end()) {
00124 throw OpenDDS::DCPS::Invalid_Domain();
00125 }
00126
00127
00128 DCPS_IR_Participant* participant
00129 = where->second->participant(participantId);
00130
00131 if (0 == participant) {
00132 throw OpenDDS::DCPS::Invalid_Participant();
00133 }
00134
00135
00136 participant->takeOwnership();
00137
00138 return false;
00139 }
00140
00141 bool
00142 TAO_DDS_DCPSInfo_i::changeOwnership(
00143 DDS::DomainId_t domainId,
00144 const OpenDDS::DCPS::RepoId& participantId,
00145 long sender,
00146 long owner)
00147 {
00148 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00149
00150
00151 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00152
00153 if (where == this->domains_.end()) {
00154 return false;
00155 }
00156
00157
00158 DCPS_IR_Participant* participant
00159 = where->second->participant(participantId);
00160
00161 if (0 == participant) {
00162 return false;
00163 }
00164
00165
00166 participant->changeOwner(sender, owner);
00167 return true;
00168 }
00169
00170 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::assert_topic(
00171 OpenDDS::DCPS::RepoId_out topicId,
00172 DDS::DomainId_t domainId,
00173 const OpenDDS::DCPS::RepoId& participantId,
00174 const char * topicName,
00175 const char * dataTypeName,
00176 const DDS::TopicQos & qos,
00177 bool )
00178 {
00179 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00180
00181 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00182
00183 if (where == this->domains_.end()) {
00184 throw OpenDDS::DCPS::Invalid_Domain();
00185 }
00186
00187
00188 DCPS_IR_Participant* participantPtr
00189 = where->second->participant(participantId);
00190
00191 if (0 == participantPtr) {
00192 throw OpenDDS::DCPS::Invalid_Participant();
00193 }
00194
00195 OpenDDS::DCPS::TopicStatus topicStatus
00196 = where->second->add_topic(
00197 topicId,
00198 topicName,
00199 dataTypeName,
00200 qos,
00201 participantPtr);
00202
00203 if (this->um_ && (participantPtr->isBitPublisher() == false)) {
00204 Update::UTopic topic(domainId, topicId, participantId
00205 , topicName, dataTypeName
00206 , const_cast<DDS::TopicQos &>(qos));
00207 this->um_->create(topic);
00208
00209 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00210 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00211 ACE_DEBUG((LM_DEBUG,
00212 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::assert_topic: ")
00213 ACE_TEXT("pushing creation of topic %C in domain %d.\n"),
00214 std::string(converter).c_str(),
00215 domainId));
00216 }
00217 }
00218 return topicStatus;
00219 }
00220
00221 bool
00222 TAO_DDS_DCPSInfo_i::add_topic(const OpenDDS::DCPS::RepoId& topicId,
00223 DDS::DomainId_t domainId,
00224 const OpenDDS::DCPS::RepoId& participantId,
00225 const char* topicName,
00226 const char* dataTypeName,
00227 const DDS::TopicQos& qos)
00228 {
00229 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00230
00231
00232 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00233
00234 if (where == this->domains_.end()) {
00235 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00236 ACE_DEBUG((LM_WARNING,
00237 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00238 ACE_TEXT("invalid domain %d.\n"),
00239 domainId));
00240 }
00241
00242 return false;
00243 }
00244
00245
00246 DCPS_IR_Participant* participantPtr
00247 = where->second->participant(participantId);
00248
00249 if (0 == participantPtr) {
00250 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00251 OpenDDS::DCPS::RepoIdConverter converter(participantId);
00252 ACE_DEBUG((LM_WARNING,
00253 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_topic: ")
00254 ACE_TEXT("invalid participant %C.\n"),
00255 std::string(converter).c_str()));
00256 }
00257
00258 return false;
00259 }
00260
00261 OpenDDS::DCPS::TopicStatus topicStatus
00262 = where->second->force_add_topic(topicId, topicName, dataTypeName,
00263 qos, participantPtr);
00264
00265 if (topicStatus != OpenDDS::DCPS::CREATED) {
00266 return false;
00267 }
00268
00269 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00270
00271
00272
00273 if (converter.federationId() == federation_.id()) {
00274
00275 participantPtr->last_topic_key(converter.entityKey());
00276 }
00277
00278 return true;
00279 }
00280
00281 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::find_topic(
00282 DDS::DomainId_t domainId,
00283 const char * topicName,
00284 CORBA::String_out dataTypeName,
00285 DDS::TopicQos_out qos,
00286 OpenDDS::DCPS::RepoId_out topicId)
00287 {
00288 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00289
00290
00291 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00292
00293 if (where == this->domains_.end()) {
00294 throw OpenDDS::DCPS::Invalid_Domain();
00295 }
00296
00297 OpenDDS::DCPS::TopicStatus status = OpenDDS::DCPS::NOT_FOUND;
00298
00299 DCPS_IR_Topic* topic = 0;
00300 qos = new DDS::TopicQos;
00301
00302 status = where->second->find_topic(topicName, topic);
00303
00304 if (0 != topic) {
00305 status = OpenDDS::DCPS::FOUND;
00306 const DCPS_IR_Topic_Description* desc = topic->get_topic_description();
00307 dataTypeName = desc->get_dataTypeName();
00308 *qos = *(topic->get_topic_qos());
00309 topicId = topic->get_id();
00310 }
00311
00312 return status;
00313 }
00314
00315 OpenDDS::DCPS::TopicStatus TAO_DDS_DCPSInfo_i::remove_topic(
00316 DDS::DomainId_t domainId,
00317 const OpenDDS::DCPS::RepoId& participantId,
00318 const OpenDDS::DCPS::RepoId& topicId)
00319 {
00320 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::INTERNAL_ERROR);
00321
00322
00323 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00324
00325 if (where == this->domains_.end()) {
00326 throw OpenDDS::DCPS::Invalid_Domain();
00327 }
00328
00329
00330 DCPS_IR_Participant* partPtr
00331 = where->second->participant(participantId);
00332
00333 if (0 == partPtr) {
00334 throw OpenDDS::DCPS::Invalid_Participant();
00335 }
00336
00337 DCPS_IR_Topic* topic;
00338
00339 if (partPtr->find_topic_reference(topicId, topic) != 0) {
00340 throw OpenDDS::DCPS::Invalid_Topic();
00341 }
00342
00343 OpenDDS::DCPS::TopicStatus removedStatus = where->second->remove_topic(partPtr, topic);
00344
00345 if (this->um_
00346 && (partPtr->isOwner() == true)
00347 && (partPtr->isBitPublisher() == false)) {
00348 Update::IdPath path(domainId, participantId, topicId);
00349 this->um_->destroy(path, Update::Topic);
00350
00351 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00352 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00353 ACE_DEBUG((LM_DEBUG,
00354 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_topic: ")
00355 ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00356 std::string(converter).c_str(),
00357 domainId));
00358 }
00359 }
00360
00361 return removedStatus;
00362 }
00363
00364 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_publication(
00365 DDS::DomainId_t domainId,
00366 const OpenDDS::DCPS::RepoId& participantId,
00367 const OpenDDS::DCPS::RepoId& topicId,
00368 OpenDDS::DCPS::DataWriterRemote_ptr publication,
00369 const DDS::DataWriterQos & qos,
00370 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00371 const DDS::PublisherQos & publisherQos)
00372 {
00373 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00374
00375
00376 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00377
00378 if (where == this->domains_.end()) {
00379 throw OpenDDS::DCPS::Invalid_Domain();
00380 }
00381
00382
00383 DCPS_IR_Participant* partPtr
00384 = where->second->participant(participantId);
00385
00386 if (0 == partPtr) {
00387 throw OpenDDS::DCPS::Invalid_Participant();
00388 }
00389
00390 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00391
00392 if (topic == 0) {
00393 throw OpenDDS::DCPS::Invalid_Topic();
00394 }
00395
00396 OpenDDS::DCPS::RepoId pubId = partPtr->get_next_publication_id();
00397
00398
00399 OpenDDS::DCPS::DataWriterRemote_var marshalledPub(OpenDDS::DCPS::DataWriterRemote::_duplicate(publication));
00400 if (CORBA::is_nil(marshalledPub.in())) {
00401 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00402 ACE_DEBUG((LM_WARNING,
00403 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00404 ACE_TEXT("invalid publication reference.\n")));
00405 }
00406 return OpenDDS::DCPS::GUID_UNKNOWN;
00407 }
00408 CORBA::String_var pubStr = orb_->object_to_string(marshalledPub.in());
00409 CORBA::Object_var pubObj = dispatchingOrb_->string_to_object(pubStr.in());
00410 if (CORBA::is_nil(pubObj.in())) {
00411 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00412 ACE_DEBUG((LM_WARNING,
00413 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00414 ACE_TEXT("failure marshalling publication on dispatching orb.\n")));
00415 }
00416 return OpenDDS::DCPS::GUID_UNKNOWN;
00417 }
00418 OpenDDS::DCPS::DataWriterRemote_var dispatchingPublication
00419 = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(pubObj.in());
00420
00421 DCPS_IR_Publication* pubPtr;
00422 ACE_NEW_RETURN(pubPtr,
00423 DCPS_IR_Publication(
00424 pubId,
00425 partPtr,
00426 topic,
00427 dispatchingPublication.in(),
00428 qos,
00429 transInfo,
00430 publisherQos),
00431 OpenDDS::DCPS::GUID_UNKNOWN);
00432
00433 if (partPtr->add_publication(pubPtr) != 0) {
00434
00435 pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00436 delete pubPtr;
00437 pubPtr = 0;
00438
00439 } else if (topic->add_publication_reference(pubPtr) != 0) {
00440
00441
00442 partPtr->remove_publication(pubId);
00443 pubId = OpenDDS::DCPS::GUID_UNKNOWN;
00444 }
00445
00446 if (this->um_ && (partPtr->isBitPublisher() == false)) {
00447 CORBA::String_var callback = orb_->object_to_string(publication);
00448 Update::ContentSubscriptionInfo csi;
00449
00450 Update::UWActor actor(domainId, pubId, topicId, participantId, Update::DataWriter
00451 , callback.in()
00452 , const_cast<DDS::PublisherQos &>(publisherQos)
00453 , const_cast<DDS::DataWriterQos &>(qos)
00454 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00455 (transInfo), csi);
00456 this->um_->create(actor);
00457
00458 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00459 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00460 ACE_DEBUG((LM_DEBUG,
00461 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_publication: ")
00462 ACE_TEXT("pushing creation of publication %C in domain %d.\n"),
00463 std::string(converter).c_str(),
00464 domainId));
00465 }
00466 }
00467
00468 where->second->remove_dead_participants();
00469 return pubId;
00470 }
00471
00472 bool
00473 TAO_DDS_DCPSInfo_i::add_publication(DDS::DomainId_t domainId,
00474 const OpenDDS::DCPS::RepoId& participantId,
00475 const OpenDDS::DCPS::RepoId& topicId,
00476 const OpenDDS::DCPS::RepoId& pubId,
00477 const char* pub_str,
00478 const DDS::DataWriterQos & qos,
00479 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00480 const DDS::PublisherQos & publisherQos,
00481 bool associate)
00482 {
00483 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00484
00485
00486 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00487
00488 if (where == this->domains_.end()) {
00489 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00490 ACE_DEBUG((LM_WARNING,
00491 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00492 ACE_TEXT("invalid domain %d.\n"),
00493 domainId));
00494 }
00495
00496 return false;
00497 }
00498
00499
00500 DCPS_IR_Participant* partPtr
00501 = where->second->participant(participantId);
00502
00503 if (0 == partPtr) {
00504 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00505 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00506 ACE_DEBUG((LM_WARNING,
00507 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00508 ACE_TEXT("invalid participant %C in domain %d.\n"),
00509 std::string(converter).c_str(),
00510 domainId));
00511 }
00512
00513 return false;
00514 }
00515
00516 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00517
00518 if (topic == 0) {
00519 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00520 ACE_DEBUG((LM_WARNING,
00521 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00522 ACE_TEXT("invalid topic %C in domain %d.\n"),
00523 std::string(converter).c_str(),
00524 domainId));
00525 return false;
00526 }
00527
00528
00529
00530 CORBA::Object_var obj = dispatchingOrb_->string_to_object(pub_str);
00531 if (CORBA::is_nil(obj.in())) {
00532 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00533 ACE_DEBUG((LM_WARNING,
00534 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_publication: ")
00535 ACE_TEXT("failure marshalling publication %s on dispatching orb.\n"),
00536 pub_str));
00537 }
00538 return false;
00539 }
00540 OpenDDS::DCPS::DataWriterRemote_var publication
00541 = OpenDDS::DCPS::DataWriterRemote::_unchecked_narrow(obj.in());
00542
00543 DCPS_IR_Publication* pubPtr;
00544 ACE_NEW_RETURN(pubPtr,
00545 DCPS_IR_Publication(
00546 pubId,
00547 partPtr,
00548 topic,
00549 publication.in(),
00550 qos,
00551 transInfo,
00552 publisherQos),
00553 0);
00554
00555 switch (partPtr->add_publication(pubPtr)) {
00556 case -1: {
00557 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00558 ACE_ERROR((LM_ERROR,
00559 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00560 ACE_TEXT("failed to add publication to participant %C.\n"),
00561 std::string(converter).c_str()));
00562 }
00563
00564
00565 case 1:
00566 delete pubPtr;
00567 return false;
00568
00569 case 0:
00570 default:
00571 break;
00572 }
00573
00574 switch (topic->add_publication_reference(pubPtr, associate)) {
00575 case -1: {
00576 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00577 ACE_ERROR((LM_ERROR,
00578 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_publication: ")
00579 ACE_TEXT("failed to add publication to participant %C topic list.\n"),
00580 std::string(converter).c_str()));
00581
00582
00583 partPtr->remove_publication(pubId);
00584
00585 }
00586 return false;
00587
00588 case 1:
00589
00590
00591
00592
00593 return false;
00594
00595 case 0:
00596 default:
00597 break;
00598 }
00599
00600 OpenDDS::DCPS::RepoIdConverter converter(pubId);
00601
00602
00603
00604 if (converter.federationId() == federation_.id()) {
00605
00606 partPtr->last_publication_key(converter.entityKey());
00607 }
00608
00609 return true;
00610 }
00611
00612 void TAO_DDS_DCPSInfo_i::remove_publication(
00613 DDS::DomainId_t domainId,
00614 const OpenDDS::DCPS::RepoId& participantId,
00615 const OpenDDS::DCPS::RepoId& publicationId)
00616 {
00617 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00618
00619
00620 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00621
00622 if (where == this->domains_.end()) {
00623 throw OpenDDS::DCPS::Invalid_Domain();
00624 }
00625
00626
00627 DCPS_IR_Participant* partPtr
00628 = where->second->participant(participantId);
00629
00630 if (0 == partPtr) {
00631 throw OpenDDS::DCPS::Invalid_Participant();
00632 }
00633
00634 if (partPtr->remove_publication(publicationId) != 0) {
00635 where->second->remove_dead_participants();
00636
00637
00638 throw OpenDDS::DCPS::Invalid_Publication();
00639 }
00640
00641 where->second->remove_dead_participants();
00642
00643 if (this->um_
00644 && (partPtr->isOwner() == true)
00645 && (partPtr->isBitPublisher() == false)) {
00646 Update::IdPath path(domainId, participantId, publicationId);
00647 this->um_->destroy(path, Update::Actor, Update::DataWriter);
00648
00649 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00650 OpenDDS::DCPS::RepoIdConverter converter(publicationId);
00651 ACE_DEBUG((LM_DEBUG,
00652 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_publication: ")
00653 ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00654 std::string(converter).c_str(),
00655 domainId));
00656 }
00657 }
00658 }
00659
00660 OpenDDS::DCPS::RepoId TAO_DDS_DCPSInfo_i::add_subscription(
00661 DDS::DomainId_t domainId,
00662 const OpenDDS::DCPS::RepoId& participantId,
00663 const OpenDDS::DCPS::RepoId& topicId,
00664 OpenDDS::DCPS::DataReaderRemote_ptr subscription,
00665 const DDS::DataReaderQos & qos,
00666 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00667 const DDS::SubscriberQos & subscriberQos,
00668 const char* filterClassName,
00669 const char* filterExpression,
00670 const DDS::StringSeq& exprParams)
00671 {
00672 DCPS_IR_Domain* domainPtr;
00673 DCPS_IR_Participant* partPtr;
00674 DCPS_IR_Subscription* subPtr;
00675 DCPS_IR_Topic* topic;
00676 OpenDDS::DCPS::RepoId subId;
00677 {
00678 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, OpenDDS::DCPS::GUID_UNKNOWN);
00679
00680
00681 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00682
00683 if (where == this->domains_.end()) {
00684 throw OpenDDS::DCPS::Invalid_Domain();
00685 }
00686
00687
00688 domainPtr = where->second;
00689 partPtr = domainPtr->participant(participantId);
00690
00691 if (0 == partPtr) {
00692 throw OpenDDS::DCPS::Invalid_Participant();
00693 }
00694
00695 topic = where->second->find_topic(topicId);
00696
00697 if (topic == 0) {
00698 throw OpenDDS::DCPS::Invalid_Topic();
00699 }
00700
00701 subId = partPtr->get_next_subscription_id();
00702
00703
00704 OpenDDS::DCPS::DataReaderRemote_var marshalledSub (
00705 OpenDDS::DCPS::DataReaderRemote::_duplicate(subscription));
00706 if (CORBA::is_nil(marshalledSub.in())) {
00707 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00708 ACE_DEBUG((LM_WARNING,
00709 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00710 ACE_TEXT("invalid subscription reference.\n")));
00711 }
00712 return OpenDDS::DCPS::GUID_UNKNOWN;
00713 }
00714
00715 CORBA::String_var subStr = orb_->object_to_string(marshalledSub.in());
00716 CORBA::Object_var subObj = dispatchingOrb_->string_to_object(subStr.in());
00717 if (CORBA::is_nil(subObj.in())) {
00718 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00719 ACE_DEBUG((LM_WARNING,
00720 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00721 ACE_TEXT("failure marshalling subscription on dispatching orb.\n")));
00722 }
00723 return OpenDDS::DCPS::GUID_UNKNOWN;
00724 }
00725 OpenDDS::DCPS::DataReaderRemote_var dispatchingSubscription
00726 = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(subObj.in());
00727
00728 ACE_NEW_RETURN(subPtr,
00729 DCPS_IR_Subscription(
00730 subId,
00731 partPtr,
00732 topic,
00733 dispatchingSubscription.in(),
00734 qos,
00735 transInfo,
00736 subscriberQos,
00737 filterClassName,
00738 filterExpression,
00739 exprParams),
00740 OpenDDS::DCPS::GUID_UNKNOWN);
00741
00742
00743 }
00744 if (partPtr->add_subscription(subPtr) != 0) {
00745
00746 subId = OpenDDS::DCPS::GUID_UNKNOWN;
00747 delete subPtr;
00748 subPtr = 0;
00749
00750 } else if (topic->add_subscription_reference(subPtr) != 0) {
00751 ACE_ERROR((LM_ERROR, ACE_TEXT("Failed to add subscription to topic list.\n")));
00752
00753 partPtr->remove_subscription(subId);
00754 subId = OpenDDS::DCPS::GUID_UNKNOWN;
00755 }
00756
00757 if (this->um_ && (partPtr->isBitPublisher() == false)) {
00758 CORBA::String_var callback = orb_->object_to_string(subscription);
00759 Update::ContentSubscriptionInfo csi(filterClassName, filterExpression, exprParams);
00760
00761 Update::URActor actor(domainId, subId, topicId, participantId, Update::DataReader
00762 , callback.in()
00763 , const_cast<DDS::SubscriberQos &>(subscriberQos)
00764 , const_cast<DDS::DataReaderQos &>(qos)
00765 , const_cast<OpenDDS::DCPS::TransportLocatorSeq &>
00766 (transInfo), csi);
00767
00768 this->um_->create(actor);
00769
00770 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00771 OpenDDS::DCPS::RepoIdConverter converter(subId);
00772 ACE_DEBUG((LM_DEBUG,
00773 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_subscription: ")
00774 ACE_TEXT("pushing creation of subscription %C in domain %d.\n"),
00775 std::string(converter).c_str(),
00776 domainId));
00777 }
00778 }
00779
00780 domainPtr->remove_dead_participants();
00781
00782 return subId;
00783 }
00784
00785 bool
00786 TAO_DDS_DCPSInfo_i::add_subscription(
00787 DDS::DomainId_t domainId,
00788 const OpenDDS::DCPS::RepoId& participantId,
00789 const OpenDDS::DCPS::RepoId& topicId,
00790 const OpenDDS::DCPS::RepoId& subId,
00791 const char* sub_str,
00792 const DDS::DataReaderQos & qos,
00793 const OpenDDS::DCPS::TransportLocatorSeq & transInfo,
00794 const DDS::SubscriberQos & subscriberQos,
00795 const char* filterClassName,
00796 const char* filterExpression,
00797 const DDS::StringSeq& exprParams,
00798 bool associate)
00799 {
00800 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
00801
00802
00803 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00804
00805 if (where == this->domains_.end()) {
00806 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00807 ACE_DEBUG((LM_WARNING,
00808 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00809 ACE_TEXT("invalid domain %d.\n"),
00810 domainId));
00811 }
00812
00813 return false;
00814 }
00815
00816
00817 DCPS_IR_Participant* partPtr
00818 = where->second->participant(participantId);
00819
00820 if (0 == partPtr) {
00821 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00822 OpenDDS::DCPS::RepoIdConverter converter(participantId);
00823 ACE_DEBUG((LM_WARNING,
00824 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00825 ACE_TEXT("invalid participant %C in domain %d.\n"),
00826 std::string(converter).c_str(),
00827 domainId));
00828 }
00829
00830 return false;
00831 }
00832
00833 DCPS_IR_Topic* topic = where->second->find_topic(topicId);
00834
00835 if (topic == 0) {
00836 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00837 OpenDDS::DCPS::RepoIdConverter converter(topicId);
00838 ACE_DEBUG((LM_WARNING,
00839 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00840 ACE_TEXT("invalid topic %C in domain %d.\n"),
00841 std::string(converter).c_str(),
00842 domainId));
00843 }
00844
00845 return false;
00846 }
00847
00848 CORBA::Object_var obj = dispatchingOrb_->string_to_object(sub_str);
00849 if (CORBA::is_nil(obj.in())) {
00850 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00851 ACE_DEBUG((LM_WARNING,
00852 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i:add_subscription: ")
00853 ACE_TEXT("failure marshalling subscription %s on dispatching orb.\n"),
00854 sub_str));
00855 }
00856 return false;
00857 }
00858 OpenDDS::DCPS::DataReaderRemote_var subscription
00859 = OpenDDS::DCPS::DataReaderRemote::_unchecked_narrow(obj.in());
00860
00861 DCPS_IR_Subscription* subPtr;
00862 ACE_NEW_RETURN(subPtr,
00863 DCPS_IR_Subscription(
00864 subId,
00865 partPtr,
00866 topic,
00867 subscription.in(),
00868 qos,
00869 transInfo,
00870 subscriberQos,
00871 filterClassName,
00872 filterExpression,
00873 exprParams),
00874 0);
00875
00876 switch (partPtr->add_subscription(subPtr)) {
00877 case -1: {
00878 OpenDDS::DCPS::RepoIdConverter converter(subId);
00879 ACE_ERROR((LM_ERROR,
00880 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00881 ACE_TEXT("failed to add subscription to participant %C.\n"),
00882 std::string(converter).c_str()));
00883 }
00884
00885
00886 case 1:
00887 delete subPtr;
00888 return false;
00889
00890 case 0:
00891 default:
00892 break;
00893 }
00894
00895 switch (topic->add_subscription_reference(subPtr, associate)) {
00896 case -1: {
00897 OpenDDS::DCPS::RepoIdConverter converter(subId);
00898 ACE_ERROR((LM_ERROR,
00899 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::add_subscription: ")
00900 ACE_TEXT("failed to add subscription to participant %C topic list.\n"),
00901 std::string(converter).c_str()));
00902
00903
00904 partPtr->remove_subscription(subId);
00905
00906 }
00907 return false;
00908
00909 case 1:
00910
00911
00912
00913
00914 return false;
00915
00916 case 0:
00917 default:
00918 break;
00919 }
00920
00921 OpenDDS::DCPS::RepoIdConverter converter(subId);
00922
00923
00924
00925 if (converter.federationId() == federation_.id()) {
00926
00927 partPtr->last_subscription_key(converter.entityKey());
00928 }
00929
00930 return true;
00931 }
00932
00933 void TAO_DDS_DCPSInfo_i::remove_subscription(
00934 DDS::DomainId_t domainId,
00935 const OpenDDS::DCPS::RepoId& participantId,
00936 const OpenDDS::DCPS::RepoId& subscriptionId)
00937 {
00938 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
00939
00940
00941 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
00942
00943 if (where == this->domains_.end()) {
00944 throw OpenDDS::DCPS::Invalid_Domain();
00945 }
00946
00947
00948 DCPS_IR_Participant* partPtr
00949 = where->second->participant(participantId);
00950
00951 if (0 == partPtr) {
00952 throw OpenDDS::DCPS::Invalid_Participant();
00953 }
00954
00955 if (partPtr->remove_subscription(subscriptionId) != 0) {
00956
00957 throw OpenDDS::DCPS::Invalid_Subscription();
00958 }
00959
00960 where->second->remove_dead_participants();
00961
00962 if (this->um_
00963 && (partPtr->isOwner() == true)
00964 && (partPtr->isBitPublisher() == false)) {
00965 Update::IdPath path(domainId, participantId, subscriptionId);
00966 this->um_->destroy(path, Update::Actor, Update::DataReader);
00967
00968 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00969 OpenDDS::DCPS::RepoIdConverter converter(subscriptionId);
00970 ACE_DEBUG((LM_DEBUG,
00971 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_subscription: ")
00972 ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00973 std::string(converter).c_str(),
00974 domainId));
00975 }
00976 }
00977 }
00978
00979 OpenDDS::DCPS::AddDomainStatus TAO_DDS_DCPSInfo_i::add_domain_participant(
00980 DDS::DomainId_t domain,
00981 const DDS::DomainParticipantQos & qos)
00982 {
00983
00984 OpenDDS::DCPS::AddDomainStatus value;
00985 value.id = OpenDDS::DCPS::GUID_UNKNOWN;
00986 value.federated = this->federation_.overridden();
00987
00988 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, value);
00989
00990
00991 DCPS_IR_Domain* domainPtr = this->domain(domain);
00992
00993 if (0 == domainPtr) {
00994 throw OpenDDS::DCPS::Invalid_Domain();
00995 }
00996
00997
00998 OpenDDS::DCPS::RepoId participantId = domainPtr->get_next_participant_id();
00999
01000 DCPS_IR_Participant* participant;
01001 ACE_NEW_RETURN(participant,
01002 DCPS_IR_Participant(
01003 this->federation_,
01004 participantId,
01005 domainPtr,
01006 qos, um_),
01007 value);
01008
01009
01010 value.id = participantId;
01011
01012
01013
01014 if (domainPtr->participants().empty() && TheServiceParticipant->get_BIT()) {
01015 participant->isBitPublisher() = true;
01016
01017 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01018 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01019 ACE_DEBUG((LM_DEBUG,
01020 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01021 ACE_TEXT("participant %C in domain %d is BIT publisher for this domain.\n"),
01022 std::string(converter).c_str(),
01023 domain));
01024 }
01025 }
01026
01027
01028 participant->takeOwnership();
01029
01030 int status = domainPtr->add_participant(participant);
01031
01032 if (0 != status) {
01033
01034
01035 participantId = OpenDDS::DCPS::GUID_UNKNOWN;
01036 delete participant;
01037 participant = 0;
01038
01039 } else if (this->um_ && (participant->isBitPublisher() == false)) {
01040
01041 Update::UParticipant updateParticipant(
01042 domain,
01043 participant->owner(),
01044 participantId,
01045 const_cast<DDS::DomainParticipantQos &>(qos));
01046 this->um_->create(updateParticipant);
01047
01048 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01049 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01050 ACE_DEBUG((LM_DEBUG,
01051 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01052 ACE_TEXT("pushing creation of participant %C in domain %d.\n"),
01053 std::string(converter).c_str(),
01054 domain));
01055 }
01056 }
01057
01058 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01059 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01060 ACE_DEBUG((LM_DEBUG,
01061 ACE_TEXT("(%P|%t) (RepoId)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01062 ACE_TEXT("domain %d loaded participant %C at 0x%x.\n"),
01063 domain,
01064 std::string(converter).c_str(),
01065 participant));
01066 }
01067 return value;
01068 }
01069
01070 bool
01071 TAO_DDS_DCPSInfo_i::add_domain_participant(DDS::DomainId_t domainId
01072 , const OpenDDS::DCPS::RepoId& participantId
01073 , const DDS::DomainParticipantQos & qos)
01074 {
01075 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01076
01077
01078 DCPS_IR_Domain* domainPtr = this->domain(domainId);
01079
01080 if (0 == domainPtr) {
01081 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01082 ACE_DEBUG((LM_WARNING,
01083 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01084 ACE_TEXT("invalid domain Id: %d\n"),
01085 domainId));
01086 }
01087
01088 return false;
01089 }
01090
01091
01092 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01093
01094
01095 DCPS_IR_Participant* partPtr
01096 = domainPtr->participant(participantId);
01097
01098 if (0 != partPtr) {
01099 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01100 ACE_ERROR((LM_ERROR,
01101 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01102 ACE_TEXT("participant id %C already exists.\n"),
01103 std::string(converter).c_str()));
01104 }
01105
01106 return false;
01107 }
01108
01109 DCPS_IR_Participant* participant;
01110 ACE_NEW_RETURN(participant,
01111 DCPS_IR_Participant(this->federation_,
01112 participantId,
01113 domainPtr,
01114 qos, um_), 0);
01115
01116 switch (domainPtr->add_participant(participant)) {
01117 case -1: {
01118 ACE_ERROR((LM_ERROR,
01119 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01120 ACE_TEXT("failed to load participant %C in domain %d.\n"),
01121 std::string(converter).c_str(),
01122 domainId));
01123 }
01124 delete participant;
01125 return false;
01126
01127 case 1:
01128
01129 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01130 ACE_DEBUG((LM_WARNING,
01131 ACE_TEXT("(%P|%t) WARNING: (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01132 ACE_TEXT("attempt to load duplicate participant %C in domain %d.\n"),
01133 std::string(converter).c_str(),
01134 domainId));
01135 }
01136
01137 delete participant;
01138 return false;
01139
01140 case 0:
01141 default:
01142 break;
01143 }
01144
01145
01146
01147 if (converter.federationId() == this->federation_.id()) {
01148
01149 domainPtr->last_participant_key(converter.participantId());
01150
01151 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01152 ACE_DEBUG((LM_DEBUG,
01153 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01154 ACE_TEXT("Adjusting highest participant Id value to at least %d.\n"),
01155 converter.participantId()));
01156 }
01157 }
01158
01159 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01160 ACE_DEBUG((LM_DEBUG,
01161 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::add_domain_participant: ")
01162 ACE_TEXT("loaded participant %C at 0x%x in domain %d.\n"),
01163 std::string(converter).c_str(),
01164 participant,
01165 domainId));
01166 }
01167
01168 return true;
01169 }
01170
01171 bool
01172 TAO_DDS_DCPSInfo_i::remove_by_owner(
01173 DDS::DomainId_t domain,
01174 long owner)
01175 {
01176 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, false);
01177
01178
01179 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
01180
01181 if (where == this->domains_.end()) {
01182 return false;
01183 }
01184
01185 std::vector<OpenDDS::DCPS::RepoId> candidates;
01186
01187 for (DCPS_IR_Participant_Map::const_iterator
01188 current = where->second->participants().begin();
01189 current != where->second->participants().end();
01190 ++current) {
01191 if (current->second->owner() == owner) {
01192 candidates.push_back(current->second->get_id());
01193 }
01194 }
01195
01196 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01197 ACE_DEBUG((LM_DEBUG,
01198 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01199 ACE_TEXT("%d participants to remove from domain %d.\n"),
01200 candidates.size(),
01201 domain));
01202 }
01203
01204 bool status = true;
01205
01206 for (unsigned int index = 0; index < candidates.size(); ++index) {
01207 DCPS_IR_Participant* participant
01208 = where->second->participant(candidates[index]);
01209
01210 std::vector<OpenDDS::DCPS::RepoId> keylist;
01211
01212
01213 for (DCPS_IR_Subscription_Map::const_iterator
01214 current = participant->subscriptions().begin();
01215 current != participant->subscriptions().end();
01216 ++current) {
01217 keylist.push_back(current->second->get_id());
01218 }
01219
01220 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01221 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01222 ACE_DEBUG((LM_DEBUG,
01223 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01224 ACE_TEXT("%d subscriptions to remove from participant %C.\n"),
01225 keylist.size(),
01226 std::string(converter).c_str()));
01227 }
01228
01229 for (unsigned int key = 0; key < keylist.size(); ++key) {
01230 if (participant->remove_subscription(keylist[ key]) != 0) {
01231 status = false;
01232 }
01233 }
01234
01235
01236 keylist.clear();
01237
01238 for (DCPS_IR_Publication_Map::const_iterator
01239 current = participant->publications().begin();
01240 current != participant->publications().end();
01241 ++current) {
01242 keylist.push_back(current->second->get_id());
01243 }
01244
01245 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01246 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01247 ACE_DEBUG((LM_DEBUG,
01248 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01249 ACE_TEXT("%d publications to remove from participant %C.\n"),
01250 keylist.size(),
01251 std::string(converter).c_str()));
01252 }
01253
01254 for (unsigned int key = 0; key < keylist.size(); ++key) {
01255 if (participant->remove_publication(keylist[ key]) != 0) {
01256 status = false;
01257 }
01258 }
01259
01260
01261 keylist.clear();
01262
01263 for (DCPS_IR_Topic_Map::const_iterator
01264 current = participant->topics().begin();
01265 current != participant->topics().end();
01266 ++current) {
01267 keylist.push_back(current->second->get_id());
01268 }
01269
01270 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
01271 OpenDDS::DCPS::RepoIdConverter converter(candidates[index]);
01272 ACE_DEBUG((LM_DEBUG,
01273 ACE_TEXT("(%P|%t) (bool)TAO_DDS_DCPSInfo_i::remove_by_owner: ")
01274 ACE_TEXT("%d topics to remove from participant %C.\n"),
01275 keylist.size(),
01276 std::string(converter).c_str()));
01277 }
01278
01279 for (unsigned int key = 0; key < keylist.size(); ++key) {
01280 DCPS_IR_Topic* discard;
01281
01282 if (participant->remove_topic_reference(keylist[ key], discard) != 0) {
01283 status = false;
01284 }
01285 }
01286
01287
01288 this->remove_domain_participant(domain, candidates[ index]);
01289 }
01290
01291 return status;
01292 }
01293
01294 void
01295 TAO_DDS_DCPSInfo_i::disassociate_participant(
01296 DDS::DomainId_t domainId,
01297 const OpenDDS::DCPS::RepoId& local_id,
01298 const OpenDDS::DCPS::RepoId& remote_id)
01299 {
01300 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01301
01302 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01303 if (it == this->domains_.end()) {
01304 throw OpenDDS::DCPS::Invalid_Domain();
01305 }
01306
01307 DCPS_IR_Participant* participant = it->second->participant(local_id);
01308 if (participant == 0) {
01309 throw OpenDDS::DCPS::Invalid_Participant();
01310 }
01311
01312
01313 const DCPS_IR_Subscription_Map& subscriptions = participant->subscriptions();
01314 for (DCPS_IR_Subscription_Map::const_iterator sub(subscriptions.begin());
01315 sub != subscriptions.end(); ++sub) {
01316 sub->second->disassociate_participant(remote_id, true);
01317 }
01318
01319 const DCPS_IR_Publication_Map& publications = participant->publications();
01320 for (DCPS_IR_Publication_Map::const_iterator pub(publications.begin());
01321 pub != publications.end(); ++pub) {
01322 pub->second->disassociate_participant(remote_id, true);
01323 }
01324
01325 it->second->remove_dead_participants();
01326 }
01327
01328 void
01329 TAO_DDS_DCPSInfo_i::disassociate_subscription(
01330 DDS::DomainId_t domainId,
01331 const OpenDDS::DCPS::RepoId& participantId,
01332 const OpenDDS::DCPS::RepoId& local_id,
01333 const OpenDDS::DCPS::RepoId& remote_id)
01334 {
01335 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01336
01337 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01338 if (it == this->domains_.end()) {
01339 throw OpenDDS::DCPS::Invalid_Domain();
01340 }
01341
01342 DCPS_IR_Participant* participant = it->second->participant(participantId);
01343 if (participant == 0) {
01344 throw OpenDDS::DCPS::Invalid_Participant();
01345 }
01346
01347 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01348 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating subscription\n"));
01349 }
01350
01351 DCPS_IR_Subscription* subscription;
01352 if (participant->find_subscription_reference(local_id, subscription)
01353 != 0 || subscription == 0) {
01354 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01355 OpenDDS::DCPS::RepoIdConverter sub_converter(local_id);
01356 ACE_ERROR((LM_ERROR,
01357 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_subscription: ")
01358 ACE_TEXT("participant %C could not find subscription %C.\n"),
01359 std::string(part_converter).c_str(),
01360 std::string(sub_converter).c_str()));
01361 throw OpenDDS::DCPS::Invalid_Subscription();
01362 }
01363
01364
01365 subscription->disassociate_publication(remote_id, true);
01366
01367 it->second->remove_dead_participants();
01368 }
01369
01370 void
01371 TAO_DDS_DCPSInfo_i::disassociate_publication(
01372 DDS::DomainId_t domainId,
01373 const OpenDDS::DCPS::RepoId& participantId,
01374 const OpenDDS::DCPS::RepoId& local_id,
01375 const OpenDDS::DCPS::RepoId& remote_id)
01376 {
01377 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01378
01379 DCPS_IR_Domain_Map::iterator it(this->domains_.find(domainId));
01380 if (it == this->domains_.end()) {
01381 throw OpenDDS::DCPS::Invalid_Domain();
01382 }
01383
01384 DCPS_IR_Participant* participant = it->second->participant(participantId);
01385 if (participant == 0) {
01386 throw OpenDDS::DCPS::Invalid_Participant();
01387 }
01388
01389 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01390 ACE_DEBUG((LM_INFO, "(%P|%t) disassociating publication\n"));
01391 }
01392
01393 DCPS_IR_Publication* publication;
01394 if (participant->find_publication_reference(local_id, publication)
01395 != 0 || publication == 0) {
01396 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01397 OpenDDS::DCPS::RepoIdConverter pub_converter(local_id);
01398 ACE_ERROR((LM_ERROR,
01399 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::disassociate_publication: ")
01400 ACE_TEXT("participant %C could not find publication %C.\n"),
01401 std::string(part_converter).c_str(),
01402 std::string(pub_converter).c_str()));
01403 throw OpenDDS::DCPS::Invalid_Publication();
01404 }
01405
01406
01407 publication->disassociate_subscription(remote_id, true);
01408
01409 it->second->remove_dead_participants();
01410 }
01411
01412 void TAO_DDS_DCPSInfo_i::remove_domain_participant(
01413 DDS::DomainId_t domainId,
01414 const OpenDDS::DCPS::RepoId& participantId)
01415 {
01416 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01417
01418
01419 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01420
01421 if (where == this->domains_.end()) {
01422 throw OpenDDS::DCPS::Invalid_Domain();
01423 }
01424
01425 DCPS_IR_Participant* participant = where->second->participant(participantId);
01426
01427 if (participant == 0) {
01428 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01429 ACE_ERROR((LM_ERROR,
01430 ACE_TEXT("(%P|%t) ERROR: (bool)TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01431 ACE_TEXT("failed to locate participant %C in domain %d.\n"),
01432 std::string(converter).c_str(),
01433 domainId));
01434 throw OpenDDS::DCPS::Invalid_Participant();
01435 }
01436
01437
01438
01439 bool sendUpdate = (participant->isOwner() == true)
01440 && (participant->isBitPublisher() == false);
01441
01442 CORBA::Boolean dont_notify_lost = 0;
01443 int status = where->second->remove_participant(participantId, dont_notify_lost);
01444
01445 if (0 != status) {
01446
01447 throw OpenDDS::DCPS::Invalid_Participant();
01448 }
01449
01450
01451 if (this->um_ && sendUpdate) {
01452 Update::IdPath path(
01453 where->second->get_id(),
01454 participantId,
01455 participantId);
01456 this->um_->destroy(path, Update::Participant);
01457
01458 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01459 OpenDDS::DCPS::RepoIdConverter converter(participantId);
01460 ACE_DEBUG((LM_DEBUG,
01461 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::remove_domain_participant: ")
01462 ACE_TEXT("pushing deletion of participant %C in domain %d.\n"),
01463 std::string(converter).c_str(),
01464 domainId));
01465 }
01466 }
01467 }
01468
01469 void TAO_DDS_DCPSInfo_i::association_complete(DDS::DomainId_t domainId,
01470 const OpenDDS::DCPS::RepoId& participantId,
01471 const OpenDDS::DCPS::RepoId& localId,
01472 const OpenDDS::DCPS::RepoId& remoteId)
01473 {
01474 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01475
01476 DCPS_IR_Domain_Map::iterator dom_iter = this->domains_.find(domainId);
01477 if (dom_iter == this->domains_.end()) {
01478 return;
01479 }
01480
01481 DCPS_IR_Participant* partPtr = dom_iter->second->participant(participantId);
01482 if (0 == partPtr) {
01483 return;
01484 }
01485
01486
01487
01488 DCPS_IR_Subscription* sub = 0;
01489 DCPS_IR_Publication* pub = 0;
01490 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01491 ACE_DEBUG((LM_INFO, "(%P|%t) completing association\n"));
01492 }
01493 if (0 == partPtr->find_subscription_reference(localId, sub)) {
01494 sub->association_complete(remoteId);
01495 } else if (0 == partPtr->find_publication_reference(localId, pub)) {
01496 pub->association_complete(remoteId);
01497 } else {
01498 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01499 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01500 OpenDDS::DCPS::RepoIdConverter local_converter(localId);
01501 OpenDDS::DCPS::RepoIdConverter remote_converter(remoteId);
01502 ACE_DEBUG((LM_WARNING,
01503 ACE_TEXT("(%P|%t) WARNING: TAO_DDS_DCPSInfo_i::association_complete: ")
01504 ACE_TEXT("participant %C could not find subscription or publication %C ")
01505 ACE_TEXT("to complete association with remote %C.\n"),
01506 std::string(part_converter).c_str(),
01507 std::string(local_converter).c_str(),
01508 std::string(remote_converter).c_str()));
01509 }
01510 }
01511 }
01512
01513 void TAO_DDS_DCPSInfo_i::ignore_domain_participant(
01514 DDS::DomainId_t domainId,
01515 const OpenDDS::DCPS::RepoId& myParticipantId,
01516 const OpenDDS::DCPS::RepoId& ignoreId)
01517 {
01518 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01519
01520
01521 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01522
01523 if (where == this->domains_.end()) {
01524 throw OpenDDS::DCPS::Invalid_Domain();
01525 }
01526
01527
01528 DCPS_IR_Participant* partPtr
01529 = where->second->participant(myParticipantId);
01530
01531 if (0 == partPtr) {
01532 throw OpenDDS::DCPS::Invalid_Participant();
01533 }
01534
01535 partPtr->ignore_participant(ignoreId);
01536
01537 where->second->remove_dead_participants();
01538 }
01539
01540 void TAO_DDS_DCPSInfo_i::ignore_topic(
01541 DDS::DomainId_t domainId,
01542 const OpenDDS::DCPS::RepoId& myParticipantId,
01543 const OpenDDS::DCPS::RepoId& ignoreId)
01544 {
01545 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01546
01547
01548 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01549
01550 if (where == this->domains_.end()) {
01551 throw OpenDDS::DCPS::Invalid_Domain();
01552 }
01553
01554
01555 DCPS_IR_Participant* partPtr
01556 = where->second->participant(myParticipantId);
01557
01558 if (0 == partPtr) {
01559 throw OpenDDS::DCPS::Invalid_Participant();
01560 }
01561
01562 partPtr->ignore_topic(ignoreId);
01563
01564 where->second->remove_dead_participants();
01565 }
01566
01567 void TAO_DDS_DCPSInfo_i::ignore_subscription(
01568 DDS::DomainId_t domainId,
01569 const OpenDDS::DCPS::RepoId& myParticipantId,
01570 const OpenDDS::DCPS::RepoId& ignoreId)
01571 {
01572 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01573
01574
01575 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01576
01577 if (where == this->domains_.end()) {
01578 throw OpenDDS::DCPS::Invalid_Domain();
01579 }
01580
01581
01582 DCPS_IR_Participant* partPtr
01583 = where->second->participant(myParticipantId);
01584
01585 if (0 == partPtr) {
01586 throw OpenDDS::DCPS::Invalid_Participant();
01587 }
01588
01589 partPtr->ignore_subscription(ignoreId);
01590
01591 where->second->remove_dead_participants();
01592 }
01593
01594 void TAO_DDS_DCPSInfo_i::ignore_publication(
01595 DDS::DomainId_t domainId,
01596 const OpenDDS::DCPS::RepoId& myParticipantId,
01597 const OpenDDS::DCPS::RepoId& ignoreId)
01598 {
01599 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01600
01601
01602 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01603
01604 if (where == this->domains_.end()) {
01605 throw OpenDDS::DCPS::Invalid_Domain();
01606 }
01607
01608
01609 DCPS_IR_Participant* partPtr
01610 = where->second->participant(myParticipantId);
01611
01612 if (0 == partPtr) {
01613 throw OpenDDS::DCPS::Invalid_Participant();
01614 }
01615
01616 partPtr->ignore_publication(ignoreId);
01617
01618 where->second->remove_dead_participants();
01619 }
01620
01621 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_publication_qos(
01622 DDS::DomainId_t domainId,
01623 const OpenDDS::DCPS::RepoId& partId,
01624 const OpenDDS::DCPS::RepoId& dwId,
01625 const DDS::DataWriterQos & qos,
01626 const DDS::PublisherQos & publisherQos)
01627 {
01628 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01629
01630
01631 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01632
01633 if (where == this->domains_.end()) {
01634 throw OpenDDS::DCPS::Invalid_Domain();
01635 }
01636
01637
01638 DCPS_IR_Participant* partPtr
01639 = where->second->participant(partId);
01640
01641 if (0 == partPtr) {
01642 throw OpenDDS::DCPS::Invalid_Participant();
01643 }
01644
01645 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01646 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 1\n"));
01647 }
01648
01649 DCPS_IR_Publication* pub;
01650
01651 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01652 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01653 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01654 ACE_ERROR((LM_ERROR,
01655 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01656 ACE_TEXT("participant %C could not find publication %C.\n"),
01657 std::string(part_converter).c_str(),
01658 std::string(pub_converter).c_str()));
01659 throw OpenDDS::DCPS::Invalid_Publication();
01660 }
01661
01662 Update::SpecificQos qosType;
01663
01664 if (pub->set_qos(qos, publisherQos, qosType) == false)
01665 return 0;
01666
01667 if (this->um_ && (partPtr->isBitPublisher() == false)) {
01668 Update::IdPath path(domainId, partId, dwId);
01669
01670 switch (qosType) {
01671 case Update::DataWriterQos:
01672 this->um_->update(path, qos);
01673 break;
01674
01675 case Update::PublisherQos:
01676 this->um_->update(path, publisherQos);
01677 break;
01678
01679 case Update::NoQos:
01680 default:
01681 break;
01682 }
01683
01684 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01685 OpenDDS::DCPS::RepoIdConverter converter(dwId);
01686 ACE_DEBUG((LM_DEBUG,
01687 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01688 ACE_TEXT("pushing update of publication %C in domain %d.\n"),
01689 std::string(converter).c_str(),
01690 domainId));
01691 }
01692 }
01693
01694 return 1;
01695 }
01696
01697 void
01698 TAO_DDS_DCPSInfo_i::update_publication_qos(
01699 DDS::DomainId_t domainId,
01700 const OpenDDS::DCPS::RepoId& partId,
01701 const OpenDDS::DCPS::RepoId& dwId,
01702 const DDS::DataWriterQos& qos)
01703 {
01704 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01705
01706
01707 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01708
01709 if (where == this->domains_.end()) {
01710 throw OpenDDS::DCPS::Invalid_Domain();
01711 }
01712
01713
01714 DCPS_IR_Participant* partPtr
01715 = where->second->participant(partId);
01716
01717 if (0 == partPtr) {
01718 throw OpenDDS::DCPS::Invalid_Participant();
01719 }
01720
01721 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01722 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 2\n"));
01723 }
01724
01725 DCPS_IR_Publication* pub;
01726
01727 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01728 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01729 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01730 ACE_ERROR((LM_ERROR,
01731 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01732 ACE_TEXT("participant %C could not find publication %C.\n"),
01733 std::string(part_converter).c_str(),
01734 std::string(pub_converter).c_str()));
01735 throw OpenDDS::DCPS::Invalid_Publication();
01736 }
01737
01738 pub->set_qos(qos);
01739 }
01740
01741 void
01742 TAO_DDS_DCPSInfo_i::update_publication_qos(
01743 DDS::DomainId_t domainId,
01744 const OpenDDS::DCPS::RepoId& partId,
01745 const OpenDDS::DCPS::RepoId& dwId,
01746 const DDS::PublisherQos& qos)
01747 {
01748 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01749
01750
01751 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01752
01753 if (where == this->domains_.end()) {
01754 throw OpenDDS::DCPS::Invalid_Domain();
01755 }
01756
01757
01758 DCPS_IR_Participant* partPtr
01759 = where->second->participant(partId);
01760
01761 if (0 == partPtr) {
01762 throw OpenDDS::DCPS::Invalid_Participant();
01763 }
01764
01765 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01766 ACE_DEBUG((LM_INFO, "(%P|%t) updating publication qos 3\n"));
01767 }
01768
01769 DCPS_IR_Publication* pub;
01770
01771 if (partPtr->find_publication_reference(dwId, pub) != 0 || pub == 0) {
01772 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01773 OpenDDS::DCPS::RepoIdConverter pub_converter(dwId);
01774 ACE_ERROR((LM_ERROR,
01775 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_publication_qos: ")
01776 ACE_TEXT("participant %C could not find publication %C.\n"),
01777 std::string(part_converter).c_str(),
01778 std::string(pub_converter).c_str()));
01779 throw OpenDDS::DCPS::Invalid_Publication();
01780 }
01781
01782 pub->set_qos(qos);
01783 }
01784
01785 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_subscription_qos(
01786 DDS::DomainId_t domainId,
01787 const OpenDDS::DCPS::RepoId& partId,
01788 const OpenDDS::DCPS::RepoId& drId,
01789 const DDS::DataReaderQos & qos,
01790 const DDS::SubscriberQos & subscriberQos)
01791 {
01792 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01793
01794
01795 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01796
01797 if (where == this->domains_.end()) {
01798 throw OpenDDS::DCPS::Invalid_Domain();
01799 }
01800
01801
01802 DCPS_IR_Participant* partPtr
01803 = where->second->participant(partId);
01804
01805 if (0 == partPtr) {
01806 throw OpenDDS::DCPS::Invalid_Participant();
01807 }
01808
01809 DCPS_IR_Subscription* sub;
01810
01811 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01812 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 1\n"));
01813 }
01814
01815 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01816 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01817 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01818 ACE_ERROR((LM_ERROR,
01819 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01820 ACE_TEXT("participant %C could not find subscription %C.\n"),
01821 std::string(part_converter).c_str(),
01822 std::string(sub_converter).c_str()));
01823 throw OpenDDS::DCPS::Invalid_Subscription();
01824 }
01825
01826 Update::SpecificQos qosType;
01827
01828 if (sub->set_qos(qos, subscriberQos, qosType) == false)
01829 return 0;
01830
01831 if (this->um_ && (partPtr->isBitPublisher() == false)) {
01832 Update::IdPath path(domainId, partId, drId);
01833
01834 switch (qosType) {
01835 case Update::DataReaderQos:
01836 this->um_->update(path, qos);
01837 break;
01838
01839 case Update::SubscriberQos:
01840 this->um_->update(path, subscriberQos);
01841 break;
01842
01843 case Update::NoQos:
01844 default:
01845 break;
01846 }
01847
01848 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
01849 OpenDDS::DCPS::RepoIdConverter converter(drId);
01850 ACE_DEBUG((LM_DEBUG,
01851 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01852 ACE_TEXT("pushing update of subscription %C in domain %d.\n"),
01853 std::string(converter).c_str(),
01854 domainId));
01855 }
01856 }
01857
01858 return 1;
01859 }
01860
01861 void
01862 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01863 DDS::DomainId_t domainId,
01864 const OpenDDS::DCPS::RepoId& partId,
01865 const OpenDDS::DCPS::RepoId& drId,
01866 const DDS::DataReaderQos& qos)
01867 {
01868 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01869
01870
01871 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01872
01873 if (where == this->domains_.end()) {
01874 throw OpenDDS::DCPS::Invalid_Domain();
01875 }
01876
01877
01878 DCPS_IR_Participant* partPtr
01879 = where->second->participant(partId);
01880
01881 if (0 == partPtr) {
01882 throw OpenDDS::DCPS::Invalid_Participant();
01883 }
01884
01885 DCPS_IR_Subscription* sub;
01886
01887 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01888 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 2\n"));
01889 }
01890
01891 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01892 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01893 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01894 ACE_ERROR((LM_ERROR,
01895 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01896 ACE_TEXT("participant %C could not find subscription %C.\n"),
01897 std::string(part_converter).c_str(),
01898 std::string(sub_converter).c_str()));
01899 throw OpenDDS::DCPS::Invalid_Subscription();
01900 }
01901
01902 sub->set_qos(qos);
01903 }
01904
01905 void
01906 TAO_DDS_DCPSInfo_i::update_subscription_qos(
01907 DDS::DomainId_t domainId,
01908 const OpenDDS::DCPS::RepoId& partId,
01909 const OpenDDS::DCPS::RepoId& drId,
01910 const DDS::SubscriberQos& qos)
01911 {
01912 ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, this->lock_);
01913
01914
01915 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
01916
01917 if (where == this->domains_.end()) {
01918 throw OpenDDS::DCPS::Invalid_Domain();
01919 }
01920
01921
01922 DCPS_IR_Participant* partPtr
01923 = where->second->participant(partId);
01924
01925 if (0 == partPtr) {
01926 throw OpenDDS::DCPS::Invalid_Participant();
01927 }
01928
01929 DCPS_IR_Subscription* sub;
01930
01931 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01932 ACE_DEBUG((LM_INFO, "(%P|%t) updating QOS for subscription 3\n"));
01933 }
01934
01935 if (partPtr->find_subscription_reference(drId, sub) != 0 || sub == 0) {
01936 OpenDDS::DCPS::RepoIdConverter part_converter(partId);
01937 OpenDDS::DCPS::RepoIdConverter sub_converter(drId);
01938 ACE_ERROR((LM_ERROR,
01939 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_qos: ")
01940 ACE_TEXT("participant %C could not find subscription %C.\n"),
01941 std::string(part_converter).c_str(),
01942 std::string(sub_converter).c_str()));
01943 throw OpenDDS::DCPS::Invalid_Subscription();
01944 }
01945
01946 sub->set_qos(qos);
01947 }
01948
01949 CORBA::Boolean
01950 TAO_DDS_DCPSInfo_i::update_subscription_params(
01951 DDS::DomainId_t domainId,
01952 const OpenDDS::DCPS::RepoId& participantId,
01953 const OpenDDS::DCPS::RepoId& subscriptionId,
01954 const DDS::StringSeq& params)
01955 {
01956 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
01957
01958 DCPS_IR_Domain_Map::iterator domain = this->domains_.find(domainId);
01959 if (domain == this->domains_.end()) {
01960 throw OpenDDS::DCPS::Invalid_Domain();
01961 }
01962
01963 DCPS_IR_Participant* partPtr = domain->second->participant(participantId);
01964 if (0 == partPtr) {
01965 throw OpenDDS::DCPS::Invalid_Participant();
01966 }
01967
01968 if (OpenDDS::DCPS::DCPS_debug_level > 3) {
01969 ACE_DEBUG((LM_INFO, "(%P|%t) updating subscription params\n"));
01970 }
01971
01972 DCPS_IR_Subscription* sub;
01973 if (partPtr->find_subscription_reference(subscriptionId, sub) != 0) {
01974 OpenDDS::DCPS::RepoIdConverter part_converter(participantId);
01975 OpenDDS::DCPS::RepoIdConverter sub_converter(subscriptionId);
01976 ACE_ERROR((LM_ERROR,
01977 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::update_subscription_params: ")
01978 ACE_TEXT("participant %C could not find subscription %C.\n"),
01979 std::string(part_converter).c_str(),
01980 std::string(sub_converter).c_str()));
01981 throw OpenDDS::DCPS::Invalid_Subscription();
01982 }
01983
01984 sub->update_expr_params(params);
01985
01986 if (this->um_ && !partPtr->isBitPublisher()) {
01987 Update::IdPath path(domainId, participantId, subscriptionId);
01988 this->um_->update(path, params);
01989 }
01990
01991 return true;
01992 }
01993
01994 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_topic_qos(
01995 const OpenDDS::DCPS::RepoId& topicId,
01996 DDS::DomainId_t domainId,
01997 const OpenDDS::DCPS::RepoId& participantId,
01998 const DDS::TopicQos & qos)
01999 {
02000 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02001
02002
02003 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02004
02005 if (where == this->domains_.end()) {
02006 throw OpenDDS::DCPS::Invalid_Domain();
02007 }
02008
02009
02010 DCPS_IR_Participant* partPtr
02011 = where->second->participant(participantId);
02012
02013 if (0 == partPtr) {
02014 throw OpenDDS::DCPS::Invalid_Participant();
02015 }
02016
02017 DCPS_IR_Topic* topic;
02018
02019 if (partPtr->find_topic_reference(topicId, topic) != 0) {
02020 throw OpenDDS::DCPS::Invalid_Topic();
02021 }
02022
02023 if (topic->set_topic_qos(qos) == false)
02024 return 0;
02025
02026 if (this->um_
02027 && (partPtr->isOwner() == true)
02028 && (partPtr->isBitPublisher() == false)) {
02029 Update::IdPath path(domainId, participantId, topicId);
02030 this->um_->update(path, qos);
02031
02032 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02033 OpenDDS::DCPS::RepoIdConverter converter(topicId);
02034 ACE_DEBUG((LM_DEBUG,
02035 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_topic_qos: ")
02036 ACE_TEXT("pushing update of topic %C in domain %d.\n"),
02037 std::string(converter).c_str(),
02038 domainId));
02039 }
02040 }
02041
02042 return 1;
02043 }
02044
02045 CORBA::Boolean TAO_DDS_DCPSInfo_i::update_domain_participant_qos(
02046 DDS::DomainId_t domainId,
02047 const OpenDDS::DCPS::RepoId& participantId,
02048 const DDS::DomainParticipantQos & qos)
02049 {
02050 ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, this->lock_, 0);
02051
02052
02053 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domainId);
02054
02055 if (where == this->domains_.end()) {
02056 throw OpenDDS::DCPS::Invalid_Domain();
02057 }
02058
02059
02060 DCPS_IR_Participant* partPtr
02061 = where->second->participant(participantId);
02062
02063 if (0 == partPtr) {
02064 throw OpenDDS::DCPS::Invalid_Participant();
02065 }
02066
02067 if (partPtr->set_qos(qos) == false)
02068 return 0;
02069
02070 if (this->um_
02071 && (partPtr->isOwner() == true)
02072 && (partPtr->isBitPublisher() == false)) {
02073 Update::IdPath path(domainId, participantId, participantId);
02074 this->um_->update(path, qos);
02075
02076 if (OpenDDS::DCPS::DCPS_debug_level > 4) {
02077 OpenDDS::DCPS::RepoIdConverter converter(participantId);
02078 ACE_DEBUG((LM_DEBUG,
02079 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::update_domain_participant_qos: ")
02080 ACE_TEXT("pushing update of participant %C in domain %d.\n"),
02081 std::string(converter).c_str(),
02082 domainId));
02083 }
02084 }
02085
02086 return 1;
02087 }
02088
02089 DCPS_IR_Domain*
02090 TAO_DDS_DCPSInfo_i::domain(DDS::DomainId_t domain)
02091 {
02092 if (domain == OpenDDS::DCPS::Service_Participant::ANY_DOMAIN) {
02093 ACE_ERROR((LM_ERROR,
02094 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02095 ACE_TEXT("ANY_DOMAIN not supported for operations.\n")));
02096 return 0;
02097 }
02098
02099
02100 DCPS_IR_Domain_Map::iterator where = this->domains_.find(domain);
02101
02102 if (where == this->domains_.end()) {
02103
02104 DCPS_IR_Domain* domainPtr;
02105 ACE_NEW_RETURN(domainPtr,
02106 DCPS_IR_Domain(domain, this->participantIdGenerator_),
02107 0);
02108
02109
02110
02111 this->domains_.insert(
02112 where,
02113 DCPS_IR_Domain_Map::value_type(domain, domainPtr));
02114
02115 int bit_status = 0;
02116
02117 if (TheServiceParticipant->get_BIT()) {
02118 #if !defined (DDS_HAS_MINIMUM_BIT)
02119 bit_status = domainPtr->init_built_in_topics(this->federation_.overridden());
02120 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02121 }
02122
02123 if (0 != bit_status) {
02124 ACE_ERROR((LM_ERROR,
02125 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::domain: ")
02126 ACE_TEXT("failed to initialize the Built-In Topics ")
02127 ACE_TEXT("when loading domain %d.\n"),
02128 domain));
02129 this->domains_.erase(domain);
02130 delete domainPtr;
02131 return 0;
02132 }
02133
02134 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02135 ACE_DEBUG((LM_DEBUG,
02136 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::domain: ")
02137 ACE_TEXT("successfully loaded domain %d at %x.\n"),
02138 domain,
02139 domainPtr));
02140 }
02141 return domainPtr;
02142
02143 } else {
02144 return where->second;
02145 }
02146 }
02147
02148 int TAO_DDS_DCPSInfo_i::init_transport(int listen_address_given,
02149 const char* listen_str)
02150 {
02151 int status = 0;
02152
02153 try {
02154
02155 #ifndef ACE_AS_STATIC_LIBS
02156 if (ACE_Service_Config::current()->find(ACE_TEXT("OpenDDS_Tcp"))
02157 < 0 ) {
02158 static const ACE_TCHAR directive[] =
02159 ACE_TEXT("dynamic OpenDDS_Tcp Service_Object * ")
02160 ACE_TEXT("OpenDDS_Tcp:_make_TcpLoader()");
02161 ACE_Service_Config::process_directive(directive);
02162 }
02163 #endif
02164
02165 std::string config_name =
02166 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02167 + std::string("InfoRepoBITTransportConfig");
02168 OpenDDS::DCPS::TransportConfig_rch config =
02169 OpenDDS::DCPS::TransportRegistry::instance()->create_config(config_name);
02170
02171 std::string inst_name =
02172 OpenDDS::DCPS::TransportRegistry::DEFAULT_INST_PREFIX
02173 + std::string("InfoRepoBITTCPTransportInst");
02174 OpenDDS::DCPS::TransportInst_rch inst =
02175 OpenDDS::DCPS::TransportRegistry::instance()->create_inst(inst_name,
02176 "tcp");
02177 config->instances_.push_back(inst);
02178
02179 OpenDDS::DCPS::TcpInst_rch tcp_inst =
02180 OpenDDS::DCPS::dynamic_rchandle_cast<OpenDDS::DCPS::TcpInst>(inst);
02181 inst->datalink_release_delay_ = 0;
02182
02183 tcp_inst->conn_retry_attempts_ = 0;
02184
02185 if (listen_address_given) {
02186 tcp_inst->local_address(listen_str);
02187 }
02188
02189 } catch (...) {
02190
02191
02192
02193 status = 1;
02194 }
02195 return status;
02196 }
02197
02198 bool
02199 TAO_DDS_DCPSInfo_i::receive_image(const Update::UImage& image)
02200 {
02201 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02202 ACE_DEBUG((LM_DEBUG,
02203 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02204 ACE_TEXT("processing persistent data.\n")));
02205 }
02206
02207
02208 for (Update::UImage::ParticipantSeq::const_iterator
02209 iter = image.participants.begin();
02210 iter != image.participants.end(); iter++) {
02211 const Update::UParticipant* part = *iter;
02212 OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02213 if (converter.federationId() == this->federation_.id()) {
02214 participantIdGenerator_.last(converter.participantId());
02215 }
02216 }
02217
02218 for (Update::UImage::ParticipantSeq::const_iterator
02219 iter = image.participants.begin();
02220 iter != image.participants.end(); iter++) {
02221 const Update::UParticipant* part = *iter;
02222
02223 if (!this->add_domain_participant(part->domainId, part->participantId
02224 , part->participantQos)) {
02225 OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02226 ACE_ERROR((LM_ERROR,
02227 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02228 ACE_TEXT("failed to add participant %C to domain %d.\n"),
02229 std::string(converter).c_str(),
02230 part->domainId));
02231 return false;
02232
02233 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02234 OpenDDS::DCPS::RepoIdConverter converter(part->participantId);
02235 ACE_DEBUG((LM_DEBUG,
02236 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02237 ACE_TEXT("added participant %C to domain %d.\n"),
02238 std::string(converter).c_str(),
02239 part->domainId));
02240 }
02241 }
02242
02243 for (Update::UImage::TopicSeq::const_iterator iter = image.topics.begin();
02244 iter != image.topics.end(); iter++) {
02245 const Update::UTopic* topic = *iter;
02246
02247 if (!this->add_topic(topic->topicId, topic->domainId
02248 , topic->participantId, topic->name.c_str()
02249 , topic->dataType.c_str(), topic->topicQos)) {
02250 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02251 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02252 ACE_ERROR((LM_ERROR,
02253 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02254 ACE_TEXT("failed to add topic %C to participant %C.\n"),
02255 std::string(topic_converter).c_str(),
02256 std::string(part_converter).c_str()));
02257 return false;
02258
02259 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02260 OpenDDS::DCPS::RepoIdConverter topic_converter(topic->topicId);
02261 OpenDDS::DCPS::RepoIdConverter part_converter(topic->participantId);
02262 ACE_DEBUG((LM_DEBUG,
02263 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02264 ACE_TEXT("added topic %C to participant %C.\n"),
02265 std::string(topic_converter).c_str(),
02266 std::string(part_converter).c_str()));
02267 }
02268 }
02269
02270 for (Update::UImage::ReaderSeq::const_iterator iter = image.actors.begin();
02271 iter != image.actors.end(); iter++) {
02272 const Update::URActor* sub = *iter;
02273
02274
02275 if (!this->add_subscription(sub->domainId, sub->participantId
02276 , sub->topicId, sub->actorId
02277 , sub->callback.c_str(), sub->drdwQos
02278 , sub->transportInterfaceInfo
02279 , sub->pubsubQos
02280 , sub->contentSubscriptionProfile.filterClassName
02281 , sub->contentSubscriptionProfile.filterExpr
02282 , sub->contentSubscriptionProfile.exprParams)) {
02283 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02284 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02285 ACE_ERROR((LM_ERROR,
02286 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02287 ACE_TEXT("failed to add subscription %C to participant %C.\n"),
02288 std::string(sub_converter).c_str(),
02289 std::string(part_converter).c_str()));
02290 return false;
02291
02292 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02293 OpenDDS::DCPS::RepoIdConverter sub_converter(sub->actorId);
02294 OpenDDS::DCPS::RepoIdConverter part_converter(sub->participantId);
02295 ACE_DEBUG((LM_DEBUG,
02296 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02297 ACE_TEXT("added subscription %C to participant %C.\n"),
02298 std::string(sub_converter).c_str(),
02299 std::string(part_converter).c_str()));
02300 }
02301 }
02302
02303 for (Update::UImage::WriterSeq::const_iterator iter = image.wActors.begin();
02304 iter != image.wActors.end(); iter++) {
02305 const Update::UWActor* pub = *iter;
02306
02307
02308
02309 if (!this->add_publication(pub->domainId, pub->participantId
02310 , pub->topicId, pub->actorId
02311 , pub->callback.c_str() , pub->drdwQos
02312 , pub->transportInterfaceInfo
02313 , pub->pubsubQos
02314 , true)) {
02315 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02316 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02317 ACE_ERROR((LM_ERROR,
02318 ACE_TEXT("(%P|%t) ERROR: TAO_DDS_DCPSInfo_i::receive_image: ")
02319 ACE_TEXT("failed to add publication %C to participant %C.\n"),
02320 std::string(pub_converter).c_str(),
02321 std::string(part_converter).c_str()));
02322 return false;
02323
02324 } else if (OpenDDS::DCPS::DCPS_debug_level > 0) {
02325 OpenDDS::DCPS::RepoIdConverter pub_converter(pub->actorId);
02326 OpenDDS::DCPS::RepoIdConverter part_converter(pub->participantId);
02327 ACE_DEBUG((LM_DEBUG,
02328 ACE_TEXT("(%P|%t) TAO_DDS_DCPSInfo_i::receive_image: ")
02329 ACE_TEXT("added publication %C to participant %C.\n"),
02330 std::string(pub_converter).c_str(),
02331 std::string(part_converter).c_str()));
02332 }
02333 }
02334
02335 #if !defined (DDS_HAS_MINIMUM_BIT)
02336
02337 for (DCPS_IR_Domain_Map::const_iterator currentDomain = domains_.begin();
02338 currentDomain != domains_.end();
02339 ++currentDomain) {
02340
02341 currentDomain->second->reassociate_built_in_topic_pubs();
02342 }
02343 #endif // !defined (DDS_HAS_MINIMUM_BIT)
02344
02345 return true;
02346 }
02347
02348 void
02349 TAO_DDS_DCPSInfo_i::add(Update::Updater* updater)
02350 {
02351 if (this->um_) {
02352 this->um_->add(updater);
02353 }
02354 }
02355
02356 bool
02357 TAO_DDS_DCPSInfo_i::init_persistence()
02358 {
02359 um_ = ACE_Dynamic_Service<UpdateManagerSvc>::instance
02360 ("UpdateManagerSvc");
02361
02362 if (um_ != 0) {
02363 um_->add(this);
02364
02365
02366 if (reincarnate_) {
02367 um_->requestImage();
02368 }
02369
02370 } else {
02371 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("TAO_DDS_DCPSInfo_i> Failed to discover ")
02372 ACE_TEXT("UpdateManagerSvc.\n")), false);
02373 }
02374
02375 return true;
02376 }
02377
02378 bool
02379 TAO_DDS_DCPSInfo_i::init_reassociation(const ACE_Time_Value& delay)
02380 {
02381 if (this->reassociate_timer_id_ != -1) return false;
02382
02383 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02384
02385 this->reassociate_timer_id_ = reactor->schedule_timer(this, 0, delay, delay);
02386 return this->reassociate_timer_id_ != -1;
02387 }
02388
02389 bool
02390 TAO_DDS_DCPSInfo_i::init_dispatchChecking(const ACE_Time_Value& delay)
02391 {
02392 if (this->dispatch_check_timer_id_ != -1) return false;
02393
02394 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02395
02396 this->dispatch_check_timer_id_ = reactor->schedule_timer(this, this, delay, delay);
02397 return this->dispatch_check_timer_id_ != -1;
02398 }
02399
02400 void
02401 TAO_DDS_DCPSInfo_i::finalize()
02402 {
02403 if (reassociate_timer_id_ != -1) {
02404 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02405
02406 reactor->cancel_timer(this->reassociate_timer_id_);
02407 this->reassociate_timer_id_ = -1;
02408 }
02409
02410 if (dispatch_check_timer_id_ != -1) {
02411 ACE_Reactor* reactor = this->orb_->orb_core()->reactor();
02412
02413 reactor->cancel_timer(this->dispatch_check_timer_id_);
02414 this->dispatch_check_timer_id_ = -1;
02415 }
02416 }
02417
02418 const DCPS_IR_Domain_Map&
02419 TAO_DDS_DCPSInfo_i::domains() const
02420 {
02421 return this->domains_;
02422 }
02423
02424
02425 char*
02426 TAO_DDS_DCPSInfo_i::dump_to_string()
02427 {
02428 std::string dump;
02429 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02430 std::string indent (" ");
02431
02432 for (DCPS_IR_Domain_Map::const_iterator dm = domains_.begin();
02433 dm != domains_.end();
02434 dm++)
02435 {
02436 dump += dm->second->dump_to_string(indent, 0);
02437 }
02438 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
02439 return CORBA::string_dup(dump.c_str());
02440
02441 }