DCPS_IR_Topic_Description.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DcpsInfo_pch.h"
00009 #include /**/ "DCPS_IR_Topic_Description.h"
00010 
00011 #include /**/ "DCPS_IR_Subscription.h"
00012 #include /**/ "DCPS_IR_Publication.h"
00013 
00014 #include /**/ "DCPS_IR_Topic.h"
00015 #include /**/ "DCPS_IR_Domain.h"
00016 
00017 #include /**/ "dds/DCPS/DCPS_Utils.h"
00018 
00019 #include /**/ "tao/debug.h"
00020 
00021 #include /**/ "dds/DCPS/RepoIdConverter.h"
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 DCPS_IR_Topic_Description::DCPS_IR_Topic_Description(DCPS_IR_Domain* domain,
00026                                                      const char* name,
00027                                                      const char* dataTypeName)
00028   : name_(name),
00029     dataTypeName_(dataTypeName),
00030     domain_(domain)
00031 {
00032 }
00033 
00034 DCPS_IR_Topic_Description::~DCPS_IR_Topic_Description()
00035 {
00036 }
00037 
00038 int DCPS_IR_Topic_Description::add_subscription_reference(DCPS_IR_Subscription* subscription
00039                                                           , bool associate)
00040 {
00041   int status = subscriptionRefs_.insert(subscription);
00042 
00043   switch (status) {
00044   case 0:
00045 
00046     // Publish the BIT information
00047     domain_->publish_subscription_bit(subscription);
00048 
00049     if (associate) {
00050       try_associate_subscription(subscription);
00051       // Do not check incompatible qos here.  The check is done
00052       // in the DCPS_IR_Topic_Description::try_associate_subscription method
00053     }
00054 
00055     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00056       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00057       ACE_DEBUG((LM_DEBUG,
00058                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_subscription_reference: ")
00059                  ACE_TEXT("topic description %C added subscription %C at %x\n"),
00060                  this->name_.c_str(),
00061                  std::string(converter).c_str(),
00062                  subscription));
00063     }
00064 
00065     break;
00066 
00067   case 1: {
00068     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00069     ACE_ERROR((LM_ERROR,
00070                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00071                ACE_TEXT("topic description %C attempt to re-add subscription %C.\n"),
00072                this->name_.c_str(),
00073                std::string(converter).c_str()));
00074   }
00075   break;
00076 
00077   case -1: {
00078     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00079     ACE_ERROR((LM_ERROR,
00080                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_subscription_reference: ")
00081                ACE_TEXT("topic description %C failed to add subscription %C.\n"),
00082                this->name_.c_str(),
00083                std::string(converter).c_str()));
00084   }
00085   };
00086 
00087   return status;
00088 }
00089 
00090 int DCPS_IR_Topic_Description::remove_subscription_reference(DCPS_IR_Subscription* subscription)
00091 {
00092   int status = subscriptionRefs_.remove(subscription);
00093 
00094   if (0 == status) {
00095     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00096       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00097       ACE_DEBUG((LM_DEBUG,
00098                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_subscription_reference: ")
00099                  ACE_TEXT("topic description %C removed subscription %C.\n"),
00100                  this->name_.c_str(),
00101                  std::string(converter).c_str()));
00102     }
00103 
00104   } else {
00105     OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00106     ACE_ERROR((LM_ERROR,
00107                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_subscription_reference: ")
00108                ACE_TEXT("topic description %C failed to remove subscription %C.\n"),
00109                this->name_.c_str(),
00110                std::string(converter).c_str()));
00111   } // if (0 == status)
00112 
00113   return status;
00114 }
00115 
00116 int DCPS_IR_Topic_Description::add_topic(DCPS_IR_Topic* topic)
00117 {
00118   int status = topics_.insert(topic);
00119 
00120   switch (status) {
00121   case 0:
00122 
00123     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00124       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00125       ACE_DEBUG((LM_DEBUG,
00126                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::add_topic: ")
00127                  ACE_TEXT("topic description %C added topic %C at %x.\n"),
00128                  this->name_.c_str(),
00129                  std::string(converter).c_str(),
00130                  topic));
00131     }
00132 
00133     break;
00134   case 1:
00135 
00136     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00137       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00138       ACE_DEBUG((LM_WARNING,
00139                  ACE_TEXT("(%P|%t) WARNING: DCPS_IR_Topic_Description::add_topic: ")
00140                  ACE_TEXT("topic description %C attempt to re-add topic %C.\n"),
00141                  this->name_.c_str(),
00142                  std::string(converter).c_str()));
00143     }
00144 
00145     break;
00146   case -1: {
00147     OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00148     ACE_ERROR((LM_ERROR,
00149                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::add_topic: ")
00150                ACE_TEXT("topic description %C failed to add topic %C.\n"),
00151                this->name_.c_str(),
00152                std::string(converter).c_str()));
00153   }
00154   break;
00155   };
00156 
00157   return status;
00158 }
00159 
00160 int DCPS_IR_Topic_Description::remove_topic(DCPS_IR_Topic* topic)
00161 {
00162   int status = topics_.remove(topic);
00163 
00164   if (0 == status) {
00165     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00166       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00167       ACE_DEBUG((LM_DEBUG,
00168                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::remove_topic: ")
00169                  ACE_TEXT("topic description %C removed topic %C.\n"),
00170                  this->name_.c_str(),
00171                  std::string(converter).c_str()));
00172     }
00173 
00174   } else {
00175     OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00176     ACE_ERROR((LM_ERROR,
00177                ACE_TEXT("(%P|%t) ERROR: DCPS_IR_Topic_Description::remove_topic: ")
00178                ACE_TEXT("topic description failed to remove topic %C.\n"),
00179                this->name_.c_str(),
00180                std::string(converter).c_str()));
00181   }
00182 
00183   return status;
00184 }
00185 
00186 DCPS_IR_Topic* DCPS_IR_Topic_Description::get_first_topic()
00187 {
00188   DCPS_IR_Topic* topic = 0;
00189 
00190   if (0 < topics_.size()) {
00191     DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00192     topic = *iter;
00193 
00194     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00195       OpenDDS::DCPS::RepoIdConverter converter(topic->get_id());
00196       ACE_DEBUG((LM_DEBUG,
00197                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::get_first_topic: ")
00198                  ACE_TEXT("topic description %C first topic %C.\n"),
00199                  this->name_.c_str(),
00200                  std::string(converter).c_str()));
00201     }
00202   }
00203 
00204   return topic;
00205 }
00206 
00207 void DCPS_IR_Topic_Description::try_associate_publication(DCPS_IR_Publication* publication)
00208 {
00209   // for each subscription check for compatibility
00210   DCPS_IR_Subscription* subscription = 0;
00211   OpenDDS::DCPS::IncompatibleQosStatus* qosStatus = 0;
00212 
00213   DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00214   DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00215 
00216   while (iter != end) {
00217     subscription = *iter;
00218     ++iter;
00219     try_associate(publication, subscription);
00220 
00221     // Check the subscriptions QOS status
00222     qosStatus = subscription->get_incompatibleQosStatus();
00223 
00224     if (0 < qosStatus->count_since_last_send) {
00225       subscription->update_incompatible_qos();
00226     }
00227   }
00228 
00229   // Check the publications QOS status
00230   qosStatus = publication->get_incompatibleQosStatus();
00231 
00232   if (0 < qosStatus->count_since_last_send) {
00233     publication->update_incompatible_qos();
00234   }
00235 }
00236 
00237 void DCPS_IR_Topic_Description::try_associate_subscription(DCPS_IR_Subscription* subscription)
00238 {
00239   // check all topics for compatible publications
00240 
00241   DCPS_IR_Topic* topic = 0;
00242 
00243   DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00244   DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00245 
00246   while (iter != end) {
00247     topic = *iter;
00248     ++iter;
00249 
00250     topic->try_associate(subscription);
00251   }
00252 
00253   // Check the subscriptions QOS status
00254   OpenDDS::DCPS::IncompatibleQosStatus* qosStatus =
00255     subscription->get_incompatibleQosStatus();
00256 
00257   if (0 < qosStatus->total_count) {
00258     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00259       OpenDDS::DCPS::RepoIdConverter converter(subscription->get_id());
00260       ACE_DEBUG((LM_DEBUG,
00261                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate_subscription: ")
00262                  ACE_TEXT("topic description %C has %d incompatible publications ")
00263                  ACE_TEXT("with subscription %C.\n"),
00264                  this->name_.c_str(),
00265                  qosStatus->total_count,
00266                  std::string(converter).c_str()));
00267     }
00268 
00269     subscription->update_incompatible_qos();
00270   }
00271 }
00272 
00273 bool
00274 DCPS_IR_Topic_Description::try_associate(DCPS_IR_Publication* publication,
00275                                          DCPS_IR_Subscription* subscription)
00276 {
00277   if (publication->is_subscription_ignored(subscription->get_participant_id(),
00278                                            subscription->get_topic_id(),
00279                                            subscription->get_id())) {
00280     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00281       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00282       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00283       ACE_DEBUG((LM_DEBUG,
00284                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00285                  ACE_TEXT("topic description %C publication %C ignores subscription %C.\n"),
00286                  this->name_.c_str(),
00287                  std::string(pub_converter).c_str(),
00288                  std::string(sub_converter).c_str()));
00289     }
00290   }
00291 
00292   else if (subscription->is_publication_ignored(publication->get_participant_id(),
00293                                                 publication->get_topic_id(),
00294                                                 publication->get_id())) {
00295     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00296       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00297       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00298       ACE_DEBUG((LM_DEBUG,
00299                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00300                  ACE_TEXT("topic description %C subscription %C ignores publication %C.\n"),
00301                  this->name_.c_str(),
00302                  std::string(pub_converter).c_str(),
00303                  std::string(sub_converter).c_str()));
00304     }
00305 
00306   } else {
00307     if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00308       OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00309       OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00310       ACE_DEBUG((LM_DEBUG,
00311                  ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::try_associate: ")
00312                  ACE_TEXT("topic description %C checking compatibility of ")
00313                  ACE_TEXT("publication %C with subscription %C.\n"),
00314                  this->name_.c_str(),
00315                  std::string(pub_converter).c_str(),
00316                  std::string(sub_converter).c_str()));
00317     }
00318 
00319     if (OpenDDS::DCPS::compatibleQOS(publication->get_incompatibleQosStatus(),
00320                                      subscription->get_incompatibleQosStatus(),
00321                                      publication->get_transportLocatorSeq(),
00322                                      subscription->get_transportLocatorSeq(),
00323                                      publication->get_datawriter_qos(),
00324                                      subscription->get_datareader_qos(),
00325                                      publication->get_publisher_qos(),
00326                                      subscription->get_subscriber_qos())) {
00327       associate(publication, subscription);
00328       return true;
00329     }
00330 
00331     // Dont notify that there is an incompatible qos here
00332     // notify where we can distinguish which one is being added
00333     // so we only send one response(with all incompatible qos) to it
00334   }
00335 
00336   return false;
00337 }
00338 
00339 void DCPS_IR_Topic_Description::associate(DCPS_IR_Publication* publication,
00340                                           DCPS_IR_Subscription* subscription)
00341 {
00342   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00343     OpenDDS::DCPS::RepoIdConverter pub_converter(publication->get_id());
00344     OpenDDS::DCPS::RepoIdConverter sub_converter(subscription->get_id());
00345     ACE_DEBUG((LM_DEBUG,
00346                ACE_TEXT("(%P|%t) DCPS_IR_Topic_Description::associate: ")
00347                ACE_TEXT("topic description %C associating ")
00348                ACE_TEXT("publication %C with subscription %C.\n"),
00349                this->name_.c_str(),
00350                std::string(pub_converter).c_str(),
00351                std::string(sub_converter).c_str()));
00352   }
00353 
00354   // The publication must be told first because it will be the connector
00355   // if a data link needs to be created.
00356   // This is only required if the publication and subscription are being
00357   // handed by the same process and thread.  Order when there is
00358   // another thread or process is not important.
00359   // Note: the client thread may process the add_associations() oneway
00360   //       call instead of the ORB thread because it is currently
00361   //       in a two-way call to the Repo.
00362   int error = publication->add_associated_subscription(subscription, true);
00363 
00364   // If there was no TAO error contacting the publication (This can happen if
00365   // an old publisher has exited non-gracefully)
00366   if (error != -1) {
00367     // Associate the subscription with the publication
00368     subscription->add_associated_publication(publication, false);
00369   } else {
00370     ACE_DEBUG((LM_INFO, ACE_TEXT("Invalid publication detected, NOT notifying subcription of association\n")));
00371   }
00372 }
00373 
00374 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Subscription* subscription)
00375 {
00376   DCPS_IR_Topic* topic = 0;
00377 
00378   DCPS_IR_Topic_Set::ITERATOR iter = topics_.begin();
00379   DCPS_IR_Topic_Set::ITERATOR end = topics_.end();
00380 
00381   while (iter != end) {
00382     topic = *iter;
00383     ++iter;
00384 
00385     topic->reevaluate_associations(subscription);
00386   }
00387 }
00388 
00389 void DCPS_IR_Topic_Description::reevaluate_associations(DCPS_IR_Publication* publication)
00390 {
00391   DCPS_IR_Subscription * sub = 0;
00392   DCPS_IR_Subscription_Set::ITERATOR iter = subscriptionRefs_.begin();
00393   DCPS_IR_Subscription_Set::ITERATOR end = subscriptionRefs_.end();
00394 
00395   while (iter != end) {
00396     sub = *iter;
00397     ++iter;
00398     publication->reevaluate_association(sub);
00399     sub->reevaluate_association(publication);
00400   }
00401 }
00402 
00403 const char* DCPS_IR_Topic_Description::get_name() const
00404 {
00405   return name_.c_str();
00406 }
00407 
00408 const char* DCPS_IR_Topic_Description::get_dataTypeName() const
00409 {
00410   return dataTypeName_.c_str();
00411 }
00412 
00413 CORBA::ULong DCPS_IR_Topic_Description::get_number_topics() const
00414 {
00415   return static_cast<CORBA::ULong>(topics_.size());
00416 }
00417 
00418 std::string
00419 DCPS_IR_Topic_Description::dump_to_string(const std::string& prefix,
00420                                           int depth) const
00421 {
00422   std::string str;
00423 #if !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00424   for (int i=0; i < depth; i++)
00425     str += prefix;
00426   std::string indent = str + prefix;
00427   str += "DCPS_IR_Topic_Description [";
00428   str += name_.c_str();
00429   str += "][";
00430   str += dataTypeName_.c_str();
00431   str += "]\n";
00432 
00433   str += indent + "Subscription References [ ";
00434   for (DCPS_IR_Subscription_Set::const_iterator sub = subscriptionRefs_.begin();
00435        sub != subscriptionRefs_.end();
00436        sub++)
00437   {
00438     OpenDDS::DCPS::RepoIdConverter sub_converter((*sub)->get_id());
00439     str += std::string(sub_converter);
00440     str += " ";
00441   }
00442   str += "]\n";
00443 
00444   str += indent + "Topics [ ";
00445   for (DCPS_IR_Topic_Set::const_iterator top = topics_.begin();
00446        top != topics_.end();
00447        top++)
00448   {
00449     OpenDDS::DCPS::RepoIdConverter top_converter((*top)->get_id());
00450     str += std::string(top_converter);
00451     str += " ";
00452   }
00453   str += "]\n";
00454 #endif // !defined (OPENDDS_INFOREPO_REDUCED_FOOTPRINT)
00455   return str;
00456 }
00457 
00458 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1