DCPS_IR_Participant.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Participant.h"
00010 #include "FederationId.h"
00011 #include "UpdateManager.h"
00012 
00013 #include /**/ "DCPS_IR_Domain.h"
00014 #include /**/ "DCPS_IR_Subscription.h"
00015 #include /**/ "DCPS_IR_Publication.h"
00016 #include /**/ "DCPS_IR_Topic.h"
00017 #include /**/ "DCPS_IR_Topic_Description.h"
00018 
00019 #include /**/ "dds/DCPS/RepoIdConverter.h"
00020 #include <sstream>
00021 
00022 #include /**/ "tao/debug.h"
00023 
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 DCPS_IR_Participant::DCPS_IR_Participant(const TAO_DDS_DCPSFederationId& federationId,
00027                                          OpenDDS::DCPS::RepoId id,
00028                                          DCPS_IR_Domain* domain,
00029                                          DDS::DomainParticipantQos qos,
00030                                          Update::Manager* um,
00031                                          bool isBit)
00032   : id_(id),
00033     domain_(domain),
00034     qos_(qos),
00035     aliveStatus_(1),
00036     handle_(0),
00037     federationId_(federationId),
00038     owner_(federationId.overridden() ? OWNER_NONE : federationId.id()),
00039     topicIdGenerator_(
00040       federationId.id(),
00041       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00042       isBit ? OpenDDS::DCPS::KIND_BUILTIN_TOPIC : OpenDDS::DCPS::KIND_TOPIC),
00043     publicationIdGenerator_(
00044       federationId.id(),
00045       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00046       isBit ? OpenDDS::DCPS::KIND_BUILTIN_WRITER : OpenDDS::DCPS::KIND_WRITER),
00047     subscriptionIdGenerator_(
00048       federationId.id(),
00049       OpenDDS::DCPS::RepoIdConverter(id).participantId(),
00050       isBit ? OpenDDS::DCPS::KIND_BUILTIN_READER : OpenDDS::DCPS::KIND_READER),
00051     um_(um),
00052     isBitPublisher_(isBit)
00053 {
00054 }
00055 
00056 DCPS_IR_Participant::~DCPS_IR_Participant()
00057 {
00058 }
00059 
00060 const DCPS_IR_Publication_Map&
00061 DCPS_IR_Participant::publications() const
00062 {
00063   return this->publications_;
00064 }
00065 
00066 const DCPS_IR_Subscription_Map&
00067 DCPS_IR_Participant::subscriptions() const
00068 {
00069   return this->subscriptions_;
00070 }
00071 
00072 const DCPS_IR_Topic_Map&
00073 DCPS_IR_Participant::topics() const
00074 {
00075   return this->topicRefs_;
00076 }
00077 
00078 void
00079 DCPS_IR_Participant::takeOwnership()
00080 {
00081   /// Publish an update with our ownership.
00082   if (this->um_ && (this->isBitPublisher() == false)) {
00083     this->um_->create(
00084       Update::OwnershipData(
00085         this->domain_->get_id(),
00086         this->id_,
00087         this->federationId_.id()));
00088 
00089     if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00090       OpenDDS::DCPS::RepoIdConverter converter(id_);
00091       ACE_DEBUG((LM_DEBUG,
00092                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::take_ownership: ")
00093                  ACE_TEXT("pushing ownership %C in domain %d.\n"),
00094                  std::string(converter).c_str(),
00095                  this->domain_->get_id()));
00096     }
00097   }
00098 
00099   // And now handle our internal ownership processing.
00100   this->changeOwner(this->federationId_.id(), this->federationId_.id());
00101 }
00102 
00103 void
00104 DCPS_IR_Participant::changeOwner(long sender, long owner)
00105 {
00106   {
00107     ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ownerLock_);
00108 
00109     if ((owner == OWNER_NONE)
00110         && (this->isOwner() || (this->owner_ != sender))) {
00111       // Do not eliminate ownership if we are the owner or if the update
00112       // does not come from the current owner.
00113       return;
00114     }
00115 
00116     // Finally.  Change the value.
00117     this->owner_ = owner;
00118 
00119   } // End of lock scope.
00120 
00121   if (this->isOwner()) {
00122     /// @TODO: Ensure that any stalled callbacks are made.
00123   }
00124 }
00125 
00126 long
00127 DCPS_IR_Participant::owner() const
00128 {
00129   return this->owner_;
00130 }
00131 
00132 bool
00133 DCPS_IR_Participant::isOwner() const
00134 {
00135   return this->owner_ == this->federationId_.id();
00136 }
00137 
00138 bool&
00139 DCPS_IR_Participant::isBitPublisher()
00140 {
00141   return this->isBitPublisher_;
00142 }
00143 
00144 bool
00145 DCPS_IR_Participant::isBitPublisher() const
00146 {
00147   return this->isBitPublisher_;
00148 }
00149 
00150 int DCPS_IR_Participant::add_publication(OpenDDS::DCPS::unique_ptr<DCPS_IR_Publication> pub)
00151 {
00152   OpenDDS::DCPS::RepoId pubId = pub->get_id();
00153   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00154 
00155   if (where == this->publications_.end()) {
00156     DCPS_IR_Publication* pubptr = pub.get();
00157     this->publications_.insert(
00158       where, DCPS_IR_Publication_Map::value_type(pubId, OpenDDS::DCPS::move(pub)));
00159 
00160     if (isBitPublisher_) {
00161       pubptr->set_bit_status(isBitPublisher_);
00162     }
00163 
00164     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00165       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00166       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00167       ACE_DEBUG((LM_DEBUG,
00168                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_publication: ")
00169                  ACE_TEXT("participant %C successfully added publication %C at 0x%x.\n"),
00170                  std::string(part_converter).c_str(),
00171                  std::string(pub_converter).c_str(),
00172                  pubptr));
00173     }
00174 
00175     return 0;
00176 
00177   } else {
00178     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00179       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00180       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00181       ACE_ERROR((LM_NOTICE,
00182                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_publication: ")
00183                  ACE_TEXT("participant %C attempted to add existing publication %C.\n"),
00184                  std::string(part_converter).c_str(),
00185                  std::string(pub_converter).c_str()));
00186     }
00187 
00188     return 1;
00189   }
00190 }
00191 
00192 int DCPS_IR_Participant::find_publication_reference(OpenDDS::DCPS::RepoId pubId,
00193                                                     DCPS_IR_Publication* & pub)
00194 {
00195   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00196 
00197   if (where != this->publications_.end()) {
00198     pub = where->second.get();
00199 
00200     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00201       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00202       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00203       ACE_DEBUG((LM_DEBUG,
00204                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
00205                  ACE_TEXT("participant %C found publication %C at 0x%x.\n"),
00206                  std::string(part_converter).c_str(),
00207                  std::string(pub_converter).c_str(),
00208                  pub));
00209     }
00210 
00211     return 0;
00212 
00213   } else {
00214     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00215       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00216       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00217       ACE_DEBUG((LM_DEBUG,
00218                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_publication_reference: ")
00219                  ACE_TEXT("participant %C could not find publication %C.\n"),
00220                  std::string(part_converter).c_str(),
00221                  std::string(pub_converter).c_str()));
00222     }
00223     pub = 0;
00224     return -1;
00225   }
00226 }
00227 
00228 int DCPS_IR_Participant::remove_publication(OpenDDS::DCPS::RepoId pubId)
00229 {
00230   DCPS_IR_Publication_Map::iterator where = this->publications_.find(pubId);
00231 
00232   if (where != this->publications_.end()) {
00233     DCPS_IR_Topic* topic = where->second->get_topic();
00234     topic->remove_publication_reference(where->second.get());
00235 
00236     if (0 != where->second->remove_associations(false)) {
00237       // N.B. As written today, this branch will never be taken.
00238       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00239       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00240       ACE_ERROR((LM_ERROR,
00241                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
00242                  ACE_TEXT("participant %C unable to remove associations from publication %C\n"),
00243                  std::string(part_converter).c_str(),
00244                  std::string(pub_converter).c_str()));
00245       return -1;
00246     }
00247 
00248     this->domain_->dispose_publication_bit(where->second.get());
00249     topic->release(false);
00250     this->publications_.erase(where);
00251 
00252     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00253       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00254       OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00255       ACE_DEBUG((LM_DEBUG,
00256                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_publication: ")
00257                  ACE_TEXT("participant %C removed publication %C.\n"),
00258                  std::string(part_converter).c_str(),
00259                  std::string(pub_converter).c_str()));
00260     }
00261 
00262     return 0;
00263 
00264   } else {
00265     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00266     OpenDDS::DCPS::RepoIdConverter pub_converter(pubId);
00267     ACE_ERROR((LM_ERROR,
00268                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_publication: ")
00269                ACE_TEXT("participant %C unable to remove publication %C.\n"),
00270                std::string(part_converter).c_str(),
00271                std::string(pub_converter).c_str()));
00272     return -1;
00273   }
00274 }
00275 
00276 int DCPS_IR_Participant::add_subscription(OpenDDS::DCPS::unique_ptr<DCPS_IR_Subscription> sub)
00277 {
00278   OpenDDS::DCPS::RepoId subId = sub->get_id();
00279   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00280 
00281   if (where == this->subscriptions_.end()) {
00282 
00283     DCPS_IR_Subscription* subptr = sub.get();
00284     this->subscriptions_.insert(
00285       where, DCPS_IR_Subscription_Map::value_type(subId, OpenDDS::DCPS::move(sub)));
00286 
00287     if (isBitPublisher_) {
00288       subptr->set_bit_status(isBitPublisher_);
00289     }
00290 
00291     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00292       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00293       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00294       ACE_DEBUG((LM_DEBUG,
00295                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_subscription: ")
00296                  ACE_TEXT("participant %C successfully added subscription %C at 0x%x.\n"),
00297                  std::string(part_converter).c_str(),
00298                  std::string(sub_converter).c_str(),
00299                  subptr));
00300     }
00301 
00302     return 0;
00303 
00304   } else {
00305     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00306       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00307       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00308       ACE_ERROR((LM_NOTICE,
00309                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_subscription: ")
00310                  ACE_TEXT("participant %C attempted to add existing subscription %C.\n"),
00311                  std::string(part_converter).c_str(),
00312                  std::string(sub_converter).c_str()));
00313     }
00314 
00315     return 1;
00316   }
00317 }
00318 
00319 int DCPS_IR_Participant::find_subscription_reference(OpenDDS::DCPS::RepoId subId,
00320                                                      DCPS_IR_Subscription*& sub)
00321 {
00322   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00323 
00324   if (where != this->subscriptions_.end()) {
00325     sub = where->second.get();
00326 
00327     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00328       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00329       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00330       ACE_DEBUG((LM_DEBUG,
00331                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
00332                  ACE_TEXT("participant %C found subscription %C at 0x%x.\n"),
00333                  std::string(part_converter).c_str(),
00334                  std::string(sub_converter).c_str(),
00335                  sub));
00336     }
00337 
00338     return 0;
00339 
00340   } else {
00341     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00342       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00343       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00344       ACE_DEBUG((LM_DEBUG,
00345                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_subscription_reference: ")
00346                  ACE_TEXT("participant %C could not find subscription %C.\n"),
00347                  std::string(part_converter).c_str(),
00348                  std::string(sub_converter).c_str()));
00349     }
00350     sub = 0;
00351     return -1;
00352   }
00353 }
00354 
00355 int DCPS_IR_Participant::remove_subscription(OpenDDS::DCPS::RepoId subId)
00356 {
00357   DCPS_IR_Subscription_Map::iterator where = this->subscriptions_.find(subId);
00358 
00359   if (where != this->subscriptions_.end()) {
00360     DCPS_IR_Topic* topic = where->second->get_topic();
00361     topic->remove_subscription_reference(where->second.get());
00362 
00363     if (0 != where->second->remove_associations(false)) {
00364       // N.B. As written today, this branch will never be taken.
00365       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00366       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00367       ACE_ERROR((LM_ERROR,
00368                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
00369                  ACE_TEXT("participant %C unable to remove associations from subscription %C\n"),
00370                  std::string(part_converter).c_str(),
00371                  std::string(sub_converter).c_str()));
00372       return -1;
00373     }
00374 
00375     this->domain_->dispose_subscription_bit(where->second.get());
00376     topic->release(false);
00377     this->subscriptions_.erase(where);
00378 
00379     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00380       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00381       OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00382       ACE_DEBUG((LM_DEBUG,
00383                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_subscription: ")
00384                  ACE_TEXT("participant %C removed subscription %C.\n"),
00385                  std::string(part_converter).c_str(),
00386                  std::string(sub_converter).c_str()));
00387     }
00388 
00389     return 0;
00390 
00391   } else {
00392     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00393     OpenDDS::DCPS::RepoIdConverter sub_converter(subId);
00394     ACE_ERROR((LM_ERROR,
00395                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::remove_subscription: ")
00396                ACE_TEXT("participant %C unable to remove subscription %C.\n"),
00397                std::string(part_converter).c_str(),
00398                std::string(sub_converter).c_str()));
00399     return -1;
00400   }
00401 }
00402 
00403 int DCPS_IR_Participant::add_topic_reference(DCPS_IR_Topic* topic)
00404 {
00405   OpenDDS::DCPS::RepoId topicId = topic->get_id();
00406   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00407 
00408   if (where == this->topicRefs_.end()) {
00409     this->topicRefs_.insert(
00410       where, DCPS_IR_Topic_Map::value_type(topicId, topic));
00411 
00412     if (isBitPublisher_) {
00413       topic->set_bit_status(isBitPublisher_);
00414     }
00415 
00416     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00417       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00418       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00419       ACE_DEBUG((LM_DEBUG,
00420                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::add_topic_reference: ")
00421                  ACE_TEXT("participant %C successfully added topic %C at 0x%x.\n"),
00422                  std::string(part_converter).c_str(),
00423                  std::string(topic_converter).c_str(),
00424                  topic));
00425     }
00426 
00427     return 0;
00428 
00429   } else {
00430     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00431       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00432       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00433       ACE_DEBUG((LM_NOTICE,
00434                  ACE_TEXT("(%P|%t) NOTICE: DCPS_IR_Participant::add_topic_reference: ")
00435                  ACE_TEXT("participant %C attempted to add existing topic %C.\n"),
00436                  std::string(part_converter).c_str(),
00437                  std::string(topic_converter).c_str()));
00438     }
00439 
00440     return 1;
00441   }
00442 }
00443 
00444 int DCPS_IR_Participant::remove_topic_reference(OpenDDS::DCPS::RepoId topicId,
00445                                                 DCPS_IR_Topic*& topic)
00446 {
00447   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00448 
00449   if (where != this->topicRefs_.end()) {
00450     topic = where->second;
00451     this->topicRefs_.erase(where);
00452 
00453     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00454       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00455       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00456       ACE_DEBUG((LM_DEBUG,
00457                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_topic_reference: ")
00458                  ACE_TEXT("participant %C removed topic %C at 0x%x.\n"),
00459                  std::string(part_converter).c_str(),
00460                  std::string(topic_converter).c_str(),
00461                  topic));
00462     }
00463 
00464     return 0;
00465 
00466   } else {
00467     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00468       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00469       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00470       ACE_ERROR((LM_WARNING,
00471                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Participant::remove_topic_reference: ")
00472                  ACE_TEXT("participant %C unable to find topic %C for removal.\n"),
00473                  std::string(part_converter).c_str(),
00474                  std::string(topic_converter).c_str()));
00475     }
00476 
00477     return -1;
00478   }
00479 }
00480 
00481 int DCPS_IR_Participant::find_topic_reference(OpenDDS::DCPS::RepoId topicId,
00482                                               DCPS_IR_Topic*& topic)
00483 {
00484   DCPS_IR_Topic_Map::iterator where = this->topicRefs_.find(topicId);
00485 
00486   if (where != this->topicRefs_.end()) {
00487     topic = where->second;
00488 
00489     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00490       OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00491       OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00492       ACE_DEBUG((LM_DEBUG,
00493                  ACE_TEXT("(%P|%t) DCPS_IR_Participant::find_topic_reference: ")
00494                  ACE_TEXT("participant %C found topic %C at %x.\n"),
00495                  std::string(part_converter).c_str(),
00496                  std::string(topic_converter).c_str(),
00497                  topic));
00498     }
00499 
00500     return 0;
00501 
00502   } else {
00503     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00504     OpenDDS::DCPS::RepoIdConverter topic_converter(topicId);
00505     ACE_ERROR((LM_ERROR,
00506                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Participant::find_topic_reference: ")
00507                ACE_TEXT("participant %C unable to find topic %C.\n"),
00508                std::string(part_converter).c_str(),
00509                std::string(topic_converter).c_str()));
00510     topic = 0;
00511     return -1;
00512   }
00513 }
00514 
00515 void DCPS_IR_Participant::remove_all_dependents(CORBA::Boolean notify_lost)
00516 {
00517   // remove all the publications associations
00518   {
00519     DCPS_IR_Publication_Map::const_iterator next = this->publications_.begin();
00520 
00521     while (next != this->publications_.end()) {
00522       DCPS_IR_Publication_Map::const_iterator current = next;
00523       ++ next;
00524       DCPS_IR_Topic* topic = current->second->get_topic();
00525       topic->remove_publication_reference(current->second.get());
00526 
00527       if (0 != current->second->remove_associations(notify_lost)) {
00528         return;
00529       }
00530 
00531       topic->release(false);
00532     }
00533   }
00534 
00535   {
00536     DCPS_IR_Subscription_Map::const_iterator next = this->subscriptions_.begin();
00537 
00538     while (next != this->subscriptions_.end()) {
00539       DCPS_IR_Subscription_Map::const_iterator current = next;
00540       ++ next;
00541       DCPS_IR_Topic* topic = current->second->get_topic();
00542       topic->remove_subscription_reference(current->second.get());
00543 
00544       if (0 != current->second->remove_associations(notify_lost)) {
00545         return;
00546       }
00547 
00548       topic->release(false);
00549     }
00550   }
00551 
00552   {
00553     DCPS_IR_Topic_Map::const_iterator next = this->topicRefs_.begin();
00554 
00555     while (next != this->topicRefs_.end()) {
00556       DCPS_IR_Topic_Map::const_iterator current = next;
00557       ++ next;
00558 
00559       // Notify the federation to remove the topic.
00560       if (this->um_ && (this->isBitPublisher() == false)) {
00561         Update::IdPath path(
00562           this->domain_->get_id(),
00563           this->get_id(),
00564           current->second->get_id());
00565         this->um_->destroy(path, Update::Topic);
00566 
00567         if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00568           OpenDDS::DCPS::RepoId id = current->second->get_id();
00569           OpenDDS::DCPS::RepoIdConverter converter(id);
00570           ACE_DEBUG((LM_DEBUG,
00571                      ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00572                      ACE_TEXT("pushing deletion of topic %C in domain %d.\n"),
00573                      std::string(converter).c_str(),
00574                      this->domain_->get_id()));
00575         }
00576 
00577         // Remove the topic ourselves.
00578         // N.B. This call sets the second (reference) argument to 0, so when
00579         //      clear() is called below, no destructor is (re)called.
00580 
00581         // Get the topic id and topic point before remove_topic since it
00582         // invalidates the iterator. Accessing after removal got SEGV.
00583         OpenDDS::DCPS::RepoId id = current->first;
00584         DCPS_IR_Topic* topic = current->second;
00585 
00586         this->domain_->remove_topic(this, topic);
00587 
00588         if (OpenDDS::DCPS::DCPS_debug_level > 9) {
00589           OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00590           OpenDDS::DCPS::RepoIdConverter topic_converter(id);
00591           ACE_DEBUG((LM_DEBUG,
00592                      ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00593                      ACE_TEXT("domain %d participant %C removed topic %C.\n"),
00594                      this->domain_->get_id(),
00595                      std::string(part_converter).c_str(),
00596                      std::string(topic_converter).c_str()));
00597         }
00598       }
00599     }
00600   }
00601 
00602   // Clear the Topic container of null pointers.
00603   this->topicRefs_.clear();
00604 
00605   // The publications and subscriptions can NOT be deleted until after all
00606   // the associations have been removed.  Otherwise an access violation
00607   // can occur because a publication and subscription of this participant
00608   // could be associated.
00609 
00610   // delete all the publications
00611   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00612        current != this->publications_.end();
00613        ++current) {
00614     // Notify the federation to destroy the publication.
00615     if (this->um_ && (this->isBitPublisher() == false)) {
00616       Update::IdPath path(
00617         this->domain_->get_id(),
00618         this->get_id(),
00619         current->second->get_id());
00620       this->um_->destroy(path, Update::Actor, Update::DataWriter);
00621 
00622       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00623         OpenDDS::DCPS::RepoId id = current->second->get_id();
00624         OpenDDS::DCPS::RepoIdConverter converter(id);
00625         ACE_DEBUG((LM_DEBUG,
00626                    ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00627                    ACE_TEXT("pushing deletion of publication %C in domain %d.\n"),
00628                    std::string(converter).c_str(),
00629                    this->domain_->get_id()));
00630       }
00631     }
00632   }
00633 
00634   // Clear the container.
00635   this->publications_.clear();
00636 
00637   // delete all the subscriptions
00638   for (DCPS_IR_Subscription_Map::const_iterator current
00639        = this->subscriptions_.begin();
00640        current != this->subscriptions_.end();
00641        ++current) {
00642     // Notify the federation to destroy the subscription.
00643     if (this->um_ && (this->isBitPublisher() == false)) {
00644       Update::IdPath path(
00645         this->domain_->get_id(),
00646         this->get_id(),
00647         current->second->get_id());
00648       this->um_->destroy(path, Update::Actor, Update::DataReader);
00649 
00650       if (OpenDDS::DCPS::DCPS_debug_level > 4) {
00651         OpenDDS::DCPS::RepoId id = current->second->get_id();
00652         OpenDDS::DCPS::RepoIdConverter converter(id);
00653         ACE_DEBUG((LM_DEBUG,
00654                    ACE_TEXT("(%P|%t) DCPS_IR_Participant::remove_all_dependents: ")
00655                    ACE_TEXT("pushing deletion of subscription %C in domain %d.\n"),
00656                    std::string(converter).c_str(),
00657                    this->domain_->get_id()));
00658       }
00659     }
00660   }
00661 
00662   // Clear the container.
00663   this->subscriptions_.clear();
00664 }
00665 
00666 void DCPS_IR_Participant::mark_dead()
00667 {
00668   aliveStatus_ = 0;
00669   domain_->add_dead_participant(OpenDDS::DCPS::rchandle_from(this));
00670 }
00671 
00672 OpenDDS::DCPS::RepoId DCPS_IR_Participant::get_id()
00673 {
00674   return id_;
00675 }
00676 
00677 CORBA::Boolean DCPS_IR_Participant::is_alive()
00678 {
00679   return aliveStatus_;
00680 }
00681 
00682 void DCPS_IR_Participant::set_alive(CORBA::Boolean alive)
00683 {
00684   aliveStatus_ = alive;
00685 }
00686 
00687 void DCPS_IR_Participant::ignore_participant(OpenDDS::DCPS::RepoId id)
00688 {
00689   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00690     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00691     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00692     ACE_DEBUG((LM_DEBUG,
00693                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_participant: ")
00694                ACE_TEXT("participant %C now ignoring participant %C.\n"),
00695                std::string(part_converter).c_str(),
00696                std::string(ignore_converter).c_str()));
00697   }
00698 
00699   ignoredParticipants_.insert(id);
00700 
00701   // disassociate any publications
00702   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00703        current != this->publications_.end();
00704        ++current) {
00705     current->second->disassociate_participant(id);
00706   }
00707 
00708   // disassociate any subscriptions
00709   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00710        current != this->subscriptions_.end();
00711        ++current) {
00712     current->second->disassociate_participant(id);
00713   }
00714 }
00715 
00716 void DCPS_IR_Participant::ignore_topic(OpenDDS::DCPS::RepoId id)
00717 {
00718   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00719     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00720     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00721     ACE_DEBUG((LM_DEBUG,
00722                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_topic: ")
00723                ACE_TEXT("participant %C now ignoring topic %C.\n"),
00724                std::string(part_converter).c_str(),
00725                std::string(ignore_converter).c_str()));
00726   }
00727 
00728   ignoredTopics_.insert(id);
00729 
00730   // disassociate any publications
00731   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00732        current != this->publications_.end();
00733        ++current) {
00734     current->second->disassociate_topic(id);
00735   }
00736 
00737   // disassociate any subscriptions
00738   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00739        current != this->subscriptions_.end();
00740        ++current) {
00741     current->second->disassociate_topic(id);
00742   }
00743 }
00744 
00745 void DCPS_IR_Participant::ignore_publication(OpenDDS::DCPS::RepoId id)
00746 {
00747   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00748     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00749     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00750     ACE_DEBUG((LM_DEBUG,
00751                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_publication: ")
00752                ACE_TEXT("participant %C now ignoring publication %C.\n"),
00753                std::string(part_converter).c_str(),
00754                std::string(ignore_converter).c_str()));
00755   }
00756 
00757   ignoredPublications_.insert(id);
00758 
00759   // disassociate any subscriptions
00760   for (DCPS_IR_Subscription_Map::const_iterator current = this->subscriptions_.begin();
00761        current != this->subscriptions_.end();
00762        ++current) {
00763     current->second->disassociate_publication(id);
00764   }
00765 }
00766 
00767 void DCPS_IR_Participant::ignore_subscription(OpenDDS::DCPS::RepoId id)
00768 {
00769   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00770     OpenDDS::DCPS::RepoIdConverter part_converter(id_);
00771     OpenDDS::DCPS::RepoIdConverter ignore_converter(id);
00772     ACE_DEBUG((LM_DEBUG,
00773                ACE_TEXT("(%P|%t) DCPS_IR_Participant::ignore_subscription: ")
00774                ACE_TEXT("participant %C now ignoring subscription %C.\n"),
00775                std::string(part_converter).c_str(),
00776                std::string(ignore_converter).c_str()));
00777   }
00778 
00779   ignoredSubscriptions_.insert(id);
00780 
00781   // disassociate any publications
00782   for (DCPS_IR_Publication_Map::const_iterator current = this->publications_.begin();
00783        current != this->publications_.end();
00784        ++current) {
00785     current->second->disassociate_subscription(id);
00786   }
00787 }
00788 
00789 CORBA::Boolean DCPS_IR_Participant::is_participant_ignored(OpenDDS::DCPS::RepoId id)
00790 {
00791   return (0 == ignoredParticipants_.find(id));
00792 }
00793 
00794 CORBA::Boolean DCPS_IR_Participant::is_topic_ignored(OpenDDS::DCPS::RepoId id)
00795 {
00796   return (0 == ignoredTopics_.find(id));
00797 }
00798 
00799 CORBA::Boolean DCPS_IR_Participant::is_publication_ignored(OpenDDS::DCPS::RepoId id)
00800 {
00801   return (0 == ignoredPublications_.find(id));
00802 }
00803 
00804 CORBA::Boolean DCPS_IR_Participant::is_subscription_ignored(OpenDDS::DCPS::RepoId id)
00805 {
00806   return (0 == ignoredSubscriptions_.find(id));
00807 }
00808 
00809 DDS::InstanceHandle_t DCPS_IR_Participant::get_handle()
00810 {
00811   return handle_;
00812 }
00813 
00814 void DCPS_IR_Participant::set_handle(DDS::InstanceHandle_t handle)
00815 {
00816   handle_ = handle;
00817 }
00818 
00819 const DDS::DomainParticipantQos* DCPS_IR_Participant::get_qos()
00820 {
00821   return &qos_;
00822 }
00823 
00824 bool DCPS_IR_Participant::set_qos(const DDS::DomainParticipantQos & qos)
00825 {
00826   // Do not need re-evaluate compatibility and associations when
00827   // DomainParticipantQos changes since only datareader and datawriter
00828   // QoS are evaludated during normal associations establishment.
00829 
00830   // Do not need publish the QoS change to topics or datareader or
00831   // datawriter BIT as they are independent.
00832   qos_ = qos;
00833   this->domain_->publish_participant_bit(this);
00834 
00835   return true;
00836 }
00837 
00838 DCPS_IR_Domain* DCPS_IR_Participant::get_domain_reference() const
00839 {
00840   return domain_;
00841 }
00842 
00843 OpenDDS::DCPS::RepoId
00844 DCPS_IR_Participant::get_next_topic_id(bool builtin)
00845 {
00846   return this->topicIdGenerator_.next(builtin);
00847 }
00848 
00849 OpenDDS::DCPS::RepoId
00850 DCPS_IR_Participant::get_next_publication_id(bool builtin)
00851 {
00852   return this->publicationIdGenerator_.next(builtin);
00853 }
00854 
00855 OpenDDS::DCPS::RepoId
00856 DCPS_IR_Participant::get_next_subscription_id(bool builtin)
00857 {
00858   return this->subscriptionIdGenerator_.next(builtin);
00859 }
00860 
00861 void
00862 DCPS_IR_Participant::last_topic_key(long key)
00863 {
00864   return this->topicIdGenerator_.last(key);
00865 }
00866 
00867 void
00868 DCPS_IR_Participant::last_publication_key(long key)
00869 {
00870   return this->publicationIdGenerator_.last(key);
00871 }
00872 
00873 void
00874 DCPS_IR_Participant::last_subscription_key(long key)
00875 {
00876   return this->subscriptionIdGenerator_.last(key);
00877 }
00878 
00879 std::string
00880 DCPS_IR_Participant::dump_to_string(const std::string& prefix, int depth) const
00881 {
00882   std::string str;
00883 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00884   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00885 
00886   for (int i=0; i < depth; i++)
00887     str += prefix;
00888   std::string indent = str + prefix;
00889   str += "DCPS_IR_Participant[";
00890   str += std::string(local_converter);
00891   str += "]";
00892   if (isBitPublisher_)
00893     str += " (BIT)";
00894   std::ostringstream os;
00895   os << "federation id[" << federationId_.id();
00896   if (federationId_.overridden())
00897     os << "(federated)";
00898 
00899   os << "]  owner[" << owner_ << "]";
00900   str += os.str();
00901   str += aliveStatus_ ? " (alive)" : " (not alive)";
00902   str += "\n";
00903 
00904   str += indent + "Topics:\n";
00905   for (DCPS_IR_Topic_Map::const_iterator tm = topicRefs_.begin();
00906        tm != topicRefs_.end();
00907        tm++)
00908   {
00909     str += tm->second->dump_to_string(prefix, depth+1);
00910   }
00911 
00912   str += indent + "Publications:\n";
00913   for (DCPS_IR_Publication_Map::const_iterator pm = publications_.begin();
00914        pm != publications_.end();
00915        pm++)
00916   {
00917     str += pm->second->dump_to_string(prefix, depth+1);
00918   }
00919 
00920   str += indent + "Subscriptions:\n";
00921   for (DCPS_IR_Subscription_Map::const_iterator sm = subscriptions_.begin();
00922        sm != subscriptions_.end();
00923        sm++)
00924   {
00925     str += sm->second->dump_to_string(prefix, depth+1);
00926   }
00927 
00928   str += indent + "ignored Participants [ ";
00929   for (TAO_DDS_RepoId_Set::const_iterator ipart = ignoredParticipants_.begin();
00930        ipart != ignoredParticipants_.end();
00931        ipart++)
00932   {
00933     OpenDDS::DCPS::RepoIdConverter ipart_converter(*ipart);
00934     str += std::string(ipart_converter);
00935     str += " ";
00936   }
00937   str += "]\n";
00938   str += indent + "ignored Topics [ ";
00939 
00940   for (TAO_DDS_RepoId_Set::const_iterator itop = ignoredTopics_.begin();
00941        itop != ignoredTopics_.end();
00942        itop++)
00943   {
00944     OpenDDS::DCPS::RepoIdConverter itop_converter(*itop);
00945     str += std::string(itop_converter);
00946     str += " ";
00947   }
00948   str += "]\n";
00949   str += indent + "ignored Publications [ ";
00950 
00951   for (TAO_DDS_RepoId_Set::const_iterator ipub = ignoredPublications_.begin();
00952        ipub != ignoredPublications_.end();
00953        ipub++)
00954   {
00955     OpenDDS::DCPS::RepoIdConverter ipub_converter(*ipub);
00956     str += std::string(ipub_converter);
00957     str += " ";
00958   }
00959   str += "]\n";
00960   str += indent + "ignored Subscriptions [ ";
00961 
00962   for (TAO_DDS_RepoId_Set::const_iterator isub = ignoredSubscriptions_.begin();
00963        isub != ignoredSubscriptions_.end();
00964        isub++)
00965   {
00966     OpenDDS::DCPS::RepoIdConverter isub_converter(*isub);
00967     str += std::string(isub_converter);
00968     str += " ";
00969   }
00970   str += "]\n";
00971 
00972 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00973   return str;
00974 }
00975 
00976 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00977 
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1