DCPS_IR_Participant.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Participant.h"
00010 #include "FederationId.h"
00011 #include "UpdateManager.h"
00012 
00013 #include /**/ "DCPS_IR_Domain.h"
00014 #include /**/ "DCPS_IR_Subscription.h"
00015 #include /**/ "DCPS_IR_Publication.h"
00016 #include /**/ "DCPS_IR_Topic.h"
00017 #include /**/ "DCPS_IR_Topic_Description.h"
00018 
00019 #include /**/ "dds/DCPS/RepoIdConverter.h"
00020 #include <sstream>
00021 
00022 #include /**/ "tao/debug.h"
00023 
00024 DCPS_IR_Participant::DCPS_IR_Participant(const TAO_DDS_DCPSFederationId& federationId,
00025                                          OpenDDS::DCPS::RepoId id,
00026                                          DCPS_IR_Domain* domain,
00027                                          DDS::DomainParticipantQos qos,
00028                                          Update::Manager* um)
00029   : id_(id),
00030     domain_(domain),
00031     qos_(qos),
00032     aliveStatus_(1),
00033     handle_(0),
00034     isBIT_(0),
00035     federationId_(federationId),
00036     owner_(federationId.overridden() ? OWNER_NONE : federationId.id()),
00037     topicIdGenerator_(
00038       federationId.id(),
00039       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00040       OpenDDS::DCPS::KIND_TOPIC),
00041     publicationIdGenerator_(
00042       federationId.id(),
00043       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00044       OpenDDS::DCPS::KIND_WRITER),
00045     subscriptionIdGenerator_(
00046       federationId.id(),
00047       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00048       OpenDDS::DCPS::KIND_READER),
00049     um_(um),
00050     isBitPublisher_(false)
00051 {
00052 }
00053 
00054 DCPS_IR_Participant::~DCPS_IR_Participant()
00055 {
00056   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00057        current != this->subscriptions_.end();
00058        ++current) {
00059     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00060     OpenDDS::DCPS::RepoIdConverter sub_converter(current->first);
00061     ACE_ERROR((LM_ERROR,
00062                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::~DCPS_IR_Participant: ")
00063                ACE_TEXT("domain %d participant %C removing subscription %C.\n"),
00064                this->domain_->get_id(),
00065                std::string(part_converter).c_str(),
00066                std::string(sub_converter).c_str()));
00067     remove_subscription(current->first);
00068   }
00069 
00070   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00071        current != this->publications_.end();
00072        ++current) {
00073     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00074     OpenDDS::DCPS::RepoIdConverter pub_converter(current->first);
00075     ACE_ERROR((LM_ERROR,
00076                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::~DCPS_IR_Participant: ")
00077                ACE_TEXT("domain %d participant %C removing publication %C.\n"),
00078                this->domain_->get_id(),
00079                std::string(part_converter).c_str(),
00080                std::string(pub_converter).c_str()));
00081     remove_publication(current->first);
00082   }
00083 
00084   for (DCPS_IR_Topic_Map::const_iterator current = this->topicRefs_.begin();
00085        current != this->topicRefs_.end();
00086        ++current) {
00087     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00088     OpenDDS::DCPS::RepoIdConverter topic_converter(current->first);
00089     ACE_ERROR((LM_ERROR,
00090                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::~DCPS_IR_Participant: ")
00091                ACE_TEXT("domain %d participant %C retained topic %C.\n"),
00092                this->domain_->get_id(),
00093                std::string(part_converter).c_str(),
00094                std::string(topic_converter).c_str()));
00095   }
00096 }
00097 
00098 const DCPS_IR_Publication_Map&
00099 DCPS_IR_Participant::publications() const
00100 {
00101   return this->publications_;
00102 }
00103 
00104 const DCPS_IR_Subscription_Map&
00105 DCPS_IR_Participant::subscriptions() const
00106 {
00107   return this->subscriptions_;
00108 }
00109 
00110 const DCPS_IR_Topic_Map&
00111 DCPS_IR_Participant::topics() const
00112 {
00113   return this->topicRefs_;
00114 }
00115 
00116 void
00117 DCPS_IR_Participant::takeOwnership()
00118 {
00119   /// Publish an update with our ownership.
00120   if (this->um_ && (this->isBitPublisher() == false)) {
00121     this->um_->create(
00122       Update::OwnershipData(
00123         this->domain_->get_id(),
00124         this->id_,
00125         this->federationId_.id()));
00126 
00127     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00128       OpenDDS::DCPS::RepoIdConverter converter(id_);
00129       ACE_DEBUG((LM_DEBUG,
00130                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::take_ownership: ")
00131                  ACE_TEXT("pushing ownership %C in domain %d.\n"),
00132                  std::string(converter).c_str(),
00133                  this->domain_->get_id()));
00134     }
00135   }
00136 
00137   // And now handle our internal ownership processing.
00138   this->changeOwner(this->federationId_.id(), this->federationId_.id());
00139 }
00140 
00141 void
00142 DCPS_IR_Participant::changeOwner(long sender, long owner)
00143 {
00144   {
00145     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ownerLock_);
00146 
00147     if ((owner == OWNER_NONE)
00148         && (this->isOwner() || (this->owner_ != sender))) {
00149       // Do not eliminate ownership if we are the owner or if the update
00150       // does not come from the current owner.
00151       return;
00152     }
00153 
00154     // Finally.  Change the value.
00155     this->owner_ = owner;
00156 
00157   } // End of lock scope.
00158 
00159   if (this->isOwner()) {
00160     /// @TODO: Ensure that any stalled callbacks are made.
00161   }
00162 }
00163 
00164 long
00165 DCPS_IR_Participant::owner() const
00166 {
00167   return this->owner_;
00168 }
00169 
00170 bool
00171 DCPS_IR_Participant::isOwner() const
00172 {
00173   return this->owner_ == this->federationId_.id();
00174 }
00175 
00176 bool&
00177 DCPS_IR_Participant::isBitPublisher()
00178 {
00179   return this->isBitPublisher_;
00180 }
00181 
00182 bool
00183 DCPS_IR_Participant::isBitPublisher() const
00184 {
00185   return this->isBitPublisher_;
00186 }
00187 
00188 int DCPS_IR_Participant::add_publication(DCPS_IR_Publication* pub)
00189 {
00190   OpenDDS::DCPS::RepoId pubId = pub->get_id();
00191   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00192 
00193   if (where == this->publications_.end()) {
00194     this->publications_.insert(
00195       where, DCPS_IR_Publication_Map::value_type(pubId, pub));
00196 
00197     if (isBitPublisher_) {
00198       pub->set_bit_status(isBitPublisher_);
00199     }
00200 
00201     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00202       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00203       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00204       ACE_DEBUG((LM_DEBUG,
00205                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_publication: ")
00206                  ACE_TEXT("participant %C successfully added publication %C at 0x%x.\n"),
00207                  std::string(part_converter).c_str(),
00208                  std::string(pub_converter).c_str(),
00209                  pub));
00210     }
00211 
00212     return 0;
00213 
00214   } else {
00215     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00216       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00217       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00218       ACE_ERROR((LM_NOTICE,
00219                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_publication: ")
00220                  ACE_TEXT("participant %C attempted to add existing publication %C.\n"),
00221                  std::string(part_converter).c_str(),
00222                  std::string(pub_converter).c_str()));
00223     }
00224 
00225     return 1;
00226   }
00227 }
00228 
00229 int DCPS_IR_Participant::find_publication_reference(OpenDDS::DCPS::RepoId pubId,
00230                                                     DCPS_IR_Publication* & pub)
00231 {
00232   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00233 
00234   if (where != this->publications_.end()) {
00235     pub = where->second;
00236 
00237     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00238       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00239       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00240       ACE_DEBUG((LM_DEBUG,
00241                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
00242                  ACE_TEXT("participant %C found publication %C at 0x%x.\n"),
00243                  std::string(part_converter).c_str(),
00244                  std::string(pub_converter).c_str(),
00245                  pub));
00246     }
00247 
00248     return 0;
00249 
00250   } else {
00251     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00252       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00253       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00254       ACE_DEBUG((LM_DEBUG,
00255                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
00256                  ACE_TEXT("participant %C could not find publication %C.\n"),
00257                  std::string(part_converter).c_str(),
00258                  std::string(pub_converter).c_str()));
00259     }
00260     pub = 0;
00261     return -1;
00262   }
00263 }
00264 
00265 int DCPS_IR_Participant::remove_publication(OpenDDS::DCPS::RepoId pubId)
00266 {
00267   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00268 
00269   if (where != this->publications_.end()) {
00270     DCPS_IR_Topic* topic = where->second->get_topic();
00271     topic->remove_publication_reference(where->second);
00272 
00273     if (0 != where->second->remove_associations(false)) {
00274       // N.B. As written today, this branch will never be taken.
00275       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00276       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00277       ACE_ERROR((LM_ERROR,
00278                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
00279                  ACE_TEXT("participant %C unable to remove associations from publication %C\n"),
00280                  std::string(part_converter).c_str(),
00281                  std::string(pub_converter).c_str()));
00282       return -1;
00283     }
00284 
00285     this->domain_->dispose_publication_bit(where->second);
00286     delete where->second;
00287     topic->release(false);
00288     this->publications_.erase(where);
00289 
00290     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00291       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00292       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00293       ACE_DEBUG((LM_DEBUG,
00294                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_publication: ")
00295                  ACE_TEXT("participant %C removed publication %C.\n"),
00296                  std::string(part_converter).c_str(),
00297                  std::string(pub_converter).c_str()));
00298     }
00299 
00300     return 0;
00301 
00302   } else {
00303     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00304     OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00305     ACE_ERROR((LM_ERROR,
00306                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
00307                ACE_TEXT("participant %C unable to remove publication %C.\n"),
00308                std::string(part_converter).c_str(),
00309                std::string(pub_converter).c_str()));
00310     return -1;
00311   }
00312 }
00313 
00314 int DCPS_IR_Participant::add_subscription(DCPS_IR_Subscription* sub)
00315 {
00316   OpenDDS::DCPS::RepoId subId = sub->get_id();
00317   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00318 
00319   if (where == this->subscriptions_.end()) {
00320     this->subscriptions_.insert(
00321       where, DCPS_IR_Subscription_Map::value_type(subId, sub));
00322 
00323     if (isBitPublisher_) {
00324       sub->set_bit_status(isBitPublisher_);
00325     }
00326 
00327     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00328       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00329       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00330       ACE_DEBUG((LM_DEBUG,
00331                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_subscription: ")
00332                  ACE_TEXT("participant %C successfully added subscription %C at 0x%x.\n"),
00333                  std::string(part_converter).c_str(),
00334                  std::string(sub_converter).c_str(),
00335                  sub));
00336     }
00337 
00338     return 0;
00339 
00340   } else {
00341     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00343       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00344       ACE_ERROR((LM_NOTICE,
00345                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_subscription: ")
00346                  ACE_TEXT("participant %C attempted to add existing subscription %C.\n"),
00347                  std::string(part_converter).c_str(),
00348                  std::string(sub_converter).c_str()));
00349     }
00350 
00351     return 1;
00352   }
00353 }
00354 
00355 int DCPS_IR_Participant::find_subscription_reference(OpenDDS::DCPS::RepoId subId,
00356                                                      DCPS_IR_Subscription*& sub)
00357 {
00358   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00359 
00360   if (where != this->subscriptions_.end()) {
00361     sub = where->second;
00362 
00363     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00364       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00365       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00366       ACE_DEBUG((LM_DEBUG,
00367                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
00368                  ACE_TEXT("participant %C found subscription %C at 0x%x.\n"),
00369                  std::string(part_converter).c_str(),
00370                  std::string(sub_converter).c_str(),
00371                  sub));
00372     }
00373 
00374     return 0;
00375 
00376   } else {
00377     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00378       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00379       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00380       ACE_DEBUG((LM_DEBUG,
00381                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
00382                  ACE_TEXT("participant %C could not find subscription %C.\n"),
00383                  std::string(part_converter).c_str(),
00384                  std::string(sub_converter).c_str()));
00385     }
00386     sub = 0;
00387     return -1;
00388   }
00389 }
00390 
00391 int DCPS_IR_Participant::remove_subscription(OpenDDS::DCPS::RepoId subId)
00392 {
00393   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00394 
00395   if (where != this->subscriptions_.end()) {
00396     DCPS_IR_Topic* topic = where->second->get_topic();
00397     topic->remove_subscription_reference(where->second);
00398 
00399     if (0 != where->second->remove_associations(false)) {
00400       // N.B. As written today, this branch will never be taken.
00401       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00402       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00403       ACE_ERROR((LM_ERROR,
00404                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
00405                  ACE_TEXT("participant %C unable to remove associations from subscription %C\n"),
00406                  std::string(part_converter).c_str(),
00407                  std::string(sub_converter).c_str()));
00408       return -1;
00409     }
00410 
00411     this->domain_->dispose_subscription_bit(where->second);
00412     delete where->second;
00413     topic->release(false);
00414     this->subscriptions_.erase(where);
00415 
00416     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00417       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00418       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00419       ACE_DEBUG((LM_DEBUG,
00420                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_subscription: ")
00421                  ACE_TEXT("participant %C removed subscription %C.\n"),
00422                  std::string(part_converter).c_str(),
00423                  std::string(sub_converter).c_str()));
00424     }
00425 
00426     return 0;
00427 
00428   } else {
00429     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00430     OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00431     ACE_ERROR((LM_ERROR,
00432                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
00433                ACE_TEXT("participant %C unable to remove subscription %C.\n"),
00434                std::string(part_converter).c_str(),
00435                std::string(sub_converter).c_str()));
00436     return -1;
00437   }
00438 }
00439 
00440 int DCPS_IR_Participant::add_topic_reference(DCPS_IR_Topic* topic)
00441 {
00442   OpenDDS::DCPS::RepoId topicId = topic->get_id();
00443   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00444 
00445   if (where == this->topicRefs_.end()) {
00446     this->topicRefs_.insert(
00447       where, DCPS_IR_Topic_Map::value_type(topicId, topic));
00448 
00449     if (isBitPublisher_) {
00450       topic->set_bit_status(isBitPublisher_);
00451     }
00452 
00453     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00454       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00455       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00456       ACE_DEBUG((LM_DEBUG,
00457                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_topic_reference: ")
00458                  ACE_TEXT("participant %C successfully added topic %C at 0x%x.\n"),
00459                  std::string(part_converter).c_str(),
00460                  std::string(topic_converter).c_str(),
00461                  topic));
00462     }
00463 
00464     return 0;
00465 
00466   } else {
00467     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00468       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00469       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00470       ACE_DEBUG((LM_NOTICE,
00471                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_topic_reference: ")
00472                  ACE_TEXT("participant %C attempted to add existing topic %C.\n"),
00473                  std::string(part_converter).c_str(),
00474                  std::string(topic_converter).c_str()));
00475     }
00476 
00477     return 1;
00478   }
00479 }
00480 
00481 int DCPS_IR_Participant::remove_topic_reference(OpenDDS::DCPS::RepoId topicId,
00482                                                 DCPS_IR_Topic*& topic)
00483 {
00484   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00485 
00486   if (where != this->topicRefs_.end()) {
00487     topic = where->second;
00488     this->topicRefs_.erase(where);
00489 
00490     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00491       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00492       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00493       ACE_DEBUG((LM_DEBUG,
00494                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_topic_reference: ")
00495                  ACE_TEXT("participant %C removed topic %C at 0x%x.\n"),
00496                  std::string(part_converter).c_str(),
00497                  std::string(topic_converter).c_str(),
00498                  topic));
00499     }
00500 
00501     return 0;
00502 
00503   } else {
00504     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00505       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00506       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00507       ACE_ERROR((LM_WARNING,
00508                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Participant::remove_topic_reference: ")
00509                  ACE_TEXT("participant %C unable to find topic %C for removal.\n"),
00510                  std::string(part_converter).c_str(),
00511                  std::string(topic_converter).c_str()));
00512     }
00513 
00514     return -1;
00515   }
00516 }
00517 
00518 int DCPS_IR_Participant::find_topic_reference(OpenDDS::DCPS::RepoId topicId,
00519                                               DCPS_IR_Topic*& topic)
00520 {
00521   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00522 
00523   if (where != this->topicRefs_.end()) {
00524     topic = where->second;
00525 
00526     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00527       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00528       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00529       ACE_DEBUG((LM_DEBUG,
00530                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_topic_reference: ")
00531                  ACE_TEXT("participant %C found topic %C at %x.\n"),
00532                  std::string(part_converter).c_str(),
00533                  std::string(topic_converter).c_str(),
00534                  topic));
00535     }
00536 
00537     return 0;
00538 
00539   } else {
00540     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00541     OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00542     ACE_ERROR((LM_ERROR,
00543                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::find_topic_reference: ")
00544                ACE_TEXT("participant %C unable to find topic %C.\n"),
00545                std::string(part_converter).c_str(),
00546                std::string(topic_converter).c_str()));
00547     topic = 0;
00548     return -1;
00549   }
00550 }
00551 
00552 void DCPS_IR_Participant::remove_all_dependents(CORBA::Boolean notify_lost)
00553 {
00554   // remove all the publications associations
00555   {
00556     DCPS_IR_Publication_Map::const_iterator next = this->publications_.begin();
00557 
00558     while (next != this->publications_.end()) {
00559       DCPS_IR_Publication_Map::const_iterator current = next;
00560       ++ next;
00561       DCPS_IR_Topic* topic = current->second->get_topic();
00562       topic->remove_publication_reference(current->second);
00563 
00564       if (0 != current->second->remove_associations(notify_lost)) {
00565         return;
00566       }
00567 
00568       topic->release(false);
00569     }
00570   }
00571 
00572   {
00573     DCPS_IR_Subscription_Map::const_iterator next = this->subscriptions_.begin();
00574 
00575     while (next != this->subscriptions_.end()) {
00576       DCPS_IR_Subscription_Map::const_iterator current = next;
00577       ++ next;
00578       DCPS_IR_Topic* topic = current->second->get_topic();
00579       topic->remove_subscription_reference(current->second);
00580 
00581       if (0 != current->second->remove_associations(notify_lost)) {
00582         return;
00583       }
00584 
00585       topic->release(false);
00586     }
00587   }
00588 
00589   {
00590     DCPS_IR_Topic_Map::const_iterator next = this->topicRefs_.begin();
00591 
00592     while (next != this->topicRefs_.end()) {
00593       DCPS_IR_Topic_Map::const_iterator current = next;
00594       ++ next;
00595 
00596       // Notify the federation to remove the topic.
00597       if (this->um_ && (this->isBitPublisher() == false)) {
00598         Update::IdPath path(
00599           this->domain_->get_id(),
00600           this->get_id(),
00601           current->second->get_id());
00602         this->um_->destroy(path, Update::Topic);
00603 
00604         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00605           OpenDDS::DCPS::RepoId id = current->second->get_id();
00606           OpenDDS::DCPS::RepoIdConverter converter(id);
00607           ACE_DEBUG((LM_DEBUG,
00608                      ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00609                      ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00610                      std::string(converter).c_str(),
00611                      this->domain_->get_id()));
00612         }
00613 
00614         // Remove the topic ourselves.
00615         // N.B. This call sets the second (reference) argument to 0, so when
00616         //      clear() is called below, no destructor is (re)called.
00617 
00618         // Get the topic id and topic point before remove_topic since it
00619         // invalidates the iterator. Accessing after removal got SEGV.
00620         OpenDDS::DCPS::RepoId id = current->first;
00621         DCPS_IR_Topic* topic = current->second;
00622 
00623         this->domain_->remove_topic(this, topic);
00624 
00625         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00626           OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00627           OpenDDS::DCPS::RepoIdConverter topic_converter(id);
00628           ACE_DEBUG((LM_DEBUG,
00629                      ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00630                      ACE_TEXT("domain %d participant %C removed topic %C.\n"),
00631                      this->domain_->get_id(),
00632                      std::string(part_converter).c_str(),
00633                      std::string(topic_converter).c_str()));
00634         }
00635       }
00636     }
00637   }
00638 
00639   // Clear the Topic container of null pointers.
00640   this->topicRefs_.clear();
00641 
00642   // The publications and subscriptions can NOT be deleted until after all
00643   // the associations have been removed.  Otherwise an access violation
00644   // can occur because a publication and subscription of this participant
00645   // could be associated.
00646 
00647   // delete all the publications
00648   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00649        current != this->publications_.end();
00650        ++current) {
00651     // Notify the federation to destroy the publication.
00652     if (this->um_ && (this->isBitPublisher() == false)) {
00653       Update::IdPath path(
00654         this->domain_->get_id(),
00655         this->get_id(),
00656         current->second->get_id());
00657       this->um_->destroy(path, Update::Actor, Update::DataWriter);
00658 
00659       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00660         OpenDDS::DCPS::RepoId id = current->second->get_id();
00661         OpenDDS::DCPS::RepoIdConverter converter(id);
00662         ACE_DEBUG((LM_DEBUG,
00663                    ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00664                    ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00665                    std::string(converter).c_str(),
00666                    this->domain_->get_id()));
00667       }
00668     }
00669     // delete the publication
00670     delete current->second;
00671   }
00672 
00673   // Clear the container.
00674   this->publications_.clear();
00675 
00676   // delete all the subscriptions
00677   for (DCPS_IR_Subscription_Map::const_iterator current
00678        = this->subscriptions_.begin();
00679        current != this->subscriptions_.end();
00680        ++current) {
00681     // Notify the federation to destroy the subscription.
00682     if (this->um_ && (this->isBitPublisher() == false)) {
00683       Update::IdPath path(
00684         this->domain_->get_id(),
00685         this->get_id(),
00686         current->second->get_id());
00687       this->um_->destroy(path, Update::Actor, Update::DataReader);
00688 
00689       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00690         OpenDDS::DCPS::RepoId id = current->second->get_id();
00691         OpenDDS::DCPS::RepoIdConverter converter(id);
00692         ACE_DEBUG((LM_DEBUG,
00693                    ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00694                    ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00695                    std::string(converter).c_str(),
00696                    this->domain_->get_id()));
00697       }
00698     }
00699     // delete the subscription
00700     delete current->second;
00701   }
00702 
00703   // Clear the container.
00704   this->subscriptions_.clear();
00705 }
00706 
00707 void DCPS_IR_Participant::mark_dead()
00708 {
00709   aliveStatus_ = 0;
00710   domain_->add_dead_participant(this);
00711 }
00712 
00713 OpenDDS::DCPS::RepoId DCPS_IR_Participant::get_id()
00714 {
00715   return id_;
00716 }
00717 
00718 CORBA::Boolean DCPS_IR_Participant::is_alive()
00719 {
00720   return aliveStatus_;
00721 }
00722 
00723 void DCPS_IR_Participant::set_alive(CORBA::Boolean alive)
00724 {
00725   aliveStatus_ = alive;
00726 }
00727 
00728 void DCPS_IR_Participant::ignore_participant(OpenDDS::DCPS::RepoId id)
00729 {
00730   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00731     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00732     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00733     ACE_DEBUG((LM_DEBUG,
00734                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_participant: ")
00735                ACE_TEXT("participant %C now ignoring participant %C.\n"),
00736                std::string(part_converter).c_str(),
00737                std::string(ignore_converter).c_str()));
00738   }
00739 
00740   ignoredParticipants_.insert(id);
00741 
00742   // disassociate any publications
00743   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00744        current != this->publications_.end();
00745        ++current) {
00746     current->second->disassociate_participant(id);
00747   }
00748 
00749   // disassociate any subscriptions
00750   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00751        current != this->subscriptions_.end();
00752        ++current) {
00753     current->second->disassociate_participant(id);
00754   }
00755 }
00756 
00757 void DCPS_IR_Participant::ignore_topic(OpenDDS::DCPS::RepoId id)
00758 {
00759   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00760     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00761     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00762     ACE_DEBUG((LM_DEBUG,
00763                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_topic: ")
00764                ACE_TEXT("participant %C now ignoring topic %C.\n"),
00765                std::string(part_converter).c_str(),
00766                std::string(ignore_converter).c_str()));
00767   }
00768 
00769   ignoredTopics_.insert(id);
00770 
00771   // disassociate any publications
00772   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00773        current != this->publications_.end();
00774        ++current) {
00775     current->second->disassociate_topic(id);
00776   }
00777 
00778   // disassociate any subscriptions
00779   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00780        current != this->subscriptions_.end();
00781        ++current) {
00782     current->second->disassociate_topic(id);
00783   }
00784 }
00785 
00786 void DCPS_IR_Participant::ignore_publication(OpenDDS::DCPS::RepoId id)
00787 {
00788   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00789     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00790     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00791     ACE_DEBUG((LM_DEBUG,
00792                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_publication: ")
00793                ACE_TEXT("participant %C now ignoring publication %C.\n"),
00794                std::string(part_converter).c_str(),
00795                std::string(ignore_converter).c_str()));
00796   }
00797 
00798   ignoredPublications_.insert(id);
00799 
00800   // disassociate any subscriptions
00801   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00802        current != this->subscriptions_.end();
00803        ++current) {
00804     current->second->disassociate_publication(id);
00805   }
00806 }
00807 
00808 void DCPS_IR_Participant::ignore_subscription(OpenDDS::DCPS::RepoId id)
00809 {
00810   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00811     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00812     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00813     ACE_DEBUG((LM_DEBUG,
00814                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_subscription: ")
00815                ACE_TEXT("participant %C now ignoring subscription %C.\n"),
00816                std::string(part_converter).c_str(),
00817                std::string(ignore_converter).c_str()));
00818   }
00819 
00820   ignoredSubscriptions_.insert(id);
00821 
00822   // disassociate any publications
00823   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00824        current != this->publications_.end();
00825        ++current) {
00826     current->second->disassociate_subscription(id);
00827   }
00828 }
00829 
00830 CORBA::Boolean DCPS_IR_Participant::is_participant_ignored(OpenDDS::DCPS::RepoId id)
00831 {
00832   return (0 == ignoredParticipants_.find(id));
00833 }
00834 
00835 CORBA::Boolean DCPS_IR_Participant::is_topic_ignored(OpenDDS::DCPS::RepoId id)
00836 {
00837   return (0 == ignoredTopics_.find(id));
00838 }
00839 
00840 CORBA::Boolean DCPS_IR_Participant::is_publication_ignored(OpenDDS::DCPS::RepoId id)
00841 {
00842   return (0 == ignoredPublications_.find(id));
00843 }
00844 
00845 CORBA::Boolean DCPS_IR_Participant::is_subscription_ignored(OpenDDS::DCPS::RepoId id)
00846 {
00847   return (0 == ignoredSubscriptions_.find(id));
00848 }
00849 
00850 DDS::InstanceHandle_t DCPS_IR_Participant::get_handle()
00851 {
00852   return handle_;
00853 }
00854 
00855 void DCPS_IR_Participant::set_handle(DDS::InstanceHandle_t handle)
00856 {
00857   handle_ = handle;
00858 }
00859 
00860 const DDS::DomainParticipantQos* DCPS_IR_Participant::get_qos()
00861 {
00862   return &qos_;
00863 }
00864 
00865 bool DCPS_IR_Participant::set_qos(const DDS::DomainParticipantQos & qos)
00866 {
00867   // Do not need re-evaluate compatibility and associations when
00868   // DomainParticipantQos changes since only datareader and datawriter
00869   // QoS are evaludated during normal associations establishment.
00870 
00871   // Do not need publish the QoS change to topics or datareader or
00872   // datawriter BIT as they are independent.
00873   qos_ = qos;
00874   this->domain_->publish_participant_bit(this);
00875 
00876   return true;
00877 }
00878 
00879 CORBA::Boolean DCPS_IR_Participant::is_bit()
00880 {
00881   return isBIT_;
00882 }
00883 
00884 void DCPS_IR_Participant::set_bit_status(CORBA::Boolean isBIT)
00885 {
00886   isBIT_ = isBIT;
00887 }
00888 
00889 DCPS_IR_Domain* DCPS_IR_Participant::get_domain_reference() const
00890 {
00891   return domain_;
00892 }
00893 
00894 OpenDDS::DCPS::RepoId
00895 DCPS_IR_Participant::get_next_topic_id()
00896 {
00897   return this->topicIdGenerator_.next();
00898 }
00899 
00900 OpenDDS::DCPS::RepoId
00901 DCPS_IR_Participant::get_next_publication_id()
00902 {
00903   return this->publicationIdGenerator_.next();
00904 }
00905 
00906 OpenDDS::DCPS::RepoId
00907 DCPS_IR_Participant::get_next_subscription_id()
00908 {
00909   return this->subscriptionIdGenerator_.next();
00910 }
00911 
00912 void
00913 DCPS_IR_Participant::last_topic_key(long key)
00914 {
00915   return this->topicIdGenerator_.last(key);
00916 }
00917 
00918 void
00919 DCPS_IR_Participant::last_publication_key(long key)
00920 {
00921   return this->publicationIdGenerator_.last(key);
00922 }
00923 
00924 void
00925 DCPS_IR_Participant::last_subscription_key(long key)
00926 {
00927   return this->subscriptionIdGenerator_.last(key);
00928 }
00929 
00930 std::string
00931 DCPS_IR_Participant::dump_to_string(const std::string& prefix, int depth) const
00932 {
00933   std::string str;
00934 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00935   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00936 
00937   for (int i=0; i < depth; i++)
00938     str += prefix;
00939   std::string indent = str + prefix;
00940   str += "DCPS_IR_Participant[";
00941   str += std::string(local_converter);
00942   str += "]";
00943   if (isBIT_)
00944     str += " (BIT)";
00945   std::ostringstream os;
00946   os << "federation id[" << federationId_.id();
00947   if (federationId_.overridden())
00948     os << "(federated)";
00949 
00950   os << "]  owner[" << owner_ << "]";
00951   str += os.str();
00952   str += aliveStatus_ ? " (alive)" : " (not alive)";
00953   str += "\n";
00954 
00955   str += indent + "Topics:\n";
00956   for (DCPS_IR_Topic_Map::const_iterator tm = topicRefs_.begin();
00957        tm != topicRefs_.end();
00958        tm++)
00959   {
00960     str += tm->second->dump_to_string(prefix, depth+1);
00961   }
00962 
00963   str += indent + "Publications:\n";
00964   for (DCPS_IR_Publication_Map::const_iterator pm = publications_.begin();
00965        pm != publications_.end();
00966        pm++)
00967   {
00968     str += pm->second->dump_to_string(prefix, depth+1);
00969   }
00970 
00971   str += indent + "Subscriptions:\n";
00972   for (DCPS_IR_Subscription_Map::const_iterator sm = subscriptions_.begin();
00973        sm != subscriptions_.end();
00974        sm++)
00975   {
00976     str += sm->second->dump_to_string(prefix, depth+1);
00977   }
00978 
00979   str += indent + "ignored Participants [ ";
00980   for (TAO_DDS_RepoId_Set::const_iterator ipart = ignoredParticipants_.begin();
00981        ipart != ignoredParticipants_.end();
00982        ipart++)
00983   {
00984     OpenDDS::DCPS::RepoIdConverter ipart_converter(*ipart);
00985     str += std::string(ipart_converter);
00986     str += " ";
00987   }
00988   str += "]\n";
00989   str += indent + "ignored Topics [ ";
00990 
00991   for (TAO_DDS_RepoId_Set::const_iterator itop = ignoredTopics_.begin();
00992        itop != ignoredTopics_.end();
00993        itop++)
00994   {
00995     OpenDDS::DCPS::RepoIdConverter itop_converter(*itop);
00996     str += std::string(itop_converter);
00997     str += " ";
00998   }
00999   str += "]\n";
01000   str += indent + "ignored Publications [ ";
01001 
01002   for (TAO_DDS_RepoId_Set::const_iterator ipub = ignoredPublications_.begin();
01003        ipub != ignoredPublications_.end();
01004        ipub++)
01005   {
01006     OpenDDS::DCPS::RepoIdConverter ipub_converter(*ipub);
01007     str += std::string(ipub_converter);
01008     str += " ";
01009   }
01010   str += "]\n";
01011   str += indent + "ignored Subscriptions [ ";
01012 
01013   for (TAO_DDS_RepoId_Set::const_iterator isub = ignoredSubscriptions_.begin();
01014        isub != ignoredSubscriptions_.end();
01015        isub++)
01016   {
01017     OpenDDS::DCPS::RepoIdConverter isub_converter(*isub);
01018     str += std::string(isub_converter);
01019     str += " ";
01020   }
01021   str += "]\n";
01022 
01023 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
01024   return str;
01025 }
01026 // Template instantiation section: only one of the two forms below is compiled, chosen by ACE configuration macros; if neither macro is defined nothing is emitted.
01027 #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
01028 // Form 1: C++ explicit instantiation ("template class ...") of the ACE templates listed below.
01029 template class ACE_Map_Entry<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*>;
01030 template class ACE_Map_Manager<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>;
01031 template class ACE_Map_Iterator_Base<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>;
01032 template class ACE_Map_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>;
01033 template class ACE_Map_Reverse_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>;
01034 // Same map family, with DCPS_IR_Subscription* as the mapped type.
01035 template class ACE_Map_Entry<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*>;
01036 template class ACE_Map_Manager<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>;
01037 template class ACE_Map_Iterator_Base<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>;
01038 template class ACE_Map_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>;
01039 template class ACE_Map_Reverse_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>;
01040 // Unbounded set of RepoId and its node/iterator types (presumably what TAO_DDS_RepoId_Set is built on -- TODO confirm against the header).
01041 template class ACE_Node<OpenDDS::DCPS::RepoId>;
01042 template class ACE_Unbounded_Set<OpenDDS::DCPS::RepoId>;
01043 template class ACE_Unbounded_Set_Iterator<OpenDDS::DCPS::RepoId>;
01044 
01045 #elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
01046 // Form 2: the same list of templates, requested through "#pragma instantiate" instead.
01047 #pragma instantiate ACE_Map_Entry<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*>
01048 #pragma instantiate ACE_Map_Manager<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>
01049 #pragma instantiate ACE_Map_Iterator_Base<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>
01050 #pragma instantiate ACE_Map_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>
01051 #pragma instantiate ACE_Map_Reverse_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Publication*,ACE_Null_Mutex>
01052 // Subscription-valued map family.
01053 #pragma instantiate ACE_Map_Entry<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*>
01054 #pragma instantiate ACE_Map_Manager<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>
01055 #pragma instantiate ACE_Map_Iterator_Base<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>
01056 #pragma instantiate ACE_Map_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>
01057 #pragma instantiate ACE_Map_Reverse_Iterator<OpenDDS::DCPS::RepoId,DCPS_IR_Subscription*,ACE_Null_Mutex>
01058 // RepoId set, node and iterator.
01059 #pragma instantiate ACE_Node<OpenDDS::DCPS::RepoId>
01060 #pragma instantiate ACE_Unbounded_Set<OpenDDS::DCPS::RepoId>
01061 #pragma instantiate ACE_Unbounded_Set_Iterator<OpenDDS::DCPS::RepoId>
01062 
01063 #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7