DCPS_IR_Topic.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 
00010 #include /**/ "DCPS_IR_Topic.h"
00011 #include /**/ "DCPS_IR_Domain.h"
00012 
00013 #include /**/ "DCPS_IR_Subscription.h"
00014 #include /**/ "DCPS_IR_Publication.h"
00015 #include /**/ "DCPS_IR_Participant.h"
00016 #include /**/ "DCPS_IR_Topic_Description.h"
00017 
00018 #include /**/ "dds/DCPS/DCPS_Utils.h"
00019 #include /**/ "dds/DCPS/RepoIdConverter.h"
00020 #include /**/ "dds/DCPS/Qos_Helper.h"
00021 #include /**/ "tao/debug.h"
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 DCPS_IR_Topic::DCPS_IR_Topic(const OpenDDS::DCPS::RepoId& id,
00026                              const DDS::TopicQos& qos,
00027                              DCPS_IR_Domain* domain,
00028                              DCPS_IR_Participant* creator,
00029                              DCPS_IR_Topic_Description* description,
00030                              bool isBIT)
00031   : id_(id),
00032     qos_(qos),
00033     domain_(domain),
00034     participant_(creator),
00035     description_(description),
00036     handle_(0),
00037     isBIT_(isBIT),
00038     removed_(false)
00039 {
00040 }
00041 
00042 DCPS_IR_Topic::~DCPS_IR_Topic()
00043 {
00044 }
00045 
00046 void DCPS_IR_Topic::release(bool removing)
00047 {
00048   if (removing || this->removed_) {
00049     this->removed_ = true;
00050 
00051     if (publicationRefs_.size() == 0 && subscriptionRefs_.size() == 0) {
00052       this->domain_->remove_topic_id_mapping(this->id_);
00053     }
00054   }
00055 }
00056 
00057 int DCPS_IR_Topic::add_publication_reference(DCPS_IR_Publication* publication
00058                                              , bool associate)
00059 {
00060   int status = publicationRefs_.insert(publication);
00061 
00062   switch (status) {
00063   case 0:
00064 
00065     // Publish the BIT information
00066     domain_->publish_publication_bit(publication);
00067 
00068     if (associate) {
00069       description_->try_associate_publication(publication);
00070       // Do not check incompatible qos here.  The check is done
00071       // in the DCPS_IR_Topic_Description::try_associate_publication method
00072     }
00073 
00074     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00075       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00076       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00077       ACE_DEBUG((LM_DEBUG,
00078                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_publication_reference: ")
00079                  ACE_TEXT("topic %C added publication %C at %x\n"),
00080                  std::string(topic_converter).c_str(),
00081                  std::string(pub_converter).c_str(),
00082                  publication));
00083     }
00084 
00085     break;
00086 
00087   case 1:
00088 
00089     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00090       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00091       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00092       ACE_DEBUG((LM_WARNING,
00093                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic::add_publication_reference: ")
00094                  ACE_TEXT("topic %C attempt to re-add publication %C.\n"),
00095                  std::string(topic_converter).c_str(),
00096                  std::string(pub_converter).c_str()));
00097     }
00098 
00099     break;
00100 
00101   case -1: {
00102     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00103     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00104     ACE_ERROR((LM_ERROR,
00105                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_publication_reference: ")
00106                ACE_TEXT("topic %C failed to add publication %C\n"),
00107                std::string(topic_converter).c_str(),
00108                std::string(pub_converter).c_str()));
00109   }
00110   };
00111 
00112   return status;
00113 }
00114 
00115 int DCPS_IR_Topic::remove_publication_reference(DCPS_IR_Publication* publication)
00116 {
00117   int status = publicationRefs_.remove(publication);
00118 
00119   if (0 == status) {
00120     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00121       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00122       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00123       ACE_DEBUG((LM_DEBUG,
00124                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_publication_reference: ")
00125                  ACE_TEXT("topic %C removed publication %C.\n"),
00126                  std::string(topic_converter).c_str(),
00127                  std::string(pub_converter).c_str()));
00128     }
00129 
00130   } else {
00131     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00132     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00133     ACE_ERROR((LM_ERROR,
00134                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_publication_reference: ")
00135                ACE_TEXT("topic %C failed to remove publication %C.\n"),
00136                std::string(topic_converter).c_str(),
00137                std::string(pub_converter).c_str()));
00138   }
00139 
00140   return status;
00141 }
00142 
00143 int DCPS_IR_Topic::add_subscription_reference(DCPS_IR_Subscription* subscription
00144                                               , bool associate)
00145 {
00146   int status = subscriptionRefs_.insert(subscription);
00147 
00148   switch (status) {
00149   case 0:
00150 
00151     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00152       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00153       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00154       ACE_DEBUG((LM_DEBUG,
00155                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::add_subscription_reference: ")
00156                  ACE_TEXT("topic %C added subscription %C at %x.\n"),
00157                  std::string(topic_converter).c_str(),
00158                  std::string(sub_converter).c_str(),
00159                  subscription));
00160     }
00161 
00162     status = this->description_->add_subscription_reference(subscription, associate);
00163     break;
00164 
00165   case 1: {
00166     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00167     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00168     ACE_ERROR((LM_ERROR,
00169                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00170                ACE_TEXT("topic %C attempt to re-add subscription %C.\n"),
00171                std::string(topic_converter).c_str(),
00172                std::string(sub_converter).c_str()));
00173   }
00174   break;
00175 
00176   case -1: {
00177     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00178     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00179     ACE_ERROR((LM_ERROR,
00180                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::add_subscription_reference: ")
00181                ACE_TEXT("topic %C failed to add subscription %C.\n"),
00182                std::string(topic_converter).c_str(),
00183                std::string(sub_converter).c_str()));
00184   }
00185   };
00186 
00187   return status;
00188 }
00189 
00190 int DCPS_IR_Topic::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00191 {
00192   int status = subscriptionRefs_.remove(subscription);
00193 
00194   if (0 == status) {
00195     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00196       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00197       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00198       ACE_DEBUG((LM_DEBUG,
00199                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::remove_subscription_reference: ")
00200                  ACE_TEXT("topic %C removed subscription %C.\n"),
00201                  std::string(topic_converter).c_str(),
00202                  std::string(sub_converter).c_str()));
00203     }
00204 
00205     this->description_->remove_subscription_reference(subscription);
00206 
00207   } else {
00208     OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00209     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00210     ACE_ERROR((LM_ERROR,
00211                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic::remove_subscription_reference: ")
00212                ACE_TEXT("topic %C failed to remove subscription %C.\n"),
00213                std::string(topic_converter).c_str(),
00214                std::string(sub_converter).c_str()));
00215   } // if (0 == status)
00216 
00217   return status;
00218 }
00219 
00220 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_id() const
00221 {
00222   return id_;
00223 }
00224 
00225 OpenDDS::DCPS::RepoId DCPS_IR_Topic::get_participant_id() const
00226 {
00227   return participant_->get_id();
00228 }
00229 
00230 DDS::TopicQos * DCPS_IR_Topic::get_topic_qos()
00231 {
00232   return &qos_;
00233 }
00234 
00235 bool DCPS_IR_Topic::set_topic_qos(const DDS::TopicQos& qos)
00236 {
00237   // Do not need re-evaluate compatibility and associations when
00238   // TopicQos changes since only datareader and datawriter QoS
00239   // are evaludated during normal associations establishment.
00240   using OpenDDS::DCPS::operator==;
00241   bool pub_to_rd_wr = !(qos.topic_data == qos_.topic_data);
00242 
00243   qos_ = qos;
00244   domain_->publish_topic_bit(this);
00245 
00246   if (!pub_to_rd_wr)
00247     return true;
00248 
00249   // The only changeable TopicQos used by DataWriter and DataReader
00250   // is topic_data so we need publish it to DW/DR BIT to make they
00251   // are consistent.
00252 
00253   // Update qos in datawriter BIT for associated datawriters.
00254 
00255   {
00256     DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00257     DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00258 
00259     while (iter != end) {
00260       domain_->publish_publication_bit(*iter);
00261       ++iter;
00262     }
00263   }
00264 
00265   // Update qos in datareader BIT for associated datareader.
00266 
00267   {
00268     DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00269     DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00270 
00271     while (iter != end) {
00272       domain_->publish_subscription_bit(*iter);
00273       ++iter;
00274     }
00275   }
00276 
00277   return true;
00278 }
00279 
00280 void DCPS_IR_Topic::try_associate(DCPS_IR_Subscription* subscription)
00281 {
00282   // check if we should ignore this subscription
00283   if (participant_->is_subscription_ignored(subscription->get_id()) ||
00284       participant_->is_participant_ignored(subscription->get_participant_id()) ||
00285       participant_->is_topic_ignored(subscription->get_topic_id())) {
00286     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00287       OpenDDS::DCPS::RepoIdConverter topic_converter(id_);
00288       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00289       ACE_DEBUG((LM_DEBUG,
00290                  ACE_TEXT("(%P|%t) DCPS_IR_Topic::try_associate: ")
00291                  ACE_TEXT("topic %C ignoring subscription %C.\n"),
00292                  std::string(topic_converter).c_str(),
00293                  std::string(sub_converter).c_str()));
00294     }
00295 
00296   } else {
00297     // check all publications for compatibility
00298     DCPS_IR_Publication* pub = 0;
00299     OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00300 
00301     DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00302     DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00303 
00304     while (iter != end) {
00305       pub = *iter;
00306       ++iter;
00307       description_->try_associate(pub, subscription);
00308       // Check the publications QOS status
00309       qosStatus = pub->get_incompatibleQosStatus();
00310 
00311       if (0 < qosStatus->count_since_last_send) {
00312         pub->update_incompatible_qos();
00313       }
00314     } /* while (iter != end) */
00315 
00316     // The subscription QOS is not checked because
00317     // we don't know if the subscription is finished cycling
00318     // through topics.
00319   }
00320 }
00321 
00322 DCPS_IR_Topic_Description* DCPS_IR_Topic::get_topic_description()
00323 {
00324   return description_;
00325 }
00326 
00327 DDS::InstanceHandle_t DCPS_IR_Topic::get_handle()
00328 {
00329   return handle_;
00330 }
00331 
00332 void DCPS_IR_Topic::set_handle(DDS::InstanceHandle_t handle)
00333 {
00334   handle_ = handle;
00335 }
00336 
00337 CORBA::Boolean DCPS_IR_Topic::is_bit()
00338 {
00339   return isBIT_;
00340 }
00341 
00342 void DCPS_IR_Topic::set_bit_status(CORBA::Boolean isBIT)
00343 {
00344   isBIT_ = isBIT;
00345 }
00346 
00347 void DCPS_IR_Topic::reevaluate_associations(DCPS_IR_Subscription* subscription)
00348 {
00349   DCPS_IR_Publication * pub = 0;
00350   DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00351   DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00352 
00353   while (iter != end) {
00354     pub = *iter;
00355     ++iter;
00356 
00357     subscription->reevaluate_association(pub);
00358     pub->reevaluate_association(subscription);
00359   }
00360 }
00361 
00362 
00363 void DCPS_IR_Topic::reassociate_all_publications()
00364 {
00365   DCPS_IR_Publication_Set::ITERATOR iter = publicationRefs_.begin();
00366   DCPS_IR_Publication_Set::ITERATOR end = publicationRefs_.end();
00367 
00368   for ( ; iter != end; ++iter)
00369   {
00370     description_->try_associate_publication(*iter);
00371   }
00372 }
00373 
00374 std::string
00375 DCPS_IR_Topic::dump_to_string(const std::string& prefix, int depth) const
00376 {
00377   std::string str;
00378 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00379   OpenDDS::DCPS::RepoIdConverter local_converter(id_);
00380 
00381   for (int i=0; i < depth; i++)
00382     str += prefix;
00383   std::string indent = str + prefix;
00384   str += "DCPS_IR_Topic[";
00385   str += std::string(local_converter);
00386   str += "]";
00387   if (isBIT_)
00388     str += " (BIT)";
00389   str += "\n";
00390 
00391   str += indent + "Publications:\n";
00392   for (DCPS_IR_Publication_Set::const_iterator pub = publicationRefs_.begin();
00393        pub != publicationRefs_.end();
00394        pub++)
00395   {
00396     OpenDDS::DCPS::RepoIdConverter pub_converter((*pub)->get_id());
00397     str += indent + std::string(pub_converter);
00398     str += "\n";
00399 
00400   }
00401 
00402   str += indent + "Subscriptions:\n";
00403   for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00404        sub != subscriptionRefs_.end();
00405        sub++)
00406   {
00407     OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00408     str += indent + std::string(sub_converter);
00409     str += "\n";
00410   }
00411 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00412   return str;
00413 }
00414 
00415 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1