DCPS_IR_Topic.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include /**/ "DCPS_IR_Topic.h"
00011 #include /**/ "DCPS_IR_Domain.h"
00012 
00013 #include /**/ "DCPS_IR_Subscription.h"
00014 #include /**/ "DCPS_IR_Publication.h"
00015 #include /**/ "DCPS_IR_Participant.h"
00016 #include /**/ "DCPS_IR_Topic_Description.h"
00017 
00018 #include /**/ "dds/DCPS/DCPS_Utils.h"
00019 #include /**/ "dds/DCPS/RepoIdConverter.h"
00020 #include /**/ "dds/DCPS/Qos_Helper.h"
00021 #include /**/ "tao/debug.h"
00022 
00023 DCPS_IR_Topic::DCPS_IR_Topic(const OpenDDS::DCPS::RepoId& id,
00024                              const DDS::TopicQos& qos,
00025                              DCPS_IR_Domain* domain,
00026                              DCPS_IR_Participant* creator,
00027                              DCPS_IR_Topic_Description* description)
00028   : id_(id),
00029     qos_(qos),
00030     domain_(domain),
00031     participant_(creator),
00032     description_(description),
00033     handle_(0),
00034     isBIT_(0),
00035     removed_(false)
00036 {
00037 }
00038 
00039 DCPS_IR_Topic::~DCPS_IR_Topic()
00040 {
00041   // check for remaining publication references
00042   if (0 != publicationRefs_.size()) {
00043     DCPS_IR_Publication* pub = 0;
00044     DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00045     DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00046 
00047     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00048     ACE_ERROR((LM_ERROR,
00049                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00050                ACE_TEXT("id %C has retained publications.\n"),
00051                std::string(topic_converter).c_str()));
00052 
00053     while (iter != end) {
00054       pub = *iter;
00055       ++iter;
00056 
00057       OpenDDS::DCPS::RepoIdConverter pub_converter(pub->get_id());
00058       ACE_ERROR((LM_ERROR,
00059                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00060                  ACE_TEXT("topic %C retains publication id %C.\n"),
00061                  std::string(topic_converter).c_str(),
00062                  std::string(pub_converter).c_str()));
00063     }
00064   }
00065 
00066   if (0 != subscriptionRefs_.size()) {
00067     DCPS_IR_Subscription* sub = 0;
00068     DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00069     DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00070 
00071     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00072     ACE_ERROR((LM_ERROR,
00073                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00074                ACE_TEXT("id %C has retained subscriptions.\n"),
00075                std::string(topic_converter).c_str()));
00076 
00077     while (iter != end) {
00078       sub = *iter;
00079       ++iter;
00080 
00081       OpenDDS::DCPS::RepoIdConverter sub_converter(sub->get_id());
00082       ACE_ERROR((LM_ERROR,
00083                  ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::~DCPS_IR_Topic: ")
00084                  ACE_TEXT("topic %C retains subscription id %C.\n"),
00085                  std::string(topic_converter).c_str(),
00086                  std::string(sub_converter).c_str()));
00087     }
00088   }
00089 }
00090 
00091 void DCPS_IR_Topic::release(bool removing)
00092 {
00093   if (removing) {
00094     this->removed_ = true;
00095 
00096     if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00097       this->domain_->remove_topic_id_mapping(this->id_);
00098       delete this;
00099     }
00100 
00101   } else if (this->removed_) {
00102     if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00103       this->domain_->remove_topic_id_mapping(this->id_);
00104       delete this;
00105     }
00106   }
00107 }
00108 
00109 int DCPS_IR_Topic::add_publication_reference(DCPS_IR_Publication* publication
00110                                              , bool associate)
00111 {
00112   int status = publicationRefs_.insert(publication);
00113 
00114   switch (status) {
00115   case 0:
00116 
00117     // Publish the BIT information
00118     domain_->publish_publication_bit(publication);
00119 
00120     if (associate) {
00121       description_->try_associate_publication(publication);
00122       // Do not check incompatible qos here.  The check is done
00123       // in the DCPS_IR_Topic_Description::try_associate_publication method
00124     }
00125 
00126     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00127       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00128       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00129       ACE_DEBUG((LM_DEBUG,
00130                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_publication_reference: ")
00131                  ACE_TEXT("topic %C added publication %C at %x\n"),
00132                  std::string(topic_converter).c_str(),
00133                  std::string(pub_converter).c_str(),
00134                  publication));
00135     }
00136 
00137     break;
00138 
00139   case 1:
00140 
00141     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00142       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00143       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00144       ACE_DEBUG((LM_WARNING,
00145                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic::add_publication_reference: ")
00146                  ACE_TEXT("topic %C attempt to re-add publication %C.\n"),
00147                  std::string(topic_converter).c_str(),
00148                  std::string(pub_converter).c_str()));
00149     }
00150 
00151     break;
00152 
00153   case -1: {
00154     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00155     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00156     ACE_ERROR((LM_ERROR,
00157                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_publication_reference: ")
00158                ACE_TEXT("topic %C failed to add publication %C\n"),
00159                std::string(topic_converter).c_str(),
00160                std::string(pub_converter).c_str()));
00161   }
00162   };
00163 
00164   return status;
00165 }
00166 
00167 int DCPS_IR_Topic::remove_publication_reference(DCPS_IR_Publication* publication)
00168 {
00169   int status = publicationRefs_.remove(publication);
00170 
00171   if (0 == status) {
00172     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00173       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00174       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00175       ACE_DEBUG((LM_DEBUG,
00176                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_publication_reference: ")
00177                  ACE_TEXT("topic %C removed publication %C.\n"),
00178                  std::string(topic_converter).c_str(),
00179                  std::string(pub_converter).c_str()));
00180     }
00181 
00182   } else {
00183     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00184     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00185     ACE_ERROR((LM_ERROR,
00186                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_publication_reference: ")
00187                ACE_TEXT("topic %C failed to remove publication %C.\n"),
00188                std::string(topic_converter).c_str(),
00189                std::string(pub_converter).c_str()));
00190   }
00191 
00192   return status;
00193 }
00194 
00195 int DCPS_IR_Topic::add_subscription_reference(DCPS_IR_Subscription* subscription
00196                                               , bool associate)
00197 {
00198   int status = subscriptionRefs_.insert(subscription);
00199 
00200   switch (status) {
00201   case 0:
00202 
00203     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00204       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00205       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00206       ACE_DEBUG((LM_DEBUG,
00207                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_subscription_reference: ")
00208                  ACE_TEXT("topic %C added subscription %C at %x.\n"),
00209                  std::string(topic_converter).c_str(),
00210                  std::string(sub_converter).c_str(),
00211                  subscription));
00212     }
00213 
00214     status = this->description_->add_subscription_reference(subscription, associate);
00215     break;
00216 
00217   case 1: {
00218     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00219     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00220     ACE_ERROR((LM_ERROR,
00221                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00222                ACE_TEXT("topic %C attempt to re-add subscription %C.\n"),
00223                std::string(topic_converter).c_str(),
00224                std::string(sub_converter).c_str()));
00225   }
00226   break;
00227 
00228   case -1: {
00229     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00230     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00231     ACE_ERROR((LM_ERROR,
00232                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00233                ACE_TEXT("topic %C failed to add subscription %C.\n"),
00234                std::string(topic_converter).c_str(),
00235                std::string(sub_converter).c_str()));
00236   }
00237   };
00238 
00239   return status;
00240 }
00241 
00242 int DCPS_IR_Topic::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00243 {
00244   int status = subscriptionRefs_.remove(subscription);
00245 
00246   if (0 == status) {
00247     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00248       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00249       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00250       ACE_DEBUG((LM_DEBUG,
00251                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_subscription_reference: ")
00252                  ACE_TEXT("topic %C removed subscription %C.\n"),
00253                  std::string(topic_converter).c_str(),
00254                  std::string(sub_converter).c_str()));
00255     }
00256 
00257     this->description_->remove_subscription_reference(subscription);
00258 
00259   } else {
00260     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00261     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00262     ACE_ERROR((LM_ERROR,
00263                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_subscription_reference: ")
00264                ACE_TEXT("topic %C failed to remove subscription %C.\n"),
00265                std::string(topic_converter).c_str(),
00266                std::string(sub_converter).c_str()));
00267   } // if (0 == status)
00268 
00269   return status;
00270 }
00271 
00272 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_id() const
00273 {
00274   return id_;
00275 }
00276 
00277 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_participant_id() const
00278 {
00279   return participant_->get_id();
00280 }
00281 
00282 DDS::TopicQos * DCPS_IR_Topic::get_topic_qos()
00283 {
00284   return &qos_;
00285 }
00286 
00287 bool DCPS_IR_Topic::set_topic_qos(const DDS::TopicQos& qos)
00288 {
00289   // Do not need re-evaluate compatibility and associations when
00290   // TopicQos changes since only datareader and datawriter QoS
00291   // are evaludated during normal associations establishment.
00292   using OpenDDS::DCPS::operator==;
00293   bool pub_to_rd_wr = !(qos.topic_data == qos_.topic_data);
00294 
00295   qos_ = qos;
00296   domain_->publish_topic_bit(this);
00297 
00298   if (!pub_to_rd_wr)
00299     return true;
00300 
00301   // The only changeable TopicQos used by DataWriter and DataReader
00302   // is topic_data so we need publish it to DW/DR BIT to make they
00303   // are consistent.
00304 
00305   // Update qos in datawriter BIT for associated datawriters.
00306 
00307   {
00308     DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00309     DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00310 
00311     while (iter != end) {
00312       domain_->publish_publication_bit(*iter);
00313       ++iter;
00314     }
00315   }
00316 
00317   // Update qos in datareader BIT for associated datareader.
00318 
00319   {
00320     DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00321     DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00322 
00323     while (iter != end) {
00324       domain_->publish_subscription_bit(*iter);
00325       ++iter;
00326     }
00327   }
00328 
00329   return true;
00330 }
00331 
00332 void DCPS_IR_Topic::try_associate(DCPS_IR_Subscription* subscription)
00333 {
00334   // check if we should ignore this subscription
00335   if (participant_->is_subscription_ignored(subscription->get_id()) ||
00336       participant_->is_participant_ignored(subscription->get_participant_id()) ||
00337       participant_->is_topic_ignored(subscription->get_topic_id())) {
00338     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00339       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00340       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00341       ACE_DEBUG((LM_DEBUG,
00342                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::try_associate: ")
00343                  ACE_TEXT("topic %C ignoring subscription %C.\n"),
00344                  std::string(topic_converter).c_str(),
00345                  std::string(sub_converter).c_str()));
00346     }
00347 
00348   } else {
00349     // check all publications for compatibility
00350     DCPS_IR_Publication* pub = 0;
00351     OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00352 
00353     DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00354     DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00355 
00356     while (iter != end) {
00357       pub = *iter;
00358       ++iter;
00359       description_->try_associate(pub, subscription);
00360       // Check the publications QOS status
00361       qosStatus = pub->get_incompatibleQosStatus();
00362 
00363       if (0 < qosStatus->count_since_last_send) {
00364         pub->update_incompatible_qos();
00365       }
00366     } /* while (iter != end) */
00367 
00368     // The subscription QOS is not checked because
00369     // we don't know if the subscription is finished cycling
00370     // through topics.
00371   }
00372 }
00373 
00374 DCPS_IR_Topic_Description* DCPS_IR_Topic::get_topic_description()
00375 {
00376   return description_;
00377 }
00378 
00379 DDS::InstanceHandle_t DCPS_IR_Topic::get_handle()
00380 {
00381   return handle_;
00382 }
00383 
00384 void DCPS_IR_Topic::set_handle(DDS::InstanceHandle_t handle)
00385 {
00386   handle_ = handle;
00387 }
00388 
00389 CORBA::Boolean DCPS_IR_Topic::is_bit()
00390 {
00391   return isBIT_;
00392 }
00393 
00394 void DCPS_IR_Topic::set_bit_status(CORBA::Boolean isBIT)
00395 {
00396   isBIT_ = isBIT;
00397 }
00398 
00399 void DCPS_IR_Topic::reevaluate_associations(DCPS_IR_Subscription* subscription)
00400 {
00401   DCPS_IR_Publication * pub = 0;
00402   DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00403   DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00404 
00405   while (iter != end) {
00406     pub = *iter;
00407     ++iter;
00408 
00409     subscription->reevaluate_association(pub);
00410     pub->reevaluate_association(subscription);
00411   }
00412 }
00413 
00414 
00415 void DCPS_IR_Topic::reassociate_all_publications()
00416 {
00417   DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00418   DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00419 
00420   for ( ; iter != end; ++iter)
00421   {
00422     description_->try_associate_publication(*iter);
00423   }
00424 }
00425 
00426 std::string
00427 DCPS_IR_Topic::dump_to_string(const std::string& prefix, int depth) const
00428 {
00429   std::string str;
00430 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00431   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00432 
00433   for (int i=0; i < depth; i++)
00434     str += prefix;
00435   std::string indent = str + prefix;
00436   str += "DCPS_IR_Topic[";
00437   str += std::string(local_converter);
00438   str += "]";
00439   if (isBIT_)
00440     str += " (BIT)";
00441   str += "\n";
00442 
00443   str += indent + "Publications:\n";
00444   for (DCPS_IR_Publication_Set::const_iterator pub = publicationRefs_.begin();
00445        pub != publicationRefs_.end();
00446        pub++)
00447   {
00448     OpenDDS::DCPS::RepoIdConverter pub_converter((*pub)->get_id());
00449     str += indent + std::string(pub_converter);
00450     str += "\n";
00451 
00452   }
00453 
00454   str += indent + "Subscriptions:\n";
00455   for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00456        sub != subscriptionRefs_.end();
00457        sub++)
00458   {
00459     OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00460     str += indent + std::string(sub_converter);
00461     str += "\n";
00462   }
00463 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00464   return str;
00465 }
00466 
// Explicit instantiation of the ACE container templates for
// DCPS_IR_Publication*, for toolchains that request it through one of the
// ACE configuration macros below.
// NOTE(review): no matching instantiations for DCPS_IR_Subscription* are
// visible in this file -- confirm they are provided elsewhere.
#if defined(ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)

// Standard C++ explicit instantiation.
template class ACE_Node<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set<DCPS_IR_Publication*>;
template class ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>;

#elif defined(ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)

// Pragma-based instantiation.
#pragma instantiate ACE_Node<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set<DCPS_IR_Publication*>
#pragma instantiate ACE_Unbounded_Set_Iterator<DCPS_IR_Publication*>

#endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */

Generated on Fri Feb 12 20:05:20 2016 for OpenDDS by  doxygen 1.4.7